Introduction
Welcome to the Alumet developer book!
🚧 Work in progress
Unfortunately, there is not much here for the moment. Please come back later!
Chapter 1
Create an Alumet plugin step by step
The best way to get a good understanding of how Alumet's Plugin works is to do it yourself. So this chapter will create an input plugin, which read a random byte from a file.
Create the plugin
In order to create the plugin, we need to initialize a new library. So first, go to the root directory of Alumet. You should have different folders containing the different plugins:
.
├── Cargo.lock
├── Cargo.toml
├── LICENSE
├── LICENSE.fr.txt
├── README.md
├── alumet/
├── alumet-api-dynamic/
├── alumet-api-macros/
├── alumet-config.toml
├── alumet-output.csv
├── app-agent/
├── app-relay-collector/
├── plugin-csv/
├── plugin-influxdb/
├── plugin-nvidia/
├── plugin-oar2/
├── plugin-perf/
├── plugin-rapl/
├── plugin-relay/
├── plugin-socket-control/
├── target/
├── test-dynamic-plugin-c/
├── test-dynamic-plugin-rust/
└── test-dynamic-plugins/
So let's create our plugin using:
cargo init --lib my-plugin
Now, go to the Cargo.toml at the root and you should see this new library:
members = [
"alumet",
"my-plugin",
]
Now, you can fulfil the TOML of the newly created library with data you want. For example:
[package]
name = "my-plugin"
version = "0.1.0"
edition = "2021"
[dependencies]
alumet = { path = "../alumet" }
Implement MyPlugin
Let's go to the newly created folder containing the new library. We will use the lib.rs file.
To define our plugin, we need to create a Rust structure: MyPlugin. This structure will contain all necessary for the plugin to work.
Let's take an easy structure having 1 fields: config
. Config will contain the configuration of the plugin.
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
For now, the Metrics structure only contains field: a_metric
. This is a TypedMetricId
and its type is an u64
Implement Config
As you can see, MyPlugin contains a Config value. This Config is a structure where you can define value of configuration for the plugin. Let's define it:
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
The poll_interval will be the time between two measurements. Feel free to add new element in the configuration if needed.
For Alumet a Configuration structure needs to implement the Default
trait, which define the default value if not modified by the user.
Let's do it:
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
The default value of poll_interval
is a duration of 1 second.
Implement AlumetPlugin
First, let's create a MyPluginSource
struct:
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
We have a structure: MyPlugin let's implement the AlumetPlugin
trait, this will transform our structure in an Alumet plugin
defining some functions:
- name()
- version()
- init()
- default_config()
- start()
- stop()
Let's define these for our plugin:
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
Let's focus on the start
function.
We want to create a new metric to match with the Metrics structure's field. In this structure, we have one field: a_metric
.
We use the create_metric()
function of the alumet::plugin::AlumetStart
. We specify the kind of value (u64
), the name
of the metric, its unit and the last argument is the description:
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
Now that we have our metric, we need to add a Source to Alumet.
The MyPluginSource
structure will be used as a buffer to retrieve values. We need to add this as Alumet source:
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
Currently, you should have an error about your initial source, it's because the trait bound
MyPluginSource: alumet::pipeline::Source
is not satisfied. We are now going to implement Source
to fix this.
Implement Source
In this part, we will implement the Source trait for our MyPluginSource
structure.
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
This function is called by Alumet each time a measure is needed, so it's in this function that we need to retrieve the value. For this example, let's read data from the /dev/urandom file. Here is the code:
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
N.b. This will only work on UNIX like OS which does have a file at "/dev/urandom"
We are now able to get the value. The next step is to send this value to Alumet. In order to push data to alumet, we first need to create a measurement point and then push it to the MeasurementAccumulator. I also add as an example an attribute the same as value but divided by 2:
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
So final code of poll
function is:
extern crate alumet;
extern crate anyhow;
extern crate humantime_serde;
extern crate serde;
use alumet::{
measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp},
metrics::TypedMetricId,
pipeline::{elements::error::PollError, trigger::TriggerSpec, Source},
plugin::{
rust::{deserialize_config, serialize_config, AlumetPlugin},
AlumetPluginStart, AlumetPostStart, ConfigTable,
},
resources::{Resource, ResourceConsumer},
units::{PrefixedUnit, Unit, UnitPrefix},
};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, time::Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
pub struct MyPlugin {
config: Config,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
#[derive(Debug)]
struct MyPluginSource {
byte_metric: TypedMetricId<u64>,
}
impl AlumetPlugin for MyPlugin {
// So we define the name of the plugin.
fn name() -> &'static str {
"MyPlugin"
}
// We also define it's version.
fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// We use the default config by default and on initialization.
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
Ok(Some(serialize_config(Config::default())?))
}
// We also use the default config on initialization and we deserialize the config
// to take in count if there is a different config than the default one.
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(MyPlugin {
config,
}))
}
// The start function is here to register metrics, sources and output.
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
let byte_metric =
alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?;
// We create a source from ThePluginSource structure.
let initial_source = Box::new(MyPluginSource {
byte_metric
});
// Then we add it to the alumet sources, adding the poll_interval value previously defined in the config.
alumet.add_source(
initial_source,
TriggerSpec::at_interval(self.config.poll_interval),
);
Ok(())
}
// The stop function is called after all the metrics, sources and output previously
// registered have been stopped and unregistered.
fn stop(&mut self) -> anyhow::Result<()> {
Ok(())
}
}
impl Source for MyPluginSource {
fn poll(
&mut self,
measurements: &mut MeasurementAccumulator,
timestamp: Timestamp,
) -> Result<(), PollError> {
let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data
let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer)
rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer
let value = u64::from_le_bytes(buffer);
let measurement = MeasurementPoint::new(
timestamp,
self.byte_metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
value,
)
.with_attr("double", value.div_euclid(2));
measurements.push(measurement );
Ok(())
}
}
Add to the app-agent
To use the plugin, we need to add it to the app-agent. To do so, several steps are needed.
Add the plugin to the cargo.toml
It's very easy, just add your plugin library with a matching version in the toml file:
[dependencies]
my-plugin = {version= "0.1.0", path = "../my-plugin"}
or by running:
cargo add my-plugin --path ../my-plugin
Import in the app-agent
In the app-agent main file, import using use:
use my_plugin::MyPlugin;
And then add this newly imported plugin to the statics_plugins macro:
let plugins = static_plugins![MyPlugin, CsvPlugin, SocketControlPlugin];
In this example, we have 3 plugins used by the app-agent:
- MyPlugin
- CsvPlugin
- SocketControlPlugin
You can now build Alumet.
Good practices
In this example, all the code is in one file. As a real plugin could be more complex with more code, separate your code in several files is a good option. You can separate as you prefer but here is an example.
- File1: All about the plugin,...
- File2: All about the poll function, creation of measurementPoint,...
- File3: All about the value, how to retrieve, process,.. them.
Final code
#![allow(unused)] fn main() { extern crate alumet; extern crate anyhow; extern crate humantime_serde; extern crate serde; use alumet::{ measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp}, metrics::TypedMetricId, pipeline::{elements::error::PollError, trigger::TriggerSpec, Source}, plugin::{ rust::{deserialize_config, serialize_config, AlumetPlugin}, AlumetPluginStart, AlumetPostStart, ConfigTable, }, resources::{Resource, ResourceConsumer}, units::{PrefixedUnit, Unit, UnitPrefix}, }; use serde::{Deserialize, Serialize}; use std::{fs::File, io::Read, time::Duration}; #[derive(Serialize, Deserialize, Debug)] struct Config { #[serde(with = "humantime_serde")] poll_interval: Duration, } pub struct MyPlugin { config: Config, } impl Default for Config { fn default() -> Self { Self { poll_interval: Duration::from_secs(1), } } } #[derive(Debug)] struct MyPluginSource { byte_metric: TypedMetricId<u64>, } impl AlumetPlugin for MyPlugin { // So we define the name of the plugin. fn name() -> &'static str { "MyPlugin" } // We also define it's version. fn version() -> &'static str { env!("CARGO_PKG_VERSION") } // We use the default config by default and on initialization. fn default_config() -> anyhow::Result<Option<ConfigTable>> { Ok(Some(serialize_config(Config::default())?)) } // We also use the default config on initialization and we deserialize the config // to take in count if there is a different config than the default one. fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> { let config = deserialize_config(config)?; Ok(Box::new(MyPlugin { config, })) } // The start function is here to register metrics, sources and output. fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> { let byte_metric = alumet.create_metric::<u64>("random_byte", Unit::Byte, "A random number")?; // We create a source from ThePluginSource structure. let initial_source = Box::new(MyPluginSource { byte_metric }); // Then we add it to the alumet sources, adding the poll_interval value previously defined in the config. alumet.add_source( initial_source, TriggerSpec::at_interval(self.config.poll_interval), ); Ok(()) } // The stop function is called after all the metrics, sources and output previously // registered have been stopped and unregistered. fn stop(&mut self) -> anyhow::Result<()> { Ok(()) } } impl Source for MyPluginSource { fn poll( &mut self, measurements: &mut MeasurementAccumulator, timestamp: Timestamp, ) -> Result<(), PollError> { let mut rng = File::open("/dev/urandom")?; // Open the "/dev/urandom" file to obtain random data let mut buffer = [0u8; 8]; // Create a mutable buffer of type [u8; 8] (an array of 8 unsigned 8-bit integer) rng.read_exact(&mut buffer)?; // Read enough byte from the file and store the value in the buffer let value = u64::from_le_bytes(buffer); let measurement = MeasurementPoint::new( timestamp, self.byte_metric, Resource::LocalMachine, ResourceConsumer::LocalMachine, value, ) .with_attr("double", value.div_euclid(2)); measurements.push(measurement ); Ok(()) } } }