Skip to main content

hydro_deploy/localhost/
mod.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::{Arc, OnceLock};
4
5use anyhow::{Result, bail};
6use async_process::{Command, Stdio};
7use async_trait::async_trait;
8use hydro_deploy_integration::ServerBindConfig;
9
10use crate::progress::ProgressTracker;
11use crate::rust_crate::build::BuildOutput;
12use crate::rust_crate::tracing_options::TracingOptions;
13use crate::{
14    BaseServerStrategy, ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary,
15    LaunchedHost, PortNetworkHint, ResourceBatch, ResourceResult,
16};
17
18pub mod launched_binary;
19pub use launched_binary::*;
20
21#[cfg(feature = "profile-folding")]
22#[cfg(any(target_os = "macos", target_family = "windows"))]
23mod samply;
24
/// Process-wide cache for the trimmed output of `rustc --print target-libdir`.
///
/// It is filled lazily the first time `launch_binary` needs the toolchain library
/// directory, so `rustc` is invoked at most once per successful lookup.
static LOCAL_LIBDIR: OnceLock<String> = OnceLock::new();
26
/// A deployment host that stands for the machine the deployment itself runs on.
#[derive(Debug)]
pub struct LocalhostHost {
    /// Identifier of this host; it is what `Host::id` reports for this type.
    pub id: usize,
    /// When `true`, this host refuses to act as the server side of a connection.
    client_only: bool,
}

impl LocalhostHost {
    /// Creates a localhost handle with the given `id` that may act as a server.
    pub fn new(id: usize) -> Self {
        Self {
            id,
            client_only: false,
        }
    }

