Measuring with sources
For the moment, your plugin doesn't measure anything. You will now write your first "source" in order to obtain measurement points, that will be passed to the rest of the Alumet pipeline.
For your first source, you will implement a simple counter that measures the number of times it has been triggered.
Counter source: the idea
The counter source works in the following way.
It has an internal state: the current value of the counter. The Alumet framework manages a timer and periodically triggers the source. When triggered, the source creates a measurement point containing the counter's value and appends it to a buffer provided by Alumet. The framework then passes the data to the rest of the measurement pipeline.
To distinguish the values of the counter from other unrelated measurements (such as the data produced by other sources), each measurement point must indicate its metric id. The metric id, represented by the rounded (M) on the diagram, indicates what "object" is being measured. Your plugin must create a new metric on startup (see below), store its id somewhere, and use it to produce measurement points.
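The mechanism can be sketched with the standard library alone. This is only an illustration of the idea, not Alumet's API: `SketchPoint`, `SketchCounterSource` and `poll_n_times` are hypothetical names, and a plain `Vec` stands in for the buffer provided by the framework.

```rust
/// Hypothetical stand-in for a measurement point (not Alumet's type).
#[derive(Debug, PartialEq)]
struct SketchPoint {
    /// Identifies which metric this value belongs to.
    metric_id: u32,
    /// The measured value: here, the counter.
    value: u64,
}

/// Hypothetical counter source: its only state is the metric id and the count.
struct SketchCounterSource {
    metric_id: u32,
    counter: u64,
}

impl SketchCounterSource {
    /// Called on each trigger: record the current value, then increment it.
    fn poll(&mut self, buffer: &mut Vec<SketchPoint>) {
        buffer.push(SketchPoint {
            metric_id: self.metric_id,
            value: self.counter,
        });
        self.counter += 1;
    }
}

/// Plays the role of the framework: triggers the source `n` times
/// and returns the buffer that would be passed down the pipeline.
fn poll_n_times(source: &mut SketchCounterSource, n: usize) -> Vec<SketchPoint> {
    let mut buffer = Vec::new();
    for _ in 0..n {
        source.poll(&mut buffer);
    }
    buffer
}
```

Triggering this sketch three times yields three points whose values are 0, 1 and 2, all tagged with the same metric id.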
Defining a metric
What is a metric?
A metric represents an "object", something that can be measured. In Alumet, the measurement pipeline does not carry raw values but measurement points, which contain some additional information. Every point contains a metric id, which associates it with the definition of a metric.
Unlike some other tools, the list of metrics is not part of the framework: nothing is hard-coded. Plugins can create new metrics by following the standard model provided by Alumet. As explained in the docs, a metric is defined by:
- a unique name
- a unit of measurement (is it energy? time?) - Alumet follows the UCUM (Unified Code for Units of Measure) standard.
- a type of measured value (is it an integer? a float?)
- a textual description
Read more about metrics here: Metrics & Measurement Points
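As a rough picture of these four parts, a metric definition can be modeled as a small record. The sketch below uses hypothetical names (`SketchValueType`, `SketchMetricDef`, `counter_metric_sketch`, `describe`) and is not how Alumet stores metrics internally. The only outside fact it relies on is that UCUM writes the unity as `1`.

```rust
/// Hypothetical set of value types that a metric may carry.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SketchValueType {
    U64,
    F64,
}

/// Hypothetical metric definition holding the four parts listed above.
#[derive(Debug, Clone, PartialEq)]
struct SketchMetricDef {
    /// Unique name of the metric.
    name: String,
    /// Unit of measurement, as a UCUM code ("J" for joule, "1" for unity).
    unit: String,
    /// Type of the measured values.
    value_type: SketchValueType,
    /// Short textual description.
    description: String,
}

/// Builds the definition of a unitless integer counter.
fn counter_metric_sketch() -> SketchMetricDef {
    SketchMetricDef {
        name: "example_source_call_counter".to_owned(),
        unit: "1".to_owned(), // UCUM unity: the value has no unit
        value_type: SketchValueType::U64,
        description: "number of times the example source has been called".to_owned(),
    }
}

/// Renders a definition as a single human-readable line.
fn describe(metric: &SketchMetricDef) -> String {
    format!(
        "{} [{}] ({:?}): {}",
        metric.name, metric.unit, metric.value_type, metric.description
    )
}
```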
Creating a metric
Okay, let's think about the metric that you need for your counter source.
- name: We'll call it example_source_call_counter, which reflects what we measure: we count how many times the source has been called by Alumet.
- unit: Since this is a simple counter, it has no unit! In the UCUM, this corresponds to the "unity" unit.
- type: Integer, here we choose u64 to keep it simple.
- description: Something short and explicit, for example "number of times the example source has been called".
To register this new metric in Alumet, call create_metric in the start method of your plugin.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};

use alumet::measurement::{
    MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};

pub struct ExamplePlugin {
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        let config = serialize_config(Config::default())?;
        Ok(Some(config))
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config = deserialize_config(config)?;
        Ok(Box::new(ExamplePlugin { config }))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter", // name
            Unit::Unity, // unit
            "number of times the example source has been called", // description
        )?;

        // Create a metric for the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: the interval depends on the config
        let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        // Create the transform
        let transform = ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        };

        // Add the transform to the measurement pipeline
        alumet.add_transform(Box::new(transform));

        // Open the file and writer
        let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);

        // Create the output
        let output = ExampleOutput { writer };

        // Add the output to the measurement pipeline
        alumet.add_blocking_output(Box::new(output));

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct Config {
    /// Time between each activation of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(1),
        }
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}

struct ExampleTransform {
    counter_metric: RawMetricId,
    previous_counter: Option<u64>,
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Find the relevant measurement points and update the last known counter.
        let mut latest_counter = None;
        for m in measurements.iter() {
            if m.metric == self.counter_metric {
                let value = match m.value {
                    WrappedMeasurementValue::F64(_) => {
                        unreachable!("wrong counter type, expected u64")
                    }
                    WrappedMeasurementValue::U64(c) => c,
                };
                latest_counter = Some((value, m.timestamp));
            }
        }

        // Compute the difference, if we have enough values to do so (previous and latest)
        if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
            let diff = latest - previous;

            // Push the new measurement to the buffer
            measurements.push(MeasurementPoint::new(
                t, // For convenience, we use the timestamp of the latest counter update
                self.diff_metric, // Use the new metric
                Resource::LocalMachine, // No specific resource
                ResourceConsumer::LocalMachine, // No specific consumer
                diff, // The computed value
            ));
        }

        // Update the internal state, if possible.
        // In the case where there are other sources,
        // the buffer may contain no measurements from our example source.
        if let Some((latest, _)) = latest_counter {
            self.previous_counter = Some(latest);
        }
        Ok(())
    }
}

struct ExampleOutput {
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(
                &mut self.writer,
                "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]"
            )?;
        }
        Ok(())
    }
}
If all goes well, you obtain a TypedMetricId that you can use to refer to the metric at every step of the Alumet pipeline, in particular in a source.
Creating a metric may fail in case of a duplicate name (note the ? that handles the potential error - we'll talk about it later).
Therefore, you should not choose names that are too generic. Instead, you should choose explicit and precise names. If applicable, include some information about the kind of sensor.
Here are some examples:
- bad metric names (too vague): metric, counter, measured_energy
- good metric names: acpi_zone_temperature, rapl_consumed_energy, estimated_gpu_power, kernel_cpu_usage
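To make the duplicate-name failure concrete, here is a minimal std-only sketch of a registry that refuses a name that is already taken. `SketchRegistry`, `SketchMetricId` and `DuplicateName` are hypothetical names used for illustration. They are not Alumet types, and the real framework may differ in its details.

```rust
use std::collections::HashMap;

/// Hypothetical metric id: an index handed out by the registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SketchMetricId(usize);

/// Hypothetical error returned when a name is already registered.
#[derive(Debug, PartialEq, Eq)]
struct DuplicateName(String);

/// Hypothetical registry mapping unique metric names to ids.
#[derive(Default)]
struct SketchRegistry {
    ids_by_name: HashMap<String, SketchMetricId>,
}

impl SketchRegistry {
    /// Registers a new metric name, or fails if the name is already in use.
    fn create_metric(&mut self, name: &str) -> Result<SketchMetricId, DuplicateName> {
        if self.ids_by_name.contains_key(name) {
            return Err(DuplicateName(name.to_owned()));
        }
        // Ids are assigned sequentially, starting at 0.
        let id = SketchMetricId(self.ids_by_name.len());
        self.ids_by_name.insert(name.to_owned(), id);
        Ok(id)
    }
}
```

With a vague name such as counter, two unrelated plugins could easily collide in a registry like this one. A precise name such as rapl_consumed_energy makes that much less likely.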
Defining a simple source
Implementing the counter source
To define a source, define a structure and implement the Source trait on it (alumet::pipeline::Source).
#![allow(dead_code, unused_variables)]
use alumet::measurement::{MeasurementAccumulator, MeasurementBuffer, Timestamp};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::pipeline::{Output, Source, Transform};

pub struct ExamplePlugin;

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(None) // no config for the moment
    }

    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        Ok(Box::new(ExamplePlugin))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");
        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

struct ExampleSource {
    // TODO
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, t: Timestamp) -> Result<(), PollError> {
        todo!()
    }
}

struct ExampleTransform {
    // TODO
}

impl Transform for ExampleTransform {
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        todo!()
    }
}

struct ExampleOutput {
    // TODO
}

impl Output for ExampleOutput {
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        todo!()
    }
}
Complete the poll method by following these steps, and add the required fields along the way:
- Measure, i.e. obtain the measurements. Here, we will simply increment a counter. The counter is part of the state of the source, hence it will be a field in the ExampleSource structure, of type u64.
- Create one MeasurementPoint for every measured value. To do that, we need to know which metric we are measuring. The previously obtained metric id will be another field of the ExampleSource structure, of type TypedMetricId.
- Push the points to the MeasurementAccumulator.
The result looks like this:
use std::time::Duration;

use alumet::measurement::{MeasurementAccumulator, MeasurementPoint, Timestamp};
use alumet::metrics::TypedMetricId;
use alumet::pipeline::elements::error::PollError;
use alumet::pipeline::{trigger, Source};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;

pub struct ExamplePlugin;

impl AlumetPlugin for ExamplePlugin {
    fn name() -> &'static str {
        "example" // the name of your plugin, in lowercase, without the "plugin-" prefix
    }

    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
    }

    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(None) // no config for the moment
    }

    fn init(_config: ConfigTable) -> anyhow::Result<Box<Self>> {
        Ok(Box::new(ExamplePlugin))
    }

    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Create a metric for the source.
        let counter_metric = alumet.create_metric::<u64>(
            //                                      ^^^ type
            "example_source_call_counter", // name
            Unit::Unity, // unit
            "number of times the example source has been called", // description
        )?;

        // Create the source
        let source = ExampleSource {
            metric: counter_metric,
            counter: 0,
        };

        // Configure how the source is triggered: Alumet will call the source every 1s
        let trigger = trigger::builder::time_interval(Duration::from_secs(1)).build()?;

        // Add the source to the measurement pipeline
        alumet.add_source(Box::new(source), trigger);

        Ok(())
    }

    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}

struct ExampleSource {
    metric: TypedMetricId<u64>,
    counter: u64,
}

impl Source for ExampleSource {
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        let n_calls = self.counter;
        self.counter += 1;

        let point = MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            n_calls, // measured value
        );
        acc.push(point);
        Ok(())
    }
}
Since our counter is not related to a particular resource nor consumer, we use the special value LocalMachine.
It indicates that it's a "global" measurement, with the whole machine as a scope (here, "machine" is intentionally not precisely defined: if you're in a VM, it represents the VM, if you're running on a bare-metal node, it's this node).
Read more about the concept of resource and consumer here: Metrics & Measurement Points
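The difference between a "global" scope and a specific one can be pictured with a small enum. This is a simplified, hypothetical model: `SketchResource`, its two variants and the strings returned by `kind` are assumptions made for illustration, and the actual Resource and ResourceConsumer types in Alumet are richer.

```rust
/// Hypothetical model of what a measurement is about (illustration only).
#[derive(Debug, Clone, PartialEq)]
enum SketchResource {
    /// The whole machine: the VM if you are in a VM, the node on bare metal.
    LocalMachine,
    /// A specific CPU package, identified by a number.
    CpuPackage { id: u32 },
}

impl SketchResource {
    /// The kind of resource, independent of which instance it is.
    fn kind(&self) -> &'static str {
        match self {
            SketchResource::LocalMachine => "local_machine",
            SketchResource::CpuPackage { .. } => "cpu_package",
        }
    }

    /// The id of the instance; empty for the machine-wide scope.
    fn id(&self) -> String {
        match self {
            SketchResource::LocalMachine => String::new(),
            SketchResource::CpuPackage { id } => id.to_string(),
        }
    }

    /// A "kind/id" label, similar in spirit to what an output could print.
    fn label(&self) -> String {
        format!("{}/{}", self.kind(), self.id())
    }
}
```

In this model, the counter of the example source would carry the machine-wide variant, while a per-package energy reading would carry a CpuPackage with its id.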
Registering the counter source
Now that you have defined a source, you need to create it and add it to the Alumet pipeline with add_source.
Do this in the start method of your plugin (the listing above already does so).
To add the source to the pipeline, you must provide a Trigger.
As its name implies, it triggers the source in a certain way.
Here, we use time_interval to build a Trigger that calls the source every second.
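Conceptually, a time-interval trigger is a loop that waits for the next deadline and then calls the source. The sketch below shows that idea with the standard library only. `run_interval_trigger` is a hypothetical helper, and Alumet's real trigger implementation is not shown here and certainly differs (for instance, this sketch blocks the calling thread and stops after a fixed number of ticks).

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Calls `on_tick` once per `interval`, `ticks` times in total,
/// and returns the total elapsed time.
fn run_interval_trigger<F: FnMut(u32)>(interval: Duration, ticks: u32, mut on_tick: F) -> Duration {
    let start = Instant::now();
    for i in 0..ticks {
        // Deadlines are computed from the start time, so that the time
        // spent inside `on_tick` does not accumulate as drift.
        let deadline = start + interval * (i + 1);
        let now = Instant::now();
        if deadline > now {
            thread::sleep(deadline - now);
        }
        on_tick(i);
    }
    start.elapsed()
}
```

For example, `run_interval_trigger(Duration::from_secs(1), 3, |i| println!("tick {i}"))` would print three lines, roughly one second apart.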
Final result
Finally, you can test your plugin again by running the local agent:
cargo run --bin alumet-local-agent --features local_x86
Note how the source is automatically shut down by Alumet when you stop the agent.
A word about errors
In this chapter, we did not need to manage errors in a complicated way because there was almost no source of failure.
Most of our functions returned Ok(()), and we used ? to propagate errors, for example in start.
Please refer to Error Handling in Plugins to learn how to handle errors in more complex cases.
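As a reminder of what ? does, here is a small std-only sketch. The functions `parse_poll_interval` and `start_sketch` are hypothetical and unrelated to Alumet's API. The plugin code in this chapter uses anyhow::Result, whereas this sketch uses Box<dyn Error> so that it compiles without external crates.

```rust
use std::error::Error;
use std::num::ParseIntError;
use std::time::Duration;

/// Parses a number of seconds; fails if the text is not a valid integer.
fn parse_poll_interval(text: &str) -> Result<Duration, ParseIntError> {
    // On error, `?` returns early from this function with the error.
    let secs: u64 = text.trim().parse()?;
    Ok(Duration::from_secs(secs))
}

/// Mimics a `start` method: any failure is propagated to the caller.
fn start_sketch(raw_interval: &str) -> Result<Duration, Box<dyn Error>> {
    // Here, `?` also converts the ParseIntError into a Box<dyn Error>.
    let interval = parse_poll_interval(raw_interval)?;
    Ok(interval)
}
```

Calling `start_sketch("2")` returns a two-second duration, while `start_sketch("two")` returns an error instead of panicking.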