Skip to main content

hydro_lang/deploy/
deploy_graph_containerized.rs

1//! Deployment backend for Hydro that uses Docker to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11    BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12    RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{LocationKey, MembershipEvent, NetworkHint};
41
42/// represents a docker network
43#[derive(Clone, Debug)]
44pub struct DockerNetwork {
45    name: String,
46}
47
48impl DockerNetwork {
49    /// creates a new docker network (will actually be created when deployment.start() is called).
50    pub fn new(name: String) -> Self {
51        Self {
52            name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53        }
54    }
55}
56
57/// Represents a process running in a docker container
58#[derive(Clone)]
59pub struct DockerDeployProcess {
60    key: LocationKey,
61    name: String,
62    next_port: Rc<RefCell<u16>>,
63    rust_crate: Rc<RefCell<Option<RustCrate>>>,
64
65    exposed_ports: Rc<RefCell<Vec<u16>>>,
66
67    docker_container_name: Rc<RefCell<Option<String>>>,
68
69    compilation_options: Option<String>,
70
71    config: Vec<String>,
72
73    network: DockerNetwork,
74}
75
76impl Node for DockerDeployProcess {
77    type Port = u16;
78    type Meta = ();
79    type InstantiateEnv = DockerDeploy;
80
81    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
82    fn next_port(&self) -> Self::Port {
83        let port = {
84            let mut borrow = self.next_port.borrow_mut();
85            let port = *borrow;
86            *borrow += 1;
87            port
88        };
89
90        port
91    }
92
93    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
94    fn update_meta(&self, _meta: &Self::Meta) {}
95
96    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
97    fn instantiate(
98        &self,
99        _env: &mut Self::InstantiateEnv,
100        meta: &mut Self::Meta,
101        graph: DfirGraph,
102        extra_stmts: &[syn::Stmt],
103        sidecars: &[syn::Expr],
104    ) {
105        let (bin_name, config) = create_graph_trybuild(
106            graph,
107            extra_stmts,
108            sidecars,
109            Some(&self.name),
110            crate::compile::trybuild::generate::DeployMode::Containerized,
111            LinkingMode::Static,
112        );
113
114        let mut ret = RustCrate::new(config.project_dir)
115            .target_dir(config.target_dir)
116            .example(bin_name)
117            .no_default_features();
118
119        ret = ret.display_name("test_display_name");
120
121        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
122
123        if let Some(features) = config.features {
124            ret = ret.features(features);
125        }
126
127        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
128        ret = ret.config("build.incremental = false");
129
130        *self.rust_crate.borrow_mut() = Some(ret);
131    }
132}
133
134/// Represents a logical cluster, which can be a variable amount of individual containers.
135#[derive(Clone)]
136pub struct DockerDeployCluster {
137    key: LocationKey,
138    name: String,
139    next_port: Rc<RefCell<u16>>,
140    rust_crate: Rc<RefCell<Option<RustCrate>>>,
141
142    docker_container_name: Rc<RefCell<Vec<String>>>,
143
144    compilation_options: Option<String>,
145
146    config: Vec<String>,
147
148    count: usize,
149}
150
151impl Node for DockerDeployCluster {
152    type Port = u16;
153    type Meta = ();
154    type InstantiateEnv = DockerDeploy;
155
156    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
157    fn next_port(&self) -> Self::Port {
158        let port = {
159            let mut borrow = self.next_port.borrow_mut();
160            let port = *borrow;
161            *borrow += 1;
162            port
163        };
164
165        port
166    }
167
168    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
169    fn update_meta(&self, _meta: &Self::Meta) {}
170
171    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
172    fn instantiate(
173        &self,
174        _env: &mut Self::InstantiateEnv,
175        _meta: &mut Self::Meta,
176        graph: DfirGraph,
177        extra_stmts: &[syn::Stmt],
178        sidecars: &[syn::Expr],
179    ) {
180        let (bin_name, config) = create_graph_trybuild(
181            graph,
182            extra_stmts,
183            sidecars,
184            Some(&self.name),
185            crate::compile::trybuild::generate::DeployMode::Containerized,
186            LinkingMode::Static,
187        );
188
189        let mut ret = RustCrate::new(config.project_dir)
190            .target_dir(config.target_dir)
191            .example(bin_name)
192            .no_default_features();
193
194        ret = ret.display_name("test_display_name");
195
196        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
197
198        if let Some(features) = config.features {
199            ret = ret.features(features);
200        }
201
202        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
203        ret = ret.config("build.incremental = false");
204
205        *self.rust_crate.borrow_mut() = Some(ret);
206    }
207}
208
209/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
210#[derive(Clone, Debug)]
211pub struct DockerDeployExternal {
212    name: String,
213    next_port: Rc<RefCell<u16>>,
214
215    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,
216
217    #[expect(clippy::type_complexity, reason = "internal code")]
218    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
219}
220
221impl Node for DockerDeployExternal {
222    type Port = u16;
223    type Meta = ();
224    type InstantiateEnv = DockerDeploy;
225
226    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
227    fn next_port(&self) -> Self::Port {
228        let port = {
229            let mut borrow = self.next_port.borrow_mut();
230            let port = *borrow;
231            *borrow += 1;
232            port
233        };
234
235        port
236    }
237
238    #[instrument(level = "trace", skip_all, fields(name = self.name))]
239    fn update_meta(&self, _meta: &Self::Meta) {}
240
241    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
242    fn instantiate(
243        &self,
244        _env: &mut Self::InstantiateEnv,
245        meta: &mut Self::Meta,
246        graph: DfirGraph,
247        extra_stmts: &[syn::Stmt],
248        sidecars: &[syn::Expr],
249    ) {
250        trace!(name: "surface", surface = graph.surface_syntax_string());
251    }
252}
253
254type DynSourceSink<Out, In, InErr> = (
255    Pin<Box<dyn Stream<Item = Out>>>,
256    Pin<Box<dyn Sink<In, Error = InErr>>>,
257);
258
259impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
260    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
261    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
262        self.ports.borrow_mut().insert(external_port_id, port);
263    }
264
265    fn as_bytes_bidi(
266        &self,
267        external_port_id: ExternalPortId,
268    ) -> impl Future<
269        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
270    > + 'a {
271        let guard =
272            tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
273
274        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
275        let (docker_container_name, remote_port, _) = self
276            .connection_info
277            .borrow()
278            .get(&local_port)
279            .unwrap()
280            .clone();
281
282        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
283
284        async move {
285            let local_port =
286                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
287            let remote_ip_address = "localhost";
288
289            trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
290
291            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
292                .await
293                .unwrap();
294
295            trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
296
297            let (rx, tx) = stream.into_split();
298
299            let source = Box::pin(
300                FramedRead::new(rx, LengthDelimitedCodec::new()),
301            ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
302
303            let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
304                as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
305
306            (source, sink)
307        }
308        .instrument(guard.exit())
309    }
310
311    fn as_bincode_bidi<InT, OutT>(
312        &self,
313        external_port_id: ExternalPortId,
314    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
315    where
316        InT: serde::Serialize + 'static,
317        OutT: serde::de::DeserializeOwned + 'static,
318    {
319        let guard =
320            tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
321
322        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
323        let (docker_container_name, remote_port, _) = self
324            .connection_info
325            .borrow()
326            .get(&local_port)
327            .unwrap()
328            .clone();
329
330        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
331
332        async move {
333            let local_port =
334                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
335            let remote_ip_address = "localhost";
336
337            trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
338
339            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
340                .await
341                .unwrap();
342
343            trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
344
345            let (rx, tx) = stream.into_split();
346
347            let source = Box::pin(
348                FramedRead::new(rx, LengthDelimitedCodec::new())
349                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
350            ) as Pin<Box<dyn Stream<Item = OutT>>>;
351
352            let sink = Box::pin(
353                FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
354                    Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
355                }),
356            ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
357
358            (source, sink)
359        }
360        .instrument(guard.exit())
361    }
362
363    fn as_bincode_sink<T>(
364        &self,
365        external_port_id: ExternalPortId,
366    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
367    where
368        T: serde::Serialize + 'static,
369    {
370        let guard =
371            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
372
373        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
374        let (docker_container_name, remote_port, _) = self
375            .connection_info
376            .borrow()
377            .get(&local_port)
378            .unwrap()
379            .clone();
380
381        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
382
383        async move {
384            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
385            let remote_ip_address = "localhost";
386
387            Box::pin(
388                LazySink::new(move || {
389                    Box::pin(async move {
390                        trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
391
392                        let stream =
393                            TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
394                                .await?;
395
396                        trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
397
398                        Result::<_, std::io::Error>::Ok(FramedWrite::new(
399                            stream,
400                            LengthDelimitedCodec::new(),
401                        ))
402                    })
403                })
404                .with(move |v| async move {
405                    Ok(Bytes::from(bincode::serialize(&v).unwrap()))
406                }),
407            ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
408        }
409        .instrument(guard.exit())
410    }
411
412    fn as_bincode_source<T>(
413        &self,
414        external_port_id: ExternalPortId,
415    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
416    where
417        T: serde::de::DeserializeOwned + 'static,
418    {
419        let guard =
420            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
421
422        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
423        let (docker_container_name, remote_port, _) = self
424            .connection_info
425            .borrow()
426            .get(&local_port)
427            .unwrap()
428            .clone();
429
430        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
431
432        async move {
433
434            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
435            let remote_ip_address = "localhost";
436
437            trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
438
439            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
440                .await
441                .unwrap();
442
443            trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
444
445            Box::pin(
446                FramedRead::new(stream, LengthDelimitedCodec::new())
447                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
448            ) as Pin<Box<dyn Stream<Item = T>>>
449        }
450        .instrument(guard.exit())
451    }
452}
453
454#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
455async fn find_dynamically_allocated_docker_port(
456    docker_container_name: &str,
457    destination_port: u16,
458) -> u16 {
459    let docker = Docker::connect_with_local_defaults().unwrap();
460
461    let container_info = docker
462        .inspect_container(docker_container_name, None::<InspectContainerOptions>)
463        .await
464        .unwrap();
465
466    trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
467
468    // container_info={"1001/tcp": Some([PortBinding { host_ip: Some("0.0.0.0"), host_port: Some("32771") }, PortBinding { host_ip: Some("::"), host_port: Some("32771") }])} destination_port=1001
469    let remote_port = container_info
470        .network_settings
471        .as_ref()
472        .unwrap()
473        .ports
474        .as_ref()
475        .unwrap()
476        .get(&format!("{destination_port}/tcp"))
477        .unwrap()
478        .as_ref()
479        .unwrap()
480        .iter()
481        .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
482        .unwrap()
483        .host_port
484        .as_ref()
485        .unwrap()
486        .parse()
487        .unwrap();
488
489    remote_port
490}
491
492/// For deploying to a local docker instance
493pub struct DockerDeploy {
494    docker_processes: Vec<DockerDeployProcessSpec>,
495    docker_clusters: Vec<DockerDeployClusterSpec>,
496    network: DockerNetwork,
497    deployment_instance: String,
498}
499
500#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
501async fn create_and_start_container(
502    docker: &Docker,
503    container_name: &str,
504    image_name: &str,
505    network_name: &str,
506    deployment_instance: &str,
507) -> Result<(), anyhow::Error> {
508    let config = ContainerCreateBody {
509        image: Some(image_name.to_owned()),
510        hostname: Some(container_name.to_owned()),
511        host_config: Some(HostConfig {
512            binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
513            publish_all_ports: Some(true),
514            port_bindings: Some(HashMap::new()), /* Due to a bug in docker, if you don't send empty port bindings with publish_all_ports set to true and with a docker image that has EXPOSE directives in it, docker will crash because it will try to write to a map in memory that it has not initialized yet. Setting port_bindings explicitly to an empty map will initialize it first so that it does not break. */
515            ..Default::default()
516        }),
517        env: Some(vec![
518            format!("CONTAINER_NAME={container_name}"),
519            format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
520            format!("RUST_LOG=trace"),
521        ]),
522        networking_config: Some(NetworkingConfig {
523            endpoints_config: Some(HashMap::from([(
524                network_name.to_owned(),
525                EndpointSettings {
526                    ..Default::default()
527                },
528            )])),
529        }),
530        tty: Some(true),
531        ..Default::default()
532    };
533
534    let options = CreateContainerOptions {
535        name: Some(container_name.to_owned()),
536        ..Default::default()
537    };
538
539    tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
540    docker.create_container(Some(options), config).await?;
541    docker
542        .start_container(container_name, None::<StartContainerOptions>)
543        .await?;
544
545    Ok(())
546}
547
548#[instrument(level = "trace", skip_all, fields(%image_name))]
549async fn build_and_create_image(
550    rust_crate: &Rc<RefCell<Option<RustCrate>>>,
551    compilation_options: Option<&str>,
552    config: &[String],
553    exposed_ports: &[u16],
554    image_name: &str,
555) -> Result<(), anyhow::Error> {
556    let mut rust_crate = rust_crate
557        .borrow_mut()
558        .take()
559        .unwrap()
560        .rustflags(compilation_options.unwrap_or_default());
561
562    for cfg in config {
563        rust_crate = rust_crate.config(cfg);
564    }
565
566    let build_output = match build_crate_memoized(
567        rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
568    )
569    .await
570    {
571        Ok(build_output) => build_output,
572        Err(BuildError::FailedToBuildCrate {
573            exit_status,
574            diagnostics,
575            text_lines,
576            stderr_lines,
577        }) => {
578            let diagnostics = diagnostics
579                .into_iter()
580                .map(|d| d.rendered.unwrap())
581                .collect::<Vec<_>>()
582                .join("\n");
583            let text_lines = text_lines.join("\n");
584            let stderr_lines = stderr_lines.join("\n");
585
586            anyhow::bail!(
587                r#"
588Failed to build crate {exit_status:?}
589--- diagnostics
590---
591{diagnostics}
592---
593---
594---
595
596--- text_lines
597---
598---
599{text_lines}
600---
601---
602---
603
604--- stderr_lines
605---
606---
607{stderr_lines}
608---
609---
610---"#
611            );
612        }
613        Err(err) => {
614            anyhow::bail!("Failed to build crate {err:?}");
615        }
616    };
617
618    let docker = Docker::connect_with_local_defaults()?;
619
620    let mut tar_data = Vec::new();
621    {
622        let mut tar = Builder::new(&mut tar_data);
623
624        let exposed_ports = exposed_ports
625            .iter()
626            .map(|port| format!("EXPOSE {port}/tcp"))
627            .collect::<Vec<_>>()
628            .join("\n");
629
630        let dockerfile_content = format!(
631            r#"
632                FROM scratch
633                {exposed_ports}
634                COPY app /app
635                CMD ["/app"]
636            "#,
637        );
638
639        trace!(name: "dockerfile", %dockerfile_content);
640
641        let mut header = Header::new_gnu();
642        header.set_path("Dockerfile")?;
643        header.set_size(dockerfile_content.len() as u64);
644        header.set_cksum();
645        tar.append(&header, dockerfile_content.as_bytes())?;
646
647        let mut header = Header::new_gnu();
648        header.set_path("app")?;
649        header.set_size(build_output.bin_data.len() as u64);
650        header.set_mode(0o755);
651        header.set_cksum();
652        tar.append(&header, &build_output.bin_data[..])?;
653
654        tar.finish()?;
655    }
656
657    let build_options = BuildImageOptions {
658        dockerfile: "Dockerfile".to_owned(),
659        t: Some(image_name.to_owned()),
660        rm: true,
661        ..Default::default()
662    };
663
664    use bollard::errors::Error;
665
666    let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
667    let mut build_stream = docker.build_image(build_options, None, Some(body));
668    while let Some(msg) = build_stream.next().await {
669        match msg {
670            Ok(_) => {}
671            Err(e) => match e {
672                Error::DockerStreamError { error } => {
673                    return Err(anyhow::anyhow!(
674                        "Docker build failed: DockerStreamError: {{ error: {error} }}"
675                    ));
676                }
677                _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
678            },
679        }
680    }
681
682    Ok(())
683}
684
685impl DockerDeploy {
686    /// Create a new deployment
687    pub fn new(network: DockerNetwork) -> Self {
688        Self {
689            docker_processes: Vec::new(),
690            docker_clusters: Vec::new(),
691            network,
692            deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
693        }
694    }
695
696    /// Add an internal docker service to the deployment.
697    pub fn add_localhost_docker(
698        &mut self,
699        compilation_options: Option<String>,
700        config: Vec<String>,
701    ) -> DockerDeployProcessSpec {
702        let process = DockerDeployProcessSpec {
703            compilation_options,
704            config,
705            network: self.network.clone(),
706            deployment_instance: self.deployment_instance.clone(),
707        };
708
709        self.docker_processes.push(process.clone());
710
711        process
712    }
713
714    /// Add an internal docker cluster to the deployment.
715    pub fn add_localhost_docker_cluster(
716        &mut self,
717        compilation_options: Option<String>,
718        config: Vec<String>,
719        count: usize,
720    ) -> DockerDeployClusterSpec {
721        let cluster = DockerDeployClusterSpec {
722            compilation_options,
723            config,
724            count,
725            deployment_instance: self.deployment_instance.clone(),
726        };
727
728        self.docker_clusters.push(cluster.clone());
729
730        cluster
731    }
732
733    /// Add an external process to the deployment.
734    pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
735        DockerDeployExternalSpec { name }
736    }
737
738    /// Get the deployment instance from this deployment.
739    pub fn get_deployment_instance(&self) -> String {
740        self.deployment_instance.clone()
741    }
742
743    /// Create docker images.
744    #[instrument(level = "trace", skip_all)]
745    pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
746        for (_, _, process) in nodes.get_all_processes() {
747            let exposed_ports = process.exposed_ports.borrow().clone();
748
749            build_and_create_image(
750                &process.rust_crate,
751                process.compilation_options.as_deref(),
752                &process.config,
753                &exposed_ports,
754                &process.name,
755            )
756            .await?;
757        }
758
759        for (_, _, cluster) in nodes.get_all_clusters() {
760            build_and_create_image(
761                &cluster.rust_crate,
762                cluster.compilation_options.as_deref(),
763                &cluster.config,
764                &[], // clusters don't have exposed ports.
765                &cluster.name,
766            )
767            .await?;
768        }
769
770        Ok(())
771    }
772
773    /// Start the deployment, tell docker to create containers from the existing provisioned images.
774    #[instrument(level = "trace", skip_all)]
775    pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
776        let docker = Docker::connect_with_local_defaults()?;
777
778        match docker
779            .create_network(NetworkCreateRequest {
780                name: self.network.name.clone(),
781                driver: Some("bridge".to_owned()),
782                ..Default::default()
783            })
784            .await
785        {
786            Ok(v) => v.id,
787            Err(e) => {
788                panic!("Failed to create docker network: {e:?}");
789            }
790        };
791
792        for (_, _, process) in nodes.get_all_processes() {
793            let docker_container_name: String = get_docker_container_name(&process.name, None);
794            *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
795
796            create_and_start_container(
797                &docker,
798                &docker_container_name,
799                &process.name,
800                &self.network.name,
801                &self.deployment_instance,
802            )
803            .await?;
804        }
805
806        for (_, _, cluster) in nodes.get_all_clusters() {
807            for num in 0..cluster.count {
808                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
809                cluster
810                    .docker_container_name
811                    .borrow_mut()
812                    .push(docker_container_name.clone());
813
814                create_and_start_container(
815                    &docker,
816                    &docker_container_name,
817                    &cluster.name,
818                    &self.network.name,
819                    &self.deployment_instance,
820                )
821                .await?;
822            }
823        }
824
825        Ok(())
826    }
827
828    /// Stop the deployment, destroy all containers
829    #[instrument(level = "trace", skip_all)]
830    pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
831        let docker = Docker::connect_with_local_defaults()?;
832
833        for (_, _, process) in nodes.get_all_processes() {
834            let docker_container_name: String = get_docker_container_name(&process.name, None);
835
836            docker
837                .kill_container(&docker_container_name, None::<KillContainerOptions>)
838                .await?;
839        }
840
841        for (_, _, cluster) in nodes.get_all_clusters() {
842            for num in 0..cluster.count {
843                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
844
845                docker
846                    .kill_container(&docker_container_name, None::<KillContainerOptions>)
847                    .await?;
848            }
849        }
850
851        Ok(())
852    }
853
854    /// remove containers, images, and networks.
855    #[instrument(level = "trace", skip_all)]
856    pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
857        let docker = Docker::connect_with_local_defaults()?;
858
859        for (_, _, process) in nodes.get_all_processes() {
860            let docker_container_name: String = get_docker_container_name(&process.name, None);
861
862            docker
863                .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
864                .await?;
865        }
866
867        for (_, _, cluster) in nodes.get_all_clusters() {
868            for num in 0..cluster.count {
869                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
870
871                docker
872                    .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
873                    .await?;
874            }
875        }
876
877        docker
878            .remove_network(&self.network.name)
879            .await
880            .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
881
882        use bollard::query_parameters::RemoveImageOptions;
883
884        for (_, _, process) in nodes.get_all_processes() {
885            docker
886                .remove_image(&process.name, None::<RemoveImageOptions>, None)
887                .await?;
888        }
889
890        for (_, _, cluster) in nodes.get_all_clusters() {
891            docker
892                .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
893                .await?;
894        }
895
896        Ok(())
897    }
898}
899
900impl<'a> Deploy<'a> for DockerDeploy {
901    type Meta = ();
902    type InstantiateEnv = Self;
903
904    type Process = DockerDeployProcess;
905    type Cluster = DockerDeployCluster;
906    type External = DockerDeployExternal;
907
908    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
909    fn o2o_sink_source(
910        p1: &Self::Process,
911        p1_port: &<Self::Process as Node>::Port,
912        p2: &Self::Process,
913        p2_port: &<Self::Process as Node>::Port,
914    ) -> (syn::Expr, syn::Expr) {
915        let bind_addr = format!("0.0.0.0:{}", p2_port);
916        let target = format!("{}:{p2_port}", p2.name);
917
918        deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
919    }
920
921    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
922    fn o2o_connect(
923        p1: &Self::Process,
924        p1_port: &<Self::Process as Node>::Port,
925        p2: &Self::Process,
926        p2_port: &<Self::Process as Node>::Port,
927    ) -> Box<dyn FnOnce()> {
928        let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
929
930        Box::new(move || {
931            trace!(name: "o2o_connect thunk", %serialized);
932        })
933    }
934
935    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
936    fn o2m_sink_source(
937        p1: &Self::Process,
938        p1_port: &<Self::Process as Node>::Port,
939        c2: &Self::Cluster,
940        c2_port: &<Self::Cluster as Node>::Port,
941    ) -> (syn::Expr, syn::Expr) {
942        deploy_containerized_o2m(*c2_port)
943    }
944
945    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
946    fn o2m_connect(
947        p1: &Self::Process,
948        p1_port: &<Self::Process as Node>::Port,
949        c2: &Self::Cluster,
950        c2_port: &<Self::Cluster as Node>::Port,
951    ) -> Box<dyn FnOnce()> {
952        let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
953
954        Box::new(move || {
955            trace!(name: "o2m_connect thunk", %serialized);
956        })
957    }
958
959    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
960    fn m2o_sink_source(
961        c1: &Self::Cluster,
962        c1_port: &<Self::Cluster as Node>::Port,
963        p2: &Self::Process,
964        p2_port: &<Self::Process as Node>::Port,
965    ) -> (syn::Expr, syn::Expr) {
966        deploy_containerized_m2o(*p2_port, &p2.name)
967    }
968
969    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
970    fn m2o_connect(
971        c1: &Self::Cluster,
972        c1_port: &<Self::Cluster as Node>::Port,
973        p2: &Self::Process,
974        p2_port: &<Self::Process as Node>::Port,
975    ) -> Box<dyn FnOnce()> {
976        let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
977
978        Box::new(move || {
979            trace!(name: "m2o_connect thunk", %serialized);
980        })
981    }
982
983    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
984    fn m2m_sink_source(
985        c1: &Self::Cluster,
986        c1_port: &<Self::Cluster as Node>::Port,
987        c2: &Self::Cluster,
988        c2_port: &<Self::Cluster as Node>::Port,
989    ) -> (syn::Expr, syn::Expr) {
990        deploy_containerized_m2m(*c2_port)
991    }
992
993    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
994    fn m2m_connect(
995        c1: &Self::Cluster,
996        c1_port: &<Self::Cluster as Node>::Port,
997        c2: &Self::Cluster,
998        c2_port: &<Self::Cluster as Node>::Port,
999    ) -> Box<dyn FnOnce()> {
1000        let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1001
1002        Box::new(move || {
1003            trace!(name: "m2m_connect thunk", %serialized);
1004        })
1005    }
1006
1007    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1008    fn e2o_many_source(
1009        extra_stmts: &mut Vec<syn::Stmt>,
1010        p2: &Self::Process,
1011        p2_port: &<Self::Process as Node>::Port,
1012        codec_type: &syn::Type,
1013        shared_handle: String,
1014    ) -> syn::Expr {
1015        p2.exposed_ports.borrow_mut().push(*p2_port);
1016
1017        let socket_ident = syn::Ident::new(
1018            &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1019            Span::call_site(),
1020        );
1021
1022        let source_ident = syn::Ident::new(
1023            &format!("__hydro_deploy_many_{}_source", &shared_handle),
1024            Span::call_site(),
1025        );
1026
1027        let sink_ident = syn::Ident::new(
1028            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1029            Span::call_site(),
1030        );
1031
1032        let membership_ident = syn::Ident::new(
1033            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1034            Span::call_site(),
1035        );
1036
1037        let bind_addr = format!("0.0.0.0:{}", p2_port);
1038
1039        extra_stmts.push(syn::parse_quote! {
1040            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1041        });
1042
1043        let root = crate::staging_util::get_this_crate();
1044
1045        extra_stmts.push(syn::parse_quote! {
1046            let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1047        });
1048
1049        parse_quote!(#source_ident)
1050    }
1051
1052    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1053    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1054        let sink_ident = syn::Ident::new(
1055            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1056            Span::call_site(),
1057        );
1058        parse_quote!(#sink_ident)
1059    }
1060
1061    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1062    fn e2o_source(
1063        extra_stmts: &mut Vec<syn::Stmt>,
1064        p1: &Self::External,
1065        p1_port: &<Self::External as Node>::Port,
1066        p2: &Self::Process,
1067        p2_port: &<Self::Process as Node>::Port,
1068        _codec_type: &syn::Type,
1069        shared_handle: String,
1070    ) -> syn::Expr {
1071        p1.connection_info.borrow_mut().insert(
1072            *p1_port,
1073            (
1074                p2.docker_container_name.clone(),
1075                *p2_port,
1076                p2.network.clone(),
1077            ),
1078        );
1079
1080        p2.exposed_ports.borrow_mut().push(*p2_port);
1081
1082        let socket_ident = syn::Ident::new(
1083            &format!("__hydro_deploy_{}_socket", &shared_handle),
1084            Span::call_site(),
1085        );
1086
1087        let source_ident = syn::Ident::new(
1088            &format!("__hydro_deploy_{}_source", &shared_handle),
1089            Span::call_site(),
1090        );
1091
1092        let sink_ident = syn::Ident::new(
1093            &format!("__hydro_deploy_{}_sink", &shared_handle),
1094            Span::call_site(),
1095        );
1096
1097        let bind_addr = format!("0.0.0.0:{}", p2_port);
1098
1099        extra_stmts.push(syn::parse_quote! {
1100            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1101        });
1102
1103        let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1104
1105        extra_stmts.push(syn::parse_quote! {
1106            let (#sink_ident, #source_ident) = (#create_expr).split();
1107        });
1108
1109        parse_quote!(#source_ident)
1110    }
1111
1112    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1113    fn e2o_connect(
1114        p1: &Self::External,
1115        p1_port: &<Self::External as Node>::Port,
1116        p2: &Self::Process,
1117        p2_port: &<Self::Process as Node>::Port,
1118        many: bool,
1119        server_hint: NetworkHint,
1120    ) -> Box<dyn FnOnce()> {
1121        if server_hint != NetworkHint::Auto {
1122            panic!(
1123                "Docker deployment only supports NetworkHint::Auto, got {:?}",
1124                server_hint
1125            );
1126        }
1127
1128        // For many connections, we need to populate connection_info so as_bincode_bidi can find it
1129        if many {
1130            p1.connection_info.borrow_mut().insert(
1131                *p1_port,
1132                (
1133                    p2.docker_container_name.clone(),
1134                    *p2_port,
1135                    p2.network.clone(),
1136                ),
1137            );
1138        }
1139
1140        let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1141
1142        Box::new(move || {
1143            trace!(name: "e2o_connect thunk", %serialized);
1144        })
1145    }
1146
1147    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1148    fn o2e_sink(
1149        p1: &Self::Process,
1150        p1_port: &<Self::Process as Node>::Port,
1151        p2: &Self::External,
1152        p2_port: &<Self::External as Node>::Port,
1153        shared_handle: String,
1154    ) -> syn::Expr {
1155        let sink_ident = syn::Ident::new(
1156            &format!("__hydro_deploy_{}_sink", &shared_handle),
1157            Span::call_site(),
1158        );
1159        parse_quote!(#sink_ident)
1160    }
1161
1162    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1163    fn cluster_ids(
1164        of_cluster: LocationKey,
1165    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1166        cluster_ids()
1167    }
1168
1169    #[instrument(level = "trace", skip_all)]
1170    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1171        cluster_self_id()
1172    }
1173
1174    #[instrument(level = "trace", skip_all, fields(?location_id))]
1175    fn cluster_membership_stream(
1176        location_id: &LocationId,
1177    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1178    {
1179        cluster_membership_stream(location_id)
1180    }
1181}
1182
1183const CONTAINER_ALPHABET: [char; 36] = [
1184    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
1185    'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
1186];
1187
1188#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1189fn get_docker_image_name(
1190    name_hint: &str,
1191    location_key: LocationKey,
1192    deployment_instance: &str,
1193) -> String {
1194    let name_hint = name_hint
1195        .split("::")
1196        .last()
1197        .unwrap()
1198        .to_ascii_lowercase()
1199        .replace(".", "-")
1200        .replace("_", "-")
1201        .replace("::", "-");
1202
1203    let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1204
1205    format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location_key}")
1206}
1207
1208#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1209fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1210    if let Some(instance) = instance {
1211        format!("{image_name}-{instance}")
1212    } else {
1213        image_name.to_owned()
1214    }
1215}
1216/// Represents a Process running in a docker container
1217#[derive(Clone)]
1218pub struct DockerDeployProcessSpec {
1219    compilation_options: Option<String>,
1220    config: Vec<String>,
1221    network: DockerNetwork,
1222    deployment_instance: String,
1223}
1224
1225impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1226    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1227    fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1228        DockerDeployProcess {
1229            key,
1230            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1231
1232            next_port: Rc::new(RefCell::new(1000)),
1233            rust_crate: Rc::new(RefCell::new(None)),
1234
1235            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1236
1237            docker_container_name: Rc::new(RefCell::new(None)),
1238
1239            compilation_options: self.compilation_options,
1240            config: self.config,
1241
1242            network: self.network.clone(),
1243        }
1244    }
1245}
1246
1247/// Represents a Cluster running across `count` docker containers.
1248#[derive(Clone)]
1249pub struct DockerDeployClusterSpec {
1250    compilation_options: Option<String>,
1251    config: Vec<String>,
1252    count: usize,
1253    deployment_instance: String,
1254}
1255
1256impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1257    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1258    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1259        DockerDeployCluster {
1260            key,
1261            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1262
1263            next_port: Rc::new(RefCell::new(1000)),
1264            rust_crate: Rc::new(RefCell::new(None)),
1265
1266            docker_container_name: Rc::new(RefCell::new(Vec::new())),
1267
1268            compilation_options: self.compilation_options,
1269            config: self.config,
1270
1271            count: self.count,
1272        }
1273    }
1274}
1275
1276/// Represents an external process outside of the management of hydro deploy.
1277pub struct DockerDeployExternalSpec {
1278    name: String,
1279}
1280
1281impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1282    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1283    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1284        DockerDeployExternal {
1285            name: self.name,
1286            next_port: Rc::new(RefCell::new(10000)),
1287            ports: Rc::new(RefCell::new(HashMap::new())),
1288            connection_info: Rc::new(RefCell::new(HashMap::new())),
1289        }
1290    }
1291}