Writing with outputs
The last step of the measurement pipeline is to export the measurements with an output.
In this chapter, you will implement an output that writes the measurements to a file with a simple textual format.
Text output: the idea
Like transforms, outputs are automatically triggered by Alumet when they need to process some measurements. The text output will look at every measurement point and write the data to a file.
Implementation
Define a structure and implement the `Output` trait on it (`alumet::pipeline::Output`).
#![allow(dead_code, unused_variables)]
use alumet::measurement::{MeasurementAccumulator, MeasurementBuffer, Timestamp};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::pipeline::{Output, Source, Transform};
/// Skeleton of the example plugin: no configuration, and nothing is added to the pipeline yet.
pub struct ExamplePlugin;

impl AlumetPlugin for ExamplePlugin {
    /// Name of the plugin, in lowercase, without the "plugin-" prefix.
    fn name() -> &'static str {
        "example"
    }

    /// Version of the plugin, taken from the Cargo.toml of the plugin crate.
    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    /// The plugin has no configuration for the moment.
    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(None)
    }

    /// Builds the plugin; the configuration table is ignored.
    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        Ok(Box::new(Self))
    }

    /// Only logs a greeting.
    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");
        Ok(())
    }

    /// Only logs a farewell.
    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}
// Placeholder source: it has no fields yet.
struct ExampleSource {
// TODO: add the state needed by the source
}
impl Source for ExampleSource {
// Not implemented yet: calling `poll` panics through `todo!()`.
fn poll(&mut self, acc: &mut MeasurementAccumulator, t: Timestamp) -> Result<(), PollError> {
todo!()
}
}
// Placeholder transform: it has no fields yet.
struct ExampleTransform {
// TODO: add the state needed by the transform
}
impl Transform for ExampleTransform {
// Not implemented yet: calling `apply` panics through `todo!()`.
fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
todo!()
}
}
// Placeholder output: it has no fields yet.
struct ExampleOutput {
// TODO: add the state needed by the output
}
impl Output for ExampleOutput {
// Not implemented yet: calling `write` panics through `todo!()`.
fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
todo!()
}
}
For efficiency reasons, we open the File only once, on startup, and wrap it in a buffered writer. We store the writer in the output.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
/// Example plugin: registers two metrics, a source, a transform and a text output.
pub struct ExamplePlugin {
    /// Plugin settings, deserialized from the configuration table in `init`.
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    /// Name of the plugin, in lowercase, without the "plugin-" prefix.
    fn name() -> &'static str {
        "example"
    }

    /// Version of the plugin, taken from the Cargo.toml of the plugin crate.
    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    /// Serializes `Config::default()` to provide the default configuration.
    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(Some(serialize_config(Config::default())?))
    }

    /// Builds the plugin from its configuration table.
    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config: Config = deserialize_config(config)?;
        Ok(Box::new(Self { config }))
    }

    /// Creates the metrics, then adds the source, the transform and the output to the pipeline.
    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Metric produced by the source (u64 values).
        let counter_metric = alumet.create_metric::<u64>(
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;

        // Metric produced by the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // The source is triggered at the interval given by the config.
        let poll_trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
        alumet.add_source(
            Box::new(ExampleSource {
                metric: counter_metric,
                counter: 0,
            }),
            poll_trigger,
        );

        // The transform reads the counter metric and emits the diff metric.
        alumet.add_transform(Box::new(ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        }));

        // Open the file once, wrap it in a buffered writer and hand it to the output.
        let file = File::create("alumet-tutorial-output.txt")?;
        let output = ExampleOutput {
            writer: BufWriter::new(file),
        };
        // The standard file API is blocking, hence `add_blocking_output`.
        alumet.add_blocking_output(Box::new(output));
        Ok(())
    }

    /// Only logs a farewell.
    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}
/// Configuration of the example plugin.
#[derive(Serialize, Deserialize)]
struct Config {
    /// Interval between two activations of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    /// Default configuration: a poll interval of one second.
    fn default() -> Self {
        let poll_interval = Duration::from_secs(1);
        Config { poll_interval }
    }
}
/// Source that reports how many times it has been polled.
struct ExampleSource {
    /// Metric attached to every measurement point pushed by this source.
    metric: TypedMetricId<u64>,
    /// Call counter, incremented on every `poll`.
    counter: u64,
}

