1use std::any::Any;
2use std::collections::{BTreeMap, HashMap};
3use std::fmt::Debug;
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use anyhow::Result;
8use append_only_vec::AppendOnlyVec;
9use async_trait::async_trait;
10use hydro_deploy_integration::ServerBindConfig;
11use rust_crate::build::BuildOutput;
12use rust_crate::tracing_options::TracingOptions;
13use tokio::sync::{mpsc, oneshot};
14
15pub mod deployment;
16pub use deployment::Deployment;
17
18pub mod progress;
19
20pub mod localhost;
21pub use localhost::LocalhostHost;
22
23pub mod ssh;
24
25pub mod gcp;
26pub use gcp::GcpComputeEngineHost;
27
28pub mod azure;
29pub use azure::AzureHost;
30
31pub mod aws;
32pub use aws::{AwsEc2Host, AwsNetwork};
33
34pub mod rust_crate;
35pub use rust_crate::RustCrate;
36
37pub mod custom_service;
38pub use custom_service::CustomService;
39
40pub mod terraform;
41
42pub mod util;
43
44#[derive(Default)]
45pub struct ResourcePool {
46 pub terraform: terraform::TerraformPool,
47}
48
49pub struct ResourceBatch {
50 pub terraform: terraform::TerraformBatch,
51}
52
53impl ResourceBatch {
54 fn new() -> ResourceBatch {
55 ResourceBatch {
56 terraform: terraform::TerraformBatch::default(),
57 }
58 }
59
60 async fn provision(
61 self,
62 pool: &mut ResourcePool,
63 last_result: Option<Arc<ResourceResult>>,
64 ) -> Result<ResourceResult> {
65 Ok(ResourceResult {
66 terraform: self.terraform.provision(&mut pool.terraform).await?,
67 _last_result: last_result,
68 })
69 }
70}
71
72#[derive(Debug)]
73pub struct ResourceResult {
74 pub terraform: terraform::TerraformResult,
75 _last_result: Option<Arc<ResourceResult>>,
76}
77
/// Tracing output attached to a launched binary.
///
/// Only compiled when the `profile-folding` feature is enabled.
#[cfg(feature = "profile-folding")]
#[derive(Clone, Debug)]
pub struct TracingResults {
    /// Raw bytes of the folded data.
    // NOTE(review): presumably folded-stack (flamegraph input) text — confirm the format.
    pub folded_data: Vec<u8>,
}
83
84#[async_trait]
85pub trait LaunchedBinary: Send + Sync {
86 fn stdin(&self) -> mpsc::UnboundedSender<String>;
87
88 fn deploy_stdout(&self) -> oneshot::Receiver<String>;
93
94 fn stdout(&self) -> mpsc::UnboundedReceiver<String>;
95 fn stderr(&self) -> mpsc::UnboundedReceiver<String>;
96 fn stdout_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
97 fn stderr_filter(&self, prefix: String) -> mpsc::UnboundedReceiver<String>;
98
99 #[cfg(feature = "profile-folding")]
100 fn tracing_results(&self) -> Option<&TracingResults>;
101
102 fn exit_code(&self) -> Option<i32>;
103
104 async fn wait(&self) -> Result<i32>;
106 async fn stop(&self) -> Result<()>;
108}
109
110#[async_trait]
111pub trait LaunchedHost: Send + Sync {
112 fn base_server_config(&self, strategy: &BaseServerStrategy) -> ServerBindConfig;
115
116 fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig {
117 match strategy {
118 ServerStrategy::Direct(b) => self.base_server_config(b),
119 ServerStrategy::Many(b) => {
120 ServerBindConfig::MultiConnection(Box::new(self.base_server_config(b)))
121 }
122 ServerStrategy::Demux(demux) => ServerBindConfig::Demux(
123 demux
124 .iter()
125 .map(|(key, underlying)| (*key, self.server_config(underlying)))
126 .collect(),
127 ),
128 ServerStrategy::Merge(merge) => ServerBindConfig::Merge(
129 merge
130 .iter()
131 .map(|underlying| self.server_config(underlying))
132 .collect(),
133 ),
134 ServerStrategy::Tagged(underlying, id) => {
135 ServerBindConfig::Tagged(Box::new(self.server_config(underlying)), *id)
136 }
137 ServerStrategy::Null => ServerBindConfig::Null,
138 }
139 }
140
141 async fn copy_binary(&self, binary: &BuildOutput) -> Result<()>;
142
143 async fn launch_binary(
144 &self,
145 id: String,
146 binary: &BuildOutput,
147 args: &[String],
148 perf: Option<TracingOptions>,
149 env: &HashMap<String, String>,
150 pin_to_core: Option<usize>,
151 ) -> Result<Box<dyn LaunchedBinary>>;
152
153 async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
154}
155
/// Leaf-level choice of how a server endpoint is exposed.
///
/// Derives `Debug` so that values can be logged and so that types embedding this enum
/// can derive `Debug` themselves.
#[derive(Debug)]
pub enum BaseServerStrategy {
    /// Bind via a Unix socket.
    UnixSocket,
    /// Bind a TCP port, optionally a specific one.
    // NOTE(review): "internal" presumably means reachable only inside the host's private
    // network, and `None` presumably lets the port be chosen automatically — confirm.
    InternalTcpPort(Option<u16>),
    /// Bind a TCP port; unlike [`BaseServerStrategy::InternalTcpPort`], the port number is
    /// mandatory here.
    // NOTE(review): presumably the port must be explicit so it can be opened externally —
    // confirm.
    ExternalTcpPort(u16),
}
164
165pub enum ServerStrategy {
167 Direct(BaseServerStrategy),
168 Many(BaseServerStrategy),
169 Demux(BTreeMap<u32, ServerStrategy>),
170 Merge(Box<AppendOnlyVec<ServerStrategy>>),
172 Tagged(Box<ServerStrategy>, u32),
173 Null,
174}
175
176pub enum ClientStrategy<'a> {
178 UnixSocket(
179 usize,
181 ),
182 InternalTcpPort(
183 &'a dyn Host,
185 ),
186 ForwardedTcpPort(
187 &'a dyn Host,
189 ),
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
193pub enum HostTargetType {
194 Local,
195 Linux(LinuxCompileType),
196}
197
/// Flavour used for [`HostTargetType::Linux`] targets.
// NOTE(review): presumably selects the C library the binary is built against — confirm.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum LinuxCompileType {
    /// glibc flavour.
    Glibc,
    /// musl flavour.
    Musl,
}
203
/// Hint passed to [`Host::strategy_as_server`] as `server_tcp_port_hint`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum PortNetworkHint {
    /// No preference.
    Auto,
    /// Ask for TCP, optionally with a specific port number.
    TcpPort(Option<u16>),
}
209
210pub type HostStrategyGetter = Box<dyn FnOnce(&dyn Any) -> BaseServerStrategy>;
211
212pub trait Host: Any + Send + Sync + Debug {
213 fn target_type(&self) -> HostTargetType;
214
215 fn request_port_base(&self, bind_type: &BaseServerStrategy);
216
217 fn request_port(&self, bind_type: &ServerStrategy) {
218 match bind_type {
219 ServerStrategy::Direct(base) => self.request_port_base(base),
220 ServerStrategy::Many(base) => self.request_port_base(base),
221 ServerStrategy::Demux(demux) => {
222 for bind_type in demux.values() {
223 self.request_port(bind_type);
224 }
225 }
226 ServerStrategy::Merge(merge) => {
227 for bind_type in merge.iter() {
228 self.request_port(bind_type);
229 }
230 }
231 ServerStrategy::Tagged(underlying, _) => {
232 self.request_port(underlying);
233 }
234 ServerStrategy::Null => {}
235 }
236 }
237
238 fn id(&self) -> usize;
240
241 fn request_custom_binary(&self);
243
244 fn collect_resources(&self, resource_batch: &mut ResourceBatch);
248
249 fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost>;
253
254 fn launched(&self) -> Option<Arc<dyn LaunchedHost>>;
255
256 fn strategy_as_server<'a>(
259 &'a self,
260 connection_from: &dyn Host,
261 server_tcp_port_hint: PortNetworkHint,
262 ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)>;
263
264 fn can_connect_to(&self, typ: ClientStrategy) -> bool;
266}
267
268#[async_trait]
269pub trait Service: Send + Sync {
270 fn collect_resources(&self, resource_batch: &mut ResourceBatch);
277
278 async fn deploy(&self, resource_result: &Arc<ResourceResult>) -> Result<()>;
280
281 async fn ready(&self) -> Result<()>;
284
285 async fn start(&self) -> Result<()>;
287
288 async fn stop(&self) -> Result<()>;
290}
291
292pub trait ServiceBuilder {
293 type Service: Service + 'static;
294 fn build(self, id: usize, on: Arc<dyn Host>) -> Self::Service;
295}
296
297impl<S: Service + 'static, This: FnOnce(usize, Arc<dyn Host>) -> S> ServiceBuilder for This {
298 type Service = S;
299 fn build(self, id: usize, on: Arc<dyn Host>) -> Self::Service {
300 (self)(id, on)
301 }
302}