hydro_deploy/localhost/
launched_binary.rs1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3use std::process::{ExitStatus, Stdio};
4use std::sync::OnceLock;
5
6use anyhow::{Result, bail};
7use async_process::Command;
8use async_trait::async_trait;
9use futures::io::BufReader as FuturesBufReader;
10use futures::{AsyncBufReadExt as _, AsyncWriteExt as _};
11use inferno::collapse::Collapse;
12use inferno::collapse::perf::Folder as PerfFolder;
13use tempfile::NamedTempFile;
14use tokio::io::{AsyncBufReadExt as _, BufReader as TokioBufReader};
15use tokio::sync::{mpsc, oneshot};
16use tokio_util::compat::FuturesAsyncReadCompatExt;
17use tokio_util::io::SyncIoBridge;
18
19#[cfg(any(target_os = "macos", target_family = "windows"))]
20use super::samply::{FxProfile, samply_to_folded};
21use crate::progress::ProgressTracker;
22use crate::rust_crate::flamegraph::handle_fold_data;
23use crate::rust_crate::tracing_options::TracingOptions;
24use crate::util::{PriorityBroadcast, prioritized_broadcast};
25use crate::{LaunchedBinary, TracingResults};
26
27pub(super) struct TracingDataLocal {
28 pub(super) outfile: NamedTempFile,
29}
30
31pub struct LaunchedLocalhostBinary {
32 child: tokio::sync::Mutex<async_process::Child>,
34 tracing_config: Option<TracingOptions>,
35 tracing_data_local: std::sync::Mutex<Option<TracingDataLocal>>,
36 tracing_results: OnceLock<TracingResults>,
37 stdin_sender: mpsc::UnboundedSender<String>,
38 stdout_broadcast: PriorityBroadcast,
39 stderr_broadcast: PriorityBroadcast,
40}
41
42#[cfg(unix)]
43impl Drop for LaunchedLocalhostBinary {
44 fn drop(&mut self) {
45 let child = self.child.get_mut();
46
47 if let Ok(Some(_)) = child.try_status() {
48 return;
49 }
50
51 let pid = child.id();
52 if let Err(e) = nix::sys::signal::kill(
53 nix::unistd::Pid::from_raw(pid as i32),
54 nix::sys::signal::SIGTERM,
55 ) {
56 ProgressTracker::println(format!("Failed to SIGTERM process {}: {}", pid, e));
57 }
58 }
59}
60
61impl LaunchedLocalhostBinary {
62 pub(super) fn new(
63 mut child: async_process::Child,
64 id: String,
65 tracing_config: Option<TracingOptions>,
66 tracing_data_local: Option<TracingDataLocal>,
67 ) -> Self {
68 let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
69 let mut stdin = child.stdin.take().unwrap();
70 tokio::spawn(async move {
71 while let Some(line) = stdin_receiver.recv().await {
72 if stdin.write_all(line.as_bytes()).await.is_err() {
73 break;
74 }
75
76 stdin.flush().await.ok();
77 }
78 });
79
80 let id_clone = id.clone();
81 let stdout_broadcast = prioritized_broadcast(
82 FuturesBufReader::new(child.stdout.take().unwrap()).lines(),
83 move |s| ProgressTracker::println(format!("[{id_clone}] {s}")),
84 );
85 let stderr_broadcast = prioritized_broadcast(
86 FuturesBufReader::new(child.stderr.take().unwrap()).lines(),
87 move |s| ProgressTracker::println(format!("[{id} stderr] {s}")),
88 );
89
90 Self {
91 child: tokio::sync::Mutex::new(child),
92 tracing_config,
93 tracing_data_local: std::sync::Mutex::new(tracing_data_local),
94 tracing_results: OnceLock::new(),
95 stdin_sender,
96 stdout_broadcast,
97 stderr_broadcast,
98 }
99 }
100}
101
102#[async_trait]
103impl LaunchedBinary for LaunchedLocalhostBinary {
104 fn stdin(&self) -> mpsc::UnboundedSender<String> {
105 self.stdin_sender.clone()
106 }
107
108 fn deploy_stdout(&self) -> oneshot::Receiver<String> {
109 self.stdout_broadcast.receive_priority()
110 }
111
112 fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
113 self.stdout_broadcast.receive(None)
114 }
115
116 fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
117 self.stderr_broadcast.receive(None)
118 }
119
120 fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
121 self.stdout_broadcast.receive(Some(prefix))
122 }
123
124 fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
125 self.stderr_broadcast.receive(Some(prefix))
126 }
127
128 fn tracing_results(&self) -> Option<&TracingResults> {
129 self.tracing_results.get()
130 }
131
132 fn exit_code(&self) -> Option<i32> {
133 self.child
134 .try_lock()
135 .ok()
136 .and_then(|mut child| child.try_status().ok())
137 .flatten()
138 .map(exit_code)
139 }
140
141 async fn wait(&self) -> Result<i32> {
142 Ok(exit_code(self.child.lock().await.status().await?))
143 }
144
145 async fn stop(&self) -> Result<()> {
146 if let Err(err) = { self.child.lock().await.kill() }
147 && !matches!(err.kind(), std::io::ErrorKind::InvalidInput)
148 {
149 Err(err)?;
150 }
151
152 if let Some(tracing_config) = self.tracing_config.as_ref() {
154 assert!(
155 self.tracing_results.get().is_none(),
156 "`tracing_results` already set! Was `stop()` called twice? This is a bug."
157 );
158 let tracing_data =
159 {
160 self.tracing_data_local.lock().unwrap().take().expect(
161 "`tracing_data_local` empty, was `stop()` called twice? This is a bug.",
162 )
163 };
164
165 if cfg!(any(target_os = "macos", target_family = "windows")) {
166 if let Some(samply_outfile) = tracing_config.samply_outfile.as_ref() {
167 std::fs::copy(&tracing_data.outfile, samply_outfile)?;
168 }
169 } else if cfg!(target_family = "unix")
170 && let Some(perf_outfile) = tracing_config.perf_raw_outfile.as_ref()
171 {
172 std::fs::copy(&tracing_data.outfile, perf_outfile)?;
173 }
174
175 let fold_data = if cfg!(any(target_os = "macos", target_family = "windows")) {
176 #[cfg(any(target_os = "macos", target_family = "windows"))]
177 {
178 let deserializer = &mut serde_json::Deserializer::from_reader(
179 std::fs::File::open(tracing_data.outfile.path())?,
180 );
181 let loaded = serde_path_to_error::deserialize::<_, FxProfile>(deserializer)?;
182
183 ProgressTracker::leaf("processing samply", samply_to_folded(loaded))
184 .await
185 .into()
186 }
187
188 #[cfg(not(any(target_os = "macos", target_family = "windows")))]
189 {
190 unreachable!()
191 }
192 } else if cfg!(target_family = "unix") {
193 let mut perf_script = Command::new("perf")
195 .args(["script", "--symfs=/", "-i"])
196 .arg(tracing_data.outfile.path())
197 .stdout(Stdio::piped())
198 .stderr(Stdio::piped())
199 .spawn()?;
200
201 let stdout = perf_script.stdout.take().unwrap().compat();
202 let mut stderr_lines =
203 TokioBufReader::new(perf_script.stderr.take().unwrap().compat()).lines();
204
205 let mut fold_er =
206 PerfFolder::from(tracing_config.fold_perf_options.clone().unwrap_or_default());
207
208 let ((), fold_data, ()) = tokio::try_join!(
210 async move {
211 while let Ok(Some(s)) = stderr_lines.next_line().await {
213 ProgressTracker::println(format!("[perf script stderr] {s}"));
214 }
215 Result::<_>::Ok(())
216 },
217 async move {
218 tokio::task::spawn_blocking(move || {
220 let mut fold_data = Vec::new();
221 fold_er.collapse(
222 SyncIoBridge::new(tokio::io::BufReader::new(stdout)),
223 &mut fold_data,
224 )?;
225 Ok(fold_data)
226 })
227 .await?
228 },
229 async move {
230 perf_script.status().await?;
232 Ok(())
233 },
234 )?;
235 fold_data
236 } else {
237 bail!(
238 "Unknown OS for samply/perf tracing: {}",
239 std::env::consts::OS
240 );
241 };
242
243 handle_fold_data(tracing_config, fold_data.clone()).await?;
244
245 self.tracing_results
246 .set(TracingResults {
247 folded_data: fold_data,
248 })
249 .expect("`tracing_results` already set! This is a bug.");
250 };
251
252 Ok(())
253 }
254}
255
/// Collapses an [`ExitStatus`] into a single integer.
///
/// On Unix this is the exit code when the process exited normally, otherwise the number of the
/// signal that terminated it. On other platforms it is the exit code.
///
/// # Panics
///
/// Panics if the status carries neither an exit code nor (on Unix) a signal.
fn exit_code(c: ExitStatus) -> i32 {
    #[cfg(unix)]
    let resolved = c.code().or_else(|| c.signal());
    #[cfg(not(unix))]
    let resolved = c.code();

    resolved.unwrap()
}