Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26use crate::compile::builder::{CycleId, ExternalPortId};
27#[cfg(feature = "build")]
28use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
29use crate::location::dynamic::LocationId;
30use crate::location::{LocationKey, NetworkHint};
31
32pub mod backtrace;
33use backtrace::Backtrace;
34
/// Wrapper that displays only the tokens of a parsed expr.
///
/// `Debug` prints the raw token stream of the expression, while `Display` prints a
/// `q!(...)` form after attempting to collapse expanded `q!` macro calls.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
40
41impl From<syn::Expr> for DebugExpr {
42    fn from(expr: syn::Expr) -> Self {
43        Self(Box::new(expr))
44    }
45}
46
47impl Deref for DebugExpr {
48    type Target = syn::Expr;
49
50    fn deref(&self) -> &Self::Target {
51        &self.0
52    }
53}
54
55impl ToTokens for DebugExpr {
56    fn to_tokens(&self, tokens: &mut TokenStream) {
57        self.0.to_tokens(tokens);
58    }
59}
60
61impl Debug for DebugExpr {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        write!(f, "{}", self.0.to_token_stream())
64    }
65}
66
67impl Display for DebugExpr {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        let original = self.0.as_ref().clone();
70        let simplified = simplify_q_macro(original);
71
72        // For now, just use quote formatting without trying to parse as a statement
73        // This avoids the syn::parse_quote! issues entirely
74        write!(f, "q!({})", quote::quote!(#simplified))
75    }
76}
77
78/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
79fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
80    // Try to parse the token string as a syn::Expr
81    // Use a visitor to simplify q! macro expansions
82    let mut simplifier = QMacroSimplifier::new();
83    simplifier.visit_expr_mut(&mut expr);
84
85    // If we found and simplified a q! macro, return the simplified version
86    if let Some(simplified) = simplifier.simplified_result {
87        simplified
88    } else {
89        expr
90    }
91}
92
93/// AST visitor that simplifies q! macro expansions
94#[derive(Default)]
95pub struct QMacroSimplifier {
96    pub simplified_result: Option<syn::Expr>,
97}
98
99impl QMacroSimplifier {
100    pub fn new() -> Self {
101        Self::default()
102    }
103}
104
105impl VisitMut for QMacroSimplifier {
106    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
107        // Check if we already found a result to avoid further processing
108        if self.simplified_result.is_some() {
109            return;
110        }
111
112        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
113            // Look for calls to stageleft::runtime_support::fn*
114            && self.is_stageleft_runtime_support_call(&path_expr.path)
115            // Try to extract the closure from the arguments
116            && let Some(closure) = self.extract_closure_from_args(&call.args)
117        {
118            self.simplified_result = Some(closure);
119            return;
120        }
121
122        // Continue visiting child expressions using the default implementation
123        // Use the default visitor to avoid infinite recursion
124        syn::visit_mut::visit_expr_mut(self, expr);
125    }
126}
127
128impl QMacroSimplifier {
129    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
130        // Check if this is a call to stageleft::runtime_support::fn*
131        if let Some(last_segment) = path.segments.last() {
132            let fn_name = last_segment.ident.to_string();
133            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
134            fn_name.contains("_type_hint")
135                && path.segments.len() > 2
136                && path.segments[0].ident == "stageleft"
137                && path.segments[1].ident == "runtime_support"
138        } else {
139            false
140        }
141    }
142
143    fn extract_closure_from_args(
144        &self,
145        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
146    ) -> Option<syn::Expr> {
147        // Look through the arguments for a closure expression
148        for arg in args {
149            if let syn::Expr::Closure(_) = arg {
150                return Some(arg.clone());
151            }
152            // Also check for closures nested in other expressions (like blocks)
153            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
154                return Some(closure_expr);
155            }
156        }
157        None
158    }
159
160    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
161        let mut visitor = ClosureFinder {
162            found_closure: None,
163            prefer_inner_blocks: true,
164        };
165        visitor.visit_expr(expr);
166        visitor.found_closure
167    }
168}
169
170/// Visitor that finds closures in expressions with special block handling
171struct ClosureFinder {
172    found_closure: Option<syn::Expr>,
173    prefer_inner_blocks: bool,
174}
175
176impl<'ast> Visit<'ast> for ClosureFinder {
177    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
178        // If we already found a closure, don't continue searching
179        if self.found_closure.is_some() {
180            return;
181        }
182
183        match expr {
184            syn::Expr::Closure(_) => {
185                self.found_closure = Some(expr.clone());
186            }
187            syn::Expr::Block(block) if self.prefer_inner_blocks => {
188                // Special handling for blocks - look for inner blocks that contain closures
189                for stmt in &block.block.stmts {
190                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
191                        && let syn::Expr::Block(_) = stmt_expr
192                    {
193                        // Check if this nested block contains a closure
194                        let mut inner_visitor = ClosureFinder {
195                            found_closure: None,
196                            prefer_inner_blocks: false, // Avoid infinite recursion
197                        };
198                        inner_visitor.visit_expr(stmt_expr);
199                        if inner_visitor.found_closure.is_some() {
200                            // Found a closure in an inner block, return that block
201                            self.found_closure = Some(stmt_expr.clone());
202                            return;
203                        }
204                    }
205                }
206
207                // If no inner block with closure found, continue with normal visitation
208                visit::visit_expr(self, expr);
209
210                // If we found a closure, just return the closure itself, not the whole block
211                // unless we're in the special case where we want the containing block
212                if self.found_closure.is_some() {
213                    // The closure was found during visitation, no need to wrap in block
214                }
215            }
216            _ => {
217                // Use default visitor behavior for all other expressions
218                visit::visit_expr(self, expr);
219            }
220        }
221    }
222}
223
/// Wrapper around a parsed type whose `Debug` output is just the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
229
230impl From<syn::Type> for DebugType {
231    fn from(t: syn::Type) -> Self {
232        Self(Box::new(t))
233    }
234}
235
236impl Deref for DebugType {
237    type Target = syn::Type;
238
239    fn deref(&self) -> &Self::Target {
240        &self.0
241    }
242}
243
244impl ToTokens for DebugType {
245    fn to_tokens(&self, tokens: &mut TokenStream) {
246        self.0.to_tokens(tokens);
247    }
248}
249
250impl Debug for DebugType {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        write!(f, "{}", self.0.to_token_stream())
253    }
254}
255
/// Instantiation state of a network-like IR element.
///
/// Elements start out as [`Self::Building`] and are replaced with [`Self::Finalized`]
/// once their sink/source expressions have been produced. The trait impls in this file
/// treat the value as opaque: `Debug` prints a fixed placeholder, `Hash` contributes
/// nothing, and `Clone` panics for the finalized state.
pub enum DebugInstantiate {
    /// Not yet instantiated; no sink/source expressions exist.
    Building,
    /// Instantiated; holds the boxed sink/source expressions and connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
260
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the connection.
    sink: syn::Expr,
    /// Expression for the receiving side of the connection.
    source: syn::Expr,
    /// One-shot callback associated with the connection.
    // NOTE(review): presumably `None` once the callback has been taken and run — confirm
    // against the code that consumes it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
273
274impl From<DebugInstantiateFinalized> for DebugInstantiate {
275    fn from(f: DebugInstantiateFinalized) -> Self {
276        Self::Finalized(Box::new(f))
277    }
278}
279
280impl Debug for DebugInstantiate {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        write!(f, "<network instantiate>")
283    }
284}
285
286impl Hash for DebugInstantiate {
287    fn hash<H: Hasher>(&self, _state: &mut H) {
288        // Do nothing
289    }
290}
291
292impl Clone for DebugInstantiate {
293    fn clone(&self) -> Self {
294        match self {
295            DebugInstantiate::Building => DebugInstantiate::Building,
296            DebugInstantiate::Finalized(_) => {
297                panic!("DebugInstantiate::Finalized should not be cloned")
298            }
299        }
300    }
301}
302
/// Tracks the instantiation state of a `ClusterMembers` source.
///
/// During `compile_network`, the first `ClusterMembers` node for a given
/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
/// receives the expression returned by `Deploy::cluster_membership_stream`.
/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
/// during code-gen they simply reference the tee output of the first node
/// instead of creating a redundant `source_stream`.
#[derive(Debug, Hash, Clone)]
pub enum ClusterMembersState {
    /// Not yet instantiated.
    Uninit,
    /// The primary instance: holds the stream expression and will emit
    /// `source_stream(expr) -> tee()` during code-gen.
    Stream(DebugExpr),
    /// A secondary instance that references the tee output of the primary.
    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
    /// can derive the deterministic tee ident without extra state.
    Tee(LocationId, LocationId),
}
323
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// A source described by the wrapped expression.
    // NOTE(review): presumably an expression evaluating to an async stream — confirm in code-gen.
    Stream(DebugExpr),
    /// A source fed from an external network connection; carries no payload.
    // NOTE(review): semantics inferred from the name — confirm against code-gen.
    ExternalNetwork(),
    /// A source described by the wrapped expression.
    // NOTE(review): presumably an expression evaluating to an iterator — confirm in code-gen.
    Iter(DebugExpr),
    /// A payload-free source.
    // NOTE(review): presumably emits continuously ("spins") — confirm in code-gen.
    Spin(),
    /// Cluster membership source: a location plus its [`ClusterMembersState`].
    // NOTE(review): the `LocationId` is presumably the target cluster — confirm against callers.
    ClusterMembers(LocationId, ClusterMembersState),
    /// A source identified by the given identifier.
    // NOTE(review): presumably an input supplied by an embedding host — confirm.
    Embedded(syn::Ident),
    /// Singleton counterpart of [`Self::Embedded`], identified by the given identifier.
    // NOTE(review): semantics inferred from the name — confirm.
    EmbeddedSingleton(syn::Ident),
}
335
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the DFIR connecting `in_ident` (a collection of kind `in_kind` at `in_location`)
    /// to `out_ident` at `out_location` for a batch operation.
    ///
    /// `op_meta` is the metadata of the IR operator being compiled.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR connecting `in_ident` at `in_location` to `out_ident` at
    /// `out_location` for a value yielded out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the DFIR connecting `in_ident` at `in_location` to `out_ident` at
    /// `out_location` where an atomic section begins.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR connecting `in_ident` at `in_location` to `out_ident` where an
    /// atomic section ends.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the DFIR connecting `in_ident` to `out_ident` at `location` for an operator
    /// that observes non-determinism.
    // NOTE(review): `trusted` presumably marks observations that need no simulator hook — confirm.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network channel: a sender on `from` that drains
    /// `input_ident` into `sink`, and a receiver on `to` that exposes `source` as
    /// `out_ident`.
    ///
    /// `serialize` / `deserialize`, when present, are applied on the sending / receiving
    /// side respectively. `tag_id` is used to build the operator tags of the emitted code.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a receiver on location `on` that exposes `source_expr` as `out_ident`,
    /// applying `deserialize` when present.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits a sender on location `on` that drains `input_ident` into `sink_expr`,
    /// applying `serialize` when present.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