impl Source for ExampleSource {
    /// Pushes one measurement point holding the counter value, then counts this call.
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        // The reported value is the counter as it was before this call.
        let calls_before = self.counter;
        self.counter = calls_before + 1;

        acc.push(MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            calls_before, // measured value
        ));
        Ok(())
    }
}
/// Transform that turns the counter metric into a difference between consecutive values.
struct ExampleTransform {
    /// Id of the metric whose measurement points are read.
    counter_metric: RawMetricId,
    /// Last counter value seen in a previous buffer, if any.
    previous_counter: Option<u64>,
    /// Metric attached to the difference points pushed by this transform.
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    /// Pushes `latest - previous` to the buffer when both values are known,
    /// then remembers the latest counter value.
    ///
    /// # Panics
    /// Panics if a point of the counter metric holds an `F64` value.
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Visit every point of the counter metric and keep the last one (value and timestamp).
        let latest_counter = measurements
            .iter()
            .filter(|m| m.metric == self.counter_metric)
            .fold(None, |_, m| match m.value {
                WrappedMeasurementValue::U64(count) => Some((count, m.timestamp)),
                WrappedMeasurementValue::F64(_) => {
                    unreachable!("wrong counter type, expected u64")
                }
            });

        // With other sources in the pipeline, the buffer may contain no point
        // from the example source: in that case there is nothing to do.
        if let Some((latest, t)) = latest_counter {
            // The difference needs a previous value, which is missing on the first update.
            if let Some(previous) = self.previous_counter {
                measurements.push(MeasurementPoint::new(
                    t,                              // timestamp of the latest counter update
                    self.diff_metric,               // the new metric
                    Resource::LocalMachine,         // no specific resource
                    ResourceConsumer::LocalMachine, // no specific consumer
                    latest - previous,              // the computed value
                ));
            }
            // Update the internal state.
            self.previous_counter = Some(latest);
        }
        Ok(())
    }
}
/// Output that writes each measurement point as one line of text.
struct ExampleOutput {
    /// Buffered writer that receives the formatted lines.
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    /// Writes one line per measurement point, then flushes the writer.
    ///
    /// # Errors
    /// Returns an error if a metric id is not found in `ctx.metrics`,
    /// or if writing to or flushing the writer fails.
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro and for `flush`

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }

        // Flush once per batch: dropping a `BufWriter` also flushes it, but any error
        // is then silently discarded. Flushing here reports I/O errors to the caller
        // and pushes each batch out of the in-memory buffer.
        self.writer.flush()?;
        Ok(())
    }
}
In `write`, we loop on the measurements, build a string with the fields we want to save, and write it to the file.
Some fields, such as the `timestamp`, cannot be directly converted to a nice human-readable string, therefore they are converted to a more appropriate type.
Regarding the `metric`, we could print its `id`, but:
- It means nothing to the end user.
- It is not guaranteed to be stable (it can change depending on the plugins that are enabled, the exact version of the framework, etc.).
Usually, the preferred way to deal with this issue is to use the metric name instead.
While not directly available in the measurement point, it can be obtained from the `OutputContext`, as shown below.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
/// Example plugin: registers two metrics, a source, a transform and a text output.
pub struct ExamplePlugin {
    /// Plugin settings, deserialized from the configuration table in `init`.
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    /// Name of the plugin, in lowercase, without the "plugin-" prefix.
    fn name() -> &'static str {
        "example"
    }

    /// Version of the plugin, taken from the Cargo.toml of the plugin crate.
    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    /// Serializes `Config::default()` to provide the default configuration.
    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(Some(serialize_config(Config::default())?))
    }

    /// Builds the plugin from its configuration table.
    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config: Config = deserialize_config(config)?;
        Ok(Box::new(Self { config }))
    }

    /// Creates the metrics, then adds the source, the transform and the output to the pipeline.
    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Metric produced by the source (u64 values).
        let counter_metric = alumet.create_metric::<u64>(
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;

        // Metric produced by the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // The source is triggered at the interval given by the config.
        let poll_trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
        alumet.add_source(
            Box::new(ExampleSource {
                metric: counter_metric,
                counter: 0,
            }),
            poll_trigger,
        );

        // The transform reads the counter metric and emits the diff metric.
        alumet.add_transform(Box::new(ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        }));

        // Open the file once, wrap it in a buffered writer and hand it to the output.
        let file = File::create("alumet-tutorial-output.txt")?;
        let output = ExampleOutput {
            writer: BufWriter::new(file),
        };
        // The standard file API is blocking, hence `add_blocking_output`.
        alumet.add_blocking_output(Box::new(output));
        Ok(())
    }

    /// Only logs a farewell.
    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}
