Skip to main content

hydro_lang/compile/
deploy.rs

1use std::collections::HashMap;
2use std::io::Error;
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Sink, Stream};
8use proc_macro2::Span;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
12use stageleft::QuotedWithContext;
13
14use super::built::build_inner;
15use super::compiled::CompiledFlow;
16use super::deploy_provider::{
17    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
18};
19use super::ir::HydroRoot;
20use crate::live_collections::stream::{Ordering, Retries};
21use crate::location::dynamic::LocationId;
22use crate::location::external_process::{
23    ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
24};
25use crate::location::{Cluster, External, Location, LocationKey, LocationType, Process};
26use crate::staging_util::Invariant;
27use crate::telemetry::Sidecar;
28
29pub struct DeployFlow<'a, D>
30where
31    D: Deploy<'a>,
32{
33    pub(super) ir: Vec<HydroRoot>,
34
35    pub(super) locations: SlotMap<LocationKey, LocationType>,
36    pub(super) location_names: SecondaryMap<LocationKey, String>,
37
38    /// Deployed instances of each process in the flow
39    pub(super) processes: SparseSecondaryMap<LocationKey, D::Process>,
40    pub(super) clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
41    pub(super) externals: SparseSecondaryMap<LocationKey, D::External>,
42
43    /// Sidecars which may be added to each location (process or cluster, not externals).
44    /// See [`crate::telemetry::Sidecar`].
45    pub(super) sidecars: SparseSecondaryMap<LocationKey, Vec<syn::Expr>>,
46
47    /// Application name used in telemetry.
48    pub(super) flow_name: String,
49
50    pub(super) _phantom: Invariant<'a, D>,
51}
52
53impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
54    pub fn ir(&self) -> &Vec<HydroRoot> {
55        &self.ir
56    }
57
58    /// Application name used in telemetry.
59    pub fn flow_name(&self) -> &str {
60        &self.flow_name
61    }
62
63    pub fn with_process<P>(
64        mut self,
65        process: &Process<P>,
66        spec: impl IntoProcessSpec<'a, D>,
67    ) -> Self {
68        self.processes.insert(
69            process.key,
70            spec.into_process_spec()
71                .build(process.key, &self.location_names[process.key]),
72        );
73        self
74    }
75
76    /// TODO(mingwei): unstable API
77    #[doc(hidden)]
78    pub fn with_process_erased(
79        mut self,
80        process_loc_key: LocationKey,
81        spec: impl IntoProcessSpec<'a, D>,
82    ) -> Self {
83        assert_eq!(
84            Some(&LocationType::Process),
85            self.locations.get(process_loc_key),
86            "No process with the given `LocationKey` was found."
87        );
88        self.processes.insert(
89            process_loc_key,
90            spec.into_process_spec()
91                .build(process_loc_key, &self.location_names[process_loc_key]),
92        );
93        self
94    }
95
96    pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
97        mut self,
98        spec: impl Fn() -> S,
99    ) -> Self {
100        for (location_key, &location_type) in self.locations.iter() {
101            if LocationType::Process == location_type {
102                self.processes
103                    .entry(location_key)
104                    .expect("location was removed")
105                    .or_insert_with(|| {
106                        spec()
107                            .into_process_spec()
108                            .build(location_key, &self.location_names[location_key])
109                    });
110            }
111        }
112        self
113    }
114
115    pub fn with_cluster<C>(mut self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
116        self.clusters.insert(
117            cluster.key,
118            spec.build(cluster.key, &self.location_names[cluster.key]),
119        );
120        self
121    }
122
123    /// TODO(mingwei): unstable API
124    #[doc(hidden)]
125    pub fn with_cluster_erased(
126        mut self,
127        cluster_loc_key: LocationKey,
128        spec: impl ClusterSpec<'a, D>,
129    ) -> Self {
130        assert_eq!(
131            Some(&LocationType::Cluster),
132            self.locations.get(cluster_loc_key),
133            "No cluster with the given `LocationKey` was found."
134        );
135        self.clusters.insert(
136            cluster_loc_key,
137            spec.build(cluster_loc_key, &self.location_names[cluster_loc_key]),
138        );
139        self
140    }
141
142    pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
143        mut self,
144        spec: impl Fn() -> S,
145    ) -> Self {
146        for (location_key, &location_type) in self.locations.iter() {
147            if LocationType::Cluster == location_type {
148                self.clusters
149                    .entry(location_key)
150                    .expect("location was removed")
151                    .or_insert_with(|| {
152                        spec().build(location_key, &self.location_names[location_key])
153                    });
154            }
155        }
156        self
157    }
158
159    pub fn with_external<P>(
160        mut self,
161        external: &External<P>,
162        spec: impl ExternalSpec<'a, D>,
163    ) -> Self {
164        self.externals.insert(
165            external.key,
166            spec.build(external.key, &self.location_names[external.key]),
167        );
168        self
169    }
170
171    pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
172        mut self,
173        spec: impl Fn() -> S,
174    ) -> Self {
175        for (location_key, &location_type) in self.locations.iter() {
176            if LocationType::External == location_type {
177                self.externals
178                    .entry(location_key)
179                    .expect("location was removed")
180                    .or_insert_with(|| {
181                        spec().build(location_key, &self.location_names[location_key])
182                    });
183            }
184        }
185        self
186    }
187
188    /// Adds a [`Sidecar`] to all processes and clusters in the flow.
189    pub fn with_sidecar_all(mut self, sidecar: &impl Sidecar) -> Self {
190        for (location_key, &location_type) in self.locations.iter() {
191            if !matches!(location_type, LocationType::Process | LocationType::Cluster) {
192                continue;
193            }
194
195            let location_name = &self.location_names[location_key];
196
197            let sidecar = sidecar.to_expr(
198                self.flow_name(),
199                location_key,
200                location_type,
201                location_name,
202                &quote::format_ident!("{}", super::DFIR_IDENT),
203            );
204            self.sidecars
205                .entry(location_key)
206                .expect("location was removed")
207                .or_default()
208                .push(sidecar);
209        }
210
211        self
212    }
213
214    /// Adds a [`Sidecar`] to the given location.
215    pub fn with_sidecar_internal(
216        mut self,
217        location_key: LocationKey,
218        sidecar: &impl Sidecar,
219    ) -> Self {
220        let location_type = self.locations[location_key];
221        let location_name = &self.location_names[location_key];
222        let sidecar = sidecar.to_expr(
223            self.flow_name(),
224            location_key,
225            location_type,
226            location_name,
227            &quote::format_ident!("{}", super::DFIR_IDENT),
228        );
229        self.sidecars
230            .entry(location_key)
231            .expect("location was removed")
232            .or_default()
233            .push(sidecar);
234        self
235    }
236
237    /// Adds a [`Sidecar`] to a specific process in the flow.
238    pub fn with_sidecar_process(self, process: &Process<()>, sidecar: &impl Sidecar) -> Self {
239        self.with_sidecar_internal(process.key, sidecar)
240    }
241
242    /// Adds a [`Sidecar`] to a specific cluster in the flow.
243    pub fn with_sidecar_cluster(self, cluster: &Cluster<()>, sidecar: &impl Sidecar) -> Self {
244        self.with_sidecar_internal(cluster.key, sidecar)
245    }
246
247    /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) without networking.
248    /// Useful for generating Mermaid diagrams of the DFIR.
249    ///
250    /// (This returned DFIR will not compile due to the networking missing).
251    pub fn preview_compile(&mut self) -> CompiledFlow<'a> {
252        // NOTE: `build_inner` does not actually mutate the IR, but `&mut` is required
253        // only because the shared traversal logic requires it
254        CompiledFlow {
255            dfir: build_inner::<D>(&mut self.ir),
256            extra_stmts: SparseSecondaryMap::new(),
257            sidecars: SparseSecondaryMap::new(),
258            _phantom: PhantomData,
259        }
260    }
261
262    /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) including networking.
263    ///
264    /// (This does not compile the DFIR itself, instead use [`Self::deploy`] to compile & deploy the DFIR).
265    pub fn compile(mut self) -> CompiledFlow<'a> {
266        self.compile_internal()
267    }
268
269    /// Same as [`Self::compile`] but does not invalidate `self`, for internal use.
270    ///
271    /// Empties `self.sidecars` and modifies `self.ir`, leaving `self` in a partial state.
272    fn compile_internal(&mut self) -> CompiledFlow<'a> {
273        let mut seen_tees: HashMap<_, _> = HashMap::new();
274        let mut extra_stmts = SparseSecondaryMap::new();
275        for leaf in self.ir.iter_mut() {
276            leaf.compile_network::<D>(
277                &mut extra_stmts,
278                &mut seen_tees,
279                &self.processes,
280                &self.clusters,
281                &self.externals,
282            );
283        }
284
285        CompiledFlow {
286            dfir: build_inner::<D>(&mut self.ir),
287            extra_stmts,
288            sidecars: std::mem::take(&mut self.sidecars),
289            _phantom: PhantomData,
290        }
291    }
292
293    /// Creates the variables for cluster IDs and adds them into `extra_stmts`.
294    fn cluster_id_stmts(&self, extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>) {
295        #[expect(
296            clippy::disallowed_methods,
297            reason = "nondeterministic iteration order, will be sorted"
298        )]
299        let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
300        all_clusters_sorted.sort();
301
302        for cluster_key in all_clusters_sorted {
303            let self_id_ident = syn::Ident::new(
304                &format!("__hydro_lang_cluster_self_id_{}", cluster_key),
305                Span::call_site(),
306            );
307            let self_id_expr = D::cluster_self_id().splice_untyped();
308            extra_stmts
309                .entry(cluster_key)
310                .expect("location was removed")
311                .or_default()
312                .push(syn::parse_quote! {
313                    let #self_id_ident = &*Box::leak(Box::new(#self_id_expr));
314                });
315
316            let process_cluster_locations = self.location_names.keys().filter(|&location_key| {
317                self.processes.contains_key(location_key)
318                    || self.clusters.contains_key(location_key)
319            });
320            for other_location in process_cluster_locations {
321                let other_id_ident = syn::Ident::new(
322                    &format!("__hydro_lang_cluster_ids_{}", cluster_key),
323                    Span::call_site(),
324                );
325                let other_id_expr = D::cluster_ids(cluster_key).splice_untyped();
326                extra_stmts
327                    .entry(other_location)
328                    .expect("location was removed")
329                    .or_default()
330                    .push(syn::parse_quote! {
331                        let #other_id_ident = #other_id_expr;
332                    });
333            }
334        }
335    }
336
337    /// Compiles and deploys the flow.
338    ///
339    /// Rough outline of steps:
340    /// * Compiles the Hydro into DFIR.
341    /// * Instantiates nodes as configured.
342    /// * Compiles the corresponding DFIR into binaries for nodes as needed.
343    /// * Connects up networking as needed.
344    #[must_use]
345    pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
346        let CompiledFlow {
347            dfir,
348            mut extra_stmts,
349            mut sidecars,
350            _phantom,
351        } = self.compile_internal();
352
353        let mut compiled = dfir;
354        self.cluster_id_stmts(&mut extra_stmts);
355        let mut meta = D::Meta::default();
356
357        let (processes, clusters, externals) = (
358            self.processes
359                .into_iter()
360                .filter(|&(node_key, ref node)| {
361                    if let Some(ir) = compiled.remove(node_key) {
362                        node.instantiate(
363                            env,
364                            &mut meta,
365                            ir,
366                            extra_stmts.remove(node_key).as_deref().unwrap_or_default(),
367                            sidecars.remove(node_key).as_deref().unwrap_or_default(),
368                        );
369                        true
370                    } else {
371                        false
372                    }
373                })
374                .collect::<SparseSecondaryMap<_, _>>(),
375            self.clusters
376                .into_iter()
377                .filter(|&(cluster_key, ref cluster)| {
378                    if let Some(ir) = compiled.remove(cluster_key) {
379                        cluster.instantiate(
380                            env,
381                            &mut meta,
382                            ir,
383                            extra_stmts
384                                .remove(cluster_key)
385                                .as_deref()
386                                .unwrap_or_default(),
387                            sidecars.remove(cluster_key).as_deref().unwrap_or_default(),
388                        );
389                        true
390                    } else {
391                        false
392                    }
393                })
394                .collect::<SparseSecondaryMap<_, _>>(),
395            self.externals
396                .into_iter()
397                .inspect(|&(external_key, ref external)| {
398                    assert!(!extra_stmts.contains_key(external_key));
399                    assert!(!sidecars.contains_key(external_key));
400                    external.instantiate(env, &mut meta, Default::default(), &[], &[]);
401                })
402                .collect::<SparseSecondaryMap<_, _>>(),
403        );
404
405        for location_key in self.locations.keys() {
406            if let Some(node) = processes.get(location_key) {
407                node.update_meta(&meta);
408            } else if let Some(cluster) = clusters.get(location_key) {
409                cluster.update_meta(&meta);
410            } else if let Some(external) = externals.get(location_key) {
411                external.update_meta(&meta);
412            }
413        }
414
415        let mut seen_tees_connect = HashMap::new();
416        for leaf in self.ir.iter_mut() {
417            leaf.connect_network(&mut seen_tees_connect);
418        }
419
420        DeployResult {
421            location_names: self.location_names,
422            processes,
423            clusters,
424            externals,
425        }
426    }
427}
428
429pub struct DeployResult<'a, D: Deploy<'a>> {
430    location_names: SecondaryMap<LocationKey, String>,
431    processes: SparseSecondaryMap<LocationKey, D::Process>,
432    clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
433    externals: SparseSecondaryMap<LocationKey, D::External>,
434}
435
436impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
437    pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
438        let LocationId::Process(location_key) = p.id() else {
439            panic!("Process ID expected")
440        };
441        self.processes.get(location_key).unwrap()
442    }
443
444    pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
445        let LocationId::Cluster(location_key) = c.id() else {
446            panic!("Cluster ID expected")
447        };
448        self.clusters.get(location_key).unwrap()
449    }
450
451    pub fn get_external<P>(&self, e: &External<P>) -> &D::External {
452        self.externals.get(e.key).unwrap()
453    }
454
455    pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, &str, &D::Process)> {
456        self.location_names
457            .iter()
458            .filter_map(|(location_key, location_name)| {
459                self.processes
460                    .get(location_key)
461                    .map(|process| (LocationId::Process(location_key), &**location_name, process))
462            })
463    }
464
465    pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, &str, &D::Cluster)> {
466        self.location_names
467            .iter()
468            .filter_map(|(location_key, location_name)| {
469                self.clusters
470                    .get(location_key)
471                    .map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
472            })
473    }
474
475    #[deprecated(note = "use `connect` instead")]
476    pub async fn connect_bytes<M>(
477        &self,
478        port: ExternalBytesPort<M>,
479    ) -> (
480        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
481        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
482    ) {
483        self.connect(port).await
484    }
485
486    #[deprecated(note = "use `connect` instead")]
487    pub async fn connect_sink_bytes<M>(
488        &self,
489        port: ExternalBytesPort<M>,
490    ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
491        self.connect(port).await.1
492    }
493
494    pub async fn connect_bincode<
495        InT: Serialize + 'static,
496        OutT: DeserializeOwned + 'static,
497        Many,
498    >(
499        &self,
500        port: ExternalBincodeBidi<InT, OutT, Many>,
501    ) -> (
502        Pin<Box<dyn Stream<Item = OutT>>>,
503        Pin<Box<dyn Sink<InT, Error = Error>>>,
504    ) {
505        self.externals
506            .get(port.process_key)
507            .unwrap()
508            .as_bincode_bidi(port.port_id)
509            .await
510    }
511
512    #[deprecated(note = "use `connect` instead")]
513    pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
514        &self,
515        port: ExternalBincodeSink<T, Many>,
516    ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
517        self.connect(port).await
518    }
519
520    #[deprecated(note = "use `connect` instead")]
521    pub async fn connect_source_bytes(
522        &self,
523        port: ExternalBytesPort,
524    ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
525        self.connect(port).await.0
526    }
527
528    #[deprecated(note = "use `connect` instead")]
529    pub async fn connect_source_bincode<
530        T: Serialize + DeserializeOwned + 'static,
531        O: Ordering,
532        R: Retries,
533    >(
534        &self,
535        port: ExternalBincodeStream<T, O, R>,
536    ) -> Pin<Box<dyn Stream<Item = T>>> {
537        self.connect(port).await
538    }
539
540    pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
541        &'b self,
542        port: P,
543    ) -> <P as ConnectableAsync<&'b Self>>::Output {
544        port.connect(self).await
545    }
546}
547
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
impl DeployResult<'_, crate::deploy::HydroDeploy> {
    /// Get the raw port handle.
    ///
    /// # Panics
    /// Panics if no external node was deployed for the port's `process_key`.
    pub fn raw_port<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> hydro_deploy::custom_service::CustomClientPort {
        let external = self.externals.get(port.process_key).unwrap();
        external.raw_port(port.port_id)
    }
}
563
/// A port that can be connected asynchronously, given a context of type `Ctx`.
pub trait ConnectableAsync<Ctx> {
    /// What a successful connection yields.
    type Output;

    /// Consumes the port and connects it using `ctx`.
    fn connect(self, ctx: Ctx) -> impl Future<Output = Self::Output>;
}
569
570impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
571    type Output = (
572        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
573        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
574    );
575
576    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
577        ctx.externals
578            .get(self.process_key)
579            .unwrap()
580            .as_bytes_bidi(self.port_id)
581            .await
582    }
583}
584
585impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
586    ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
587{
588    type Output = Pin<Box<dyn Stream<Item = T>>>;
589
590    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
591        ctx.externals
592            .get(self.process_key)
593            .unwrap()
594            .as_bincode_source(self.port_id)
595            .await
596    }
597}
598
599impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
600    for ExternalBincodeSink<T, Many>
601{
602    type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
603
604    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
605        ctx.externals
606            .get(self.process_key)
607            .unwrap()
608            .as_bincode_sink(self.port_id)
609            .await
610    }
611}