A First Configuration
The counter source that you have implemented in the previous section uses a fixed polling interval. Instead of using a hard-coded value, it would be better to provide a configuration option so that we can choose the acquisition frequency of the source before starting the Alumet agent.
On startup, the standard agent reads a configuration file in the TOML format. It contains some options for the agent, and one section per plugin. Each plugin is free to declare what it needs in its configuration section, and to process it how it wants to. The best practice is to deserialize the configuration section to a Rust structure. This is what you will implement in this chapter.
Config structure
To serialize and deserialize the configuration section, Alumet uses serde
, which is the de-facto standard framework for (de)serialization in Rust.
Add it to the dependencies by running this command in the plugin's directory:
cargo add serde --features derive
Then, simply define a new structure for the config and use a derive
macro to make serde
generate the deserialization code for you. We also generate the serialization code (by deriving Serialize
), which you will need soon.
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
poll_interval: Duration
}
To (de)serialize the duration in a human-readable way, such as "10s"
for 10 seconds, we need another dependency.
Add it with cargo
, and modify the Config
structure to use it.
cargo add humantime-serde
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough value to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to obtain the find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
Config loading
As explained in the introduction, each plugin gets its own configuration section.
It is accessible in init
.
Modify init
to get your config and store it in the plugin structure.
Doing so will allow you to use the config in start
.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough value to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to obtain the find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
Of course, you also need to update the plugin structure accordingly.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough value to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to obtain the find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
Default config
Though it is not mandatory, you should provide default values for the configuration of your plugin.
Implement the standard Default
trait for the Config
struct.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough value to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to obtain the find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
You can now call Config::default()
to obtain a Config
structure filled with default values.
Use this in default_config
to return your default configuration.
Note that you must use serialize_config
(alumet::plugin::rust::serialize_config
) to convert your configuration structure into a ConfigTable
, which is a "universal" configuration type provided by Alumet. Of course, serialize_config
internally uses serde
, that is why it was needed to derive the Serialize
trait.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough value to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to obtain the find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}
Using the config in start
Now that the plugin stores its deserialized config in its structure, you can use it in start
to change the polling interval of the "counter" source that you have previously implemented.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
pub struct ExamplePlugin {
config: Config,
}
impl AlumetPlugin for ExamplePlugin {
fn name() -> &'static str {
"example" // the name of your plugin, in lowercase, without the "plugin-" prefix
}
fn version() -> &'static str {
env!("CARGO_PKG_VERSION") // gets the version from the Cargo.toml of the plugin crate
}
fn default_config() -> anyhow::Result<Option<ConfigTable>> {
let config = serialize_config(Config::default())?;
Ok(Some(config))
}
fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
let config = deserialize_config(config)?;
Ok(Box::new(ExamplePlugin { config }))
}
fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
log::info!("Hello!");
// Create a metric for the source.
let counter_metric = alumet.create_metric::<u64>(
// ^^^ type
"example_source_call_counter", // name
Unit::Unity, // unit
"number of times the example source has been called", // description
)?;
// Create a metric for the transform.
let diff_metric = alumet.create_metric::<u64>(
"example_source_call_diff",
Unit::Unity,
"number of times the example source has been called since the previous measurement",
)?;
// Create the source
let source = ExampleSource {
metric: counter_metric,
counter: 0,
};
// Configure how the source is triggered: the interval depends on the config
let trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
// Add the source to the measurement pipeline
alumet.add_source(Box::new(source), trigger);
// Create the transform
let transform = ExampleTransform {
counter_metric: counter_metric.untyped_id(),
previous_counter: None,
diff_metric,
};
// Add the transform to the measurement pipeline
alumet.add_transform(Box::new(transform));
// Open the file and writer
let writer = BufWriter::new(File::create("alumet-tutorial-output.txt")?);
// Create the output
let output = ExampleOutput { writer };
// Add the output to the measurement pipeline
alumet.add_blocking_output(Box::new(output));
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
log::info!("Bye!");
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct Config {
/// Time between each activation of the counter source.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(1),
}
}
}
struct ExampleSource {
metric: TypedMetricId<u64>,
counter: u64,
}
impl Source for ExampleSource {
fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
let n_calls = self.counter;
self.counter += 1;
let point = MeasurementPoint::new(
timestamp,
self.metric,
Resource::LocalMachine,
ResourceConsumer::LocalMachine,
n_calls, // measured value
);
acc.push(point);
Ok(())
}
}
struct ExampleTransform {
counter_metric: RawMetricId,
previous_counter: Option<u64>,
diff_metric: TypedMetricId<u64>,
}
impl Transform for ExampleTransform {
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
// Find the relevant measurement points and update the last known counter.
let mut latest_counter = None;
for m in measurements.iter() {
if m.metric == self.counter_metric {
let value = match m.value {
WrappedMeasurementValue::F64(_) => {
unreachable!("wrong counter type, expected u64")
}
WrappedMeasurementValue::U64(c) => c,
};
latest_counter = Some((value, m.timestamp));
}
}
// Compute the difference, if we have enough value to do so (previous and latest)
if let (Some(previous), Some((latest, t))) = (self.previous_counter, latest_counter) {
let diff = latest - previous;
// Push the new measurement to the buffer
measurements.push(MeasurementPoint::new(
t, // For convenience, we use the timestamp of the latest counter update
self.diff_metric, // Use the new metric
Resource::LocalMachine, // No specific resource
ResourceConsumer::LocalMachine, // No specific consumer
diff, // The computed value
));
}
// Update the internal state, if possible.
// In the case where there are other sources,
// the buffer may contain no measurements from our example source.
if let Some((latest, _)) = latest_counter {
self.previous_counter = Some(latest);
}
Ok(())
}
}
struct ExampleOutput {
writer: BufWriter<File>,
}
impl Output for ExampleOutput {
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
use std::io::Write; // necessary for the writeln! macro
for m in measurements.iter() {
// Get a human-readable time from the timestamp.
// We later use its Debug implementation to convert it to a string easily.
let time = SystemTime::from(m.timestamp);
// The measurement point contains the metric id, but it means nothing to a user.
// Use the OutputContext to obtain the find the metric definition and obtain its name.
let metric_name = &ctx
.metrics
.by_id(&m.metric)
.with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
.name;
// Convert the value to a string. Multiple types are supported, handle them all.
let value_str = match m.value {
WrappedMeasurementValue::F64(x) => x.to_string(),
WrappedMeasurementValue::U64(x) => x.to_string(),
};
// The `resource` and `consumer` are each made of two parts: kind and id.
let resource_kind = m.resource.kind();
let resource_id = m.resource.id_display();
let consumer_kind = m.consumer.kind();
let consumer_id = m.consumer.id_display();
// There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
let attributes_str = m
.attributes()
.map(|(key, value)| format!("{key}='{value}'"))
.collect::<Vec<_>>()
.join(",");
// Write one line to the file.
writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
}
Ok(())
}
}