/// Configuration of the example plugin.
#[derive(Serialize, Deserialize)]
struct Config {
    /// Interval between two activations of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    /// Default configuration: a poll interval of one second.
    fn default() -> Self {
        let poll_interval = Duration::from_secs(1);
        Config { poll_interval }
    }
}
/// Source that reports how many times it has been polled.
struct ExampleSource {
    /// Metric attached to every measurement point pushed by this source.
    metric: TypedMetricId<u64>,
    /// Call counter, incremented on every `poll`.
    counter: u64,
}

impl Source for ExampleSource {
    /// Pushes one measurement point holding the counter value, then counts this call.
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        // The reported value is the counter as it was before this call.
        let calls_before = self.counter;
        self.counter = calls_before + 1;

        acc.push(MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            calls_before, // measured value
        ));
        Ok(())
    }
}
/// Transform that turns the counter metric into a difference between consecutive values.
struct ExampleTransform {
    /// Id of the metric whose measurement points are read.
    counter_metric: RawMetricId,
    /// Last counter value seen in a previous buffer, if any.
    previous_counter: Option<u64>,
    /// Metric attached to the difference points pushed by this transform.
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    /// Pushes `latest - previous` to the buffer when both values are known,
    /// then remembers the latest counter value.
    ///
    /// # Panics
    /// Panics if a point of the counter metric holds an `F64` value.
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Visit every point of the counter metric and keep the last one (value and timestamp).
        let latest_counter = measurements
            .iter()
            .filter(|m| m.metric == self.counter_metric)
            .fold(None, |_, m| match m.value {
                WrappedMeasurementValue::U64(count) => Some((count, m.timestamp)),
                WrappedMeasurementValue::F64(_) => {
                    unreachable!("wrong counter type, expected u64")
                }
            });

        // With other sources in the pipeline, the buffer may contain no point
        // from the example source: in that case there is nothing to do.
        if let Some((latest, t)) = latest_counter {
            // The difference needs a previous value, which is missing on the first update.
            if let Some(previous) = self.previous_counter {
                measurements.push(MeasurementPoint::new(
                    t,                              // timestamp of the latest counter update
                    self.diff_metric,               // the new metric
                    Resource::LocalMachine,         // no specific resource
                    ResourceConsumer::LocalMachine, // no specific consumer
                    latest - previous,              // the computed value
                ));
            }
            // Update the internal state.
            self.previous_counter = Some(latest);
        }
        Ok(())
    }
}
/// Output that writes each measurement point as one line of text.
struct ExampleOutput {
    /// Buffered writer that receives the formatted lines.
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    /// Writes one line per measurement point, then flushes the writer.
    ///
    /// # Errors
    /// Returns an error if a metric id is not found in `ctx.metrics`,
    /// or if writing to or flushing the writer fails.
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro and for `flush`

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }

        // Flush once per batch: dropping a `BufWriter` also flushes it, but any error
        // is then silently discarded. Flushing here reports I/O errors to the caller
        // and pushes each batch out of the in-memory buffer.
        self.writer.flush()?;
        Ok(())
    }
}
Registration
In `start`, open the file in write mode, create the output and add it to the pipeline.
Because we are using the standard file API, which is blocking, we use `add_blocking_output`.
This tells Alumet to avoid mixing this output with non-blocking asynchronous tasks, which could prevent them from running properly while the output waits for I/O operations.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
/// Example plugin: registers two metrics, a source, a transform and a text output.
pub struct ExamplePlugin {
    /// Plugin settings, deserialized from the configuration table in `init`.
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    /// Name of the plugin, in lowercase, without the "plugin-" prefix.
    fn name() -> &'static str {
        "example"
    }

    /// Version of the plugin, taken from the Cargo.toml of the plugin crate.
    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    /// Serializes `Config::default()` to provide the default configuration.
    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(Some(serialize_config(Config::default())?))
    }

    /// Builds the plugin from its configuration table.
    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config: Config = deserialize_config(config)?;
        Ok(Box::new(Self { config }))
    }

    /// Creates the metrics, then adds the source, the transform and the output to the pipeline.
    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Metric produced by the source (u64 values).
        let counter_metric = alumet.create_metric::<u64>(
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;

        // Metric produced by the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // The source is triggered at the interval given by the config.
        let poll_trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
        alumet.add_source(
            Box::new(ExampleSource {
                metric: counter_metric,
                counter: 0,
            }),
            poll_trigger,
        );

        // The transform reads the counter metric and emits the diff metric.
        alumet.add_transform(Box::new(ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        }));

        // Open the file once, wrap it in a buffered writer and hand it to the output.
        let file = File::create("alumet-tutorial-output.txt")?;
        let output = ExampleOutput {
            writer: BufWriter::new(file),
        };
        // The standard file API is blocking, hence `add_blocking_output`.
        alumet.add_blocking_output(Box::new(output));
        Ok(())
    }

    /// Only logs a farewell.
    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}
