Introduction

Welcome to the Alumet developer book!

🚧 Work in progress

Unfortunately, there is not much here for the moment. Please come back later!

Chapter 1

Create an Alumet plugin step by step

The best way to understand how Alumet plugins work is to write one yourself. In this chapter, we will create an input plugin that reads random bytes from a file.

Create the plugin

To create the plugin, we need to initialize a new library. First, go to the root directory of Alumet. You should see several folders, one for each existing plugin:

.
├── Cargo.lock
├── Cargo.toml
├── LICENSE
├── LICENSE.fr.txt
├── README.md
├── alumet/
├── alumet-api-dynamic/
├── alumet-api-macros/
├── alumet-config.toml
├── alumet-output.csv
├── app-agent/
├── app-relay-collector/
├── plugin-csv/
├── plugin-influxdb/
├── plugin-nvidia/
├── plugin-oar2/
├── plugin-perf/
├── plugin-rapl/
├── plugin-relay/
├── plugin-socket-control/
├── target/
├── test-dynamic-plugin-c/
├── test-dynamic-plugin-rust/
└── test-dynamic-plugins/

So let's create our plugin using:

cargo init --lib my-plugin

Now open the Cargo.toml at the root. The new library should be listed among the workspace members:

members = [
    "alumet",
    "my-plugin",
]

You can now fill in the Cargo.toml of the newly created library with the data you need. For example:

[package]
name = "my-plugin"
version = "0.1.0"
edition = "2021"

[dependencies]
alumet = { path = "../alumet" }

The code in this chapter also uses the anyhow, serde (with its derive feature) and humantime-serde crates, so they must be declared under [dependencies] as well.

Implement MyPlugin

Go to the newly created folder that contains the library. We will write the plugin in the src/lib.rs file.

To define our plugin, we need to create a Rust structure: MyPlugin. This structure will contain everything the plugin needs to work. Let's start with a simple structure that has a single field: config. It holds the configuration of the plugin.

extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
    measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
    metrics::TypedMetricId,
    pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
    plugin::{
        rust::{deserialize_config, serialize_config, AlumetPlugin},
        AlumetPluginStart, ConfigTable,
    },
    resources::{Resource, ResourceConsumer},
    units::Unit,
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};

#[derive(Serialize, Deserialize, Debug)]
struct Config {
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

pub struct MyPlugin {
    config: Config,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

#[derive(Debug)]
struct MyPluginSource {
    byte_metric: TypedMetricId<u64>,
}

impl AlumetPlugin for MyPlugin {
    // The name of the plugin.
    fn name() -> &'static str {
        "MyPlugin"
    }

    // The version of the plugin, taken from its Cargo.toml.
    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    // Returns the default configuration of the plugin, serialized as a ConfigTable.
    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(Some(serialize_config(Config::default())?))
    }

    // On initialization, we deserialize the provided config so that any value
    // that differs from the default one is taken into account.
    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(MyPlugin {
            config,
        }))
    }

    // The start function is here to register metrics, sources and outputs.
    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        let byte_metric =
            alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
        // We create a source from the MyPluginSource structure.
        let initial_source = Box::new(MyPluginSource {
            byte_metric
        });

        // Then we add it to the Alumet sources, with the poll_interval defined in the config.
        alumet.add_source(
            initial_source,
            TriggerSpec::at_interval(self.config.poll_interval),
        );
        Ok(())
    }

    // The stop function is called after all the metrics, sources and outputs previously
    // registered have been stopped and unregistered.
    fn stop(&mut self) -> anyhow::Result<()> {
        Ok(())
    }
}

impl Source for MyPluginSource {
    fn poll(
        &mut self,
        measurements: &mut MeasurementAccumulator,
        timestamp: Timestamp,
    ) -> Result<(), PollError> {
        let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data

        let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integers)
        rng.read_exact(&mut buffer)?; // Read exactly 8 bytes from the file into the buffer
        let value = u64::from_le_bytes(buffer); // Interpret the 8 bytes as a little-endian u64
        let measurement = MeasurementPoint::new(
            timestamp,
            self.byte_metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            value,
        )
        .with_attr("double", value.div_euclid(2));
        measurements.push(measurement);

        Ok(())
    }
}

For now, the MyPluginSource structure contains only one field: byte_metric. It is a TypedMetricId whose value type is u64.

Implement Config

As you can see, MyPlugin contains a Config value. Config is a structure in which you define the configuration values of the plugin. Let's define it:

#[derive(Serialize, Deserialize, Debug)]
struct Config {
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

The poll_interval is the time between two measurements. Feel free to add new fields to the configuration if needed.

For Alumet, a configuration structure needs to implement the Default trait, which defines the values used when the user does not modify them. Let's do it:

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

The default value of poll_interval is a duration of 1 second.
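
To see the Default pattern in isolation, here is a minimal sketch that uses only the standard library. The Config below is a stand-in without the serde attributes, and config_with_interval is a hypothetical helper that is not part of Alumet: it only illustrates "use the default unless the user provides a value".

```rust
use std::time::Duration;

/// Stand-in for the plugin's Config, without the serde attributes.
#[derive(Debug, Clone, PartialEq)]
struct Config {
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

/// Hypothetical helper (not an Alumet function): keep the default
/// unless the user provides an interval.
fn config_with_interval(user_interval: Option<Duration>) -> Config {
    match user_interval {
        Some(poll_interval) => Config { poll_interval },
        None => Config::default(),
    }
}

fn main() {
    println!("{:?}", config_with_interval(None));
    println!("{:?}", config_with_interval(Some(Duration::from_millis(250))));
}
```

In the real plugin, this merging is presumably handled when the configuration table is deserialized; the sketch only shows the role played by Default.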

Implement AlumetPlugin

First, let's create a MyPluginSource struct:

#[derive(Debug)]
struct MyPluginSource {
    byte_metric: TypedMetricId<u64>,
}

We now have a structure, MyPlugin. Let's implement the AlumetPlugin trait for it. This turns our structure into an Alumet plugin by defining the following functions:

  • name()
  • version()
  • init()
  • default_config()
  • start()
  • stop()
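
The functions listed above can be pictured with a small, self-contained sketch. MiniPlugin, DemoPlugin and run_lifecycle are hypothetical names, the configuration is reduced to a plain number, and the call order (default_config, init, start, stop) is an assumption made for illustration. This is not the real AlumetPlugin trait.

```rust
/// Simplified, hypothetical stand-in for the AlumetPlugin trait (std only).
trait MiniPlugin: Sized {
    fn name() -> &'static str;
    fn version() -> &'static str;
    /// Stands in for the serialized default configuration.
    fn default_config() -> u64;
    fn init(poll_interval_secs: u64) -> Result<Box<Self>, String>;
    fn start(&mut self) -> Result<(), String>;
    fn stop(&mut self) -> Result<(), String>;
}

struct DemoPlugin {
    poll_interval_secs: u64,
    events: Vec<&'static str>,
}

impl MiniPlugin for DemoPlugin {
    fn name() -> &'static str {
        "DemoPlugin"
    }
    fn version() -> &'static str {
        "0.1.0"
    }
    fn default_config() -> u64 {
        1
    }
    fn init(poll_interval_secs: u64) -> Result<Box<Self>, String> {
        if poll_interval_secs == 0 {
            return Err("poll interval must be positive".to_string());
        }
        Ok(Box::new(DemoPlugin { poll_interval_secs, events: Vec::new() }))
    }
    fn start(&mut self) -> Result<(), String> {
        self.events.push("start");
        Ok(())
    }
    fn stop(&mut self) -> Result<(), String> {
        self.events.push("stop");
        Ok(())
    }
}

/// Assumed call order: build from the default config, start, then stop.
fn run_lifecycle<P: MiniPlugin>() -> Result<Box<P>, String> {
    let mut plugin = P::init(P::default_config())?;
    plugin.start()?;
    plugin.stop()?;
    Ok(plugin)
}

fn main() {
    let plugin = run_lifecycle::<DemoPlugin>().expect("lifecycle failed");
    println!(
        "{} v{} (every {}s): {:?}",
        DemoPlugin::name(),
        DemoPlugin::version(),
        plugin.poll_interval_secs,
        plugin.events
    );
}
```

Note that name(), version(), default_config() and init() take no self: they are called before any plugin instance exists, while start() and stop() operate on the instance returned by init().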

Let's define these for our plugin:

impl AlumetPlugin for MyPlugin {
    // The name of the plugin.
    fn name() -> &'static str {
        "MyPlugin"
    }

    // The version of the plugin, taken from its Cargo.toml.
    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    // Returns the default configuration of the plugin, serialized as a ConfigTable.
    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(Some(serialize_config(Config::default())?))
    }

    // On initialization, we deserialize the provided config so that any value
    // that differs from the default one is taken into account.
    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(MyPlugin {
            config,
        }))
    }

    // The start function is here to register metrics, sources and outputs.
    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        let byte_metric =
            alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
        // We create a source from the MyPluginSource structure.
        let initial_source = Box::new(MyPluginSource {
            byte_metric
        });

        // Then we add it to the Alumet sources, with the poll_interval defined in the config.
        alumet.add_source(
            initial_source,
            TriggerSpec::at_interval(self.config.poll_interval),
        );
        Ok(())
    }

    // The stop function is called after all the metrics, sources and outputs previously
    // registered have been stopped and unregistered.
    fn stop(&mut self) -> anyhow::Result<()> {
        Ok(())
    }
}

Let's focus on the start function. We want to create a new metric that matches the field of the MyPluginSource structure. This structure has one field: byte_metric. We use the create_metric() function of alumet::plugin::AlumetPluginStart. We specify the type of the value (u64), the name of the metric, its unit and, as the last argument, its description:

let byte_metric =
    alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;

Now that we have our metric, we need to add a Source to Alumet.

The MyPluginSource structure will be used as a buffer to retrieve values. We need to add it as an Alumet source:

// We create a source from the MyPluginSource structure.
let initial_source = Box::new(MyPluginSource {
    byte_metric
});

// Then we add it to the Alumet sources, with the poll_interval defined in the config.
alumet.add_source(
    initial_source,
    TriggerSpec::at_interval(self.config.poll_interval),
);

At this point, the compiler should report an error about initial_source, because the trait bound MyPluginSource: alumet::pipeline::Source is not satisfied. We will now implement Source to fix this.
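
The mechanism behind this kind of error can be reproduced with a small sketch that does not depend on Alumet. The Source trait, Registry and ConstantSource below are hypothetical, simplified stand-ins. The sketch assumes that the registration function only accepts boxed values whose type implements the trait, which may differ from the real signature of add_source.

```rust
/// Hypothetical, simplified source trait (not Alumet's real one).
trait Source {
    fn poll(&mut self, out: &mut Vec<u64>) -> Result<(), String>;
}

/// Hypothetical registry that only accepts values implementing `Source`.
#[derive(Default)]
struct Registry {
    sources: Vec<Box<dyn Source>>,
}

impl Registry {
    fn add_source(&mut self, source: Box<dyn Source>) {
        self.sources.push(source);
    }

    /// Polls every registered source once and collects the values.
    fn poll_all(&mut self) -> Result<Vec<u64>, String> {
        let mut out = Vec::new();
        for source in self.sources.iter_mut() {
            source.poll(&mut out)?;
        }
        Ok(out)
    }
}

struct ConstantSource {
    value: u64,
}

// Without this impl, the `add_source` call in `main` does not compile:
// the trait bound `ConstantSource: Source` is not satisfied.
impl Source for ConstantSource {
    fn poll(&mut self, out: &mut Vec<u64>) -> Result<(), String> {
        out.push(self.value);
        Ok(())
    }
}

fn main() {
    let mut registry = Registry::default();
    registry.add_source(Box::new(ConstantSource { value: 7 }));
    println!("{:?}", registry.poll_all());
}
```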

Implement Source

In this part, we will implement the Source trait for our MyPluginSource structure.

extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
    measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
    metrics::TypedMetricId,
    pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
    plugin::{
        rust::{deserialize_config, serialize_config, AlumetPlugin},
        AlumetPluginStart, AlumetPostStart, ConfigTable,
    },
    resources::{Resource, ResourceConsumer},
    units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};

#[derive(Serialize, Deserialize, Debug)]
struct Config {
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

pub struct MyPlugin {
    config: Config,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

#[derive(Debug)]
struct MyPluginSource {
    byte_metric: TypedMetricId<u64>,
}

impl AlumetPlugin for MyPlugin {
    // So we define the name of the plugin.
    fn name() -> &'static str {
        "MyPlugin"
    }

    // We also define it's version.
    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    // We use the default config by default and on initialization.
    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(Some(serialize_config(Config::default())?))
    }

    // We also use the default config on initialization and we deserialize the config
    // to take in count if there is a different config than the default one.
    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(MyPlugin {
            config,
        }))
    }

    // The start function is here to register metrics, sources and output.
    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        let byte_metric =
            alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
        // We create a source from ThePluginSource structure.
        let initial_source = Box::new(MyPluginSource {
            byte_metric
        });

        // Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
        alumet.add_source(
            initial_source,
            TriggerSpec::at_interval(self.config.poll_interval),
        );
        Ok(())
    }
    // The stop function is called after all the metrics, sources and output previously
    // registered have been stopped and unregistered.
    fn stop(&mut self) -> anyhow::Result<()> {
        Ok(())
    }
}

impl Source for MyPluginSource {
    fn poll(
        &mut self,
        measurements: &mut MeasurementAccumulator,
        timestamp: Timestamp,
    ) -> Result<(), PollError> {
        let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data

        let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
        rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
        let value = u64::from_le_bytes(buffer);
        let measurement = MeasurementPoint::new(
            timestamp,
            self.byte_metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            value,
        )
        .with_attr("double", value.div_euclid(2));
        measurements.push(measurement);

        Ok(())
    }
}

Alumet calls this function each time a measurement is needed, so this is where we retrieve the value. For this example, let's read data from the /dev/urandom file. Here is the code:

let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data

let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integers)
rng.read_exact(&mut buffer)?; // Read exactly 8 bytes from the file and store them in the buffer
let value = u64::from_le_bytes(buffer); // Interpret the 8 bytes as a little-endian u64

N.B. This only works on Unix-like operating systems, which provide a file at "/dev/urandom".
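As a minimal sketch of the read step in isolation, the helper below takes the file path as a parameter, which makes it possible to try the logic on a regular file on systems that have no /dev/urandom. The name read_random_u64 is hypothetical and is not part of Alumet; only the standard library is used.

```rust
use std::fs::File;
use std::io::{self, Read};
use std::path::Path;

/// Reads exactly 8 bytes from `path` and interprets them as a little-endian u64.
/// `read_random_u64` is a hypothetical helper name, not an Alumet function.
pub fn read_random_u64(path: &Path) -> io::Result<u64> {
    // Fails with ErrorKind::NotFound if the file does not exist
    // (for example "/dev/urandom" on a non-Unix system).
    let mut file = File::open(path)?;
    let mut buffer = [0u8; 8];
    // read_exact fails with ErrorKind::UnexpectedEof if fewer than 8 bytes are available.
    file.read_exact(&mut buffer)?;
    Ok(u64::from_le_bytes(buffer))
}
```

Inside poll, this would be called as read_random_u64(Path::new("/dev/urandom")), with the error propagated by the ? operator as in the tutorial code.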

We can now get the value. The next step is to send it to Alumet. To push data to Alumet, we first create a measurement point and then push it to the MeasurementAccumulator. As an example, the code also adds an attribute that holds the value divided by 2:

let measurement = MeasurementPoint::new(
    timestamp,
    self.byte_metric,
    Resource::LocalMachine,
    ResourceConsumer::LocalMachine,
    value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement);
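To make the "build a point, attach an attribute, push it" pattern easier to experiment with outside of Alumet, here is a rough sketch that uses simplified stand-in types. Point, Accumulator and record are hypothetical names invented for this illustration; they are not Alumet's MeasurementPoint or MeasurementAccumulator and only imitate their shape.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for a measurement point (NOT Alumet's real type).
#[derive(Debug, Clone, PartialEq)]
pub struct Point {
    pub timestamp_secs: u64,
    pub metric: &'static str,
    pub value: u64,
    pub attributes: BTreeMap<String, u64>,
}

impl Point {
    pub fn new(timestamp_secs: u64, metric: &'static str, value: u64) -> Self {
        Self { timestamp_secs, metric, value, attributes: BTreeMap::new() }
    }

    /// Builder-style method: consumes the point and returns it with one more attribute.
    pub fn with_attr(mut self, key: &str, value: u64) -> Self {
        self.attributes.insert(key.to_owned(), value);
        self
    }
}

/// Simplified stand-in for the accumulator handed to `poll` (NOT Alumet's real type).
#[derive(Debug, Default)]
pub struct Accumulator {
    points: Vec<Point>,
}

impl Accumulator {
    pub fn push(&mut self, point: Point) {
        self.points.push(point);
    }

    pub fn points(&self) -> &[Point] {
        &self.points
    }
}

/// Mirrors the body of `poll`: build a point, attach the halved value, push it.
pub fn record(acc: &mut Accumulator, timestamp_secs: u64, value: u64) {
    let point = Point::new(timestamp_secs, "random_byte", value)
        .with_attr("double", value.div_euclid(2));
    acc.push(point);
}
```

Note that, as in the tutorial code, the attribute is named "double" although it stores half of the value; for unsigned integers, div_euclid(2) gives the same result as the / operator.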

The final code of the poll function is:

impl Source for MyPluginSource {
    fn poll(
        &mut self,
        measurements: &mut MeasurementAccumulator,
        timestamp: Timestamp,
    ) -> Result<(), PollError> {
        let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data

        let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integers)
        rng.read_exact(&mut buffer)?; // Read exactly 8 bytes from the file and store them in the buffer
        let value = u64::from_le_bytes(buffer);
        let measurement = MeasurementPoint::new(
            timestamp,
            self.byte_metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            value,
        )
        .with_attr("double", value.div_euclid(2));
        measurements.push(measurement);

        Ok(())
    }
}

Add to the app-agent

To use the plugin, we need to add it to the app-agent. This takes two steps.

Add the plugin to the Cargo.toml

Add your plugin library, with a matching version, to the [dependencies] section of the app-agent's Cargo.toml:

[dependencies]
my-plugin = { version = "0.1.0", path = "../my-plugin" }

Alternatively, run the following command from the app-agent directory:

cargo add my-plugin --path ../my-plugin

Import in the app-agent

In the app-agent's main file, import the plugin with a use declaration:

use my_plugin::MyPlugin;

Then add the newly imported plugin to the static_plugins macro:

let plugins = static_plugins![MyPlugin, CsvPlugin, SocketControlPlugin];

In this example, the app-agent uses 3 plugins:

  • MyPlugin
  • CsvPlugin
  • SocketControlPlugin

You can now build Alumet.

Good practices

In this example, all the code is in one file. A real plugin can be more complex and contain more code, so splitting it into several files is a good option. You can organize the files as you prefer, but here is an example:

  • File1: everything about the plugin itself (name, version, configuration, start and stop).
  • File2: everything about the poll function and the creation of the MeasurementPoint.
  • File3: everything about the value: how to retrieve and process it.
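The split suggested above could look like the following sketch. It is only an illustration under assumptions: the module names plugin, source and probe are hypothetical, the functions are simplified stand-ins that do not use the Alumet API, and the modules are written inline so the example stays in one piece. In a real crate, each mod block would live in its own file (for example src/plugin.rs, src/source.rs and src/probe.rs) and lib.rs would only declare them.

```rust
/// probe: how to retrieve and process the raw value ("File3").
pub mod probe {
    /// Converts 8 raw bytes into the measured value (little-endian).
    pub fn decode(raw: [u8; 8]) -> u64 {
        u64::from_le_bytes(raw)
    }
}

/// source: what happens on each poll ("File2").
pub mod source {
    use super::probe;

    /// Simplified stand-in for the data pushed on each poll.
    #[derive(Debug, PartialEq)]
    pub struct Sample {
        pub value: u64,
        pub half: u64,
    }

    /// Builds one sample from raw bytes, delegating the decoding to `probe`.
    pub fn poll_once(raw: [u8; 8]) -> Sample {
        let value = probe::decode(raw);
        Sample { value, half: value / 2 }
    }
}

/// plugin: name, version and configuration ("File1").
pub mod plugin {
    pub fn name() -> &'static str {
        "MyPlugin"
    }
}
```

With this layout, the code that knows where the value comes from (probe) can change without touching the code that builds measurement points (source) or the plugin lifecycle (plugin).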

Final code

extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
    measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
    metrics::TypedMetricId,
    pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
    plugin::{
        rust::{deserialize_config, serialize_config, AlumetPlugin},
        AlumetPluginStart, ConfigTable,
    },
    resources::{Resource, ResourceConsumer},
    units::Unit,
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};

#[derive(Serialize, Deserialize, Debug)]
struct Config {
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

pub struct MyPlugin {
    config: Config,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

#[derive(Debug)]
struct MyPluginSource {
    byte_metric: TypedMetricId<u64>,
}

impl AlumetPlugin for MyPlugin {
    // We define the name of the plugin.
    fn name() -> &'static str {
        "MyPlugin"
    }

    // We also define its version.
    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    // We provide the default config, serialized for Alumet.
    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(Some(serialize_config(Config::default())?))
    }

    // On initialization, we deserialize the config provided by Alumet, so that any
    // value that differs from the default config is taken into account.
    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(MyPlugin {
            config,
        }))
    }

    // The start function registers the metrics, sources and outputs.
    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        let byte_metric =
            alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
        // We create a source from the MyPluginSource structure.
        let initial_source = Box::new(MyPluginSource {
            byte_metric
        });

        // Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
        alumet.add_source(
            initial_source,
            TriggerSpec::at_interval(self.config.poll_interval),
        );
        Ok(())
    }

    // The stop function is called after all the metrics, sources and outputs previously
    // registered have been stopped and unregistered.
    fn stop(&mut self) -> anyhow::Result<()> {
        Ok(())
    }
}

impl Source for MyPluginSource {
    fn poll(
        &mut self,
        measurements: &mut MeasurementAccumulator,
        timestamp: Timestamp,
    ) -> Result<(), PollError> {
        let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data

        let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integers)
        rng.read_exact(&mut buffer)?; // Read exactly 8 bytes from the file and store them in the buffer
        let value = u64::from_le_bytes(buffer);
        let measurement = MeasurementPoint::new(
            timestamp,
            self.byte_metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            value,
        )
        .with_attr("double", value.div_euclid(2));
        measurements.push(measurement);

        Ok(())
    }
}