hydro_deploy/localhost/
mod.rs

1use std::net::SocketAddr;
2use std::sync::{Arc, OnceLock};
3
4use anyhow::{Result, bail};
5use async_process::{Command, Stdio};
6use async_trait::async_trait;
7use hydro_deploy_integration::ServerBindConfig;
8
9use crate::progress::ProgressTracker;
10use crate::rust_crate::build::BuildOutput;
11use crate::rust_crate::tracing_options::TracingOptions;
12use crate::{
13    BaseServerStrategy, ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary,
14    LaunchedHost, PortNetworkHint, ResourceBatch, ResourceResult,
15};
16
17pub mod launched_binary;
18pub use launched_binary::*;
19
20#[cfg(any(target_os = "macos", target_family = "windows"))]
21mod samply;
22
/// Cached, trimmed output of `rustc --print target-libdir`.
///
/// Initialized lazily by `launch_binary` and added to the dynamic-library search path of
/// every binary launched on localhost.
static LOCAL_LIBDIR: OnceLock<String> = OnceLock::new();
24
/// Host description for the local machine (localhost).
#[derive(Debug)]
pub struct LocalhostHost {
    /// Identifier of this host.
    pub id: usize,
    /// `true` when this host was created through [`LocalhostHost::client_only`].
    client_only: bool,
}

impl LocalhostHost {
    /// Creates a localhost host with the given `id`; the client-only flag starts out unset.
    pub fn new(id: usize) -> LocalhostHost {
        let client_only = false;
        Self { id, client_only }
    }

