1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11 BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12 RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{LocationKey, MembershipEvent, NetworkHint};
41
42#[derive(Clone, Debug)]
44pub struct DockerNetwork {
45 name: String,
46}
47
48impl DockerNetwork {
49 pub fn new(name: String) -> Self {
51 Self {
52 name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53 }
54 }
55}
56
57#[derive(Clone)]
59pub struct DockerDeployProcess {
60 key: LocationKey,
61 name: String,
62 next_port: Rc<RefCell<u16>>,
63 rust_crate: Rc<RefCell<Option<RustCrate>>>,
64
65 exposed_ports: Rc<RefCell<Vec<u16>>>,
66
67 docker_container_name: Rc<RefCell<Option<String>>>,
68
69 compilation_options: Option<String>,
70
71 config: Vec<String>,
72
73 network: DockerNetwork,
74}
75
76impl Node for DockerDeployProcess {
77 type Port = u16;
78 type Meta = ();
79 type InstantiateEnv = DockerDeploy;
80
81 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
82 fn next_port(&self) -> Self::Port {
83 let port = {
84 let mut borrow = self.next_port.borrow_mut();
85 let port = *borrow;
86 *borrow += 1;
87 port
88 };
89
90 port
91 }
92
93 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
94 fn update_meta(&self, _meta: &Self::Meta) {}
95
96 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
97 fn instantiate(
98 &self,
99 _env: &mut Self::InstantiateEnv,
100 meta: &mut Self::Meta,
101 graph: DfirGraph,
102 extra_stmts: &[syn::Stmt],
103 sidecars: &[syn::Expr],
104 ) {
105 let (bin_name, config) = create_graph_trybuild(
106 graph,
107 extra_stmts,
108 sidecars,
109 Some(&self.name),
110 crate::compile::trybuild::generate::DeployMode::Containerized,
111 LinkingMode::Static,
112 );
113
114 let mut ret = RustCrate::new(config.project_dir)
115 .target_dir(config.target_dir)
116 .example(bin_name)
117 .no_default_features();
118
119 ret = ret.display_name("test_display_name");
120
121 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
122
123 if let Some(features) = config.features {
124 ret = ret.features(features);
125 }
126
127 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
128 ret = ret.config("build.incremental = false");
129
130 *self.rust_crate.borrow_mut() = Some(ret);
131 }
132}
133
134#[derive(Clone)]
136pub struct DockerDeployCluster {
137 key: LocationKey,
138 name: String,
139 next_port: Rc<RefCell<u16>>,
140 rust_crate: Rc<RefCell<Option<RustCrate>>>,
141
142 docker_container_name: Rc<RefCell<Vec<String>>>,
143
144 compilation_options: Option<String>,
145
146 config: Vec<String>,
147
148 count: usize,
149}
150
151impl Node for DockerDeployCluster {
152 type Port = u16;
153 type Meta = ();
154 type InstantiateEnv = DockerDeploy;
155
156 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
157 fn next_port(&self) -> Self::Port {
158 let port = {
159 let mut borrow = self.next_port.borrow_mut();
160 let port = *borrow;
161 *borrow += 1;
162 port
163 };
164
165 port
166 }
167
168 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
169 fn update_meta(&self, _meta: &Self::Meta) {}
170
171 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
172 fn instantiate(
173 &self,
174 _env: &mut Self::InstantiateEnv,
175 _meta: &mut Self::Meta,
176 graph: DfirGraph,
177 extra_stmts: &[syn::Stmt],
178 sidecars: &[syn::Expr],
179 ) {
180 let (bin_name, config) = create_graph_trybuild(
181 graph,
182 extra_stmts,
183 sidecars,
184 Some(&self.name),
185 crate::compile::trybuild::generate::DeployMode::Containerized,
186 LinkingMode::Static,
187 );
188
189 let mut ret = RustCrate::new(config.project_dir)
190 .target_dir(config.target_dir)
191 .example(bin_name)
192 .no_default_features();
193
194 ret = ret.display_name("test_display_name");
195
196 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
197
198 if let Some(features) = config.features {
199 ret = ret.features(features);
200 }
201
202 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
203 ret = ret.config("build.incremental = false");
204
205 *self.rust_crate.borrow_mut() = Some(ret);
206 }
207}
208
209#[derive(Clone, Debug)]
211pub struct DockerDeployExternal {
212 name: String,
213 next_port: Rc<RefCell<u16>>,
214
215 ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,
216
217 #[expect(clippy::type_complexity, reason = "internal code")]
218 connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
219}
220
221impl Node for DockerDeployExternal {
222 type Port = u16;
223 type Meta = ();
224 type InstantiateEnv = DockerDeploy;
225
226 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
227 fn next_port(&self) -> Self::Port {
228 let port = {
229 let mut borrow = self.next_port.borrow_mut();
230 let port = *borrow;
231 *borrow += 1;
232 port
233 };
234
235 port
236 }
237
238 #[instrument(level = "trace", skip_all, fields(name = self.name))]
239 fn update_meta(&self, _meta: &Self::Meta) {}
240
241 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
242 fn instantiate(
243 &self,
244 _env: &mut Self::InstantiateEnv,
245 meta: &mut Self::Meta,
246 graph: DfirGraph,
247 extra_stmts: &[syn::Stmt],
248 sidecars: &[syn::Expr],
249 ) {
250 trace!(name: "surface", surface = graph.surface_syntax_string());
251 }
252}
253
254type DynSourceSink<Out, In, InErr> = (
255 Pin<Box<dyn Stream<Item = Out>>>,
256 Pin<Box<dyn Sink<In, Error = InErr>>>,
257);
258
259impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
260 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
261 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
262 self.ports.borrow_mut().insert(external_port_id, port);
263 }
264
265 fn as_bytes_bidi(
266 &self,
267 external_port_id: ExternalPortId,
268 ) -> impl Future<
269 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
270 > + 'a {
271 let guard =
272 tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
273
274 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
275 let (docker_container_name, remote_port, _) = self
276 .connection_info
277 .borrow()
278 .get(&local_port)
279 .unwrap()
280 .clone();
281
282 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
283
284 async move {
285 let local_port =
286 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
287 let remote_ip_address = "localhost";
288
289 trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
290
291 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
292 .await
293 .unwrap();
294
295 trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
296
297 let (rx, tx) = stream.into_split();
298
299 let source = Box::pin(
300 FramedRead::new(rx, LengthDelimitedCodec::new()),
301 ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
302
303 let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
304 as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
305
306 (source, sink)
307 }
308 .instrument(guard.exit())
309 }
310
311 fn as_bincode_bidi<InT, OutT>(
312 &self,
313 external_port_id: ExternalPortId,
314 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
315 where
316 InT: serde::Serialize + 'static,
317 OutT: serde::de::DeserializeOwned + 'static,
318 {
319 let guard =
320 tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
321
322 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
323 let (docker_container_name, remote_port, _) = self
324 .connection_info
325 .borrow()
326 .get(&local_port)
327 .unwrap()
328 .clone();
329
330 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
331
332 async move {
333 let local_port =
334 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
335 let remote_ip_address = "localhost";
336
337 trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
338
339 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
340 .await
341 .unwrap();
342
343 trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
344
345 let (rx, tx) = stream.into_split();
346
347 let source = Box::pin(
348 FramedRead::new(rx, LengthDelimitedCodec::new())
349 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
350 ) as Pin<Box<dyn Stream<Item = OutT>>>;
351
352 let sink = Box::pin(
353 FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
354 Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
355 }),
356 ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
357
358 (source, sink)
359 }
360 .instrument(guard.exit())
361 }
362
363 fn as_bincode_sink<T>(
364 &self,
365 external_port_id: ExternalPortId,
366 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
367 where
368 T: serde::Serialize + 'static,
369 {
370 let guard =
371 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
372
373 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
374 let (docker_container_name, remote_port, _) = self
375 .connection_info
376 .borrow()
377 .get(&local_port)
378 .unwrap()
379 .clone();
380
381 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
382
383 async move {
384 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
385 let remote_ip_address = "localhost";
386
387 Box::pin(
388 LazySink::new(move || {
389 Box::pin(async move {
390 trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
391
392 let stream =
393 TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
394 .await?;
395
396 trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
397
398 Result::<_, std::io::Error>::Ok(FramedWrite::new(
399 stream,
400 LengthDelimitedCodec::new(),
401 ))
402 })
403 })
404 .with(move |v| async move {
405 Ok(Bytes::from(bincode::serialize(&v).unwrap()))
406 }),
407 ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
408 }
409 .instrument(guard.exit())
410 }
411
412 fn as_bincode_source<T>(
413 &self,
414 external_port_id: ExternalPortId,
415 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
416 where
417 T: serde::de::DeserializeOwned + 'static,
418 {
419 let guard =
420 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
421
422 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
423 let (docker_container_name, remote_port, _) = self
424 .connection_info
425 .borrow()
426 .get(&local_port)
427 .unwrap()
428 .clone();
429
430 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
431
432 async move {
433
434 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
435 let remote_ip_address = "localhost";
436
437 trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
438
439 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
440 .await
441 .unwrap();
442
443 trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
444
445 Box::pin(
446 FramedRead::new(stream, LengthDelimitedCodec::new())
447 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
448 ) as Pin<Box<dyn Stream<Item = T>>>
449 }
450 .instrument(guard.exit())
451 }
452}
453
454#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
455async fn find_dynamically_allocated_docker_port(
456 docker_container_name: &str,
457 destination_port: u16,
458) -> u16 {
459 let docker = Docker::connect_with_local_defaults().unwrap();
460
461 let container_info = docker
462 .inspect_container(docker_container_name, None::<InspectContainerOptions>)
463 .await
464 .unwrap();
465
466 trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
467
468 let remote_port = container_info
470 .network_settings
471 .as_ref()
472 .unwrap()
473 .ports
474 .as_ref()
475 .unwrap()
476 .get(&format!("{destination_port}/tcp"))
477 .unwrap()
478 .as_ref()
479 .unwrap()
480 .iter()
481 .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
482 .unwrap()
483 .host_port
484 .as_ref()
485 .unwrap()
486 .parse()
487 .unwrap();
488
489 remote_port
490}
491
492pub struct DockerDeploy {
494 docker_processes: Vec<DockerDeployProcessSpec>,
495 docker_clusters: Vec<DockerDeployClusterSpec>,
496 network: DockerNetwork,
497 deployment_instance: String,
498}
499
500#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
501async fn create_and_start_container(
502 docker: &Docker,
503 container_name: &str,
504 image_name: &str,
505 network_name: &str,
506 deployment_instance: &str,
507) -> Result<(), anyhow::Error> {
508 let config = ContainerCreateBody {
509 image: Some(image_name.to_owned()),
510 hostname: Some(container_name.to_owned()),
511 host_config: Some(HostConfig {
512 binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
513 publish_all_ports: Some(true),
514 port_bindings: Some(HashMap::new()), ..Default::default()
516 }),
517 env: Some(vec![
518 format!("CONTAINER_NAME={container_name}"),
519 format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
520 format!("RUST_LOG=trace"),
521 ]),
522 networking_config: Some(NetworkingConfig {
523 endpoints_config: Some(HashMap::from([(
524 network_name.to_owned(),
525 EndpointSettings {
526 ..Default::default()
527 },
528 )])),
529 }),
530 tty: Some(true),
531 ..Default::default()
532 };
533
534 let options = CreateContainerOptions {
535 name: Some(container_name.to_owned()),
536 ..Default::default()
537 };
538
539 tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
540 docker.create_container(Some(options), config).await?;
541 docker
542 .start_container(container_name, None::<StartContainerOptions>)
543 .await?;
544
545 Ok(())
546}
547
548#[instrument(level = "trace", skip_all, fields(%image_name))]
549async fn build_and_create_image(
550 rust_crate: &Rc<RefCell<Option<RustCrate>>>,
551 compilation_options: Option<&str>,
552 config: &[String],
553 exposed_ports: &[u16],
554 image_name: &str,
555) -> Result<(), anyhow::Error> {
556 let mut rust_crate = rust_crate
557 .borrow_mut()
558 .take()
559 .unwrap()
560 .rustflags(compilation_options.unwrap_or_default());
561
562 for cfg in config {
563 rust_crate = rust_crate.config(cfg);
564 }
565
566 let build_output = match build_crate_memoized(
567 rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
568 )
569 .await
570 {
571 Ok(build_output) => build_output,
572 Err(BuildError::FailedToBuildCrate {
573 exit_status,
574 diagnostics,
575 text_lines,
576 stderr_lines,
577 }) => {
578 let diagnostics = diagnostics
579 .into_iter()
580 .map(|d| d.rendered.unwrap())
581 .collect::<Vec<_>>()
582 .join("\n");
583 let text_lines = text_lines.join("\n");
584 let stderr_lines = stderr_lines.join("\n");
585
586 anyhow::bail!(
587 r#"
588Failed to build crate {exit_status:?}
589--- diagnostics
590---
591{diagnostics}
592---
593---
594---
595
596--- text_lines
597---
598---
599{text_lines}
600---
601---
602---
603
604--- stderr_lines
605---
606---
607{stderr_lines}
608---
609---
610---"#
611 );
612 }
613 Err(err) => {
614 anyhow::bail!("Failed to build crate {err:?}");
615 }
616 };
617
618 let docker = Docker::connect_with_local_defaults()?;
619
620 let mut tar_data = Vec::new();
621 {
622 let mut tar = Builder::new(&mut tar_data);
623
624 let exposed_ports = exposed_ports
625 .iter()
626 .map(|port| format!("EXPOSE {port}/tcp"))
627 .collect::<Vec<_>>()
628 .join("\n");
629
630 let dockerfile_content = format!(
631 r#"
632 FROM scratch
633 {exposed_ports}
634 COPY app /app
635 CMD ["/app"]
636 "#,
637 );
638
639 trace!(name: "dockerfile", %dockerfile_content);
640
641 let mut header = Header::new_gnu();
642 header.set_path("Dockerfile")?;
643 header.set_size(dockerfile_content.len() as u64);
644 header.set_cksum();
645 tar.append(&header, dockerfile_content.as_bytes())?;
646
647 let mut header = Header::new_gnu();
648 header.set_path("app")?;
649 header.set_size(build_output.bin_data.len() as u64);
650 header.set_mode(0o755);
651 header.set_cksum();
652 tar.append(&header, &build_output.bin_data[..])?;
653
654 tar.finish()?;
655 }
656
657 let build_options = BuildImageOptions {
658 dockerfile: "Dockerfile".to_owned(),
659 t: Some(image_name.to_owned()),
660 rm: true,
661 ..Default::default()
662 };
663
664 use bollard::errors::Error;
665
666 let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
667 let mut build_stream = docker.build_image(build_options, None, Some(body));
668 while let Some(msg) = build_stream.next().await {
669 match msg {
670 Ok(_) => {}
671 Err(e) => match e {
672 Error::DockerStreamError { error } => {
673 return Err(anyhow::anyhow!(
674 "Docker build failed: DockerStreamError: {{ error: {error} }}"
675 ));
676 }
677 _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
678 },
679 }
680 }
681
682 Ok(())
683}
684
685impl DockerDeploy {
686 pub fn new(network: DockerNetwork) -> Self {
688 Self {
689 docker_processes: Vec::new(),
690 docker_clusters: Vec::new(),
691 network,
692 deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
693 }
694 }
695
696 pub fn add_localhost_docker(
698 &mut self,
699 compilation_options: Option<String>,
700 config: Vec<String>,
701 ) -> DockerDeployProcessSpec {
702 let process = DockerDeployProcessSpec {
703 compilation_options,
704 config,
705 network: self.network.clone(),
706 deployment_instance: self.deployment_instance.clone(),
707 };
708
709 self.docker_processes.push(process.clone());
710
711 process
712 }
713
714 pub fn add_localhost_docker_cluster(
716 &mut self,
717 compilation_options: Option<String>,
718 config: Vec<String>,
719 count: usize,
720 ) -> DockerDeployClusterSpec {
721 let cluster = DockerDeployClusterSpec {
722 compilation_options,
723 config,
724 count,
725 deployment_instance: self.deployment_instance.clone(),
726 };
727
728 self.docker_clusters.push(cluster.clone());
729
730 cluster
731 }
732
733 pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
735 DockerDeployExternalSpec { name }
736 }
737
738 pub fn get_deployment_instance(&self) -> String {
740 self.deployment_instance.clone()
741 }
742
743 #[instrument(level = "trace", skip_all)]
745 pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
746 for (_, _, process) in nodes.get_all_processes() {
747 let exposed_ports = process.exposed_ports.borrow().clone();
748
749 build_and_create_image(
750 &process.rust_crate,
751 process.compilation_options.as_deref(),
752 &process.config,
753 &exposed_ports,
754 &process.name,
755 )
756 .await?;
757 }
758
759 for (_, _, cluster) in nodes.get_all_clusters() {
760 build_and_create_image(
761 &cluster.rust_crate,
762 cluster.compilation_options.as_deref(),
763 &cluster.config,
764 &[], &cluster.name,
766 )
767 .await?;
768 }
769
770 Ok(())
771 }
772
773 #[instrument(level = "trace", skip_all)]
775 pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
776 let docker = Docker::connect_with_local_defaults()?;
777
778 match docker
779 .create_network(NetworkCreateRequest {
780 name: self.network.name.clone(),
781 driver: Some("bridge".to_owned()),
782 ..Default::default()
783 })
784 .await
785 {
786 Ok(v) => v.id,
787 Err(e) => {
788 panic!("Failed to create docker network: {e:?}");
789 }
790 };
791
792 for (_, _, process) in nodes.get_all_processes() {
793 let docker_container_name: String = get_docker_container_name(&process.name, None);
794 *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
795
796 create_and_start_container(
797 &docker,
798 &docker_container_name,
799 &process.name,
800 &self.network.name,
801 &self.deployment_instance,
802 )
803 .await?;
804 }
805
806 for (_, _, cluster) in nodes.get_all_clusters() {
807 for num in 0..cluster.count {
808 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
809 cluster
810 .docker_container_name
811 .borrow_mut()
812 .push(docker_container_name.clone());
813
814 create_and_start_container(
815 &docker,
816 &docker_container_name,
817 &cluster.name,
818 &self.network.name,
819 &self.deployment_instance,
820 )
821 .await?;
822 }
823 }
824
825 Ok(())
826 }
827
828 #[instrument(level = "trace", skip_all)]
830 pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
831 let docker = Docker::connect_with_local_defaults()?;
832
833 for (_, _, process) in nodes.get_all_processes() {
834 let docker_container_name: String = get_docker_container_name(&process.name, None);
835
836 docker
837 .kill_container(&docker_container_name, None::<KillContainerOptions>)
838 .await?;
839 }
840
841 for (_, _, cluster) in nodes.get_all_clusters() {
842 for num in 0..cluster.count {
843 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
844
845 docker
846 .kill_container(&docker_container_name, None::<KillContainerOptions>)
847 .await?;
848 }
849 }
850
851 Ok(())
852 }
853
854 #[instrument(level = "trace", skip_all)]
856 pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
857 let docker = Docker::connect_with_local_defaults()?;
858
859 for (_, _, process) in nodes.get_all_processes() {
860 let docker_container_name: String = get_docker_container_name(&process.name, None);
861
862 docker
863 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
864 .await?;
865 }
866
867 for (_, _, cluster) in nodes.get_all_clusters() {
868 for num in 0..cluster.count {
869 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
870
871 docker
872 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
873 .await?;
874 }
875 }
876
877 docker
878 .remove_network(&self.network.name)
879 .await
880 .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
881
882 use bollard::query_parameters::RemoveImageOptions;
883
884 for (_, _, process) in nodes.get_all_processes() {
885 docker
886 .remove_image(&process.name, None::<RemoveImageOptions>, None)
887 .await?;
888 }
889
890 for (_, _, cluster) in nodes.get_all_clusters() {
891 docker
892 .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
893 .await?;
894 }
895
896 Ok(())
897 }
898}
899
900impl<'a> Deploy<'a> for DockerDeploy {
901 type Meta = ();
902 type InstantiateEnv = Self;
903
904 type Process = DockerDeployProcess;
905 type Cluster = DockerDeployCluster;
906 type External = DockerDeployExternal;
907
908 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
909 fn o2o_sink_source(
910 p1: &Self::Process,
911 p1_port: &<Self::Process as Node>::Port,
912 p2: &Self::Process,
913 p2_port: &<Self::Process as Node>::Port,
914 ) -> (syn::Expr, syn::Expr) {
915 let bind_addr = format!("0.0.0.0:{}", p2_port);
916 let target = format!("{}:{p2_port}", p2.name);
917
918 deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
919 }
920
921 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
922 fn o2o_connect(
923 p1: &Self::Process,
924 p1_port: &<Self::Process as Node>::Port,
925 p2: &Self::Process,
926 p2_port: &<Self::Process as Node>::Port,
927 ) -> Box<dyn FnOnce()> {
928 let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
929
930 Box::new(move || {
931 trace!(name: "o2o_connect thunk", %serialized);
932 })
933 }
934
935 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
936 fn o2m_sink_source(
937 p1: &Self::Process,
938 p1_port: &<Self::Process as Node>::Port,
939 c2: &Self::Cluster,
940 c2_port: &<Self::Cluster as Node>::Port,
941 ) -> (syn::Expr, syn::Expr) {
942 deploy_containerized_o2m(*c2_port)
943 }
944
945 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
946 fn o2m_connect(
947 p1: &Self::Process,
948 p1_port: &<Self::Process as Node>::Port,
949 c2: &Self::Cluster,
950 c2_port: &<Self::Cluster as Node>::Port,
951 ) -> Box<dyn FnOnce()> {
952 let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
953
954 Box::new(move || {
955 trace!(name: "o2m_connect thunk", %serialized);
956 })
957 }
958
959 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
960 fn m2o_sink_source(
961 c1: &Self::Cluster,
962 c1_port: &<Self::Cluster as Node>::Port,
963 p2: &Self::Process,
964 p2_port: &<Self::Process as Node>::Port,
965 ) -> (syn::Expr, syn::Expr) {
966 deploy_containerized_m2o(*p2_port, &p2.name)
967 }
968
969 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
970 fn m2o_connect(
971 c1: &Self::Cluster,
972 c1_port: &<Self::Cluster as Node>::Port,
973 p2: &Self::Process,
974 p2_port: &<Self::Process as Node>::Port,
975 ) -> Box<dyn FnOnce()> {
976 let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
977
978 Box::new(move || {
979 trace!(name: "m2o_connect thunk", %serialized);
980 })
981 }
982
983 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
984 fn m2m_sink_source(
985 c1: &Self::Cluster,
986 c1_port: &<Self::Cluster as Node>::Port,
987 c2: &Self::Cluster,
988 c2_port: &<Self::Cluster as Node>::Port,
989 ) -> (syn::Expr, syn::Expr) {
990 deploy_containerized_m2m(*c2_port)
991 }
992
993 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
994 fn m2m_connect(
995 c1: &Self::Cluster,
996 c1_port: &<Self::Cluster as Node>::Port,
997 c2: &Self::Cluster,
998 c2_port: &<Self::Cluster as Node>::Port,
999 ) -> Box<dyn FnOnce()> {
1000 let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1001
1002 Box::new(move || {
1003 trace!(name: "m2m_connect thunk", %serialized);
1004 })
1005 }
1006
1007 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1008 fn e2o_many_source(
1009 extra_stmts: &mut Vec<syn::Stmt>,
1010 p2: &Self::Process,
1011 p2_port: &<Self::Process as Node>::Port,
1012 codec_type: &syn::Type,
1013 shared_handle: String,
1014 ) -> syn::Expr {
1015 p2.exposed_ports.borrow_mut().push(*p2_port);
1016
1017 let socket_ident = syn::Ident::new(
1018 &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1019 Span::call_site(),
1020 );
1021
1022 let source_ident = syn::Ident::new(
1023 &format!("__hydro_deploy_many_{}_source", &shared_handle),
1024 Span::call_site(),
1025 );
1026
1027 let sink_ident = syn::Ident::new(
1028 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1029 Span::call_site(),
1030 );
1031
1032 let membership_ident = syn::Ident::new(
1033 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1034 Span::call_site(),
1035 );
1036
1037 let bind_addr = format!("0.0.0.0:{}", p2_port);
1038
1039 extra_stmts.push(syn::parse_quote! {
1040 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1041 });
1042
1043 let root = crate::staging_util::get_this_crate();
1044
1045 extra_stmts.push(syn::parse_quote! {
1046 let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1047 });
1048
1049 parse_quote!(#source_ident)
1050 }
1051
1052 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1053 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1054 let sink_ident = syn::Ident::new(
1055 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1056 Span::call_site(),
1057 );
1058 parse_quote!(#sink_ident)
1059 }
1060
1061 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1062 fn e2o_source(
1063 extra_stmts: &mut Vec<syn::Stmt>,
1064 p1: &Self::External,
1065 p1_port: &<Self::External as Node>::Port,
1066 p2: &Self::Process,
1067 p2_port: &<Self::Process as Node>::Port,
1068 _codec_type: &syn::Type,
1069 shared_handle: String,
1070 ) -> syn::Expr {
1071 p1.connection_info.borrow_mut().insert(
1072 *p1_port,
1073 (
1074 p2.docker_container_name.clone(),
1075 *p2_port,
1076 p2.network.clone(),
1077 ),
1078 );
1079
1080 p2.exposed_ports.borrow_mut().push(*p2_port);
1081
1082 let socket_ident = syn::Ident::new(
1083 &format!("__hydro_deploy_{}_socket", &shared_handle),
1084 Span::call_site(),
1085 );
1086
1087 let source_ident = syn::Ident::new(
1088 &format!("__hydro_deploy_{}_source", &shared_handle),
1089 Span::call_site(),
1090 );
1091
1092 let sink_ident = syn::Ident::new(
1093 &format!("__hydro_deploy_{}_sink", &shared_handle),
1094 Span::call_site(),
1095 );
1096
1097 let bind_addr = format!("0.0.0.0:{}", p2_port);
1098
1099 extra_stmts.push(syn::parse_quote! {
1100 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1101 });
1102
1103 let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1104
1105 extra_stmts.push(syn::parse_quote! {
1106 let (#sink_ident, #source_ident) = (#create_expr).split();
1107 });
1108
1109 parse_quote!(#source_ident)
1110 }
1111
1112 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1113 fn e2o_connect(
1114 p1: &Self::External,
1115 p1_port: &<Self::External as Node>::Port,
1116 p2: &Self::Process,
1117 p2_port: &<Self::Process as Node>::Port,
1118 many: bool,
1119 server_hint: NetworkHint,
1120 ) -> Box<dyn FnOnce()> {
1121 if server_hint != NetworkHint::Auto {
1122 panic!(
1123 "Docker deployment only supports NetworkHint::Auto, got {:?}",
1124 server_hint
1125 );
1126 }
1127
1128 if many {
1130 p1.connection_info.borrow_mut().insert(
1131 *p1_port,
1132 (
1133 p2.docker_container_name.clone(),
1134 *p2_port,
1135 p2.network.clone(),
1136 ),
1137 );
1138 }
1139
1140 let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1141
1142 Box::new(move || {
1143 trace!(name: "e2o_connect thunk", %serialized);
1144 })
1145 }
1146
1147 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1148 fn o2e_sink(
1149 p1: &Self::Process,
1150 p1_port: &<Self::Process as Node>::Port,
1151 p2: &Self::External,
1152 p2_port: &<Self::External as Node>::Port,
1153 shared_handle: String,
1154 ) -> syn::Expr {
1155 let sink_ident = syn::Ident::new(
1156 &format!("__hydro_deploy_{}_sink", &shared_handle),
1157 Span::call_site(),
1158 );
1159 parse_quote!(#sink_ident)
1160 }
1161
1162 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1163 fn cluster_ids(
1164 of_cluster: LocationKey,
1165 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1166 cluster_ids()
1167 }
1168
1169 #[instrument(level = "trace", skip_all)]
1170 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1171 cluster_self_id()
1172 }
1173
1174 #[instrument(level = "trace", skip_all, fields(?location_id))]
1175 fn cluster_membership_stream(
1176 location_id: &LocationId,
1177 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1178 {
1179 cluster_membership_stream(location_id)
1180 }
1181}
1182
1183const CONTAINER_ALPHABET: [char; 36] = [
1184 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
1185 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
1186];
1187
1188#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1189fn get_docker_image_name(
1190 name_hint: &str,
1191 location_key: LocationKey,
1192 deployment_instance: &str,
1193) -> String {
1194 let name_hint = name_hint
1195 .split("::")
1196 .last()
1197 .unwrap()
1198 .to_ascii_lowercase()
1199 .replace(".", "-")
1200 .replace("_", "-")
1201 .replace("::", "-");
1202
1203 let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1204
1205 format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location_key}")
1206}
1207
1208#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1209fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1210 if let Some(instance) = instance {
1211 format!("{image_name}-{instance}")
1212 } else {
1213 image_name.to_owned()
1214 }
1215}
1216#[derive(Clone)]
1218pub struct DockerDeployProcessSpec {
1219 compilation_options: Option<String>,
1220 config: Vec<String>,
1221 network: DockerNetwork,
1222 deployment_instance: String,
1223}
1224
1225impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1226 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1227 fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1228 DockerDeployProcess {
1229 key,
1230 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1231
1232 next_port: Rc::new(RefCell::new(1000)),
1233 rust_crate: Rc::new(RefCell::new(None)),
1234
1235 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1236
1237 docker_container_name: Rc::new(RefCell::new(None)),
1238
1239 compilation_options: self.compilation_options,
1240 config: self.config,
1241
1242 network: self.network.clone(),
1243 }
1244 }
1245}
1246
1247#[derive(Clone)]
1249pub struct DockerDeployClusterSpec {
1250 compilation_options: Option<String>,
1251 config: Vec<String>,
1252 count: usize,
1253 deployment_instance: String,
1254}
1255
1256impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1257 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1258 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1259 DockerDeployCluster {
1260 key,
1261 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1262
1263 next_port: Rc::new(RefCell::new(1000)),
1264 rust_crate: Rc::new(RefCell::new(None)),
1265
1266 docker_container_name: Rc::new(RefCell::new(Vec::new())),
1267
1268 compilation_options: self.compilation_options,
1269 config: self.config,
1270
1271 count: self.count,
1272 }
1273 }
1274}
1275
1276pub struct DockerDeployExternalSpec {
1278 name: String,
1279}
1280
1281impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1282 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1283 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1284 DockerDeployExternal {
1285 name: self.name,
1286 next_port: Rc::new(RefCell::new(10000)),
1287 ports: Rc::new(RefCell::new(HashMap::new())),
1288 connection_info: Rc::new(RefCell::new(HashMap::new())),
1289 }
1290 }
1291}