hydro_deploy/
custom_service.rs

1use std::any::TypeId;
2use std::ops::Deref;
3use std::sync::{Arc, OnceLock, Weak};
4
5use anyhow::{Result, bail};
6use async_trait::async_trait;
7use hydro_deploy_integration::ConnectedDirect;
8pub use hydro_deploy_integration::ServerPort;
9
10use crate::rust_crate::ports::{
11    ReverseSinkInstantiator, RustCrateServer, RustCrateSink, RustCrateSource, ServerConfig,
12    SourcePath,
13};
14use crate::{
15    BaseServerStrategy, Host, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, Service,
16};
17
18/// Represents an unknown, third-party service that is not part of the Hydroflow ecosystem.
19pub struct CustomService {
20    _id: usize,
21    on: Arc<dyn Host>,
22
23    /// The ports that the service wishes to expose to the public internet.
24    external_ports: Vec<u16>,
25
26    launched_host: OnceLock<Arc<dyn LaunchedHost>>,
27}
28
29impl CustomService {
30    pub fn new(id: usize, on: Arc<dyn Host>, external_ports: Vec<u16>) -> Self {
31        Self {
32            _id: id,
33            on,
34            external_ports,
35            launched_host: OnceLock::new(),
36        }
37    }
38
39    pub fn declare_client(self: &Arc<Self>) -> CustomClientPort {
40        CustomClientPort::new(Arc::downgrade(self), false)
41    }
42
43    pub fn declare_many_client(self: &Arc<Self>) -> CustomClientPort {
44        CustomClientPort::new(Arc::downgrade(self), true)
45    }
46}
47
48#[async_trait]
49impl Service for CustomService {
50    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {
51        if self.launched_host.get().is_some() {
52            return;
53        }
54
55        let host = &self.on;
56
57        for port in self.external_ports.iter() {
58            host.request_port_base(&BaseServerStrategy::ExternalTcpPort(*port));
59        }
60    }
61
62    async fn deploy(&self, resource_result: &Arc<ResourceResult>) -> Result<()> {
63        self.launched_host.get_or_init(|| {
64            let host = &self.on;
65            let launched = host.provision(resource_result);
66            launched
67        });
68
69        Ok(())
70    }
71
72    async fn ready(&self) -> Result<()> {
73        Ok(())
74    }
75
76    async fn start(&self) -> Result<()> {
77        Ok(())
78    }
79
80    async fn stop(&self) -> Result<()> {
81        Ok(())
82    }
83}
84
85#[derive(Clone)]
86pub struct CustomClientPort {
87    pub on: Weak<CustomService>,
88    many: bool,
89    client_port: OnceLock<ServerConfig>,
90}
91
92impl CustomClientPort {
93    fn new(on: Weak<CustomService>, many: bool) -> Self {
94        Self {
95            on,
96            many,
97            client_port: OnceLock::new(),
98        }
99    }
100
101    pub async fn server_port(&self) -> ServerPort {
102        self.client_port
103            .get()
104            .unwrap()
105            .load_instantiated(&|p| p)
106            .await
107    }
108
109    pub async fn connect(&self) -> ConnectedDirect {
110        self.client_port
111            .get()
112            .unwrap()
113            .load_instantiated(&|p| p)
114            .await
115            .instantiate()
116            .await
117            .connect::<ConnectedDirect>()
118    }
119}
120
121impl RustCrateSource for CustomClientPort {
122    fn source_path(&self) -> SourcePath {
123        if self.many {
124            SourcePath::Many(self.on.upgrade().unwrap().on.clone())
125        } else {
126            SourcePath::Direct(self.on.upgrade().unwrap().on.clone())
127        }
128    }
129
130    fn host(&self) -> Arc<dyn Host> {
131        panic!("Custom services cannot be used as the server")
132    }
133
134    fn server(&self) -> Arc<dyn RustCrateServer> {
135        panic!("Custom services cannot be used as the server")
136    }
137
138    fn record_server_config(&self, config: ServerConfig) {
139        self.client_port
140            .set(config)
141            .map_err(drop) // `ServerConfig` doesn't implement `Debug` for `.expect()`.
142            .expect("Cannot call `record_server_config()` multiple times.");
143    }
144
145    fn record_server_strategy(&self, _config: ServerStrategy) {
146        panic!("Custom services cannot be used as the server")
147    }
148}
149
150impl RustCrateSink for CustomClientPort {
151    fn instantiate(&self, _client_path: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>> {
152        bail!("Custom services cannot be used as the server")
153    }
154
155    fn instantiate_reverse(
156        &self,
157        server_host: &Arc<dyn Host>,
158        server_sink: Arc<dyn RustCrateServer>,
159        wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
160    ) -> Result<ReverseSinkInstantiator> {
161        let client = self.on.upgrade().unwrap();
162
163        let server_host = server_host.clone();
164
165        let (conn_type, bind_type) =
166            server_host.strategy_as_server(client.on.deref(), crate::PortNetworkHint::Auto)?;
167
168        let client_port = wrap_client_port(ServerConfig::from_strategy(&conn_type, server_sink));
169
170        Ok(Box::new(move |me| {
171            assert_eq!(
172                me.type_id(),
173                TypeId::of::<CustomClientPort>(),
174                "`instantiate_reverse` received different type than expected."
175            );
176            me.downcast_ref::<CustomClientPort>()
177                .unwrap()
178                .record_server_config(client_port);
179            ServerStrategy::Direct(bind_type(&*server_host))
180        }))
181    }
182}