Skip to main content

hydro_deploy/
lib.rs

1use std::any::Any;
2use std::collections::{BTreeMap, HashMap};
3use std::fmt::Debug;
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use anyhow::Result;
8use append_only_vec::AppendOnlyVec;
9use async_trait::async_trait;
10use hydro_deploy_integration::ServerBindConfig;
11use rust_crate::build::BuildOutput;
12use rust_crate::tracing_options::TracingOptions;
13use tokio::sync::{mpsc, oneshot};
14
15pub mod deployment;
16pub use deployment::Deployment;
17
18pub mod progress;
19
20pub mod localhost;
21pub use localhost::LocalhostHost;
22
23pub mod ssh;
24
25pub mod gcp;
26pub use gcp::GcpComputeEngineHost;
27
28pub mod azure;
29pub use azure::AzureHost;
30
31pub mod aws;
32pub use aws::{AwsEc2Host, AwsNetwork};
33
34pub mod rust_crate;
35pub use rust_crate::RustCrate;
36
37pub mod custom_service;
38pub use custom_service::CustomService;
39
40pub mod terraform;
41
42pub mod util;
43
44#[derive(Default)]
45pub struct ResourcePool {
46    pub terraform: terraform::TerraformPool,
47}
48
49pub struct ResourceBatch {
50    pub terraform: terraform::TerraformBatch,
51}
52
53impl ResourceBatch {
54    fn new() -> ResourceBatch {
55        ResourceBatch {
56            terraform: terraform::TerraformBatch::default(),
57        }
58    }
59
60    async fn provision(
61        self,
62        pool: &mut ResourcePool,
63        last_result: Option<Arc<ResourceResult>>,
64    ) -> Result<ResourceResult> {
65        Ok(ResourceResult {
66            terraform: self.terraform.provision(&mut pool.terraform).await?,
67            _last_result: last_result,
68        })
69    }
70}
71
72#[derive(Debug)]
73pub struct ResourceResult {
74    pub terraform: terraform::TerraformResult,
75    _last_result: Option<Arc<ResourceResult>>,
76}
77
/// Tracing output of a launched binary, available only when the
/// `profile-folding` feature is enabled.
#[cfg(feature = "profile-folding")]
#[derive(Debug, Clone)]
pub struct TracingResults {
    /// Raw bytes of the folded data.
    pub folded_data: Vec<u8>,
}
83
84#[async_trait]
85pub trait LaunchedBinary: Send + Sync {
86    fn stdin(&self) -> mpsc::UnboundedSender<String>;
87
88    /// Provides a oneshot channel to handshake with the binary,
89    /// with the guarantee that as long as deploy is holding on
90    /// to a handle, none of the messages will also be broadcast
91    /// to the user-facing [`LaunchedBinary::stdout`] channel.
92    fn deploy_stdout(&self) -> oneshot::Receiver<String>;
93
94    fn stdout(&self) -> mpsc::UnboundedReceiver<String>;
95    fn stderr(&self) -> mpsc::UnboundedReceiver<String>;
96    fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
97    fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
98
99    #[cfg(feature = "profile-folding")]
100    fn tracing_results(&self) -> Option<&TracingResults>;
101
102    fn exit_code(&self) -> Option<i32>;
103
104    /// Wait for the process to stop on its own. Returns the exit code.
105    async fn wait(&self) -> Result<i32>;
106    /// If the process is still running, force stop it. Then run post-run tasks.
107    async fn stop(&self) -> Result<()>;
108}
109
110#[async_trait]
111pub trait LaunchedHost: Send + Sync {
112    /// Given a pre-selected network type, computes concrete information needed for a service
113    /// to listen to network connections (such as the IP address to bind to).
114    fn base_server_config(&self, strategy: &BaseServerStrategy) -> ServerBindConfig;
115
116    fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig {
117        match strategy {
118            ServerStrategy::Direct(b) => self.base_server_config(b),
119            ServerStrategy::Many(b) => {
120                ServerBindConfig::MultiConnection(Box::new(self.base_server_config(b)))
121            }
122            ServerStrategy::Demux(demux) => ServerBindConfig::Demux(
123                demux
124                    .iter()
125                    .map(|(key, underlying)| (*key, self.server_config(underlying)))
126                    .collect(),
127            ),
128            ServerStrategy::Merge(merge) => ServerBindConfig::Merge(
129                merge
130                    .iter()
131                    .map(|underlying| self.server_config(underlying))
132                    .collect(),
133            ),
134            ServerStrategy::Tagged(underlying, id) => {
135                ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id)
136            }
137            ServerStrategy::Null => ServerBindConfig::Null,
138        }
139    }
140
141    async fn copy_binary(&self, binary: &BuildOutput) -> Result<()>;
142
143    async fn launch_binary(
144        &self,
145        id: String,
146        binary: &BuildOutput,
147        args: &[String],
148        perf: Option<TracingOptions>,
149        env: &HashMap<String, String>,
150        pin_to_core: Option<usize>,
151    ) -> Result<Box<dyn LaunchedBinary>>;
152
153    async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
154}
155
/// The basic, non-composite ways a service can listen for connections.
///
/// These are the leaves of a [`ServerStrategy`].
#[derive(Debug)]
pub enum BaseServerStrategy {
    /// Listen on a Unix socket.
    UnixSocket,
    /// Listen on an internal TCP port; the port number is optional.
    InternalTcpPort(Option<u16>),
    /// Listen on an externally reachable TCP port.
    ExternalTcpPort(
        /// The port number to bind to, which must be explicit to open the firewall.
        u16,
    ),
}
164
165/// Types of connection that a service can receive when configured as the server.
166pub enum ServerStrategy {
167    Direct(BaseServerStrategy),
168    Many(BaseServerStrategy),
169    Demux(BTreeMap<u32, ServerStrategy>),
170    /// AppendOnlyVec has a quite large inline array, so we box it.
171    Merge(Box<AppendOnlyVec<ServerStrategy>>),
172    Tagged(Box<ServerStrategy>, u32),
173    Null,
174}
175
176/// Like BindType, but includes metadata for determining whether a connection is possible.
177pub enum ClientStrategy<'a> {
178    UnixSocket(
179        /// Unique identifier for the host this socket will be on.
180        usize,
181    ),
182    InternalTcpPort(
183        /// The host that this port is available on.
184        &'a dyn Host,
185    ),
186    ForwardedTcpPort(
187        /// The host that this port is available on.
188        &'a dyn Host,
189    ),
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
193pub enum HostTargetType {
194    Local,
195    Linux(LinuxCompileType),
196}
197
/// Compile type carried by [`HostTargetType::Linux`]
/// (presumably the C library the binary is built against — confirm).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum LinuxCompileType {
    /// The glibc variant.
    Glibc,
    /// The musl variant.
    Musl,
}
203
/// Hint passed to [`Host::strategy_as_server`] about the server's port.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PortNetworkHint {
    /// No particular preference.
    Auto,
    /// Ask for a TCP port; the port number is optional
    /// (presumably `None` leaves the choice of port open — confirm).
    TcpPort(Option<u16>),
}
209
210pub type HostStrategyGetter = Box<dyn FnOnce(&dyn Any) -> BaseServerStrategy>;
211
212pub trait Host: Any + Send + Sync + Debug {
213    fn target_type(&self) -> HostTargetType;
214
215    fn request_port_base(&self, bind_type: &BaseServerStrategy);
216
217    fn request_port(&self, bind_type: &ServerStrategy) {
218        match bind_type {
219            ServerStrategy::Direct(base) => self.request_port_base(base),
220            ServerStrategy::Many(base) => self.request_port_base(base),
221            ServerStrategy::Demux(demux) => {
222                for bind_type in demux.values() {
223                    self.request_port(bind_type);
224                }
225            }
226            ServerStrategy::Merge(merge) => {
227                for bind_type in merge.iter() {
228                    self.request_port(bind_type);
229                }
230            }
231            ServerStrategy::Tagged(underlying, _) => {
232                self.request_port(underlying);
233            }
234            ServerStrategy::Null => {}
235        }
236    }
237
238    /// An identifier for this host, which is unique within a deployment.
239    fn id(&self) -> usize;
240
241    /// Configures the host to support copying and running a custom binary.
242    fn request_custom_binary(&self);
243
244    /// Makes requests for physical resources (servers) that this host needs to run.
245    ///
246    /// This should be called before `provision` is called.
247    fn collect_resources(&self, resource_batch: &mut ResourceBatch);
248
249    /// Connects to the acquired resources and prepares the host to run services.
250    ///
251    /// This should be called after `collect_resources` is called.
252    fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost>;
253
254    fn launched(&self) -> Option<Arc<dyn LaunchedHost>>;
255
256    /// Identifies a network type that this host can use for connections if it is the server.
257    /// The host will be `None` if the connection is from the same host as the target.
258    fn strategy_as_server<'a>(
259        &'a self,
260        connection_from: &dyn Host,
261        server_tcp_port_hint: PortNetworkHint,
262    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)>;
263
264    /// Determines whether this host can connect to another host using the given strategy.
265    fn can_connect_to(&self, typ: ClientStrategy) -> bool;
266}
267
268#[async_trait]
269pub trait Service: Send + Sync {
270    /// Makes requests for physical resources server ports that this service needs to run.
271    /// This should **not** recursively call `collect_resources` on the host, since
272    /// we guarantee that `collect_resources` is only called once per host.
273    ///
274    /// This should also perform any "free", non-blocking computations (compilations),
275    /// because the `deploy` method will be called after these resources are allocated.
276    fn collect_resources(&self, resource_batch: &mut ResourceBatch);
277
278    /// Connects to the acquired resources and prepares the service to be launched.
279    async fn deploy(&self, resource_result: &Arc<ResourceResult>) -> Result<()>;
280
281    /// Launches the service, which should start listening for incoming network
282    /// connections. The service should not start computing at this point.
283    async fn ready(&self) -> Result<()>;
284
285    /// Starts the service by having it connect to other services and start computations.
286    async fn start(&self) -> Result<()>;
287
288    /// Stops the service by having it disconnect from other services and stop computations.
289    async fn stop(&self) -> Result<()>;
290}
291
292pub trait ServiceBuilder {
293    type Service: Service + 'static;
294    fn build(self, id: usize, on: Arc<dyn Host>) -> Self::Service;
295}
296
297impl<S: Service + 'static, This: FnOnce(usize, Arc<dyn Host>) -> S> ServiceBuilder for This {
298    type Service = S;
299    fn build(self, id: usize, on: Arc<dyn Host>) -> Self::Service {
300        (self)(id, on)
301    }
302}