1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11 BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12 RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::deploy::DeployResult;
33use crate::compile::deploy_provider::{
34 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
35};
36use crate::compile::trybuild::generate::create_graph_trybuild;
37use crate::location::dynamic::LocationId;
38use crate::location::member_id::TaglessMemberId;
39use crate::location::{MembershipEvent, NetworkHint};
40
/// Handle to the Docker network that the containers of a deployment are attached to.
#[derive(Clone, Debug)]
pub struct DockerNetwork {
    /// Network name handed to the Docker API when the network is created,
    /// referenced by containers, and removed.
    name: String,
}
46
47impl DockerNetwork {
48 pub fn new(name: String) -> Self {
50 Self {
51 name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
52 }
53 }
54}
55
/// A single process of the deployment, built into one Docker image and run as one container.
#[derive(Clone)]
pub struct DockerDeployProcess {
    /// Location id supplied when the process is built from its spec.
    id: usize,
    /// Generated image name; the container name is derived from it.
    name: String,
    /// Counter backing `next_port`; shared between clones of this handle.
    next_port: Rc<RefCell<u16>>,
    /// Crate description filled in by `instantiate` and taken out again when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Ports that get an `EXPOSE` line in the generated Dockerfile.
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Name of the running container; `None` until `DockerDeploy::start` sets it.
    docker_container_name: Rc<RefCell<Option<String>>>,

    /// Optional string passed as rustflags when the crate is built.
    compilation_options: Option<String>,

    /// Extra configuration entries forwarded to the crate build one by one.
    config: Vec<String>,

    /// Network this process belongs to.
    network: DockerNetwork,
}
74
75impl Node for DockerDeployProcess {
76 type Port = u16;
77 type Meta = ();
78 type InstantiateEnv = DockerDeploy;
79
80 #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
81 fn next_port(&self) -> Self::Port {
82 let port = {
83 let mut borrow = self.next_port.borrow_mut();
84 let port = *borrow;
85 *borrow += 1;
86 port
87 };
88
89 port
90 }
91
92 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
93 fn update_meta(&self, _meta: &Self::Meta) {}
94
95 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
96 fn instantiate(
97 &self,
98 _env: &mut Self::InstantiateEnv,
99 meta: &mut Self::Meta,
100 graph: DfirGraph,
101 extra_stmts: Vec<syn::Stmt>,
102 ) {
103 let (bin_name, config) =
104 create_graph_trybuild(graph, extra_stmts, &Some(self.name.clone()), true);
105
106 let mut ret = RustCrate::new(config.project_dir)
107 .target_dir(config.target_dir)
108 .example(bin_name.clone())
109 .no_default_features();
110
111 ret = ret.display_name("test_display_name");
112
113 ret = ret.features(vec!["hydro___feature_docker_runtime".to_string()]);
114
115 if let Some(features) = config.features {
116 ret = ret.features(features);
117 }
118
119 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
120 ret = ret.config("build.incremental = false");
121
122 *self.rust_crate.borrow_mut() = Some(ret);
123 }
124}
125
/// A cluster of the deployment: one Docker image run as `count` containers.
#[derive(Clone)]
pub struct DockerDeployCluster {
    /// Location id supplied when the cluster is built from its spec.
    id: usize,
    /// Generated image name; per-instance container names are derived from it.
    name: String,
    /// Counter backing `next_port`; shared between clones of this handle.
    next_port: Rc<RefCell<u16>>,
    /// Crate description filled in by `instantiate` and taken out again when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Names of the started containers, appended to by `DockerDeploy::start`.
    docker_container_name: Rc<RefCell<Vec<String>>>,

    /// Optional string passed as rustflags when the crate is built.
    compilation_options: Option<String>,

    /// Extra configuration entries forwarded to the crate build one by one.
    config: Vec<String>,