    /// Returns a new handle with the same `id` that is flagged as client-only.
    ///
    /// The receiver is left untouched.
    pub fn client_only(&self) -> Self {
        let Self { id, .. } = *self;
        Self {
            id,
            client_only: true,
        }
    }
}
48
49impl Host for LocalhostHost {
50    fn target_type(&self) -> HostTargetType {
51        HostTargetType::Local
52    }
53
54    fn request_port_base(&self, _bind_type: &BaseServerStrategy) {}
55    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
56    fn request_custom_binary(&self) {}
57
58    fn id(&self) -> usize {
59        self.id
60    }
61
62    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
63        Some(Arc::new(LaunchedLocalhost))
64    }
65
66    fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
67        Arc::new(LaunchedLocalhost)
68    }
69
70    fn strategy_as_server<'a>(
71        &'a self,
72        connection_from: &dyn Host,
73        network_hint: PortNetworkHint,
74    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
75        if self.client_only {
76            anyhow::bail!("Localhost cannot be a server if it is client only")
77        }
78
79        if matches!(network_hint, PortNetworkHint::Auto)
80            && connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id))
81        {
82            Ok((
83                ClientStrategy::UnixSocket(self.id),
84                Box::new(|_| BaseServerStrategy::UnixSocket),
85            ))
86        } else if matches!(
87            network_hint,
88            PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
89        ) && connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self))
90        {
91            Ok((
92                ClientStrategy::InternalTcpPort(self),
93                Box::new(move |_| {
94                    BaseServerStrategy::InternalTcpPort(match network_hint {
95                        PortNetworkHint::Auto => None,
96                        PortNetworkHint::TcpPort(port) => port,
97                    })
98                }),
99            ))
100        } else {
101            anyhow::bail!("Could not find a strategy to connect to localhost")
102        }
103    }
104
105    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
106        match typ {
107            ClientStrategy::UnixSocket(id) => {
108                #[cfg(unix)]
109                {
110                    self.id == id
111                }
112
113                #[cfg(not(unix))]
114                {
115                    let _ = id;
116                    false
117                }
118            }
119            ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
120            ClientStrategy::ForwardedTcpPort(_) => true,
121        }
122    }
123}
124
/// Stateless handle for the already-available local machine; it is what
/// `LocalhostHost` hands out from both `launched` and `provision`.
struct LaunchedLocalhost;
126
127#[async_trait]
128impl LaunchedHost for LaunchedLocalhost {
129    fn base_server_config(&self, bind_type: &BaseServerStrategy) -> ServerBindConfig {
130        match bind_type {
131            BaseServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
132            BaseServerStrategy::InternalTcpPort(port) => {
133                ServerBindConfig::TcpPort("127.0.0.1".to_owned(), *port)
134            }
135            BaseServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
136        }
137    }
138
139    async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
140        Ok(())
141    }
142
143    async fn launch_binary(
144        &self,
145        id: String,
146        binary: &BuildOutput,
147        args: &[String],
148        tracing: Option<TracingOptions>,
149        env: &HashMap<String, String>,
150        pin_to_core: Option<usize>,
151    ) -> Result<Box<dyn LaunchedBinary>> {
152        if pin_to_core.is_some() {
153            ProgressTracker::println(format!(
154                "[{id}] pin_to_core is not supported on localhost, ignoring"
155            ));
156        }
157
158        let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
159            if cfg!(any(target_os = "macos", target_family = "windows")) {
160                // samply
161                ProgressTracker::println(
162                    format!("[{id} tracing] Profiling binary with `samply`.",),
163                );
164                let samply_outfile = tempfile::NamedTempFile::new()?;
165
166                let mut command = Command::new("samply");
167                command
168                    .arg("record")
169                    .arg("--save-only")
170                    .arg("--output")
171                    .arg(samply_outfile.as_ref())
172                    .arg(&binary.bin_path)
173                    .args(args);
174                (Some(samply_outfile), command)
175            } else if cfg!(target_family = "unix") {
176                // perf
177                ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
178                let perf_outfile = tempfile::NamedTempFile::new()?;
179
180                let mut command = Command::new("perf");
181                command
182                    .args([
183                        "record",
184                        "-F",
185                        &tracing.frequency.to_string(),
186                        "-e",
187                        "cycles:u",
188                        "--call-graph",
189                        "dwarf,65528",
190                        "-o",
191                    ])
192                    .arg(perf_outfile.as_ref())
193                    .arg(&binary.bin_path)
194                    .args(args);
195
196                (Some(perf_outfile), command)
197            } else {
198                bail!(
199                    "Unknown OS for samply/perf tracing: {}",
200                    std::env::consts::OS
201                );
202            }
203        } else {
204            let mut command = Command::new(&binary.bin_path);
205            command.args(args);
206            (None, command)
207        };
208
209        // from cargo (https://github.com/rust-lang/cargo/blob/master/crates/cargo-util/src/paths.rs#L38)
210        let dylib_path_var = if cfg!(windows) {
211            "PATH"
212        } else if cfg!(target_os = "macos") {
213            "DYLD_FALLBACK_LIBRARY_PATH"
214        } else if cfg!(target_os = "aix") {
215            "LIBPATH"
216        } else {
217            "LD_LIBRARY_PATH"
218        };
219
220        let local_libdir = LOCAL_LIBDIR.get_or_init(|| {
221            std::process::Command::new("rustc")
222                .arg("--print")
223                .arg("target-libdir")
224                .output()
225                .map(|output| str::from_utf8(&output.stdout).unwrap().trim().to_owned())
226                .unwrap()
227        });
228
229        command.env(
230            dylib_path_var,
231            std::env::var_os(dylib_path_var).map_or_else(
232                || {
233                    std::env::join_paths(
234                        [
235                            binary.shared_library_path.as_ref(),
236                            Some(&std::path::PathBuf::from(local_libdir)),
237                        ]
238                        .into_iter()
239                        .flatten(),
240                    )
241                    .unwrap()
242                },
243                |paths| {
244                    let mut paths = std::env::split_paths(&paths).collect::<Vec<_>>();
245                    paths.insert(0, std::path::PathBuf::from(local_libdir));
246                    if let Some(shared_path) = &binary.shared_library_path {
247                        paths.insert(0, shared_path.to_path_buf());
248                    }
249                    std::env::join_paths(paths).unwrap()
250                },
251            ),
252        );
253
254        command.envs(env);
255
256        command
257            .stdin(Stdio::piped())
258            .stdout(Stdio::piped())
259            .stderr(Stdio::piped());
260
261        #[cfg(not(target_family = "unix"))]
262        command.kill_on_drop(true);
263
264        ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
265
266        let child = command.spawn().map_err(|e| {
267            let msg = if maybe_perf_outfile.is_some() && std::io::ErrorKind::NotFound == e.kind() {
268                "Tracing executable not found, ensure it is installed"
269            } else {
270                "Failed to execute command"
271            };
272            anyhow::Error::new(e).context(format!("{}: {:?}", msg, command))
273        })?;
274
275        Ok(Box::new(LaunchedLocalhostBinary::new(
276            child,
277            id,
278            tracing,
279            maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
280        )))
281    }
282
283    async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
284        Ok(*addr)
285    }
286}