    /// Returns a fresh host that shares this host's `id` but has the client-only flag set.
    ///
    /// `self` is left untouched.
    pub fn client_only(&self) -> LocalhostHost {
        let Self { id, .. } = *self;
        Self {
            id,
            client_only: true,
        }
    }
}
46
47impl Host for LocalhostHost {
48    fn target_type(&self) -> HostTargetType {
49        HostTargetType::Local
50    }
51
52    fn request_port_base(&self, _bind_type: &BaseServerStrategy) {}
53    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
54    fn request_custom_binary(&self) {}
55
56    fn id(&self) -> usize {
57        self.id
58    }
59
60    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
61        Some(Arc::new(LaunchedLocalhost))
62    }
63
64    fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
65        Arc::new(LaunchedLocalhost)
66    }
67
68    fn strategy_as_server<'a>(
69        &'a self,
70        connection_from: &dyn Host,
71        network_hint: PortNetworkHint,
72    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
73        if self.client_only {
74            anyhow::bail!("Localhost cannot be a server if it is client only")
75        }
76
77        if matches!(network_hint, PortNetworkHint::Auto)
78            && connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id))
79        {
80            Ok((
81                ClientStrategy::UnixSocket(self.id),
82                Box::new(|_| BaseServerStrategy::UnixSocket),
83            ))
84        } else if matches!(
85            network_hint,
86            PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
87        ) && connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self))
88        {
89            Ok((
90                ClientStrategy::InternalTcpPort(self),
91                Box::new(move |_| {
92                    BaseServerStrategy::InternalTcpPort(match network_hint {
93                        PortNetworkHint::Auto => None,
94                        PortNetworkHint::TcpPort(port) => port,
95                    })
96                }),
97            ))
98        } else {
99            anyhow::bail!("Could not find a strategy to connect to localhost")
100        }
101    }
102
103    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
104        match typ {
105            ClientStrategy::UnixSocket(id) => {
106                #[cfg(unix)]
107                {
108                    self.id == id
109                }
110
111                #[cfg(not(unix))]
112                {
113                    let _ = id;
114                    false
115                }
116            }
117            ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
118            ClientStrategy::ForwardedTcpPort(_) => true,
119        }
120    }
121}
122
123struct LaunchedLocalhost;
124
125#[async_trait]
126impl LaunchedHost for LaunchedLocalhost {
127    fn base_server_config(&self, bind_type: &BaseServerStrategy) -> ServerBindConfig {
128        match bind_type {
129            BaseServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
130            BaseServerStrategy::InternalTcpPort(port) => {
131                ServerBindConfig::TcpPort("127.0.0.1".to_string(), *port)
132            }
133            BaseServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
134        }
135    }
136
137    async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
138        Ok(())
139    }
140
141    async fn launch_binary(
142        &self,
143        id: String,
144        binary: &BuildOutput,
145        args: &[String],
146        tracing: Option<TracingOptions>,
147    ) -> Result<Box<dyn LaunchedBinary>> {
148        let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
149            if cfg!(any(target_os = "macos", target_family = "windows")) {
150                // samply
151                ProgressTracker::println(
152                    format!("[{id} tracing] Profiling binary with `samply`.",),
153                );
154                let samply_outfile = tempfile::NamedTempFile::new()?;
155
156                let mut command = Command::new("samply");
157                command
158                    .arg("record")
159                    .arg("--save-only")
160                    .arg("--output")
161                    .arg(samply_outfile.as_ref())
162                    .arg(&binary.bin_path)
163                    .args(args);
164                (Some(samply_outfile), command)
165            } else if cfg!(target_family = "unix") {
166                // perf
167                ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
168                let perf_outfile = tempfile::NamedTempFile::new()?;
169
170                let mut command = Command::new("perf");
171                command
172                    .args([
173                        "record",
174                        "-F",
175                        &tracing.frequency.to_string(),
176                        "-e",
177                        "cycles:u",
178                        "--call-graph",
179                        "dwarf,65528",
180                        "-o",
181                    ])
182                    .arg(perf_outfile.as_ref())
183                    .arg(&binary.bin_path)
184                    .args(args);
185
186                (Some(perf_outfile), command)
187            } else {
188                bail!(
189                    "Unknown OS for samply/perf tracing: {}",
190                    std::env::consts::OS
191                );
192            }
193        } else {
194            let mut command = Command::new(&binary.bin_path);
195            command.args(args);
196            (None, command)
197        };
198
199        // from cargo (https://github.com/rust-lang/cargo/blob/master/crates/cargo-util/src/paths.rs#L38)
200        let dylib_path_var = if cfg!(windows) {
201            "PATH"
202        } else if cfg!(target_os = "macos") {
203            "DYLD_FALLBACK_LIBRARY_PATH"
204        } else if cfg!(target_os = "aix") {
205            "LIBPATH"
206        } else {
207            "LD_LIBRARY_PATH"
208        };
209
210        let local_libdir = LOCAL_LIBDIR.get_or_init(|| {
211            std::process::Command::new("rustc")
212                .arg("--print")
213                .arg("target-libdir")
214                .output()
215                .map(|output| String::from_utf8(output.stdout).unwrap().trim().to_string())
216                .unwrap()
217        });
218
219        command.env(
220            dylib_path_var,
221            std::env::var_os(dylib_path_var).map_or_else(
222                || {
223                    std::env::join_paths(
224                        [
225                            binary.shared_library_path.as_ref(),
226                            Some(&std::path::PathBuf::from(local_libdir)),
227                        ]
228                        .into_iter()
229                        .flatten(),
230                    )
231                    .unwrap()
232                },
233                |paths| {
234                    let mut paths = std::env::split_paths(&paths).collect::<Vec<_>>();
235                    paths.insert(0, std::path::PathBuf::from(local_libdir));
236                    if let Some(shared_path) = &binary.shared_library_path {
237                        paths.insert(0, shared_path.to_path_buf());
238                    }
239                    std::env::join_paths(paths).unwrap()
240                },
241            ),
242        );
243
244        command
245            .stdin(Stdio::piped())
246            .stdout(Stdio::piped())
247            .stderr(Stdio::piped());
248
249        #[cfg(not(target_family = "unix"))]
250        command.kill_on_drop(true);
251
252        ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
253
254        let child = command.spawn().map_err(|e| {
255            let msg = if maybe_perf_outfile.is_some() && std::io::ErrorKind::NotFound == e.kind() {
256                "Tracing executable not found, ensure it is installed"
257            } else {
258                "Failed to execute command"
259            };
260            anyhow::Error::new(e).context(format!("{}: {:?}", msg, command))
261        })?;
262
263        Ok(Box::new(LaunchedLocalhostBinary::new(
264            child,
265            id,
266            tracing,
267            maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
268        )))
269    }
270
271    async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
272        Ok(*addr)
273    }
274}