hydro_lang/deploy/
deploy_graph_containerized.rs

1//! Deployment backend for Hydro that uses Docker to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11    BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12    RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::deploy::DeployResult;
33use crate::compile::deploy_provider::{
34    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
35};
36use crate::compile::trybuild::generate::create_graph_trybuild;
37use crate::location::dynamic::LocationId;
38use crate::location::member_id::TaglessMemberId;
39use crate::location::{MembershipEvent, NetworkHint};
40
41/// represents a docker network
42#[derive(Clone, Debug)]
43pub struct DockerNetwork {
44    name: String,
45}
46
47impl DockerNetwork {
48    /// creates a new docker network (will actually be created when deployment.start() is called).
49    pub fn new(name: String) -> Self {
50        Self {
51            name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
52        }
53    }
54}
55
56/// Represents a process running in a docker container
57#[derive(Clone)]
58pub struct DockerDeployProcess {
59    id: usize,
60    name: String,
61    next_port: Rc<RefCell<u16>>,
62    rust_crate: Rc<RefCell<Option<RustCrate>>>,
63
64    exposed_ports: Rc<RefCell<Vec<u16>>>,
65
66    docker_container_name: Rc<RefCell<Option<String>>>,
67
68    compilation_options: Option<String>,
69
70    config: Vec<String>,
71
72    network: DockerNetwork,
73}
74
75impl Node for DockerDeployProcess {
76    type Port = u16;
77    type Meta = ();
78    type InstantiateEnv = DockerDeploy;
79
80    #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
81    fn next_port(&self) -> Self::Port {
82        let port = {
83            let mut borrow = self.next_port.borrow_mut();
84            let port = *borrow;
85            *borrow += 1;
86            port
87        };
88
89        port
90    }
91
92    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
93    fn update_meta(&self, _meta: &Self::Meta) {}
94
95    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
96    fn instantiate(
97        &self,
98        _env: &mut Self::InstantiateEnv,
99        meta: &mut Self::Meta,
100        graph: DfirGraph,
101        extra_stmts: Vec<syn::Stmt>,
102    ) {
103        let (bin_name, config) =
104            create_graph_trybuild(graph, extra_stmts, &Some(self.name.clone()), true);
105
106        let mut ret = RustCrate::new(config.project_dir)
107            .target_dir(config.target_dir)
108            .example(bin_name.clone())
109            .no_default_features();
110
111        ret = ret.display_name("test_display_name");
112
113        ret = ret.features(vec!["hydro___feature_docker_runtime".to_string()]);
114
115        if let Some(features) = config.features {
116            ret = ret.features(features);
117        }
118
119        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
120        ret = ret.config("build.incremental = false");
121
122        *self.rust_crate.borrow_mut() = Some(ret);
123    }
124}
125
126/// Represents a logical cluster, which can be a variable amount of individual containers.
127#[derive(Clone)]
128pub struct DockerDeployCluster {
129    id: usize,
130    name: String,
131    next_port: Rc<RefCell<u16>>,
132    rust_crate: Rc<RefCell<Option<RustCrate>>>,
133
134    docker_container_name: Rc<RefCell<Vec<String>>>,
135
136    compilation_options: Option<String>,
137
138    config: Vec<String>,
139
140    count: usize,
141}
142
143impl Node for DockerDeployCluster {
144    type Port = u16;
145    type Meta = ();
146    type InstantiateEnv = DockerDeploy;
147
148    #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
149    fn next_port(&self) -> Self::Port {
150        let port = {
151            let mut borrow = self.next_port.borrow_mut();
152            let port = *borrow;
153            *borrow += 1;
154            port
155        };
156
157        port
158    }
159
160    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
161    fn update_meta(&self, _meta: &Self::Meta) {}
162
163    #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, extra_stmts = extra_stmts.len()))]
164    fn instantiate(
165        &self,
166        _env: &mut Self::InstantiateEnv,
167        _meta: &mut Self::Meta,
168        graph: DfirGraph,
169        extra_stmts: Vec<syn::Stmt>,
170    ) {
171        let (bin_name, config) =
172            create_graph_trybuild(graph, extra_stmts, &Some(self.name.clone()), true);
173
174        let mut ret = RustCrate::new(config.project_dir)
175            .target_dir(config.target_dir)
176            .example(bin_name.clone())
177            .no_default_features();
178
179        ret = ret.display_name("test_display_name");
180
181        ret = ret.features(vec!["hydro___feature_deploy_integration".to_string()]);
182
183        if let Some(features) = config.features {
184            ret = ret.features(features);
185        }
186
187        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
188        ret = ret.config("build.incremental = false");
189
190        *self.rust_crate.borrow_mut() = Some(ret);
191    }
192}
193
194/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
195#[derive(Clone, Debug)]
196pub struct DockerDeployExternal {
197    name: String,
198    next_port: Rc<RefCell<u16>>,
199
200    ports: Rc<RefCell<HashMap<usize, u16>>>,
201
202    #[expect(clippy::type_complexity, reason = "internal code")]
203    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
204}
205
206impl Node for DockerDeployExternal {
207    type Port = u16;
208    type Meta = ();
209    type InstantiateEnv = DockerDeploy;
210
211    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
212    fn next_port(&self) -> Self::Port {
213        let port = {
214            let mut borrow = self.next_port.borrow_mut();
215            let port = *borrow;
216            *borrow += 1;
217            port
218        };
219
220        port
221    }
222
223    #[instrument(level = "trace", skip_all, fields(name = self.name))]
224    fn update_meta(&self, _meta: &Self::Meta) {}
225
226    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
227    fn instantiate(
228        &self,
229        _env: &mut Self::InstantiateEnv,
230        meta: &mut Self::Meta,
231        graph: DfirGraph,
232        extra_stmts: Vec<syn::Stmt>,
233    ) {
234        trace!(name: "surface", surface = graph.surface_syntax_string());
235    }
236}
237
238type DynSourceSink<Out, In, InErr> = (
239    Pin<Box<dyn Stream<Item = Out>>>,
240    Pin<Box<dyn Sink<In, Error = InErr>>>,
241);
242
243impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
244    #[instrument(level = "trace", skip_all, fields(name = self.name, %key, %port))]
245    fn register(&self, key: usize, port: <DockerDeploy as Deploy>::Port) {
246        self.ports.borrow_mut().insert(key, port);
247    }
248
249    #[instrument(level = "trace", skip_all, fields(name = self.name, %key))]
250    fn raw_port(&self, key: usize) -> <DockerDeploy as Deploy<'a>>::ExternalRawPort {
251        todo!()
252    }
253
254    fn as_bytes_bidi(
255        &self,
256        key: usize,
257    ) -> impl Future<
258        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
259    > + 'a {
260        let _span = tracing::trace_span!("as_bytes_bidi", name = %self.name, %key).entered(); // the instrument macro doesn't work here because of lifetime issues?
261        async { todo!() }
262    }
263
264    fn as_bincode_bidi<InT, OutT>(
265        &self,
266        key: usize,
267    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
268    where
269        InT: serde::Serialize + 'static,
270        OutT: serde::de::DeserializeOwned + 'static,
271    {
272        let _span = tracing::trace_span!("as_bincode_bidi", name = %self.name, %key).entered(); // the instrument macro doesn't work here because of lifetime issues?
273        async { todo!() }
274    }
275
276    fn as_bincode_sink<T>(
277        &self,
278        key: usize,
279    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
280    where
281        T: serde::Serialize + 'static,
282    {
283        let guard = tracing::trace_span!("as_bincode_sink", name = %self.name, %key).entered();
284
285        let local_port = *self.ports.borrow().get(&key).unwrap();
286        let (docker_container_name, remote_port, _) = self
287            .connection_info
288            .borrow()
289            .get(&local_port)
290            .unwrap()
291            .clone();
292
293        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
294
295        async move {
296            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
297            let remote_ip_address = "localhost";
298
299            Box::pin(
300                LazySink::new(move || {
301                    Box::pin(async move {
302                        trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
303
304                        let stream =
305                            TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
306                                .await?;
307
308                        trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
309
310                        Result::<_, std::io::Error>::Ok(FramedWrite::new(
311                            stream,
312                            LengthDelimitedCodec::new(),
313                        ))
314                    })
315                })
316                .with(move |v| async move {
317                    Ok(Bytes::from(bincode::serialize(&v).unwrap()))
318                }),
319            ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
320        }
321        .instrument(guard.exit())
322    }
323
324    fn as_bincode_source<T>(
325        &self,
326        key: usize,
327    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
328    where
329        T: serde::de::DeserializeOwned + 'static,
330    {
331        let guard = tracing::trace_span!("as_bincode_sink", name = %self.name, %key).entered();
332
333        let local_port = *self.ports.borrow().get(&key).unwrap();
334        let (docker_container_name, remote_port, _) = self
335            .connection_info
336            .borrow()
337            .get(&local_port)
338            .unwrap()
339            .clone();
340
341        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
342
343        async move {
344
345            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
346            let remote_ip_address = "localhost";
347
348            trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
349
350            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
351                .await
352                .unwrap();
353
354            trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
355
356            Box::pin(
357                FramedRead::new(stream, LengthDelimitedCodec::new())
358                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
359            ) as Pin<Box<dyn Stream<Item = T>>>
360        }
361        .instrument(guard.exit())
362    }
363}
364
365#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
366async fn find_dynamically_allocated_docker_port(
367    docker_container_name: &str,
368    destination_port: u16,
369) -> u16 {
370    let docker = Docker::connect_with_local_defaults().unwrap();
371
372    let container_info = docker
373        .inspect_container(docker_container_name, None::<InspectContainerOptions>)
374        .await
375        .unwrap();
376
377    trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
378
379    // container_info={"1001/tcp": Some([PortBinding { host_ip: Some("0.0.0.0"), host_port: Some("32771") }, PortBinding { host_ip: Some("::"), host_port: Some("32771") }])} destination_port=1001
380    let remote_port = container_info
381        .network_settings
382        .as_ref()
383        .unwrap()
384        .ports
385        .as_ref()
386        .unwrap()
387        .get(&format!("{destination_port}/tcp"))
388        .unwrap()
389        .as_ref()
390        .unwrap()
391        .iter()
392        .find(|v| v.host_ip == Some("0.0.0.0".to_string()))
393        .unwrap()
394        .host_port
395        .as_ref()
396        .unwrap()
397        .parse()
398        .unwrap();
399
400    remote_port
401}
402
403/// For deploying to a local docker instance
404pub struct DockerDeploy {
405    docker_processes: Vec<DockerDeployProcessSpec>,
406    docker_clusters: Vec<DockerDeployClusterSpec>,
407    network: DockerNetwork,
408    deployment_instance: String,
409}
410
411#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
412async fn create_and_start_container(
413    docker: &Docker,
414    container_name: &str,
415    image_name: &str,
416    network_name: &str,
417    deployment_instance: &str,
418) -> Result<(), anyhow::Error> {
419    let config = ContainerCreateBody {
420        image: Some(image_name.to_string()),
421        hostname: Some(container_name.to_string()),
422        host_config: Some(HostConfig {
423            binds: Some(vec![
424                "/var/run/docker.sock:/var/run/docker.sock".to_string(),
425            ]),
426            publish_all_ports: Some(true),
427            port_bindings: Some(HashMap::new()), /* Due to a bug in docker, if you don't send empty port bindings with publish_all_ports set to true and with a docker image that has EXPOSE directives in it, docker will crash because it will try to write to a map in memory that it has not initialized yet. Setting port_bindings explicitly to an empty map will initialize it first so that it does not break. */
428            ..Default::default()
429        }),
430        env: Some(vec![
431            format!("CONTAINER_NAME={container_name}"),
432            format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
433            format!("RUST_LOG=trace"),
434        ]),
435        networking_config: Some(NetworkingConfig {
436            endpoints_config: Some(HashMap::from([(
437                network_name.to_string(),
438                EndpointSettings {
439                    ..Default::default()
440                },
441            )])),
442        }),
443        tty: Some(true),
444        ..Default::default()
445    };
446
447    let options = CreateContainerOptions {
448        name: Some(container_name.to_string()),
449        ..Default::default()
450    };
451
452    tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
453    docker.create_container(Some(options), config).await?;
454    docker
455        .start_container(container_name, None::<StartContainerOptions>)
456        .await?;
457
458    Ok(())
459}
460
461#[instrument(level = "trace", skip_all, fields(%image_name))]
462async fn build_and_create_image(
463    rust_crate: &Rc<RefCell<Option<RustCrate>>>,
464    compilation_options: &Option<String>,
465    config: &[String],
466    exposed_ports: &[u16],
467    image_name: &str,
468) -> Result<(), anyhow::Error> {
469    let mut rust_crate = rust_crate
470        .borrow_mut()
471        .take()
472        .unwrap()
473        .rustflags(compilation_options.clone().unwrap_or("".to_string()));
474
475    for cfg in config {
476        rust_crate = rust_crate.config(cfg);
477    }
478
479    let build_output = match build_crate_memoized(
480        rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
481    )
482    .await
483    {
484        Ok(build_output) => build_output,
485        Err(BuildError::FailedToBuildCrate {
486            exit_status,
487            diagnostics,
488            text_lines,
489            stderr_lines,
490        }) => {
491            let diagnostics = diagnostics
492                .into_iter()
493                .map(|d| d.rendered.unwrap())
494                .collect::<Vec<_>>()
495                .join("\n");
496            let text_lines = text_lines.join("\n");
497            let stderr_lines = stderr_lines.join("\n");
498
499            anyhow::bail!(
500                r#"
501Failed to build crate {exit_status:?}
502--- diagnostics
503---
504{diagnostics}
505---
506---
507---
508
509--- text_lines
510---
511---
512{text_lines}
513---
514---
515---
516
517--- stderr_lines
518---
519---
520{stderr_lines}
521---
522---
523---"#
524            );
525        }
526        Err(err) => {
527            anyhow::bail!("Failed to build crate {err:?}");
528        }
529    };
530
531    let docker = Docker::connect_with_local_defaults()?;
532
533    let mut tar_data = Vec::new();
534    {
535        let mut tar = Builder::new(&mut tar_data);
536
537        let exposed_ports = exposed_ports
538            .iter()
539            .map(|port| format!("EXPOSE {port}/tcp"))
540            .collect::<Vec<_>>()
541            .join("\n");
542
543        let dockerfile_content = format!(
544            r#"
545                FROM scratch
546                {exposed_ports}
547                COPY app /app
548                CMD ["/app"]
549            "#,
550        );
551
552        trace!(name: "dockerfile", %dockerfile_content);
553
554        let mut header = Header::new_gnu();
555        header.set_path("Dockerfile")?;
556        header.set_size(dockerfile_content.len() as u64);
557        header.set_cksum();
558        tar.append(&header, dockerfile_content.as_bytes())?;
559
560        let mut header = Header::new_gnu();
561        header.set_path("app")?;
562        header.set_size(build_output.bin_data.len() as u64);
563        header.set_mode(0o755);
564        header.set_cksum();
565        tar.append(&header, &build_output.bin_data[..])?;
566
567        tar.finish()?;
568    }
569
570    let build_options = BuildImageOptions {
571        dockerfile: "Dockerfile".to_owned(),
572        t: Some(image_name.to_string()),
573        rm: true,
574        ..Default::default()
575    };
576
577    use bollard::errors::Error;
578
579    let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
580    let mut build_stream = docker.build_image(build_options, None, Some(body));
581    while let Some(msg) = build_stream.next().await {
582        match msg {
583            Ok(_) => {}
584            Err(e) => match e {
585                Error::DockerStreamError { error } => {
586                    return Err(anyhow::anyhow!(
587                        "Docker build failed: DockerStreamError: {{ error: {error} }}"
588                    ));
589                }
590                _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
591            },
592        }
593    }
594
595    Ok(())
596}
597
598impl DockerDeploy {
599    /// Create a new deployment
600    pub fn new(network: DockerNetwork) -> Self {
601        Self {
602            docker_processes: Vec::new(),
603            docker_clusters: Vec::new(),
604            network,
605            deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
606        }
607    }
608
609    /// Add an internal docker service to the deployment.
610    pub fn add_localhost_docker(
611        &mut self,
612        compilation_options: Option<String>,
613        config: Vec<String>,
614    ) -> DockerDeployProcessSpec {
615        let process = DockerDeployProcessSpec {
616            compilation_options,
617            config,
618            network: self.network.clone(),
619            deployment_instance: self.deployment_instance.clone(),
620        };
621
622        self.docker_processes.push(process.clone());
623
624        process
625    }
626
627    /// Add an internal docker cluster to the deployment.
628    pub fn add_localhost_docker_cluster(
629        &mut self,
630        compilation_options: Option<String>,
631        config: Vec<String>,
632        count: usize,
633    ) -> DockerDeployClusterSpec {
634        let cluster = DockerDeployClusterSpec {
635            compilation_options,
636            config,
637            count,
638            deployment_instance: self.deployment_instance.clone(),
639        };
640
641        self.docker_clusters.push(cluster.clone());
642
643        cluster
644    }
645
646    /// Add an external process to the deployment.
647    pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
648        DockerDeployExternalSpec { name }
649    }
650
651    /// Get the deployment instance from this deployment.
652    pub fn get_deployment_instance(&self) -> String {
653        self.deployment_instance.clone()
654    }
655
656    /// Create docker images.
657    #[instrument(level = "trace", skip_all)]
658    pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
659        for (_, _, process) in nodes.get_all_processes() {
660            let exposed_ports = process.exposed_ports.borrow().clone();
661
662            build_and_create_image(
663                &process.rust_crate,
664                &process.compilation_options,
665                &process.config,
666                &exposed_ports,
667                &process.name,
668            )
669            .await?;
670        }
671
672        for (_, _, cluster) in nodes.get_all_clusters() {
673            build_and_create_image(
674                &cluster.rust_crate,
675                &cluster.compilation_options,
676                &cluster.config,
677                &[], // clusters don't have exposed ports.
678                &cluster.name,
679            )
680            .await?;
681        }
682
683        Ok(())
684    }
685
686    /// Start the deployment, tell docker to create containers from the existing provisioned images.
687    #[instrument(level = "trace", skip_all)]
688    pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
689        let docker = Docker::connect_with_local_defaults()?;
690
691        match docker
692            .create_network(NetworkCreateRequest {
693                name: self.network.name.clone(),
694                driver: Some("bridge".to_string()),
695                ..Default::default()
696            })
697            .await
698        {
699            Ok(v) => v.id,
700            Err(e) => {
701                panic!("Failed to create docker network: {e:?}");
702            }
703        };
704
705        for (_, _, process) in nodes.get_all_processes() {
706            let docker_container_name: String = get_docker_container_name(&process.name, None);
707            *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
708
709            create_and_start_container(
710                &docker,
711                &docker_container_name,
712                &process.name,
713                &self.network.name,
714                &self.deployment_instance,
715            )
716            .await?;
717        }
718
719        for (_, _, cluster) in nodes.get_all_clusters() {
720            for num in 0..cluster.count {
721                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
722                cluster
723                    .docker_container_name
724                    .borrow_mut()
725                    .push(docker_container_name.clone());
726
727                create_and_start_container(
728                    &docker,
729                    &docker_container_name,
730                    &cluster.name,
731                    &self.network.name,
732                    &self.deployment_instance,
733                )
734                .await?;
735            }
736        }
737
738        Ok(())
739    }
740
741    /// Stop the deployment, destroy all containers
742    #[instrument(level = "trace", skip_all)]
743    pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
744        let docker = Docker::connect_with_local_defaults()?;
745
746        for (_, _, process) in nodes.get_all_processes() {
747            let docker_container_name: String = get_docker_container_name(&process.name, None);
748
749            docker
750                .kill_container(&docker_container_name, None::<KillContainerOptions>)
751                .await?;
752        }
753
754        for (_, _, cluster) in nodes.get_all_clusters() {
755            for num in 0..cluster.count {
756                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
757
758                docker
759                    .kill_container(&docker_container_name, None::<KillContainerOptions>)
760                    .await?;
761            }
762        }
763
764        Ok(())
765    }
766
767    /// remove containers, images, and networks.
768    #[instrument(level = "trace", skip_all)]
769    pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
770        let docker = Docker::connect_with_local_defaults()?;
771
772        for (_, _, process) in nodes.get_all_processes() {
773            let docker_container_name: String = get_docker_container_name(&process.name, None);
774
775            docker
776                .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
777                .await?;
778        }
779
780        for (_, _, cluster) in nodes.get_all_clusters() {
781            for num in 0..cluster.count {
782                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
783
784                docker
785                    .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
786                    .await?;
787            }
788        }
789
790        docker
791            .remove_network(&self.network.name)
792            .await
793            .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
794
795        use bollard::query_parameters::RemoveImageOptions;
796
797        for (_, _, process) in nodes.get_all_processes() {
798            docker
799                .remove_image(&process.name, None::<RemoveImageOptions>, None)
800                .await?;
801        }
802
803        for (_, _, cluster) in nodes.get_all_clusters() {
804            docker
805                .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
806                .await?;
807        }
808
809        Ok(())
810    }
811}
812
813impl<'a> Deploy<'a> for DockerDeploy {
814    type InstantiateEnv = Self;
815    type Process = DockerDeployProcess;
816    type Cluster = DockerDeployCluster;
817    type External = DockerDeployExternal;
818    type Port = u16;
819    type ExternalRawPort = ();
820    type Meta = ();
821    type GraphId = ();
822
823    #[instrument(level = "trace", skip_all, ret)]
824    fn allocate_process_port(process: &Self::Process) -> Self::Port {
825        process.next_port()
826    }
827
828    #[instrument(level = "trace", skip_all, ret)]
829    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
830        cluster.next_port()
831    }
832
833    #[instrument(level = "trace", skip_all, ret)]
834    fn allocate_external_port(external: &Self::External) -> Self::Port {
835        external.next_port()
836    }
837
838    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
839    fn o2o_sink_source(
840        p1: &Self::Process,
841        p1_port: &Self::Port,
842        p2: &Self::Process,
843        p2_port: &Self::Port,
844    ) -> (syn::Expr, syn::Expr) {
845        let bind_addr = format!("0.0.0.0:{}", p2_port);
846        let target = format!("{}:{p2_port}", p2.name);
847
848        deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
849    }
850
851    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
852    fn o2o_connect(
853        p1: &Self::Process,
854        p1_port: &Self::Port,
855        p2: &Self::Process,
856        p2_port: &Self::Port,
857    ) -> Box<dyn FnOnce()> {
858        let serialized = format!(
859            "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
860            p1.name, p2.name
861        );
862
863        Box::new(move || {
864            trace!(name: "o2o_connect thunk", %serialized);
865        })
866    }
867
868    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
869    fn o2m_sink_source(
870        p1: &Self::Process,
871        p1_port: &Self::Port,
872        c2: &Self::Cluster,
873        c2_port: &Self::Port,
874    ) -> (syn::Expr, syn::Expr) {
875        deploy_containerized_o2m(*c2_port)
876    }
877
878    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
879    fn o2m_connect(
880        p1: &Self::Process,
881        p1_port: &Self::Port,
882        c2: &Self::Cluster,
883        c2_port: &Self::Port,
884    ) -> Box<dyn FnOnce()> {
885        let serialized = format!(
886            "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
887            p1.name, c2.name
888        );
889
890        Box::new(move || {
891            trace!(name: "o2m_connect thunk", %serialized);
892        })
893    }
894
895    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
896    fn m2o_sink_source(
897        c1: &Self::Cluster,
898        c1_port: &Self::Port,
899        p2: &Self::Process,
900        p2_port: &Self::Port,
901    ) -> (syn::Expr, syn::Expr) {
902        deploy_containerized_m2o(*p2_port, &p2.name)
903    }
904
905    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
906    fn m2o_connect(
907        c1: &Self::Cluster,
908        c1_port: &Self::Port,
909        p2: &Self::Process,
910        p2_port: &Self::Port,
911    ) -> Box<dyn FnOnce()> {
912        let serialized = format!(
913            "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
914            c1.name, p2.name
915        );
916
917        Box::new(move || {
918            trace!(name: "m2o_connect thunk", %serialized);
919        })
920    }
921
922    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
923    fn m2m_sink_source(
924        c1: &Self::Cluster,
925        c1_port: &Self::Port,
926        c2: &Self::Cluster,
927        c2_port: &Self::Port,
928    ) -> (syn::Expr, syn::Expr) {
929        deploy_containerized_m2m(*c2_port)
930    }
931
932    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
933    fn m2m_connect(
934        c1: &Self::Cluster,
935        c1_port: &Self::Port,
936        c2: &Self::Cluster,
937        c2_port: &Self::Port,
938    ) -> Box<dyn FnOnce()> {
939        let serialized = format!(
940            "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
941            c1.name, c2.name
942        );
943
944        Box::new(move || {
945            trace!(name: "m2m_connect thunk", %serialized);
946        })
947    }
948
949    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
950    fn e2o_many_source(
951        extra_stmts: &mut Vec<syn::Stmt>,
952        p2: &Self::Process,
953        p2_port: &Self::Port,
954        _codec_type: &syn::Type,
955        shared_handle: String,
956    ) -> syn::Expr {
957        todo!()
958    }
959
960    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
961    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
962        todo!()
963    }
964
965    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
966    fn e2o_source(
967        extra_stmts: &mut Vec<syn::Stmt>,
968        p1: &Self::External,
969        p1_port: &Self::Port,
970        p2: &Self::Process,
971        p2_port: &Self::Port,
972        _codec_type: &syn::Type,
973        shared_handle: String,
974    ) -> syn::Expr {
975        p1.connection_info.borrow_mut().insert(
976            *p1_port,
977            (
978                p2.docker_container_name.clone(),
979                *p2_port,
980                p2.network.clone(),
981            ),
982        );
983
984        p2.exposed_ports.borrow_mut().push(*p2_port);
985
986        let socket_ident = syn::Ident::new(
987            &format!("__hydro_deploy_{}_socket", &shared_handle),
988            Span::call_site(),
989        );
990
991        let source_ident = syn::Ident::new(
992            &format!("__hydro_deploy_{}_source", &shared_handle),
993            Span::call_site(),
994        );
995
996        let sink_ident = syn::Ident::new(
997            &format!("__hydro_deploy_{}_sink", &shared_handle),
998            Span::call_site(),
999        );
1000
1001        let bind_addr = format!("0.0.0.0:{}", p2_port);
1002
1003        extra_stmts.push(syn::parse_quote! {
1004            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1005        });
1006
1007        let create_expr = deploy_containerized_external_sink_source_ident(socket_ident.clone());
1008
1009        extra_stmts.push(syn::parse_quote! {
1010            let (#sink_ident, #source_ident) = (#create_expr).split();
1011        });
1012
1013        parse_quote!(#source_ident)
1014    }
1015
1016    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1017    fn e2o_connect(
1018        p1: &Self::External,
1019        p1_port: &Self::Port,
1020        p2: &Self::Process,
1021        p2_port: &Self::Port,
1022        many: bool,
1023        server_hint: NetworkHint,
1024    ) -> Box<dyn FnOnce()> {
1025        let serialized = format!(
1026            "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
1027            p1.name, p2.name
1028        );
1029
1030        Box::new(move || {
1031            trace!(name: "e2o_connect thunk", %serialized);
1032        })
1033    }
1034
1035    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1036    fn o2e_sink(
1037        p1: &Self::Process,
1038        p1_port: &Self::Port,
1039        p2: &Self::External,
1040        p2_port: &Self::Port,
1041        shared_handle: String,
1042    ) -> syn::Expr {
1043        let sink_ident = syn::Ident::new(
1044            &format!("__hydro_deploy_{}_sink", &shared_handle),
1045            Span::call_site(),
1046        );
1047        parse_quote!(#sink_ident)
1048    }
1049
1050    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1051    fn cluster_ids(
1052        of_cluster: usize,
1053    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1054        cluster_ids()
1055    }
1056
1057    #[instrument(level = "trace", skip_all)]
1058    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1059        cluster_self_id()
1060    }
1061
1062    #[instrument(level = "trace", skip_all, fields(?location_id))]
1063    fn cluster_membership_stream(
1064        location_id: &LocationId,
1065    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1066    {
1067        cluster_membership_stream(location_id)
1068    }
1069}
1070
1071const CONTAINER_ALPHABET: [char; 36] = [
1072    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
1073    'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
1074];
1075
1076#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location, %deployment_instance))]
1077fn get_docker_image_name(name_hint: &str, location: usize, deployment_instance: &str) -> String {
1078    let name_hint = name_hint
1079        .split("::")
1080        .last()
1081        .unwrap()
1082        .to_string()
1083        .to_ascii_lowercase()
1084        .replace(".", "-")
1085        .replace("_", "-")
1086        .replace("::", "-");
1087
1088    let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1089
1090    format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location}")
1091}
1092
1093#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1094fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1095    if let Some(instance) = instance {
1096        format!("{image_name}-{instance}")
1097    } else {
1098        image_name.to_string()
1099    }
1100}
1101/// Represents a Process running in a docker container
1102#[derive(Clone)]
1103pub struct DockerDeployProcessSpec {
1104    compilation_options: Option<String>,
1105    config: Vec<String>,
1106    network: DockerNetwork,
1107    deployment_instance: String,
1108}
1109
1110impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1111    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1112    fn build(self, id: usize, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1113        DockerDeployProcess {
1114            id,
1115            name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1116
1117            next_port: Rc::new(RefCell::new(1000)),
1118            rust_crate: Rc::new(RefCell::new(None)),
1119
1120            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1121
1122            docker_container_name: Rc::new(RefCell::new(None)),
1123
1124            compilation_options: self.compilation_options,
1125            config: self.config,
1126
1127            network: self.network.clone(),
1128        }
1129    }
1130}
1131
1132/// Represents a Cluster running across `count` docker containers.
1133#[derive(Clone)]
1134pub struct DockerDeployClusterSpec {
1135    compilation_options: Option<String>,
1136    config: Vec<String>,
1137    count: usize,
1138    deployment_instance: String,
1139}
1140
1141impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1142    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1143    fn build(self, id: usize, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1144        DockerDeployCluster {
1145            id,
1146            name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1147
1148            next_port: Rc::new(RefCell::new(1000)),
1149            rust_crate: Rc::new(RefCell::new(None)),
1150
1151            docker_container_name: Rc::new(RefCell::new(Vec::new())),
1152
1153            compilation_options: self.compilation_options,
1154            config: self.config,
1155
1156            count: self.count,
1157        }
1158    }
1159}
1160
1161/// Represents an external process outside of the management of hydro deploy.
1162pub struct DockerDeployExternalSpec {
1163    name: String,
1164}
1165
1166impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1167    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1168    fn build(self, id: usize, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1169        DockerDeployExternal {
1170            name: self.name,
1171            next_port: Rc::new(RefCell::new(10000)),
1172            ports: Rc::new(RefCell::new(HashMap::new())),
1173            connection_info: Rc::new(RefCell::new(HashMap::new())),
1174        }
1175    }
1176}