hydro_lang/deploy/
deploy_graph.rs

1//! Deployment backend for Hydro that uses [`hydro_deploy`] to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use stageleft::{QuotedWithContext, RuntimeData};
25use syn::parse_quote;
26
27use super::deploy_runtime::*;
28use crate::compile::deploy_provider::{
29    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
30};
31use crate::compile::trybuild::generate::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
32use crate::location::dynamic::LocationId;
33use crate::location::member_id::TaglessMemberId;
34use crate::location::{MembershipEvent, NetworkHint};
35use crate::staging_util::get_this_crate;
36
/// Deployment backend that uses [`hydro_deploy`] for provisioning and launching.
///
/// Automatically used when you call [`crate::compile::builder::FlowBuilder::deploy`] and pass in
/// an `&mut` reference to [`hydro_deploy::Deployment`] as the deployment context.
///
/// The enum has no variants, so no value of it can ever be constructed; it exists only as the
/// type that carries the [`Deploy`] implementation.
pub enum HydroDeploy {}
42
43impl<'a> Deploy<'a> for HydroDeploy {
44    type InstantiateEnv = Deployment;
45    type Process = DeployNode;
46    type Cluster = DeployCluster;
47    type External = DeployExternal;
48    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
49    type GraphId = ();
50    type Port = String;
51    type ExternalRawPort = CustomClientPort;
52
53    fn allocate_process_port(process: &Self::Process) -> Self::Port {
54        process.next_port()
55    }
56
57    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
58        cluster.next_port()
59    }
60
61    fn allocate_external_port(external: &Self::External) -> Self::Port {
62        external.next_port()
63    }
64
65    fn o2o_sink_source(
66        _p1: &Self::Process,
67        p1_port: &Self::Port,
68        _p2: &Self::Process,
69        p2_port: &Self::Port,
70    ) -> (syn::Expr, syn::Expr) {
71        let p1_port = p1_port.as_str();
72        let p2_port = p2_port.as_str();
73        deploy_o2o(
74            RuntimeData::new("__hydro_lang_trybuild_cli"),
75            p1_port,
76            p2_port,
77        )
78    }
79
80    fn o2o_connect(
81        p1: &Self::Process,
82        p1_port: &Self::Port,
83        p2: &Self::Process,
84        p2_port: &Self::Port,
85    ) -> Box<dyn FnOnce()> {
86        let p1 = p1.clone();
87        let p1_port = p1_port.clone();
88        let p2 = p2.clone();
89        let p2_port = p2_port.clone();
90
91        Box::new(move || {
92            let self_underlying_borrow = p1.underlying.borrow();
93            let self_underlying = self_underlying_borrow.as_ref().unwrap();
94            let source_port = self_underlying.get_port(p1_port.clone());
95
96            let other_underlying_borrow = p2.underlying.borrow();
97            let other_underlying = other_underlying_borrow.as_ref().unwrap();
98            let recipient_port = other_underlying.get_port(p2_port.clone());
99
100            source_port.send_to(&recipient_port)
101        })
102    }
103
104    fn o2m_sink_source(
105        _p1: &Self::Process,
106        p1_port: &Self::Port,
107        _c2: &Self::Cluster,
108        c2_port: &Self::Port,
109    ) -> (syn::Expr, syn::Expr) {
110        let p1_port = p1_port.as_str();
111        let c2_port = c2_port.as_str();
112        deploy_o2m(
113            RuntimeData::new("__hydro_lang_trybuild_cli"),
114            p1_port,
115            c2_port,
116        )
117    }
118
119    fn o2m_connect(
120        p1: &Self::Process,
121        p1_port: &Self::Port,
122        c2: &Self::Cluster,
123        c2_port: &Self::Port,
124    ) -> Box<dyn FnOnce()> {
125        let p1 = p1.clone();
126        let p1_port = p1_port.clone();
127        let c2 = c2.clone();
128        let c2_port = c2_port.clone();
129
130        Box::new(move || {
131            let self_underlying_borrow = p1.underlying.borrow();
132            let self_underlying = self_underlying_borrow.as_ref().unwrap();
133            let source_port = self_underlying.get_port(p1_port.clone());
134
135            let recipient_port = DemuxSink {
136                demux: c2
137                    .members
138                    .borrow()
139                    .iter()
140                    .enumerate()
141                    .map(|(id, c)| {
142                        (
143                            id as u32,
144                            Arc::new(c.underlying.get_port(c2_port.clone()))
145                                as Arc<dyn RustCrateSink + 'static>,
146                        )
147                    })
148                    .collect(),
149            };
150
151            source_port.send_to(&recipient_port)
152        })
153    }
154
155    fn m2o_sink_source(
156        _c1: &Self::Cluster,
157        c1_port: &Self::Port,
158        _p2: &Self::Process,
159        p2_port: &Self::Port,
160    ) -> (syn::Expr, syn::Expr) {
161        let c1_port = c1_port.as_str();
162        let p2_port = p2_port.as_str();
163        deploy_m2o(
164            RuntimeData::new("__hydro_lang_trybuild_cli"),
165            c1_port,
166            p2_port,
167        )
168    }
169
170    fn m2o_connect(
171        c1: &Self::Cluster,
172        c1_port: &Self::Port,
173        p2: &Self::Process,
174        p2_port: &Self::Port,
175    ) -> Box<dyn FnOnce()> {
176        let c1 = c1.clone();
177        let c1_port = c1_port.clone();
178        let p2 = p2.clone();
179        let p2_port = p2_port.clone();
180
181        Box::new(move || {
182            let other_underlying_borrow = p2.underlying.borrow();
183            let other_underlying = other_underlying_borrow.as_ref().unwrap();
184            let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
185
186            for (i, node) in c1.members.borrow().iter().enumerate() {
187                let source_port = node.underlying.get_port(c1_port.clone());
188
189                TaggedSource {
190                    source: Arc::new(source_port),
191                    tag: i as u32,
192                }
193                .send_to(&recipient_port);
194            }
195        })
196    }
197
198    fn m2m_sink_source(
199        _c1: &Self::Cluster,
200        c1_port: &Self::Port,
201        _c2: &Self::Cluster,
202        c2_port: &Self::Port,
203    ) -> (syn::Expr, syn::Expr) {
204        let c1_port = c1_port.as_str();
205        let c2_port = c2_port.as_str();
206        deploy_m2m(
207            RuntimeData::new("__hydro_lang_trybuild_cli"),
208            c1_port,
209            c2_port,
210        )
211    }
212
213    fn m2m_connect(
214        c1: &Self::Cluster,
215        c1_port: &Self::Port,
216        c2: &Self::Cluster,
217        c2_port: &Self::Port,
218    ) -> Box<dyn FnOnce()> {
219        let c1 = c1.clone();
220        let c1_port = c1_port.clone();
221        let c2 = c2.clone();
222        let c2_port = c2_port.clone();
223
224        Box::new(move || {
225            for (i, sender) in c1.members.borrow().iter().enumerate() {
226                let source_port = sender.underlying.get_port(c1_port.clone());
227
228                let recipient_port = DemuxSink {
229                    demux: c2
230                        .members
231                        .borrow()
232                        .iter()
233                        .enumerate()
234                        .map(|(id, c)| {
235                            (
236                                id as u32,
237                                Arc::new(c.underlying.get_port(c2_port.clone()).merge())
238                                    as Arc<dyn RustCrateSink + 'static>,
239                            )
240                        })
241                        .collect(),
242                };
243
244                TaggedSource {
245                    source: Arc::new(source_port),
246                    tag: i as u32,
247                }
248                .send_to(&recipient_port);
249            }
250        })
251    }
252
253    fn e2o_many_source(
254        extra_stmts: &mut Vec<syn::Stmt>,
255        _p2: &Self::Process,
256        p2_port: &Self::Port,
257        codec_type: &syn::Type,
258        shared_handle: String,
259    ) -> syn::Expr {
260        let connect_ident = syn::Ident::new(
261            &format!("__hydro_deploy_many_{}_connect", &shared_handle),
262            Span::call_site(),
263        );
264        let source_ident = syn::Ident::new(
265            &format!("__hydro_deploy_many_{}_source", &shared_handle),
266            Span::call_site(),
267        );
268        let sink_ident = syn::Ident::new(
269            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
270            Span::call_site(),
271        );
272        let membership_ident = syn::Ident::new(
273            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
274            Span::call_site(),
275        );
276
277        let root = get_this_crate();
278
279        extra_stmts.push(syn::parse_quote! {
280            let #connect_ident = __hydro_lang_trybuild_cli
281                .port(#p2_port)
282                .connect::<#root::runtime_support::dfir_rs::util::deploy::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
283        });
284
285        extra_stmts.push(syn::parse_quote! {
286            let #source_ident = #connect_ident.source;
287        });
288
289        extra_stmts.push(syn::parse_quote! {
290            let #sink_ident = #connect_ident.sink;
291        });
292
293        extra_stmts.push(syn::parse_quote! {
294            let #membership_ident = #connect_ident.membership;
295        });
296
297        parse_quote!(#source_ident)
298    }
299
300    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
301        let sink_ident = syn::Ident::new(
302            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
303            Span::call_site(),
304        );
305        parse_quote!(#sink_ident)
306    }
307
308    fn e2o_source(
309        extra_stmts: &mut Vec<syn::Stmt>,
310        _p1: &Self::External,
311        _p1_port: &Self::Port,
312        _p2: &Self::Process,
313        p2_port: &Self::Port,
314        codec_type: &syn::Type,
315        shared_handle: String,
316    ) -> syn::Expr {
317        let connect_ident = syn::Ident::new(
318            &format!("__hydro_deploy_{}_connect", &shared_handle),
319            Span::call_site(),
320        );
321        let source_ident = syn::Ident::new(
322            &format!("__hydro_deploy_{}_source", &shared_handle),
323            Span::call_site(),
324        );
325        let sink_ident = syn::Ident::new(
326            &format!("__hydro_deploy_{}_sink", &shared_handle),
327            Span::call_site(),
328        );
329
330        let root = get_this_crate();
331
332        extra_stmts.push(syn::parse_quote! {
333            let #connect_ident = __hydro_lang_trybuild_cli
334                .port(#p2_port)
335                .connect::<#root::runtime_support::dfir_rs::util::deploy::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
336        });
337
338        extra_stmts.push(syn::parse_quote! {
339            let #source_ident = #connect_ident.source;
340        });
341
342        extra_stmts.push(syn::parse_quote! {
343            let #sink_ident = #connect_ident.sink;
344        });
345
346        parse_quote!(#source_ident)
347    }
348
349    fn e2o_connect(
350        p1: &Self::External,
351        p1_port: &Self::Port,
352        p2: &Self::Process,
353        p2_port: &Self::Port,
354        _many: bool,
355        server_hint: NetworkHint,
356    ) -> Box<dyn FnOnce()> {
357        let p1 = p1.clone();
358        let p1_port = p1_port.clone();
359        let p2 = p2.clone();
360        let p2_port = p2_port.clone();
361
362        Box::new(move || {
363            let self_underlying_borrow = p1.underlying.borrow();
364            let self_underlying = self_underlying_borrow.as_ref().unwrap();
365            let source_port = self_underlying.declare_many_client();
366
367            let other_underlying_borrow = p2.underlying.borrow();
368            let other_underlying = other_underlying_borrow.as_ref().unwrap();
369            let recipient_port = other_underlying.get_port_with_hint(
370                p2_port.clone(),
371                match server_hint {
372                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
373                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
374                },
375            );
376
377            source_port.send_to(&recipient_port);
378
379            p1.client_ports
380                .borrow_mut()
381                .insert(p1_port.clone(), source_port);
382        })
383    }
384
385    fn o2e_sink(
386        _p1: &Self::Process,
387        _p1_port: &Self::Port,
388        _p2: &Self::External,
389        _p2_port: &Self::Port,
390        shared_handle: String,
391    ) -> syn::Expr {
392        let sink_ident = syn::Ident::new(
393            &format!("__hydro_deploy_{}_sink", &shared_handle),
394            Span::call_site(),
395        );
396        parse_quote!(#sink_ident)
397    }
398
399    fn cluster_ids(
400        of_cluster: usize,
401    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
402        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
403    }
404
405    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
406        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
407    }
408
409    fn cluster_membership_stream(
410        location_id: &LocationId,
411    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
412    {
413        cluster_membership_stream(location_id)
414    }
415}
416
417#[expect(missing_docs, reason = "TODO")]
418pub trait DeployCrateWrapper {
419    fn underlying(&self) -> Arc<RustCrateService>;
420
421    fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
422        self.underlying().stdout()
423    }
424
425    fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
426        self.underlying().stderr()
427    }
428
429    fn stdout_filter(
430        &self,
431        prefix: impl Into<String>,
432    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
433        self.underlying().stdout_filter(prefix.into())
434    }
435
436    fn stderr_filter(
437        &self,
438        prefix: impl Into<String>,
439    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
440        self.underlying().stderr_filter(prefix.into())
441    }
442
443    fn tracing_results(&self) -> Option<TracingResults> {
444        self.underlying().tracing_results().cloned()
445    }
446}
447
448#[expect(missing_docs, reason = "TODO")]
449#[derive(Clone)]
450pub struct TrybuildHost {
451    host: Arc<dyn Host>,
452    display_name: Option<String>,
453    rustflags: Option<String>,
454    additional_hydro_features: Vec<String>,
455    features: Vec<String>,
456    tracing: Option<TracingOptions>,
457    build_envs: Vec<(String, String)>,
458    name_hint: Option<String>,
459    cluster_idx: Option<usize>,
460}
461
462impl From<Arc<dyn Host>> for TrybuildHost {
463    fn from(host: Arc<dyn Host>) -> Self {
464        Self {
465            host,
466            display_name: None,
467            rustflags: None,
468            additional_hydro_features: vec![],
469            features: vec![],
470            tracing: None,
471            build_envs: vec![],
472            name_hint: None,
473            cluster_idx: None,
474        }
475    }
476}
477
478impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
479    fn from(host: Arc<H>) -> Self {
480        Self {
481            host,
482            display_name: None,
483            rustflags: None,
484            additional_hydro_features: vec![],
485            features: vec![],
486            tracing: None,
487            build_envs: vec![],
488            name_hint: None,
489            cluster_idx: None,
490        }
491    }
492}
493
494#[expect(missing_docs, reason = "TODO")]
495impl TrybuildHost {
496    pub fn new(host: Arc<dyn Host>) -> Self {
497        Self {
498            host,
499            display_name: None,
500            rustflags: None,
501            additional_hydro_features: vec![],
502            features: vec![],
503            tracing: None,
504            build_envs: vec![],
505            name_hint: None,
506            cluster_idx: None,
507        }
508    }
509
510    pub fn display_name(self, display_name: impl Into<String>) -> Self {
511        if self.display_name.is_some() {
512            panic!("{} already set", name_of!(display_name in Self));
513        }
514
515        Self {
516            display_name: Some(display_name.into()),
517            ..self
518        }
519    }
520
521    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
522        if self.rustflags.is_some() {
523            panic!("{} already set", name_of!(rustflags in Self));
524        }
525
526        Self {
527            rustflags: Some(rustflags.into()),
528            ..self
529        }
530    }
531
532    pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
533        Self {
534            additional_hydro_features,
535            ..self
536        }
537    }
538
539    pub fn features(self, features: Vec<String>) -> Self {
540        Self {
541            features: self.features.into_iter().chain(features).collect(),
542            ..self
543        }
544    }
545
546    pub fn tracing(self, tracing: TracingOptions) -> Self {
547        if self.tracing.is_some() {
548            panic!("{} already set", name_of!(tracing in Self));
549        }
550
551        Self {
552            tracing: Some(tracing),
553            ..self
554        }
555    }
556
557    pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
558        Self {
559            build_envs: self
560                .build_envs
561                .into_iter()
562                .chain(std::iter::once((key.into(), value.into())))
563                .collect(),
564            ..self
565        }
566    }
567}
568
569impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
570    type ProcessSpec = TrybuildHost;
571    fn into_process_spec(self) -> TrybuildHost {
572        TrybuildHost {
573            host: self,
574            display_name: None,
575            rustflags: None,
576            additional_hydro_features: vec![],
577            features: vec![],
578            tracing: None,
579            build_envs: vec![],
580            name_hint: None,
581            cluster_idx: None,
582        }
583    }
584}
585
586impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
587    type ProcessSpec = TrybuildHost;
588    fn into_process_spec(self) -> TrybuildHost {
589        TrybuildHost {
590            host: self,
591            display_name: None,
592            rustflags: None,
593            additional_hydro_features: vec![],
594            features: vec![],
595            tracing: None,
596            build_envs: vec![],
597            name_hint: None,
598            cluster_idx: None,
599        }
600    }
601}
602
603#[expect(missing_docs, reason = "TODO")]
604#[derive(Clone)]
605pub struct DeployExternal {
606    next_port: Rc<RefCell<usize>>,
607    host: Arc<dyn Host>,
608    underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
609    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
610    allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
611}
612
613impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
614    fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
615        assert!(
616            self.allocated_ports
617                .borrow_mut()
618                .insert(key, port.clone())
619                .is_none_or(|old| old == port)
620        );
621    }
622
623    fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy<'_>>::ExternalRawPort {
624        self.client_ports
625            .borrow()
626            .get(self.allocated_ports.borrow().get(&key).unwrap())
627            .unwrap()
628            .clone()
629    }
630
631    fn as_bytes_bidi(
632        &self,
633        key: usize,
634    ) -> impl Future<
635        Output = (
636            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
637            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
638        ),
639    > + 'a {
640        let port = self.raw_port(key);
641
642        async move {
643            let (source, sink) = port.connect().await.into_source_sink();
644            (
645                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
646                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
647            )
648        }
649    }
650
651    fn as_bincode_bidi<InT, OutT>(
652        &self,
653        key: usize,
654    ) -> impl Future<
655        Output = (
656            Pin<Box<dyn Stream<Item = OutT>>>,
657            Pin<Box<dyn Sink<InT, Error = Error>>>,
658        ),
659    > + 'a
660    where
661        InT: Serialize + 'static,
662        OutT: DeserializeOwned + 'static,
663    {
664        let port = self.raw_port(key);
665        async move {
666            let (source, sink) = port.connect().await.into_source_sink();
667            (
668                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
669                    as Pin<Box<dyn Stream<Item = OutT>>>,
670                Box::pin(
671                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
672                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
673            )
674        }
675    }
676
677    fn as_bincode_sink<T: Serialize + 'static>(
678        &self,
679        key: usize,
680    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
681        let port = self.raw_port(key);
682        async move {
683            let sink = port.connect().await.into_sink();
684            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
685                as Pin<Box<dyn Sink<T, Error = Error>>>
686        }
687    }
688
689    fn as_bincode_source<T: DeserializeOwned + 'static>(
690        &self,
691        key: usize,
692    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
693        let port = self.raw_port(key);
694        async move {
695            let source = port.connect().await.into_source();
696            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
697                as Pin<Box<dyn Stream<Item = T>>>
698        }
699    }
700}
701
702impl Node for DeployExternal {
703    type Port = String;
704    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
705    type InstantiateEnv = Deployment;
706
707    fn next_port(&self) -> Self::Port {
708        let next_port = *self.next_port.borrow();
709        *self.next_port.borrow_mut() += 1;
710
711        format!("port_{}", next_port)
712    }
713
714    fn instantiate(
715        &self,
716        env: &mut Self::InstantiateEnv,
717        _meta: &mut Self::Meta,
718        _graph: DfirGraph,
719        _extra_stmts: Vec<syn::Stmt>,
720    ) {
721        let service = env.CustomService(self.host.clone(), vec![]);
722        *self.underlying.borrow_mut() = Some(service);
723    }
724
725    fn update_meta(&self, _meta: &Self::Meta) {}
726}
727
728impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
729    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
730        DeployExternal {
731            next_port: Rc::new(RefCell::new(0)),
732            host: self,
733            underlying: Rc::new(RefCell::new(None)),
734            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
735            client_ports: Rc::new(RefCell::new(HashMap::new())),
736        }
737    }
738}
739
740impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
741    fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
742        DeployExternal {
743            next_port: Rc::new(RefCell::new(0)),
744            host: self,
745            underlying: Rc::new(RefCell::new(None)),
746            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
747            client_ports: Rc::new(RefCell::new(HashMap::new())),
748        }
749    }
750}
751
/// How the service for a process or cluster member is obtained at instantiation time.
pub(crate) enum CrateOrTrybuild {
    /// A ready-made [`RustCrate`] and the host to add it on; used as-is.
    Crate(RustCrate, Arc<dyn Host>),
    /// A trybuild spec; the service is created from the generated graph project when the node
    /// is instantiated.
    Trybuild(TrybuildHost),
}
756
757#[expect(missing_docs, reason = "TODO")]
758#[derive(Clone)]
759pub struct DeployNode {
760    id: usize,
761    next_port: Rc<RefCell<usize>>,
762    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
763    underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
764}
765
766impl DeployCrateWrapper for DeployNode {
767    fn underlying(&self) -> Arc<RustCrateService> {
768        Arc::clone(self.underlying.borrow().as_ref().unwrap())
769    }
770}
771
772impl Node for DeployNode {
773    type Port = String;
774    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
775    type InstantiateEnv = Deployment;
776
777    fn next_port(&self) -> String {
778        let next_port = *self.next_port.borrow();
779        *self.next_port.borrow_mut() += 1;
780
781        format!("port_{}", next_port)
782    }
783
784    fn update_meta(&self, meta: &Self::Meta) {
785        let underlying_node = self.underlying.borrow();
786        underlying_node.as_ref().unwrap().update_meta(HydroMeta {
787            clusters: meta.clone(),
788            cluster_id: None,
789            subgraph_id: self.id,
790        });
791    }
792
793    fn instantiate(
794        &self,
795        env: &mut Self::InstantiateEnv,
796        _meta: &mut Self::Meta,
797        graph: DfirGraph,
798        extra_stmts: Vec<syn::Stmt>,
799    ) {
800        let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
801            CrateOrTrybuild::Crate(c, host) => (c, host),
802            CrateOrTrybuild::Trybuild(trybuild) => {
803                let (bin_name, config) =
804                    create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint, false);
805                let host = trybuild.host.clone();
806                (
807                    create_trybuild_service(
808                        trybuild,
809                        &config.project_dir,
810                        &config.target_dir,
811                        &config.features,
812                        &bin_name,
813                    ),
814                    host,
815                )
816            }
817        };
818
819        *self.underlying.borrow_mut() = Some(env.add_service(service, host));
820    }
821}
822
823#[expect(missing_docs, reason = "TODO")]
824#[derive(Clone)]
825pub struct DeployClusterNode {
826    underlying: Arc<RustCrateService>,
827}
828
829impl DeployCrateWrapper for DeployClusterNode {
830    fn underlying(&self) -> Arc<RustCrateService> {
831        self.underlying.clone()
832    }
833}
834#[expect(missing_docs, reason = "TODO")]
835#[derive(Clone)]
836pub struct DeployCluster {
837    id: usize,
838    next_port: Rc<RefCell<usize>>,
839    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
840    members: Rc<RefCell<Vec<DeployClusterNode>>>,
841    name_hint: Option<String>,
842}
843
844impl DeployCluster {
845    #[expect(missing_docs, reason = "TODO")]
846    pub fn members(&self) -> Vec<DeployClusterNode> {
847        self.members.borrow().clone()
848    }
849}
850
851impl Node for DeployCluster {
852    type Port = String;
853    type Meta = HashMap<usize, Vec<TaglessMemberId>>;
854    type InstantiateEnv = Deployment;
855
856    fn next_port(&self) -> String {
857        let next_port = *self.next_port.borrow();
858        *self.next_port.borrow_mut() += 1;
859
860        format!("port_{}", next_port)
861    }
862
863    fn instantiate(
864        &self,
865        env: &mut Self::InstantiateEnv,
866        meta: &mut Self::Meta,
867        graph: DfirGraph,
868        extra_stmts: Vec<syn::Stmt>,
869    ) {
870        let has_trybuild = self
871            .cluster_spec
872            .borrow()
873            .as_ref()
874            .unwrap()
875            .iter()
876            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
877
878        let maybe_trybuild = if has_trybuild {
879            Some(create_graph_trybuild(
880                graph,
881                extra_stmts,
882                &self.name_hint,
883                false,
884            ))
885        } else {
886            None
887        };
888
889        let cluster_nodes = self
890            .cluster_spec
891            .borrow_mut()
892            .take()
893            .unwrap()
894            .into_iter()
895            .map(|spec| {
896                let (service, host) = match spec {
897                    CrateOrTrybuild::Crate(c, host) => (c, host),
898                    CrateOrTrybuild::Trybuild(trybuild) => {
899                        let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
900                        let host = trybuild.host.clone();
901                        (
902                            create_trybuild_service(
903                                trybuild,
904                                &config.project_dir,
905                                &config.target_dir,
906                                &config.features,
907                                bin_name,
908                            ),
909                            host,
910                        )
911                    }
912                };
913
914                env.add_service(service, host)
915            })
916            .collect::<Vec<_>>();
917        meta.insert(
918            self.id,
919            (0..(cluster_nodes.len() as u32))
920                .map(TaglessMemberId::from_raw_id)
921                .collect(),
922        );
923        *self.members.borrow_mut() = cluster_nodes
924            .into_iter()
925            .map(|n| DeployClusterNode { underlying: n })
926            .collect();
927    }
928
929    fn update_meta(&self, meta: &Self::Meta) {
930        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
931            node.underlying.update_meta(HydroMeta {
932                clusters: meta.clone(),
933                cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
934                subgraph_id: self.id,
935            });
936        }
937    }
938}
939
940#[expect(missing_docs, reason = "TODO")]
941#[derive(Clone)]
942pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
943
944impl DeployProcessSpec {
945    #[expect(missing_docs, reason = "TODO")]
946    pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
947        Self(t, host)
948    }
949}
950
951impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
952    fn build(self, id: usize, _name_hint: &str) -> DeployNode {
953        DeployNode {
954            id,
955            next_port: Rc::new(RefCell::new(0)),
956            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
957            underlying: Rc::new(RefCell::new(None)),
958        }
959    }
960}
961
962impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
963    fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
964        self.name_hint = Some(format!("{} (process {id})", name_hint));
965        DeployNode {
966            id,
967            next_port: Rc::new(RefCell::new(0)),
968            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
969            underlying: Rc::new(RefCell::new(None)),
970        }
971    }
972}
973
974#[expect(missing_docs, reason = "TODO")]
975#[derive(Clone)]
976pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
977
978impl DeployClusterSpec {
979    #[expect(missing_docs, reason = "TODO")]
980    pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
981        Self(crates)
982    }
983}
984
985impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
986    fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
987        DeployCluster {
988            id,
989            next_port: Rc::new(RefCell::new(0)),
990            cluster_spec: Rc::new(RefCell::new(Some(
991                self.0
992                    .into_iter()
993                    .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
994                    .collect(),
995            ))),
996            members: Rc::new(RefCell::new(vec![])),
997            name_hint: None,
998        }
999    }
1000}
1001
1002impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1003    fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1004        let name_hint = format!("{} (cluster {id})", name_hint);
1005        DeployCluster {
1006            id,
1007            next_port: Rc::new(RefCell::new(0)),
1008            cluster_spec: Rc::new(RefCell::new(Some(
1009                self.into_iter()
1010                    .enumerate()
1011                    .map(|(idx, b)| {
1012                        let mut b = b.into();
1013                        b.name_hint = Some(name_hint.clone());
1014                        b.cluster_idx = Some(idx);
1015                        CrateOrTrybuild::Trybuild(b)
1016                    })
1017                    .collect(),
1018            ))),
1019            members: Rc::new(RefCell::new(vec![])),
1020            name_hint: Some(name_hint),
1021        }
1022    }
1023}
1024
1025fn create_trybuild_service(
1026    trybuild: TrybuildHost,
1027    dir: &std::path::PathBuf,
1028    target_dir: &std::path::PathBuf,
1029    features: &Option<Vec<String>>,
1030    bin_name: &str,
1031) -> RustCrate {
1032    let mut ret = RustCrate::new(dir)
1033        .target_dir(target_dir)
1034        .example(bin_name)
1035        .no_default_features();
1036
1037    if let Some(display_name) = trybuild.display_name {
1038        ret = ret.display_name(display_name);
1039    } else if let Some(name_hint) = trybuild.name_hint {
1040        if let Some(cluster_idx) = trybuild.cluster_idx {
1041            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1042        } else {
1043            ret = ret.display_name(name_hint);
1044        }
1045    }
1046
1047    if let Some(rustflags) = trybuild.rustflags {
1048        ret = ret.rustflags(rustflags);
1049    }
1050
1051    if let Some(tracing) = trybuild.tracing {
1052        ret = ret.tracing(tracing);
1053    }
1054
1055    ret = ret.features(
1056        vec!["hydro___feature_deploy_integration".to_string()]
1057            .into_iter()
1058            .chain(
1059                trybuild
1060                    .additional_hydro_features
1061                    .into_iter()
1062                    .map(|runtime_feature| {
1063                        assert!(
1064                            HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1065                            "{runtime_feature} is not a valid Hydro runtime feature"
1066                        );
1067                        format!("hydro___feature_{runtime_feature}")
1068                    }),
1069            )
1070            .chain(trybuild.features),
1071    );
1072
1073    for (key, value) in trybuild.build_envs {
1074        ret = ret.build_env(key, value);
1075    }
1076
1077    ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1078    ret = ret.config("build.incremental = false");
1079
1080    if let Some(features) = features {
1081        ret = ret.features(features);
1082    }
1083
1084    ret
1085}