hydro_deploy/localhost/
mod.rs1use std::net::SocketAddr;
2use std::sync::{Arc, OnceLock};
3
4use anyhow::{Result, bail};
5use async_process::{Command, Stdio};
6use async_trait::async_trait;
7use hydro_deploy_integration::ServerBindConfig;
8
9use crate::progress::ProgressTracker;
10use crate::rust_crate::build::BuildOutput;
11use crate::rust_crate::tracing_options::TracingOptions;
12use crate::{
13 BaseServerStrategy, ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary,
14 LaunchedHost, PortNetworkHint, ResourceBatch, ResourceResult,
15};
16
17pub mod launched_binary;
18pub use launched_binary::*;
19
20#[cfg(any(target_os = "macos", target_family = "windows"))]
21mod samply;
22
23static LOCAL_LIBDIR: OnceLock<String> = OnceLock::new();
24
25#[derive(Debug)]
26pub struct LocalhostHost {
27 pub id: usize,
28 client_only: bool,
29}
30
31impl LocalhostHost {
32 pub fn new(id: usize) -> LocalhostHost {
33 LocalhostHost {
34 id,
35 client_only: false,
36 }
37 }
38
39 pub fn client_only(&self) -> LocalhostHost {
40 LocalhostHost {
41 id: self.id,
42 client_only: true,
43 }
44 }
45}
46
47impl Host for LocalhostHost {
48 fn target_type(&self) -> HostTargetType {
49 HostTargetType::Local
50 }
51
52 fn request_port_base(&self, _bind_type: &BaseServerStrategy) {}
53 fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}
54 fn request_custom_binary(&self) {}
55
56 fn id(&self) -> usize {
57 self.id
58 }
59
60 fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
61 Some(Arc::new(LaunchedLocalhost))
62 }
63
64 fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
65 Arc::new(LaunchedLocalhost)
66 }
67
68 fn strategy_as_server<'a>(
69 &'a self,
70 connection_from: &dyn Host,
71 network_hint: PortNetworkHint,
72 ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
73 if self.client_only {
74 anyhow::bail!("Localhost cannot be a server if it is client only")
75 }
76
77 if matches!(network_hint, PortNetworkHint::Auto)
78 && connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id))
79 {
80 Ok((
81 ClientStrategy::UnixSocket(self.id),
82 Box::new(|_| BaseServerStrategy::UnixSocket),
83 ))
84 } else if matches!(
85 network_hint,
86 PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
87 ) && connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self))
88 {
89 Ok((
90 ClientStrategy::InternalTcpPort(self),
91 Box::new(move |_| {
92 BaseServerStrategy::InternalTcpPort(match network_hint {
93 PortNetworkHint::Auto => None,
94 PortNetworkHint::TcpPort(port) => port,
95 })
96 }),
97 ))
98 } else {
99 anyhow::bail!("Could not find a strategy to connect to localhost")
100 }
101 }
102
103 fn can_connect_to(&self, typ: ClientStrategy) -> bool {
104 match typ {
105 ClientStrategy::UnixSocket(id) => {
106 #[cfg(unix)]
107 {
108 self.id == id
109 }
110
111 #[cfg(not(unix))]
112 {
113 let _ = id;
114 false
115 }
116 }
117 ClientStrategy::InternalTcpPort(target_host) => self.id == target_host.id(),
118 ClientStrategy::ForwardedTcpPort(_) => true,
119 }
120 }
121}
122
123struct LaunchedLocalhost;
124
125#[async_trait]
126impl LaunchedHost for LaunchedLocalhost {
127 fn base_server_config(&self, bind_type: &BaseServerStrategy) -> ServerBindConfig {
128 match bind_type {
129 BaseServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
130 BaseServerStrategy::InternalTcpPort(port) => {
131 ServerBindConfig::TcpPort("127.0.0.1".to_string(), *port)
132 }
133 BaseServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
134 }
135 }
136
137 async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
138 Ok(())
139 }
140
141 async fn launch_binary(
142 &self,
143 id: String,
144 binary: &BuildOutput,
145 args: &[String],
146 tracing: Option<TracingOptions>,
147 ) -> Result<Box<dyn LaunchedBinary>> {
148 let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
149 if cfg!(any(target_os = "macos", target_family = "windows")) {
150 ProgressTracker::println(
152 format!("[{id} tracing] Profiling binary with `samply`.",),
153 );
154 let samply_outfile = tempfile::NamedTempFile::new()?;
155
156 let mut command = Command::new("samply");
157 command
158 .arg("record")
159 .arg("--save-only")
160 .arg("--output")
161 .arg(samply_outfile.as_ref())
162 .arg(&binary.bin_path)
163 .args(args);
164 (Some(samply_outfile), command)
165 } else if cfg!(target_family = "unix") {
166 ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
168 let perf_outfile = tempfile::NamedTempFile::new()?;
169
170 let mut command = Command::new("perf");
171 command
172 .args([
173 "record",
174 "-F",
175 &tracing.frequency.to_string(),
176 "-e",
177 "cycles:u",
178 "--call-graph",
179 "dwarf,65528",
180 "-o",
181 ])
182 .arg(perf_outfile.as_ref())
183 .arg(&binary.bin_path)
184 .args(args);
185
186 (Some(perf_outfile), command)
187 } else {
188 bail!(
189 "Unknown OS for samply/perf tracing: {}",
190 std::env::consts::OS
191 );
192 }
193 } else {
194 let mut command = Command::new(&binary.bin_path);
195 command.args(args);
196 (None, command)
197 };
198
199 let dylib_path_var = if cfg!(windows) {
201 "PATH"
202 } else if cfg!(target_os = "macos") {
203 "DYLD_FALLBACK_LIBRARY_PATH"
204 } else if cfg!(target_os = "aix") {
205 "LIBPATH"
206 } else {
207 "LD_LIBRARY_PATH"
208 };
209
210 let local_libdir = LOCAL_LIBDIR.get_or_init(|| {
211 std::process::Command::new("rustc")
212 .arg("--print")
213 .arg("target-libdir")
214 .output()
215 .map(|output| String::from_utf8(output.stdout).unwrap().trim().to_string())
216 .unwrap()
217 });
218
219 command.env(
220 dylib_path_var,
221 std::env::var_os(dylib_path_var).map_or_else(
222 || {
223 std::env::join_paths(
224 [
225 binary.shared_library_path.as_ref(),
226 Some(&std::path::PathBuf::from(local_libdir)),
227 ]
228 .into_iter()
229 .flatten(),
230 )
231 .unwrap()
232 },
233 |paths| {
234 let mut paths = std::env::split_paths(&paths).collect::<Vec<_>>();
235 paths.insert(0, std::path::PathBuf::from(local_libdir));
236 if let Some(shared_path) = &binary.shared_library_path {
237 paths.insert(0, shared_path.to_path_buf());
238 }
239 std::env::join_paths(paths).unwrap()
240 },
241 ),
242 );
243
244 command
245 .stdin(Stdio::piped())
246 .stdout(Stdio::piped())
247 .stderr(Stdio::piped());
248
249 #[cfg(not(target_family = "unix"))]
250 command.kill_on_drop(true);
251
252 ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
253
254 let child = command.spawn().map_err(|e| {
255 let msg = if maybe_perf_outfile.is_some() && std::io::ErrorKind::NotFound == e.kind() {
256 "Tracing executable not found, ensure it is installed"
257 } else {
258 "Failed to execute command"
259 };
260 anyhow::Error::new(e).context(format!("{}: {:?}", msg, command))
261 })?;
262
263 Ok(Box::new(LaunchedLocalhostBinary::new(
264 child,
265 id,
266 tracing,
267 maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
268 )))
269 }
270
271 async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
272 Ok(*addr)
273 }
274}