hydro_deploy/rust_crate/
ports.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::ops::Deref;
5use std::sync::{Arc, Weak};
6
7use anyhow::{Result, bail};
8use append_only_vec::AppendOnlyVec;
9use async_recursion::async_recursion;
10use dyn_clone::DynClone;
11use hydro_deploy_integration::ServerPort;
12use tokio::sync::RwLock;
13
14use super::RustCrateService;
15use crate::{ClientStrategy, Host, LaunchedHost, PortNetworkHint, ServerStrategy};
16
17pub trait RustCrateSource: Send + Sync {
18    fn source_path(&self) -> SourcePath;
19    fn record_server_config(&self, config: ServerConfig);
20
21    fn host(&self) -> Arc<dyn Host>;
22    fn server(&self) -> Arc<dyn RustCrateServer>;
23    fn record_server_strategy(&self, config: ServerStrategy);
24
25    fn wrap_reverse_server_config(&self, config: ServerConfig) -> ServerConfig {
26        config
27    }
28
29    fn send_to(&self, sink: &dyn RustCrateSink) {
30        let forward_res = sink.instantiate(&self.source_path());
31        if let Ok(instantiated) = forward_res {
32            self.record_server_config(instantiated());
33        } else {
34            drop(forward_res);
35            let instantiated = sink
36                .instantiate_reverse(&self.host(), self.server(), &|p| {
37                    self.wrap_reverse_server_config(p)
38                })
39                .unwrap();
40            self.record_server_strategy(instantiated(sink));
41        }
42    }
43}
44
45pub trait RustCrateServer: DynClone + Debug + Send + Sync {
46    fn get_port(&self) -> ServerPort;
47    fn launched_host(&self) -> Arc<dyn LaunchedHost>;
48}
49
50pub type ReverseSinkInstantiator = Box<dyn FnOnce(&dyn Any) -> ServerStrategy>;
51
52pub trait RustCrateSink: Any + Send + Sync {
53    /// Instantiate the sink as the source host connecting to the sink host.
54    /// Returns a thunk that can be called to perform mutations that instantiate the sink.
55    fn instantiate(&self, client_path: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>>;
56
57    /// Instantiate the sink, but as the sink host connecting to the source host.
58    /// Returns a thunk that can be called to perform mutations that instantiate the sink, taking a mutable reference to this sink.
59    fn instantiate_reverse(
60        &self,
61        server_host: &Arc<dyn Host>,
62        server_sink: Arc<dyn RustCrateServer>,
63        wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
64    ) -> Result<ReverseSinkInstantiator>;
65}
66
67pub struct TaggedSource {
68    pub source: Arc<dyn RustCrateSource>,
69    pub tag: u32,
70}
71
72impl RustCrateSource for TaggedSource {
73    fn source_path(&self) -> SourcePath {
74        SourcePath::Tagged(Box::new(self.source.source_path()), self.tag)
75    }
76
77    fn record_server_config(&self, config: ServerConfig) {
78        self.source.record_server_config(config);
79    }
80
81    fn host(&self) -> Arc<dyn Host> {
82        self.source.host()
83    }
84
85    fn server(&self) -> Arc<dyn RustCrateServer> {
86        self.source.server()
87    }
88
89    fn wrap_reverse_server_config(&self, config: ServerConfig) -> ServerConfig {
90        ServerConfig::Tagged(Box::new(config), self.tag)
91    }
92
93    fn record_server_strategy(&self, config: ServerStrategy) {
94        self.source.record_server_strategy(config);
95    }
96}
97
98pub struct NullSourceSink;
99
100impl RustCrateSource for NullSourceSink {
101    fn source_path(&self) -> SourcePath {
102        SourcePath::Null
103    }
104
105    fn host(&self) -> Arc<dyn Host> {
106        panic!("null source has no host")
107    }
108
109    fn server(&self) -> Arc<dyn RustCrateServer> {
110        panic!("null source has no server")
111    }
112
113    fn record_server_config(&self, _config: ServerConfig) {}
114    fn record_server_strategy(&self, _config: ServerStrategy) {}
115}
116
117impl RustCrateSink for NullSourceSink {
118    fn instantiate(&self, _client_path: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>> {
119        Ok(Box::new(|| ServerConfig::Null))
120    }
121
122    fn instantiate_reverse(
123        &self,
124        _server_host: &Arc<dyn Host>,
125        _server_sink: Arc<dyn RustCrateServer>,
126        _wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
127    ) -> Result<ReverseSinkInstantiator> {
128        Ok(Box::new(|_| ServerStrategy::Null))
129    }
130}
131
132pub struct DemuxSink {
133    pub demux: HashMap<u32, Arc<dyn RustCrateSink>>,
134}
135
136impl RustCrateSink for DemuxSink {
137    fn instantiate(&self, client_host: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>> {
138        let mut thunk_map = HashMap::new();
139        for (key, target) in &self.demux {
140            thunk_map.insert(*key, target.instantiate(client_host)?);
141        }
142
143        Ok(Box::new(move || {
144            let instantiated_map = thunk_map
145                .into_iter()
146                .map(|(key, thunk)| (key, (thunk)()))
147                .collect();
148
149            ServerConfig::Demux(instantiated_map)
150        }))
151    }
152
153    fn instantiate_reverse(
154        &self,
155        server_host: &Arc<dyn Host>,
156        server_sink: Arc<dyn RustCrateServer>,
157        wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
158    ) -> Result<ReverseSinkInstantiator> {
159        let mut thunk_map = HashMap::new();
160        for (key, target) in &self.demux {
161            thunk_map.insert(
162                *key,
163                target.instantiate_reverse(
164                    server_host,
165                    server_sink.clone(),
166                    // the parent wrapper selects the demux port for the parent defn, so do that first
167                    &|p| ServerConfig::DemuxSelect(Box::new(wrap_client_port(p)), *key),
168                )?,
169            );
170        }
171
172        Ok(Box::new(move |me| {
173            let me = me.downcast_ref::<DemuxSink>().unwrap();
174            let instantiated_map = thunk_map
175                .into_iter()
176                .map(|(key, thunk)| (key, (thunk)(me.demux.get(&key).unwrap())))
177                .collect();
178
179            ServerStrategy::Demux(instantiated_map)
180        }))
181    }
182}
183
184#[derive(Clone, Debug)]
185pub struct RustCratePortConfig {
186    pub service: Weak<RustCrateService>,
187    pub service_host: Arc<dyn Host>,
188    pub service_server_defns: Arc<RwLock<HashMap<String, ServerPort>>>,
189    pub network_hint: PortNetworkHint,
190    pub port: String,
191    pub merge: bool,
192}
193
194impl RustCratePortConfig {
195    pub fn merge(mut self) -> Self {
196        self.merge = true;
197        self
198    }
199}
200
201impl RustCrateSource for RustCratePortConfig {
202    fn source_path(&self) -> SourcePath {
203        SourcePath::Direct(self.service.upgrade().unwrap().on.clone())
204    }
205
206    fn host(&self) -> Arc<dyn Host> {
207        self.service_host.clone()
208    }
209
210    fn server(&self) -> Arc<dyn RustCrateServer> {
211        let from = self.service.upgrade().unwrap();
212
213        Arc::new(RustCratePortConfig {
214            service: Arc::downgrade(&from),
215            service_host: from.on.clone(),
216            service_server_defns: from.server_defns.clone(),
217            network_hint: self.network_hint,
218            port: self.port.clone(),
219            merge: false,
220        })
221    }
222
223    fn record_server_config(&self, config: ServerConfig) {
224        let from = self.service.upgrade().unwrap();
225        // TODO(shadaj): if already in this map, we want to broadcast
226        assert!(
227            from.port_to_server.insert(self.port.clone(), config),
228            "The port configuration is incorrect, for example, are you using a ConnectedDirect instead of a ConnectedDemux?"
229        );
230    }
231
232    fn record_server_strategy(&self, config: ServerStrategy) {
233        let from = self.service.upgrade().unwrap();
234        assert!(
235            from.port_to_bind.insert(self.port.clone(), config),
236            "port already set!"
237        );
238    }
239}
240
241impl RustCrateServer for RustCratePortConfig {
242    fn get_port(&self) -> ServerPort {
243        // we are in `deployment.start()`, so no one should be writing
244        let server_defns = self.service_server_defns.try_read().unwrap();
245        server_defns.get(&self.port).unwrap().clone()
246    }
247
248    fn launched_host(&self) -> Arc<dyn LaunchedHost> {
249        self.service_host.launched().unwrap()
250    }
251}
252
253pub enum SourcePath {
254    Null,
255    Direct(Arc<dyn Host>),
256    Many(Arc<dyn Host>),
257    Tagged(Box<SourcePath>, u32),
258}
259
260impl SourcePath {
261    #[expect(
262        clippy::type_complexity,
263        reason = "internals (dyn Fn to defer instantiation)"
264    )]
265    fn plan<T: RustCrateServer + Clone + 'static>(
266        &self,
267        server: &T,
268        server_host: &dyn Host,
269        network_hint: PortNetworkHint,
270    ) -> Result<(Box<dyn FnOnce(&dyn Any) -> ServerStrategy>, ServerConfig)> {
271        match self {
272            SourcePath::Direct(client_host) => {
273                let (conn_type, bind_type) =
274                    server_host.strategy_as_server(client_host.deref(), network_hint)?;
275                let base_config = ServerConfig::from_strategy(&conn_type, Arc::new(server.clone()));
276                Ok((
277                    Box::new(|host| ServerStrategy::Direct(bind_type(host))),
278                    base_config,
279                ))
280            }
281
282            SourcePath::Many(client_host) => {
283                let (conn_type, bind_type) =
284                    server_host.strategy_as_server(client_host.deref(), network_hint)?;
285                let base_config = ServerConfig::from_strategy(&conn_type, Arc::new(server.clone()));
286                Ok((
287                    Box::new(|host| ServerStrategy::Many(bind_type(host))),
288                    base_config,
289                ))
290            }
291
292            SourcePath::Tagged(underlying, tag) => {
293                let (bind_type, base_config) =
294                    underlying.plan(server, server_host, network_hint)?;
295                let tag = *tag;
296                Ok((
297                    Box::new(move |host| ServerStrategy::Tagged(Box::new(bind_type(host)), tag)),
298                    ServerConfig::TaggedUnwrap(Box::new(base_config)),
299                ))
300            }
301
302            SourcePath::Null => Ok((Box::new(|_| ServerStrategy::Null), ServerConfig::Null)),
303        }
304    }
305}
306
307impl RustCrateSink for RustCratePortConfig {
308    fn instantiate(&self, client_path: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>> {
309        let server = self.service.upgrade().unwrap();
310
311        let server_host = server.on.clone();
312
313        let (bind_type, base_config) =
314            client_path.plan(self, server_host.deref(), self.network_hint)?;
315
316        let server = server.clone();
317        let merge = self.merge;
318        let port = self.port.clone();
319        Ok(Box::new(move || {
320            let bind_type = (bind_type)(&*server.on);
321
322            if merge {
323                let merge_config = server
324                    .port_to_bind
325                    .get_or_insert_owned(port, || ServerStrategy::Merge(Default::default()));
326                let ServerStrategy::Merge(merge) = merge_config else {
327                    panic!("Expected a merge connection definition")
328                };
329                merge.push(bind_type);
330                ServerConfig::MergeSelect(Box::new(base_config), merge.len() - 1)
331            } else {
332                assert!(
333                    server.port_to_bind.insert(port.clone(), bind_type),
334                    "port already set!"
335                );
336                base_config
337            }
338        }))
339    }
340
341    fn instantiate_reverse(
342        &self,
343        server_host: &Arc<dyn Host>,
344        server_sink: Arc<dyn RustCrateServer>,
345        wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
346    ) -> Result<ReverseSinkInstantiator> {
347        if !matches!(self.network_hint, PortNetworkHint::Auto) {
348            bail!("Trying to form collection where I am the client, but I have server hint")
349        }
350
351        let client = self.service.upgrade().unwrap();
352
353        let server_host = server_host.clone();
354
355        let (conn_type, bind_type) =
356            server_host.strategy_as_server(&*client.on, PortNetworkHint::Auto)?;
357        let client_port = wrap_client_port(ServerConfig::from_strategy(&conn_type, server_sink));
358
359        let client = client.clone();
360        let merge = self.merge;
361        let port = self.port.clone();
362        Ok(Box::new(move |_| {
363            if merge {
364                let merge_config = client
365                    .port_to_server
366                    .get_or_insert_owned(port, || ServerConfig::Merge(Default::default()));
367                let ServerConfig::Merge(merge) = merge_config else {
368                    panic!()
369                };
370                merge.push(client_port);
371            } else {
372                assert!(
373                    client.port_to_server.insert(port.clone(), client_port),
374                    "port already set!"
375                );
376            };
377
378            ServerStrategy::Direct((bind_type)(&*client.on))
379        }))
380    }
381}
382
383#[derive(Clone, Debug)]
384pub enum ServerConfig {
385    Direct(Arc<dyn RustCrateServer>),
386    Forwarded(Arc<dyn RustCrateServer>),
387    /// A demux that will be used at runtime to listen to many connections.
388    Demux(HashMap<u32, ServerConfig>),
389    /// The other side of a demux, with a port to extract the appropriate connection.
390    DemuxSelect(Box<ServerConfig>, u32),
391    /// A merge that will be used at runtime to combine many connections.
392    /// AppendOnlyVec has a quite large inline array, so we box it.
393    Merge(Box<AppendOnlyVec<ServerConfig>>),
394    /// The other side of a merge, with a port to extract the appropriate connection.
395    MergeSelect(Box<ServerConfig>, usize),
396    Tagged(Box<ServerConfig>, u32),
397    TaggedUnwrap(Box<ServerConfig>),
398    Null,
399}
400
401impl ServerConfig {
402    pub fn from_strategy(
403        strategy: &ClientStrategy,
404        server: Arc<dyn RustCrateServer>,
405    ) -> ServerConfig {
406        match strategy {
407            ClientStrategy::UnixSocket(_) | ClientStrategy::InternalTcpPort(_) => {
408                ServerConfig::Direct(server)
409            }
410            ClientStrategy::ForwardedTcpPort(_) => ServerConfig::Forwarded(server),
411        }
412    }
413}
414
415#[async_recursion]
416async fn forward_connection(conn: &ServerPort, target: &dyn LaunchedHost) -> ServerPort {
417    match conn {
418        ServerPort::UnixSocket(_) => panic!("Expected a TCP port to be forwarded"),
419        ServerPort::TcpPort(addr) => ServerPort::TcpPort(target.forward_port(addr).await.unwrap()),
420        ServerPort::Demux(demux) => {
421            let mut forwarded_map = HashMap::new();
422            for (key, conn) in demux {
423                forwarded_map.insert(*key, forward_connection(conn, target).await);
424            }
425            ServerPort::Demux(forwarded_map)
426        }
427        ServerPort::Merge(merge) => {
428            let mut forwarded_vec = Vec::new();
429            for conn in merge {
430                forwarded_vec.push(forward_connection(conn, target).await);
431            }
432            ServerPort::Merge(forwarded_vec)
433        }
434        ServerPort::Tagged(underlying, id) => {
435            ServerPort::Tagged(Box::new(forward_connection(underlying, target).await), *id)
436        }
437        ServerPort::Null => ServerPort::Null,
438    }
439}
440
441impl ServerConfig {
442    #[async_recursion]
443    pub async fn load_instantiated(
444        &self,
445        select: &(dyn Fn(ServerPort) -> ServerPort + Send + Sync),
446    ) -> ServerPort {
447        match self {
448            ServerConfig::Direct(server) => select(server.get_port()),
449
450            ServerConfig::Forwarded(server) => {
451                let selected = select(server.get_port());
452                forward_connection(&selected, server.launched_host().as_ref()).await
453            }
454
455            ServerConfig::Demux(demux) => {
456                let mut demux_map = HashMap::new();
457                for (key, conn) in demux {
458                    demux_map.insert(*key, conn.load_instantiated(select).await);
459                }
460                ServerPort::Demux(demux_map)
461            }
462
463            ServerConfig::DemuxSelect(underlying, key) => {
464                let key = *key;
465                underlying
466                    .load_instantiated(
467                        &(move |p| {
468                            if let ServerPort::Demux(mut mapping) = p {
469                                select(mapping.remove(&key).unwrap())
470                            } else {
471                                panic!("Expected a demux connection definition")
472                            }
473                        }),
474                    )
475                    .await
476            }
477
478            ServerConfig::Merge(merge) => {
479                let mut merge_vec = Vec::new();
480                for conn in merge.iter() {
481                    merge_vec.push(conn.load_instantiated(select).await);
482                }
483                ServerPort::Merge(merge_vec)
484            }
485
486            ServerConfig::MergeSelect(underlying, key) => {
487                let key = *key;
488                underlying
489                    .load_instantiated(
490                        &(move |p| {
491                            if let ServerPort::Merge(mut mapping) = p {
492                                select(mapping.remove(key))
493                            } else {
494                                panic!("Expected a merge connection definition")
495                            }
496                        }),
497                    )
498                    .await
499            }
500
501            ServerConfig::Tagged(underlying, id) => {
502                ServerPort::Tagged(Box::new(underlying.load_instantiated(select).await), *id)
503            }
504
505            ServerConfig::TaggedUnwrap(underlying) => {
506                let loaded = underlying.load_instantiated(select).await;
507                if let ServerPort::Tagged(underlying, _) = loaded {
508                    *underlying
509                } else {
510                    panic!("Expected a tagged connection definition")
511                }
512            }
513
514            ServerConfig::Null => ServerPort::Null,
515        }
516    }
517}