Transforming measurements
At this point, you have a plugin that produces measurements with a source. The next step in the measurement pipeline is the transform step. It sees all the measurement points produced by the sources, can filter them, modify them and compute new values.
In your first source, the "counter", you produced a measurement that contained the number of times that the source was called. This number was always growing.
It corresponds to the metric example_source_call_counter.
For your first transform, you will use the values produced by this source and compute the difference between consecutive values of the counter.
🚧 Work in progress
For the moment, you are a little "alone" when writing transforms: the API is rather low-level. We are working on a more friendly API, built in Alumet, that will simplify the implementation of transforms that need to work on specific metrics. You can share your use case and wishes with us in the Discussions.
Difference transform: the idea
The "difference" transform works in the following way.
It has an internal state: the previous value of the counter. Alumet periodically triggers the "counter" source and passes the measurement buffer to your transform, which can inspect and modify it. It will find the value of the counter, c, and compute the difference with the previously known value (if there is one).
Here is a simplified timeline of the transform's operations. In reality, a single buffer can contain multiple updates of the counter, and each of them needs to be taken into account.
The measurement points produced by the transform are associated with the new metric (M2), which we will define as explained in chapter 2. Measurement points produced by the "counter" source use a different metric, schematically called (M1) in the diagrams.
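To make the idea concrete before involving the Alumet API, here is a minimal sketch of the stateful difference computation on plain integers. The names `DiffState` and `update` are hypothetical and exist only for this illustration; each call to `update` stands for one buffer, reduced to the counter values it contains.

```rust
/// Internal state of the difference computation: the last counter value seen so far.
struct DiffState {
    previous: Option<u64>,
}

impl DiffState {
    /// Processes the counter values found in one buffer (in order) and returns
    /// the difference between the latest value and the previously known one.
    /// Returns `None` when the buffer is empty or when there is no previous value yet.
    fn update(&mut self, counter_values: &[u64]) -> Option<u64> {
        // An empty buffer leaves the state untouched.
        let latest = *counter_values.last()?;
        // Assumes that the counter never decreases.
        let diff = self.previous.map(|previous| latest - previous);
        self.previous = Some(latest);
        diff
    }
}

fn main() {
    let mut state = DiffState { previous: None };
    // First buffer: there is nothing to compare with yet.
    assert_eq!(state.update(&[0]), None);
    // Second buffer: one update of the counter.
    assert_eq!(state.update(&[1]), Some(1));
    // Third buffer: two updates at once, both are covered by the difference.
    assert_eq!(state.update(&[2, 3]), Some(2));
    // A buffer without any counter value does not change the state.
    assert_eq!(state.update(&[]), None);
    assert_eq!(state.update(&[4]), Some(1));
}
```

Taking only the latest value of each buffer is enough here, because the difference with the previous state already covers every intermediate update.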
Implementation
Define a structure and implement the Transform trait (alumet::pipeline::Transform) on it.
use alumet::measurement::MeasurementBuffer;
use alumet::pipeline::elements::error::TransformError;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::Transform;

struct ExampleTransform {
    // TODO
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        todo!()
    }
}
In the transform structure, we need the following fields:
- An integer to store the previous value of the counter. The first time, there is no previous value, hence we use Option<u64>.
- The id of the metric associated with the counter. This will allow the transform to find the right values if multiple metrics are present in the incoming buffer (which is often the case in a realistic setup).
- The id of the metric associated with the difference. We will use it to construct the new measurement points.
use alumet::metrics::{RawMetricId, TypedMetricId};

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}
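The reason for storing the id of the counter metric can be illustrated with a small sketch that does not depend on Alumet. The `Point` type and the `latest_value` function below are hypothetical stand-ins, not Alumet types: a buffer mixes points of several metrics, and only the points whose metric id matches are considered.

```rust
/// Hypothetical, simplified stand-in for a measurement point (not the Alumet type).
#[derive(Debug, Clone, PartialEq)]
struct Point {
    metric: u64, // stand-in for a raw metric id
    value: u64,
}

/// Returns the last value associated with `metric` in the buffer, if any.
fn latest_value(buffer: &[Point], metric: u64) -> Option<u64> {
    buffer
        .iter()
        .rev() // start from the most recent point
        .find(|p| p.metric == metric)
        .map(|p| p.value)
}

fn main() {
    const COUNTER: u64 = 0;
    const OTHER: u64 = 1;
    let buffer = vec![
        Point { metric: COUNTER, value: 7 },
        Point { metric: OTHER, value: 1000 },
        Point { metric: COUNTER, value: 8 },
        Point { metric: OTHER, value: 2000 },
    ];
    // Only the points of the counter metric are taken into account.
    assert_eq!(latest_value(&buffer, COUNTER), Some(8));
    // A metric that is absent from the buffer yields nothing.
    assert_eq!(latest_value(&buffer, 42), None);
}
```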
In apply, we find all the relevant points, compute the difference and update the internal state.
impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;

            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }
        Ok(())
    }
}
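The subtraction in apply relies on the counter never decreasing, which holds for the example source. As a hedged aside, if a transform were applied to a counter that can be reset (an assumption that goes beyond this tutorial), a plain u64 subtraction would overflow: it panics in debug builds and wraps around in release builds. A minimal sketch of a guard, using the standard `checked_sub` method and a hypothetical helper named `counter_diff`:

```rust
/// Computes `latest - previous`, or returns `None` if the counter went backwards.
fn counter_diff(previous: u64, latest: u64) -> Option<u64> {
    latest.checked_sub(previous)
}

fn main() {
    assert_eq!(counter_diff(3, 5), Some(2));
    assert_eq!(counter_diff(5, 5), Some(0));
    // The counter was reset between the two measurements: no valid difference.
    assert_eq!(counter_diff(5, 1), None);
}
```

With such a helper, the transform could skip the measurement point when the result is `None` instead of producing a meaningless value.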
Creation and Registration
The transform is now implemented; let's use it.
To build the transform, you first need a new metric, which you must create in start as follows:
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
    "example_source_call_diff",
    Unit::Unity,
    "number of times the example source has been called since the previous measurement",
)?;
Then, you can create the transform structure and add it to the pipeline with add_transform.
Note that this is similar to the registration of sources.
// Create the transform
let transform = ExampleTransform {
    counter_metric: counter_metric.untyped_id(),
    previous_counter: None,
    diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
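The transform is passed to add_transform as a boxed value. The following toy model, which is not Alumet's implementation, sketches why boxing is convenient: a pipeline can store transforms of different concrete types as trait objects and pass each buffer to them. The `Transform` trait, `Pipeline` and `AppendSum` below are simplified, hypothetical stand-ins that work on a plain `Vec<u64>`.

```rust
/// Toy stand-in for a transform: it can modify a buffer of values in place.
trait Transform {
    fn apply(&mut self, buffer: &mut Vec<u64>);
}

/// Toy pipeline that owns its transforms as trait objects.
struct Pipeline {
    transforms: Vec<Box<dyn Transform>>,
}

impl Pipeline {
    /// Registers a transform, whatever its concrete type is.
    fn add_transform(&mut self, transform: Box<dyn Transform>) {
        self.transforms.push(transform);
    }

    /// Passes the buffer to every registered transform, in registration order.
    fn run(&mut self, buffer: &mut Vec<u64>) {
        for t in self.transforms.iter_mut() {
            t.apply(buffer);
        }
    }
}

/// Example transform: appends the sum of the existing values to the buffer.
struct AppendSum;

impl Transform for AppendSum {
    fn apply(&mut self, buffer: &mut Vec<u64>) {
        let sum: u64 = buffer.iter().sum();
        buffer.push(sum);
    }
}

fn main() {
    let mut pipeline = Pipeline { transforms: Vec::new() };
    pipeline.add_transform(Box::new(AppendSum));

    let mut buffer = vec![1, 2, 3];
    pipeline.run(&mut buffer);
    // The original values are kept and the computed value is appended.
    assert_eq!(buffer, vec![1, 2, 3, 6]);
}
```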
Here is the full start method after this modification.
impl AlumetPlugin for ExamplePlugin {
    // The other methods (name, version, default_config, init, stop) are omitted here.

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;

        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));
        Ok(())
    }
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough values to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
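The difference logic of the transform can also be studied in isolation, without Alumet. The following is a minimal sketch under stated assumptions: DiffState and process are hypothetical names that are not part of the Alumet API, and the counter values are assumed to have already been extracted from the measurement buffer, in order. Unlike the listing above, which only uses the latest counter value of each buffer, this variant emits one difference per counter update. It also uses saturating_sub instead of a plain subtraction, so that a counter going backwards yields 0 rather than an overflow panic in debug builds.

```rust
/// Hypothetical helper (not part of Alumet): remembers the previous counter
/// value and turns a batch of counter readings into differences.
#[derive(Debug, Default)]
struct DiffState {
    previous: Option<u64>,
}

impl DiffState {
    /// Processes the counter values found in one buffer, in order.
    /// Returns one difference for each value that has a known predecessor.
    fn process(&mut self, counter_values: &[u64]) -> Vec<u64> {
        let mut diffs = Vec::new();
        for &value in counter_values {
            if let Some(prev) = self.previous {
                // saturating_sub returns 0 instead of underflowing
                // if the counter ever goes backwards.
                diffs.push(value.saturating_sub(prev));
            }
            // Always remember the most recent value for the next update.
            self.previous = Some(value);
        }
        diffs
    }
}

fn main() {
    let mut state = DiffState::default();
    // First buffer: the first reading has no predecessor, the second one does.
    assert_eq!(state.process(&[0, 1]), vec![1]);
    // A buffer without counter values leaves the state untouched.
    assert!(state.process(&[]).is_empty());
    // The state carries over from one buffer to the next.
    assert_eq!(state.process(&[4]), vec![3]);
}
```

In a real transform, each returned difference would be pushed to the buffer as a measurement point of the new metric, with the timestamp of the corresponding counter update.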
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough value to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to obtain the find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough value to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to obtain the find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough value to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to obtain the find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough value to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to obtain the find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
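The state handling of the transform in the listing above can be studied without any Alumet type. The following is a minimal sketch, not Alumet code: `DiffState` and its `apply` method are hypothetical names introduced for illustration, and the buffer is reduced to "the last counter value found in it, if any".

```rust
/// Simplified stand-in for `ExampleTransform` (hypothetical, not part of Alumet).
/// It only keeps the previous value of the counter.
struct DiffState {
    previous_counter: Option<u64>,
}

impl DiffState {
    fn new() -> Self {
        Self { previous_counter: None }
    }

    /// `latest` is the last counter value found in the buffer,
    /// or `None` if the buffer contains no measurement from the counter source.
    /// Returns the difference to emit, if it can be computed.
    fn apply(&mut self, latest: Option<u64>) -> Option<u64> {
        // A difference needs two values: the previous one and the latest one.
        let diff = match (self.previous_counter, latest) {
            (Some(previous), Some(latest)) => Some(latest - previous),
            _ => None,
        };
        // Only update the state when the buffer contained a counter value.
        if let Some(latest) = latest {
            self.previous_counter = Some(latest);
        }
        diff
    }
}
```

With this sketch, the first call produces no difference, and a buffer without any counter value leaves the state untouched, which matches the behavior described by the comments in the transform.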
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough values to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes; use `Vec::join` to merge them into a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
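The `apply` method of `ExampleTransform` in the listing above keeps only the last counter value of each buffer, so it emits at most one difference per buffer. As noted earlier, a single buffer can contain several updates of the counter. One possible way to handle every update is sketched below with plain Rust types. This is an assumption-laden illustration, not the tutorial's implementation: `diff_all` is a hypothetical helper, timestamps are reduced to `u64`, and `saturating_sub` is a defensive choice made here so that a counter going backwards yields 0 instead of panicking.

```rust
/// Computes one difference per counter update found in a buffer.
///
/// `previous` is the last value seen in earlier buffers; it is updated in place.
/// `updates` contains (timestamp, counter value) pairs, in the order they were produced.
/// Returns (timestamp, difference) pairs, one for each update that has a predecessor.
fn diff_all(previous: &mut Option<u64>, updates: &[(u64, u64)]) -> Vec<(u64, u64)> {
    let mut diffs = Vec::new();
    for &(timestamp, value) in updates {
        if let Some(prev) = *previous {
            // Defensive subtraction: yields 0 if the counter went backwards.
            diffs.push((timestamp, value.saturating_sub(prev)));
        }
        // Every update becomes the reference for the next one.
        *previous = Some(value);
    }
    diffs
}
```

In a real transform, the returned pairs would be turned into measurement points associated with the difference metric and pushed to the buffer after the loop over the existing points has finished.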