Skip to main content

hydro_deploy/
deployment.rs

1#![expect(
2    mismatched_lifetime_syntaxes,
3    reason = "https://github.com/BrynCooke/buildstructor/issues/200"
4)]
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Mutex, Weak};
9
10use anyhow::Result;
11use futures::{FutureExt, StreamExt, TryStreamExt};
12
13use crate::aws::{AwsCloudwatchLogGroup, AwsEc2IamInstanceProfile, AwsNetwork};
14use crate::gcp::GcpNetwork;
15use crate::{
16    AwsEc2Host, AzureHost, CustomService, GcpComputeEngineHost, Host, HostTargetType,
17    LocalhostHost, ResourcePool, ResourceResult, Service, ServiceBuilder, progress,
18};
19
20pub struct Deployment {
21    pub hosts: Vec<Weak<dyn Host>>,
22    pub services: Vec<Weak<dyn Service>>,
23    pub resource_pool: ResourcePool,
24    localhost_host: Option<Arc<LocalhostHost>>,
25    last_resource_result: Option<Arc<ResourceResult>>,
26    next_host_id: usize,
27    next_service_id: usize,
28}
29
30impl Default for Deployment {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl Deployment {
37    pub fn new() -> Self {
38        let mut ret = Self {
39            hosts: Vec::new(),
40            services: Vec::new(),
41            resource_pool: ResourcePool::default(),
42            localhost_host: None,
43            last_resource_result: None,
44            next_host_id: 0,
45            next_service_id: 0,
46        };
47
48        ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
49        ret
50    }
51
52    #[expect(non_snake_case, reason = "constructor-esque")]
53    pub fn Localhost(&self) -> Arc<LocalhostHost> {
54        self.localhost_host.clone().unwrap()
55    }
56
57    #[expect(non_snake_case, reason = "constructor-esque")]
58    pub fn CustomService(
59        &mut self,
60        on: Arc<dyn Host>,
61        external_ports: Vec<u16>,
62    ) -> Arc<CustomService> {
63        self.add_service(|id, on| CustomService::new(id, on, external_ports), on)
64    }
65
66    /// Runs `deploy()`, and `start()`, waits for the trigger future, then runs `stop()`.
67    pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
68        // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
69        self.deploy().await?;
70        self.start().await?;
71        trigger.await;
72        self.stop().await?;
73        Ok(())
74    }
75
76    /// Runs `start()`, waits for the trigger future, then runs `stop()`.
77    /// This is useful if you need to initiate external network connections between
78    /// `deploy()` and `start()`.
79    pub async fn start_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
80        // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
81        self.start().await?;
82        trigger.await;
83        self.stop().await?;
84        Ok(())
85    }
86
87    /// Runs `deploy()`, and `start()`, waits for CTRL+C, then runs `stop()`.
88    pub async fn run_ctrl_c(&mut self) -> Result<()> {
89        self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
90    }
91
92    /// Runs `start()`, waits for CTRL+C, then runs `stop()`.
93    /// This is useful if you need to initiate external network connections between
94    /// `deploy()` and `start()`.
95    pub async fn start_ctrl_c(&mut self) -> Result<()> {
96        self.start_until(tokio::signal::ctrl_c().map(|_| ())).await
97    }
98
99    pub async fn deploy(&mut self) -> Result<()> {
100        self.services.retain(|weak| weak.strong_count() > 0);
101
102        progress::ProgressTracker::with_group("deploy", Some(3), || async {
103            let mut resource_batch = super::ResourceBatch::new();
104
105            for service in self.services.iter().filter_map(Weak::upgrade) {
106                service.collect_resources(&mut resource_batch);
107            }
108
109            for host in self.hosts.iter().filter_map(Weak::upgrade) {
110                host.collect_resources(&mut resource_batch);
111            }
112
113            let resource_result = Arc::new(
114                progress::ProgressTracker::with_group("provision", Some(1), || async {
115                    resource_batch
116                        .provision(&mut self.resource_pool, self.last_resource_result.clone())
117                        .await
118                })
119                .await?,
120            );
121            self.last_resource_result = Some(resource_result.clone());
122
123            for host in self.hosts.iter().filter_map(Weak::upgrade) {
124                host.provision(&resource_result);
125            }
126
127            let upgraded_services = self
128                .services
129                .iter()
130                .filter_map(Weak::upgrade)
131                .collect::<Vec<_>>();
132
133            progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || {
134                let services_future = upgraded_services
135                    .iter()
136                    .map(|service: &Arc<dyn Service>| {
137                        let resource_result = &resource_result;
138                        async move { service.deploy(resource_result).await }
139                    })
140                    .collect::<Vec<_>>();
141
142                futures::stream::iter(services_future)
143                    .buffer_unordered(16)
144                    .try_fold((), |_, _| async { Ok(()) })
145            })
146            .await?;
147
148            progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || {
149                let all_services_ready =
150                    upgraded_services
151                        .iter()
152                        .map(|service: &Arc<dyn Service>| async move {
153                            service.ready().await?;
154                            Ok(()) as Result<()>
155                        });
156
157                futures::future::try_join_all(all_services_ready)
158            })
159            .await?;
160
161            Ok(())
162        })
163        .await
164    }
165
166    pub async fn start(&mut self) -> Result<()> {
167        self.services.retain(|weak| weak.strong_count() > 0);
168
169        progress::ProgressTracker::with_group("start", None, || {
170            let all_services_start = self.services.iter().filter_map(Weak::upgrade).map(
171                |service: Arc<dyn Service>| async move {
172                    service.start().await?;
173                    Ok(()) as Result<()>
174                },
175            );
176
177            futures::future::try_join_all(all_services_start)
178        })
179        .await?;
180        Ok(())
181    }
182
183    pub async fn stop(&mut self) -> Result<()> {
184        self.services.retain(|weak| weak.strong_count() > 0);
185
186        progress::ProgressTracker::with_group("stop", None, || {
187            let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
188                |service: Arc<dyn Service>| async move {
189                    service.stop().await?;
190                    Ok(()) as Result<()>
191                },
192            );
193
194            futures::future::try_join_all(all_services_stop)
195        })
196        .await?;
197        Ok(())
198    }
199}
200
201impl Deployment {
202    pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
203        let arc = Arc::new(host(self.next_host_id));
204        self.next_host_id += 1;
205
206        self.hosts.push(Arc::downgrade(&arc) as Weak<dyn Host>);
207        arc
208    }
209
210    pub fn add_service<T: Service + 'static>(
211        &mut self,
212        service: impl ServiceBuilder<Service = T>,
213        on: Arc<dyn Host>,
214    ) -> Arc<T> {
215        let arc = Arc::new(service.build(self.next_service_id, on));
216        self.next_service_id += 1;
217
218        self.services
219            .push(Arc::downgrade(&arc) as Weak<dyn Service>);
220        arc
221    }
222}
223
224/// Buildstructor methods.
225#[buildstructor::buildstructor]
226impl Deployment {
227    #[builder(entry = "GcpComputeEngineHost", exit = "add", visibility = "pub")]
228    fn add_gcp_compute_engine_host(
229        &mut self,
230        project: String,
231        machine_type: String,
232        image: String,
233        target_type: Option<HostTargetType>,
234        region: String,
235        network: Arc<GcpNetwork>,
236        user: Option<String>,
237        display_name: Option<String>,
238    ) -> Arc<GcpComputeEngineHost> {
239        self.add_host(|id| {
240            GcpComputeEngineHost::new(
241                id,
242                project,
243                machine_type,
244                image,
245                target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
246                region,
247                network,
248                user,
249                display_name,
250            )
251        })
252    }
253
254    #[builder(entry = "AzureHost", exit = "add", visibility = "pub")]
255    fn add_azure_host(
256        &mut self,
257        project: String,
258        os_type: String, // linux or windows
259        machine_size: String,
260        image: Option<HashMap<String, String>>,
261        target_type: Option<HostTargetType>,
262        region: String,
263        user: Option<String>,
264    ) -> Arc<AzureHost> {
265        self.add_host(|id| {
266            AzureHost::new(
267                id,
268                project,
269                os_type,
270                machine_size,
271                image,
272                target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
273                region,
274                user,
275            )
276        })
277    }
278
279    #[builder(entry = "AwsEc2Host", exit = "add", visibility = "pub")]
280    fn add_aws_ec2_host(
281        &mut self,
282        region: String,
283        instance_type: String,
284        target_type: Option<HostTargetType>,
285        ami: String,
286        network: Arc<AwsNetwork>,
287        iam_instance_profile: Option<Arc<Mutex<AwsEc2IamInstanceProfile>>>,
288        cloudwatch_log_group: Option<Arc<Mutex<AwsCloudwatchLogGroup>>>,
289        // `metrics_collected`: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html#CloudWatch-Agent-Configuration-File-Metricssection
290        cwa_metrics_collected: Option<serde_json::Value>,
291        user: Option<String>,
292        display_name: Option<String>,
293    ) -> Arc<AwsEc2Host> {
294        self.add_host(|id| {
295            AwsEc2Host::new(
296                id,
297                region,
298                instance_type,
299                target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
300                ami,
301                network,
302                iam_instance_profile,
303                cloudwatch_log_group,
304                cwa_metrics_collected,
305                user,
306                display_name,
307            )
308        })
309    }
310}