hydro_deploy/localhost/
launched_binary.rs

1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3use std::process::{ExitStatus, Stdio};
4use std::sync::OnceLock;
5
6use anyhow::{Result, bail};
7use async_process::Command;
8use async_trait::async_trait;
9use futures::io::BufReader as FuturesBufReader;
10use futures::{AsyncBufReadExt as _, AsyncWriteExt as _};
11use inferno::collapse::Collapse;
12use inferno::collapse::perf::Folder as PerfFolder;
13use tempfile::NamedTempFile;
14use tokio::io::{AsyncBufReadExt as _, BufReader as TokioBufReader};
15use tokio::sync::{mpsc, oneshot};
16use tokio_util::compat::FuturesAsyncReadCompatExt;
17use tokio_util::io::SyncIoBridge;
18
19#[cfg(any(target_os = "macos", target_family = "windows"))]
20use super::samply::{FxProfile, samply_to_folded};
21use crate::progress::ProgressTracker;
22use crate::rust_crate::flamegraph::handle_fold_data;
23use crate::rust_crate::tracing_options::TracingOptions;
24use crate::util::{PriorityBroadcast, prioritized_broadcast};
25use crate::{LaunchedBinary, TracingResults};
26
27pub(super) struct TracingDataLocal {
28    pub(super) outfile: NamedTempFile,
29}
30
31pub struct LaunchedLocalhostBinary {
32    /// Must use async mutex -- we will .await methods within the child (while holding lock).
33    child: tokio::sync::Mutex<async_process::Child>,
34    tracing_config: Option<TracingOptions>,
35    tracing_data_local: std::sync::Mutex<Option<TracingDataLocal>>,
36    tracing_results: OnceLock<TracingResults>,
37    stdin_sender: mpsc::UnboundedSender<String>,
38    stdout_broadcast: PriorityBroadcast,
39    stderr_broadcast: PriorityBroadcast,
40}
41
42#[cfg(unix)]
43impl Drop for LaunchedLocalhostBinary {
44    fn drop(&mut self) {
45        let child = self.child.get_mut();
46
47        if let Ok(Some(_)) = child.try_status() {
48            return;
49        }
50
51        let pid = child.id();
52        if let Err(e) = nix::sys::signal::kill(
53            nix::unistd::Pid::from_raw(pid as i32),
54            nix::sys::signal::SIGTERM,
55        ) {
56            ProgressTracker::println(format!("Failed to SIGTERM process {}: {}", pid, e));
57        }
58    }
59}
60
61impl LaunchedLocalhostBinary {
62    pub(super) fn new(
63        mut child: async_process::Child,
64        id: String,
65        tracing_config: Option<TracingOptions>,
66        tracing_data_local: Option<TracingDataLocal>,
67    ) -> Self {
68        let (stdin_sender, mut stdin_receiver) = mpsc::unbounded_channel::<String>();
69        let mut stdin = child.stdin.take().unwrap();
70        tokio::spawn(async move {
71            while let Some(line) = stdin_receiver.recv().await {
72                if stdin.write_all(line.as_bytes()).await.is_err() {
73                    break;
74                }
75
76                stdin.flush().await.ok();
77            }
78        });
79
80        let id_clone = id.clone();
81        let stdout_broadcast = prioritized_broadcast(
82            FuturesBufReader::new(child.stdout.take().unwrap()).lines(),
83            move |s| ProgressTracker::println(format!("[{id_clone}] {s}")),
84        );
85        let stderr_broadcast = prioritized_broadcast(
86            FuturesBufReader::new(child.stderr.take().unwrap()).lines(),
87            move |s| ProgressTracker::println(format!("[{id} stderr] {s}")),
88        );
89
90        Self {
91            child: tokio::sync::Mutex::new(child),
92            tracing_config,
93            tracing_data_local: std::sync::Mutex::new(tracing_data_local),
94            tracing_results: OnceLock::new(),
95            stdin_sender,
96            stdout_broadcast,
97            stderr_broadcast,
98        }
99    }
100}
101
102#[async_trait]
103impl LaunchedBinary for LaunchedLocalhostBinary {
104    fn stdin(&self) -> mpsc::UnboundedSender<String> {
105        self.stdin_sender.clone()
106    }
107
108    fn deploy_stdout(&self) -> oneshot::Receiver<String> {
109        self.stdout_broadcast.receive_priority()
110    }
111
112    fn stdout(&self) -> mpsc::UnboundedReceiver<String> {
113        self.stdout_broadcast.receive(None)
114    }
115
116    fn stderr(&self) -> mpsc::UnboundedReceiver<String> {
117        self.stderr_broadcast.receive(None)
118    }
119
120    fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
121        self.stdout_broadcast.receive(Some(prefix))
122    }
123
124    fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String> {
125        self.stderr_broadcast.receive(Some(prefix))
126    }
127
128    fn tracing_results(&self) -> Option<&TracingResults> {
129        self.tracing_results.get()
130    }
131
132    fn exit_code(&self) -> Option<i32> {
133        self.child
134            .try_lock()
135            .ok()
136            .and_then(|mut child| child.try_status().ok())
137            .flatten()
138            .map(exit_code)
139    }
140
141    async fn wait(&self) -> Result<i32> {
142        Ok(exit_code(self.child.lock().await.status().await?))
143    }
144
145    async fn stop(&self) -> Result<()> {
146        if let Err(err) = { self.child.lock().await.kill() }
147            && !matches!(err.kind(), std::io::ErrorKind::InvalidInput)
148        {
149            Err(err)?;
150        }
151
152        // Run perf post-processing and download perf output.
153        if let Some(tracing_config) = self.tracing_config.as_ref() {
154            assert!(
155                self.tracing_results.get().is_none(),
156                "`tracing_results` already set! Was `stop()` called twice? This is a bug."
157            );
158            let tracing_data =
159                {
160                    self.tracing_data_local.lock().unwrap().take().expect(
161                        "`tracing_data_local` empty, was `stop()` called twice? This is a bug.",
162                    )
163                };
164
165            if cfg!(any(target_os = "macos", target_family = "windows")) {
166                if let Some(samply_outfile) = tracing_config.samply_outfile.as_ref() {
167                    std::fs::copy(&tracing_data.outfile, samply_outfile)?;
168                }
169            } else if cfg!(target_family = "unix")
170                && let Some(perf_outfile) = tracing_config.perf_raw_outfile.as_ref()
171            {
172                std::fs::copy(&tracing_data.outfile, perf_outfile)?;
173            }
174
175            let fold_data = if cfg!(any(target_os = "macos", target_family = "windows")) {
176                #[cfg(any(target_os = "macos", target_family = "windows"))]
177                {
178                    let deserializer = &mut serde_json::Deserializer::from_reader(
179                        std::fs::File::open(tracing_data.outfile.path())?,
180                    );
181                    let loaded = serde_path_to_error::deserialize::<_, FxProfile>(deserializer)?;
182
183                    ProgressTracker::leaf("processing samply", samply_to_folded(loaded))
184                        .await
185                        .into()
186                }
187
188                #[cfg(not(any(target_os = "macos", target_family = "windows")))]
189                {
190                    unreachable!()
191                }
192            } else if cfg!(target_family = "unix") {
193                // Run perf script.
194                let mut perf_script = Command::new("perf")
195                    .args(["script", "--symfs=/", "-i"])
196                    .arg(tracing_data.outfile.path())
197                    .stdout(Stdio::piped())
198                    .stderr(Stdio::piped())
199                    .spawn()?;
200
201                let stdout = perf_script.stdout.take().unwrap().compat();
202                let mut stderr_lines =
203                    TokioBufReader::new(perf_script.stderr.take().unwrap().compat()).lines();
204
205                let mut fold_er =
206                    PerfFolder::from(tracing_config.fold_perf_options.clone().unwrap_or_default());
207
208                // Pattern on `()` to make sure no `Result`s are ignored.
209                let ((), fold_data, ()) = tokio::try_join!(
210                    async move {
211                        // Log stderr.
212                        while let Ok(Some(s)) = stderr_lines.next_line().await {
213                            ProgressTracker::println(format!("[perf script stderr] {s}"));
214                        }
215                        Result::<_>::Ok(())
216                    },
217                    async move {
218                        // Stream `perf script` stdout and fold.
219                        tokio::task::spawn_blocking(move || {
220                            let mut fold_data = Vec::new();
221                            fold_er.collapse(
222                                SyncIoBridge::new(tokio::io::BufReader::new(stdout)),
223                                &mut fold_data,
224                            )?;
225                            Ok(fold_data)
226                        })
227                        .await?
228                    },
229                    async move {
230                        // Close stdin and wait for command exit.
231                        perf_script.status().await?;
232                        Ok(())
233                    },
234                )?;
235                fold_data
236            } else {
237                bail!(
238                    "Unknown OS for samply/perf tracing: {}",
239                    std::env::consts::OS
240                );
241            };
242
243            handle_fold_data(tracing_config, fold_data.clone()).await?;
244
245            self.tracing_results
246                .set(TracingResults {
247                    folded_data: fold_data,
248                })
249                .expect("`tracing_results` already set! This is a bug.");
250        };
251
252        Ok(())
253    }
254}
255
/// Converts an [`ExitStatus`] into a single integer.
///
/// Returns the process exit code when there is one. On Unix, a process that
/// was terminated by a signal has no exit code, so the signal number is
/// returned instead.
///
/// # Panics
///
/// Panics if the status carries neither an exit code nor (on Unix) a
/// terminating signal.
fn exit_code(c: ExitStatus) -> i32 {
    #[cfg(unix)]
    let code = c.code().or_else(|| c.signal());
    #[cfg(not(unix))]
    let code = c.code();

    code.expect("exit status carries neither an exit code nor a terminating signal")
}