/// Configuration of the example plugin.
#[derive(Serialize, Deserialize)]
struct Config {
    /// Interval between two activations of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    /// Default configuration: a poll interval of one second.
    fn default() -> Self {
        let poll_interval = Duration::from_secs(1);
        Config { poll_interval }
    }
}
/// Source that reports how many times it has been polled.
struct ExampleSource {
    /// Metric attached to every measurement point pushed by this source.
    metric: TypedMetricId<u64>,
    /// Call counter, incremented on every `poll`.
    counter: u64,
}

impl Source for ExampleSource {
    /// Pushes one measurement point holding the counter value, then counts this call.
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        // The reported value is the counter as it was before this call.
        let calls_before = self.counter;
        self.counter = calls_before + 1;

        acc.push(MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            calls_before, // measured value
        ));
        Ok(())
    }
}
/// Transform that turns the counter metric into a difference between consecutive values.
struct ExampleTransform {
    /// Id of the metric whose measurement points are read.
    counter_metric: RawMetricId,
    /// Last counter value seen in a previous buffer, if any.
    previous_counter: Option<u64>,
    /// Metric attached to the difference points pushed by this transform.
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    /// Pushes `latest - previous` to the buffer when both values are known,
    /// then remembers the latest counter value.
    ///
    /// # Panics
    /// Panics if a point of the counter metric holds an `F64` value.
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Visit every point of the counter metric and keep the last one (value and timestamp).
        let latest_counter = measurements
            .iter()
            .filter(|m| m.metric == self.counter_metric)
            .fold(None, |_, m| match m.value {
                WrappedMeasurementValue::U64(count) => Some((count, m.timestamp)),
                WrappedMeasurementValue::F64(_) => {
                    unreachable!("wrong counter type, expected u64")
                }
            });

        // With other sources in the pipeline, the buffer may contain no point
        // from the example source: in that case there is nothing to do.
        if let Some((latest, t)) = latest_counter {
            // The difference needs a previous value, which is missing on the first update.
            if let Some(previous) = self.previous_counter {
                measurements.push(MeasurementPoint::new(
                    t,                              // timestamp of the latest counter update
                    self.diff_metric,               // the new metric
                    Resource::LocalMachine,         // no specific resource
                    ResourceConsumer::LocalMachine, // no specific consumer
                    latest - previous,              // the computed value
                ));
            }
            // Update the internal state.
            self.previous_counter = Some(latest);
        }
        Ok(())
    }
}
/// Output that writes each measurement point as one line of text.
struct ExampleOutput {
    /// Buffered writer that receives the formatted lines.
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    /// Writes one line per measurement point, then flushes the writer.
    ///
    /// # Errors
    /// Returns an error if a metric id is not found in `ctx.metrics`,
    /// or if writing to or flushing the writer fails.
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro and for `flush`

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }

        // Flush once per batch: dropping a `BufWriter` also flushes it, but any error
        // is then silently discarded. Flushing here reports I/O errors to the caller
        // and pushes each batch out of the in-memory buffer.
        self.writer.flush()?;
        Ok(())
    }
}
Recap
Here is the full `start` method with the metrics, source, transform and output.
use std::fs::File;
use std::io::BufWriter;
use std::time::{Duration, SystemTime};
use alumet::measurement::{
MeasurementAccumulator, MeasurementBuffer, MeasurementPoint, Timestamp, WrappedMeasurementValue,
};
use alumet::metrics::{MetricId, RawMetricId, TypedMetricId};
use alumet::pipeline::elements::error::{PollError, TransformError, WriteError};
use alumet::pipeline::elements::output::OutputContext;
use alumet::pipeline::elements::transform::TransformContext;
use alumet::pipeline::{trigger, Output, Source, Transform};
use alumet::plugin::rust::{deserialize_config, serialize_config};
use alumet::plugin::{rust::AlumetPlugin, AlumetPluginStart, ConfigTable};
use alumet::resources::{Resource, ResourceConsumer};
use alumet::units::Unit;
use anyhow::Context;
use serde::{Deserialize, Serialize};
/// Example plugin: registers two metrics, a source, a transform and a text output.
pub struct ExamplePlugin {
    /// Plugin settings, deserialized from the configuration table in `init`.
    config: Config,
}

