1#![expect(
2 mismatched_lifetime_syntaxes,
3 reason = "https://github.com/BrynCooke/buildstructor/issues/200"
4)]
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Mutex, Weak};
9
10use anyhow::Result;
11use futures::{FutureExt, StreamExt, TryStreamExt};
12
13use crate::aws::{AwsCloudwatchLogGroup, AwsEc2IamInstanceProfile, AwsNetwork};
14use crate::gcp::GcpNetwork;
15use crate::{
16 AwsEc2Host, AzureHost, CustomService, GcpComputeEngineHost, Host, HostTargetType,
17 LocalhostHost, ResourcePool, ResourceResult, Service, ServiceBuilder, progress,
18};
19
/// A set of hosts and services that are provisioned, started and stopped together.
pub struct Deployment {
    /// Weak handles to the hosts registered through `add_host`.
    pub hosts: Vec<Weak<dyn Host>>,
    /// Weak handles to the services registered through `add_service`; entries whose
    /// service has been dropped are removed at the start of `deploy`, `start` and `stop`.
    pub services: Vec<Weak<dyn Service>>,
    /// Pool handed mutably to `ResourceBatch::provision` on every `deploy`.
    pub resource_pool: ResourcePool,
    /// Strong handle to the localhost host created in `new`; returned by `Localhost()`.
    localhost_host: Option<Arc<LocalhostHost>>,
    /// Result of the most recent provisioning step, passed back into the next one.
    last_resource_result: Option<Arc<ResourceResult>>,
    /// Id given to the next host created by `add_host`; incremented after each call.
    next_host_id: usize,
    /// Id given to the next service created by `add_service`; incremented after each call.
    next_service_id: usize,
}
29
30impl Default for Deployment {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl Deployment {
37 pub fn new() -> Self {
38 let mut ret = Self {
39 hosts: Vec::new(),
40 services: Vec::new(),
41 resource_pool: ResourcePool::default(),
42 localhost_host: None,
43 last_resource_result: None,
44 next_host_id: 0,
45 next_service_id: 0,
46 };
47
48 ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
49 ret
50 }
51
52 #[expect(non_snake_case, reason = "constructor-esque")]
53 pub fn Localhost(&self) -> Arc<LocalhostHost> {
54 self.localhost_host.clone().unwrap()
55 }
56
57 #[expect(non_snake_case, reason = "constructor-esque")]
58 pub fn CustomService(
59 &mut self,
60 on: Arc<dyn Host>,
61 external_ports: Vec<u16>,
62 ) -> Arc<CustomService> {
63 self.add_service(|id, on| CustomService::new(id, on, external_ports), on)
64 }
65
66 pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
68 self.deploy().await?;
70 self.start().await?;
71 trigger.await;
72 self.stop().await?;
73 Ok(())
74 }
75
76 pub async fn start_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
80 self.start().await?;
82 trigger.await;
83 self.stop().await?;
84 Ok(())
85 }
86
87 pub async fn run_ctrl_c(&mut self) -> Result<()> {
89 self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
90 }
91
92 pub async fn start_ctrl_c(&mut self) -> Result<()> {
96 self.start_until(tokio::signal::ctrl_c().map(|_| ())).await
97 }
98
99 pub async fn deploy(&mut self) -> Result<()> {
100 self.services.retain(|weak| weak.strong_count() > 0);
101
102 progress::ProgressTracker::with_group("deploy", Some(3), || async {
103 let mut resource_batch = super::ResourceBatch::new();
104
105 for service in self.services.iter().filter_map(Weak::upgrade) {
106 service.collect_resources(&mut resource_batch);
107 }
108
109 for host in self.hosts.iter().filter_map(Weak::upgrade) {
110 host.collect_resources(&mut resource_batch);
111 }
112
113 let resource_result = Arc::new(
114 progress::ProgressTracker::with_group("provision", Some(1), || async {
115 resource_batch
116 .provision(&mut self.resource_pool, self.last_resource_result.clone())
117 .await
118 })
119 .await?,
120 );
121 self.last_resource_result = Some(resource_result.clone());
122
123 for host in self.hosts.iter().filter_map(Weak::upgrade) {
124 host.provision(&resource_result);
125 }
126
127 let upgraded_services = self
128 .services
129 .iter()
130 .filter_map(Weak::upgrade)
131 .collect::<Vec<_>>();
132
133 progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || {
134 let services_future = upgraded_services
135 .iter()
136 .map(|service: &Arc<dyn Service>| {
137 let resource_result = &resource_result;
138 async move { service.deploy(resource_result).await }
139 })
140 .collect::<Vec<_>>();
141
142 futures::stream::iter(services_future)
143 .buffer_unordered(16)
144 .try_fold((), |_, _| async { Ok(()) })
145 })
146 .await?;
147
148 progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || {
149 let all_services_ready =
150 upgraded_services
151 .iter()
152 .map(|service: &Arc<dyn Service>| async move {
153 service.ready().await?;
154 Ok(()) as Result<()>
155 });
156
157 futures::future::try_join_all(all_services_ready)
158 })
159 .await?;
160
161 Ok(())
162 })
163 .await
164 }
165
166 pub async fn start(&mut self) -> Result<()> {
167 self.services.retain(|weak| weak.strong_count() > 0);
168
169 progress::ProgressTracker::with_group("start", None, || {
170 let all_services_start = self.services.iter().filter_map(Weak::upgrade).map(
171 |service: Arc<dyn Service>| async move {
172 service.start().await?;
173 Ok(()) as Result<()>
174 },
175 );
176
177 futures::future::try_join_all(all_services_start)
178 })
179 .await?;
180 Ok(())
181 }
182
183 pub async fn stop(&mut self) -> Result<()> {
184 self.services.retain(|weak| weak.strong_count() > 0);
185
186 progress::ProgressTracker::with_group("stop", None, || {
187 let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
188 |service: Arc<dyn Service>| async move {
189 service.stop().await?;
190 Ok(()) as Result<()>
191 },
192 );
193
194 futures::future::try_join_all(all_services_stop)
195 })
196 .await?;
197 Ok(())
198 }
199}
200
201impl Deployment {
202 pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
203 let arc = Arc::new(host(self.next_host_id));
204 self.next_host_id += 1;
205
206 self.hosts.push(Arc::downgrade(&arc) as Weak<dyn Host>);
207 arc
208 }
209
210 pub fn add_service<T: Service + 'static>(
211 &mut self,
212 service: impl ServiceBuilder<Service = T>,
213 on: Arc<dyn Host>,
214 ) -> Arc<T> {
215 let arc = Arc::new(service.build(self.next_service_id, on));
216 self.next_service_id += 1;
217
218 self.services
219 .push(Arc::downgrade(&arc) as Weak<dyn Service>);
220 arc
221 }
222}
223
224#[buildstructor::buildstructor]
226impl Deployment {
227 #[builder(entry = "GcpComputeEngineHost", exit = "add", visibility = "pub")]
228 fn add_gcp_compute_engine_host(
229 &mut self,
230 project: String,
231 machine_type: String,
232 image: String,
233 target_type: Option<HostTargetType>,
234 region: String,
235 network: Arc<GcpNetwork>,
236 user: Option<String>,
237 display_name: Option<String>,
238 ) -> Arc<GcpComputeEngineHost> {
239 self.add_host(|id| {
240 GcpComputeEngineHost::new(
241 id,
242 project,
243 machine_type,
244 image,
245 target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
246 region,
247 network,
248 user,
249 display_name,
250 )
251 })
252 }
253
254 #[builder(entry = "AzureHost", exit = "add", visibility = "pub")]
255 fn add_azure_host(
256 &mut self,
257 project: String,
258 os_type: String, machine_size: String,
260 image: Option<HashMap<String, String>>,
261 target_type: Option<HostTargetType>,
262 region: String,
263 user: Option<String>,
264 ) -> Arc<AzureHost> {
265 self.add_host(|id| {
266 AzureHost::new(
267 id,
268 project,
269 os_type,
270 machine_size,
271 image,
272 target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
273 region,
274 user,
275 )
276 })
277 }
278
279 #[builder(entry = "AwsEc2Host", exit = "add", visibility = "pub")]
280 fn add_aws_ec2_host(
281 &mut self,
282 region: String,
283 instance_type: String,
284 target_type: Option<HostTargetType>,
285 ami: String,
286 network: Arc<AwsNetwork>,
287 iam_instance_profile: Option<Arc<Mutex<AwsEc2IamInstanceProfile>>>,
288 cloudwatch_log_group: Option<Arc<Mutex<AwsCloudwatchLogGroup>>>,
289 cwa_metrics_collected: Option<serde_json::Value>,
291 user: Option<String>,
292 display_name: Option<String>,
293 ) -> Arc<AwsEc2Host> {
294 self.add_host(|id| {
295 AwsEc2Host::new(
296 id,
297 region,
298 instance_type,
299 target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
300 ami,
301 network,
302 iam_instance_profile,
303 cloudwatch_log_group,
304 cwa_metrics_collected,
305 user,
306 display_name,
307 )
308 })
309 }
310}