    /// Number of containers to run for this cluster.
    count: usize,
}
142
143impl Node for DockerDeployCluster {
144 type Port = u16;
145 type Meta = ();
146 type InstantiateEnv = DockerDeploy;
147
148 #[instrument(level = "trace", skip_all, ret, fields(id = self.id, name = self.name))]
149 fn next_port(&self) -> Self::Port {
150 let port = {
151 let mut borrow = self.next_port.borrow_mut();
152 let port = *borrow;
153 *borrow += 1;
154 port
155 };
156
157 port
158 }
159
160 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name))]
161 fn update_meta(&self, _meta: &Self::Meta) {}
162
163 #[instrument(level = "trace", skip_all, fields(id = self.id, name = self.name, extra_stmts = extra_stmts.len()))]
164 fn instantiate(
165 &self,
166 _env: &mut Self::InstantiateEnv,
167 _meta: &mut Self::Meta,
168 graph: DfirGraph,
169 extra_stmts: Vec<syn::Stmt>,
170 ) {
171 let (bin_name, config) =
172 create_graph_trybuild(graph, extra_stmts, &Some(self.name.clone()), true);
173
174 let mut ret = RustCrate::new(config.project_dir)
175 .target_dir(config.target_dir)
176 .example(bin_name.clone())
177 .no_default_features();
178
179 ret = ret.display_name("test_display_name");
180
181 ret = ret.features(vec!["hydro___feature_deploy_integration".to_string()]);
182
183 if let Some(features) = config.features {
184 ret = ret.features(features);
185 }
186
187 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
188 ret = ret.config("build.incremental = false");
189
190 *self.rust_crate.borrow_mut() = Some(ret);
191 }
192}
193
/// An external (non-containerized) endpoint that talks to processes of the deployment.
#[derive(Clone, Debug)]
pub struct DockerDeployExternal {
    /// Name given to this external when its spec was created.
    name: String,
    /// Counter backing `next_port`; shared between clones of this handle.
    next_port: Rc<RefCell<u16>>,

    /// Registered ports, keyed by the `key` passed to `register`.
    ports: Rc<RefCell<HashMap<usize, u16>>>,

