1#![expect(
2 mismatched_lifetime_syntaxes,
3 reason = "https://github.com/BrynCooke/buildstructor/issues/200"
4)]
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Weak};
9
10use anyhow::Result;
11use futures::{FutureExt, StreamExt, TryStreamExt};
12
13use super::aws::AwsNetwork;
14use super::gcp::GcpNetwork;
15use super::{
16 CustomService, GcpComputeEngineHost, Host, LocalhostHost, ResourcePool, ResourceResult,
17 Service, progress,
18};
19use crate::{AwsEc2Host, AzureHost, HostTargetType, ServiceBuilder};
20
21pub struct Deployment {
22 pub hosts: Vec<Weak<dyn Host>>,
23 pub services: Vec<Weak<dyn Service>>,
24 pub resource_pool: ResourcePool,
25 localhost_host: Option<Arc<LocalhostHost>>,
26 last_resource_result: Option<Arc<ResourceResult>>,
27 next_host_id: usize,
28 next_service_id: usize,
29}
30
31impl Default for Deployment {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl Deployment {
38 pub fn new() -> Self {
39 let mut ret = Self {
40 hosts: Vec::new(),
41 services: Vec::new(),
42 resource_pool: ResourcePool::default(),
43 localhost_host: None,
44 last_resource_result: None,
45 next_host_id: 0,
46 next_service_id: 0,
47 };
48
49 ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
50 ret
51 }
52
53 #[expect(non_snake_case, reason = "constructor-esque")]
54 pub fn Localhost(&self) -> Arc<LocalhostHost> {
55 self.localhost_host.clone().unwrap()
56 }
57
58 #[expect(non_snake_case, reason = "constructor-esque")]
59 pub fn CustomService(
60 &mut self,
61 on: Arc<dyn Host>,
62 external_ports: Vec<u16>,
63 ) -> Arc<CustomService> {
64 self.add_service(|id, on| CustomService::new(id, on, external_ports), on)
65 }
66
67 pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
69 self.deploy().await?;
71 self.start().await?;
72 trigger.await;
73 self.stop().await?;
74 Ok(())
75 }
76
77 pub async fn start_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
81 self.start().await?;
83 trigger.await;
84 self.stop().await?;
85 Ok(())
86 }
87
88 pub async fn run_ctrl_c(&mut self) -> Result<()> {
90 self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
91 }
92
93 pub async fn start_ctrl_c(&mut self) -> Result<()> {
97 self.start_until(tokio::signal::ctrl_c().map(|_| ())).await
98 }
99
100 pub async fn deploy(&mut self) -> Result<()> {
101 self.services.retain(|weak| weak.strong_count() > 0);
102
103 progress::ProgressTracker::with_group("deploy", Some(3), || async {
104 let mut resource_batch = super::ResourceBatch::new();
105
106 for service in self.services.iter().filter_map(Weak::upgrade) {
107 service.collect_resources(&mut resource_batch);
108 }
109
110 for host in self.hosts.iter().filter_map(Weak::upgrade) {
111 host.collect_resources(&mut resource_batch);
112 }
113
114 let resource_result = Arc::new(
115 progress::ProgressTracker::with_group("provision", Some(1), || async {
116 resource_batch
117 .provision(&mut self.resource_pool, self.last_resource_result.clone())
118 .await
119 })
120 .await?,
121 );
122 self.last_resource_result = Some(resource_result.clone());
123
124 for host in self.hosts.iter().filter_map(Weak::upgrade) {
125 host.provision(&resource_result);
126 }
127
128 let upgraded_services = self
129 .services
130 .iter()
131 .filter_map(Weak::upgrade)
132 .collect::<Vec<_>>();
133
134 progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || {
135 let services_future = upgraded_services
136 .iter()
137 .map(|service: &Arc<dyn Service>| {
138 let resource_result = &resource_result;
139 async move { service.deploy(resource_result).await }
140 })
141 .collect::<Vec<_>>();
142
143 futures::stream::iter(services_future)
144 .buffer_unordered(16)
145 .try_fold((), |_, _| async { Ok(()) })
146 })
147 .await?;
148
149 progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || {
150 let all_services_ready =
151 upgraded_services
152 .iter()
153 .map(|service: &Arc<dyn Service>| async move {
154 service.ready().await?;
155 Ok(()) as Result<()>
156 });
157
158 futures::future::try_join_all(all_services_ready)
159 })
160 .await?;
161
162 Ok(())
163 })
164 .await
165 }
166
167 pub async fn start(&mut self) -> Result<()> {
168 self.services.retain(|weak| weak.strong_count() > 0);
169
170 progress::ProgressTracker::with_group("start", None, || {
171 let all_services_start = self.services.iter().filter_map(Weak::upgrade).map(
172 |service: Arc<dyn Service>| async move {
173 service.start().await?;
174 Ok(()) as Result<()>
175 },
176 );
177
178 futures::future::try_join_all(all_services_start)
179 })
180 .await?;
181 Ok(())
182 }
183
184 pub async fn stop(&mut self) -> Result<()> {
185 self.services.retain(|weak| weak.strong_count() > 0);
186
187 progress::ProgressTracker::with_group("stop", None, || {
188 let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
189 |service: Arc<dyn Service>| async move {
190 service.stop().await?;
191 Ok(()) as Result<()>
192 },
193 );
194
195 futures::future::try_join_all(all_services_stop)
196 })
197 .await?;
198 Ok(())
199 }
200}
201
202impl Deployment {
203 pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
204 let arc = Arc::new(host(self.next_host_id));
205 self.next_host_id += 1;
206
207 self.hosts.push(Arc::downgrade(&arc) as Weak<dyn Host>);
208 arc
209 }
210
211 pub fn add_service<T: Service + 'static>(
212 &mut self,
213 service: impl ServiceBuilder<Service = T>,
214 on: Arc<dyn Host>,
215 ) -> Arc<T> {
216 let arc = Arc::new(service.build(self.next_service_id, on));
217 self.next_service_id += 1;
218
219 self.services
220 .push(Arc::downgrade(&arc) as Weak<dyn Service>);
221 arc
222 }
223}
224
225#[buildstructor::buildstructor]
227impl Deployment {
228 #[builder(entry = "GcpComputeEngineHost", exit = "add")]
229 pub fn add_gcp_compute_engine_host(
230 &mut self,
231 project: String,
232 machine_type: String,
233 image: String,
234 target_type: Option<HostTargetType>,
235 region: String,
236 network: Arc<GcpNetwork>,
237 user: Option<String>,
238 display_name: Option<String>,
239 ) -> Arc<GcpComputeEngineHost> {
240 self.add_host(|id| {
241 GcpComputeEngineHost::new(
242 id,
243 project,
244 machine_type,
245 image,
246 target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
247 region,
248 network,
249 user,
250 display_name,
251 )
252 })
253 }
254
255 #[builder(entry = "AzureHost", exit = "add")]
256 pub fn add_azure_host(
257 &mut self,
258 project: String,
259 os_type: String, machine_size: String,
261 image: Option<HashMap<String, String>>,
262 target_type: Option<HostTargetType>,
263 region: String,
264 user: Option<String>,
265 ) -> Arc<AzureHost> {
266 self.add_host(|id| {
267 AzureHost::new(
268 id,
269 project,
270 os_type,
271 machine_size,
272 image,
273 target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
274 region,
275 user,
276 )
277 })
278 }
279
280 #[builder(entry = "AwsEc2Host", exit = "add")]
281 pub fn add_aws_ec2_host(
282 &mut self,
283 region: String,
284 instance_type: String,
285 target_type: Option<HostTargetType>,
286 ami: String,
287 network: Arc<AwsNetwork>,
288 user: Option<String>,
289 display_name: Option<String>,
290 ) -> Arc<AwsEc2Host> {
291 self.add_host(|id| {
292 AwsEc2Host::new(
293 id,
294 region,
295 instance_type,
296 target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
297 ami,
298 network,
299 user,
300 display_name,
301 )
302 })
303 }
304}