impl AlumetPlugin for ExamplePlugin {
    /// Name of the plugin, in lowercase, without the "plugin-" prefix.
    fn name() -> &'static str {
        "example"
    }

    /// Version of the plugin, taken from the Cargo.toml of the plugin crate.
    fn version() -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

    /// Serializes `Config::default()` to provide the default configuration.
    fn default_config() -> anyhow::Result<Option<ConfigTable>> {
        Ok(Some(serialize_config(Config::default())?))
    }

    /// Builds the plugin from its configuration table.
    fn init(config: ConfigTable) -> anyhow::Result<Box<Self>> {
        let config: Config = deserialize_config(config)?;
        Ok(Box::new(Self { config }))
    }

    /// Creates the metrics, then adds the source, the transform and the output to the pipeline.
    fn start(&mut self, alumet: &mut AlumetPluginStart) -> anyhow::Result<()> {
        log::info!("Hello!");

        // Metric produced by the source (u64 values).
        let counter_metric = alumet.create_metric::<u64>(
            "example_source_call_counter",                        // name
            Unit::Unity,                                          // unit
            "number of times the example source has been called", // description
        )?;

        // Metric produced by the transform.
        let diff_metric = alumet.create_metric::<u64>(
            "example_source_call_diff",
            Unit::Unity,
            "number of times the example source has been called since the previous measurement",
        )?;

        // The source is triggered at the interval given by the config.
        let poll_trigger = trigger::builder::time_interval(self.config.poll_interval).build()?;
        alumet.add_source(
            Box::new(ExampleSource {
                metric: counter_metric,
                counter: 0,
            }),
            poll_trigger,
        );

        // The transform reads the counter metric and emits the diff metric.
        alumet.add_transform(Box::new(ExampleTransform {
            counter_metric: counter_metric.untyped_id(),
            previous_counter: None,
            diff_metric,
        }));

        // Open the file once, wrap it in a buffered writer and hand it to the output.
        let file = File::create("alumet-tutorial-output.txt")?;
        let output = ExampleOutput {
            writer: BufWriter::new(file),
        };
        // The standard file API is blocking, hence `add_blocking_output`.
        alumet.add_blocking_output(Box::new(output));
        Ok(())
    }

    /// Only logs a farewell.
    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Bye!");
        Ok(())
    }
}
/// Configuration of the example plugin.
#[derive(Serialize, Deserialize)]
struct Config {
    /// Interval between two activations of the counter source.
    #[serde(with = "humantime_serde")]
    poll_interval: Duration,
}

impl Default for Config {
    /// Default configuration: a poll interval of one second.
    fn default() -> Self {
        let poll_interval = Duration::from_secs(1);
        Config { poll_interval }
    }
}
/// Source that reports how many times it has been polled.
struct ExampleSource {
    /// Metric attached to every measurement point pushed by this source.
    metric: TypedMetricId<u64>,
    /// Call counter, incremented on every `poll`.
    counter: u64,
}