    /// Per external port: the target process's container-name cell, the
    /// port on that process, and the process's network.
    #[expect(clippy::type_complexity, reason = "internal code")]
    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
}
205
206impl Node for DockerDeployExternal {
207 type Port = u16;
208 type Meta = ();
209 type InstantiateEnv = DockerDeploy;
210
211 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
212 fn next_port(&self) -> Self::Port {
213 let port = {
214 let mut borrow = self.next_port.borrow_mut();
215 let port = *borrow;
216 *borrow += 1;
217 port
218 };
219
220 port
221 }
222
223 #[instrument(level = "trace", skip_all, fields(name = self.name))]
224 fn update_meta(&self, _meta: &Self::Meta) {}
225
226 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
227 fn instantiate(
228 &self,
229 _env: &mut Self::InstantiateEnv,
230 meta: &mut Self::Meta,
231 graph: DfirGraph,
232 extra_stmts: Vec<syn::Stmt>,
233 ) {
234 trace!(name: "surface", surface = graph.surface_syntax_string());
235 }
236}
237
/// A boxed, pinned (stream, sink) pair: the stream yields `Out` items, the
/// sink accepts `In` items and reports errors of type `InErr`.
type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
242
243impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
244 #[instrument(level = "trace", skip_all, fields(name = self.name, %key, %port))]
245 fn register(&self, key: usize, port: <DockerDeploy as Deploy>::Port) {
246 self.ports.borrow_mut().insert(key, port);
247 }
248
249 #[instrument(level = "trace", skip_all, fields(name = self.name, %key))]
250 fn raw_port(&self, key: usize) -> <DockerDeploy as Deploy<'a>>::ExternalRawPort {
251 todo!()
252 }
253
254 fn as_bytes_bidi(
255 &self,
256 key: usize,
257 ) -> impl Future<
258 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
259 > + 'a {
260 let _span = tracing::trace_span!("as_bytes_bidi", name = %self.name, %key).entered(); async { todo!() }
262 }
263
264 fn as_bincode_bidi<InT, OutT>(
265 &self,
266 key: usize,
267 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
268 where
269 InT: serde::Serialize + 'static,
270 OutT: serde::de::DeserializeOwned + 'static,
271 {
272 let _span = tracing::trace_span!("as_bincode_bidi", name = %self.name, %key).entered(); async { todo!() }
274 }
275
276 fn as_bincode_sink<T>(
277 &self,
278 key: usize,
279 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
280 where
281 T: serde::Serialize + 'static,
282 {
283 let guard = tracing::trace_span!("as_bincode_sink", name = %self.name, %key).entered();
284
285 let local_port = *self.ports.borrow().get(&key).unwrap();
286 let (docker_container_name, remote_port, _) = self
287 .connection_info
288 .borrow()
289 .get(&local_port)
290 .unwrap()
291 .clone();
292
293 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
294
295 async move {
296 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
297 let remote_ip_address = "localhost";
298
299 Box::pin(
300 LazySink::new(move || {
301 Box::pin(async move {
302 trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
303
304 let stream =
305 TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
306 .await?;
307
308 trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
309
310 Result::<_, std::io::Error>::Ok(FramedWrite::new(
311 stream,
312 LengthDelimitedCodec::new(),
313 ))
314 })
315 })
316 .with(move |v| async move {
317 Ok(Bytes::from(bincode::serialize(&v).unwrap()))
318 }),
319 ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
320 }
321 .instrument(guard.exit())
322 }
323
324 fn as_bincode_source<T>(
325 &self,
326 key: usize,
327 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
328 where
329 T: serde::de::DeserializeOwned + 'static,
330 {
331 let guard = tracing::trace_span!("as_bincode_sink", name = %self.name, %key).entered();
332
333 let local_port = *self.ports.borrow().get(&key).unwrap();
334 let (docker_container_name, remote_port, _) = self
335 .connection_info
336 .borrow()
337 .get(&local_port)
338 .unwrap()
339 .clone();
340
341 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
342
343 async move {
344
345 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
346 let remote_ip_address = "localhost";
347
348 trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
349
350 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
351 .await
352 .unwrap();
353
354 trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
355
356 Box::pin(
357 FramedRead::new(stream, LengthDelimitedCodec::new())
358 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
359 ) as Pin<Box<dyn Stream<Item = T>>>
360 }
361 .instrument(guard.exit())
362 }
363}
364
365#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
366async fn find_dynamically_allocated_docker_port(
367 docker_container_name: &str,
368 destination_port: u16,
369) -> u16 {
370 let docker = Docker::connect_with_local_defaults().unwrap();
371
372 let container_info = docker
373 .inspect_container(docker_container_name, None::<InspectContainerOptions>)
374 .await
375 .unwrap();
376
377 trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
378
379 let remote_port = container_info
381 .network_settings
382 .as_ref()
383 .unwrap()
384 .ports
385 .as_ref()
386 .unwrap()
387 .get(&format!("{destination_port}/tcp"))
388 .unwrap()
389 .as_ref()
390 .unwrap()
391 .iter()
392 .find(|v| v.host_ip == Some("0.0.0.0".to_string()))
393 .unwrap()
394 .host_port
395 .as_ref()
396 .unwrap()
397 .parse()
398 .unwrap();
399
400 remote_port
401}
402
/// Deployment backend that builds each location into a Docker image and runs
/// it as one or more containers on the local Docker daemon.
pub struct DockerDeploy {
    /// Copies of every process spec handed out by `add_localhost_docker`.
    docker_processes: Vec<DockerDeployProcessSpec>,
    /// Copies of every cluster spec handed out by `add_localhost_docker_cluster`.
    docker_clusters: Vec<DockerDeployClusterSpec>,
    /// Network all containers of this deployment are attached to.
    network: DockerNetwork,
    /// Random identifier of this deployment; embedded in image names and
    /// exported to containers as `DEPLOYMENT_INSTANCE`.
    deployment_instance: String,
}
410
411#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
412async fn create_and_start_container(
413 docker: &Docker,
414 container_name: &str,
415 image_name: &str,
416 network_name: &str,
417 deployment_instance: &str,
418) -> Result<(), anyhow::Error> {
419 let config = ContainerCreateBody {
420 image: Some(image_name.to_string()),
421 hostname: Some(container_name.to_string()),
422 host_config: Some(HostConfig {
423 binds: Some(vec![
424 "/var/run/docker.sock:/var/run/docker.sock".to_string(),
425 ]),
426 publish_all_ports: Some(true),
427 port_bindings: Some(HashMap::new()), ..Default::default()
429 }),
430 env: Some(vec![
431 format!("CONTAINER_NAME={container_name}"),
432 format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
433 format!("RUST_LOG=trace"),
434 ]),
435 networking_config: Some(NetworkingConfig {
436 endpoints_config: Some(HashMap::from([(
437 network_name.to_string(),
438 EndpointSettings {
439 ..Default::default()
440 },
441 )])),
442 }),
443 tty: Some(true),
444 ..Default::default()
445 };
446
447 let options = CreateContainerOptions {
448 name: Some(container_name.to_string()),
449 ..Default::default()
450 };
451
452 tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
453 docker.create_container(Some(options), config).await?;
454 docker
455 .start_container(container_name, None::<StartContainerOptions>)
456 .await?;
457
458 Ok(())
459}
460
461#[instrument(level = "trace", skip_all, fields(%image_name))]
462async fn build_and_create_image(
463 rust_crate: &Rc<RefCell<Option<RustCrate>>>,
464 compilation_options: &Option<String>,
465 config: &[String],
466 exposed_ports: &[u16],
467 image_name: &str,
468) -> Result<(), anyhow::Error> {
469 let mut rust_crate = rust_crate
470 .borrow_mut()
471 .take()
472 .unwrap()
473 .rustflags(compilation_options.clone().unwrap_or("".to_string()));
474
475 for cfg in config {
476 rust_crate = rust_crate.config(cfg);
477 }
478
479 let build_output = match build_crate_memoized(
480 rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
481 )
482 .await
483 {
484 Ok(build_output) => build_output,
485 Err(BuildError::FailedToBuildCrate {
486 exit_status,
487 diagnostics,
488 text_lines,
489 stderr_lines,
490 }) => {
491 let diagnostics = diagnostics
492 .into_iter()
493 .map(|d| d.rendered.unwrap())
494 .collect::<Vec<_>>()
495 .join("\n");
496 let text_lines = text_lines.join("\n");
497 let stderr_lines = stderr_lines.join("\n");
498
499 anyhow::bail!(
500 r#"
501Failed to build crate {exit_status:?}
502--- diagnostics
503---
504{diagnostics}
505---
506---
507---
508
509--- text_lines
510---
511---
512{text_lines}
513---
514---
515---
516
517--- stderr_lines
518---
519---
520{stderr_lines}
521---
522---
523---"#
524 );
525 }
526 Err(err) => {
527 anyhow::bail!("Failed to build crate {err:?}");
528 }
529 };
530
531 let docker = Docker::connect_with_local_defaults()?;
532
533 let mut tar_data = Vec::new();
534 {
535 let mut tar = Builder::new(&mut tar_data);
536
537 let exposed_ports = exposed_ports
538 .iter()
539 .map(|port| format!("EXPOSE {port}/tcp"))
540 .collect::<Vec<_>>()
541 .join("\n");
542
543 let dockerfile_content = format!(
544 r#"
545 FROM scratch
546 {exposed_ports}
547 COPY app /app
548 CMD ["/app"]
549 "#,
550 );
551
552 trace!(name: "dockerfile", %dockerfile_content);
553
554 let mut header = Header::new_gnu();
555 header.set_path("Dockerfile")?;
556 header.set_size(dockerfile_content.len() as u64);
557 header.set_cksum();
558 tar.append(&header, dockerfile_content.as_bytes())?;
559
560 let mut header = Header::new_gnu();
561 header.set_path("app")?;
562 header.set_size(build_output.bin_data.len() as u64);
563 header.set_mode(0o755);
564 header.set_cksum();
565 tar.append(&header, &build_output.bin_data[..])?;
566
567 tar.finish()?;
568 }
569
570 let build_options = BuildImageOptions {
571 dockerfile: "Dockerfile".to_owned(),
572 t: Some(image_name.to_string()),
573 rm: true,
574 ..Default::default()
575 };
576
577 use bollard::errors::Error;
578
579 let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
580 let mut build_stream = docker.build_image(build_options, None, Some(body));
581 while let Some(msg) = build_stream.next().await {
582 match msg {
583 Ok(_) => {}
584 Err(e) => match e {
585 Error::DockerStreamError { error } => {
586 return Err(anyhow::anyhow!(
587 "Docker build failed: DockerStreamError: {{ error: {error} }}"
588 ));
589 }
590 _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
591 },
592 }
593 }
594
595 Ok(())
596}
597
598impl DockerDeploy {
599 pub fn new(network: DockerNetwork) -> Self {
601 Self {
602 docker_processes: Vec::new(),
603 docker_clusters: Vec::new(),
604 network,
605 deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
606 }
607 }
608
609 pub fn add_localhost_docker(
611 &mut self,
612 compilation_options: Option<String>,
613 config: Vec<String>,
614 ) -> DockerDeployProcessSpec {
615 let process = DockerDeployProcessSpec {
616 compilation_options,
617 config,
618 network: self.network.clone(),
619 deployment_instance: self.deployment_instance.clone(),
620 };
621
622 self.docker_processes.push(process.clone());
623
624 process
625 }
626
627 pub fn add_localhost_docker_cluster(
629 &mut self,
630 compilation_options: Option<String>,
631 config: Vec<String>,
632 count: usize,
633 ) -> DockerDeployClusterSpec {
634 let cluster = DockerDeployClusterSpec {
635 compilation_options,
636 config,
637 count,
638 deployment_instance: self.deployment_instance.clone(),
639 };
640
641 self.docker_clusters.push(cluster.clone());
642
643 cluster
644 }
645
646 pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
648 DockerDeployExternalSpec { name }
649 }
650
651 pub fn get_deployment_instance(&self) -> String {
653 self.deployment_instance.clone()
654 }
655
656 #[instrument(level = "trace", skip_all)]
658 pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
659 for (_, _, process) in nodes.get_all_processes() {
660 let exposed_ports = process.exposed_ports.borrow().clone();
661
662 build_and_create_image(
663 &process.rust_crate,
664 &process.compilation_options,
665 &process.config,
666 &exposed_ports,
667 &process.name,
668 )
669 .await?;
670 }
671
672 for (_, _, cluster) in nodes.get_all_clusters() {
673 build_and_create_image(
674 &cluster.rust_crate,
675 &cluster.compilation_options,
676 &cluster.config,
677 &[], &cluster.name,
679 )
680 .await?;
681 }
682
683 Ok(())
684 }
685
686 #[instrument(level = "trace", skip_all)]
688 pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
689 let docker = Docker::connect_with_local_defaults()?;
690
691 match docker
692 .create_network(NetworkCreateRequest {
693 name: self.network.name.clone(),
694 driver: Some("bridge".to_string()),
695 ..Default::default()
696 })
697 .await
698 {
699 Ok(v) => v.id,
700 Err(e) => {
701 panic!("Failed to create docker network: {e:?}");
702 }
703 };
704
705 for (_, _, process) in nodes.get_all_processes() {
706 let docker_container_name: String = get_docker_container_name(&process.name, None);
707 *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
708
709 create_and_start_container(
710 &docker,
711 &docker_container_name,
712 &process.name,
713 &self.network.name,
714 &self.deployment_instance,
715 )
716 .await?;
717 }
718
719 for (_, _, cluster) in nodes.get_all_clusters() {
720 for num in 0..cluster.count {
721 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
722 cluster
723 .docker_container_name
724 .borrow_mut()
725 .push(docker_container_name.clone());
726
727 create_and_start_container(
728 &docker,
729 &docker_container_name,
730 &cluster.name,
731 &self.network.name,
732 &self.deployment_instance,
733 )
734 .await?;
735 }
736 }
737
738 Ok(())
739 }
740
741 #[instrument(level = "trace", skip_all)]
743 pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
744 let docker = Docker::connect_with_local_defaults()?;
745
746 for (_, _, process) in nodes.get_all_processes() {
747 let docker_container_name: String = get_docker_container_name(&process.name, None);
748
749 docker
750 .kill_container(&docker_container_name, None::<KillContainerOptions>)
751 .await?;
752 }
753
754 for (_, _, cluster) in nodes.get_all_clusters() {
755 for num in 0..cluster.count {
756 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
757
758 docker
759 .kill_container(&docker_container_name, None::<KillContainerOptions>)
760 .await?;
761 }
762 }
763
764 Ok(())
765 }
766
767 #[instrument(level = "trace", skip_all)]
769 pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
770 let docker = Docker::connect_with_local_defaults()?;
771
772 for (_, _, process) in nodes.get_all_processes() {
773 let docker_container_name: String = get_docker_container_name(&process.name, None);
774
775 docker
776 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
777 .await?;
778 }
779
780 for (_, _, cluster) in nodes.get_all_clusters() {
781 for num in 0..cluster.count {
782 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
783
784 docker
785 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
786 .await?;
787 }
788 }
789
790 docker
791 .remove_network(&self.network.name)
792 .await
793 .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
794
795 use bollard::query_parameters::RemoveImageOptions;
796
797 for (_, _, process) in nodes.get_all_processes() {
798 docker
799 .remove_image(&process.name, None::<RemoveImageOptions>, None)
800 .await?;
801 }
802
803 for (_, _, cluster) in nodes.get_all_clusters() {
804 docker
805 .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
806 .await?;
807 }
808
809 Ok(())
810 }
811}
812
813impl<'a> Deploy<'a> for DockerDeploy {
814 type InstantiateEnv = Self;
815 type Process = DockerDeployProcess;
816 type Cluster = DockerDeployCluster;
817 type External = DockerDeployExternal;
818 type Port = u16;
819 type ExternalRawPort = ();
820 type Meta = ();
821 type GraphId = ();
822
823 #[instrument(level = "trace", skip_all, ret)]
824 fn allocate_process_port(process: &Self::Process) -> Self::Port {
825 process.next_port()
826 }
827
828 #[instrument(level = "trace", skip_all, ret)]
829 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
830 cluster.next_port()
831 }
832
833 #[instrument(level = "trace", skip_all, ret)]
834 fn allocate_external_port(external: &Self::External) -> Self::Port {
835 external.next_port()
836 }
837
838 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
839 fn o2o_sink_source(
840 p1: &Self::Process,
841 p1_port: &Self::Port,
842 p2: &Self::Process,
843 p2_port: &Self::Port,
844 ) -> (syn::Expr, syn::Expr) {
845 let bind_addr = format!("0.0.0.0:{}", p2_port);
846 let target = format!("{}:{p2_port}", p2.name);
847
848 deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
849 }
850
851 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
852 fn o2o_connect(
853 p1: &Self::Process,
854 p1_port: &Self::Port,
855 p2: &Self::Process,
856 p2_port: &Self::Port,
857 ) -> Box<dyn FnOnce()> {
858 let serialized = format!(
859 "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
860 p1.name, p2.name
861 );
862
863 Box::new(move || {
864 trace!(name: "o2o_connect thunk", %serialized);
865 })
866 }
867
868 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
869 fn o2m_sink_source(
870 p1: &Self::Process,
871 p1_port: &Self::Port,
872 c2: &Self::Cluster,
873 c2_port: &Self::Port,
874 ) -> (syn::Expr, syn::Expr) {
875 deploy_containerized_o2m(*c2_port)
876 }
877
878 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
879 fn o2m_connect(
880 p1: &Self::Process,
881 p1_port: &Self::Port,
882 c2: &Self::Cluster,
883 c2_port: &Self::Port,
884 ) -> Box<dyn FnOnce()> {
885 let serialized = format!(
886 "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
887 p1.name, c2.name
888 );
889
890 Box::new(move || {
891 trace!(name: "o2m_connect thunk", %serialized);
892 })
893 }
894
895 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
896 fn m2o_sink_source(
897 c1: &Self::Cluster,
898 c1_port: &Self::Port,
899 p2: &Self::Process,
900 p2_port: &Self::Port,
901 ) -> (syn::Expr, syn::Expr) {
902 deploy_containerized_m2o(*p2_port, &p2.name)
903 }
904
905 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
906 fn m2o_connect(
907 c1: &Self::Cluster,
908 c1_port: &Self::Port,
909 p2: &Self::Process,
910 p2_port: &Self::Port,
911 ) -> Box<dyn FnOnce()> {
912 let serialized = format!(
913 "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
914 c1.name, p2.name
915 );
916
917 Box::new(move || {
918 trace!(name: "m2o_connect thunk", %serialized);
919 })
920 }
921
922 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
923 fn m2m_sink_source(
924 c1: &Self::Cluster,
925 c1_port: &Self::Port,
926 c2: &Self::Cluster,
927 c2_port: &Self::Port,
928 ) -> (syn::Expr, syn::Expr) {
929 deploy_containerized_m2m(*c2_port)
930 }
931
932 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
933 fn m2m_connect(
934 c1: &Self::Cluster,
935 c1_port: &Self::Port,
936 c2: &Self::Cluster,
937 c2_port: &Self::Port,
938 ) -> Box<dyn FnOnce()> {
939 let serialized = format!(
940 "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
941 c1.name, c2.name
942 );
943
944 Box::new(move || {
945 trace!(name: "m2m_connect thunk", %serialized);
946 })
947 }
948
949 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
950 fn e2o_many_source(
951 extra_stmts: &mut Vec<syn::Stmt>,
952 p2: &Self::Process,
953 p2_port: &Self::Port,
954 _codec_type: &syn::Type,
955 shared_handle: String,
956 ) -> syn::Expr {
957 todo!()
958 }
959
960 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
961 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
962 todo!()
963 }
964
965 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
966 fn e2o_source(
967 extra_stmts: &mut Vec<syn::Stmt>,
968 p1: &Self::External,
969 p1_port: &Self::Port,
970 p2: &Self::Process,
971 p2_port: &Self::Port,
972 _codec_type: &syn::Type,
973 shared_handle: String,
974 ) -> syn::Expr {
975 p1.connection_info.borrow_mut().insert(
976 *p1_port,
977 (
978 p2.docker_container_name.clone(),
979 *p2_port,
980 p2.network.clone(),
981 ),
982 );
983
984 p2.exposed_ports.borrow_mut().push(*p2_port);
985
986 let socket_ident = syn::Ident::new(
987 &format!("__hydro_deploy_{}_socket", &shared_handle),
988 Span::call_site(),
989 );
990
991 let source_ident = syn::Ident::new(
992 &format!("__hydro_deploy_{}_source", &shared_handle),
993 Span::call_site(),
994 );
995
996 let sink_ident = syn::Ident::new(
997 &format!("__hydro_deploy_{}_sink", &shared_handle),
998 Span::call_site(),
999 );
1000
1001 let bind_addr = format!("0.0.0.0:{}", p2_port);
1002
1003 extra_stmts.push(syn::parse_quote! {
1004 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1005 });
1006
1007 let create_expr = deploy_containerized_external_sink_source_ident(socket_ident.clone());
1008
1009 extra_stmts.push(syn::parse_quote! {
1010 let (#sink_ident, #source_ident) = (#create_expr).split();
1011 });
1012
1013 parse_quote!(#source_ident)
1014 }
1015
1016 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1017 fn e2o_connect(
1018 p1: &Self::External,
1019 p1_port: &Self::Port,
1020 p2: &Self::Process,
1021 p2_port: &Self::Port,
1022 many: bool,
1023 server_hint: NetworkHint,
1024 ) -> Box<dyn FnOnce()> {
1025 let serialized = format!(
1026 "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
1027 p1.name, p2.name
1028 );
1029
1030 Box::new(move || {
1031 trace!(name: "e2o_connect thunk", %serialized);
1032 })
1033 }
1034
1035 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1036 fn o2e_sink(
1037 p1: &Self::Process,
1038 p1_port: &Self::Port,
1039 p2: &Self::External,
1040 p2_port: &Self::Port,
1041 shared_handle: String,
1042 ) -> syn::Expr {
1043 let sink_ident = syn::Ident::new(
1044 &format!("__hydro_deploy_{}_sink", &shared_handle),
1045 Span::call_site(),
1046 );
1047 parse_quote!(#sink_ident)
1048 }
1049
1050 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1051 fn cluster_ids(
1052 of_cluster: usize,
1053 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1054 cluster_ids()
1055 }
1056
1057 #[instrument(level = "trace", skip_all)]
1058 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1059 cluster_self_id()
1060 }
1061
1062 #[instrument(level = "trace", skip_all, fields(?location_id))]
1063 fn cluster_membership_stream(
1064 location_id: &LocationId,
1065 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1066 {
1067 cluster_membership_stream(location_id)
1068 }
1069}
1070
/// Alphabet (digits and lowercase ASCII letters) used for the random
/// identifiers that end up in network, image and deployment names.
const CONTAINER_ALPHABET: [char; 36] = [
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
    'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
1075
1076#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location, %deployment_instance))]
1077fn get_docker_image_name(name_hint: &str, location: usize, deployment_instance: &str) -> String {
1078 let name_hint = name_hint
1079 .split("::")
1080 .last()
1081 .unwrap()
1082 .to_string()
1083 .to_ascii_lowercase()
1084 .replace(".", "-")
1085 .replace("_", "-")
1086 .replace("::", "-");
1087
1088 let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1089
1090 format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location}")
1091}
1092
1093#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1094fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1095 if let Some(instance) = instance {
1096 format!("{image_name}-{instance}")
1097 } else {
1098 image_name.to_string()
1099 }
1100}
/// Description of a process before it is assigned an id and name.
#[derive(Clone)]
pub struct DockerDeployProcessSpec {
    /// Optional string passed as rustflags when the crate is built.
    compilation_options: Option<String>,
    /// Extra configuration entries forwarded to the crate build.
    config: Vec<String>,
    /// Network of the deployment that created this spec.
    network: DockerNetwork,
    /// Instance id of the deployment that created this spec; embedded in the image name.
    deployment_instance: String,
}
1109
1110impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1111 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1112 fn build(self, id: usize, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1113 DockerDeployProcess {
1114 id,
1115 name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1116
1117 next_port: Rc::new(RefCell::new(1000)),
1118 rust_crate: Rc::new(RefCell::new(None)),
1119
1120 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1121
1122 docker_container_name: Rc::new(RefCell::new(None)),
1123
1124 compilation_options: self.compilation_options,
1125 config: self.config,
1126
1127 network: self.network.clone(),
1128 }
1129 }
1130}
1131
/// Description of a cluster before it is assigned an id and name.
#[derive(Clone)]
pub struct DockerDeployClusterSpec {
    /// Optional string passed as rustflags when the crate is built.
    compilation_options: Option<String>,
    /// Extra configuration entries forwarded to the crate build.
    config: Vec<String>,
    /// Number of containers to run for the cluster.
    count: usize,
    /// Instance id of the deployment that created this spec; embedded in the image name.
    deployment_instance: String,
}
1140
1141impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1142 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1143 fn build(self, id: usize, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1144 DockerDeployCluster {
1145 id,
1146 name: get_docker_image_name(name_hint, id, &self.deployment_instance),
1147
1148 next_port: Rc::new(RefCell::new(1000)),
1149 rust_crate: Rc::new(RefCell::new(None)),
1150
1151 docker_container_name: Rc::new(RefCell::new(Vec::new())),
1152
1153 compilation_options: self.compilation_options,
1154 config: self.config,
1155
1156 count: self.count,
1157 }
1158 }
1159}
1160
/// Description of an external endpoint before it is built.
pub struct DockerDeployExternalSpec {
    /// Name carried over to the built external.
    name: String,
}
1165
1166impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1167 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
1168 fn build(self, id: usize, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1169 DockerDeployExternal {
1170 name: self.name,
1171 next_port: Rc::new(RefCell::new(10000)),
1172 ports: Rc::new(RefCell::new(HashMap::new())),
1173 connection_info: Rc::new(RefCell::new(HashMap::new())),
1174 }
1175 }
1176}