Skip to main content

hydro_lang/telemetry/
emf.rs

1//! AWS CloudWatch embedded metric format (EMF).
2//!
3//! <https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html>
4#[cfg(feature = "runtime_support")]
5use std::marker::Unpin;
6#[cfg(feature = "runtime_support")]
7use std::panic::AssertUnwindSafe;
8use std::time::Duration;
9#[cfg(feature = "runtime_support")]
10use std::time::SystemTime;
11
12#[cfg(feature = "runtime_support")]
13use dfir_rs::Never;
14#[cfg(feature = "runtime_support")]
15use dfir_rs::scheduled::graph::Dfir;
16#[cfg(feature = "runtime_support")]
17use dfir_rs::scheduled::metrics::DfirMetrics;
18#[cfg(feature = "runtime_support")]
19use futures::FutureExt;
20use quote::quote;
21#[cfg(feature = "runtime_support")]
22use serde_json::json;
23use syn::parse_quote;
24#[cfg(feature = "runtime_support")]
25use tokio::io::{AsyncWrite, AsyncWriteExt};
26#[cfg(feature = "runtime_support")]
27use tokio_metrics::RuntimeMetrics;
28
29use crate::location::{LocationKey, LocationType};
30use crate::staging_util::get_this_crate;
31use crate::telemetry::Sidecar;
32
/// Default file path for [`RecordMetricsSidecar`].
///
/// Used by [`RecordMetricsSidecar::new`] when no `file_path` is supplied.
pub const DEFAULT_FILE_PATH: &str = "/var/log/hydro/metrics.log";
/// Default interval (30 seconds) for [`RecordMetricsSidecar`].
///
/// Used by [`RecordMetricsSidecar::new`] when no `interval` is supplied.
pub const DEFAULT_INTERVAL: Duration = Duration::from_secs(30);
37
/// A sidecar which records metrics to a file via EMF.
///
/// The sidecar itself only stores configuration; its [`Sidecar`] implementation turns that
/// configuration into a call to `record_metrics_sidecar`.
pub struct RecordMetricsSidecar {
    // Path of the log file that EMF records are written to.
    file_path: String,
    // Time between two consecutive metric recordings.
    interval: Duration,
}
43
44#[buildstructor::buildstructor]
45impl RecordMetricsSidecar {
46    /// Build an instance. Any `None` will be replaced with the default value.
47    #[builder]
48    pub fn new(file_path: Option<String>, interval: Option<Duration>) -> Self {
49        Self {
50            file_path: file_path.unwrap_or_else(|| DEFAULT_FILE_PATH.to_owned()),
51            interval: interval.unwrap_or(DEFAULT_INTERVAL),
52        }
53    }
54}
55
56impl Sidecar for RecordMetricsSidecar {
57    fn to_expr(
58        &self,
59        flow_name: &str,
60        _location_key: LocationKey,
61        _location_type: LocationType,
62        location_name: &str,
63        dfir_ident: &syn::Ident,
64    ) -> syn::Expr {
65        let Self {
66            file_path,
67            interval,
68        } = self;
69
70        let root = get_this_crate();
71        let namespace = flow_name.replace(char::is_whitespace, "_");
72        let interval: proc_macro2::TokenStream = {
73            let secs = interval.as_secs();
74            let nanos = interval.subsec_nanos();
75            quote!(::std::time::Duration::new(#secs, #nanos))
76        };
77
78        parse_quote! {
79            #root::telemetry::emf::record_metrics_sidecar(&#dfir_ident, #namespace, #location_name, #file_path, #interval)
80        }
81    }
82}
83
84/// Record both Dfir and Tokio metrics, at the given interval, forever.
85#[cfg(feature = "runtime_support")]
86#[doc(hidden)]
87pub fn record_metrics_sidecar(
88    dfir: &Dfir<'_>,
89    namespace: &'static str,
90    location_name: &'static str,
91    file_path: &'static str,
92    interval: Duration,
93) -> impl 'static + Future<Output = Never> {
94    assert!(!namespace.contains(char::is_whitespace));
95
96    let mut dfir_intervals = dfir.metrics_intervals();
97
98    async move {
99        // Attempt to create log file parent dir.
100        if let Some(parent_dir) = std::path::Path::new(file_path).parent()
101            && let Err(e) = tokio::fs::create_dir_all(parent_dir).await
102        {
103            // TODO(minwgei): use `tracing` once deployments set up tracing logging (setup moved out of stdout)
104            eprintln!("Failed to create log file directory for EMF metrics: {}", e);
105        }
106
107        // Only attempt to get Tokio runtime within async to be safe.
108        let rt_monitor = tokio_metrics::RuntimeMonitor::new(&tokio::runtime::Handle::current());
109        let mut rt_intervals = rt_monitor.intervals();
110
111        loop {
112            let _ = tokio::time::sleep(interval).await;
113
114            let dfir_metrics = dfir_intervals.take_interval();
115            let rt_metrics = rt_intervals.next().unwrap();
116
117            let unwind_result = AssertUnwindSafe(async {
118                let timestamp = SystemTime::now();
119
120                let file = tokio::fs::OpenOptions::new()
121                    .write(true)
122                    .create(true)
123                    .truncate(false)
124                    .append(true)
125                    .open(file_path)
126                    .await
127                    .expect("Failed to open log file for EMF metrics.");
128                let mut writer = tokio::io::BufWriter::new(file);
129
130                record_metrics_dfir(
131                    namespace,
132                    location_name,
133                    timestamp,
134                    dfir_metrics,
135                    &mut writer,
136                )
137                .await
138                .unwrap();
139
140                record_metrics_tokio(namespace, location_name, timestamp, rt_metrics, &mut writer)
141                    .await
142                    .unwrap();
143
144                writer.shutdown().await.unwrap();
145            })
146            .catch_unwind()
147            .await;
148
149            if let Err(panic_reason) = unwind_result {
150                // TODO(minwgei): use `tracing` once deployments set up tracing logging (setup coordination moved out of stdout)
151                eprintln!("Panic in metrics sidecar: {panic_reason:?}");
152            }
153        }
154    }
155}
156
/// Records DFIR metrics.
///
/// Writes one EMF JSON object per handoff and one per subgraph in `metrics` to `writer`, each
/// followed by a newline. Every record carries `namespace`, `location_name` and `timestamp`
/// (as milliseconds since the Unix epoch).
///
/// # Errors
///
/// Returns an error if `timestamp` is earlier than the Unix epoch, or if writing to `writer`
/// fails.
#[cfg(feature = "runtime_support")]
async fn record_metrics_dfir<W>(
    namespace: &str,
    location_name: &str,
    timestamp: SystemTime,
    metrics: DfirMetrics,
    writer: &mut W,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    // A clock set before the Unix epoch is reported as an error rather than a panic, since
    // this function already has an error channel.
    let ts_millis = timestamp
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_err(std::io::Error::other)?
        .as_millis();

    // Handoffs
    for (hoff_id, hoff_metrics) in metrics.handoffs.iter() {
        let emf = json!({
            "_aws": {
                "Timestamp": ts_millis,
                "CloudWatchMetrics": [
                    {
                        "Namespace": namespace,
                        "Dimensions": [["LocationName"], ["LocationName", "HandoffId"]],
                        "Metrics": [
                            {"Name": "CurrItemsCount", "Unit": Unit::Count},
                            {"Name": "TotalItemsCount", "Unit": Unit::Count},
                        ]
                    }
                ]
            },
            "LocationName": location_name,
            "HandoffId": hoff_id.to_string(),
            "CurrItemsCount": hoff_metrics.curr_items_count(),
            "TotalItemsCount": hoff_metrics.total_items_count(),
        })
        .to_string();
        // One JSON object per line.
        writer.write_all(emf.as_bytes()).await?;
        writer.write_u8(b'\n').await?;
    }

    // Subgraphs
    for (sg_id, sg_metrics) in metrics.subgraphs.iter() {
        let emf = json!({
            "_aws": {
                "Timestamp": ts_millis,
                "CloudWatchMetrics": [
                    {
                        "Namespace": namespace,
                        "Dimensions": [["LocationName"], ["LocationName", "SubgraphId"]],
                        "Metrics": [
                            {"Name": "TotalRunCount", "Unit": Unit::Count},
                            {"Name": "TotalPollDuration", "Unit": Unit::Microseconds},
                            {"Name": "TotalPollCount", "Unit": Unit::Count},
                            {"Name": "TotalIdleDuration", "Unit": Unit::Microseconds},
                            {"Name": "TotalIdleCount", "Unit": Unit::Count},
                        ]
                    }
                ]
            },
            "LocationName": location_name,
            "SubgraphId": sg_id.to_string(),
            "TotalRunCount": sg_metrics.total_run_count(),
            "TotalPollDuration": sg_metrics.total_poll_duration().as_micros(),
            "TotalPollCount": sg_metrics.total_poll_count(),
            "TotalIdleDuration": sg_metrics.total_idle_duration().as_micros(),
            "TotalIdleCount": sg_metrics.total_idle_count(),
        })
        .to_string();
        // One JSON object per line.
        writer.write_all(emf.as_bytes()).await?;
        writer.write_u8(b'\n').await?;
    }

    Ok(())
}
234
/// Records tokio runtime metrics.
///
/// Writes a single EMF JSON object to `writer`, followed by a newline. The record carries
/// `namespace`, `location_name`, `timestamp` (as milliseconds since the Unix epoch) and the
/// `total_busy_duration` (in microseconds) and `global_queue_depth` fields of `rt_metrics`.
///
/// # Errors
///
/// Returns an error if `timestamp` is earlier than the Unix epoch, or if writing to `writer`
/// fails.
#[cfg(feature = "runtime_support")]
async fn record_metrics_tokio<W>(
    namespace: &str,
    location_name: &str,
    timestamp: SystemTime,
    rt_metrics: RuntimeMetrics,
    writer: &mut W,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    // A clock set before the Unix epoch is reported as an error rather than a panic, since
    // this function already has an error channel.
    let ts_millis = timestamp
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_err(std::io::Error::other)?
        .as_millis();

    // Tokio RuntimeMetrics
    let emf = json!({
        "_aws": {
            "Timestamp": ts_millis,
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    "Dimensions": [["LocationName"]],
                    "Metrics": [
                        // {"Name": "LiveTasksCount", "Unit": Unit::Count}, // https://github.com/tokio-rs/tokio-metrics/pull/108
                        {"Name": "TotalBusyDuration", "Unit": Unit::Microseconds},
                        {"Name": "GlobalQueueDepth", "Unit": Unit::Count},
                    ]
                }
            ]
        },
        "LocationName": location_name,
        // "LiveTasksCount": rt_metrics.live_tasks_count, // https://github.com/tokio-rs/tokio-metrics/pull/108
        "TotalBusyDuration": rt_metrics.total_busy_duration.as_micros(),
        "GlobalQueueDepth": rt_metrics.global_queue_depth,
        // The rest of the tokio runtime metrics are `cfg(tokio_unstable)`
    })
    .to_string();
    // One JSON object per line.
    writer.write_all(emf.as_bytes()).await?;
    writer.write_u8(b'\n').await?;

    Ok(())
}
280
/// AWS CloudWatch EMF units.
///
/// Used as the `"Unit"` value of the metric definitions emitted by this module. The rate units
/// carry an explicit `serde` rename to the `<Unit>/Second` spelling (for example
/// [`Unit::BytesPerSecond`] is renamed to `"Bytes/Second"`); the other variants have no rename.
///
/// <https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html#ACW-Type-MetricDatum-Unit>
// Variants are intentionally left undocumented: documenting all of them would leave the
// `expect(missing_docs)` below unfulfilled.
#[expect(missing_docs, reason = "self-explanatory")]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub enum Unit {
    /// None
    #[default]
    None,
    Seconds,
    Microseconds,
    Milliseconds,
    Bytes,
    Kilobytes,
    Megabytes,
    Gigabytes,
    Terabytes,
    Bits,
    Kilobits,
    Megabits,
    Gigabits,
    Terabits,
    Percent,
    Count,
    #[serde(rename = "Bytes/Second")]
    BytesPerSecond,
    #[serde(rename = "Kilobytes/Second")]
    KilobytesPerSecond,
    #[serde(rename = "Megabytes/Second")]
    MegabytesPerSecond,
    #[serde(rename = "Gigabytes/Second")]
    GigabytesPerSecond,
    #[serde(rename = "Terabytes/Second")]
    TerabytesPerSecond,
    #[serde(rename = "Bits/Second")]
    BitsPerSecond,
    #[serde(rename = "Kilobits/Second")]
    KilobitsPerSecond,
    #[serde(rename = "Megabits/Second")]
    MegabitsPerSecond,
    #[serde(rename = "Gigabits/Second")]
    GigabitsPerSecond,
    #[serde(rename = "Terabits/Second")]
    TerabitsPerSecond,
    #[serde(rename = "Count/Second")]
    CountPerSecond,
}