Welcome to the Alumet developer book! This book is for plugin developers and Alumet contributors.

Here, you can learn:

  • how to make Alumet measure new things (to support a new hardware component, for example)
  • how to implement a custom filter or estimation model on top of the measured data
  • how to implement new outputs for the data (to support a new database, for instance)
  • how to participate in the Alumet project

If you want to learn how to use Alumet, for example to measure your energy consumption on CPU and GPU, head over to the user book.

🚧 Work in progress

This book (and the Alumet project as a whole) is a work in progress. If you are interested in contributing, please contact us. You can send messages in the "Discussions" section of the GitHub repository.

Prerequisites

You need to install Rust, Git, and a code editor/IDE.

A recent version of Rust is required (at least 1.76 for now). You can run rustc --version to check your version. The easiest way to install a recent version of Rust is to use rustup.

To write Alumet plugins, a basic understanding of the Rust language is required. For simple plugins, you will not need advanced features such as Send or async. Fundamental notions such as ownership, structures, packages and error handling will be useful.

Are you ready? Let's measure!

Technical documentation

This book is meant to be a guide for developers. It is not exhaustive and does not document every type or function that Alumet provides. If you are looking for a particular function or technical feature, please read the technical documentation of the alumet crate. It contains additional code examples. It is a good idea to have the documentation open alongside this book.

Note: the "rustdoc" is only updated when a new version of the core of Alumet is released. To obtain the documentation of the latest code, clone the alumet repository and run cargo doc --open.

Before writing code, let's understand what we are working on.

Functional View

The following diagram, copied from the user book, shows a simplified view of what Alumet does.

From the perspective of a user, Alumet is a tool that gathers measurements from sources (on the left), performs some computations ("models"), and exports the data (on the right). For instance, Alumet can measure the energy consumption of your CPU and GPU. With some formulas, it can estimate the energy consumed by a particular process or Kubernetes pod. It can then write the measured and estimated values to a file or database.

This is a functional view that suits users well. But you, a developer, need to go behind the scenes!

Measurement Pipeline

Here is another diagram that shows the (high-level) internal design of Alumet.

As you can see, the core of Alumet contains an extensible measurement pipeline with three steps (in orange):

  1. obtain measurements from the sources
  2. transform the measurements
  3. write the data to the outputs

Alumet is a generic framework, and its pipeline is entirely agnostic to the hardware components and software platform. By itself, it contains no sources, no transform functions (or "transforms"), and no outputs. These pipeline elements (in blue) are provided by plugins.
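To make the separation between the generic core and the plugin-provided elements more concrete, here is a minimal sketch in plain Rust. It is not the Alumet API: the Point, Source, Transform, Output and Pipeline types below are simplified, hypothetical stand-ins, and the three steps run sequentially instead of concurrently.

```rust
// Hypothetical, simplified model of a three-step pipeline (NOT the Alumet API).

/// A measurement: here just a metric name and a value.
#[derive(Debug, Clone, PartialEq)]
struct Point {
    metric: String,
    value: f64,
}

/// Step 1: produces measurements.
trait Source {
    fn poll(&mut self, buffer: &mut Vec<Point>);
}

/// Step 2: modifies the measurements, or adds new ones.
trait Transform {
    fn apply(&mut self, buffer: &mut Vec<Point>);
}

/// Step 3: exports the measurements.
trait Output {
    fn write(&mut self, buffer: &[Point]);
}

/// The generic "core": it knows nothing about what is measured.
#[derive(Default)]
struct Pipeline {
    sources: Vec<Box<dyn Source>>,
    transforms: Vec<Box<dyn Transform>>,
    outputs: Vec<Box<dyn Output>>,
}

impl Pipeline {
    /// Runs the three steps once and returns the final buffer.
    fn run_once(&mut self) -> Vec<Point> {
        let mut buffer = Vec::new();
        for s in &mut self.sources {
            s.poll(&mut buffer);
        }
        for t in &mut self.transforms {
            t.apply(&mut buffer);
        }
        for o in &mut self.outputs {
            o.write(&buffer);
        }
        buffer
    }
}

// The elements below play the role of what a plugin would provide.

struct ConstantSource;
impl Source for ConstantSource {
    fn poll(&mut self, buffer: &mut Vec<Point>) {
        buffer.push(Point { metric: "cpu_energy_joules".to_string(), value: 2.0 });
    }
}

struct DoubleTransform;
impl Transform for DoubleTransform {
    fn apply(&mut self, buffer: &mut Vec<Point>) {
        for p in buffer.iter_mut() {
            p.value *= 2.0;
        }
    }
}

struct StdoutOutput;
impl Output for StdoutOutput {
    fn write(&mut self, buffer: &[Point]) {
        for p in buffer {
            println!("{} = {}", p.metric, p.value);
        }
    }
}

fn main() {
    let mut pipeline = Pipeline::default(); // empty: no sources, transforms or outputs
    pipeline.sources.push(Box::new(ConstantSource));
    pipeline.transforms.push(Box::new(DoubleTransform));
    pipeline.outputs.push(Box::new(StdoutOutput));
    pipeline.run_once();
}
```

In this sketch, an empty Pipeline does nothing at all, which mirrors the idea that the core contains no sources, transforms or outputs by itself.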

Plugins

An Alumet plugin is a software component that is added on top of the Alumet framework. It can act on the measurement pipeline, for instance by registering new pipeline elements (in blue in the diagram above).

Follow the plugin tutorial to start writing plugins.

Agents

An Alumet agent is an executable binary that combines the framework, some plugins and some additional code. It is the user-facing application that gathers the measurements and "does things".

We provide a standard agent (its code is in the app-agent crate) but you can also create custom agents. Refer to the contributing guide for more information about the content of our main Git repository.

Concurrency and parallelism

In general, an Alumet agent uses multiple plugins at the same time. Each plugin creates many sources, a few transforms and a few outputs. Because the number of sources can be high, and because outputs can take a long time to serialize their measurements, the pipeline elements are not run one after another, but concurrently. On a multicore processor (which you almost certainly have), this means that multiple pipeline elements probably run at the same time.

This is implemented by making good use of Rust async features and the Tokio runtime. For most plugins, this concurrency is not important as it is automatically managed by the Alumet framework.
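As a rough illustration of elements running concurrently, the sketch below uses standard-library threads and a channel. This is a deliberate substitution: Alumet relies on Rust async and Tokio, not on one thread per element. The run_concurrently function and the (source id, value) tuples are hypothetical and only serve the example.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `n_sources` threads that each send `n_points` values to a single
/// "output" thread, and returns everything the output received.
/// Hypothetical sketch: real Alumet pipelines use Tokio tasks instead.
fn run_concurrently(n_sources: u64, n_points: u64) -> Vec<(u64, u64)> {
    let (tx, rx) = mpsc::channel::<(u64, u64)>();

    // Each "source" runs on its own thread and sends (source id, value) pairs.
    let mut sources = Vec::new();
    for source_id in 0..n_sources {
        let tx = tx.clone();
        sources.push(thread::spawn(move || {
            for value in 0..n_points {
                tx.send((source_id, value)).expect("the output should still be running");
            }
        }));
    }
    // Drop the original sender so that the channel closes once all sources are done.
    drop(tx);

    // The "output" consumes the measurements while the sources are still producing.
    let output = thread::spawn(move || rx.iter().collect::<Vec<_>>());

    for s in sources {
        s.join().expect("a source thread panicked");
    }
    output.join().expect("the output thread panicked")
}

fn main() {
    let mut received = run_concurrently(3, 2);
    // The arrival order depends on scheduling, so sort before printing.
    received.sort();
    println!("{received:?}");
}
```

The order in which the output receives the points is not deterministic, which is why the example sorts them before printing. A plugin author rarely needs to care about this, since the framework does the scheduling.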

Your first plugin (tutorial)

The best way to get a good understanding of how Alumet plugins work is to create one yourself!

In this chapter, you will create your first plugin, run it with an agent, and add new features incrementally.

Please go to the next step.

Basic setup

Prerequisites

Before attempting anything, please check that you have a recent version of the Rust toolchain (at least 1.76 for now). You can run rustc --version to check your version. The easiest way to install a recent version of Rust is to use rustup.

Creating the plugin crate

The first thing to do is to initialize a new crate for the plugin. Alumet plugins are not executables by themselves: they are library crates.

You have two options:

  1. fork the official Alumet repository and develop there (best if you want to contribute to Alumet)
  2. work in your own repository (best if you want to be independent)

Creating the plugin in a fork of the official repository

Fork the Alumet repository on GitHub.

Clone your fork with git clone and open the root directory of Alumet.

You should see several files and folders:

.
├── Cargo.lock
├── Cargo.toml
├── LICENSE
├── LICENSE.fr.txt
├── README.md
├── alumet/
├── app-agent/
├── plugin-csv/
├── plugin-nvidia/
├── plugin-rapl/
├── plugin-relay/
├── target/
├── ...

Let's make a crate for your plugin! By convention, plugins contained in the main repository should be prefixed with plugin-:

cargo init --lib plugin-example

This will create a new directory named plugin-example, with some files in it. Cargo should also modify the root Cargo.toml to add your plugin to the list of members, like this:

members = [
    "alumet",
    # ... other crates here
    # the line below has been added automatically
    "plugin-example",
]

Finally, use cargo add to declare some dependencies. Every plugin needs to depend on at least alumet and anyhow. We also add log to display nice log messages (avoid println! in Alumet!).

cargo add alumet anyhow log

Make sure that the alumet dependency is local and does not include a version number:

[package]
name = "plugin-example"
version = "0.1.0"
edition = "2021"

[dependencies]
alumet = { path = "../alumet" } # LOCAL + NO VERSION
anyhow = "1.0"
log = "0.4"

Creating the plugin in a separate repository

Initialize a crate with cargo:

cargo init --lib plugin-example

Finally, use cargo add to declare some dependencies. Every plugin needs to depend on at least alumet and anyhow. We'll also add log to display nice log messages (avoid println! in Alumet!).

cargo add alumet anyhow log

Since your plugin is not in the main repository of Alumet, the dependency on alumet will not be local, but rather downloaded from crates.io.

Coding - v0.1

Now, the fun part: let's code your plugin!

Plugin Structure

Open the lib.rs file in your plugin source directory. For now, it contains a minimal library generated by cargo init. You will replace it with a minimal Alumet plugin.

To define a (static) Alumet plugin, you just need two things:

  • a structure
  • an implementation of the AlumetPlugin trait for this structure

Here is what it looks like:

use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};

pub struct ExamplePlugin;

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(None) // no config for the moment
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        Ok(Box::new(ExamplePlugin))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");
        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

Note that the plugin structure is public. It needs to be, otherwise agents (executable applications that perform the measurements with Alumet) won't be able to use the plugin. For now, the structure is empty. It can contain anything you want, in particular configuration options. We will show an example later.

Running your plugin

To try your plugin, you need to modify an Alumet agent so that it loads the plugin on startup.

If you develop your plugin outside of the main Alumet repository, you probably have your own agent, maybe based on a local copy of official Alumet agents. If you develop your plugin in the main Alumet repository, here is what you need to do:

  1. Choose an agent to modify.
  2. Add your plugin as a dependency of the agent.
  3. Edit one or two lines of code to make the agent load your plugin.

For this tutorial, we will modify the "local" agent, which works on its own (unlike the relay mode, for instance). In the Alumet repository, open a terminal in the app-agent folder.

Add your plugin to the dependencies of the agent:

cargo add plugin-example

Open app-agent/src/bin/local.rs and add your plugin to the list of plugins. For a first test, you can remove other plugins in order to see the messages of your plugin more easily. Let's keep the CSV plugin, though. It will be useful to see the measurements produced by your plugin (in the next steps of the tutorial).

let plugins = static_plugins![
-    plugin_rapl::RaplPlugin,
-    // ... (other plugins)
+    plugin_example::ExamplePlugin,
    plugin_csv::CsvPlugin,
];

Note: with cargo, we use the name that we declared for the crate, plugin-example (look at plugin-example/Cargo.toml), but in code we use plugin_example. This is because hyphens are not valid characters in Rust identifiers, hence they are converted to underscores.
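The naming rule described in the note can be written as a one-line function. The helper name crate_ident is hypothetical and is not part of Cargo or Alumet; it only mirrors the hyphen-to-underscore conversion.

```rust
/// Converts a Cargo package name to the identifier used in Rust code,
/// by replacing hyphens with underscores (hypothetical helper, for illustration).
fn crate_ident(package_name: &str) -> String {
    package_name.replace('-', "_")
}

fn main() {
    // "plugin-example" in Cargo.toml becomes `plugin_example` in `use` paths.
    println!("{}", crate_ident("plugin-example"));
}
```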

Finally, you can test your plugin! Run the local agent:

cargo run --bin alumet-local-agent --features local_x86

You should see your plugin in the list of enabled plugins, and it should print the message Hello! (from its start method).

Stop the agent with Ctrl+C. Your plugin should print the message Bye! (from its stop method).

Measuring with sources

For the moment, your plugin doesn't measure anything. You will now write your first "source" in order to obtain measurement points, which will be passed to the rest of the Alumet pipeline.

For your first source, you will implement a simple counter that measures the number of times it has been triggered.

Counter source: the idea

The counter source works in the following way.

It has an internal state: the current value of the counter. The Alumet framework manages a timer and periodically triggers the source. When triggered, the source creates a measurement point containing the counter's value and appends it to a buffer provided by Alumet. The framework then passes the data to the rest of the measurement pipeline.

To distinguish the values of the counter from other unrelated measurements (such as the data produced by other sources), each measurement point must indicate its metric id. The metric id, represented by the rounded (M) on the diagram, indicates what "object" is being measured. Your plugin must create a new metric on startup (see below), store its id somewhere, and use it to produce measurement points.
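Before looking at the real API, the idea can be sketched with plain Rust types. Everything here is a simplified assumption: MetricId, Point and CounterSource are hypothetical stand-ins, and a Vec plays the role of the buffer provided by the framework.

```rust
/// Hypothetical stand-in for a metric id (NOT the Alumet type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct MetricId(usize);

/// Hypothetical measurement point: a metric id and a value.
#[derive(Debug, PartialEq)]
struct Point {
    metric: MetricId,
    value: u64,
}

/// The source keeps the metric id and the current value of the counter.
struct CounterSource {
    metric: MetricId,
    counter: u64,
}

impl CounterSource {
    /// Called each time the source is triggered.
    fn poll(&mut self, buffer: &mut Vec<Point>) {
        // Create a point with the current value, tagged with the metric id...
        buffer.push(Point { metric: self.metric, value: self.counter });
        // ...then update the internal state for the next trigger.
        self.counter += 1;
    }
}

fn main() {
    let mut source = CounterSource { metric: MetricId(0), counter: 0 };
    let mut buffer = Vec::new();
    // Simulate three timer ticks.
    for _ in 0..3 {
        source.poll(&mut buffer);
    }
    let values: Vec<u64> = buffer.iter().map(|p| p.value).collect();
    println!("{values:?}"); // prints [0, 1, 2]
}
```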

Defining a metric

What is a metric?

A metric represents an "object", something that can be measured. In Alumet, the measurement pipeline does not carry raw values but measurement points, which contain some additional information. Every point contains a metric id, which associates it with the definition of a metric.

Unlike some other tools, the list of metrics is not part of the framework: nothing is hard-coded. Plugins can create new metrics by following the standard model provided by Alumet. As explained in the docs, a metric is defined by a name, a unit, a type of value and a description.

Read more about metrics here: Metrics & Measurements Points

Creating a metric

Okay, let's think about the metric that you need for your counter source.

  • name: We'll call it example_source_call_counter, which reflects what we measure: we count how many times the source has been called by Alumet
  • unit: Since this is a simple counter, it has no unit! In the UCUM, this corresponds to the "unity" unit.
  • type: Integer, here we choose u64 to keep it simple.
  • description: Something short and explicit, for example "number of times the example source has been called".
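The four fields above can be pictured as a small record stored in a registry that hands out ids. The sketch below is an assumption made for illustration, not the Alumet implementation: MetricDef, ValueType and MetricRegistry are hypothetical names, and rejecting duplicate names is a choice of this sketch.

```rust
/// Hypothetical type of the measured values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ValueType {
    U64,
    F64,
}

/// Hypothetical metric definition with the four fields listed above.
#[derive(Debug, Clone, PartialEq)]
struct MetricDef {
    name: String,
    unit: String, // UCUM symbol, "1" being the unity
    value_type: ValueType,
    description: String,
}

/// Hypothetical registry: the id of a metric is its index in the list.
#[derive(Default)]
struct MetricRegistry {
    metrics: Vec<MetricDef>,
}

impl MetricRegistry {
    /// Registers a metric and returns its id.
    /// Assumption of this sketch: a name can only be registered once.
    fn create_metric(&mut self, def: MetricDef) -> Result<usize, String> {
        if self.metrics.iter().any(|m| m.name == def.name) {
            return Err(format!("metric '{}' already exists", def.name));
        }
        self.metrics.push(def);
        Ok(self.metrics.len() - 1)
    }

    /// Finds the definition of a metric from its id.
    fn by_id(&self, id: usize) -> Option<&MetricDef> {
        self.metrics.get(id)
    }
}

fn main() {
    let mut registry = MetricRegistry::default();
    let id = registry
        .create_metric(MetricDef {
            name: "example_source_call_counter".to_string(),
            unit: "1".to_string(),
            value_type: ValueType::U64,
            description: "number of times the example source has been called".to_string(),
        })
        .expect("the name is not registered yet");
    let def = registry.by_id(id).expect("the id was just created");
    println!("metric #{id}: {} ({:?})", def.name, def.value_type);
}
```

Storing only the id in each measurement point keeps the points small, while outputs can still look up the name and unit when they need to display them.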

To register this new metric in Alumet, call create_metric in the start method of your plugin.

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}

If all goes well, you obtain a TypedMetricId that you can use to refer to the metric at every step of the Alumet pipeline, in particular in a source.

Creating a metric can fail if the name is already taken (note the ? that propagates the potential error; we'll talk about it later).

Therefore, you should not choose names that are too generic. Instead, you should choose explicit and precise names. If applicable, include some information about the kind of sensor.

Here are some examples:

  • bad metric names (too vague): metric, counter, measured_energy
  • good metric names: acpi_zone_temperature, rapl_consumed_energy, estimated_gpu_power, kernel_cpu_usage
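
To picture why a duplicate name makes metric creation fail, here is a minimal sketch of a name registry. It is an illustration only: NameRegistry and DuplicateName are hypothetical names, and this is not how Alumet implements its metric registry.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a metric registry (not Alumet's actual type).
#[derive(Default)]
pub struct NameRegistry {
    ids_by_name: HashMap<String, usize>,
}

/// Hypothetical error returned when a name is already registered.
#[derive(Debug, PartialEq)]
pub struct DuplicateName(pub String);

impl NameRegistry {
    /// Registers a name and returns its numeric id,
    /// or an error if the name has already been taken.
    pub fn register(&mut self, name: &str) -> Result<usize, DuplicateName> {
        if self.ids_by_name.contains_key(name) {
            return Err(DuplicateName(name.to_owned()));
        }
        // Ids are assigned in registration order, starting at 0.
        let id = self.ids_by_name.len();
        self.ids_by_name.insert(name.to_owned(), id);
        Ok(id)
    }
}
```

With such a registry, two plugins that both register a vague name like counter would collide, whereas precise names such as rapl_consumed_energy are unlikely to clash.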

Defining a simple source

Implementing the counter source

To define a source, define a structure and implement the Source trait on it (alumet::pipeline::Source).

#![allow(dead_code, unused_variables)]
use alumet::measurement::{MeasurementAccumulator, MeasurementBuffer, Timestamp};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::pipeline::{Output, Source, Transform};

pub struct ExamplePlugin;

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(None) // no config for the moment
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        Ok(Box::new(ExamplePlugin))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");
        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

struct ExampleSource {
    // TODO
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, t: Timestamp) -> Result<(), PollError> {
        todo!()
    }
}

struct ExampleTransform {
    // TODO
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        todo!()
    }
}

struct ExampleOutput {
    // TODO
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        todo!()
    }
}

Complete the poll method by following these steps, and add the required fields along the way:

  1. Measure, i.e. obtain the measurements. Here, we will simply increment a counter. The counter is part of the state of the source, hence it will be a field in the ExampleSource structure, of type u64.
  2. Create one MeasurementPoint for every measured value. To do that, we need to know which metric we are measuring. The previously obtained metric id will be another field of the ExampleSource structure, of type TypedMetricId<u64>.
  3. Push the points to the MeasurementAccumulator.

The result looks like this:

use std::time::Duration;

use alumet::measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp};
use alumet::metrics::TypedMetricId;
use alumet::pipeline::elements::error::PollError;
use alumet::pipeline::{trigger, Source};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;

pub struct ExamplePlugin;

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(None) // no config for the moment
    }

    fn init(_config: ConfigTable) -> anyhow::Result<Box<Self>> {
        Ok(Box::new(ExamplePlugin))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: Alumet will call the source every 1s
        let trigger = trigger::builder::time_interval(Duration::from_secs(1)).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

Since our counter is not related to a particular resource or consumer, we use the special value LocalMachine. It indicates a "global" measurement whose scope is the whole machine. Here, "machine" is intentionally loosely defined: in a VM, it represents the VM; on a bare-metal node, it represents that node.

Read more about the concept of resource and consumer here: Metrics & Measurement Points

Registering the counter source

Now that you have defined a source, you need to create it and add it to the Alumet pipeline with add_source. Do this in the start method of your plugin.

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: Alumet will call the source every 1s
        let trigger = trigger::builder::time_interval(Duration::from_secs(1)).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

Tip: you can click on the eye 👁️ icon to show the whole plugin code.

To add the source to the pipeline, you must provide a Trigger. As its name implies, it determines when the source is triggered. Here, we use time_interval to build a Trigger that calls the source every second.
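
As a conceptual model of what "calling the source at a fixed interval" means, the sketch below polls a callback on a regular schedule using only the standard library. The function run_at_interval is a hypothetical helper written for this illustration; Alumet's real trigger mechanism is likely implemented differently, and you do not need to write such a loop yourself.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Calls `poll` exactly `n_polls` times, with scheduled start times
/// separated by `interval`. The argument passed to `poll` is the call index.
pub fn run_at_interval<F: FnMut(u32)>(interval: Duration, n_polls: u32, mut poll: F) {
    let start = Instant::now();
    for i in 0..n_polls {
        // Deadlines are computed from the start time, so that a slow poll
        // does not shift all of the following ones.
        let deadline = start + interval * i;
        if let Some(wait) = deadline.checked_duration_since(Instant::now()) {
            thread::sleep(wait);
        }
        poll(i);
    }
}
```

Computing each deadline from the start time, rather than sleeping for a fixed duration after each call, is one simple way to limit drift when a poll takes longer than expected.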

Final result

Finally, you can test your plugin again by running the local agent:

cargo run --bin alumet-local-agent --features local_x86

Note how the source is automatically shut down by Alumet when you stop the agent.

A word about errors

In this chapter, we did not need to manage errors in a complicated way because there was almost no source of failure. Most of our functions returned Ok(()), and we used ? to propagate errors, for example in start.

Please refer to Error Handling in Plugins to learn how to handle errors in more complex cases.
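
As a reminder of how ? propagates errors, here is a small standalone sketch that does not depend on Alumet. The functions parse_interval_secs and start are hypothetical and only serve the illustration; the plugin code in this book uses anyhow::Result, whereas this sketch uses Box<dyn Error> to stay within the standard library.

```rust
use std::error::Error;

/// Hypothetical helper: parses a polling interval given in seconds.
pub fn parse_interval_secs(text: &str) -> Result<u64, Box<dyn Error>> {
    // On failure, `?` returns early and converts the parse error into Box<dyn Error>.
    let secs: u64 = text.trim().parse()?;
    if secs == 0 {
        return Err("the interval must be greater than zero".into());
    }
    Ok(secs)
}

/// Hypothetical startup step: any failure is handed to the caller with `?`.
pub fn start(interval_text: &str) -> Result<(), Box<dyn Error>> {
    let secs = parse_interval_secs(interval_text)?;
    println!("polling every {secs}s");
    Ok(())
}
```

Calling start("abc") returns an error instead of panicking, and the caller decides what to do with it.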

A First Configuration

The counter source that you have implemented in the previous section uses a fixed polling interval. Instead of using a hard-coded value, it would be better to provide a configuration option so that we can choose the acquisition frequency of the source before starting the Alumet agent.

On startup, the standard agent reads a configuration file in the TOML format. It contains some options for the agent, and one section per plugin. Each plugin is free to declare what it needs in its configuration section, and to process it as it sees fit. The best practice is to deserialize the configuration section to a Rust structure. This is what you will implement in this chapter.

Config structure

To serialize and deserialize the configuration section, Alumet uses serde, which is the de facto standard framework for (de)serialization in Rust. Add it to the dependencies by running this command in the plugin's directory:

cargo add serde --features derive

Then, simply define a new structure for the config and use a derive macro to make serde generate the deserialization code for you. We also generate the serialization code (by deriving Serialize), which you will need soon.

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    poll_interval: Duration
}

To (de)serialize the duration in a human-readable way, such as "10s" for 10 seconds, we need another dependency. Add it with cargo, and modify the Config structure to use it.

cargo add humantime-serde

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}
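
To make the human-readable duration format (such as "10s") more concrete, the following sketch parses a very small subset of it by hand. The function parse_simple_duration is a hypothetical helper written for this illustration. It is not how humantime-serde works internally, and the real crate accepts many more formats.

```rust
use std::time::Duration;

/// Parses strings such as "10s", "500ms", "2m" or "1h".
/// Simplified, hypothetical helper: exactly one integer followed by one unit.
pub fn parse_simple_duration(text: &str) -> Option<Duration> {
    let text = text.trim();
    // Split at the first character that is not an ASCII digit.
    let split = text.find(|c: char| !c.is_ascii_digit())?;
    let (number, unit) = text.split_at(split);
    let n: u64 = number.parse().ok()?;
    match unit {
        "ms" => Some(Duration::from_millis(n)),
        "s" => Some(Duration::from_secs(n)),
        "m" => Some(Duration::from_secs(n.checked_mul(60)?)),
        "h" => Some(Duration::from_secs(n.checked_mul(3600)?)),
        _ => None, // unknown unit
    }
}
```

In the plugin itself, you do not need such a function: the #[serde(with = "humantime_serde")] attribute on the poll_interval field takes care of the conversion.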

Config loading

As explained in the introduction, each plugin gets its own configuration section. It is accessible in init.

Modify init to get your config and store it in the plugin structure. Doing so will allow you to use the config in start.

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}

Of course, you also need to update the plugin structure accordingly.

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}
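
The ExampleTransform in the listing above keeps the last counter value between calls to apply, and only updates it when the buffer actually contains a counter measurement. As a minimal, std-only sketch of that state handling (the DiffState type and its update method are hypothetical names for this illustration, not part of Alumet), it could be modelled like this. Note one deliberate difference: the sketch uses checked_sub instead of the plain subtraction of the listing, so that a counter going backwards yields no value instead of overflowing.

```rust
/// Hypothetical stand-in for the state kept by `ExampleTransform`.
/// This type is NOT part of Alumet; it only models the logic.
#[derive(Default)]
struct DiffState {
    previous: Option<u64>,
}

impl DiffState {
    /// `latest` is the last counter value found in the current buffer, if any.
    /// Returns the difference with the previously seen value when both are known.
    fn update(&mut self, latest: Option<u64>) -> Option<u64> {
        let diff = match (self.previous, latest) {
            // `checked_sub` returns None instead of overflowing
            // if the counter went backwards.
            (Some(prev), Some(cur)) => cur.checked_sub(prev),
            _ => None,
        };
        // Only overwrite the state when the buffer contained a counter value.
        if latest.is_some() {
            self.previous = latest;
        }
        diff
    }
}

fn main() {
    let mut state = DiffState::default();
    assert_eq!(state.update(Some(0)), None); // first value: nothing to compare with
    assert_eq!(state.update(None), None); // buffer without our metric: state is kept
    assert_eq!(state.update(Some(3)), Some(3)); // 3 - 0
    assert_eq!(state.update(Some(2)), None); // counter went backwards: no diff
}
```

Whether a backwards counter should be skipped, reported as an error, or treated as a reset depends on the source being measured; the example source never decreases its counter, so the plain subtraction is sufficient there.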

Default config

Though it is not mandatory, you should provide default values for the configuration of your plugin.

Implement the standard Default trait for the Config struct.

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}
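
The Default implementation above can be tried on its own, without Alumet. The following std-only sketch shows it in isolation, together with Rust's struct update syntax, which lets you override one option and keep the defaults for the others. The output_path field is a hypothetical second option added only for this illustration; the tutorial's Config has a single field, and the serde attributes are left out to keep the sketch dependency-free.

```rust
use std::time::Duration;

#[derive(Debug, Clone, PartialEq)]
struct Config {
    /// Time between each activation of the counter source.
    poll_interval: Duration,
    /// Hypothetical second option, added only for this illustration.
    output_path: String,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
            output_path: String::from("alumet-tutorial-output.txt"),
        }
    }
}

fn main() {
    // All fields take their default values.
    let defaults = Config::default();
    assert_eq!(defaults.poll_interval, Duration::from_secs(1));

    // Struct update syntax: override one field, keep the defaults for the rest.
    let custom = Config {
        poll_interval: Duration::from_millis(250),
        ..Config::default()
    };
    assert_eq!(custom.output_path, defaults.output_path);
    println!("{custom:?}");
}
```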

You can now call Config::default() to obtain a Config structure filled with default values. Use this in default_config to return your default configuration.

Note that you must use serialize_config (alumet::plugin::rust::serialize_config) to convert your configuration structure into a ConfigTable, which is a "universal" configuration type provided by Alumet. Internally, serialize_config relies on serde, which is why Config had to derive the Serialize trait.

impl AlumetPlugin for ExamplePlugin {
    // ... (other methods unchanged)

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }
}
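
To see why a generic table sits between your typed Config and Alumet, here is a toy, std-only model of the round trip: default_config turns the struct into a table, and init later turns a table back into the struct. Everything in this sketch is an assumption made for illustration: ToyTable, to_table, from_table and the poll_interval_ms key are hypothetical and do not reflect how ConfigTable, serialize_config or deserialize_config are actually implemented (the real functions rely on serde).

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Toy stand-in for a generic configuration table (NOT Alumet's `ConfigTable`).
type ToyTable = BTreeMap<String, String>;

#[derive(Debug, PartialEq)]
struct Config {
    poll_interval: Duration,
}

/// Plays the role of `serialize_config`: typed struct -> generic table.
fn to_table(config: &Config) -> ToyTable {
    let mut table = ToyTable::new();
    table.insert(
        "poll_interval_ms".to_string(),
        config.poll_interval.as_millis().to_string(),
    );
    table
}

/// Plays the role of `deserialize_config`: generic table -> typed struct.
/// Fails with a message if the key is missing or is not a valid number.
fn from_table(table: &ToyTable) -> Result<Config, String> {
    let raw = table
        .get("poll_interval_ms")
        .ok_or("missing key: poll_interval_ms")?;
    let ms: u64 = raw
        .parse()
        .map_err(|e| format!("invalid poll_interval_ms: {e}"))?;
    Ok(Config {
        poll_interval: Duration::from_millis(ms),
    })
}

fn main() {
    let original = Config {
        poll_interval: Duration::from_secs(1),
    };

    // Round trip: the table carries enough information to rebuild the struct.
    let table = to_table(&original);
    assert_eq!(from_table(&table), Ok(original));

    // A table without the expected key is rejected instead of silently accepted.
    assert!(from_table(&ToyTable::new()).is_err());
}
```

The point of the intermediate table is that the core of Alumet can load, merge and write the configuration of every plugin without knowing their concrete Config types.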

Using the config in start

Now that the plugin stores its deserialized config in its structure, you can use it in start to change the polling interval of the "counter" source that you have previously implemented.

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}

Transforming measurements

At this point, you have a plugin that produces measurements with a source. The next step in the measurement pipeline is the transform step. A transform sees all the measurement points produced by the sources and can filter them, modify them, and compute new values.
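
To make these three operations concrete, here is a minimal sketch that uses a plain Vec in place of Alumet's measurement buffer. The Point type, the metric names and the transform function are made up for this illustration and are not part of the Alumet API; the real buffer type may offer different methods.

```rust
/// Simplified stand-in for a measurement point (hypothetical, not an Alumet type).
#[derive(Debug, Clone, PartialEq)]
struct Point {
    metric: &'static str,
    value: f64,
}

/// Applies the three kinds of operations to a buffer of points.
fn transform(buffer: &mut Vec<Point>) {
    // Filter: drop the points of a metric we are not interested in.
    buffer.retain(|p| p.metric != "debug_noise");

    // Modify: convert millijoules to joules, in place.
    for p in buffer.iter_mut().filter(|p| p.metric == "energy_mj") {
        p.value /= 1000.0;
        p.metric = "energy_j";
    }

    // Compute: add a new point that holds the sum of the converted values.
    let total: f64 = buffer
        .iter()
        .filter(|p| p.metric == "energy_j")
        .map(|p| p.value)
        .sum();
    buffer.push(Point { metric: "energy_total_j", value: total });
}

fn main() {
    let mut buffer = vec![
        Point { metric: "energy_mj", value: 1500.0 },
        Point { metric: "debug_noise", value: 1.0 },
        Point { metric: "energy_mj", value: 500.0 },
    ];
    transform(&mut buffer);
    for p in &buffer {
        println!("{} = {}", p.metric, p.value);
    }
}
```

The transform you will write in this chapter only uses the last kind of operation: it reads existing points and pushes new ones.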

In your first source, the "counter", you produced a measurement that contained the number of times that the source was called. This number was always growing. It corresponded to the metric example_source_call_counter.

For your first transform, you will use the values produced by this source and compute the difference between consecutive values of the counter.

🚧 Work in progress

For the moment, you are a little "alone" when writing transforms: the API is rather low-level. We are working on a friendlier API, built into Alumet, that will simplify the implementation of transforms that need to work on specific metrics. You can share your use case and wishes with us in the Discussions.

Difference transform: the idea

The "difference" transform works in the following way.

It has an internal state: the previous value of the counter. Alumet periodically triggers the "counter" source and passes the measurement buffer to your transform, which can inspect and modify it. It will find the value of the counter, c, and compute the difference with the previously known value (if there is one).
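
The state handling described above can be sketched without any Alumet type. In the following snippet, DiffState and update are hypothetical names chosen for the illustration, and the counter is assumed to never decrease (otherwise the subtraction would overflow).

```rust
/// Internal state of the "difference" logic (hypothetical name, not an Alumet type).
struct DiffState {
    /// Previous value of the counter, `None` until a first value has been seen.
    previous: Option<u64>,
}

impl DiffState {
    fn new() -> Self {
        Self { previous: None }
    }

    /// Takes the counter value `c` found in the buffer and returns the difference
    /// with the previously known value, or `None` if there is no previous value.
    fn update(&mut self, c: u64) -> Option<u64> {
        let diff = self.previous.map(|p| c - p);
        self.previous = Some(c);
        diff
    }
}

fn main() {
    let mut state = DiffState::new();
    // Simulate a few values of the counter, as they would arrive over time.
    for c in [0u64, 1, 2, 5] {
        match state.update(c) {
            Some(d) => println!("counter = {c}, difference = {d}"),
            None => println!("counter = {c}, no previous value yet"),
        }
    }
}
```

Using an Option for the previous value makes the "first call" case explicit: the first update produces no difference, and every later update does.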

Here is a simplified timeline of the transform's operations. In reality, a single buffer can contain multiple updates of the counter, and each of them needs to be taken into account.

The measurement points produced by the transform are associated with the new metric (M2), which we will define as explained in chapter 2. Measurement points produced by the "counter" source use a different metric, schematically called (M1) in the diagrams.
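
As a rough illustration of a buffer that contains several counter updates mixed with unrelated points, here is a self-contained sketch. The Point type, the numeric ids M1 and M2 and the apply function are simplified stand-ins invented for this example, not Alumet types. It emits one (M2) point per counter update, which is only one possible way to take every update into account.

```rust
/// Simplified measurement point: a metric id and a value (stand-in types).
#[derive(Debug, Clone, PartialEq)]
struct Point {
    metric: u32,
    value: u64,
}

const M1: u32 = 1; // metric of the "counter" source
const M2: u32 = 2; // metric of the difference

/// For every (M1) point of the buffer, pushes an (M2) point that holds the
/// difference with the preceding counter value, and updates `previous`.
fn apply(buffer: &mut Vec<Point>, previous: &mut Option<u64>) {
    let mut new_points = Vec::new();
    for p in buffer.iter().filter(|p| p.metric == M1) {
        if let Some(prev) = *previous {
            // The counter is assumed to never decrease.
            new_points.push(Point { metric: M2, value: p.value - prev });
        }
        *previous = Some(p.value);
    }
    // Add the new points once the scan is over, to avoid modifying
    // the buffer while iterating on it.
    buffer.extend(new_points);
}

fn main() {
    let mut previous = None;

    // First buffer: two counter updates and one point of another metric (id 42).
    let mut first = vec![
        Point { metric: M1, value: 0 },
        Point { metric: 42, value: 7 },
        Point { metric: M1, value: 1 },
    ];
    apply(&mut first, &mut previous);
    println!("{first:?}");

    // Second buffer: no counter update at all, the state must be kept as is.
    let mut second = vec![Point { metric: 42, value: 8 }];
    apply(&mut second, &mut previous);
    println!("{second:?}");
}
```

Note that the state survives a buffer without any (M1) point: the next difference is still computed against the last counter value that was seen.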

Implementation

Define a structure and implement the Transform trait on it (alumet::pipeline::Transform).

#![allow(dead_code, unused_variables)]
use alumet::measurement::{MeasurementAccumulator, MeasurementBuffer, Timestamp};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::pipeline::{Output, Source, Transform};

pub struct ExamplePlugin;

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(None) // no config for the moment
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        Ok(Box::new(ExamplePlugin))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");
        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

struct ExampleSource {
    // TODO
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, t: Timestamp) -> Result<(), PollError> {
        todo!()
    }
}

struct ExampleTransform {
    // TODO
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        todo!()
    }
}

struct ExampleOutput {
    // TODO
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        todo!()
    }
}

In the transform structure, we need the following fields:

  • An integer to store the previous value of the counter. The first time the transform runs, there is no previous value, hence we use Option<u64>.
  • The id of the metric associated with the counter. This will allow the transform to find the right values if multiple metrics are present in the incoming buffer (which is often the case in a realistic setup).
  • The id of the metric associated with the difference. We will use it to construct the new measurement points.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}

In apply, we find all the relevant points, compute the difference and update the internal state.

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

Creation and Registration

Now that the transform is implemented, let's use it.

To build the transform, you first need a new metric, which you must create in start as follows:

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}

Then, you can create the transform structure, and add it to the pipeline with add_transform. Note that this is similar to the registration of sources.

// Create the transform
let transform = ExampleTransform {
    counter_metric: counter_metric.untyped_id(),
    previous_counter: None,
    diff_metric,
};

// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));

Here is the full start method after this modification.

fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
    log::info!("Hello!");

    // Create a metric for the source.
    let counter_metric = alumet.create_metric::<u64>(
        //                                      ^^^ type
        "example_source_call_counter",                        // name
        Unit::Unity,                                          // unit
        "number of times the example source has been called", // description
    )?;
    // Create a metric for the transform.
    let diff_metric = alumet.create_metric::<u64>(
        "example_source_call_diff",
        Unit::Unity,
        "number of times the example source has been called since the previous measurement",
    )?;

    // Create the source
    let source = ExampleSource {
        metric: counter_metric,
        counter: 0,
    };

    // Configure how the source is triggered: the interval depends on the config
    let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

    // Add the source to the measurement pipeline
    alumet.add_source(Box::new(source), trigger);

    // Create the transform
    let transform = ExampleTransform {
        counter_metric: counter_metric.untyped_id(),
        previous_counter: None,
        diff_metric,
    };

    // Add the transform to the measurement pipeline
    alumet.add_transform(Box::new(transform));

    // Open the file and writer
    let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

    // Create the output
    let output = ExampleOutput { writer };

    // Add the output to the measurement pipeline
    alumet.add_blocking_output(Box::new(output));

    Ok(())
}
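
To see why the transform starts with previous_counter set to None, it can help to look at its state handling in isolation. The following is a minimal sketch that uses only the standard library, not the Alumet API: DiffTracker and its update method are hypothetical names introduced for illustration, and the Option<u64> argument stands in for "the latest counter value found in the buffer, if any". Unlike the tutorial code, the sketch uses checked_sub, so a counter that goes backwards yields no difference instead of an arithmetic overflow. That is an added assumption: the example source never decreases its counter.

```rust
/// Hypothetical, simplified stand-in for the state kept by `ExampleTransform`.
struct DiffTracker {
    previous: Option<u64>,
}

impl DiffTracker {
    fn new() -> Self {
        // Nothing has been observed yet, so there is no previous value.
        Self { previous: None }
    }

    /// `latest` is the most recent counter value seen in the current buffer,
    /// or `None` if the buffer contained no point for the counter metric.
    /// Returns the difference to emit, if one can be computed.
    fn update(&mut self, latest: Option<u64>) -> Option<u64> {
        let diff = match (self.previous, latest) {
            // Both values are known: compute the difference.
            // `checked_sub` returns `None` if the counter went backwards.
            (Some(previous), Some(latest)) => latest.checked_sub(previous),
            // First observation, or no counter in this buffer: nothing to emit.
            _ => None,
        };
        // Only update the state when the buffer contained a counter value.
        if let Some(latest) = latest {
            self.previous = Some(latest);
        }
        diff
    }
}

fn main() {
    let mut tracker = DiffTracker::new();
    // First buffer: no previous value, so no difference is produced.
    assert_eq!(tracker.update(Some(0)), None);
    // Second buffer: the counter moved from 0 to 1.
    assert_eq!(tracker.update(Some(1)), Some(1));
    // A buffer without our counter leaves the state untouched.
    assert_eq!(tracker.update(None), None);
    // The next difference is computed against the last known value (1).
    assert_eq!(tracker.update(Some(4)), Some(3));
    println!("all checks passed");
}
```

In the real transform, the same two steps appear as the `if let (Some(previous), Some((latest, t)))` block followed by the update of self.previous_counter.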

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes; use `Vec::join` to convert them to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes; use `Vec::join` to convert them to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}

Writing with outputs

The last step of the measurement pipeline is to export the measurements with an output.

In this chapter, you will implement an output that writes the measurements to a file with a simple textual format.

Text output: the idea

Like transforms, outputs are automatically triggered by Alumet when they need to process some measurements. The text output will look at every measurement point and write the data to a file.
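To make the idea concrete before touching the Alumet API, here is a minimal standalone sketch of "look at every point and write one line per point". The `Point` struct and the `write_points` function are hypothetical stand-ins invented for this illustration; they are not Alumet types, and the real output in this chapter works on a `MeasurementBuffer` instead.

```rust
use std::io::{self, Write};

/// Simplified stand-in for a measurement point (NOT the Alumet type).
struct Point {
    timestamp_secs: u64,
    metric_name: &'static str,
    value: u64,
}

/// Writes one line of text per point to any writer.
fn write_points<W: Write>(writer: &mut W, points: &[Point]) -> io::Result<()> {
    for p in points {
        writeln!(writer, "{}: {} = {}", p.timestamp_secs, p.metric_name, p.value)?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let points = [
        Point { timestamp_secs: 0, metric_name: "example_source_call_counter", value: 0 },
        Point { timestamp_secs: 1, metric_name: "example_source_call_counter", value: 1 },
    ];
    // Any `Write` implementation works; stdout is used here for a quick look.
    write_points(&mut io::stdout().lock(), &points)
}
```

Taking a generic `Write` rather than a `File` is only a convenience of this sketch: it lets the same function target a file, the standard output or an in-memory buffer.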

Implementation

Define a structure and implement the Output trait on it (alumet::pipeline::Output).

#![allow(dead_code, unused_variables)]
use alumet::measurement::{MeasurementAccumulator, MeasurementBuffer, Timestamp};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::pipeline::{Output, Source, Transform};

pub struct ExamplePlugin;

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(None) // no config for the moment
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        Ok(Box::new(ExamplePlugin))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");
        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

struct ExampleSource {
    // TODO
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, t: Timestamp) -> Result<(), PollError> {
        todo!()
    }
}

struct ExampleTransform {
    // TODO
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        todo!()
    }
}

struct ExampleOutput {
    // TODO
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        todo!()
    }
}

For efficiency reasons, we open the File only once, on startup, and wrap it in a buffered writer. We store the writer in the output.
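As an isolated illustration of this pattern, the following sketch uses only the standard library. The `TextSink` type, its methods and the temporary file name are assumptions made for the example, not part of Alumet. It opens the file once, keeps the `BufWriter` in a struct, and reuses it for several batches of lines.

```rust
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::Path;

/// Hypothetical stand-in for the output: it owns the buffered writer.
struct TextSink {
    writer: BufWriter<File>,
}

impl TextSink {
    /// Creates (or truncates) the file once and wraps it in a buffer.
    fn create(path: &Path) -> io::Result<Self> {
        Ok(Self { writer: BufWriter::new(File::create(path)?) })
    }

    /// Appends lines to the buffer; the file is only touched when the buffer fills up.
    fn write_lines(&mut self, lines: &[&str]) -> io::Result<()> {
        for line in lines {
            writeln!(self.writer, "{line}")?;
        }
        Ok(())
    }

    /// Forces the buffered data to be written to the file.
    fn flush(&mut self) -> io::Result<()> {
        self.writer.flush()
    }
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("alumet-bufwriter-sketch.txt");
    let mut sink = TextSink::create(&path)?;
    sink.write_lines(&["first batch"])?;
    sink.write_lines(&["second batch"])?;
    sink.flush()?;
    // Read the file back to check that both batches were written.
    let content = std::fs::read_to_string(&path)?;
    print!("{content}");
    std::fs::remove_file(&path)
}
```

The trade-off of buffering is that recent lines may still be in memory when you inspect the file. Calling `flush` explicitly, as this sketch does, makes them visible at a known point.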

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes: collect them and use `join` to build a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}
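
The "open once, keep a buffered writer" idea can be tried with the standard library alone. The sketch below is only an illustration: `LineOutput`, `create`, `write_lines` and `flush` are hypothetical names that are not part of Alumet, and the type does not implement the `Output` trait.

```rust
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::Path;

/// Hypothetical stand-in for an output: it owns the writer for its whole lifetime.
struct LineOutput {
    writer: BufWriter<File>,
}

impl LineOutput {
    /// Opens (or truncates) the file once and wraps it in a buffer.
    fn create(path: &Path) -> io::Result<Self> {
        let file = File::create(path)?;
        Ok(Self {
            writer: BufWriter::new(file),
        })
    }

    /// Appends lines to the in-memory buffer; the file is not reopened here.
    fn write_lines(&mut self, lines: &[&str]) -> io::Result<()> {
        for line in lines {
            writeln!(self.writer, "{line}")?;
        }
        Ok(())
    }

    /// Forces the buffered bytes to reach the file.
    fn flush(&mut self) -> io::Result<()> {
        self.writer.flush()
    }
}
```

A `BufWriter` also flushes when it is dropped, but errors that occur at that point are ignored, so an explicit `flush` is the safer choice when a failed write must be reported.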

In write, we loop over the measurements, build a string from the fields we want to save, and write it to the file. Some fields, such as the timestamp, cannot be converted directly to a readable string, so they are first converted to a more appropriate type (here, the timestamp becomes a SystemTime).

Regarding the metric, we could print its id, but:

  1. It means nothing to the end user.
  2. It is not guaranteed to be stable (it can change depending on the plugins that are enabled, the exact version of the framework, etc.).

Usually, the preferred way to deal with this issue is to use the metric name instead. While not directly available in the measurement point, it can be obtained from the OutputContext, as shown below.

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes: collect them and use `join` to build a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}
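
To make the id-to-name lookup concrete without depending on Alumet, here is a minimal sketch. `NameRegistry` and its methods are hypothetical and much simpler than the registry reachable through `OutputContext`. It only shows the pattern: resolve the id, fail with a clear error if it is unknown, then format the line with the name.

```rust
use std::collections::HashMap;

/// Hypothetical registry mapping numeric metric ids to names.
/// This is NOT Alumet's registry; it only illustrates the lookup.
struct NameRegistry {
    names: HashMap<u64, String>,
}

impl NameRegistry {
    fn new() -> Self {
        Self {
            names: HashMap::new(),
        }
    }

    fn register(&mut self, id: u64, name: &str) {
        self.names.insert(id, name.to_owned());
    }

    /// Formats one output line, or returns an error message when the id is unknown.
    fn format_line(&self, id: u64, value: u64) -> Result<String, String> {
        let name = self
            .names
            .get(&id)
            .ok_or_else(|| format!("unregistered metric id: {id}"))?;
        Ok(format!("{name} = {value}"))
    }
}
```

Because the line is keyed by name, it stays meaningful even if the numeric id assigned to the metric changes from one run to the next.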

Registration

In start, open the file in write mode, create the output and add it to the pipeline. Because we are using the standard file API, which is blocking, we use add_blocking_output. This tells Alumet to avoid mixing this output with non-blocking asynchronous tasks, which could prevent them from running properly while the output waits for I/O operations.

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));
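
A common way to keep blocking I/O away from other tasks is to give it a dedicated thread and feed it through a channel. The sketch below illustrates that general technique with the standard library only. It is an assumption made for illustration, not a description of how Alumet implements add_blocking_output, and `spawn_blocking_writer` is a hypothetical helper.

```rust
use std::io::Write;
use std::sync::mpsc;
use std::thread;

/// Spawns a dedicated thread that performs the blocking writes.
/// Hypothetical helper, not part of Alumet.
fn spawn_blocking_writer<W>(mut sink: W) -> (mpsc::Sender<String>, thread::JoinHandle<W>)
where
    W: Write + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<String>();
    let handle = thread::spawn(move || {
        // The loop ends once every sender has been dropped.
        for line in rx {
            // A slow write only stalls this thread, not the producers.
            writeln!(sink, "{line}").expect("write failed");
        }
        sink.flush().expect("flush failed");
        // Hand the sink back so that the caller can inspect or close it.
        sink
    });
    (tx, handle)
}
```

Producers call `send` and return immediately. Only the writer thread waits for the I/O to complete.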

Recap

Here is the full start method with the metrics, source, transform and output.

use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;
        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough value to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;
            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t,                              // For convenience, we use the timestamp of the latest counter update
                self.diff_metric,               // Use the new metric
                Resource::LocalMachine,         // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff,                           // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }

        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }
        Ok(())
    }
}

Metrics & Measurement Points

Measurement points

Alumet can manage a large number of measurements that come from different sources implemented in multiple plugins. Its measurement pipeline does not carry raw values, but measurement points which contain some metadata in addition to the raw value.

The following diagram represents the data contained in a point, and its relation with metrics. It is explained in detail in the next sections.

Metrics

In particular, we want to know what "object" is being measured, what kind of information is obtained: is it the temperature of an ACPI sensor? The energy consumed by the CPU? The memory reserved by a process? What is the associated unit? This knowledge is stored in a metric.

Alumet offers a standard way of defining metrics, in a way that makes all the measurements useful and rigorous. As explained in the docs, a metric is defined by:

For efficiency reasons, Alumet assigns a unique id to every metric, and uses this id instead of the full definition or name. Each measurement point hence contains a metric id.
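
To make the id mechanism more concrete, here is a minimal sketch of a registry that hands out one id per metric and resolves ids back to definitions. The types `MetricDef`, `MetricId` and `MetricRegistry` are hypothetical simplifications written for this illustration; they are not the types of the alumet crate.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified metric definition (not Alumet's actual type).
#[derive(Debug, Clone, PartialEq)]
pub struct MetricDef {
    pub name: String,
    pub unit: String,
}

/// Small, cheap-to-copy identifier that a measurement point can store
/// instead of the full definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct MetricId(pub usize);

/// Hypothetical registry that assigns one id per registered metric.
#[derive(Default)]
pub struct MetricRegistry {
    by_id: Vec<MetricDef>,
    by_name: HashMap<String, MetricId>,
}

impl MetricRegistry {
    /// Registers a metric and returns its id,
    /// or `None` if a metric with the same name already exists.
    pub fn register(&mut self, def: MetricDef) -> Option<MetricId> {
        if self.by_name.contains_key(&def.name) {
            return None;
        }
        let id = MetricId(self.by_id.len());
        self.by_name.insert(def.name.clone(), id);
        self.by_id.push(def);
        Some(id)
    }

    /// Finds the full definition from an id, for example to print
    /// the metric name in an output.
    pub fn by_id(&self, id: MetricId) -> Option<&MetricDef> {
        self.by_id.get(id.0)
    }
}
```

Comparing or copying a `MetricId` is much cheaper than comparing metric names, which is the motivation for storing only the id in each point.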

More than metrics: resources and resource consumers

Sometimes, we need a scope that is more precise than the metric definition. For instance, when measuring the use of the CPU by the OS kernel, we are interested in knowing the value per CPU core.

This could be implemented by creating one metric for each case: kernel_cpu_usage_core0, kernel_cpu_usage_core1, ... Some monitoring tools use this strategy. However, it is too limiting: it complicates the operations that you can apply on the data (think of filters, aggregates, etc.), and it does not scale well when you have multidimensional information to add to the measurements (CPU core id, hostname, etc.).

Therefore, we have chosen a different model for Alumet. First, arbitrary key-value pairs can be attached to a measurement point. We call them attributes. Second, two common pieces of information are always present in a measurement point: the resource and resource consumer.

  • An attribute is a key-value pair that can be attached to a measurement point. Its content is completely arbitrary.
  • A resource is something that can be "utilized" or "consumed". It is usually related to a piece of hardware. For example, CPU cores and RAM are resources to Alumet.
  • A consumer is something that uses a resource. It is usually a software component. For example, a process is a resource consumer to Alumet.

Attributes are optional, but the resource and consumer fields are mandatory.
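
The following sketch illustrates why this model makes filters and aggregates simpler than one metric per CPU core. It uses simplified, hypothetical types (`Point`, `Resource`, `Consumer`) that only mimic the shape described above; the real types of the alumet crate are richer.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a resource (assumption, not Alumet's enum).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Resource {
    LocalMachine,
    CpuCore { id: u32 },
}

/// Simplified stand-in for a resource consumer (assumption).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Consumer {
    LocalMachine,
    Process { pid: u32 },
}

/// Simplified measurement point: resource and consumer are mandatory,
/// attributes are optional (the vector may be empty).
#[derive(Debug, Clone)]
pub struct Point {
    pub metric: &'static str,
    pub value: u64,
    pub resource: Resource,
    pub consumer: Consumer,
    pub attributes: Vec<(String, String)>,
}

/// Sums the values of one metric, grouped by resource.
/// With per-core metric names, this would require parsing the names.
pub fn sum_by_resource(points: &[Point], metric: &str) -> HashMap<Resource, u64> {
    let mut sums = HashMap::new();
    for p in points.iter().filter(|p| p.metric == metric) {
        *sums.entry(p.resource.clone()).or_insert(0) += p.value;
    }
    sums
}
```

Because the CPU core is a field of the point and not a part of the metric name, the same function works for any metric and any number of cores.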

Since Alumet supports multidimensional data, no "dimension" should appear in metric names. Furthermore, the preferred word separator is _.

  • bad metric names: kernel_cpu_usage_core0, rapl.energy.NODE-A123, nvidia-gpu-[00:08.0]-estimated_power
  • good metric names: kernel_cpu_usage, rapl_consumed_energy, estimated_gpu_power

Error Handling for Plugins

In the plugin tutorial, we did not need to manage errors in a complicated way because there was almost no source of failure. Most of our functions returned Ok(()), and we used ? to propagate errors, for example in start.

In this chapter, you will discover how to handle errors in more realistic cases. If you are not familiar with Rust's approach to error handling, please read the corresponding chapter of the Rust book.

Anyhow

Alumet uses anyhow to simplify error handling in the plugin API. It provides a "universal" error type anyhow::Error, which can wrap any error that implements the standard trait std::error::Error. In most cases, we simply replace Result<T, E> with anyhow::Result<T>.
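
To get an intuition of what a "universal" error type does, here is a sketch that uses only the standard library. `Box<dyn Error>` plays the role of anyhow::Error in this illustration: it is a stand-in chosen so that the example has no dependency, and it lacks conveniences such as context. The function names are hypothetical.

```rust
use std::error::Error;
use std::num::ParseIntError;
use std::str::Utf8Error;

/// Fails with a `Utf8Error` if the bytes are not valid UTF-8.
fn decode_utf8(bytes: &[u8]) -> Result<&str, Utf8Error> {
    std::str::from_utf8(bytes)
}

/// Fails with a `ParseIntError` if the text is not an unsigned integer.
fn parse_count(text: &str) -> Result<u64, ParseIntError> {
    text.trim().parse()
}

/// The two helpers return different error types, but `?` converts
/// both of them into the same boxed error.
fn read_count(bytes: &[u8]) -> Result<u64, Box<dyn Error>> {
    let text = decode_utf8(bytes)?;
    let count = parse_count(text)?;
    Ok(count)
}
```

With anyhow, the signature of `read_count` would return anyhow::Result<u64> instead, and the body would stay the same.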

Good practices:

  • Propagate errors with ?.
  • Add some context, when appropriate, with with_context (takes a closure called on error - use this when formatting a string) or context (takes a context directly). This is especially useful for errors related to file IO, because they do not include the file path.

Here is an example:

use std::num::ParseIntError;

use anyhow::Context; // provides `context` and `with_context` on `Result`

fn read_file(path: &str) -> Result<String, std::io::Error> {
    std::fs::read_to_string(path)
}

fn parse_file_content(content: &str) -> Result<u64, ParseIntError> {
    content.parse()
}

fn f() -> anyhow::Result<u64> {
    let file = "example.txt";
    // Note how `read_file` and `parse_file_content` return different error types,
    // but anyhow treats them exactly the same
    let content = read_file(file).with_context(|| format!("failed to read file {file}"))?;
    let value = parse_file_content(&content).with_context(|| format!("invalid content: {content}"))?;
    Ok(value)
}

Try to modify your plugin's `init`:
```rust,ignore
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
    // Here we use .context because we know the error message at compile-time,
    // there is no formatting.
    std::fs::read_to_string("example.txt").context("failed to read example.txt")?;
    Ok(Box::new(ExamplePlugin))
}
```

It is also possible to create an anyhow::Error "manually" with the anyhow! macro:

use anyhow::anyhow;

fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
    if true { // for testing
        return Err(anyhow!("manual error here"));
    }
    Ok(Box::new(ExamplePlugin))
}

Pipeline errors

In the pipeline elements (sources, transforms, outputs), Alumet makes a distinction between multiple kinds of errors. In particular, it is useful to distinguish between:

  • fatal errors, which indicate that the element is broken and cannot be used anymore. If a source, transform or output returns a fatal error, Alumet will discard it.
  • non-fatal errors, which indicate that the error does not compromise the element and that we can keep it. If a source, transform or output returns a non-fatal error, Alumet will keep it in the pipeline.

The precise semantics depend on the element. See:

  • alumet::pipeline::error::PollError for sources
  • alumet::pipeline::error::TransformError for transforms
  • alumet::pipeline::error::WriteError for outputs

These error types can wrap any anyhow::Error, and default to the fatal kind.
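
The behavior described above can be sketched with a small standalone model. Everything in this block is hypothetical (`ElementError`, `Element`, `run_once`): it only mirrors the two kinds of errors and the "fatal by default" conversion, and it is not the implementation of the Alumet pipeline.

```rust
/// Hypothetical error type with two kinds, loosely modeled on the text above.
#[derive(Debug)]
pub enum ElementError {
    /// The element is broken and must be discarded.
    Fatal(String),
    /// The element failed this time but can be kept.
    CanRetry(String),
}

/// Converting a plain message gives the fatal kind by default.
impl From<String> for ElementError {
    fn from(msg: String) -> Self {
        ElementError::Fatal(msg)
    }
}

/// A toy pipeline element: a closure that may fail.
pub type Element = Box<dyn FnMut() -> Result<(), ElementError>>;

/// Runs every element once and returns the elements that are kept:
/// those that succeeded or failed with a non-fatal error.
pub fn run_once(elements: Vec<Element>) -> Vec<Element> {
    let mut kept = Vec::new();
    for mut element in elements {
        match element() {
            Ok(()) | Err(ElementError::CanRetry(_)) => kept.push(element),
            Err(ElementError::Fatal(_)) => {
                // Fatal error: the element is dropped here.
            }
        }
    }
    kept
}
```

In this model, an element has to opt in explicitly to the non-fatal kind, which matches the idea that an unclassified error is treated as fatal.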

As an exercise, modify your source to fail with two different approaches:

// Approach 1: a fatal error (the default kind)
fn poll(...) -> Result<(), PollError> {
+   return Err(anyhow!("cannot poll").into());
    // ...
}

// Approach 2: a non-fatal error
use alumet::pipeline::elements::error::PollRetry;

fn poll(...) -> Result<(), PollError> {
+   return Err(anyhow!("cannot poll").retry_poll());
    // ...
}

Panics

As explained in the Rust book, panics should not be used for reporting "regular" errors such as parsing invalid data. Panics should be used when you're in a state that cannot be handled, when continuing could be insecure or harmful.

A general rule is: avoid panicking in your plugin. Use Result instead (see the section about Anyhow). If you panic in a plugin method such as start or stop, the Alumet agent will crash.
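
As an illustration of this rule, consider the subtraction of two counter values, as in the example transform. With the default build settings, an unsigned subtraction that overflows panics in debug builds and wraps around in release builds. The sketch below returns an error instead. The function `counter_diff` is a hypothetical helper, and the String error is only used to keep the example free of dependencies; in a plugin, an anyhow error would be a more natural choice.

```rust
/// Computes `latest - previous` without panicking if the counter went backwards
/// (for instance after a reset of the underlying counter).
pub fn counter_diff(previous: u64, latest: u64) -> Result<u64, String> {
    latest
        .checked_sub(previous)
        .ok_or_else(|| format!("counter went backwards: {previous} -> {latest}"))
}
```

The caller can then decide what to do with the error, for example propagate it with ? or skip the measurement.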