429
/// Production code-gen: one `FlatGraphBuilder` per root location, keyed by `LocationKey`.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// This implementation never tracks intermediate singleton states.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the root key of `location`, inserting a default
    /// one if absent.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" when the map's `entry` lookup yields `None`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Forwards `in_ident` to `out_ident`. Bounded singleton-like inputs (`Singleton`,
    /// `Optional`, `KeyedSingleton`) are routed through `persist::<'static>()`, which is
    /// only permitted at a top-level location.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(in_location.root());

        let is_singleton_like = matches!(
            in_kind,
            CollectionKind::Singleton { .. }
                | CollectionKind::Optional { .. }
                | CollectionKind::KeyedSingleton { .. }
        );

        let code = if in_kind.is_bounded() && is_singleton_like {
            assert!(in_location.is_top_level());
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            }
        } else {
            parse_quote! {
                #out_ident = #in_ident;
            }
        };
        graph.add_dfir(code, None, None);
    }

    /// Plain pass-through from `in_ident` to `out_ident` on the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        let passthrough = parse_quote!(#out_ident = #in_ident;);
        graph.add_dfir(passthrough, None, None);
    }

    /// Plain pass-through from `in_ident` to `out_ident` on the input's root location.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        let passthrough = parse_quote!(#out_ident = #in_ident;);
        graph.add_dfir(passthrough, None, None);
    }

    /// Plain pass-through from `in_ident` to `out_ident` on the input's root location.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        let passthrough = parse_quote!(#out_ident = #in_ident;);
        graph.add_dfir(passthrough, None, None);
    }

    /// Plain pass-through from `in_ident` to `out_ident` on `location`; no hook is emitted.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(location);
        let passthrough = parse_quote!(#out_ident = #in_ident;);
        graph.add_dfir(passthrough, None, None);
    }

    /// Adds a `dest_sink` pipeline on `from` and a `source_stream` pipeline on `to`,
    /// inserting a `map` stage for each (de)serializer that is provided.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");
        let recv_tag = format!("recv{tag_id}");

        let sender = self.get_dfir_mut(from);
        let send_code = match serialize {
            Some(serialize_pipeline) => parse_quote! {
                #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
            },
            None => parse_quote! {
                #input_ident -> dest_sink(#sink);
            },
        };
        sender.add_dfir(send_code, None, Some(&send_tag));

        let receiver = self.get_dfir_mut(to);
        let recv_code = match deserialize {
            Some(deserialize_pipeline) => parse_quote! {
                #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
            },
            None => parse_quote! {
                #out_ident = source_stream(#source);
            },
        };
        receiver.add_dfir(recv_code, None, Some(&recv_tag));
    }

    /// Adds a `source_stream` pipeline on `on`, with a `map` stage when a deserializer is
    /// provided.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{tag_id}");

        let receiver = self.get_dfir_mut(on);
        let recv_code = match deserialize {
            Some(deserialize_pipeline) => parse_quote! {
                #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
            },
            None => parse_quote! {
                #out_ident = source_stream(#source_expr);
            },
        };
        receiver.add_dfir(recv_code, None, Some(&recv_tag));
    }

    /// Adds a `dest_sink` pipeline on `on`, with a `map` stage when a serializer is provided.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");

        let sender = self.get_dfir_mut(on);
        let send_code = match serialize {
            Some(serialize_fn) => parse_quote! {
                #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
            },
            None => parse_quote! {
                #input_ident -> dest_sink(#sink_expr);
            },
        };
        sender.add_dfir(send_code, None, Some(&send_tag));
    }
}
663
/// Selects what a traversal over the IR does at each element: emit DFIR through a
/// [`DfirBuilder`], or invoke user-supplied callbacks instead.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR code through the borrowed builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` for each [`HydroRoot`] and `N` for each [`HydroNode`].
    // NOTE(review): the `&mut usize` is presumably a running statement-id counter — confirm
    // against the traversal that drives these callbacks.
    Callback(L, N),
}
673
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
///
/// Every variant carries its upstream `input` node and the `op_metadata` of the operator.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Consumes `input`, applying the expression `f` to it.
    // NOTE(review): presumably `f` is a closure invoked once per element — confirm in code-gen.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to a port (`to_port_id`) on an external location (`to_external_key`),
    /// optionally serializing with `serialize_fn`. `instantiate_fn` starts as
    /// `DebugInstantiate::Building` and is finalized by `compile_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` into the sink described by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Exposes `input` as an embedded output named `ident`. `compile_network` requires
    /// the input to be a `Stream` located on a process or cluster.
    EmbeddedOutput {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// A root with no payload beyond its `input`.
    // NOTE(review): presumably discards its input — confirm in code-gen.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
714
715impl HydroRoot {
716    #[cfg(feature = "build")]
717    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
718    pub fn compile_network<'a, D>(
719        &mut self,
720        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
721        seen_tees: &mut SeenSharedNodes,
722        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
723        processes: &SparseSecondaryMap<LocationKey, D::Process>,
724        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
725        externals: &SparseSecondaryMap<LocationKey, D::External>,
726        env: &mut D::InstantiateEnv,
727    ) where
728        D: Deploy<'a>,
729    {
730        let refcell_extra_stmts = RefCell::new(extra_stmts);
731        let refcell_env = RefCell::new(env);
732        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
733        self.transform_bottom_up(
734            &mut |l| {
735                if let HydroRoot::SendExternal {
736                    input,
737                    to_external_key,
738                    to_port_id,
739                    to_many,
740                    unpaired,
741                    instantiate_fn,
742                    ..
743                } = l
744                {
745                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
746                        DebugInstantiate::Building => {
747                            let to_node = externals
748                                .get(*to_external_key)
749                                .unwrap_or_else(|| {
750                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
751                                })
752                                .clone();
753
754                            match input.metadata().location_id.root() {
755                                &LocationId::Process(process_key) => {
756                                    if *to_many {
757                                        (
758                                            (
759                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
760                                                parse_quote!(DUMMY),
761                                            ),
762                                            Box::new(|| {}) as Box<dyn FnOnce()>,
763                                        )
764                                    } else {
765                                        let from_node = processes
766                                            .get(process_key)
767                                            .unwrap_or_else(|| {
768                                                panic!("A process used in the graph was not instantiated: {}", process_key)
769                                            })
770                                            .clone();
771
772                                        let sink_port = from_node.next_port();
773                                        let source_port = to_node.next_port();
774
775                                        if *unpaired {
776                                            use stageleft::quote_type;
777                                            use tokio_util::codec::LengthDelimitedCodec;
778
779                                            to_node.register(*to_port_id, source_port.clone());
780
781                                            let _ = D::e2o_source(
782                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
783                                                &to_node, &source_port,
784                                                &from_node, &sink_port,
785                                                &quote_type::<LengthDelimitedCodec>(),
786                                                format!("{}_{}", *to_external_key, *to_port_id)
787                                            );
788                                        }
789
790                                        (
791                                            (
792                                                D::o2e_sink(
793                                                    &from_node,
794                                                    &sink_port,
795                                                    &to_node,
796                                                    &source_port,
797                                                    format!("{}_{}", *to_external_key, *to_port_id)
798                                                ),
799                                                parse_quote!(DUMMY),
800                                            ),
801                                            if *unpaired {
802                                                D::e2o_connect(
803                                                    &to_node,
804                                                    &source_port,
805                                                    &from_node,
806                                                    &sink_port,
807                                                    *to_many,
808                                                    NetworkHint::Auto,
809                                                )
810                                            } else {
811                                                Box::new(|| {}) as Box<dyn FnOnce()>
812                                            },
813                                        )
814                                    }
815                                }
816                                LocationId::Cluster(_) => todo!("SendExternal from a cluster location is not yet supported"),
817                                _ => panic!()
818                            }
819                        },
820
821                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
822                    };
823
824                    *instantiate_fn = DebugInstantiateFinalized {
825                        sink: sink_expr,
826                        source: source_expr,
827                        connect_fn: Some(connect_fn),
828                    }
829                    .into();
830                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
831                    let element_type = match &input.metadata().collection_kind {
832                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
833                        _ => panic!("Embedded output must have Stream collection kind"),
834                    };
835                    let location_key = match input.metadata().location_id.root() {
836                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
837                        _ => panic!("Embedded output must be on a process or cluster"),
838                    };
839                    D::register_embedded_output(
840                        &mut refcell_env.borrow_mut(),
841                        location_key,
842                        ident,
843                        &element_type,
844                    );
845                }
846            },
847            &mut |n| {
848                if let HydroNode::Network {
849                    name,
850                    networking_info,
851                    input,
852                    instantiate_fn,
853                    metadata,
854                    ..
855                } = n
856                {
857                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
858                        DebugInstantiate::Building => instantiate_network::<D>(
859                            &mut refcell_env.borrow_mut(),
860                            input.metadata().location_id.root(),
861                            metadata.location_id.root(),
862                            processes,
863                            clusters,
864                            name.as_deref(),
865                            networking_info,
866                        ),
867
868                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
869                    };
870
871                    *instantiate_fn = DebugInstantiateFinalized {
872                        sink: sink_expr,
873                        source: source_expr,
874                        connect_fn: Some(connect_fn),
875                    }
876                    .into();
877                } else if let HydroNode::ExternalInput {
878                    from_external_key,
879                    from_port_id,
880                    from_many,
881                    codec_type,
882                    port_hint,
883                    instantiate_fn,
884                    metadata,
885                    ..
886                } = n
887                {
888                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
889                        DebugInstantiate::Building => {
890                            let from_node = externals
891                                .get(*from_external_key)
892                                .unwrap_or_else(|| {
893                                    panic!(
894                                        "A external used in the graph was not instantiated: {}",
895                                        from_external_key,
896                                    )
897                                })
898                                .clone();
899
900                            match metadata.location_id.root() {
901                                &LocationId::Process(process_key) => {
902                                    let to_node = processes
903                                        .get(process_key)
904                                        .unwrap_or_else(|| {
905                                            panic!("A process used in the graph was not instantiated: {}", process_key)
906                                        })
907                                        .clone();
908
909                                    let sink_port = from_node.next_port();
910                                    let source_port = to_node.next_port();
911
912                                    from_node.register(*from_port_id, sink_port.clone());
913
914                                    (
915                                        (
916                                            parse_quote!(DUMMY),
917                                            if *from_many {
918                                                D::e2o_many_source(
919                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
920                                                    &to_node, &source_port,
921                                                    codec_type.0.as_ref(),
922                                                    format!("{}_{}", *from_external_key, *from_port_id)
923                                                )
924                                            } else {
925                                                D::e2o_source(
926                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
927                                                    &from_node, &sink_port,
928                                                    &to_node, &source_port,
929                                                    codec_type.0.as_ref(),
930                                                    format!("{}_{}", *from_external_key, *from_port_id)
931                                                )
932                                            },
933                                        ),
934                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
935                                    )
936                                }
937                                LocationId::Cluster(_) => todo!("ExternalInput to a cluster location is not yet supported"),
938                                _ => panic!()
939                            }
940                        },
941
942                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
943                    };
944
945                    *instantiate_fn = DebugInstantiateFinalized {
946                        sink: sink_expr,
947                        source: source_expr,
948                        connect_fn: Some(connect_fn),
949                    }
950                    .into();
951                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
952                    let element_type = match &metadata.collection_kind {
953                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
954                        _ => panic!("Embedded source must have Stream collection kind"),
955                    };
956                    let location_key = match metadata.location_id.root() {
957                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
958                        _ => panic!("Embedded source must be on a process or cluster"),
959                    };
960                    D::register_embedded_stream_input(
961                        &mut refcell_env.borrow_mut(),
962                        location_key,
963                        ident,
964                        &element_type,
965                    );
966                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
967                    let element_type = match &metadata.collection_kind {
968                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
969                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
970                    };
971                    let location_key = match metadata.location_id.root() {
972                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
973                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
974                    };
975                    D::register_embedded_singleton_input(
976                        &mut refcell_env.borrow_mut(),
977                        location_key,
978                        ident,
979                        &element_type,
980                    );
981                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
982                    match state {
983                        ClusterMembersState::Uninit => {
984                            let at_location = metadata.location_id.root().clone();
985                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
986                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
987                                // First occurrence: call cluster_membership_stream and mark as Stream.
988                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
989                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
990                                    &(),
991                                );
992                                *state = ClusterMembersState::Stream(expr.into());
993                            } else {
994                                // Already instantiated for this (at, target) pair: just tee.
995                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
996                            }
997                        }
998                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
999                            panic!("cluster members already finalized");
1000                        }
1001                    }
1002                }
1003            },
1004            seen_tees,
1005            false,
1006        );
1007    }
1008
1009    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1010        self.transform_bottom_up(
1011            &mut |l| {
1012                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1013                    match instantiate_fn {
1014                        DebugInstantiate::Building => panic!("network not built"),
1015
1016                        DebugInstantiate::Finalized(finalized) => {
1017                            (finalized.connect_fn.take().unwrap())();
1018                        }
1019                    }
1020                }
1021            },
1022            &mut |n| {
1023                if let HydroNode::Network { instantiate_fn, .. }
1024                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1025                {
1026                    match instantiate_fn {
1027                        DebugInstantiate::Building => panic!("network not built"),
1028
1029                        DebugInstantiate::Finalized(finalized) => {
1030                            (finalized.connect_fn.take().unwrap())();
1031                        }
1032                    }
1033                }
1034            },
1035            seen_tees,
1036            false,
1037        );
1038    }
1039
1040    pub fn transform_bottom_up(
1041        &mut self,
1042        transform_root: &mut impl FnMut(&mut HydroRoot),
1043        transform_node: &mut impl FnMut(&mut HydroNode),
1044        seen_tees: &mut SeenSharedNodes,
1045        check_well_formed: bool,
1046    ) {
1047        self.transform_children(
1048            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1049            seen_tees,
1050        );
1051
1052        transform_root(self);
1053    }
1054
1055    pub fn transform_children(
1056        &mut self,
1057        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1058        seen_tees: &mut SeenSharedNodes,
1059    ) {
1060        match self {
1061            HydroRoot::ForEach { input, .. }
1062            | HydroRoot::SendExternal { input, .. }
1063            | HydroRoot::DestSink { input, .. }
1064            | HydroRoot::CycleSink { input, .. }
1065            | HydroRoot::EmbeddedOutput { input, .. }
1066            | HydroRoot::Null { input, .. } => {
1067                transform(input, seen_tees);
1068            }
1069        }
1070    }
1071
1072    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1073        match self {
1074            HydroRoot::ForEach {
1075                f,
1076                input,
1077                op_metadata,
1078            } => HydroRoot::ForEach {
1079                f: f.clone(),
1080                input: Box::new(input.deep_clone(seen_tees)),
1081                op_metadata: op_metadata.clone(),
1082            },
1083            HydroRoot::SendExternal {
1084                to_external_key,
1085                to_port_id,
1086                to_many,
1087                unpaired,
1088                serialize_fn,
1089                instantiate_fn,
1090                input,
1091                op_metadata,
1092            } => HydroRoot::SendExternal {
1093                to_external_key: *to_external_key,
1094                to_port_id: *to_port_id,
1095                to_many: *to_many,
1096                unpaired: *unpaired,
1097                serialize_fn: serialize_fn.clone(),
1098                instantiate_fn: instantiate_fn.clone(),
1099                input: Box::new(input.deep_clone(seen_tees)),
1100                op_metadata: op_metadata.clone(),
1101            },
1102            HydroRoot::DestSink {
1103                sink,
1104                input,
1105                op_metadata,
1106            } => HydroRoot::DestSink {
1107                sink: sink.clone(),
1108                input: Box::new(input.deep_clone(seen_tees)),
1109                op_metadata: op_metadata.clone(),
1110            },
1111            HydroRoot::CycleSink {
1112                cycle_id,
1113                input,
1114                op_metadata,
1115            } => HydroRoot::CycleSink {
1116                cycle_id: *cycle_id,
1117                input: Box::new(input.deep_clone(seen_tees)),
1118                op_metadata: op_metadata.clone(),
1119            },
1120            HydroRoot::EmbeddedOutput {
1121                ident,
1122                input,
1123                op_metadata,
1124            } => HydroRoot::EmbeddedOutput {
1125                ident: ident.clone(),
1126                input: Box::new(input.deep_clone(seen_tees)),
1127                op_metadata: op_metadata.clone(),
1128            },
1129            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1130                input: Box::new(input.deep_clone(seen_tees)),
1131                op_metadata: op_metadata.clone(),
1132            },
1133        }
1134    }
1135
1136    #[cfg(feature = "build")]
1137    pub fn emit(
1138        &mut self,
1139        graph_builders: &mut dyn DfirBuilder,
1140        seen_tees: &mut SeenSharedNodes,
1141        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1142        next_stmt_id: &mut usize,
1143    ) {
1144        self.emit_core(
1145            &mut BuildersOrCallback::<
1146                fn(&mut HydroRoot, &mut usize),
1147                fn(&mut HydroNode, &mut usize),
1148            >::Builders(graph_builders),
1149            seen_tees,
1150            built_tees,
1151            next_stmt_id,
1152        );
1153    }
1154
1155    #[cfg(feature = "build")]
1156    pub fn emit_core(
1157        &mut self,
1158        builders_or_callback: &mut BuildersOrCallback<
1159            impl FnMut(&mut HydroRoot, &mut usize),
1160            impl FnMut(&mut HydroNode, &mut usize),
1161        >,
1162        seen_tees: &mut SeenSharedNodes,
1163        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1164        next_stmt_id: &mut usize,
1165    ) {
1166        match self {
1167            HydroRoot::ForEach { f, input, .. } => {
1168                let input_ident =
1169                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1170
1171                match builders_or_callback {
1172                    BuildersOrCallback::Builders(graph_builders) => {
1173                        graph_builders
1174                            .get_dfir_mut(&input.metadata().location_id)
1175                            .add_dfir(
1176                                parse_quote! {
1177                                    #input_ident -> for_each(#f);
1178                                },
1179                                None,
1180                                Some(&next_stmt_id.to_string()),
1181                            );
1182                    }
1183                    BuildersOrCallback::Callback(leaf_callback, _) => {
1184                        leaf_callback(self, next_stmt_id);
1185                    }
1186                }
1187
1188                *next_stmt_id += 1;
1189            }
1190
1191            HydroRoot::SendExternal {
1192                serialize_fn,
1193                instantiate_fn,
1194                input,
1195                ..
1196            } => {
1197                let input_ident =
1198                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1199
1200                match builders_or_callback {
1201                    BuildersOrCallback::Builders(graph_builders) => {
1202                        let (sink_expr, _) = match instantiate_fn {
1203                            DebugInstantiate::Building => (
1204                                syn::parse_quote!(DUMMY_SINK),
1205                                syn::parse_quote!(DUMMY_SOURCE),
1206                            ),
1207
1208                            DebugInstantiate::Finalized(finalized) => {
1209                                (finalized.sink.clone(), finalized.source.clone())
1210                            }
1211                        };
1212
1213                        graph_builders.create_external_output(
1214                            &input.metadata().location_id,
1215                            sink_expr,
1216                            &input_ident,
1217                            serialize_fn.as_ref(),
1218                            *next_stmt_id,
1219                        );
1220                    }
1221                    BuildersOrCallback::Callback(leaf_callback, _) => {
1222                        leaf_callback(self, next_stmt_id);
1223                    }
1224                }
1225
1226                *next_stmt_id += 1;
1227            }
1228
1229            HydroRoot::DestSink { sink, input, .. } => {
1230                let input_ident =
1231                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1232
1233                match builders_or_callback {
1234                    BuildersOrCallback::Builders(graph_builders) => {
1235                        graph_builders
1236                            .get_dfir_mut(&input.metadata().location_id)
1237                            .add_dfir(
1238                                parse_quote! {
1239                                    #input_ident -> dest_sink(#sink);
1240                                },
1241                                None,
1242                                Some(&next_stmt_id.to_string()),
1243                            );
1244                    }
1245                    BuildersOrCallback::Callback(leaf_callback, _) => {
1246                        leaf_callback(self, next_stmt_id);
1247                    }
1248                }
1249
1250                *next_stmt_id += 1;
1251            }
1252
1253            HydroRoot::CycleSink {
1254                cycle_id, input, ..
1255            } => {
1256                let input_ident =
1257                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1258
1259                match builders_or_callback {
1260                    BuildersOrCallback::Builders(graph_builders) => {
1261                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1262                            CollectionKind::KeyedSingleton {
1263                                key_type,
1264                                value_type,
1265                                ..
1266                            }
1267                            | CollectionKind::KeyedStream {
1268                                key_type,
1269                                value_type,
1270                                ..
1271                            } => {
1272                                parse_quote!((#key_type, #value_type))
1273                            }
1274                            CollectionKind::Stream { element_type, .. }
1275                            | CollectionKind::Singleton { element_type, .. }
1276                            | CollectionKind::Optional { element_type, .. } => {
1277                                parse_quote!(#element_type)
1278                            }
1279                        };
1280
1281                        let cycle_id_ident = cycle_id.as_ident();
1282                        graph_builders
1283                            .get_dfir_mut(&input.metadata().location_id)
1284                            .add_dfir(
1285                                parse_quote! {
1286                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1287                                },
1288                                None,
1289                                None,
1290                            );
1291                    }
1292                    // No ID, no callback
1293                    BuildersOrCallback::Callback(_, _) => {}
1294                }
1295            }
1296
1297            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1298                let input_ident =
1299                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1300
1301                match builders_or_callback {
1302                    BuildersOrCallback::Builders(graph_builders) => {
1303                        graph_builders
1304                            .get_dfir_mut(&input.metadata().location_id)
1305                            .add_dfir(
1306                                parse_quote! {
1307                                    #input_ident -> for_each(&mut #ident);
1308                                },
1309                                None,
1310                                Some(&next_stmt_id.to_string()),
1311                            );
1312                    }
1313                    BuildersOrCallback::Callback(leaf_callback, _) => {
1314                        leaf_callback(self, next_stmt_id);
1315                    }
1316                }
1317
1318                *next_stmt_id += 1;
1319            }
1320
1321            HydroRoot::Null { input, .. } => {
1322                let input_ident =
1323                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1324
1325                match builders_or_callback {
1326                    BuildersOrCallback::Builders(graph_builders) => {
1327                        graph_builders
1328                            .get_dfir_mut(&input.metadata().location_id)
1329                            .add_dfir(
1330                                parse_quote! {
1331                                    #input_ident -> for_each(|_| {});
1332                                },
1333                                None,
1334                                Some(&next_stmt_id.to_string()),
1335                            );
1336                    }
1337                    BuildersOrCallback::Callback(leaf_callback, _) => {
1338                        leaf_callback(self, next_stmt_id);
1339                    }
1340                }
1341
1342                *next_stmt_id += 1;
1343            }
1344        }
1345    }
1346
1347    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1348        match self {
1349            HydroRoot::ForEach { op_metadata, .. }
1350            | HydroRoot::SendExternal { op_metadata, .. }
1351            | HydroRoot::DestSink { op_metadata, .. }
1352            | HydroRoot::CycleSink { op_metadata, .. }
1353            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1354            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1355        }
1356    }
1357
1358    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1359        match self {
1360            HydroRoot::ForEach { op_metadata, .. }
1361            | HydroRoot::SendExternal { op_metadata, .. }
1362            | HydroRoot::DestSink { op_metadata, .. }
1363            | HydroRoot::CycleSink { op_metadata, .. }
1364            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1365            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1366        }
1367    }
1368
1369    pub fn input(&self) -> &HydroNode {
1370        match self {
1371            HydroRoot::ForEach { input, .. }
1372            | HydroRoot::SendExternal { input, .. }
1373            | HydroRoot::DestSink { input, .. }
1374            | HydroRoot::CycleSink { input, .. }
1375            | HydroRoot::EmbeddedOutput { input, .. }
1376            | HydroRoot::Null { input, .. } => input,
1377        }
1378    }
1379
1380    pub fn input_metadata(&self) -> &HydroIrMetadata {
1381        self.input().metadata()
1382    }
1383
1384    pub fn print_root(&self) -> String {
1385        match self {
1386            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1387            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1388            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1389            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1390            HydroRoot::EmbeddedOutput { ident, .. } => {
1391                format!("EmbeddedOutput({})", ident)
1392            }
1393            HydroRoot::Null { .. } => "Null".to_owned(),
1394        }
1395    }
1396
1397    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1398        match self {
1399            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1400                transform(f);
1401            }
1402            HydroRoot::SendExternal { .. }
1403            | HydroRoot::CycleSink { .. }
1404            | HydroRoot::EmbeddedOutput { .. }
1405            | HydroRoot::Null { .. } => {}
1406        }
1407    }
1408}
1409
/// Emits every root in `ir` into a fresh set of per-location graph builders and
/// returns them.
///
/// All roots share one `seen_tees` map, one `built_tees` map and one statement-id
/// counter, which starts at 0.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        )
    });

    builders
}
1426
/// Walks `ir` in emission order without building any graph, handing each root to
/// `transform_root` and each node to `transform_node`.
///
/// This drives [`HydroRoot::emit_core`] in "callback" mode. All roots share one
/// `seen_tees` map, one `built_tees` map and one statement-id counter starting at 0;
/// the current counter value is passed to the callbacks as their second argument.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1446
1447pub fn transform_bottom_up(
1448    ir: &mut [HydroRoot],
1449    transform_root: &mut impl FnMut(&mut HydroRoot),
1450    transform_node: &mut impl FnMut(&mut HydroNode),
1451    check_well_formed: bool,
1452) {
1453    let mut seen_tees = HashMap::new();
1454    ir.iter_mut().for_each(|leaf| {
1455        leaf.transform_bottom_up(
1456            transform_root,
1457            transform_node,
1458            &mut seen_tees,
1459            check_well_formed,
1460        );
1461    });
1462}
1463
1464pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1465    let mut seen_tees = HashMap::new();
1466    ir.iter()
1467        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1468        .collect()
1469}
1470
1471type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1472thread_local! {
1473    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1474}
1475
1476pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1477    PRINTED_TEES.with(|printed_tees| {
1478        let mut printed_tees_mut = printed_tees.borrow_mut();
1479        *printed_tees_mut = Some((0, HashMap::new()));
1480        drop(printed_tees_mut);
1481
1482        let ret = f();
1483
1484        let mut printed_tees_mut = printed_tees.borrow_mut();
1485        *printed_tees_mut = None;
1486
1487        ret
1488    })
1489}
1490
1491pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1492
1493impl SharedNode {
1494    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1495        Rc::as_ptr(&self.0)
1496    }
1497}
1498
1499impl Debug for SharedNode {
1500    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1501        PRINTED_TEES.with(|printed_tees| {
1502            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1503            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1504
1505            if let Some(printed_tees_mut) = printed_tees_mut {
1506                if let Some(existing) = printed_tees_mut
1507                    .1
1508                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1509                {
1510                    write!(f, "<shared {}>", existing)
1511                } else {
1512                    let next_id = printed_tees_mut.0;
1513                    printed_tees_mut.0 += 1;
1514                    printed_tees_mut
1515                        .1
1516                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1517                    drop(printed_tees_mut_borrow);
1518                    write!(f, "<shared {}>: ", next_id)?;
1519                    Debug::fmt(&self.0.borrow(), f)
1520                }
1521            } else {
1522                drop(printed_tees_mut_borrow);
1523                write!(f, "<shared>: ")?;
1524                Debug::fmt(&self.0.borrow(), f)
1525            }
1526        })
1527    }
1528}
1529
1530impl Hash for SharedNode {
1531    fn hash<H: Hasher>(&self, state: &mut H) {
1532        self.0.borrow_mut().hash(state);
1533    }
1534}
1535
/// Boundedness marker stored in the `bound` field of most [`CollectionKind`] variants.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BoundKind {
    /// Not reported as bounded by [`CollectionKind::is_bounded`].
    Unbounded,
    /// Reported as bounded by [`CollectionKind::is_bounded`].
    Bounded,
}
1541
/// Ordering marker stored on stream-like [`CollectionKind`] variants
/// (`Stream::order` and `KeyedStream::value_order`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamOrder {
    // NOTE(review): presumably "no ordering guarantee between elements" — confirm.
    NoOrder,
    // NOTE(review): presumably "elements are totally ordered" — confirm.
    TotalOrder,
}
1547
/// Retry marker stored on stream-like [`CollectionKind`] variants
/// (`Stream::retry` and `KeyedStream::value_retry`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamRetry {
    // NOTE(review): presumably "elements may be delivered more than once" — confirm.
    AtLeastOnce,
    // NOTE(review): presumably "each element is delivered exactly once" — confirm.
    ExactlyOnce,
}
1553
/// Boundedness marker stored in the `bound` field of `CollectionKind::KeyedSingleton`.
///
/// Only [`KeyedSingletonBoundKind::Bounded`] is reported as bounded by
/// [`CollectionKind::is_bounded`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KeyedSingletonBoundKind {
    /// Not reported as bounded.
    Unbounded,
    /// Not reported as bounded.
    // NOTE(review): presumably "values are bounded, key set is not" — confirm.
    BoundedValue,
    /// Reported as bounded.
    Bounded,
}
1560
1561#[derive(Clone, PartialEq, Eq, Debug)]
1562pub enum CollectionKind {
1563    Stream {
1564        bound: BoundKind,
1565        order: StreamOrder,
1566        retry: StreamRetry,
1567        element_type: DebugType,
1568    },
1569    Singleton {
1570        bound: BoundKind,
1571        element_type: DebugType,
1572    },
1573    Optional {
1574        bound: BoundKind,
1575        element_type: DebugType,
1576    },
1577    KeyedStream {
1578        bound: BoundKind,
1579        value_order: StreamOrder,
1580        value_retry: StreamRetry,
1581        key_type: DebugType,
1582        value_type: DebugType,
1583    },
1584    KeyedSingleton {
1585        bound: KeyedSingletonBoundKind,
1586        key_type: DebugType,
1587        value_type: DebugType,
1588    },
1589}
1590
1591impl CollectionKind {
1592    pub fn is_bounded(&self) -> bool {
1593        matches!(
1594            self,
1595            CollectionKind::Stream {
1596                bound: BoundKind::Bounded,
1597                ..
1598            } | CollectionKind::Singleton {
1599                bound: BoundKind::Bounded,
1600                ..
1601            } | CollectionKind::Optional {
1602                bound: BoundKind::Bounded,
1603                ..
1604            } | CollectionKind::KeyedStream {
1605                bound: BoundKind::Bounded,
1606                ..
1607            } | CollectionKind::KeyedSingleton {
1608                bound: KeyedSingletonBoundKind::Bounded,
1609                ..
1610            }
1611        )
1612    }
1613}
1614
1615#[derive(Clone)]
1616pub struct HydroIrMetadata {
1617    pub location_id: LocationId,
1618    pub collection_kind: CollectionKind,
1619    pub cardinality: Option<usize>,
1620    pub tag: Option<String>,
1621    pub op: HydroIrOpMetadata,
1622}
1623
1624// HydroIrMetadata shouldn't be used to hash or compare
1625impl Hash for HydroIrMetadata {
1626    fn hash<H: Hasher>(&self, _: &mut H) {}
1627}
1628
1629impl PartialEq for HydroIrMetadata {
1630    fn eq(&self, _: &Self) -> bool {
1631        true
1632    }
1633}
1634
1635impl Eq for HydroIrMetadata {}
1636
1637impl Debug for HydroIrMetadata {
1638    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1639        f.debug_struct("HydroIrMetadata")
1640            .field("location_id", &self.location_id)
1641            .field("collection_kind", &self.collection_kind)
1642            .finish()
1643    }
1644}
1645
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
///
/// The `Debug` implementation for this type prints no fields, and its `Hash`
/// implementation feeds nothing to the hasher.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured through `Backtrace::get_backtrace` when the metadata is
    /// created by `HydroIrOpMetadata::new`.
    pub backtrace: Backtrace,
    /// Starts out as `None` in `new`.
    /// NOTE(review): presumably a measured CPU usage figure filled in later — confirm.
    pub cpu_usage: Option<f64>,
    /// Starts out as `None` in `new`.
    /// NOTE(review): presumably CPU usage attributed to receiving network data — confirm.
    pub network_recv_cpu_usage: Option<f64>,
    /// Starts out as `None` in `new`.
    /// NOTE(review): presumably an operator identifier assigned later — confirm.
    pub id: Option<usize>,
}
1655
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and every other
    /// field set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // NOTE(review): the skip count of 1 presumably accounts for this `new` frame so the
        // backtrace starts at the caller — confirm against `Backtrace::get_backtrace`.
        Self::new_with_skip(1)
    }

    /// Builds the metadata, capturing the backtrace with
    /// `Backtrace::get_backtrace(2 + skip_count)`; `cpu_usage`, `network_recv_cpu_usage`
    /// and `id` are all initialised to `None`.
    ///
    /// NOTE(review): the constant 2 presumably covers this function's frame and the
    /// capture call itself — confirm. Because the count depends on the call depth,
    /// avoid inlining or wrapping this function without adjusting it.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1674
