Skip to main content

hydro_lang/deploy/
deploy_graph.rs

1//! Deployment backend for Hydro that uses [`hydro_deploy`] to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use slotmap::SparseSecondaryMap;
25use stageleft::{QuotedWithContext, RuntimeData};
26use syn::parse_quote;
27
28use super::deploy_runtime::*;
29use crate::compile::builder::ExternalPortId;
30use crate::compile::deploy_provider::{
31    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
32};
33use crate::compile::trybuild::generate::{
34    HYDRO_RUNTIME_FEATURES, LinkingMode, create_graph_trybuild,
35};
36use crate::location::dynamic::LocationId;
37use crate::location::member_id::TaglessMemberId;
38use crate::location::{LocationKey, MembershipEvent, NetworkHint};
39use crate::staging_util::get_this_crate;
40
/// [`hydro_deploy`]-backed deployment backend: provisioning and launching are delegated to it.
///
/// This is an uninhabited enum, so it only ever appears as a type parameter. It is picked
/// automatically when [`crate::compile::builder::FlowBuilder::deploy`] is called with an `&mut`
/// reference to a [`hydro_deploy::Deployment`] as the deployment context.
pub enum HydroDeploy {}
46
47impl<'a> Deploy<'a> for HydroDeploy {
48    /// Map from Cluster location ID to member IDs.
49    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
50    type InstantiateEnv = Deployment;
51
52    type Process = DeployNode;
53    type Cluster = DeployCluster;
54    type External = DeployExternal;
55
56    fn o2o_sink_source(
57        _p1: &Self::Process,
58        p1_port: &<Self::Process as Node>::Port,
59        _p2: &Self::Process,
60        p2_port: &<Self::Process as Node>::Port,
61    ) -> (syn::Expr, syn::Expr) {
62        let p1_port = p1_port.as_str();
63        let p2_port = p2_port.as_str();
64        deploy_o2o(
65            RuntimeData::new("__hydro_lang_trybuild_cli"),
66            p1_port,
67            p2_port,
68        )
69    }
70
71    fn o2o_connect(
72        p1: &Self::Process,
73        p1_port: &<Self::Process as Node>::Port,
74        p2: &Self::Process,
75        p2_port: &<Self::Process as Node>::Port,
76    ) -> Box<dyn FnOnce()> {
77        let p1 = p1.clone();
78        let p1_port = p1_port.clone();
79        let p2 = p2.clone();
80        let p2_port = p2_port.clone();
81
82        Box::new(move || {
83            let self_underlying_borrow = p1.underlying.borrow();
84            let self_underlying = self_underlying_borrow.as_ref().unwrap();
85            let source_port = self_underlying.get_port(p1_port.clone());
86
87            let other_underlying_borrow = p2.underlying.borrow();
88            let other_underlying = other_underlying_borrow.as_ref().unwrap();
89            let recipient_port = other_underlying.get_port(p2_port.clone());
90
91            source_port.send_to(&recipient_port)
92        })
93    }
94
95    fn o2m_sink_source(
96        _p1: &Self::Process,
97        p1_port: &<Self::Process as Node>::Port,
98        _c2: &Self::Cluster,
99        c2_port: &<Self::Cluster as Node>::Port,
100    ) -> (syn::Expr, syn::Expr) {
101        let p1_port = p1_port.as_str();
102        let c2_port = c2_port.as_str();
103        deploy_o2m(
104            RuntimeData::new("__hydro_lang_trybuild_cli"),
105            p1_port,
106            c2_port,
107        )
108    }
109
110    fn o2m_connect(
111        p1: &Self::Process,
112        p1_port: &<Self::Process as Node>::Port,
113        c2: &Self::Cluster,
114        c2_port: &<Self::Cluster as Node>::Port,
115    ) -> Box<dyn FnOnce()> {
116        let p1 = p1.clone();
117        let p1_port = p1_port.clone();
118        let c2 = c2.clone();
119        let c2_port = c2_port.clone();
120
121        Box::new(move || {
122            let self_underlying_borrow = p1.underlying.borrow();
123            let self_underlying = self_underlying_borrow.as_ref().unwrap();
124            let source_port = self_underlying.get_port(p1_port.clone());
125
126            let recipient_port = DemuxSink {
127                demux: c2
128                    .members
129                    .borrow()
130                    .iter()
131                    .enumerate()
132                    .map(|(id, c)| {
133                        (
134                            id as u32,
135                            Arc::new(c.underlying.get_port(c2_port.clone()))
136                                as Arc<dyn RustCrateSink + 'static>,
137                        )
138                    })
139                    .collect(),
140            };
141
142            source_port.send_to(&recipient_port)
143        })
144    }
145
146    fn m2o_sink_source(
147        _c1: &Self::Cluster,
148        c1_port: &<Self::Cluster as Node>::Port,
149        _p2: &Self::Process,
150        p2_port: &<Self::Process as Node>::Port,
151    ) -> (syn::Expr, syn::Expr) {
152        let c1_port = c1_port.as_str();
153        let p2_port = p2_port.as_str();
154        deploy_m2o(
155            RuntimeData::new("__hydro_lang_trybuild_cli"),
156            c1_port,
157            p2_port,
158        )
159    }
160
161    fn m2o_connect(
162        c1: &Self::Cluster,
163        c1_port: &<Self::Cluster as Node>::Port,
164        p2: &Self::Process,
165        p2_port: &<Self::Process as Node>::Port,
166    ) -> Box<dyn FnOnce()> {
167        let c1 = c1.clone();
168        let c1_port = c1_port.clone();
169        let p2 = p2.clone();
170        let p2_port = p2_port.clone();
171
172        Box::new(move || {
173            let other_underlying_borrow = p2.underlying.borrow();
174            let other_underlying = other_underlying_borrow.as_ref().unwrap();
175            let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
176
177            for (i, node) in c1.members.borrow().iter().enumerate() {
178                let source_port = node.underlying.get_port(c1_port.clone());
179
180                TaggedSource {
181                    source: Arc::new(source_port),
182                    tag: i as u32,
183                }
184                .send_to(&recipient_port);
185            }
186        })
187    }
188
189    fn m2m_sink_source(
190        _c1: &Self::Cluster,
191        c1_port: &<Self::Cluster as Node>::Port,
192        _c2: &Self::Cluster,
193        c2_port: &<Self::Cluster as Node>::Port,
194    ) -> (syn::Expr, syn::Expr) {
195        let c1_port = c1_port.as_str();
196        let c2_port = c2_port.as_str();
197        deploy_m2m(
198            RuntimeData::new("__hydro_lang_trybuild_cli"),
199            c1_port,
200            c2_port,
201        )
202    }
203
204    fn m2m_connect(
205        c1: &Self::Cluster,
206        c1_port: &<Self::Cluster as Node>::Port,
207        c2: &Self::Cluster,
208        c2_port: &<Self::Cluster as Node>::Port,
209    ) -> Box<dyn FnOnce()> {
210        let c1 = c1.clone();
211        let c1_port = c1_port.clone();
212        let c2 = c2.clone();
213        let c2_port = c2_port.clone();
214
215        Box::new(move || {
216            for (i, sender) in c1.members.borrow().iter().enumerate() {
217                let source_port = sender.underlying.get_port(c1_port.clone());
218
219                let recipient_port = DemuxSink {
220                    demux: c2
221                        .members
222                        .borrow()
223                        .iter()
224                        .enumerate()
225                        .map(|(id, c)| {
226                            (
227                                id as u32,
228                                Arc::new(c.underlying.get_port(c2_port.clone()).merge())
229                                    as Arc<dyn RustCrateSink + 'static>,
230                            )
231                        })
232                        .collect(),
233                };
234
235                TaggedSource {
236                    source: Arc::new(source_port),
237                    tag: i as u32,
238                }
239                .send_to(&recipient_port);
240            }
241        })
242    }
243
244    fn e2o_many_source(
245        extra_stmts: &mut Vec<syn::Stmt>,
246        _p2: &Self::Process,
247        p2_port: &<Self::Process as Node>::Port,
248        codec_type: &syn::Type,
249        shared_handle: String,
250    ) -> syn::Expr {
251        let connect_ident = syn::Ident::new(
252            &format!("__hydro_deploy_many_{}_connect", &shared_handle),
253            Span::call_site(),
254        );
255        let source_ident = syn::Ident::new(
256            &format!("__hydro_deploy_many_{}_source", &shared_handle),
257            Span::call_site(),
258        );
259        let sink_ident = syn::Ident::new(
260            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
261            Span::call_site(),
262        );
263        let membership_ident = syn::Ident::new(
264            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
265            Span::call_site(),
266        );
267
268        let root = get_this_crate();
269
270        extra_stmts.push(syn::parse_quote! {
271            let #connect_ident = __hydro_lang_trybuild_cli
272                .port(#p2_port)
273                .connect::<#root::runtime_support::hydro_deploy_integration::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
274        });
275
276        extra_stmts.push(syn::parse_quote! {
277            let #source_ident = #connect_ident.source;
278        });
279
280        extra_stmts.push(syn::parse_quote! {
281            let #sink_ident = #connect_ident.sink;
282        });
283
284        extra_stmts.push(syn::parse_quote! {
285            let #membership_ident = #connect_ident.membership;
286        });
287
288        parse_quote!(#source_ident)
289    }
290
291    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
292        let sink_ident = syn::Ident::new(
293            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
294            Span::call_site(),
295        );
296        parse_quote!(#sink_ident)
297    }
298
299    fn e2o_source(
300        extra_stmts: &mut Vec<syn::Stmt>,
301        _p1: &Self::External,
302        _p1_port: &<Self::External as Node>::Port,
303        _p2: &Self::Process,
304        p2_port: &<Self::Process as Node>::Port,
305        codec_type: &syn::Type,
306        shared_handle: String,
307    ) -> syn::Expr {
308        let connect_ident = syn::Ident::new(
309            &format!("__hydro_deploy_{}_connect", &shared_handle),
310            Span::call_site(),
311        );
312        let source_ident = syn::Ident::new(
313            &format!("__hydro_deploy_{}_source", &shared_handle),
314            Span::call_site(),
315        );
316        let sink_ident = syn::Ident::new(
317            &format!("__hydro_deploy_{}_sink", &shared_handle),
318            Span::call_site(),
319        );
320
321        let root = get_this_crate();
322
323        extra_stmts.push(syn::parse_quote! {
324            let #connect_ident = __hydro_lang_trybuild_cli
325                .port(#p2_port)
326                .connect::<#root::runtime_support::hydro_deploy_integration::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
327        });
328
329        extra_stmts.push(syn::parse_quote! {
330            let #source_ident = #connect_ident.source;
331        });
332
333        extra_stmts.push(syn::parse_quote! {
334            let #sink_ident = #connect_ident.sink;
335        });
336
337        parse_quote!(#source_ident)
338    }
339
340    fn e2o_connect(
341        p1: &Self::External,
342        p1_port: &<Self::External as Node>::Port,
343        p2: &Self::Process,
344        p2_port: &<Self::Process as Node>::Port,
345        _many: bool,
346        server_hint: NetworkHint,
347    ) -> Box<dyn FnOnce()> {
348        let p1 = p1.clone();
349        let p1_port = p1_port.clone();
350        let p2 = p2.clone();
351        let p2_port = p2_port.clone();
352
353        Box::new(move || {
354            let self_underlying_borrow = p1.underlying.borrow();
355            let self_underlying = self_underlying_borrow.as_ref().unwrap();
356            let source_port = self_underlying.declare_many_client();
357
358            let other_underlying_borrow = p2.underlying.borrow();
359            let other_underlying = other_underlying_borrow.as_ref().unwrap();
360            let recipient_port = other_underlying.get_port_with_hint(
361                p2_port.clone(),
362                match server_hint {
363                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
364                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
365                },
366            );
367
368            source_port.send_to(&recipient_port);
369
370            p1.client_ports
371                .borrow_mut()
372                .insert(p1_port.clone(), source_port);
373        })
374    }
375
376    fn o2e_sink(
377        _p1: &Self::Process,
378        _p1_port: &<Self::Process as Node>::Port,
379        _p2: &Self::External,
380        _p2_port: &<Self::External as Node>::Port,
381        shared_handle: String,
382    ) -> syn::Expr {
383        let sink_ident = syn::Ident::new(
384            &format!("__hydro_deploy_{}_sink", &shared_handle),
385            Span::call_site(),
386        );
387        parse_quote!(#sink_ident)
388    }
389
390    fn cluster_ids(
391        of_cluster: LocationKey,
392    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
393        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
394    }
395
396    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
397        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
398    }
399
400    fn cluster_membership_stream(
401        location_id: &LocationId,
402    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
403    {
404        cluster_membership_stream(location_id)
405    }
406}
407
408#[expect(missing_docs, reason = "TODO")]
409pub trait DeployCrateWrapper {
410    fn underlying(&self) -> Arc<RustCrateService>;
411
412    fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
413        self.underlying().stdout()
414    }
415
416    fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
417        self.underlying().stderr()
418    }
419
420    fn stdout_filter(
421        &self,
422        prefix: impl Into<String>,
423    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
424        self.underlying().stdout_filter(prefix.into())
425    }
426
427    fn stderr_filter(
428        &self,
429        prefix: impl Into<String>,
430    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
431        self.underlying().stderr_filter(prefix.into())
432    }
433}
434
435#[expect(missing_docs, reason = "TODO")]
436#[derive(Clone)]
437pub struct TrybuildHost {
438    host: Arc<dyn Host>,
439    display_name: Option<String>,
440    rustflags: Option<String>,
441    profile: Option<String>,
442    additional_hydro_features: Vec<String>,
443    features: Vec<String>,
444    tracing: Option<TracingOptions>,
445    build_envs: Vec<(String, String)>,
446    name_hint: Option<String>,
447    cluster_idx: Option<usize>,
448}
449
450impl From<Arc<dyn Host>> for TrybuildHost {
451    fn from(host: Arc<dyn Host>) -> Self {
452        Self {
453            host,
454            display_name: None,
455            rustflags: None,
456            profile: None,
457            additional_hydro_features: vec![],
458            features: vec![],
459            tracing: None,
460            build_envs: vec![],
461            name_hint: None,
462            cluster_idx: None,
463        }
464    }
465}
466
467impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
468    fn from(host: Arc<H>) -> Self {
469        Self {
470            host,
471            display_name: None,
472            rustflags: None,
473            profile: None,
474            additional_hydro_features: vec![],
475            features: vec![],
476            tracing: None,
477            build_envs: vec![],
478            name_hint: None,
479            cluster_idx: None,
480        }
481    }
482}
483
484#[expect(missing_docs, reason = "TODO")]
485impl TrybuildHost {
486    pub fn new(host: Arc<dyn Host>) -> Self {
487        Self {
488            host,
489            display_name: None,
490            rustflags: None,
491            profile: None,
492            additional_hydro_features: vec![],
493            features: vec![],
494            tracing: None,
495            build_envs: vec![],
496            name_hint: None,
497            cluster_idx: None,
498        }
499    }
500
501    pub fn display_name(self, display_name: impl Into<String>) -> Self {
502        if self.display_name.is_some() {
503            panic!("{} already set", name_of!(display_name in Self));
504        }
505
506        Self {
507            display_name: Some(display_name.into()),
508            ..self
509        }
510    }
511
512    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
513        if self.rustflags.is_some() {
514            panic!("{} already set", name_of!(rustflags in Self));
515        }
516
517        Self {
518            rustflags: Some(rustflags.into()),
519            ..self
520        }
521    }
522
523    pub fn profile(self, profile: impl Into<String>) -> Self {
524        if self.profile.is_some() {
525            panic!("{} already set", name_of!(profile in Self));
526        }
527
528        Self {
529            profile: Some(profile.into()),
530            ..self
531        }
532    }
533
534    pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
535        Self {
536            additional_hydro_features,
537            ..self
538        }
539    }
540
541    pub fn features(self, features: Vec<String>) -> Self {
542        Self {
543            features: self.features.into_iter().chain(features).collect(),
544            ..self
545        }
546    }
547
548    pub fn tracing(self, tracing: TracingOptions) -> Self {
549        if self.tracing.is_some() {
550            panic!("{} already set", name_of!(tracing in Self));
551        }
552
553        Self {
554            tracing: Some(tracing),
555            ..self
556        }
557    }
558
559    pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
560        Self {
561            build_envs: self
562                .build_envs
563                .into_iter()
564                .chain(std::iter::once((key.into(), value.into())))
565                .collect(),
566            ..self
567        }
568    }
569}
570
571impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
572    type ProcessSpec = TrybuildHost;
573    fn into_process_spec(self) -> TrybuildHost {
574        TrybuildHost {
575            host: self,
576            display_name: None,
577            rustflags: None,
578            profile: None,
579            additional_hydro_features: vec![],
580            features: vec![],
581            tracing: None,
582            build_envs: vec![],
583            name_hint: None,
584            cluster_idx: None,
585        }
586    }
587}
588
589impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
590    type ProcessSpec = TrybuildHost;
591    fn into_process_spec(self) -> TrybuildHost {
592        TrybuildHost {
593            host: self,
594            display_name: None,
595            rustflags: None,
596            profile: None,
597            additional_hydro_features: vec![],
598            features: vec![],
599            tracing: None,
600            build_envs: vec![],
601            name_hint: None,
602            cluster_idx: None,
603        }
604    }
605}
606
607#[expect(missing_docs, reason = "TODO")]
608#[derive(Clone)]
609pub struct DeployExternal {
610    next_port: Rc<RefCell<usize>>,
611    host: Arc<dyn Host>,
612    underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
613    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
614    allocated_ports: Rc<RefCell<HashMap<ExternalPortId, String>>>,
615}
616
617impl DeployExternal {
618    pub(crate) fn raw_port(&self, external_port_id: ExternalPortId) -> CustomClientPort {
619        self.client_ports
620            .borrow()
621            .get(
622                self.allocated_ports
623                    .borrow()
624                    .get(&external_port_id)
625                    .unwrap(),
626            )
627            .unwrap()
628            .clone()
629    }
630}
631
632impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
633    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
634        assert!(
635            self.allocated_ports
636                .borrow_mut()
637                .insert(external_port_id, port.clone())
638                .is_none_or(|old| old == port)
639        );
640    }
641
642    fn as_bytes_bidi(
643        &self,
644        external_port_id: ExternalPortId,
645    ) -> impl Future<
646        Output = (
647            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
648            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
649        ),
650    > + 'a {
651        let port = self.raw_port(external_port_id);
652
653        async move {
654            let (source, sink) = port.connect().await.into_source_sink();
655            (
656                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
657                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
658            )
659        }
660    }
661
662    fn as_bincode_bidi<InT, OutT>(
663        &self,
664        external_port_id: ExternalPortId,
665    ) -> impl Future<
666        Output = (
667            Pin<Box<dyn Stream<Item = OutT>>>,
668            Pin<Box<dyn Sink<InT, Error = Error>>>,
669        ),
670    > + 'a
671    where
672        InT: Serialize + 'static,
673        OutT: DeserializeOwned + 'static,
674    {
675        let port = self.raw_port(external_port_id);
676        async move {
677            let (source, sink) = port.connect().await.into_source_sink();
678            (
679                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
680                    as Pin<Box<dyn Stream<Item = OutT>>>,
681                Box::pin(
682                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
683                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
684            )
685        }
686    }
687
688    fn as_bincode_sink<T: Serialize + 'static>(
689        &self,
690        external_port_id: ExternalPortId,
691    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
692        let port = self.raw_port(external_port_id);
693        async move {
694            let sink = port.connect().await.into_sink();
695            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
696                as Pin<Box<dyn Sink<T, Error = Error>>>
697        }
698    }
699
700    fn as_bincode_source<T: DeserializeOwned + 'static>(
701        &self,
702        external_port_id: ExternalPortId,
703    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
704        let port = self.raw_port(external_port_id);
705        async move {
706            let source = port.connect().await.into_source();
707            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
708                as Pin<Box<dyn Stream<Item = T>>>
709        }
710    }
711}
712
713impl Node for DeployExternal {
714    type Port = String;
715    /// Map from Cluster location ID to member IDs.
716    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
717    type InstantiateEnv = Deployment;
718
719    fn next_port(&self) -> Self::Port {
720        let next_port = *self.next_port.borrow();
721        *self.next_port.borrow_mut() += 1;
722
723        format!("port_{}", next_port)
724    }
725
726    fn instantiate(
727        &self,
728        env: &mut Self::InstantiateEnv,
729        _meta: &mut Self::Meta,
730        _graph: DfirGraph,
731        extra_stmts: &[syn::Stmt],
732        sidecars: &[syn::Expr],
733    ) {
734        assert!(extra_stmts.is_empty());
735        assert!(sidecars.is_empty());
736        let service = env.CustomService(self.host.clone(), vec![]);
737        *self.underlying.borrow_mut() = Some(service);
738    }
739
740    fn update_meta(&self, _meta: &Self::Meta) {}
741}
742
743impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
744    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
745        DeployExternal {
746            next_port: Rc::new(RefCell::new(0)),
747            host: self,
748            underlying: Rc::new(RefCell::new(None)),
749            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
750            client_ports: Rc::new(RefCell::new(HashMap::new())),
751        }
752    }
753}
754
755impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
756    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
757        DeployExternal {
758            next_port: Rc::new(RefCell::new(0)),
759            host: self,
760            underlying: Rc::new(RefCell::new(None)),
761            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
762            client_ports: Rc::new(RefCell::new(HashMap::new())),
763        }
764    }
765}
766
767pub(crate) enum CrateOrTrybuild {
768    Crate(RustCrate, Arc<dyn Host>),
769    Trybuild(TrybuildHost),
770}
771
772#[expect(missing_docs, reason = "TODO")]
773#[derive(Clone)]
774pub struct DeployNode {
775    next_port: Rc<RefCell<usize>>,
776    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
777    underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
778}
779
780impl DeployCrateWrapper for DeployNode {
781    fn underlying(&self) -> Arc<RustCrateService> {
782        Arc::clone(self.underlying.borrow().as_ref().unwrap())
783    }
784}
785
786impl Node for DeployNode {
787    type Port = String;
788    /// Map from Cluster location ID to member IDs.
789    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
790    type InstantiateEnv = Deployment;
791
792    fn next_port(&self) -> String {
793        let next_port = *self.next_port.borrow();
794        *self.next_port.borrow_mut() += 1;
795
796        format!("port_{}", next_port)
797    }
798
799    fn update_meta(&self, meta: &Self::Meta) {
800        let underlying_node = self.underlying.borrow();
801        underlying_node.as_ref().unwrap().update_meta(HydroMeta {
802            clusters: meta.clone(),
803            cluster_id: None,
804        });
805    }
806
807    fn instantiate(
808        &self,
809        env: &mut Self::InstantiateEnv,
810        _meta: &mut Self::Meta,
811        graph: DfirGraph,
812        extra_stmts: &[syn::Stmt],
813        sidecars: &[syn::Expr],
814    ) {
815        let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
816            CrateOrTrybuild::Crate(c, host) => (c, host),
817            CrateOrTrybuild::Trybuild(trybuild) => {
818                // Determine linking mode based on host target type
819                let linking_mode = if !cfg!(target_os = "windows")
820                    && trybuild.host.target_type() == hydro_deploy::HostTargetType::Local
821                {
822                    // When compiling for local, prefer dynamic linking to reduce binary size
823                    // Windows is currently not supported due to https://github.com/bevyengine/bevy/pull/2016
824                    LinkingMode::Dynamic
825                } else {
826                    LinkingMode::Static
827                };
828                let (bin_name, config) = create_graph_trybuild(
829                    graph,
830                    extra_stmts,
831                    sidecars,
832                    trybuild.name_hint.as_deref(),
833                    crate::compile::trybuild::generate::DeployMode::HydroDeploy,
834                    linking_mode,
835                );
836                let host = trybuild.host.clone();
837                (
838                    create_trybuild_service(
839                        trybuild,
840                        &config.project_dir,
841                        &config.target_dir,
842                        config.features.as_deref(),
843                        &bin_name,
844                        &config.linking_mode,
845                    ),
846                    host,
847                )
848            }
849        };
850
851        *self.underlying.borrow_mut() = Some(env.add_service(service, host));
852    }
853}
854
855#[expect(missing_docs, reason = "TODO")]
856#[derive(Clone)]
857pub struct DeployClusterNode {
858    underlying: Arc<RustCrateService>,
859}
860
861impl DeployCrateWrapper for DeployClusterNode {
862    fn underlying(&self) -> Arc<RustCrateService> {
863        self.underlying.clone()
864    }
865}
866#[expect(missing_docs, reason = "TODO")]
867#[derive(Clone)]
868pub struct DeployCluster {
869    key: LocationKey,
870    next_port: Rc<RefCell<usize>>,
871    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
872    members: Rc<RefCell<Vec<DeployClusterNode>>>,
873    name_hint: Option<String>,
874}
875
876impl DeployCluster {
877    #[expect(missing_docs, reason = "TODO")]
878    pub fn members(&self) -> Vec<DeployClusterNode> {
879        self.members.borrow().clone()
880    }
881}
882
883impl Node for DeployCluster {
884    type Port = String;
885    /// Map from Cluster location ID to member IDs.
886    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
887    type InstantiateEnv = Deployment;
888
889    fn next_port(&self) -> String {
890        let next_port = *self.next_port.borrow();
891        *self.next_port.borrow_mut() += 1;
892
893        format!("port_{}", next_port)
894    }
895
896    fn instantiate(
897        &self,
898        env: &mut Self::InstantiateEnv,
899        meta: &mut Self::Meta,
900        graph: DfirGraph,
901        extra_stmts: &[syn::Stmt],
902        sidecars: &[syn::Expr],
903    ) {
904        let has_trybuild = self
905            .cluster_spec
906            .borrow()
907            .as_ref()
908            .unwrap()
909            .iter()
910            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
911
912        // For clusters, use static linking if ANY host is non-local (conservative approach)
913        let linking_mode = if !cfg!(target_os = "windows")
914            && self
915                .cluster_spec
916                .borrow()
917                .as_ref()
918                .unwrap()
919                .iter()
920                .all(|spec| match spec {
921                    CrateOrTrybuild::Crate(_, _) => true, // crates handle their own linking
922                    CrateOrTrybuild::Trybuild(t) => {
923                        t.host.target_type() == hydro_deploy::HostTargetType::Local
924                    }
925                }) {
926            // See comment above for Windows exception
927            LinkingMode::Dynamic
928        } else {
929            LinkingMode::Static
930        };
931
932        let maybe_trybuild = if has_trybuild {
933            Some(create_graph_trybuild(
934                graph,
935                extra_stmts,
936                sidecars,
937                self.name_hint.as_deref(),
938                crate::compile::trybuild::generate::DeployMode::HydroDeploy,
939                linking_mode,
940            ))
941        } else {
942            None
943        };
944
945        let cluster_nodes = self
946            .cluster_spec
947            .borrow_mut()
948            .take()
949            .unwrap()
950            .into_iter()
951            .map(|spec| {
952                let (service, host) = match spec {
953                    CrateOrTrybuild::Crate(c, host) => (c, host),
954                    CrateOrTrybuild::Trybuild(trybuild) => {
955                        let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
956                        let host = trybuild.host.clone();
957                        (
958                            create_trybuild_service(
959                                trybuild,
960                                &config.project_dir,
961                                &config.target_dir,
962                                config.features.as_deref(),
963                                bin_name,
964                                &config.linking_mode,
965                            ),
966                            host,
967                        )
968                    }
969                };
970
971                env.add_service(service, host)
972            })
973            .collect::<Vec<_>>();
974        meta.insert(
975            self.key,
976            (0..(cluster_nodes.len() as u32))
977                .map(TaglessMemberId::from_raw_id)
978                .collect(),
979        );
980        *self.members.borrow_mut() = cluster_nodes
981            .into_iter()
982            .map(|n| DeployClusterNode { underlying: n })
983            .collect();
984    }
985
986    fn update_meta(&self, meta: &Self::Meta) {
987        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
988            node.underlying.update_meta(HydroMeta {
989                clusters: meta.clone(),
990                cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
991            });
992        }
993    }
994}
995
996#[expect(missing_docs, reason = "TODO")]
997#[derive(Clone)]
998pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
999
1000impl DeployProcessSpec {
1001    #[expect(missing_docs, reason = "TODO")]
1002    pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
1003        Self(t, host)
1004    }
1005}
1006
1007impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
1008    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployNode {
1009        DeployNode {
1010            next_port: Rc::new(RefCell::new(0)),
1011            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
1012            underlying: Rc::new(RefCell::new(None)),
1013        }
1014    }
1015}
1016
1017impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
1018    fn build(mut self, key: LocationKey, name_hint: &str) -> DeployNode {
1019        self.name_hint = Some(format!("{} (process {})", name_hint, key));
1020        DeployNode {
1021            next_port: Rc::new(RefCell::new(0)),
1022            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
1023            underlying: Rc::new(RefCell::new(None)),
1024        }
1025    }
1026}
1027
1028#[expect(missing_docs, reason = "TODO")]
1029#[derive(Clone)]
1030pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1031
1032impl DeployClusterSpec {
1033    #[expect(missing_docs, reason = "TODO")]
1034    pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
1035        Self(crates)
1036    }
1037}
1038
1039impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1040    fn build(self, key: LocationKey, _name_hint: &str) -> DeployCluster {
1041        DeployCluster {
1042            key,
1043            next_port: Rc::new(RefCell::new(0)),
1044            cluster_spec: Rc::new(RefCell::new(Some(
1045                self.0
1046                    .into_iter()
1047                    .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1048                    .collect(),
1049            ))),
1050            members: Rc::new(RefCell::new(vec![])),
1051            name_hint: None,
1052        }
1053    }
1054}
1055
1056impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1057    fn build(self, key: LocationKey, name_hint: &str) -> DeployCluster {
1058        let name_hint = format!("{} (cluster {})", name_hint, key);
1059        DeployCluster {
1060            key,
1061            next_port: Rc::new(RefCell::new(0)),
1062            cluster_spec: Rc::new(RefCell::new(Some(
1063                self.into_iter()
1064                    .enumerate()
1065                    .map(|(idx, b)| {
1066                        let mut b = b.into();
1067                        b.name_hint = Some(name_hint.clone());
1068                        b.cluster_idx = Some(idx);
1069                        CrateOrTrybuild::Trybuild(b)
1070                    })
1071                    .collect(),
1072            ))),
1073            members: Rc::new(RefCell::new(vec![])),
1074            name_hint: Some(name_hint),
1075        }
1076    }
1077}
1078
1079fn create_trybuild_service(
1080    trybuild: TrybuildHost,
1081    dir: &std::path::Path,
1082    target_dir: &std::path::PathBuf,
1083    features: Option<&[String]>,
1084    bin_name: &str,
1085    linking_mode: &LinkingMode,
1086) -> RustCrate {
1087    // For dynamic linking, use the dylib-examples crate; for static, use the base crate
1088    let crate_dir = match linking_mode {
1089        LinkingMode::Dynamic => dir.join("dylib-examples"),
1090        LinkingMode::Static => dir.to_path_buf(),
1091    };
1092
1093    let mut ret = RustCrate::new(&crate_dir)
1094        .target_dir(target_dir)
1095        .example(bin_name)
1096        .no_default_features();
1097
1098    ret = ret.set_is_dylib(matches!(linking_mode, LinkingMode::Dynamic));
1099
1100    if let Some(display_name) = trybuild.display_name {
1101        ret = ret.display_name(display_name);
1102    } else if let Some(name_hint) = trybuild.name_hint {
1103        if let Some(cluster_idx) = trybuild.cluster_idx {
1104            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1105        } else {
1106            ret = ret.display_name(name_hint);
1107        }
1108    }
1109
1110    if let Some(rustflags) = trybuild.rustflags {
1111        ret = ret.rustflags(rustflags);
1112    }
1113
1114    if let Some(profile) = trybuild.profile {
1115        ret = ret.profile(profile);
1116    }
1117
1118    if let Some(tracing) = trybuild.tracing {
1119        ret = ret.tracing(tracing);
1120    }
1121
1122    ret = ret.features(
1123        vec!["hydro___feature_deploy_integration".to_owned()]
1124            .into_iter()
1125            .chain(
1126                trybuild
1127                    .additional_hydro_features
1128                    .into_iter()
1129                    .map(|runtime_feature| {
1130                        assert!(
1131                            HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1132                            "{runtime_feature} is not a valid Hydro runtime feature"
1133                        );
1134                        format!("hydro___feature_{runtime_feature}")
1135                    }),
1136            )
1137            .chain(trybuild.features),
1138    );
1139
1140    for (key, value) in trybuild.build_envs {
1141        ret = ret.build_env(key, value);
1142    }
1143
1144    ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1145    ret = ret.config("build.incremental = false");
1146
1147    if let Some(features) = features {
1148        ret = ret.features(features);
1149    }
1150
1151    ret
1152}