impl Source for ExampleSource {
    /// Pushes one measurement point holding the counter value, then counts this call.
    fn poll(&mut self, acc: &mut MeasurementAccumulator, timestamp: Timestamp) -> Result<(), PollError> {
        // The reported value is the counter as it was before this call.
        let calls_before = self.counter;
        self.counter = calls_before + 1;

        acc.push(MeasurementPoint::new(
            timestamp,
            self.metric,
            Resource::LocalMachine,
            ResourceConsumer::LocalMachine,
            calls_before, // measured value
        ));
        Ok(())
    }
}
/// Transform that turns the counter metric into a difference between consecutive values.
struct ExampleTransform {
    /// Id of the metric whose measurement points are read.
    counter_metric: RawMetricId,
    /// Last counter value seen in a previous buffer, if any.
    previous_counter: Option<u64>,
    /// Metric attached to the difference points pushed by this transform.
    diff_metric: TypedMetricId<u64>,
}

impl Transform for ExampleTransform {
    /// Pushes `latest - previous` to the buffer when both values are known,
    /// then remembers the latest counter value.
    ///
    /// # Panics
    /// Panics if a point of the counter metric holds an `F64` value.
    fn apply(&mut self, measurements: &mut MeasurementBuffer, _ctx: &TransformContext) -> Result<(), TransformError> {
        // Visit every point of the counter metric and keep the last one (value and timestamp).
        let latest_counter = measurements
            .iter()
            .filter(|m| m.metric == self.counter_metric)
            .fold(None, |_, m| match m.value {
                WrappedMeasurementValue::U64(count) => Some((count, m.timestamp)),
                WrappedMeasurementValue::F64(_) => {
                    unreachable!("wrong counter type, expected u64")
                }
            });

        // With other sources in the pipeline, the buffer may contain no point
        // from the example source: in that case there is nothing to do.
        if let Some((latest, t)) = latest_counter {
            // The difference needs a previous value, which is missing on the first update.
            if let Some(previous) = self.previous_counter {
                measurements.push(MeasurementPoint::new(
                    t,                              // timestamp of the latest counter update
                    self.diff_metric,               // the new metric
                    Resource::LocalMachine,         // no specific resource
                    ResourceConsumer::LocalMachine, // no specific consumer
                    latest - previous,              // the computed value
                ));
            }
            // Update the internal state.
            self.previous_counter = Some(latest);
        }
        Ok(())
    }
}
/// Output that writes each measurement point as one line of text.
struct ExampleOutput {
    /// Buffered writer that receives the formatted lines.
    writer: BufWriter<File>,
}

impl Output for ExampleOutput {
    /// Writes one line per measurement point, then flushes the writer.
    ///
    /// # Errors
    /// Returns an error if a metric id is not found in `ctx.metrics`,
    /// or if writing to or flushing the writer fails.
    fn write(&mut self, measurements: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
        use std::io::Write; // necessary for the writeln! macro and for `flush`

        for m in measurements.iter() {
            // Get a human-readable time from the timestamp.
            // We later use its Debug implementation to convert it to a string easily.
            let time = SystemTime::from(m.timestamp);

            // The measurement point contains the metric id, but it means nothing to a user.
            // Use the OutputContext to find the metric definition and obtain its name.
            let metric_name = &ctx
                .metrics
                .by_id(&m.metric)
                .with_context(|| format!("unregistered metric id: {}", m.metric.as_u64()))?
                .name;

            // Convert the value to a string. Multiple types are supported, handle them all.
            let value_str = match m.value {
                WrappedMeasurementValue::F64(x) => x.to_string(),
                WrappedMeasurementValue::U64(x) => x.to_string(),
            };

            // The `resource` and `consumer` are each made of two parts: kind and id.
            let resource_kind = m.resource.kind();
            let resource_id = m.resource.id_display();
            let consumer_kind = m.consumer.kind();
            let consumer_id = m.consumer.id_display();

            // There can be an arbitrary number of key-value attributes, use `Vec::join` to convert it to a single string.
            let attributes_str = m
                .attributes()
                .map(|(key, value)| format!("{key}='{value}'"))
                .collect::<Vec<_>>()
                .join(",");

            // Write one line to the file.
            writeln!(&mut self.writer, "{time:?}: {metric_name} = {value_str}; resource = {resource_kind}/{resource_id}; consumer = {consumer_kind}/{consumer_id}; attributes = [{attributes_str}]")?;
        }

        // Flush once per batch: dropping a `BufWriter` also flushes it, but any error
        // is then silently discarded. Flushing here reports I/O errors to the caller
        // and pushes each batch out of the in-memory buffer.
        self.writer.flush()?;
        Ok(())
    }
}