1675impl Debug for HydroIrOpMetadata {
1676    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1677        f.debug_struct("HydroIrOpMetadata").finish()
1678    }
1679}
1680
1681impl Hash for HydroIrOpMetadata {
1682    fn hash<H: Hasher>(&self, _: &mut H) {}
1683}
1684
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Child nodes are owned through `Box<HydroNode>` fields, except for `Tee` and
/// `Partition`, whose input is a [`SharedNode`] that several nodes may reference.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Stand-in value. `transform_children` panics when it meets this variant; it is also
    /// used as the temporary content of a shared cell while that cell's node is being
    /// transformed or deep-cloned.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child nodes) wrapping a [`HydroSource`].
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child nodes) holding a `value` expression.
    /// NOTE(review): `first_tick_only` presumably restricts emission to the first tick — confirm.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child nodes) identified by a [`CycleId`].
    /// NOTE(review): presumably the reading end of a cycle in the graph — confirm.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// Node whose input is a [`SharedNode`], so the same upstream node can be referenced
    /// from several places. Traversals track such inputs in a `SeenSharedNodes` map to
    /// process each of them only once.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// Like `Tee`, reads from a shared input, and additionally carries an expression `f`
    /// and an `is_true` flag.
    /// NOTE(review): presumably selects the elements for which `f` evaluates to `is_true` —
    /// confirm.
    Partition {
        inner: SharedNode,
        f: DebugExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    /// Single-input node. NOTE(review): presumably marks the start of an atomic region — confirm.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node. NOTE(review): presumably marks the end of an atomic region — confirm.
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node. NOTE(review): presumably groups the input into per-tick batches —
    /// confirm.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node. NOTE(review): presumably yields tick-local data back out of the
    /// tick — confirm.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; traversals visit `first` before `second`.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; traversals visit `first` before `second`.
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; traversals visit `left` before `right`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; traversals visit `left` before `right`.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; traversals visit `left` before `right`.
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; traversals visit `pos` before `neg`.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node; traversals visit `pos` before `neg`.
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node.
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node parameterised by the expression `f`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterised by the expression `f`.
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterised by the expression `f`.
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterised by the expression `f`.
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node.
    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node.
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterised by the expression `f`.
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node.
    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node.
    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterised by the expressions `init` and `acc`.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node parameterised by the expressions `init` and `acc`.
    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterised by the expressions `init` and `acc`.
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node parameterised by the expression `f`.
    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node parameterised by the expression `f`.
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Two-input node parameterised by the expression `f`; traversals visit `input`
    /// before `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node. It is the only variant that `transform_bottom_up` exempts from
    /// the check that a node's inputs share its root location.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child nodes) described by an external location key and port id.
    /// NOTE(review): presumably receives data from an external process — confirm.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node carrying a `tag`, a `duration` expression and a `prefix`.
    /// NOTE(review): presumably reports element counts periodically — confirm.
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1923
/// Maps the address of an original shared cell to the cell that replaces it, so that
/// `HydroNode::transform_children` and `HydroNode::deep_clone` handle each shared node once
/// and every referrer ends up pointing at the same replacement.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared cell to a [`LocationId`].
/// NOTE(review): presumably records the location determined for each shared node — confirm
/// against the code that uses this alias.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1926
1927impl HydroNode {
1928    pub fn transform_bottom_up(
1929        &mut self,
1930        transform: &mut impl FnMut(&mut HydroNode),
1931        seen_tees: &mut SeenSharedNodes,
1932        check_well_formed: bool,
1933    ) {
1934        self.transform_children(
1935            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1936            seen_tees,
1937        );
1938
1939        transform(self);
1940
1941        let self_location = self.metadata().location_id.root();
1942
1943        if check_well_formed {
1944            match &*self {
1945                HydroNode::Network { .. } => {}
1946                _ => {
1947                    self.input_metadata().iter().for_each(|i| {
1948                        if i.location_id.root() != self_location {
1949                            panic!(
1950                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1951                                i,
1952                                i.location_id.root(),
1953                                self,
1954                                self_location
1955                            )
1956                        }
1957                    });
1958                }
1959            }
1960        }
1961    }
1962
1963    #[inline(always)]
1964    pub fn transform_children(
1965        &mut self,
1966        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1967        seen_tees: &mut SeenSharedNodes,
1968    ) {
1969        match self {
1970            HydroNode::Placeholder => {
1971                panic!();
1972            }
1973
1974            HydroNode::Source { .. }
1975            | HydroNode::SingletonSource { .. }
1976            | HydroNode::CycleSource { .. }
1977            | HydroNode::ExternalInput { .. } => {}
1978
1979            HydroNode::Tee { inner, .. } => {
1980                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1981                    *inner = SharedNode(transformed.clone());
1982                } else {
1983                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1984                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1985                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1986                    transform(&mut orig, seen_tees);
1987                    *transformed_cell.borrow_mut() = orig;
1988                    *inner = SharedNode(transformed_cell);
1989                }
1990            }
1991
1992            HydroNode::Partition { inner, .. } => {
1993                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1994                    *inner = SharedNode(transformed.clone());
1995                } else {
1996                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1997                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1998                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1999                    transform(&mut orig, seen_tees);
2000                    *transformed_cell.borrow_mut() = orig;
2001                    *inner = SharedNode(transformed_cell);
2002                }
2003            }
2004
2005            HydroNode::Cast { inner, .. }
2006            | HydroNode::ObserveNonDet { inner, .. }
2007            | HydroNode::BeginAtomic { inner, .. }
2008            | HydroNode::EndAtomic { inner, .. }
2009            | HydroNode::Batch { inner, .. }
2010            | HydroNode::YieldConcat { inner, .. } => {
2011                transform(inner.as_mut(), seen_tees);
2012            }
2013
2014            HydroNode::Chain { first, second, .. } => {
2015                transform(first.as_mut(), seen_tees);
2016                transform(second.as_mut(), seen_tees);
2017            }
2018
2019            HydroNode::ChainFirst { first, second, .. } => {
2020                transform(first.as_mut(), seen_tees);
2021                transform(second.as_mut(), seen_tees);
2022            }
2023
2024            HydroNode::CrossSingleton { left, right, .. }
2025            | HydroNode::CrossProduct { left, right, .. }
2026            | HydroNode::Join { left, right, .. } => {
2027                transform(left.as_mut(), seen_tees);
2028                transform(right.as_mut(), seen_tees);
2029            }
2030
2031            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2032                transform(pos.as_mut(), seen_tees);
2033                transform(neg.as_mut(), seen_tees);
2034            }
2035
2036            HydroNode::ReduceKeyedWatermark {
2037                input, watermark, ..
2038            } => {
2039                transform(input.as_mut(), seen_tees);
2040                transform(watermark.as_mut(), seen_tees);
2041            }
2042
2043            HydroNode::Map { input, .. }
2044            | HydroNode::ResolveFutures { input, .. }
2045            | HydroNode::ResolveFuturesOrdered { input, .. }
2046            | HydroNode::FlatMap { input, .. }
2047            | HydroNode::Filter { input, .. }
2048            | HydroNode::FilterMap { input, .. }
2049            | HydroNode::Sort { input, .. }
2050            | HydroNode::DeferTick { input, .. }
2051            | HydroNode::Enumerate { input, .. }
2052            | HydroNode::Inspect { input, .. }
2053            | HydroNode::Unique { input, .. }
2054            | HydroNode::Network { input, .. }
2055            | HydroNode::Fold { input, .. }
2056            | HydroNode::Scan { input, .. }
2057            | HydroNode::FoldKeyed { input, .. }
2058            | HydroNode::Reduce { input, .. }
2059            | HydroNode::ReduceKeyed { input, .. }
2060            | HydroNode::Counter { input, .. } => {
2061                transform(input.as_mut(), seen_tees);
2062            }
2063        }
2064    }
2065
2066    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2067        match self {
2068            HydroNode::Placeholder => HydroNode::Placeholder,
2069            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2070                inner: Box::new(inner.deep_clone(seen_tees)),
2071                metadata: metadata.clone(),
2072            },
2073            HydroNode::ObserveNonDet {
2074                inner,
2075                trusted,
2076                metadata,
2077            } => HydroNode::ObserveNonDet {
2078                inner: Box::new(inner.deep_clone(seen_tees)),
2079                trusted: *trusted,
2080                metadata: metadata.clone(),
2081            },
2082            HydroNode::Source { source, metadata } => HydroNode::Source {
2083                source: source.clone(),
2084                metadata: metadata.clone(),
2085            },
2086            HydroNode::SingletonSource {
2087                value,
2088                first_tick_only,
2089                metadata,
2090            } => HydroNode::SingletonSource {
2091                value: value.clone(),
2092                first_tick_only: *first_tick_only,
2093                metadata: metadata.clone(),
2094            },
2095            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2096                cycle_id: *cycle_id,
2097                metadata: metadata.clone(),
2098            },
2099            HydroNode::Tee { inner, metadata } => {
2100                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2101                    HydroNode::Tee {
2102                        inner: SharedNode(transformed.clone()),
2103                        metadata: metadata.clone(),
2104                    }
2105                } else {
2106                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2107                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2108                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2109                    *new_rc.borrow_mut() = cloned;
2110                    HydroNode::Tee {
2111                        inner: SharedNode(new_rc),
2112                        metadata: metadata.clone(),
2113                    }
2114                }
2115            }
2116            HydroNode::Partition {
2117                inner,
2118                f,
2119                is_true,
2120                metadata,
2121            } => {
2122                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2123                    HydroNode::Partition {
2124                        inner: SharedNode(transformed.clone()),
2125                        f: f.clone(),
2126                        is_true: *is_true,
2127                        metadata: metadata.clone(),
2128                    }
2129                } else {
2130                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2131                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2132                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2133                    *new_rc.borrow_mut() = cloned;
2134                    HydroNode::Partition {
2135                        inner: SharedNode(new_rc),
2136                        f: f.clone(),
2137                        is_true: *is_true,
2138                        metadata: metadata.clone(),
2139                    }
2140                }
2141            }
2142            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2143                inner: Box::new(inner.deep_clone(seen_tees)),
2144                metadata: metadata.clone(),
2145            },
2146            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2147                inner: Box::new(inner.deep_clone(seen_tees)),
2148                metadata: metadata.clone(),
2149            },
2150            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2151                inner: Box::new(inner.deep_clone(seen_tees)),
2152                metadata: metadata.clone(),
2153            },
2154            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2155                inner: Box::new(inner.deep_clone(seen_tees)),
2156                metadata: metadata.clone(),
2157            },
2158            HydroNode::Chain {
2159                first,
2160                second,
2161                metadata,
2162            } => HydroNode::Chain {
2163                first: Box::new(first.deep_clone(seen_tees)),
2164                second: Box::new(second.deep_clone(seen_tees)),
2165                metadata: metadata.clone(),
2166            },
2167            HydroNode::ChainFirst {
2168                first,
2169                second,
2170                metadata,
2171            } => HydroNode::ChainFirst {
2172                first: Box::new(first.deep_clone(seen_tees)),
2173                second: Box::new(second.deep_clone(seen_tees)),
2174                metadata: metadata.clone(),
2175            },
2176            HydroNode::CrossProduct {
2177                left,
2178                right,
2179                metadata,
2180            } => HydroNode::CrossProduct {
2181                left: Box::new(left.deep_clone(seen_tees)),
2182                right: Box::new(right.deep_clone(seen_tees)),
2183                metadata: metadata.clone(),
2184            },
2185            HydroNode::CrossSingleton {
2186                left,
2187                right,
2188                metadata,
2189            } => HydroNode::CrossSingleton {
2190                left: Box::new(left.deep_clone(seen_tees)),
2191                right: Box::new(right.deep_clone(seen_tees)),
2192                metadata: metadata.clone(),
2193            },
2194            HydroNode::Join {
2195                left,
2196                right,
2197                metadata,
2198            } => HydroNode::Join {
2199                left: Box::new(left.deep_clone(seen_tees)),
2200                right: Box::new(right.deep_clone(seen_tees)),
2201                metadata: metadata.clone(),
2202            },
2203            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2204                pos: Box::new(pos.deep_clone(seen_tees)),
2205                neg: Box::new(neg.deep_clone(seen_tees)),
2206                metadata: metadata.clone(),
2207            },
2208            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2209                pos: Box::new(pos.deep_clone(seen_tees)),
2210                neg: Box::new(neg.deep_clone(seen_tees)),
2211                metadata: metadata.clone(),
2212            },
2213            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2214                input: Box::new(input.deep_clone(seen_tees)),
2215                metadata: metadata.clone(),
2216            },
2217            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2218                HydroNode::ResolveFuturesOrdered {
2219                    input: Box::new(input.deep_clone(seen_tees)),
2220                    metadata: metadata.clone(),
2221                }
2222            }
2223            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2224                f: f.clone(),
2225                input: Box::new(input.deep_clone(seen_tees)),
2226                metadata: metadata.clone(),
2227            },
2228            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2229                f: f.clone(),
2230                input: Box::new(input.deep_clone(seen_tees)),
2231                metadata: metadata.clone(),
2232            },
2233            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2234                f: f.clone(),
2235                input: Box::new(input.deep_clone(seen_tees)),
2236                metadata: metadata.clone(),
2237            },
2238            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2239                f: f.clone(),
2240                input: Box::new(input.deep_clone(seen_tees)),
2241                metadata: metadata.clone(),
2242            },
2243            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2244                input: Box::new(input.deep_clone(seen_tees)),
2245                metadata: metadata.clone(),
2246            },
2247            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2248                input: Box::new(input.deep_clone(seen_tees)),
2249                metadata: metadata.clone(),
2250            },
2251            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2252                f: f.clone(),
2253                input: Box::new(input.deep_clone(seen_tees)),
2254                metadata: metadata.clone(),
2255            },
2256            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2257                input: Box::new(input.deep_clone(seen_tees)),
2258                metadata: metadata.clone(),
2259            },
2260            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2261                input: Box::new(input.deep_clone(seen_tees)),
2262                metadata: metadata.clone(),
2263            },
2264            HydroNode::Fold {
2265                init,
2266                acc,
2267                input,
2268                metadata,
2269            } => HydroNode::Fold {
2270                init: init.clone(),
2271                acc: acc.clone(),
2272                input: Box::new(input.deep_clone(seen_tees)),
2273                metadata: metadata.clone(),
2274            },
2275            HydroNode::Scan {
2276                init,
2277                acc,
2278                input,
2279                metadata,
2280            } => HydroNode::Scan {
2281                init: init.clone(),
2282                acc: acc.clone(),
2283                input: Box::new(input.deep_clone(seen_tees)),
2284                metadata: metadata.clone(),
2285            },
2286            HydroNode::FoldKeyed {
2287                init,
2288                acc,
2289                input,
2290                metadata,
2291            } => HydroNode::FoldKeyed {
2292                init: init.clone(),
2293                acc: acc.clone(),
2294                input: Box::new(input.deep_clone(seen_tees)),
2295                metadata: metadata.clone(),
2296            },
2297            HydroNode::ReduceKeyedWatermark {
2298                f,
2299                input,
2300                watermark,
2301                metadata,
2302            } => HydroNode::ReduceKeyedWatermark {
2303                f: f.clone(),
2304                input: Box::new(input.deep_clone(seen_tees)),
2305                watermark: Box::new(watermark.deep_clone(seen_tees)),
2306                metadata: metadata.clone(),
2307            },
2308            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2309                f: f.clone(),
2310                input: Box::new(input.deep_clone(seen_tees)),
2311                metadata: metadata.clone(),
2312            },
2313            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2314                f: f.clone(),
2315                input: Box::new(input.deep_clone(seen_tees)),
2316                metadata: metadata.clone(),
2317            },
2318            HydroNode::Network {
2319                name,
2320                networking_info,
2321                serialize_fn,
2322                instantiate_fn,
2323                deserialize_fn,
2324                input,
2325                metadata,
2326            } => HydroNode::Network {
2327                name: name.clone(),
2328                networking_info: networking_info.clone(),
2329                serialize_fn: serialize_fn.clone(),
2330                instantiate_fn: instantiate_fn.clone(),
2331                deserialize_fn: deserialize_fn.clone(),
2332                input: Box::new(input.deep_clone(seen_tees)),
2333                metadata: metadata.clone(),
2334            },
2335            HydroNode::ExternalInput {
2336                from_external_key,
2337                from_port_id,
2338                from_many,
2339                codec_type,
2340                port_hint,
2341                instantiate_fn,
2342                deserialize_fn,
2343                metadata,
2344            } => HydroNode::ExternalInput {
2345                from_external_key: *from_external_key,
2346                from_port_id: *from_port_id,
2347                from_many: *from_many,
2348                codec_type: codec_type.clone(),
2349                port_hint: *port_hint,
2350                instantiate_fn: instantiate_fn.clone(),
2351                deserialize_fn: deserialize_fn.clone(),
2352                metadata: metadata.clone(),
2353            },
2354            HydroNode::Counter {
2355                tag,
2356                duration,
2357                prefix,
2358                input,
2359                metadata,
2360            } => HydroNode::Counter {
2361                tag: tag.clone(),
2362                duration: duration.clone(),
2363                prefix: prefix.clone(),
2364                input: Box::new(input.deep_clone(seen_tees)),
2365                metadata: metadata.clone(),
2366            },
2367        }
2368    }
2369
2370    #[cfg(feature = "build")]
2371    pub fn emit_core(
2372        &mut self,
2373        builders_or_callback: &mut BuildersOrCallback<
2374            impl FnMut(&mut HydroRoot, &mut usize),
2375            impl FnMut(&mut HydroNode, &mut usize),
2376        >,
2377        seen_tees: &mut SeenSharedNodes,
2378        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2379        next_stmt_id: &mut usize,
2380    ) -> syn::Ident {
2381        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2382
2383        self.transform_bottom_up(
2384            &mut |node: &mut HydroNode| {
2385                let out_location = node.metadata().location_id.clone();
2386                match node {
2387                    HydroNode::Placeholder => {
2388                        panic!()
2389                    }
2390
2391                    HydroNode::Cast { .. } => {
2392                        // Cast passes through the input ident unchanged
2393                        // The input ident is already on the stack from processing the child
2394                        match builders_or_callback {
2395                            BuildersOrCallback::Builders(_) => {}
2396                            BuildersOrCallback::Callback(_, node_callback) => {
2397                                node_callback(node, next_stmt_id);
2398                            }
2399                        }
2400
2401                        *next_stmt_id += 1;
2402                        // input_ident stays on stack as output
2403                    }
2404
2405                    HydroNode::ObserveNonDet {
2406                        inner,
2407                        trusted,
2408                        metadata,
2409                        ..
2410                    } => {
2411                        let inner_ident = ident_stack.pop().unwrap();
2412
2413                        let observe_ident =
2414                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2415
2416                        match builders_or_callback {
2417                            BuildersOrCallback::Builders(graph_builders) => {
2418                                graph_builders.observe_nondet(
2419                                    *trusted,
2420                                    &inner.metadata().location_id,
2421                                    inner_ident,
2422                                    &inner.metadata().collection_kind,
2423                                    &observe_ident,
2424                                    &metadata.collection_kind,
2425                                    &metadata.op,
2426                                );
2427                            }
2428                            BuildersOrCallback::Callback(_, node_callback) => {
2429                                node_callback(node, next_stmt_id);
2430                            }
2431                        }
2432
2433                        *next_stmt_id += 1;
2434
2435                        ident_stack.push(observe_ident);
2436                    }
2437
2438                    HydroNode::Batch {
2439                        inner, metadata, ..
2440                    } => {
2441                        let inner_ident = ident_stack.pop().unwrap();
2442
2443                        let batch_ident =
2444                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2445
2446                        match builders_or_callback {
2447                            BuildersOrCallback::Builders(graph_builders) => {
2448                                graph_builders.batch(
2449                                    inner_ident,
2450                                    &inner.metadata().location_id,
2451                                    &inner.metadata().collection_kind,
2452                                    &batch_ident,
2453                                    &out_location,
2454                                    &metadata.op,
2455                                );
2456                            }
2457                            BuildersOrCallback::Callback(_, node_callback) => {
2458                                node_callback(node, next_stmt_id);
2459                            }
2460                        }
2461
2462                        *next_stmt_id += 1;
2463
2464                        ident_stack.push(batch_ident);
2465                    }
2466
2467                    HydroNode::YieldConcat { inner, .. } => {
2468                        let inner_ident = ident_stack.pop().unwrap();
2469
2470                        let yield_ident =
2471                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2472
2473                        match builders_or_callback {
2474                            BuildersOrCallback::Builders(graph_builders) => {
2475                                graph_builders.yield_from_tick(
2476                                    inner_ident,
2477                                    &inner.metadata().location_id,
2478                                    &inner.metadata().collection_kind,
2479                                    &yield_ident,
2480                                    &out_location,
2481                                );
2482                            }
2483                            BuildersOrCallback::Callback(_, node_callback) => {
2484                                node_callback(node, next_stmt_id);
2485                            }
2486                        }
2487
2488                        *next_stmt_id += 1;
2489
2490                        ident_stack.push(yield_ident);
2491                    }
2492
2493                    HydroNode::BeginAtomic { inner, metadata } => {
2494                        let inner_ident = ident_stack.pop().unwrap();
2495
2496                        let begin_ident =
2497                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2498
2499                        match builders_or_callback {
2500                            BuildersOrCallback::Builders(graph_builders) => {
2501                                graph_builders.begin_atomic(
2502                                    inner_ident,
2503                                    &inner.metadata().location_id,
2504                                    &inner.metadata().collection_kind,
2505                                    &begin_ident,
2506                                    &out_location,
2507                                    &metadata.op,
2508                                );
2509                            }
2510                            BuildersOrCallback::Callback(_, node_callback) => {
2511                                node_callback(node, next_stmt_id);
2512                            }
2513                        }
2514
2515                        *next_stmt_id += 1;
2516
2517                        ident_stack.push(begin_ident);
2518                    }
2519
2520                    HydroNode::EndAtomic { inner, .. } => {
2521                        let inner_ident = ident_stack.pop().unwrap();
2522
2523                        let end_ident =
2524                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2525
2526                        match builders_or_callback {
2527                            BuildersOrCallback::Builders(graph_builders) => {
2528                                graph_builders.end_atomic(
2529                                    inner_ident,
2530                                    &inner.metadata().location_id,
2531                                    &inner.metadata().collection_kind,
2532                                    &end_ident,
2533                                );
2534                            }
2535                            BuildersOrCallback::Callback(_, node_callback) => {
2536                                node_callback(node, next_stmt_id);
2537                            }
2538                        }
2539
2540                        *next_stmt_id += 1;
2541
2542                        ident_stack.push(end_ident);
2543                    }
2544
2545                    HydroNode::Source {
2546                        source, metadata, ..
2547                    } => {
2548                        if let HydroSource::ExternalNetwork() = source {
2549                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2550                        } else {
2551                            let source_ident =
2552                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2553
2554                            let source_stmt = match source {
2555                                HydroSource::Stream(expr) => {
2556                                    debug_assert!(metadata.location_id.is_top_level());
2557                                    parse_quote! {
2558                                        #source_ident = source_stream(#expr);
2559                                    }
2560                                }
2561
2562                                HydroSource::ExternalNetwork() => {
2563                                    unreachable!()
2564                                }
2565
2566                                HydroSource::Iter(expr) => {
2567                                    if metadata.location_id.is_top_level() {
2568                                        parse_quote! {
2569                                            #source_ident = source_iter(#expr);
2570                                        }
2571                                    } else {
2572                                        // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
2573                                        parse_quote! {
2574                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2575                                        }
2576                                    }
2577                                }
2578
2579                                HydroSource::Spin() => {
2580                                    debug_assert!(metadata.location_id.is_top_level());
2581                                    parse_quote! {
2582                                        #source_ident = spin();
2583                                    }
2584                                }
2585
2586                                HydroSource::ClusterMembers(target_loc, state) => {
2587                                    debug_assert!(metadata.location_id.is_top_level());
2588
2589                                    let members_tee_ident = syn::Ident::new(
2590                                        &format!(
2591                                            "__cluster_members_tee_{}_{}",
2592                                            metadata.location_id.root().key(),
2593                                            target_loc.key(),
2594                                        ),
2595                                        Span::call_site(),
2596                                    );
2597
2598                                    match state {
2599                                        ClusterMembersState::Stream(d) => {
2600                                            parse_quote! {
2601                                                #members_tee_ident = source_stream(#d) -> tee();
2602                                                #source_ident = #members_tee_ident;
2603                                            }
2604                                        },
2605                                        ClusterMembersState::Uninit => syn::parse_quote! {
2606                                            #source_ident = source_stream(DUMMY);
2607                                        },
2608                                        ClusterMembersState::Tee(..) => parse_quote! {
2609                                            #source_ident = #members_tee_ident;
2610                                        },
2611                                    }
2612                                }
2613
2614                                HydroSource::Embedded(ident) => {
2615                                    parse_quote! {
2616                                        #source_ident = source_stream(#ident);
2617                                    }
2618                                }
2619
2620                                HydroSource::EmbeddedSingleton(ident) => {
2621                                    parse_quote! {
2622                                        #source_ident = source_iter([#ident]);
2623                                    }
2624                                }
2625                            };
2626
2627                            match builders_or_callback {
2628                                BuildersOrCallback::Builders(graph_builders) => {
2629                                    let builder = graph_builders.get_dfir_mut(&out_location);
2630                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2631                                }
2632                                BuildersOrCallback::Callback(_, node_callback) => {
2633                                    node_callback(node, next_stmt_id);
2634                                }
2635                            }
2636
2637                            *next_stmt_id += 1;
2638
2639                            ident_stack.push(source_ident);
2640                        }
2641                    }
2642
2643                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2644                        let source_ident =
2645                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2646
2647                        match builders_or_callback {
2648                            BuildersOrCallback::Builders(graph_builders) => {
2649                                let builder = graph_builders.get_dfir_mut(&out_location);
2650
2651                                if *first_tick_only {
2652                                    assert!(
2653                                        !metadata.location_id.is_top_level(),
2654                                        "first_tick_only SingletonSource must be inside a tick"
2655                                    );
2656                                }
2657
2658                                if *first_tick_only
2659                                    || (metadata.location_id.is_top_level()
2660                                        && metadata.collection_kind.is_bounded())
2661                                {
2662                                    builder.add_dfir(
2663                                        parse_quote! {
2664                                            #source_ident = source_iter([#value]);
2665                                        },
2666                                        None,
2667                                        Some(&next_stmt_id.to_string()),
2668                                    );
2669                                } else {
2670                                    builder.add_dfir(
2671                                        parse_quote! {
2672                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2673                                        },
2674                                        None,
2675                                        Some(&next_stmt_id.to_string()),
2676                                    );
2677                                }
2678                            }
2679                            BuildersOrCallback::Callback(_, node_callback) => {
2680                                node_callback(node, next_stmt_id);
2681                            }
2682                        }
2683
2684                        *next_stmt_id += 1;
2685
2686                        ident_stack.push(source_ident);
2687                    }
2688
2689                    HydroNode::CycleSource { cycle_id, .. } => {
2690                        let ident = cycle_id.as_ident();
2691
2692                        match builders_or_callback {
2693                            BuildersOrCallback::Builders(_) => {}
2694                            BuildersOrCallback::Callback(_, node_callback) => {
2695                                node_callback(node, next_stmt_id);
2696                            }
2697                        }
2698
2699                        // consume a stmt id even though we did not emit anything so that we can instrument this
2700                        *next_stmt_id += 1;
2701
2702                        ident_stack.push(ident);
2703                    }
2704
2705                    HydroNode::Tee { inner, .. } => {
2706                        let ret_ident = if let Some(built_idents) =
2707                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2708                        {
2709                            match builders_or_callback {
2710                                BuildersOrCallback::Builders(_) => {}
2711                                BuildersOrCallback::Callback(_, node_callback) => {
2712                                    node_callback(node, next_stmt_id);
2713                                }
2714                            }
2715
2716                            built_idents[0].clone()
2717                        } else {
2718                            // The inner node was already processed by transform_bottom_up,
2719                            // so its ident is on the stack
2720                            let inner_ident = ident_stack.pop().unwrap();
2721
2722                            let tee_ident =
2723                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2724
2725                            built_tees.insert(
2726                                inner.0.as_ref() as *const RefCell<HydroNode>,
2727                                vec![tee_ident.clone()],
2728                            );
2729
2730                            match builders_or_callback {
2731                                BuildersOrCallback::Builders(graph_builders) => {
2732                                    let builder = graph_builders.get_dfir_mut(&out_location);
2733                                    builder.add_dfir(
2734                                        parse_quote! {
2735                                            #tee_ident = #inner_ident -> tee();
2736                                        },
2737                                        None,
2738                                        Some(&next_stmt_id.to_string()),
2739                                    );
2740                                }
2741                                BuildersOrCallback::Callback(_, node_callback) => {
2742                                    node_callback(node, next_stmt_id);
2743                                }
2744                            }
2745
2746                            tee_ident
2747                        };
2748
2749                        // we consume a stmt id regardless of if we emit the tee() operator,
2750                        // so that during rewrites we touch all recipients of the tee()
2751
2752                        *next_stmt_id += 1;
2753                        ident_stack.push(ret_ident);
2754                    }
2755
2756                    HydroNode::Partition {
2757                        inner, f, is_true, ..
2758                    } => {
2759                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
2760                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2761                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2762                            match builders_or_callback {
2763                                BuildersOrCallback::Builders(_) => {}
2764                                BuildersOrCallback::Callback(_, node_callback) => {
2765                                    node_callback(node, next_stmt_id);
2766                                }
2767                            }
2768
2769                            let idx = if is_true { 0 } else { 1 };
2770                            built_idents[idx].clone()
2771                        } else {
2772                            // The inner node was already processed by transform_bottom_up,
2773                            // so its ident is on the stack
2774                            let inner_ident = ident_stack.pop().unwrap();
2775
2776                            let partition_ident = syn::Ident::new(
2777                                &format!("stream_{}_partition", *next_stmt_id),
2778                                Span::call_site(),
2779                            );
2780                            let true_ident = syn::Ident::new(
2781                                &format!("stream_{}_true", *next_stmt_id),
2782                                Span::call_site(),
2783                            );
2784                            let false_ident = syn::Ident::new(
2785                                &format!("stream_{}_false", *next_stmt_id),
2786                                Span::call_site(),
2787                            );
2788
2789                            built_tees.insert(
2790                                ptr,
2791                                vec![true_ident.clone(), false_ident.clone()],
2792                            );
2793
2794                            match builders_or_callback {
2795                                BuildersOrCallback::Builders(graph_builders) => {
2796                                    let builder = graph_builders.get_dfir_mut(&out_location);
2797                                    builder.add_dfir(
2798                                        parse_quote! {
2799                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2800                                            #true_ident = #partition_ident[0];
2801                                            #false_ident = #partition_ident[1];
2802                                        },
2803                                        None,
2804                                        Some(&next_stmt_id.to_string()),
2805                                    );
2806                                }
2807                                BuildersOrCallback::Callback(_, node_callback) => {
2808                                    node_callback(node, next_stmt_id);
2809                                }
2810                            }
2811
2812                            if is_true { true_ident } else { false_ident }
2813                        };
2814
2815                        *next_stmt_id += 1;
2816                        ident_stack.push(ret_ident);
2817                    }
2818
2819                    HydroNode::Chain { .. } => {
2820                        // Children are processed left-to-right, so second is on top
2821                        let second_ident = ident_stack.pop().unwrap();
2822                        let first_ident = ident_stack.pop().unwrap();
2823
2824                        let chain_ident =
2825                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2826
2827                        match builders_or_callback {
2828                            BuildersOrCallback::Builders(graph_builders) => {
2829                                let builder = graph_builders.get_dfir_mut(&out_location);
2830                                builder.add_dfir(
2831                                    parse_quote! {
2832                                        #chain_ident = chain();
2833                                        #first_ident -> [0]#chain_ident;
2834                                        #second_ident -> [1]#chain_ident;
2835                                    },
2836                                    None,
2837                                    Some(&next_stmt_id.to_string()),
2838                                );
2839                            }
2840                            BuildersOrCallback::Callback(_, node_callback) => {
2841                                node_callback(node, next_stmt_id);
2842                            }
2843                        }
2844
2845                        *next_stmt_id += 1;
2846
2847                        ident_stack.push(chain_ident);
2848                    }
2849
2850                    HydroNode::ChainFirst { .. } => {
2851                        let second_ident = ident_stack.pop().unwrap();
2852                        let first_ident = ident_stack.pop().unwrap();
2853
2854                        let chain_ident =
2855                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2856
2857                        match builders_or_callback {
2858                            BuildersOrCallback::Builders(graph_builders) => {
2859                                let builder = graph_builders.get_dfir_mut(&out_location);
2860                                builder.add_dfir(
2861                                    parse_quote! {
2862                                        #chain_ident = chain_first_n(1);
2863                                        #first_ident -> [0]#chain_ident;
2864                                        #second_ident -> [1]#chain_ident;
2865                                    },
2866                                    None,
2867                                    Some(&next_stmt_id.to_string()),
2868                                );
2869                            }
2870                            BuildersOrCallback::Callback(_, node_callback) => {
2871                                node_callback(node, next_stmt_id);
2872                            }
2873                        }
2874
2875                        *next_stmt_id += 1;
2876
2877                        ident_stack.push(chain_ident);
2878                    }
2879
2880                    HydroNode::CrossSingleton { right, .. } => {
2881                        let right_ident = ident_stack.pop().unwrap();
2882                        let left_ident = ident_stack.pop().unwrap();
2883
2884                        let cross_ident =
2885                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2886
2887                        match builders_or_callback {
2888                            BuildersOrCallback::Builders(graph_builders) => {
2889                                let builder = graph_builders.get_dfir_mut(&out_location);
2890
2891                                if right.metadata().location_id.is_top_level()
2892                                    && right.metadata().collection_kind.is_bounded()
2893                                {
2894                                    builder.add_dfir(
2895                                        parse_quote! {
2896                                            #cross_ident = cross_singleton();
2897                                            #left_ident -> [input]#cross_ident;
2898                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2899                                        },
2900                                        None,
2901                                        Some(&next_stmt_id.to_string()),
2902                                    );
2903                                } else {
2904                                    builder.add_dfir(
2905                                        parse_quote! {
2906                                            #cross_ident = cross_singleton();
2907                                            #left_ident -> [input]#cross_ident;
2908                                            #right_ident -> [single]#cross_ident;
2909                                        },
2910                                        None,
2911                                        Some(&next_stmt_id.to_string()),
2912                                    );
2913                                }
2914                            }
2915                            BuildersOrCallback::Callback(_, node_callback) => {
2916                                node_callback(node, next_stmt_id);
2917                            }
2918                        }
2919
2920                        *next_stmt_id += 1;
2921
2922                        ident_stack.push(cross_ident);
2923                    }
2924
2925                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2926                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2927                            parse_quote!(cross_join_multiset)
2928                        } else {
2929                            parse_quote!(join_multiset)
2930                        };
2931
2932                        let (HydroNode::CrossProduct { left, right, .. }
2933                        | HydroNode::Join { left, right, .. }) = node
2934                        else {
2935                            unreachable!()
2936                        };
2937
2938                        let is_top_level = left.metadata().location_id.is_top_level()
2939                            && right.metadata().location_id.is_top_level();
2940                        let left_lifetime = if left.metadata().location_id.is_top_level() {
2941                            quote!('static)
2942                        } else {
2943                            quote!('tick)
2944                        };
2945
2946                        let right_lifetime = if right.metadata().location_id.is_top_level() {
2947                            quote!('static)
2948                        } else {
2949                            quote!('tick)
2950                        };
2951
2952                        let right_ident = ident_stack.pop().unwrap();
2953                        let left_ident = ident_stack.pop().unwrap();
2954
2955                        let stream_ident =
2956                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2957
2958                        match builders_or_callback {
2959                            BuildersOrCallback::Builders(graph_builders) => {
2960                                let builder = graph_builders.get_dfir_mut(&out_location);
2961                                builder.add_dfir(
2962                                    if is_top_level {
2963                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
2964                                        // a multiset_delta() to negate the replay behavior
2965                                        parse_quote! {
2966                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2967                                            #left_ident -> [0]#stream_ident;
2968                                            #right_ident -> [1]#stream_ident;
2969                                        }
2970                                    } else {
2971                                        parse_quote! {
2972                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2973                                            #left_ident -> [0]#stream_ident;
2974                                            #right_ident -> [1]#stream_ident;
2975                                        }
2976                                    }
2977                                    ,
2978                                    None,
2979                                    Some(&next_stmt_id.to_string()),
2980                                );
2981                            }
2982                            BuildersOrCallback::Callback(_, node_callback) => {
2983                                node_callback(node, next_stmt_id);
2984                            }
2985                        }
2986
2987                        *next_stmt_id += 1;
2988
2989                        ident_stack.push(stream_ident);
2990                    }
2991
2992                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2993                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2994                            parse_quote!(difference)
2995                        } else {
2996                            parse_quote!(anti_join)
2997                        };
2998
2999                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3000                            node
3001                        else {
3002                            unreachable!()
3003                        };
3004
3005                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3006                            quote!('static)
3007                        } else {
3008                            quote!('tick)
3009                        };
3010
3011                        let neg_ident = ident_stack.pop().unwrap();
3012                        let pos_ident = ident_stack.pop().unwrap();
3013
3014                        let stream_ident =
3015                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3016
3017                        match builders_or_callback {
3018                            BuildersOrCallback::Builders(graph_builders) => {
3019                                let builder = graph_builders.get_dfir_mut(&out_location);
3020                                builder.add_dfir(
3021                                    parse_quote! {
3022                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3023                                        #pos_ident -> [pos]#stream_ident;
3024                                        #neg_ident -> [neg]#stream_ident;
3025                                    },
3026                                    None,
3027                                    Some(&next_stmt_id.to_string()),
3028                                );
3029                            }
3030                            BuildersOrCallback::Callback(_, node_callback) => {
3031                                node_callback(node, next_stmt_id);
3032                            }
3033                        }
3034
3035                        *next_stmt_id += 1;
3036
3037                        ident_stack.push(stream_ident);
3038                    }
3039
3040                    HydroNode::ResolveFutures { .. } => {
3041                        let input_ident = ident_stack.pop().unwrap();
3042
3043                        let futures_ident =
3044                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3045
3046                        match builders_or_callback {
3047                            BuildersOrCallback::Builders(graph_builders) => {
3048                                let builder = graph_builders.get_dfir_mut(&out_location);
3049                                builder.add_dfir(
3050                                    parse_quote! {
3051                                        #futures_ident = #input_ident -> resolve_futures();
3052                                    },
3053                                    None,
3054                                    Some(&next_stmt_id.to_string()),
3055                                );
3056                            }
3057                            BuildersOrCallback::Callback(_, node_callback) => {
3058                                node_callback(node, next_stmt_id);
3059                            }
3060                        }
3061
3062                        *next_stmt_id += 1;
3063
3064                        ident_stack.push(futures_ident);
3065                    }
3066
3067                    HydroNode::ResolveFuturesOrdered { .. } => {
3068                        let input_ident = ident_stack.pop().unwrap();
3069
3070                        let futures_ident =
3071                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3072
3073                        match builders_or_callback {
3074                            BuildersOrCallback::Builders(graph_builders) => {
3075                                let builder = graph_builders.get_dfir_mut(&out_location);
3076                                builder.add_dfir(
3077                                    parse_quote! {
3078                                        #futures_ident = #input_ident -> resolve_futures_ordered();
3079                                    },
3080                                    None,
3081                                    Some(&next_stmt_id.to_string()),
3082                                );
3083                            }
3084                            BuildersOrCallback::Callback(_, node_callback) => {
3085                                node_callback(node, next_stmt_id);
3086                            }
3087                        }
3088
3089                        *next_stmt_id += 1;
3090
3091                        ident_stack.push(futures_ident);
3092                    }
3093
3094                    HydroNode::Map { f, .. } => {
3095                        let input_ident = ident_stack.pop().unwrap();
3096
3097                        let map_ident =
3098                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3099
3100                        match builders_or_callback {
3101                            BuildersOrCallback::Builders(graph_builders) => {
3102                                let builder = graph_builders.get_dfir_mut(&out_location);
3103                                builder.add_dfir(
3104                                    parse_quote! {
3105                                        #map_ident = #input_ident -> map(#f);
3106                                    },
3107                                    None,
3108                                    Some(&next_stmt_id.to_string()),
3109                                );
3110                            }
3111                            BuildersOrCallback::Callback(_, node_callback) => {
3112                                node_callback(node, next_stmt_id);
3113                            }
3114                        }
3115
3116                        *next_stmt_id += 1;
3117
3118                        ident_stack.push(map_ident);
3119                    }
3120
3121                    HydroNode::FlatMap { f, .. } => {
3122                        let input_ident = ident_stack.pop().unwrap();
3123
3124                        let flat_map_ident =
3125                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3126
3127                        match builders_or_callback {
3128                            BuildersOrCallback::Builders(graph_builders) => {
3129                                let builder = graph_builders.get_dfir_mut(&out_location);
3130                                builder.add_dfir(
3131                                    parse_quote! {
3132                                        #flat_map_ident = #input_ident -> flat_map(#f);
3133                                    },
3134                                    None,
3135                                    Some(&next_stmt_id.to_string()),
3136                                );
3137                            }
3138                            BuildersOrCallback::Callback(_, node_callback) => {
3139                                node_callback(node, next_stmt_id);
3140                            }
3141                        }
3142
3143                        *next_stmt_id += 1;
3144
3145                        ident_stack.push(flat_map_ident);
3146                    }
3147
3148                    HydroNode::Filter { f, .. } => {
3149                        let input_ident = ident_stack.pop().unwrap();
3150
3151                        let filter_ident =
3152                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3153
3154                        match builders_or_callback {
3155                            BuildersOrCallback::Builders(graph_builders) => {
3156                                let builder = graph_builders.get_dfir_mut(&out_location);
3157                                builder.add_dfir(
3158                                    parse_quote! {
3159                                        #filter_ident = #input_ident -> filter(#f);
3160                                    },
3161                                    None,
3162                                    Some(&next_stmt_id.to_string()),
3163                                );
3164                            }
3165                            BuildersOrCallback::Callback(_, node_callback) => {
3166                                node_callback(node, next_stmt_id);
3167                            }
3168                        }
3169
3170                        *next_stmt_id += 1;
3171
3172                        ident_stack.push(filter_ident);
3173                    }
3174
3175                    HydroNode::FilterMap { f, .. } => {
3176                        let input_ident = ident_stack.pop().unwrap();
3177
3178                        let filter_map_ident =
3179                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3180
3181                        match builders_or_callback {
3182                            BuildersOrCallback::Builders(graph_builders) => {
3183                                let builder = graph_builders.get_dfir_mut(&out_location);
3184                                builder.add_dfir(
3185                                    parse_quote! {
3186                                        #filter_map_ident = #input_ident -> filter_map(#f);
3187                                    },
3188                                    None,
3189                                    Some(&next_stmt_id.to_string()),
3190                                );
3191                            }
3192                            BuildersOrCallback::Callback(_, node_callback) => {
3193                                node_callback(node, next_stmt_id);
3194                            }
3195                        }
3196
3197                        *next_stmt_id += 1;
3198
3199                        ident_stack.push(filter_map_ident);
3200                    }
3201
3202                    HydroNode::Sort { .. } => {
3203                        let input_ident = ident_stack.pop().unwrap();
3204
3205                        let sort_ident =
3206                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3207
3208                        match builders_or_callback {
3209                            BuildersOrCallback::Builders(graph_builders) => {
3210                                let builder = graph_builders.get_dfir_mut(&out_location);
3211                                builder.add_dfir(
3212                                    parse_quote! {
3213                                        #sort_ident = #input_ident -> sort();
3214                                    },
3215                                    None,
3216                                    Some(&next_stmt_id.to_string()),
3217                                );
3218                            }
3219                            BuildersOrCallback::Callback(_, node_callback) => {
3220                                node_callback(node, next_stmt_id);
3221                            }
3222                        }
3223
3224                        *next_stmt_id += 1;
3225
3226                        ident_stack.push(sort_ident);
3227                    }
3228
3229                    HydroNode::DeferTick { .. } => {
3230                        let input_ident = ident_stack.pop().unwrap();
3231
3232                        let defer_tick_ident =
3233                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3234
3235                        match builders_or_callback {
3236                            BuildersOrCallback::Builders(graph_builders) => {
3237                                let builder = graph_builders.get_dfir_mut(&out_location);
3238                                builder.add_dfir(
3239                                    parse_quote! {
3240                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3241                                    },
3242                                    None,
3243                                    Some(&next_stmt_id.to_string()),
3244                                );
3245                            }
3246                            BuildersOrCallback::Callback(_, node_callback) => {
3247                                node_callback(node, next_stmt_id);
3248                            }
3249                        }
3250
3251                        *next_stmt_id += 1;
3252
3253                        ident_stack.push(defer_tick_ident);
3254                    }
3255
3256                    HydroNode::Enumerate { input, .. } => {
3257                        let input_ident = ident_stack.pop().unwrap();
3258
3259                        let enumerate_ident =
3260                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3261
3262                        match builders_or_callback {
3263                            BuildersOrCallback::Builders(graph_builders) => {
3264                                let builder = graph_builders.get_dfir_mut(&out_location);
3265                                let lifetime = if input.metadata().location_id.is_top_level() {
3266                                    quote!('static)
3267                                } else {
3268                                    quote!('tick)
3269                                };
3270                                builder.add_dfir(
3271                                    parse_quote! {
3272                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3273                                    },
3274                                    None,
3275                                    Some(&next_stmt_id.to_string()),
3276                                );
3277                            }
3278                            BuildersOrCallback::Callback(_, node_callback) => {
3279                                node_callback(node, next_stmt_id);
3280                            }
3281                        }
3282
3283                        *next_stmt_id += 1;
3284
3285                        ident_stack.push(enumerate_ident);
3286                    }
3287
3288                    HydroNode::Inspect { f, .. } => {
3289                        let input_ident = ident_stack.pop().unwrap();
3290
3291                        let inspect_ident =
3292                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3293
3294                        match builders_or_callback {
3295                            BuildersOrCallback::Builders(graph_builders) => {
3296                                let builder = graph_builders.get_dfir_mut(&out_location);
3297                                builder.add_dfir(
3298                                    parse_quote! {
3299                                        #inspect_ident = #input_ident -> inspect(#f);
3300                                    },
3301                                    None,
3302                                    Some(&next_stmt_id.to_string()),
3303                                );
3304                            }
3305                            BuildersOrCallback::Callback(_, node_callback) => {
3306                                node_callback(node, next_stmt_id);
3307                            }
3308                        }
3309
3310                        *next_stmt_id += 1;
3311
3312                        ident_stack.push(inspect_ident);
3313                    }
3314
3315                    HydroNode::Unique { input, .. } => {
3316                        let input_ident = ident_stack.pop().unwrap();
3317
3318                        let unique_ident =
3319                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3320
3321                        match builders_or_callback {
3322                            BuildersOrCallback::Builders(graph_builders) => {
3323                                let builder = graph_builders.get_dfir_mut(&out_location);
3324                                let lifetime = if input.metadata().location_id.is_top_level() {
3325                                    quote!('static)
3326                                } else {
3327                                    quote!('tick)
3328                                };
3329
3330                                builder.add_dfir(
3331                                    parse_quote! {
3332                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3333                                    },
3334                                    None,
3335                                    Some(&next_stmt_id.to_string()),
3336                                );
3337                            }
3338                            BuildersOrCallback::Callback(_, node_callback) => {
3339                                node_callback(node, next_stmt_id);
3340                            }
3341                        }
3342
3343                        *next_stmt_id += 1;
3344
3345                        ident_stack.push(unique_ident);
3346                    }
3347
3348                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3349                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3350                            if input.metadata().location_id.is_top_level()
3351                                && input.metadata().collection_kind.is_bounded()
3352                            {
3353                                parse_quote!(fold_no_replay)
3354                            } else {
3355                                parse_quote!(fold)
3356                            }
3357                        } else if matches!(node, HydroNode::Scan { .. }) {
3358                            parse_quote!(scan)
3359                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3360                            if input.metadata().location_id.is_top_level()
3361                                && input.metadata().collection_kind.is_bounded()
3362                            {
3363                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3364                            } else {
3365                                parse_quote!(fold_keyed)
3366                            }
3367                        } else {
3368                            unreachable!()
3369                        };
3370
3371                        let (HydroNode::Fold { input, .. }
3372                        | HydroNode::FoldKeyed { input, .. }
3373                        | HydroNode::Scan { input, .. }) = node
3374                        else {
3375                            unreachable!()
3376                        };
3377
3378                        let lifetime = if input.metadata().location_id.is_top_level() {
3379                            quote!('static)
3380                        } else {
3381                            quote!('tick)
3382                        };
3383
3384                        let input_ident = ident_stack.pop().unwrap();
3385
3386                        let (HydroNode::Fold { init, acc, .. }
3387                        | HydroNode::FoldKeyed { init, acc, .. }
3388                        | HydroNode::Scan { init, acc, .. }) = &*node
3389                        else {
3390                            unreachable!()
3391                        };
3392
3393                        let fold_ident =
3394                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3395
3396                        match builders_or_callback {
3397                            BuildersOrCallback::Builders(graph_builders) => {
3398                                if matches!(node, HydroNode::Fold { .. })
3399                                    && node.metadata().location_id.is_top_level()
3400                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3401                                    && graph_builders.singleton_intermediates()
3402                                    && !node.metadata().collection_kind.is_bounded()
3403                                {
3404                                    let builder = graph_builders.get_dfir_mut(&out_location);
3405
3406                                    let acc: syn::Expr = parse_quote!({
3407                                        let mut __inner = #acc;
3408                                        move |__state, __value| {
3409                                            __inner(__state, __value);
3410                                            Some(__state.clone())
3411                                        }
3412                                    });
3413
3414                                    builder.add_dfir(
3415                                        parse_quote! {
3416                                            source_iter([(#init)()]) -> [0]#fold_ident;
3417                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3418                                            #fold_ident = chain();
3419                                        },
3420                                        None,
3421                                        Some(&next_stmt_id.to_string()),
3422                                    );
3423                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3424                                    && node.metadata().location_id.is_top_level()
3425                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3426                                    && graph_builders.singleton_intermediates()
3427                                    && !node.metadata().collection_kind.is_bounded()
3428                                {
3429                                    let builder = graph_builders.get_dfir_mut(&out_location);
3430
3431                                    let acc: syn::Expr = parse_quote!({
3432                                        let mut __init = #init;
3433                                        let mut __inner = #acc;
3434                                        move |__state, __kv: (_, _)| {
3435                                            // TODO(shadaj): we can avoid the clone when the entry exists
3436                                            let __state = __state
3437                                                .entry(::std::clone::Clone::clone(&__kv.0))
3438                                                .or_insert_with(|| (__init)());
3439                                            __inner(__state, __kv.1);
3440                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3441                                        }
3442                                    });
3443
3444                                    builder.add_dfir(
3445                                        parse_quote! {
3446                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3447                                        },
3448                                        None,
3449                                        Some(&next_stmt_id.to_string()),
3450                                    );
3451                                } else {
3452                                    let builder = graph_builders.get_dfir_mut(&out_location);
3453                                    builder.add_dfir(
3454                                        parse_quote! {
3455                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3456                                        },
3457                                        None,
3458                                        Some(&next_stmt_id.to_string()),
3459                                    );
3460                                }
3461                            }
3462                            BuildersOrCallback::Callback(_, node_callback) => {
3463                                node_callback(node, next_stmt_id);
3464                            }
3465                        }
3466
3467                        *next_stmt_id += 1;
3468
3469                        ident_stack.push(fold_ident);
3470                    }
3471
3472                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3473                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3474                            if input.metadata().location_id.is_top_level()
3475                                && input.metadata().collection_kind.is_bounded()
3476                            {
3477                                parse_quote!(reduce_no_replay)
3478                            } else {
3479                                parse_quote!(reduce)
3480                            }
3481                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3482                            if input.metadata().location_id.is_top_level()
3483                                && input.metadata().collection_kind.is_bounded()
3484                            {
3485                                todo!(
3486                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3487                                )
3488                            } else {
3489                                parse_quote!(reduce_keyed)
3490                            }
3491                        } else {
3492                            unreachable!()
3493                        };
3494
3495                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3496                        else {
3497                            unreachable!()
3498                        };
3499
3500                        let lifetime = if input.metadata().location_id.is_top_level() {
3501                            quote!('static)
3502                        } else {
3503                            quote!('tick)
3504                        };
3505
3506                        let input_ident = ident_stack.pop().unwrap();
3507
3508                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3509                        else {
3510                            unreachable!()
3511                        };
3512
3513                        let reduce_ident =
3514                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3515
3516                        match builders_or_callback {
3517                            BuildersOrCallback::Builders(graph_builders) => {
3518                                if matches!(node, HydroNode::Reduce { .. })
3519                                    && node.metadata().location_id.is_top_level()
3520                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3521                                    && graph_builders.singleton_intermediates()
3522                                    && !node.metadata().collection_kind.is_bounded()
3523                                {
3524                                    todo!(
3525                                        "Reduce with optional intermediates is not yet supported in simulator"
3526                                    );
3527                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3528                                    && node.metadata().location_id.is_top_level()
3529                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3530                                    && graph_builders.singleton_intermediates()
3531                                    && !node.metadata().collection_kind.is_bounded()
3532                                {
3533                                    todo!(
3534                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3535                                    );
3536                                } else {
3537                                    let builder = graph_builders.get_dfir_mut(&out_location);
3538                                    builder.add_dfir(
3539                                        parse_quote! {
3540                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3541                                        },
3542                                        None,
3543                                        Some(&next_stmt_id.to_string()),
3544                                    );
3545                                }
3546                            }
3547                            BuildersOrCallback::Callback(_, node_callback) => {
3548                                node_callback(node, next_stmt_id);
3549                            }
3550                        }
3551
3552                        *next_stmt_id += 1;
3553
3554                        ident_stack.push(reduce_ident);
3555                    }
3556
3557                    HydroNode::ReduceKeyedWatermark {
3558                        f,
3559                        input,
3560                        metadata,
3561                        ..
3562                    } => {
3563                        let lifetime = if input.metadata().location_id.is_top_level() {
3564                            quote!('static)
3565                        } else {
3566                            quote!('tick)
3567                        };
3568
3569                        // watermark is processed second, so it's on top
3570                        let watermark_ident = ident_stack.pop().unwrap();
3571                        let input_ident = ident_stack.pop().unwrap();
3572
3573                        let chain_ident = syn::Ident::new(
3574                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3575                            Span::call_site(),
3576                        );
3577
3578                        let fold_ident =
3579                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3580
3581                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3582                            && input.metadata().collection_kind.is_bounded()
3583                        {
3584                            parse_quote!(fold_no_replay)
3585                        } else {
3586                            parse_quote!(fold)
3587                        };
3588
3589                        match builders_or_callback {
3590                            BuildersOrCallback::Builders(graph_builders) => {
3591                                if metadata.location_id.is_top_level()
3592                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3593                                    && graph_builders.singleton_intermediates()
3594                                    && !metadata.collection_kind.is_bounded()
3595                                {
3596                                    todo!(
3597                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3598                                    )
3599                                } else {
3600                                    let builder = graph_builders.get_dfir_mut(&out_location);
3601                                    builder.add_dfir(
3602                                        parse_quote! {
3603                                            #chain_ident = chain();
3604                                            #input_ident
3605                                                -> map(|x| (Some(x), None))
3606                                                -> [0]#chain_ident;
3607                                            #watermark_ident
3608                                                -> map(|watermark| (None, Some(watermark)))
3609                                                -> [1]#chain_ident;
3610
3611                                            #fold_ident = #chain_ident
3612                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3613                                                    let __reduce_keyed_fn = #f;
3614                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3615                                                        if let Some((k, v)) = opt_payload {
3616                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3617                                                                if k < curr_watermark {
3618                                                                    return;
3619                                                                }
3620                                                            }
3621                                                            match map.entry(k) {
3622                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3623                                                                    e.insert(v);
3624                                                                }
3625                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3626                                                                    __reduce_keyed_fn(e.get_mut(), v);
3627                                                                }
3628                                                            }
3629                                                        } else {
3630                                                            let watermark = opt_watermark.unwrap();
3631                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3632                                                                if watermark <= curr_watermark {
3633                                                                    return;
3634                                                                }
3635                                                            }
3636                                                            *opt_curr_watermark = opt_watermark;
3637                                                            map.retain(|k, _| *k >= watermark);
3638                                                        }
3639                                                    }
3640                                                })
3641                                                -> flat_map(|(map, _curr_watermark)| map);
3642                                        },
3643                                        None,
3644                                        Some(&next_stmt_id.to_string()),
3645                                    );
3646                                }
3647                            }
3648                            BuildersOrCallback::Callback(_, node_callback) => {
3649                                node_callback(node, next_stmt_id);
3650                            }
3651                        }
3652
3653                        *next_stmt_id += 1;
3654
3655                        ident_stack.push(fold_ident);
3656                    }
3657
3658                    HydroNode::Network {
3659                        networking_info,
3660                        serialize_fn: serialize_pipeline,
3661                        instantiate_fn,
3662                        deserialize_fn: deserialize_pipeline,
3663                        input,
3664                        ..
3665                    } => {
3666                        let input_ident = ident_stack.pop().unwrap();
3667
3668                        let receiver_stream_ident =
3669                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3670
3671                        match builders_or_callback {
3672                            BuildersOrCallback::Builders(graph_builders) => {
3673                                let (sink_expr, source_expr) = match instantiate_fn {
3674                                    DebugInstantiate::Building => (
3675                                        syn::parse_quote!(DUMMY_SINK),
3676                                        syn::parse_quote!(DUMMY_SOURCE),
3677                                    ),
3678
3679                                    DebugInstantiate::Finalized(finalized) => {
3680                                        (finalized.sink.clone(), finalized.source.clone())
3681                                    }
3682                                };
3683
3684                                graph_builders.create_network(
3685                                    &input.metadata().location_id,
3686                                    &out_location,
3687                                    input_ident,
3688                                    &receiver_stream_ident,
3689                                    serialize_pipeline.as_ref(),
3690                                    sink_expr,
3691                                    source_expr,
3692                                    deserialize_pipeline.as_ref(),
3693                                    *next_stmt_id,
3694                                    networking_info,
3695                                );
3696                            }
3697                            BuildersOrCallback::Callback(_, node_callback) => {
3698                                node_callback(node, next_stmt_id);
3699                            }
3700                        }
3701
3702                        *next_stmt_id += 1;
3703
3704                        ident_stack.push(receiver_stream_ident);
3705                    }
3706
3707                    HydroNode::ExternalInput {
3708                        instantiate_fn,
3709                        deserialize_fn: deserialize_pipeline,
3710                        ..
3711                    } => {
3712                        let receiver_stream_ident =
3713                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3714
3715                        match builders_or_callback {
3716                            BuildersOrCallback::Builders(graph_builders) => {
3717                                let (_, source_expr) = match instantiate_fn {
3718                                    DebugInstantiate::Building => (
3719                                        syn::parse_quote!(DUMMY_SINK),
3720                                        syn::parse_quote!(DUMMY_SOURCE),
3721                                    ),
3722
3723                                    DebugInstantiate::Finalized(finalized) => {
3724                                        (finalized.sink.clone(), finalized.source.clone())
3725                                    }
3726                                };
3727
3728                                graph_builders.create_external_source(
3729                                    &out_location,
3730                                    source_expr,
3731                                    &receiver_stream_ident,
3732                                    deserialize_pipeline.as_ref(),
3733                                    *next_stmt_id,
3734                                );
3735                            }
3736                            BuildersOrCallback::Callback(_, node_callback) => {
3737                                node_callback(node, next_stmt_id);
3738                            }
3739                        }
3740
3741                        *next_stmt_id += 1;
3742
3743                        ident_stack.push(receiver_stream_ident);
3744                    }
3745
3746                    HydroNode::Counter {
3747                        tag,
3748                        duration,
3749                        prefix,
3750                        ..
3751                    } => {
3752                        let input_ident = ident_stack.pop().unwrap();
3753
3754                        let counter_ident =
3755                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3756
3757                        match builders_or_callback {
3758                            BuildersOrCallback::Builders(graph_builders) => {
3759                                let arg = format!("{}({})", prefix, tag);
3760                                let builder = graph_builders.get_dfir_mut(&out_location);
3761                                builder.add_dfir(
3762                                    parse_quote! {
3763                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
3764                                    },
3765                                    None,
3766                                    Some(&next_stmt_id.to_string()),
3767                                );
3768                            }
3769                            BuildersOrCallback::Callback(_, node_callback) => {
3770                                node_callback(node, next_stmt_id);
3771                            }
3772                        }
3773
3774                        *next_stmt_id += 1;
3775
3776                        ident_stack.push(counter_ident);
3777                    }
3778                }
3779            },
3780            seen_tees,
3781            false,
3782        );
3783
3784        ident_stack
3785            .pop()
3786            .expect("ident_stack should have exactly one element after traversal")
3787    }
3788
3789    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3790        match self {
3791            HydroNode::Placeholder => {
3792                panic!()
3793            }
3794            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3795            HydroNode::Source { source, .. } => match source {
3796                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3797                HydroSource::ExternalNetwork()
3798                | HydroSource::Spin()
3799                | HydroSource::ClusterMembers(_, _)
3800                | HydroSource::Embedded(_)
3801                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
3802            },
3803            HydroNode::SingletonSource { value, .. } => {
3804                transform(value);
3805            }
3806            HydroNode::CycleSource { .. }
3807            | HydroNode::Tee { .. }
3808            | HydroNode::YieldConcat { .. }
3809            | HydroNode::BeginAtomic { .. }
3810            | HydroNode::EndAtomic { .. }
3811            | HydroNode::Batch { .. }
3812            | HydroNode::Chain { .. }
3813            | HydroNode::ChainFirst { .. }
3814            | HydroNode::CrossProduct { .. }
3815            | HydroNode::CrossSingleton { .. }
3816            | HydroNode::ResolveFutures { .. }
3817            | HydroNode::ResolveFuturesOrdered { .. }
3818            | HydroNode::Join { .. }
3819            | HydroNode::Difference { .. }
3820            | HydroNode::AntiJoin { .. }
3821            | HydroNode::DeferTick { .. }
3822            | HydroNode::Enumerate { .. }
3823            | HydroNode::Unique { .. }
3824            | HydroNode::Sort { .. } => {}
3825            HydroNode::Map { f, .. }
3826            | HydroNode::FlatMap { f, .. }
3827            | HydroNode::Filter { f, .. }
3828            | HydroNode::FilterMap { f, .. }
3829            | HydroNode::Inspect { f, .. }
3830            | HydroNode::Partition { f, .. }
3831            | HydroNode::Reduce { f, .. }
3832            | HydroNode::ReduceKeyed { f, .. }
3833            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3834                transform(f);
3835            }
3836            HydroNode::Fold { init, acc, .. }
3837            | HydroNode::Scan { init, acc, .. }
3838            | HydroNode::FoldKeyed { init, acc, .. } => {
3839                transform(init);
3840                transform(acc);
3841            }
3842            HydroNode::Network {
3843                serialize_fn,
3844                deserialize_fn,
3845                ..
3846            } => {
3847                if let Some(serialize_fn) = serialize_fn {
3848                    transform(serialize_fn);
3849                }
3850                if let Some(deserialize_fn) = deserialize_fn {
3851                    transform(deserialize_fn);
3852                }
3853            }
3854            HydroNode::ExternalInput { deserialize_fn, .. } => {
3855                if let Some(deserialize_fn) = deserialize_fn {
3856                    transform(deserialize_fn);
3857                }
3858            }
3859            HydroNode::Counter { duration, .. } => {
3860                transform(duration);
3861            }
3862        }
3863    }
3864
3865    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3866        &self.metadata().op
3867    }
3868
3869    pub fn metadata(&self) -> &HydroIrMetadata {
3870        match self {
3871            HydroNode::Placeholder => {
3872                panic!()
3873            }
3874            HydroNode::Cast { metadata, .. } => metadata,
3875            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3876            HydroNode::Source { metadata, .. } => metadata,
3877            HydroNode::SingletonSource { metadata, .. } => metadata,
3878            HydroNode::CycleSource { metadata, .. } => metadata,
3879            HydroNode::Tee { metadata, .. } => metadata,
3880            HydroNode::Partition { metadata, .. } => metadata,
3881            HydroNode::YieldConcat { metadata, .. } => metadata,
3882            HydroNode::BeginAtomic { metadata, .. } => metadata,
3883            HydroNode::EndAtomic { metadata, .. } => metadata,
3884            HydroNode::Batch { metadata, .. } => metadata,
3885            HydroNode::Chain { metadata, .. } => metadata,
3886            HydroNode::ChainFirst { metadata, .. } => metadata,
3887            HydroNode::CrossProduct { metadata, .. } => metadata,
3888            HydroNode::CrossSingleton { metadata, .. } => metadata,
3889            HydroNode::Join { metadata, .. } => metadata,
3890            HydroNode::Difference { metadata, .. } => metadata,
3891            HydroNode::AntiJoin { metadata, .. } => metadata,
3892            HydroNode::ResolveFutures { metadata, .. } => metadata,
3893            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3894            HydroNode::Map { metadata, .. } => metadata,
3895            HydroNode::FlatMap { metadata, .. } => metadata,
3896            HydroNode::Filter { metadata, .. } => metadata,
3897            HydroNode::FilterMap { metadata, .. } => metadata,
3898            HydroNode::DeferTick { metadata, .. } => metadata,
3899            HydroNode::Enumerate { metadata, .. } => metadata,
3900            HydroNode::Inspect { metadata, .. } => metadata,
3901            HydroNode::Unique { metadata, .. } => metadata,
3902            HydroNode::Sort { metadata, .. } => metadata,
3903            HydroNode::Scan { metadata, .. } => metadata,
3904            HydroNode::Fold { metadata, .. } => metadata,
3905            HydroNode::FoldKeyed { metadata, .. } => metadata,
3906            HydroNode::Reduce { metadata, .. } => metadata,
3907            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3908            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3909            HydroNode::ExternalInput { metadata, .. } => metadata,
3910            HydroNode::Network { metadata, .. } => metadata,
3911            HydroNode::Counter { metadata, .. } => metadata,
3912        }
3913    }
3914
3915    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3916        &mut self.metadata_mut().op
3917    }
3918
3919    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3920        match self {
3921            HydroNode::Placeholder => {
3922                panic!()
3923            }
3924            HydroNode::Cast { metadata, .. } => metadata,
3925            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3926            HydroNode::Source { metadata, .. } => metadata,
3927            HydroNode::SingletonSource { metadata, .. } => metadata,
3928            HydroNode::CycleSource { metadata, .. } => metadata,
3929            HydroNode::Tee { metadata, .. } => metadata,
3930            HydroNode::Partition { metadata, .. } => metadata,
3931            HydroNode::YieldConcat { metadata, .. } => metadata,
3932            HydroNode::BeginAtomic { metadata, .. } => metadata,
3933            HydroNode::EndAtomic { metadata, .. } => metadata,
3934            HydroNode::Batch { metadata, .. } => metadata,
3935            HydroNode::Chain { metadata, .. } => metadata,
3936            HydroNode::ChainFirst { metadata, .. } => metadata,
3937            HydroNode::CrossProduct { metadata, .. } => metadata,
3938            HydroNode::CrossSingleton { metadata, .. } => metadata,
3939            HydroNode::Join { metadata, .. } => metadata,
3940            HydroNode::Difference { metadata, .. } => metadata,
3941            HydroNode::AntiJoin { metadata, .. } => metadata,
3942            HydroNode::ResolveFutures { metadata, .. } => metadata,
3943            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3944            HydroNode::Map { metadata, .. } => metadata,
3945            HydroNode::FlatMap { metadata, .. } => metadata,
3946            HydroNode::Filter { metadata, .. } => metadata,
3947            HydroNode::FilterMap { metadata, .. } => metadata,
3948            HydroNode::DeferTick { metadata, .. } => metadata,
3949            HydroNode::Enumerate { metadata, .. } => metadata,
3950            HydroNode::Inspect { metadata, .. } => metadata,
3951            HydroNode::Unique { metadata, .. } => metadata,
3952            HydroNode::Sort { metadata, .. } => metadata,
3953            HydroNode::Scan { metadata, .. } => metadata,
3954            HydroNode::Fold { metadata, .. } => metadata,
3955            HydroNode::FoldKeyed { metadata, .. } => metadata,
3956            HydroNode::Reduce { metadata, .. } => metadata,
3957            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3958            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3959            HydroNode::ExternalInput { metadata, .. } => metadata,
3960            HydroNode::Network { metadata, .. } => metadata,
3961            HydroNode::Counter { metadata, .. } => metadata,
3962        }
3963    }
3964
3965    pub fn input(&self) -> Vec<&HydroNode> {
3966        match self {
3967            HydroNode::Placeholder => {
3968                panic!()
3969            }
3970            HydroNode::Source { .. }
3971            | HydroNode::SingletonSource { .. }
3972            | HydroNode::ExternalInput { .. }
3973            | HydroNode::CycleSource { .. }
3974            | HydroNode::Tee { .. }
3975            | HydroNode::Partition { .. } => {
3976                // Tee/Partition should find their input in separate special ways
3977                vec![]
3978            }
3979            HydroNode::Cast { inner, .. }
3980            | HydroNode::ObserveNonDet { inner, .. }
3981            | HydroNode::YieldConcat { inner, .. }
3982            | HydroNode::BeginAtomic { inner, .. }
3983            | HydroNode::EndAtomic { inner, .. }
3984            | HydroNode::Batch { inner, .. } => {
3985                vec![inner]
3986            }
3987            HydroNode::Chain { first, second, .. } => {
3988                vec![first, second]
3989            }
3990            HydroNode::ChainFirst { first, second, .. } => {
3991                vec![first, second]
3992            }
3993            HydroNode::CrossProduct { left, right, .. }
3994            | HydroNode::CrossSingleton { left, right, .. }
3995            | HydroNode::Join { left, right, .. } => {
3996                vec![left, right]
3997            }
3998            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3999                vec![pos, neg]
4000            }
4001            HydroNode::Map { input, .. }
4002            | HydroNode::FlatMap { input, .. }
4003            | HydroNode::Filter { input, .. }
4004            | HydroNode::FilterMap { input, .. }
4005            | HydroNode::Sort { input, .. }
4006            | HydroNode::DeferTick { input, .. }
4007            | HydroNode::Enumerate { input, .. }
4008            | HydroNode::Inspect { input, .. }
4009            | HydroNode::Unique { input, .. }
4010            | HydroNode::Network { input, .. }
4011            | HydroNode::Counter { input, .. }
4012            | HydroNode::ResolveFutures { input, .. }
4013            | HydroNode::ResolveFuturesOrdered { input, .. }
4014            | HydroNode::Fold { input, .. }
4015            | HydroNode::FoldKeyed { input, .. }
4016            | HydroNode::Reduce { input, .. }
4017            | HydroNode::ReduceKeyed { input, .. }
4018            | HydroNode::Scan { input, .. } => {
4019                vec![input]
4020            }
4021            HydroNode::ReduceKeyedWatermark {
4022                input, watermark, ..
4023            } => {
4024                vec![input, watermark]
4025            }
4026        }
4027    }
4028
4029    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4030        self.input()
4031            .iter()
4032            .map(|input_node| input_node.metadata())
4033            .collect()
4034    }
4035
4036    /// Returns `true` if this node is a Tee or Partition whose inner Rc
4037    /// has other live references, meaning the upstream is already driven
4038    /// by another consumer and does not need a Null sink.
4039    pub fn is_shared_with_others(&self) -> bool {
4040        match self {
4041            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4042                Rc::strong_count(&inner.0) > 1
4043            }
4044            _ => false,
4045        }
4046    }
4047
4048    pub fn print_root(&self) -> String {
4049        match self {
4050            HydroNode::Placeholder => {
4051                panic!()
4052            }
4053            HydroNode::Cast { .. } => "Cast()".to_owned(),
4054            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4055            HydroNode::Source { source, .. } => format!("Source({:?})", source),
4056            HydroNode::SingletonSource {
4057                value,
4058                first_tick_only,
4059                ..
4060            } => format!(
4061                "SingletonSource({:?}, first_tick_only={})",
4062                value, first_tick_only
4063            ),
4064            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4065            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4066            HydroNode::Partition { f, is_true, .. } => {
4067                format!("Partition({:?}, is_true={})", f, is_true)
4068            }
4069            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4070            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4071            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4072            HydroNode::Batch { .. } => "Batch()".to_owned(),
4073            HydroNode::Chain { first, second, .. } => {
4074                format!("Chain({}, {})", first.print_root(), second.print_root())
4075            }
4076            HydroNode::ChainFirst { first, second, .. } => {
4077                format!(
4078                    "ChainFirst({}, {})",
4079                    first.print_root(),
4080                    second.print_root()
4081                )
4082            }
4083            HydroNode::CrossProduct { left, right, .. } => {
4084                format!(
4085                    "CrossProduct({}, {})",
4086                    left.print_root(),
4087                    right.print_root()
4088                )
4089            }
4090            HydroNode::CrossSingleton { left, right, .. } => {
4091                format!(
4092                    "CrossSingleton({}, {})",
4093                    left.print_root(),
4094                    right.print_root()
4095                )
4096            }
4097            HydroNode::Join { left, right, .. } => {
4098                format!("Join({}, {})", left.print_root(), right.print_root())
4099            }
4100            HydroNode::Difference { pos, neg, .. } => {
4101                format!("Difference({}, {})", pos.print_root(), neg.print_root())
4102            }
4103            HydroNode::AntiJoin { pos, neg, .. } => {
4104                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4105            }
4106            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4107            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4108            HydroNode::Map { f, .. } => format!("Map({:?})", f),
4109            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4110            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4111            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4112            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4113            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4114            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4115            HydroNode::Unique { .. } => "Unique()".to_owned(),
4116            HydroNode::Sort { .. } => "Sort()".to_owned(),
4117            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4118            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4119            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4120            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4121            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4122            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4123            HydroNode::Network { .. } => "Network()".to_owned(),
4124            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4125            HydroNode::Counter { tag, duration, .. } => {
4126                format!("Counter({:?}, {:?})", tag, duration)
4127            }
4128        }
4129    }
4130}
4131
/// Instantiates a network channel from `from_location` to `to_location` using the
/// deployment backend `D`.
///
/// Both endpoints are looked up in `processes` / `clusters`, a port is requested on
/// each via `next_port()`, and the call is dispatched to the `o2o` / `o2m` / `m2o` /
/// `m2m` family of `D` depending on whether each endpoint is a process or a cluster.
///
/// Returns `(sink, source, connect_fn)`: the pair produced by the `*_sink_source`
/// call, followed by the boxed closure produced by the matching `*_connect` call.
///
/// # Panics
/// Panics if an endpoint is missing from its map, or if either location is a `Tick`
/// or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Endpoint lookups shared by all four process/cluster combinations.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", key)
            })
            .clone()
    };
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", key)
            })
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_node(from);
            let receiver = process_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_node(from);
            let receiver = cluster_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_node(from);
            let receiver = process_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_node(from);
            let receiver = cluster_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        // Tick and Atomic locations are rejected on either side of the channel.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(),
    };
    (sink, source, connect_fn)
}
4273
// Unit tests for the IR: size guards for the two main IR enums and checks for
// `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins `size_of::<HydroNode>()` to 248 bytes so a size change is noticed.
    /// Ignored without the `build` feature, because the expected size includes
    /// feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 248);
    }

    /// Pins `size_of::<HydroRoot>()` to 136 bytes so a size change is noticed.
    /// Ignored without the `build` feature, because the expected size includes
    /// feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    /// An expression that contains no `q!` call must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        // Test basic non-q! expression
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    /// Snapshot test: simplifies the expression spliced from a real `q!` closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Test a simplified version of what a real stageleft call might look like
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // This should be processed by our visitor and simplified to q!(...)
        // since we detect the stageleft::runtime_support::fn_* pattern
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot test: the quoted body is a block that evaluates to a closure,
    /// so the quoted tokens do not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        // Test a closure that does not start with a pipe
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}