hydro_deploy/
custom_service.rs1use std::any::TypeId;
2use std::ops::Deref;
3use std::sync::{Arc, OnceLock, Weak};
4
5use anyhow::{Result, bail};
6use async_trait::async_trait;
7use hydro_deploy_integration::ConnectedDirect;
8pub use hydro_deploy_integration::ServerPort;
9
10use crate::rust_crate::ports::{
11 ReverseSinkInstantiator, RustCrateServer, RustCrateSink, RustCrateSource, ServerConfig,
12 SourcePath,
13};
14use crate::{
15 BaseServerStrategy, Host, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, Service,
16};
17
18pub struct CustomService {
20 _id: usize,
21 on: Arc<dyn Host>,
22
23 external_ports: Vec<u16>,
25
26 launched_host: OnceLock<Arc<dyn LaunchedHost>>,
27}
28
29impl CustomService {
30 pub fn new(id: usize, on: Arc<dyn Host>, external_ports: Vec<u16>) -> Self {
31 Self {
32 _id: id,
33 on,
34 external_ports,
35 launched_host: OnceLock::new(),
36 }
37 }
38
39 pub fn declare_client(self: &Arc<Self>) -> CustomClientPort {
40 CustomClientPort::new(Arc::downgrade(self), false)
41 }
42
43 pub fn declare_many_client(self: &Arc<Self>) -> CustomClientPort {
44 CustomClientPort::new(Arc::downgrade(self), true)
45 }
46}
47
48#[async_trait]
49impl Service for CustomService {
50 fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {
51 if self.launched_host.get().is_some() {
52 return;
53 }
54
55 let host = &self.on;
56
57 for port in self.external_ports.iter() {
58 host.request_port_base(&BaseServerStrategy::ExternalTcpPort(*port));
59 }
60 }
61
62 async fn deploy(&self, resource_result: &Arc<ResourceResult>) -> Result<()> {
63 self.launched_host.get_or_init(|| {
64 let host = &self.on;
65 let launched = host.provision(resource_result);
66 launched
67 });
68
69 Ok(())
70 }
71
72 async fn ready(&self) -> Result<()> {
73 Ok(())
74 }
75
76 async fn start(&self) -> Result<()> {
77 Ok(())
78 }
79
80 async fn stop(&self) -> Result<()> {
81 Ok(())
82 }
83}
84
85#[derive(Clone)]
86pub struct CustomClientPort {
87 pub on: Weak<CustomService>,
88 many: bool,
89 client_port: OnceLock<ServerConfig>,
90}
91
92impl CustomClientPort {
93 fn new(on: Weak<CustomService>, many: bool) -> Self {
94 Self {
95 on,
96 many,
97 client_port: OnceLock::new(),
98 }
99 }
100
101 pub async fn server_port(&self) -> ServerPort {
102 self.client_port
103 .get()
104 .unwrap()
105 .load_instantiated(&|p| p)
106 .await
107 }
108
109 pub async fn connect(&self) -> ConnectedDirect {
110 self.client_port
111 .get()
112 .unwrap()
113 .load_instantiated(&|p| p)
114 .await
115 .instantiate()
116 .await
117 .connect::<ConnectedDirect>()
118 }
119}
120
121impl RustCrateSource for CustomClientPort {
122 fn source_path(&self) -> SourcePath {
123 if self.many {
124 SourcePath::Many(self.on.upgrade().unwrap().on.clone())
125 } else {
126 SourcePath::Direct(self.on.upgrade().unwrap().on.clone())
127 }
128 }
129
130 fn host(&self) -> Arc<dyn Host> {
131 panic!("Custom services cannot be used as the server")
132 }
133
134 fn server(&self) -> Arc<dyn RustCrateServer> {
135 panic!("Custom services cannot be used as the server")
136 }
137
138 fn record_server_config(&self, config: ServerConfig) {
139 self.client_port
140 .set(config)
141 .map_err(drop) .expect("Cannot call `record_server_config()` multiple times.");
143 }
144
145 fn record_server_strategy(&self, _config: ServerStrategy) {
146 panic!("Custom services cannot be used as the server")
147 }
148}
149
150impl RustCrateSink for CustomClientPort {
151 fn instantiate(&self, _client_path: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>> {
152 bail!("Custom services cannot be used as the server")
153 }
154
155 fn instantiate_reverse(
156 &self,
157 server_host: &Arc<dyn Host>,
158 server_sink: Arc<dyn RustCrateServer>,
159 wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
160 ) -> Result<ReverseSinkInstantiator> {
161 let client = self.on.upgrade().unwrap();
162
163 let server_host = server_host.clone();
164
165 let (conn_type, bind_type) =
166 server_host.strategy_as_server(client.on.deref(), crate::PortNetworkHint::Auto)?;
167
168 let client_port = wrap_client_port(ServerConfig::from_strategy(&conn_type, server_sink));
169
170 Ok(Box::new(move |me| {
171 assert_eq!(
172 me.type_id(),
173 TypeId::of::<CustomClientPort>(),
174 "`instantiate_reverse` received different type than expected."
175 );
176 me.downcast_ref::<CustomClientPort>()
177 .unwrap()
178 .record_server_config(client_port);
179 ServerStrategy::Direct(bind_type(&*server_host))
180 }))
181 }
182}