Measuring with sources

For the moment, your plugin doesn't measure anything. You will now write your first "source" to obtain measurement points, which will be passed to the rest of the Alumet pipeline.

For your first source, you will implement a simple counter that measures the number of times it has been triggered.

Counter source: the idea

The counter source works in the following way.

It has an internal state: the current value of the counter. The Alumet framework manages a timer and periodically triggers the source. When triggered, the source creates a measurement point containing the counter's value and appends it to a buffer provided by Alumet. The framework then passes the data to the rest of the measurement pipeline.

To distinguish the values of the counter from other unrelated measurements (such as the data produced by other sources), each measurement point must indicate its metric id. The metric id, represented by the rounded (M) on the diagram, indicates what "object" is being measured. Your plugin must create a new metric on startup (see below), store its id somewhere, and use it to produce measurement points.
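The mechanism described above can be sketched with the standard library alone. This is a toy model, not Alumet's API: `ToyMetricId`, `ToyPoint`, `ToyCounterSource` and `trigger_n_times` are hypothetical names invented for this illustration, and the framework's timer is replaced by a plain loop.

```rust
// Simplified stand-ins: these types are hypothetical and are NOT Alumet's API.

/// Identifies which "object" a point measures (the (M) on the diagram).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ToyMetricId(usize);

/// One measurement point: a metric id plus the measured value.
#[derive(Debug, PartialEq)]
struct ToyPoint {
    metric: ToyMetricId,
    value: u64,
}

/// The source's internal state: the metric id it reports and the counter.
struct ToyCounterSource {
    metric: ToyMetricId,
    counter: u64,
}

impl ToyCounterSource {
    /// Called each time the source is triggered: appends one point to the buffer,
    /// then increments the counter.
    fn poll(&mut self, buffer: &mut Vec<ToyPoint>) {
        buffer.push(ToyPoint {
            metric: self.metric,
            value: self.counter,
        });
        self.counter += 1;
    }
}

/// Plays the role of the framework: triggers the source `n` times
/// (a plain loop stands in for the timer) and returns the filled buffer.
fn trigger_n_times(source: &mut ToyCounterSource, n: usize) -> Vec<ToyPoint> {
    let mut buffer = Vec::new();
    for _ in 0..n {
        source.poll(&mut buffer);
    }
    buffer
}
```

In this model, triggering a source that starts at 0 three times yields three points with the values 0, 1 and 2, all tagged with the same metric id.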

Defining a metric

What is a metric?

A metric represents an "object", something that can be measured. In Alumet, the measurement pipeline does not carry raw values but measurement points, which contain some additional information. Every point contains a metric id, which associates it with the definition of a metric.

Unlike some other tools, the list of metrics is not part of the framework: nothing is hard-coded. Plugins can create new metrics by following the standard model provided by Alumet. As explained in the docs, a metric is defined by a unique name, a unit, a type of measured value, and a description.

Read more about metrics here: Metrics & Measurement Points

Creating a metric

Okay, let's think about the metric that you need for your counter source.

  • name: We'll call it example_source_call_counter, which reflects what we measure: we count how many times the source has been called by Alumet
  • unit: Since this is a simple counter, it has no unit! In UCUM (the Unified Code for Units of Measure), this corresponds to the "unity" unit.
  • type: An integer. Here we choose u64 to keep it simple.
  • description: Something short and explicit, for example "number of times the example source has been called".

To register this new metric in Alumet, call create_metric in the start method of your plugin.

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}

If all goes well, you obtain a TypedMetricId that you can use to refer to the metric at every step of the Alumet pipeline, in particular in a source.

Creating a metric may fail in case of a duplicate name (note the ? that handles the potential error - we'll talk about it later).

Therefore, you should not choose names that are too generic. Instead, you should choose explicit and precise names. If applicable, include some information about the kind of sensor.

Here are some examples:

  • bad metric names (too vague): metric, counter, measured_energy
  • good metric names: acpi_zone_temperature, rapl_consumed_energy, estimated_gpu_power, kernel_cpu_usage
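To see why generic names are risky, here is a minimal sketch of a registry that rejects duplicate names. It assumes nothing about how Alumet stores metrics internally: `ToyRegistry` and its `create_metric` method are hypothetical and only mimic the "fails on duplicate name" behavior described above.

```rust
use std::collections::HashMap;

// Hypothetical registry, for illustration only; this is not Alumet's implementation.
#[derive(Default)]
struct ToyRegistry {
    ids_by_name: HashMap<String, usize>,
}

impl ToyRegistry {
    /// Registers a metric name and returns its id,
    /// or an error if the name is already taken.
    fn create_metric(&mut self, name: &str) -> Result<usize, String> {
        if self.ids_by_name.contains_key(name) {
            return Err(format!("duplicate metric name: {name}"));
        }
        // Ids are assigned in registration order, starting at 0.
        let id = self.ids_by_name.len();
        self.ids_by_name.insert(name.to_owned(), id);
        Ok(id)
    }
}
```

If two plugins both tried to register a metric called counter in such a registry, the second call would return an error, whereas a precise name like rapl_consumed_energy is unlikely to collide.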

Defining a simple source

Implementing the counter source

To define a source, define a structure and implement the Source trait on it (alumet::pipeline::Source).

#![allow(dead_code, unused_variables)]
use alumet::measurement::{MeasurementAccumulator, MeasurementBuffer, Timestamp};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::pipeline::{Output, Source, Transform};

pub struct ExamplePlugin;

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(None) // no config for the moment
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        Ok(Box::new(ExamplePlugin))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");
        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

struct ExampleSource {
    // TODO
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, t: Timestamp) -> Result<(), PollError> {
        todo!()
    }
}

struct ExampleTransform {
    // TODO
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        todo!()
    }
}

struct ExampleOutput {
    // TODO
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        todo!()
    }
}

Complete the poll method by following these steps, and add the required fields along the way:

  1. Measure, i.e. obtain the measurements. Here, we will simply increment a counter. The counter is part of the state of the source, hence it will be a field in the ExampleSource structure, of type u64.
  2. Create one MeasurementPoint for every measured value. To do that, we need to know which metric we are measuring. The previously obtained metric id will be another field of the ExampleSource structure, of type TypedMetricId<u64>.
  3. Push the points to the MeasurementAccumulator.

The result looks like this:

use std::time::Duration;

use alumet::measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp};
use alumet::metrics::TypedMetricId;
use alumet::pipeline::elements::error::PollError;
use alumet::pipeline::{trigger, Source};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;

pub struct ExamplePlugin;

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(None) // no config for the moment
    }

    fn init(_config: ConfigTable) -> anyhow::Result<Box<Self>> {
        Ok(Box::new(ExamplePlugin))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: Alumet will call the source every 1s
        let trigger = trigger::builder::time_interval(Duration::from_secs(1)).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

Since our counter is not tied to any particular resource or consumer, we use the special value LocalMachine for both. It indicates a "global" measurement, scoped to the whole machine. Here, "machine" is intentionally loosely defined: in a VM, it represents the VM; on a bare-metal node, it represents that node.

Read more about the concept of resource and consumer here: Metrics & Measurement Points

Registering the counter source

Now that you have defined a source, you need to create it and add it to the Alumet pipeline with add_source. Do this in the start method of your plugin.

impl AlumetPlugin for ExamplePlugin {
    // name, version, default_config and init are unchanged

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: Alumet will call the source every 1s
        let trigger = trigger::builder::time_interval(Duration::from_secs(1)).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        Ok(())
    }

    // stop is unchanged
}

To add the source to the pipeline, you must provide a Trigger. As its name implies, a trigger determines when Alumet calls the source. Here, we use time_interval to build a Trigger that calls the source every second.
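
To build some intuition for what a time-interval trigger does, here is a minimal, standard-library-only sketch. It is a simplified model and not how Alumet implements triggers: the PolledSource trait, the CounterSource struct and the run_with_interval function are hypothetical names introduced for illustration, and a plain Vec<u64> stands in for the measurement buffer.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a source: something that can be polled.
/// (Assumption: this is NOT the Alumet `Source` trait.)
trait PolledSource {
    fn poll(&mut self, buffer: &mut Vec<u64>);
}

/// A counter modeled on the source from this chapter, without Alumet types.
struct CounterSource {
    counter: u64,
}

impl PolledSource for CounterSource {
    fn poll(&mut self, buffer: &mut Vec<u64>) {
        // Same logic as in the chapter: record the current value, then increment.
        let n_calls = self.counter;
        self.counter += 1;
        buffer.push(n_calls);
    }
}

/// Hypothetical driver: waits `interval` before each poll, `n_polls` times,
/// and returns every value pushed by the source.
fn run_with_interval(source: &mut dyn PolledSource, interval: Duration, n_polls: usize) -> Vec<u64> {
    let mut buffer = Vec::new();
    for _ in 0..n_polls {
        thread::sleep(interval);
        source.poll(&mut buffer);
    }
    buffer
}

fn main() {
    let mut source = CounterSource { counter: 0 };
    // A short interval keeps the demo fast; the chapter uses one second.
    let values = run_with_interval(&mut source, Duration::from_millis(10), 3);
    println!("{values:?}"); // prints [0, 1, 2]
}
```

In the real pipeline, you do not write this loop yourself: the framework owns the timer, and your plugin only describes the interval through the trigger builder.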

Final result

Finally, you can test your plugin again by running the local agent:

cargo run --bin alumet-local-agent --features local_x86

Note how the source is automatically shut down by Alumet when you stop the agent.

A word about errors

In this chapter, we did not need to manage errors in a complicated way because there was almost no source of failure. Most of our functions returned Ok(()), and we used ? to propagate errors, for example in start.

Please refer to Error Handling in Plugins to learn how to handle errors in more complex cases.
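
As a reminder of what ? does, the following sketch uses only the standard library. The functions parse_interval_secs and start are hypothetical and unrelated to the Alumet API, and Box<dyn Error> is used here in place of the anyhow::Result type that the plugin relies on.

```rust
use std::error::Error;
use std::num::ParseIntError;

/// Hypothetical helper: parses a number of seconds from text.
fn parse_interval_secs(input: &str) -> Result<u64, ParseIntError> {
    input.trim().parse::<u64>()
}

/// Hypothetical startup function (not the plugin's `start` method).
fn start(raw_interval: &str) -> Result<u64, Box<dyn Error>> {
    // On Err, `?` returns early from `start` and converts the error
    // into the function's error type. On Ok, it unwraps the value.
    let secs = parse_interval_secs(raw_interval)?;
    if secs == 0 {
        return Err("the interval must be at least 1 second".into());
    }
    Ok(secs)
}

fn main() {
    match start("2") {
        Ok(secs) => println!("interval: {secs}s"),
        Err(e) => eprintln!("startup failed: {e}"),
    }
    if let Err(e) = start("two") {
        eprintln!("startup failed: {e}");
    }
}
```

The same pattern appears in the plugin's start method, where ? follows create_metric and build: if either call fails, start returns the error immediately.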