1#[cfg(feature = "runtime_support")]
5use std::marker::Unpin;
6#[cfg(feature = "runtime_support")]
7use std::panic::AssertUnwindSafe;
8use std::time::Duration;
9#[cfg(feature = "runtime_support")]
10use std::time::SystemTime;
11
12#[cfg(feature = "runtime_support")]
13use dfir_rs::Never;
14#[cfg(feature = "runtime_support")]
15use dfir_rs::scheduled::graph::Dfir;
16#[cfg(feature = "runtime_support")]
17use dfir_rs::scheduled::metrics::DfirMetrics;
18#[cfg(feature = "runtime_support")]
19use futures::FutureExt;
20use quote::quote;
21#[cfg(feature = "runtime_support")]
22use serde_json::json;
23use syn::parse_quote;
24#[cfg(feature = "runtime_support")]
25use tokio::io::{AsyncWrite, AsyncWriteExt};
26#[cfg(feature = "runtime_support")]
27use tokio_metrics::RuntimeMetrics;
28
29use crate::location::{LocationKey, LocationType};
30use crate::staging_util::get_this_crate;
31use crate::telemetry::Sidecar;
32
/// Metrics log file used by [`RecordMetricsSidecar::new`] when no `file_path` is supplied.
pub const DEFAULT_FILE_PATH: &str = "/var/log/hydro/metrics.log";
/// Reporting period used by [`RecordMetricsSidecar::new`] when no `interval` is supplied.
pub const DEFAULT_INTERVAL: Duration = Duration::from_secs(30);
37
/// Configuration for a sidecar that periodically records metrics to a log file.
///
/// The two settings are baked into the generated call to `record_metrics_sidecar` by this
/// type's [`Sidecar`] implementation.
#[derive(Debug, Clone)]
pub struct RecordMetricsSidecar {
    /// Path of the file that metrics records are appended to.
    file_path: String,
    /// Time to wait between two consecutive metrics reports.
    interval: Duration,
}
43
44#[buildstructor::buildstructor]
45impl RecordMetricsSidecar {
46 #[builder]
48 pub fn new(file_path: Option<String>, interval: Option<Duration>) -> Self {
49 Self {
50 file_path: file_path.unwrap_or_else(|| DEFAULT_FILE_PATH.to_owned()),
51 interval: interval.unwrap_or(DEFAULT_INTERVAL),
52 }
53 }
54}
55
56impl Sidecar for RecordMetricsSidecar {
57 fn to_expr(
58 &self,
59 flow_name: &str,
60 _location_key: LocationKey,
61 _location_type: LocationType,
62 location_name: &str,
63 dfir_ident: &syn::Ident,
64 ) -> syn::Expr {
65 let Self {
66 file_path,
67 interval,
68 } = self;
69
70 let root = get_this_crate();
71 let namespace = flow_name.replace(char::is_whitespace, "_");
72 let interval: proc_macro2::TokenStream = {
73 let secs = interval.as_secs();
74 let nanos = interval.subsec_nanos();
75 quote!(::std::time::Duration::new(#secs, #nanos))
76 };
77
78 parse_quote! {
79 #root::telemetry::emf::record_metrics_sidecar(&#dfir_ident, #namespace, #location_name, #file_path, #interval)
80 }
81 }
82}
83
84#[cfg(feature = "runtime_support")]
86#[doc(hidden)]
87pub fn record_metrics_sidecar(
88 dfir: &Dfir<'_>,
89 namespace: &'static str,
90 location_name: &'static str,
91 file_path: &'static str,
92 interval: Duration,
93) -> impl 'static + Future<Output = Never> {
94 assert!(!namespace.contains(char::is_whitespace));
95
96 let mut dfir_intervals = dfir.metrics_intervals();
97
98 async move {
99 if let Some(parent_dir) = std::path::Path::new(file_path).parent()
101 && let Err(e) = tokio::fs::create_dir_all(parent_dir).await
102 {
103 eprintln!("Failed to create log file directory for EMF metrics: {}", e);
105 }
106
107 let rt_monitor = tokio_metrics::RuntimeMonitor::new(&tokio::runtime::Handle::current());
109 let mut rt_intervals = rt_monitor.intervals();
110
111 loop {
112 let _ = tokio::time::sleep(interval).await;
113
114 let dfir_metrics = dfir_intervals.take_interval();
115 let rt_metrics = rt_intervals.next().unwrap();
116
117 let unwind_result = AssertUnwindSafe(async {
118 let timestamp = SystemTime::now();
119
120 let file = tokio::fs::OpenOptions::new()
121 .write(true)
122 .create(true)
123 .truncate(false)
124 .append(true)
125 .open(file_path)
126 .await
127 .expect("Failed to open log file for EMF metrics.");
128 let mut writer = tokio::io::BufWriter::new(file);
129
130 record_metrics_dfir(
131 namespace,
132 location_name,
133 timestamp,
134 dfir_metrics,
135 &mut writer,
136 )
137 .await
138 .unwrap();
139
140 record_metrics_tokio(namespace, location_name, timestamp, rt_metrics, &mut writer)
141 .await
142 .unwrap();
143
144 writer.shutdown().await.unwrap();
145 })
146 .catch_unwind()
147 .await;
148
149 if let Err(panic_reason) = unwind_result {
150 eprintln!("Panic in metrics sidecar: {panic_reason:?}");
152 }
153 }
154 }
155}
156
#[cfg(feature = "runtime_support")]
/// Writes the DFIR metrics as newline-delimited JSON records in the CloudWatch EMF layout.
///
/// One record is written per handoff (`CurrItemsCount`, `TotalItemsCount`) followed by one record
/// per subgraph (run/poll/idle counts, with durations in microseconds). Every record carries
/// `location_name` as its `LocationName` and `timestamp` as milliseconds since the Unix epoch.
///
/// # Errors
///
/// Returns the first I/O error reported by `writer`.
///
/// # Panics
///
/// Panics if `timestamp` is earlier than the Unix epoch.
async fn record_metrics_dfir<W>(
    namespace: &str,
    location_name: &str,
    timestamp: SystemTime,
    metrics: DfirMetrics,
    writer: &mut W,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    let ts_millis = timestamp
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis();

    // The `_aws` envelope is the same for every handoff record, so build it once.
    let handoff_envelope = json!({
        "Timestamp": ts_millis,
        "CloudWatchMetrics": [
            {
                "Namespace": namespace,
                "Dimensions": [["LocationName"], ["LocationName", "HandoffId"]],
                "Metrics": [
                    {"Name": "CurrItemsCount", "Unit": Unit::Count},
                    {"Name": "TotalItemsCount", "Unit": Unit::Count},
                ]
            }
        ]
    });

    for (hoff_id, hoff_metrics) in metrics.handoffs.iter() {
        let record = json!({
            "_aws": handoff_envelope,
            "LocationName": location_name,
            "HandoffId": hoff_id.to_string(),
            "CurrItemsCount": hoff_metrics.curr_items_count(),
            "TotalItemsCount": hoff_metrics.total_items_count(),
        });
        let line = record.to_string();
        writer.write_all(line.as_bytes()).await?;
        writer.write_u8(b'\n').await?;
    }

    // Likewise, the `_aws` envelope is the same for every subgraph record.
    let subgraph_envelope = json!({
        "Timestamp": ts_millis,
        "CloudWatchMetrics": [
            {
                "Namespace": namespace,
                "Dimensions": [["LocationName"], ["LocationName", "SubgraphId"]],
                "Metrics": [
                    {"Name": "TotalRunCount", "Unit": Unit::Count},
                    {"Name": "TotalPollDuration", "Unit": Unit::Microseconds},
                    {"Name": "TotalPollCount", "Unit": Unit::Count},
                    {"Name": "TotalIdleDuration", "Unit": Unit::Microseconds},
                    {"Name": "TotalIdleCount", "Unit": Unit::Count},
                ]
            }
        ]
    });

    for (sg_id, sg_metrics) in metrics.subgraphs.iter() {
        let record = json!({
            "_aws": subgraph_envelope,
            "LocationName": location_name,
            "SubgraphId": sg_id.to_string(),
            "TotalRunCount": sg_metrics.total_run_count(),
            "TotalPollDuration": sg_metrics.total_poll_duration().as_micros(),
            "TotalPollCount": sg_metrics.total_poll_count(),
            "TotalIdleDuration": sg_metrics.total_idle_duration().as_micros(),
            "TotalIdleCount": sg_metrics.total_idle_count(),
        });
        let line = record.to_string();
        writer.write_all(line.as_bytes()).await?;
        writer.write_u8(b'\n').await?;
    }

    Ok(())
}
234
#[cfg(feature = "runtime_support")]
/// Writes the Tokio runtime metrics as a single JSON record in the CloudWatch EMF layout.
///
/// The record reports `TotalBusyDuration` (microseconds) and `GlobalQueueDepth` for
/// `location_name`, stamped with `timestamp` as milliseconds since the Unix epoch, and is
/// terminated by a newline.
///
/// # Errors
///
/// Returns the first I/O error reported by `writer`.
///
/// # Panics
///
/// Panics if `timestamp` is earlier than the Unix epoch.
async fn record_metrics_tokio<W>(
    namespace: &str,
    location_name: &str,
    timestamp: SystemTime,
    rt_metrics: RuntimeMetrics,
    writer: &mut W,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    let since_epoch = timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    let busy_micros = rt_metrics.total_busy_duration.as_micros();
    let queue_depth = rt_metrics.global_queue_depth;

    let record = json!({
        "_aws": {
            "Timestamp": since_epoch.as_millis(),
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    "Dimensions": [["LocationName"]],
                    "Metrics": [
                        {"Name": "TotalBusyDuration", "Unit": Unit::Microseconds},
                        {"Name": "GlobalQueueDepth", "Unit": Unit::Count},
                    ]
                }
            ]
        },
        "LocationName": location_name,
        "TotalBusyDuration": busy_micros,
        "GlobalQueueDepth": queue_depth,
    });

    let line = record.to_string();
    writer.write_all(line.as_bytes()).await?;
    writer.write_u8(b'\n').await
}
280
281#[expect(missing_docs, reason = "self-explanatory")]
285#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
286pub enum Unit {
287 #[default]
289 None,
290 Seconds,
291 Microseconds,
292 Milliseconds,
293 Bytes,
294 Kilobytes,
295 Megabytes,
296 Gigabytes,
297 Terabytes,
298 Bits,
299 Kilobits,
300 Megabits,
301 Gigabits,
302 Terabits,
303 Percent,
304 Count,
305 #[serde(rename = "Bytes/Second")]
306 BytesPerSecond,
307 #[serde(rename = "Kilobytes/Second")]
308 KilobytesPerSecond,
309 #[serde(rename = "Megabytes/Second")]
310 MegabytesPerSecond,
311 #[serde(rename = "Gigabytes/Second")]
312 GigabytesPerSecond,
313 #[serde(rename = "Terabytes/Second")]
314 TerabytesPerSecond,
315 #[serde(rename = "Bits/Second")]
316 BitsPerSecond,
317 #[serde(rename = "Kilobits/Second")]
318 KilobitsPerSecond,
319 #[serde(rename = "Megabits/Second")]
320 MegabitsPerSecond,
321 #[serde(rename = "Gigabits/Second")]
322 GigabitsPerSecond,
323 #[serde(rename = "Terabits/Second")]
324 TerabitsPerSecond,
325 #[serde(rename = "Count/Second")]
326 CountPerSecond,
327}