Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::{Debug, Display};
5use std::hash::{Hash, Hasher};
6use std::ops::Deref;
7use std::rc::Rc;
8
9#[cfg(feature = "build")]
10use dfir_lang::graph::FlatGraphBuilder;
11#[cfg(feature = "build")]
12use proc_macro2::Span;
13use proc_macro2::TokenStream;
14use quote::ToTokens;
15#[cfg(feature = "build")]
16use quote::quote;
17#[cfg(feature = "build")]
18use slotmap::{SecondaryMap, SparseSecondaryMap};
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::ExternalPortId;
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::dynamic::LocationId;
28use crate::location::{LocationKey, NetworkHint};
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
33/// Wrapper that displays only the tokens of a parsed expr.
34///
35/// Boxes `syn::Type` which is ~240 bytes.
36#[derive(Clone, Hash)]
37pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40    fn from(expr: syn::Expr) -> Self {
41        Self(Box::new(expr))
42    }
43}
44
45impl Deref for DebugExpr {
46    type Target = syn::Expr;
47
48    fn deref(&self) -> &Self::Target {
49        &self.0
50    }
51}
52
53impl ToTokens for DebugExpr {
54    fn to_tokens(&self, tokens: &mut TokenStream) {
55        self.0.to_tokens(tokens);
56    }
57}
58
59impl Debug for DebugExpr {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(f, "{}", self.0.to_token_stream())
62    }
63}
64
65impl Display for DebugExpr {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        let original = self.0.as_ref().clone();
68        let simplified = simplify_q_macro(original);
69
70        // For now, just use quote formatting without trying to parse as a statement
71        // This avoids the syn::parse_quote! issues entirely
72        write!(f, "q!({})", quote::quote!(#simplified))
73    }
74}
75
76/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
77fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78    // Try to parse the token string as a syn::Expr
79    // Use a visitor to simplify q! macro expansions
80    let mut simplifier = QMacroSimplifier::new();
81    simplifier.visit_expr_mut(&mut expr);
82
83    // If we found and simplified a q! macro, return the simplified version
84    if let Some(simplified) = simplifier.simplified_result {
85        simplified
86    } else {
87        expr
88    }
89}
90
91/// AST visitor that simplifies q! macro expansions
92#[derive(Default)]
93pub struct QMacroSimplifier {
94    pub simplified_result: Option<syn::Expr>,
95}
96
97impl QMacroSimplifier {
98    pub fn new() -> Self {
99        Self::default()
100    }
101}
102
103impl VisitMut for QMacroSimplifier {
104    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105        // Check if we already found a result to avoid further processing
106        if self.simplified_result.is_some() {
107            return;
108        }
109
110        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111            // Look for calls to stageleft::runtime_support::fn*
112            && self.is_stageleft_runtime_support_call(&path_expr.path)
113            // Try to extract the closure from the arguments
114            && let Some(closure) = self.extract_closure_from_args(&call.args)
115        {
116            self.simplified_result = Some(closure);
117            return;
118        }
119
120        // Continue visiting child expressions using the default implementation
121        // Use the default visitor to avoid infinite recursion
122        syn::visit_mut::visit_expr_mut(self, expr);
123    }
124}
125
126impl QMacroSimplifier {
127    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128        // Check if this is a call to stageleft::runtime_support::fn*
129        if let Some(last_segment) = path.segments.last() {
130            let fn_name = last_segment.ident.to_string();
131            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
132            fn_name.contains("_type_hint")
133                && path.segments.len() > 2
134                && path.segments[0].ident == "stageleft"
135                && path.segments[1].ident == "runtime_support"
136        } else {
137            false
138        }
139    }
140
141    fn extract_closure_from_args(
142        &self,
143        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144    ) -> Option<syn::Expr> {
145        // Look through the arguments for a closure expression
146        for arg in args {
147            if let syn::Expr::Closure(_) = arg {
148                return Some(arg.clone());
149            }
150            // Also check for closures nested in other expressions (like blocks)
151            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152                return Some(closure_expr);
153            }
154        }
155        None
156    }
157
158    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159        let mut visitor = ClosureFinder {
160            found_closure: None,
161            prefer_inner_blocks: true,
162        };
163        visitor.visit_expr(expr);
164        visitor.found_closure
165    }
166}
167
168/// Visitor that finds closures in expressions with special block handling
169struct ClosureFinder {
170    found_closure: Option<syn::Expr>,
171    prefer_inner_blocks: bool,
172}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176        // If we already found a closure, don't continue searching
177        if self.found_closure.is_some() {
178            return;
179        }
180
181        match expr {
182            syn::Expr::Closure(_) => {
183                self.found_closure = Some(expr.clone());
184            }
185            syn::Expr::Block(block) if self.prefer_inner_blocks => {
186                // Special handling for blocks - look for inner blocks that contain closures
187                for stmt in &block.block.stmts {
188                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
189                        && let syn::Expr::Block(_) = stmt_expr
190                    {
191                        // Check if this nested block contains a closure
192                        let mut inner_visitor = ClosureFinder {
193                            found_closure: None,
194                            prefer_inner_blocks: false, // Avoid infinite recursion
195                        };
196                        inner_visitor.visit_expr(stmt_expr);
197                        if inner_visitor.found_closure.is_some() {
198                            // Found a closure in an inner block, return that block
199                            self.found_closure = Some(stmt_expr.clone());
200                            return;
201                        }
202                    }
203                }
204
205                // If no inner block with closure found, continue with normal visitation
206                visit::visit_expr(self, expr);
207
208                // If we found a closure, just return the closure itself, not the whole block
209                // unless we're in the special case where we want the containing block
210                if self.found_closure.is_some() {
211                    // The closure was found during visitation, no need to wrap in block
212                }
213            }
214            _ => {
215                // Use default visitor behavior for all other expressions
216                visit::visit_expr(self, expr);
217            }
218        }
219    }
220}
221
222/// Debug displays the type's tokens.
223///
224/// Boxes `syn::Type` which is ~320 bytes.
225#[derive(Clone, PartialEq, Eq, Hash)]
226pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229    fn from(t: syn::Type) -> Self {
230        Self(Box::new(t))
231    }
232}
233
234impl Deref for DebugType {
235    type Target = syn::Type;
236
237    fn deref(&self) -> &Self::Target {
238        &self.0
239    }
240}
241
242impl ToTokens for DebugType {
243    fn to_tokens(&self, tokens: &mut TokenStream) {
244        self.0.to_tokens(tokens);
245    }
246}
247
248impl Debug for DebugType {
249    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250        write!(f, "{}", self.0.to_token_stream())
251    }
252}
253
254pub enum DebugInstantiate {
255    Building,
256    Finalized(Box<DebugInstantiateFinalized>),
257}
258
259#[cfg_attr(
260    not(feature = "build"),
261    expect(
262        dead_code,
263        reason = "sink, source unused without `feature = \"build\"`."
264    )
265)]
266pub struct DebugInstantiateFinalized {
267    sink: syn::Expr,
268    source: syn::Expr,
269    connect_fn: Option<Box<dyn FnOnce()>>,
270}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273    fn from(f: DebugInstantiateFinalized) -> Self {
274        Self::Finalized(Box::new(f))
275    }
276}
277
278impl Debug for DebugInstantiate {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        write!(f, "<network instantiate>")
281    }
282}
283
284impl Hash for DebugInstantiate {
285    fn hash<H: Hasher>(&self, _state: &mut H) {
286        // Do nothing
287    }
288}
289
290impl Clone for DebugInstantiate {
291    fn clone(&self) -> Self {
292        match self {
293            DebugInstantiate::Building => DebugInstantiate::Building,
294            DebugInstantiate::Finalized(_) => {
295                panic!("DebugInstantiate::Finalized should not be cloned")
296            }
297        }
298    }
299}
300
301/// A source in a Hydro graph, where data enters the graph.
302#[derive(Debug, Hash, Clone)]
303pub enum HydroSource {
304    Stream(DebugExpr),
305    ExternalNetwork(),
306    Iter(DebugExpr),
307    Spin(),
308    ClusterMembers(LocationId),
309}
310
311#[cfg(feature = "build")]
312/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
313/// and simulations.
314///
315/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
316/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
317pub trait DfirBuilder {
318    /// Whether the representation of singletons should include intermediate states.
319    fn singleton_intermediates(&self) -> bool;
320
321    /// Gets the DFIR builder for the given location, creating it if necessary.
322    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
323
324    fn batch(
325        &mut self,
326        in_ident: syn::Ident,
327        in_location: &LocationId,
328        in_kind: &CollectionKind,
329        out_ident: &syn::Ident,
330        out_location: &LocationId,
331        op_meta: &HydroIrOpMetadata,
332    );
333    fn yield_from_tick(
334        &mut self,
335        in_ident: syn::Ident,
336        in_location: &LocationId,
337        in_kind: &CollectionKind,
338        out_ident: &syn::Ident,
339        out_location: &LocationId,
340    );
341
342    fn begin_atomic(
343        &mut self,
344        in_ident: syn::Ident,
345        in_location: &LocationId,
346        in_kind: &CollectionKind,
347        out_ident: &syn::Ident,
348        out_location: &LocationId,
349        op_meta: &HydroIrOpMetadata,
350    );
351    fn end_atomic(
352        &mut self,
353        in_ident: syn::Ident,
354        in_location: &LocationId,
355        in_kind: &CollectionKind,
356        out_ident: &syn::Ident,
357    );
358
359    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
360    fn observe_nondet(
361        &mut self,
362        trusted: bool,
363        location: &LocationId,
364        in_ident: syn::Ident,
365        in_kind: &CollectionKind,
366        out_ident: &syn::Ident,
367        out_kind: &CollectionKind,
368        op_meta: &HydroIrOpMetadata,
369    );
370
371    #[expect(clippy::too_many_arguments, reason = "TODO")]
372    fn create_network(
373        &mut self,
374        from: &LocationId,
375        to: &LocationId,
376        input_ident: syn::Ident,
377        out_ident: &syn::Ident,
378        serialize: Option<&DebugExpr>,
379        sink: syn::Expr,
380        source: syn::Expr,
381        deserialize: Option<&DebugExpr>,
382        tag_id: usize,
383    );
384
385    fn create_external_source(
386        &mut self,
387        on: &LocationId,
388        source_expr: syn::Expr,
389        out_ident: &syn::Ident,
390        deserialize: Option<&DebugExpr>,
391        tag_id: usize,
392    );
393
394    fn create_external_output(
395        &mut self,
396        on: &LocationId,
397        sink_expr: syn::Expr,
398        input_ident: &syn::Ident,
399        serialize: Option<&DebugExpr>,
400        tag_id: usize,
401    );
402}
403
404#[cfg(feature = "build")]
405impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
406    fn singleton_intermediates(&self) -> bool {
407        false
408    }
409
410    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
411        self.entry(location.root().key())
412            .expect("location was removed")
413            .or_default()
414    }
415
416    fn batch(
417        &mut self,
418        in_ident: syn::Ident,
419        in_location: &LocationId,
420        in_kind: &CollectionKind,
421        out_ident: &syn::Ident,
422        _out_location: &LocationId,
423        _op_meta: &HydroIrOpMetadata,
424    ) {
425        let builder = self.get_dfir_mut(in_location.root());
426        if in_kind.is_bounded()
427            && matches!(
428                in_kind,
429                CollectionKind::Singleton { .. }
430                    | CollectionKind::Optional { .. }
431                    | CollectionKind::KeyedSingleton { .. }
432            )
433        {
434            assert!(in_location.is_top_level());
435            builder.add_dfir(
436                parse_quote! {
437                    #out_ident = #in_ident -> persist::<'static>();
438                },
439                None,
440                None,
441            );
442        } else {
443            builder.add_dfir(
444                parse_quote! {
445                    #out_ident = #in_ident;
446                },
447                None,
448                None,
449            );
450        }
451    }
452
453    fn yield_from_tick(
454        &mut self,
455        in_ident: syn::Ident,
456        in_location: &LocationId,
457        _in_kind: &CollectionKind,
458        out_ident: &syn::Ident,
459        _out_location: &LocationId,
460    ) {
461        let builder = self.get_dfir_mut(in_location.root());
462        builder.add_dfir(
463            parse_quote! {
464                #out_ident = #in_ident;
465            },
466            None,
467            None,
468        );
469    }
470
471    fn begin_atomic(
472        &mut self,
473        in_ident: syn::Ident,
474        in_location: &LocationId,
475        _in_kind: &CollectionKind,
476        out_ident: &syn::Ident,
477        _out_location: &LocationId,
478        _op_meta: &HydroIrOpMetadata,
479    ) {
480        let builder = self.get_dfir_mut(in_location.root());
481        builder.add_dfir(
482            parse_quote! {
483                #out_ident = #in_ident;
484            },
485            None,
486            None,
487        );
488    }
489
490    fn end_atomic(
491        &mut self,
492        in_ident: syn::Ident,
493        in_location: &LocationId,
494        _in_kind: &CollectionKind,
495        out_ident: &syn::Ident,
496    ) {
497        let builder = self.get_dfir_mut(in_location.root());
498        builder.add_dfir(
499            parse_quote! {
500                #out_ident = #in_ident;
501            },
502            None,
503            None,
504        );
505    }
506
507    fn observe_nondet(
508        &mut self,
509        _trusted: bool,
510        location: &LocationId,
511        in_ident: syn::Ident,
512        _in_kind: &CollectionKind,
513        out_ident: &syn::Ident,
514        _out_kind: &CollectionKind,
515        _op_meta: &HydroIrOpMetadata,
516    ) {
517        let builder = self.get_dfir_mut(location);
518        builder.add_dfir(
519            parse_quote! {
520                #out_ident = #in_ident;
521            },
522            None,
523            None,
524        );
525    }
526
527    fn create_network(
528        &mut self,
529        from: &LocationId,
530        to: &LocationId,
531        input_ident: syn::Ident,
532        out_ident: &syn::Ident,
533        serialize: Option<&DebugExpr>,
534        sink: syn::Expr,
535        source: syn::Expr,
536        deserialize: Option<&DebugExpr>,
537        tag_id: usize,
538    ) {
539        let sender_builder = self.get_dfir_mut(from);
540        if let Some(serialize_pipeline) = serialize {
541            sender_builder.add_dfir(
542                parse_quote! {
543                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
544                },
545                None,
546                // operator tag separates send and receive, which otherwise have the same next_stmt_id
547                Some(&format!("send{}", tag_id)),
548            );
549        } else {
550            sender_builder.add_dfir(
551                parse_quote! {
552                    #input_ident -> dest_sink(#sink);
553                },
554                None,
555                Some(&format!("send{}", tag_id)),
556            );
557        }
558
559        let receiver_builder = self.get_dfir_mut(to);
560        if let Some(deserialize_pipeline) = deserialize {
561            receiver_builder.add_dfir(
562                parse_quote! {
563                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
564                },
565                None,
566                Some(&format!("recv{}", tag_id)),
567            );
568        } else {
569            receiver_builder.add_dfir(
570                parse_quote! {
571                    #out_ident = source_stream(#source);
572                },
573                None,
574                Some(&format!("recv{}", tag_id)),
575            );
576        }
577    }
578
579    fn create_external_source(
580        &mut self,
581        on: &LocationId,
582        source_expr: syn::Expr,
583        out_ident: &syn::Ident,
584        deserialize: Option<&DebugExpr>,
585        tag_id: usize,
586    ) {
587        let receiver_builder = self.get_dfir_mut(on);
588        if let Some(deserialize_pipeline) = deserialize {
589            receiver_builder.add_dfir(
590                parse_quote! {
591                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
592                },
593                None,
594                Some(&format!("recv{}", tag_id)),
595            );
596        } else {
597            receiver_builder.add_dfir(
598                parse_quote! {
599                    #out_ident = source_stream(#source_expr);
600                },
601                None,
602                Some(&format!("recv{}", tag_id)),
603            );
604        }
605    }
606
607    fn create_external_output(
608        &mut self,
609        on: &LocationId,
610        sink_expr: syn::Expr,
611        input_ident: &syn::Ident,
612        serialize: Option<&DebugExpr>,
613        tag_id: usize,
614    ) {
615        let sender_builder = self.get_dfir_mut(on);
616        if let Some(serialize_fn) = serialize {
617            sender_builder.add_dfir(
618                parse_quote! {
619                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
620                },
621                None,
622                // operator tag separates send and receive, which otherwise have the same next_stmt_id
623                Some(&format!("send{}", tag_id)),
624            );
625        } else {
626            sender_builder.add_dfir(
627                parse_quote! {
628                    #input_ident -> dest_sink(#sink_expr);
629                },
630                None,
631                Some(&format!("send{}", tag_id)),
632            );
633        }
634    }
635}
636
637#[cfg(feature = "build")]
638pub enum BuildersOrCallback<'a, L, N>
639where
640    L: FnMut(&mut HydroRoot, &mut usize),
641    N: FnMut(&mut HydroNode, &mut usize),
642{
643    Builders(&'a mut dyn DfirBuilder),
644    Callback(L, N),
645}
646
647/// An root in a Hydro graph, which is an pipeline that doesn't emit
648/// any downstream values. Traversals over the dataflow graph and
649/// generating DFIR IR start from roots.
650#[derive(Debug, Hash)]
651pub enum HydroRoot {
652    ForEach {
653        f: DebugExpr,
654        input: Box<HydroNode>,
655        op_metadata: HydroIrOpMetadata,
656    },
657    SendExternal {
658        to_external_key: LocationKey,
659        to_port_id: ExternalPortId,
660        to_many: bool,
661        unpaired: bool,
662        serialize_fn: Option<DebugExpr>,
663        instantiate_fn: DebugInstantiate,
664        input: Box<HydroNode>,
665        op_metadata: HydroIrOpMetadata,
666    },
667    DestSink {
668        sink: DebugExpr,
669        input: Box<HydroNode>,
670        op_metadata: HydroIrOpMetadata,
671    },
672    CycleSink {
673        ident: syn::Ident,
674        input: Box<HydroNode>,
675        op_metadata: HydroIrOpMetadata,
676    },
677}
678
679impl HydroRoot {
680    #[cfg(feature = "build")]
681    pub fn compile_network<'a, D>(
682        &mut self,
683        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
684        seen_tees: &mut SeenTees,
685        processes: &SparseSecondaryMap<LocationKey, D::Process>,
686        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
687        externals: &SparseSecondaryMap<LocationKey, D::External>,
688    ) where
689        D: Deploy<'a>,
690    {
691        let refcell_extra_stmts = RefCell::new(extra_stmts);
692        self.transform_bottom_up(
693            &mut |l| {
694                if let HydroRoot::SendExternal {
695                    input,
696                    to_external_key,
697                    to_port_id,
698                    to_many,
699                    unpaired,
700                    instantiate_fn,
701                    ..
702                } = l
703                {
704                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
705                        DebugInstantiate::Building => {
706                            let to_node = externals
707                                .get(*to_external_key)
708                                .unwrap_or_else(|| {
709                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
710                                })
711                                .clone();
712
713                            match input.metadata().location_id.root() {
714                                &LocationId::Process(process_key) => {
715                                    if *to_many {
716                                        (
717                                            (
718                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
719                                                parse_quote!(DUMMY),
720                                            ),
721                                            Box::new(|| {}) as Box<dyn FnOnce()>,
722                                        )
723                                    } else {
724                                        let from_node = processes
725                                            .get(process_key)
726                                            .unwrap_or_else(|| {
727                                                panic!("A process used in the graph was not instantiated: {}", process_key)
728                                            })
729                                            .clone();
730
731                                        let sink_port = from_node.next_port();
732                                        let source_port = to_node.next_port();
733
734                                        if *unpaired {
735                                            use stageleft::quote_type;
736                                            use tokio_util::codec::LengthDelimitedCodec;
737
738                                            to_node.register(*to_port_id, source_port.clone());
739
740                                            let _ = D::e2o_source(
741                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
742                                                &to_node, &source_port,
743                                                &from_node, &sink_port,
744                                                &quote_type::<LengthDelimitedCodec>(),
745                                                format!("{}_{}", *to_external_key, *to_port_id)
746                                            );
747                                        }
748
749                                        (
750                                            (
751                                                D::o2e_sink(
752                                                    &from_node,
753                                                    &sink_port,
754                                                    &to_node,
755                                                    &source_port,
756                                                    format!("{}_{}", *to_external_key, *to_port_id)
757                                                ),
758                                                parse_quote!(DUMMY),
759                                            ),
760                                            if *unpaired {
761                                                D::e2o_connect(
762                                                    &to_node,
763                                                    &source_port,
764                                                    &from_node,
765                                                    &sink_port,
766                                                    *to_many,
767                                                    NetworkHint::Auto,
768                                                )
769                                            } else {
770                                                Box::new(|| {}) as Box<dyn FnOnce()>
771                                            },
772                                        )
773                                    }
774                                }
775                                LocationId::Cluster(_) => todo!(),
776                                _ => panic!()
777                            }
778                        },
779
780                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
781                    };
782
783                    *instantiate_fn = DebugInstantiateFinalized {
784                        sink: sink_expr,
785                        source: source_expr,
786                        connect_fn: Some(connect_fn),
787                    }
788                    .into();
789                }
790            },
791            &mut |n| {
792                if let HydroNode::Network {
793                    input,
794                    instantiate_fn,
795                    metadata,
796                    ..
797                } = n
798                {
799                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
800                        DebugInstantiate::Building => instantiate_network::<D>(
801                            input.metadata().location_id.root(),
802                            metadata.location_id.root(),
803                            processes,
804                            clusters,
805                        ),
806
807                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
808                    };
809
810                    *instantiate_fn = DebugInstantiateFinalized {
811                        sink: sink_expr,
812                        source: source_expr,
813                        connect_fn: Some(connect_fn),
814                    }
815                    .into();
816                } else if let HydroNode::ExternalInput {
817                    from_external_key,
818                    from_port_id,
819                    from_many,
820                    codec_type,
821                    port_hint,
822                    instantiate_fn,
823                    metadata,
824                    ..
825                } = n
826                {
827                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
828                        DebugInstantiate::Building => {
829                            let from_node = externals
830                                .get(*from_external_key)
831                                .unwrap_or_else(|| {
832                                    panic!(
833                                        "A external used in the graph was not instantiated: {}",
834                                        from_external_key,
835                                    )
836                                })
837                                .clone();
838
839                            match metadata.location_id.root() {
840                                &LocationId::Process(process_key) => {
841                                    let to_node = processes
842                                        .get(process_key)
843                                        .unwrap_or_else(|| {
844                                            panic!("A process used in the graph was not instantiated: {}", process_key)
845                                        })
846                                        .clone();
847
848                                    let sink_port = from_node.next_port();
849                                    let source_port = to_node.next_port();
850
851                                    from_node.register(*from_port_id, sink_port.clone());
852
853                                    (
854                                        (
855                                            parse_quote!(DUMMY),
856                                            if *from_many {
857                                                D::e2o_many_source(
858                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
859                                                    &to_node, &source_port,
860                                                    codec_type.0.as_ref(),
861                                                    format!("{}_{}", *from_external_key, *from_port_id)
862                                                )
863                                            } else {
864                                                D::e2o_source(
865                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
866                                                    &from_node, &sink_port,
867                                                    &to_node, &source_port,
868                                                    codec_type.0.as_ref(),
869                                                    format!("{}_{}", *from_external_key, *from_port_id)
870                                                )
871                                            },
872                                        ),
873                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
874                                    )
875                                }
876                                LocationId::Cluster(_) => todo!(),
877                                _ => panic!()
878                            }
879                        },
880
881                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
882                    };
883
884                    *instantiate_fn = DebugInstantiateFinalized {
885                        sink: sink_expr,
886                        source: source_expr,
887                        connect_fn: Some(connect_fn),
888                    }
889                    .into();
890                }
891            },
892            seen_tees,
893            false,
894        );
895    }
896
897    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
898        self.transform_bottom_up(
899            &mut |l| {
900                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
901                    match instantiate_fn {
902                        DebugInstantiate::Building => panic!("network not built"),
903
904                        DebugInstantiate::Finalized(finalized) => {
905                            (finalized.connect_fn.take().unwrap())();
906                        }
907                    }
908                }
909            },
910            &mut |n| {
911                if let HydroNode::Network { instantiate_fn, .. }
912                | HydroNode::ExternalInput { instantiate_fn, .. } = n
913                {
914                    match instantiate_fn {
915                        DebugInstantiate::Building => panic!("network not built"),
916
917                        DebugInstantiate::Finalized(finalized) => {
918                            (finalized.connect_fn.take().unwrap())();
919                        }
920                    }
921                }
922            },
923            seen_tees,
924            false,
925        );
926    }
927
928    pub fn transform_bottom_up(
929        &mut self,
930        transform_root: &mut impl FnMut(&mut HydroRoot),
931        transform_node: &mut impl FnMut(&mut HydroNode),
932        seen_tees: &mut SeenTees,
933        check_well_formed: bool,
934    ) {
935        self.transform_children(
936            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
937            seen_tees,
938        );
939
940        transform_root(self);
941    }
942
943    pub fn transform_children(
944        &mut self,
945        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
946        seen_tees: &mut SeenTees,
947    ) {
948        match self {
949            HydroRoot::ForEach { input, .. }
950            | HydroRoot::SendExternal { input, .. }
951            | HydroRoot::DestSink { input, .. }
952            | HydroRoot::CycleSink { input, .. } => {
953                transform(input, seen_tees);
954            }
955        }
956    }
957
958    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
959        match self {
960            HydroRoot::ForEach {
961                f,
962                input,
963                op_metadata,
964            } => HydroRoot::ForEach {
965                f: f.clone(),
966                input: Box::new(input.deep_clone(seen_tees)),
967                op_metadata: op_metadata.clone(),
968            },
969            HydroRoot::SendExternal {
970                to_external_key,
971                to_port_id,
972                to_many,
973                unpaired,
974                serialize_fn,
975                instantiate_fn,
976                input,
977                op_metadata,
978            } => HydroRoot::SendExternal {
979                to_external_key: *to_external_key,
980                to_port_id: *to_port_id,
981                to_many: *to_many,
982                unpaired: *unpaired,
983                serialize_fn: serialize_fn.clone(),
984                instantiate_fn: instantiate_fn.clone(),
985                input: Box::new(input.deep_clone(seen_tees)),
986                op_metadata: op_metadata.clone(),
987            },
988            HydroRoot::DestSink {
989                sink,
990                input,
991                op_metadata,
992            } => HydroRoot::DestSink {
993                sink: sink.clone(),
994                input: Box::new(input.deep_clone(seen_tees)),
995                op_metadata: op_metadata.clone(),
996            },
997            HydroRoot::CycleSink {
998                ident,
999                input,
1000                op_metadata,
1001            } => HydroRoot::CycleSink {
1002                ident: ident.clone(),
1003                input: Box::new(input.deep_clone(seen_tees)),
1004                op_metadata: op_metadata.clone(),
1005            },
1006        }
1007    }
1008
1009    #[cfg(feature = "build")]
1010    pub fn emit<'a, D: Deploy<'a>>(
1011        &mut self,
1012        graph_builders: &mut dyn DfirBuilder,
1013        seen_tees: &mut SeenTees,
1014        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1015        next_stmt_id: &mut usize,
1016    ) {
1017        self.emit_core::<D>(
1018            &mut BuildersOrCallback::<
1019                fn(&mut HydroRoot, &mut usize),
1020                fn(&mut HydroNode, &mut usize),
1021            >::Builders(graph_builders),
1022            seen_tees,
1023            built_tees,
1024            next_stmt_id,
1025        );
1026    }
1027
1028    #[cfg(feature = "build")]
1029    pub fn emit_core<'a, D: Deploy<'a>>(
1030        &mut self,
1031        builders_or_callback: &mut BuildersOrCallback<
1032            impl FnMut(&mut HydroRoot, &mut usize),
1033            impl FnMut(&mut HydroNode, &mut usize),
1034        >,
1035        seen_tees: &mut SeenTees,
1036        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1037        next_stmt_id: &mut usize,
1038    ) {
1039        match self {
1040            HydroRoot::ForEach { f, input, .. } => {
1041                let input_ident =
1042                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1043
1044                match builders_or_callback {
1045                    BuildersOrCallback::Builders(graph_builders) => {
1046                        graph_builders
1047                            .get_dfir_mut(&input.metadata().location_id)
1048                            .add_dfir(
1049                                parse_quote! {
1050                                    #input_ident -> for_each(#f);
1051                                },
1052                                None,
1053                                Some(&next_stmt_id.to_string()),
1054                            );
1055                    }
1056                    BuildersOrCallback::Callback(leaf_callback, _) => {
1057                        leaf_callback(self, next_stmt_id);
1058                    }
1059                }
1060
1061                *next_stmt_id += 1;
1062            }
1063
1064            HydroRoot::SendExternal {
1065                serialize_fn,
1066                instantiate_fn,
1067                input,
1068                ..
1069            } => {
1070                let input_ident =
1071                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1072
1073                match builders_or_callback {
1074                    BuildersOrCallback::Builders(graph_builders) => {
1075                        let (sink_expr, _) = match instantiate_fn {
1076                            DebugInstantiate::Building => (
1077                                syn::parse_quote!(DUMMY_SINK),
1078                                syn::parse_quote!(DUMMY_SOURCE),
1079                            ),
1080
1081                            DebugInstantiate::Finalized(finalized) => {
1082                                (finalized.sink.clone(), finalized.source.clone())
1083                            }
1084                        };
1085
1086                        graph_builders.create_external_output(
1087                            &input.metadata().location_id,
1088                            sink_expr,
1089                            &input_ident,
1090                            serialize_fn.as_ref(),
1091                            *next_stmt_id,
1092                        );
1093                    }
1094                    BuildersOrCallback::Callback(leaf_callback, _) => {
1095                        leaf_callback(self, next_stmt_id);
1096                    }
1097                }
1098
1099                *next_stmt_id += 1;
1100            }
1101
1102            HydroRoot::DestSink { sink, input, .. } => {
1103                let input_ident =
1104                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1105
1106                match builders_or_callback {
1107                    BuildersOrCallback::Builders(graph_builders) => {
1108                        graph_builders
1109                            .get_dfir_mut(&input.metadata().location_id)
1110                            .add_dfir(
1111                                parse_quote! {
1112                                    #input_ident -> dest_sink(#sink);
1113                                },
1114                                None,
1115                                Some(&next_stmt_id.to_string()),
1116                            );
1117                    }
1118                    BuildersOrCallback::Callback(leaf_callback, _) => {
1119                        leaf_callback(self, next_stmt_id);
1120                    }
1121                }
1122
1123                *next_stmt_id += 1;
1124            }
1125
1126            HydroRoot::CycleSink { ident, input, .. } => {
1127                let input_ident =
1128                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1129
1130                match builders_or_callback {
1131                    BuildersOrCallback::Builders(graph_builders) => {
1132                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1133                            CollectionKind::KeyedSingleton {
1134                                key_type,
1135                                value_type,
1136                                ..
1137                            }
1138                            | CollectionKind::KeyedStream {
1139                                key_type,
1140                                value_type,
1141                                ..
1142                            } => {
1143                                parse_quote!((#key_type, #value_type))
1144                            }
1145                            CollectionKind::Stream { element_type, .. }
1146                            | CollectionKind::Singleton { element_type, .. }
1147                            | CollectionKind::Optional { element_type, .. } => {
1148                                parse_quote!(#element_type)
1149                            }
1150                        };
1151
1152                        graph_builders
1153                            .get_dfir_mut(&input.metadata().location_id)
1154                            .add_dfir(
1155                                parse_quote! {
1156                                    #ident = #input_ident -> identity::<#elem_type>();
1157                                },
1158                                None,
1159                                None,
1160                            );
1161                    }
1162                    // No ID, no callback
1163                    BuildersOrCallback::Callback(_, _) => {}
1164                }
1165            }
1166        }
1167    }
1168
1169    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1170        match self {
1171            HydroRoot::ForEach { op_metadata, .. }
1172            | HydroRoot::SendExternal { op_metadata, .. }
1173            | HydroRoot::DestSink { op_metadata, .. }
1174            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1175        }
1176    }
1177
1178    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1179        match self {
1180            HydroRoot::ForEach { op_metadata, .. }
1181            | HydroRoot::SendExternal { op_metadata, .. }
1182            | HydroRoot::DestSink { op_metadata, .. }
1183            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1184        }
1185    }
1186
1187    pub fn input(&self) -> &HydroNode {
1188        match self {
1189            HydroRoot::ForEach { input, .. }
1190            | HydroRoot::SendExternal { input, .. }
1191            | HydroRoot::DestSink { input, .. }
1192            | HydroRoot::CycleSink { input, .. } => input,
1193        }
1194    }
1195
1196    pub fn input_metadata(&self) -> &HydroIrMetadata {
1197        self.input().metadata()
1198    }
1199
1200    pub fn print_root(&self) -> String {
1201        match self {
1202            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1203            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1204            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1205            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1206        }
1207    }
1208
1209    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1210        match self {
1211            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1212                transform(f);
1213            }
1214            HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1215        }
1216    }
1217}
1218
1219#[cfg(feature = "build")]
1220pub fn emit<'a, D: Deploy<'a>>(
1221    ir: &mut Vec<HydroRoot>,
1222) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1223    let mut builders = SecondaryMap::new();
1224    let mut seen_tees = HashMap::new();
1225    let mut built_tees = HashMap::new();
1226    let mut next_stmt_id = 0;
1227    for leaf in ir {
1228        leaf.emit::<D>(
1229            &mut builders,
1230            &mut seen_tees,
1231            &mut built_tees,
1232            &mut next_stmt_id,
1233        );
1234    }
1235    builders
1236}
1237
1238#[cfg(feature = "build")]
1239pub fn traverse_dfir<'a, D: Deploy<'a>>(
1240    ir: &mut [HydroRoot],
1241    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1242    transform_node: impl FnMut(&mut HydroNode, &mut usize),
1243) {
1244    let mut seen_tees = HashMap::new();
1245    let mut built_tees = HashMap::new();
1246    let mut next_stmt_id = 0;
1247    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1248    ir.iter_mut().for_each(|leaf| {
1249        leaf.emit_core::<D>(
1250            &mut callback,
1251            &mut seen_tees,
1252            &mut built_tees,
1253            &mut next_stmt_id,
1254        );
1255    });
1256}
1257
1258pub fn transform_bottom_up(
1259    ir: &mut [HydroRoot],
1260    transform_root: &mut impl FnMut(&mut HydroRoot),
1261    transform_node: &mut impl FnMut(&mut HydroNode),
1262    check_well_formed: bool,
1263) {
1264    let mut seen_tees = HashMap::new();
1265    ir.iter_mut().for_each(|leaf| {
1266        leaf.transform_bottom_up(
1267            transform_root,
1268            transform_node,
1269            &mut seen_tees,
1270            check_well_formed,
1271        );
1272    });
1273}
1274
1275pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1276    let mut seen_tees = HashMap::new();
1277    ir.iter()
1278        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1279        .collect()
1280}
1281
1282type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1283thread_local! {
1284    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1285}
1286
1287pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1288    PRINTED_TEES.with(|printed_tees| {
1289        let mut printed_tees_mut = printed_tees.borrow_mut();
1290        *printed_tees_mut = Some((0, HashMap::new()));
1291        drop(printed_tees_mut);
1292
1293        let ret = f();
1294
1295        let mut printed_tees_mut = printed_tees.borrow_mut();
1296        *printed_tees_mut = None;
1297
1298        ret
1299    })
1300}
1301
1302pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1303
1304impl TeeNode {
1305    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1306        Rc::as_ptr(&self.0)
1307    }
1308}
1309
1310impl Debug for TeeNode {
1311    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1312        PRINTED_TEES.with(|printed_tees| {
1313            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1314            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1315
1316            if let Some(printed_tees_mut) = printed_tees_mut {
1317                if let Some(existing) = printed_tees_mut
1318                    .1
1319                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1320                {
1321                    write!(f, "<tee {}>", existing)
1322                } else {
1323                    let next_id = printed_tees_mut.0;
1324                    printed_tees_mut.0 += 1;
1325                    printed_tees_mut
1326                        .1
1327                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1328                    drop(printed_tees_mut_borrow);
1329                    write!(f, "<tee {}>: ", next_id)?;
1330                    Debug::fmt(&self.0.borrow(), f)
1331                }
1332            } else {
1333                drop(printed_tees_mut_borrow);
1334                write!(f, "<tee>: ")?;
1335                Debug::fmt(&self.0.borrow(), f)
1336            }
1337        })
1338    }
1339}
1340
1341impl Hash for TeeNode {
1342    fn hash<H: Hasher>(&self, state: &mut H) {
1343        self.0.borrow_mut().hash(state);
1344    }
1345}
1346
1347#[derive(Clone, PartialEq, Eq, Debug)]
1348pub enum BoundKind {
1349    Unbounded,
1350    Bounded,
1351}
1352
1353#[derive(Clone, PartialEq, Eq, Debug)]
1354pub enum StreamOrder {
1355    NoOrder,
1356    TotalOrder,
1357}
1358
1359#[derive(Clone, PartialEq, Eq, Debug)]
1360pub enum StreamRetry {
1361    AtLeastOnce,
1362    ExactlyOnce,
1363}
1364
1365#[derive(Clone, PartialEq, Eq, Debug)]
1366pub enum KeyedSingletonBoundKind {
1367    Unbounded,
1368    BoundedValue,
1369    Bounded,
1370}
1371
1372#[derive(Clone, PartialEq, Eq, Debug)]
1373pub enum CollectionKind {
1374    Stream {
1375        bound: BoundKind,
1376        order: StreamOrder,
1377        retry: StreamRetry,
1378        element_type: DebugType,
1379    },
1380    Singleton {
1381        bound: BoundKind,
1382        element_type: DebugType,
1383    },
1384    Optional {
1385        bound: BoundKind,
1386        element_type: DebugType,
1387    },
1388    KeyedStream {
1389        bound: BoundKind,
1390        value_order: StreamOrder,
1391        value_retry: StreamRetry,
1392        key_type: DebugType,
1393        value_type: DebugType,
1394    },
1395    KeyedSingleton {
1396        bound: KeyedSingletonBoundKind,
1397        key_type: DebugType,
1398        value_type: DebugType,
1399    },
1400}
1401
1402impl CollectionKind {
1403    pub fn is_bounded(&self) -> bool {
1404        matches!(
1405            self,
1406            CollectionKind::Stream {
1407                bound: BoundKind::Bounded,
1408                ..
1409            } | CollectionKind::Singleton {
1410                bound: BoundKind::Bounded,
1411                ..
1412            } | CollectionKind::Optional {
1413                bound: BoundKind::Bounded,
1414                ..
1415            } | CollectionKind::KeyedStream {
1416                bound: BoundKind::Bounded,
1417                ..
1418            } | CollectionKind::KeyedSingleton {
1419                bound: KeyedSingletonBoundKind::Bounded,
1420                ..
1421            }
1422        )
1423    }
1424}
1425
1426#[derive(Clone)]
1427pub struct HydroIrMetadata {
1428    pub location_id: LocationId,
1429    pub collection_kind: CollectionKind,
1430    pub cardinality: Option<usize>,
1431    pub tag: Option<String>,
1432    pub op: HydroIrOpMetadata,
1433}
1434
1435// HydroIrMetadata shouldn't be used to hash or compare
1436impl Hash for HydroIrMetadata {
1437    fn hash<H: Hasher>(&self, _: &mut H) {}
1438}
1439
1440impl PartialEq for HydroIrMetadata {
1441    fn eq(&self, _: &Self) -> bool {
1442        true
1443    }
1444}
1445
1446impl Eq for HydroIrMetadata {}
1447
1448impl Debug for HydroIrMetadata {
1449    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1450        f.debug_struct("HydroIrMetadata")
1451            .field("location_id", &self.location_id)
1452            .field("collection_kind", &self.collection_kind)
1453            .finish()
1454    }
1455}
1456
1457/// Metadata that is specific to the operator itself, rather than its outputs.
1458/// This is available on _both_ inner nodes and roots.
1459#[derive(Clone)]
1460pub struct HydroIrOpMetadata {
1461    pub backtrace: Backtrace,
1462    pub cpu_usage: Option<f64>,
1463    pub network_recv_cpu_usage: Option<f64>,
1464    pub id: Option<usize>,
1465}
1466
1467impl HydroIrOpMetadata {
1468    #[expect(
1469        clippy::new_without_default,
1470        reason = "explicit calls to new ensure correct backtrace bounds"
1471    )]
1472    pub fn new() -> HydroIrOpMetadata {
1473        Self::new_with_skip(1)
1474    }
1475
1476    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1477        HydroIrOpMetadata {
1478            backtrace: Backtrace::get_backtrace(2 + skip_count),
1479            cpu_usage: None,
1480            network_recv_cpu_usage: None,
1481            id: None,
1482        }
1483    }
1484}
1485
1486impl Debug for HydroIrOpMetadata {
1487    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1488        f.debug_struct("HydroIrOpMetadata").finish()
1489    }
1490}
1491
1492impl Hash for HydroIrOpMetadata {
1493    fn hash<H: Hasher>(&self, _: &mut H) {}
1494}
1495
1496/// An intermediate node in a Hydro graph, which consumes data
1497/// from upstream nodes and emits data to downstream nodes.
1498#[derive(Debug, Hash)]
1499pub enum HydroNode {
1500    Placeholder,
1501
1502    /// Manually "casts" between two different collection kinds.
1503    ///
1504    /// Using this IR node requires special care, since it bypasses many of Hydro's core
1505    /// correctness checks. In particular, the user must ensure that every possible
1506    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
1507    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
1508    /// collection. This ensures that the simulator does not miss any possible outputs.
1509    Cast {
1510        inner: Box<HydroNode>,
1511        metadata: HydroIrMetadata,
1512    },
1513
1514    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
1515    /// interpretation of the input stream.
1516    ///
1517    /// In production, this simply passes through the input, but in simulation, this operator
1518    /// explicitly selects a randomized interpretation.
1519    ObserveNonDet {
1520        inner: Box<HydroNode>,
1521        trusted: bool, // if true, we do not need to simulate non-determinism
1522        metadata: HydroIrMetadata,
1523    },
1524
1525    Source {
1526        source: HydroSource,
1527        metadata: HydroIrMetadata,
1528    },
1529
1530    SingletonSource {
1531        value: DebugExpr,
1532        metadata: HydroIrMetadata,
1533    },
1534
1535    CycleSource {
1536        ident: syn::Ident,
1537        metadata: HydroIrMetadata,
1538    },
1539
1540    Tee {
1541        inner: TeeNode,
1542        metadata: HydroIrMetadata,
1543    },
1544
1545    BeginAtomic {
1546        inner: Box<HydroNode>,
1547        metadata: HydroIrMetadata,
1548    },
1549
1550    EndAtomic {
1551        inner: Box<HydroNode>,
1552        metadata: HydroIrMetadata,
1553    },
1554
1555    Batch {
1556        inner: Box<HydroNode>,
1557        metadata: HydroIrMetadata,
1558    },
1559
1560    YieldConcat {
1561        inner: Box<HydroNode>,
1562        metadata: HydroIrMetadata,
1563    },
1564
1565    Chain {
1566        first: Box<HydroNode>,
1567        second: Box<HydroNode>,
1568        metadata: HydroIrMetadata,
1569    },
1570
1571    ChainFirst {
1572        first: Box<HydroNode>,
1573        second: Box<HydroNode>,
1574        metadata: HydroIrMetadata,
1575    },
1576
1577    CrossProduct {
1578        left: Box<HydroNode>,
1579        right: Box<HydroNode>,
1580        metadata: HydroIrMetadata,
1581    },
1582
1583    CrossSingleton {
1584        left: Box<HydroNode>,
1585        right: Box<HydroNode>,
1586        metadata: HydroIrMetadata,
1587    },
1588
1589    Join {
1590        left: Box<HydroNode>,
1591        right: Box<HydroNode>,
1592        metadata: HydroIrMetadata,
1593    },
1594
1595    Difference {
1596        pos: Box<HydroNode>,
1597        neg: Box<HydroNode>,
1598        metadata: HydroIrMetadata,
1599    },
1600
1601    AntiJoin {
1602        pos: Box<HydroNode>,
1603        neg: Box<HydroNode>,
1604        metadata: HydroIrMetadata,
1605    },
1606
1607    ResolveFutures {
1608        input: Box<HydroNode>,
1609        metadata: HydroIrMetadata,
1610    },
1611    ResolveFuturesOrdered {
1612        input: Box<HydroNode>,
1613        metadata: HydroIrMetadata,
1614    },
1615
1616    Map {
1617        f: DebugExpr,
1618        input: Box<HydroNode>,
1619        metadata: HydroIrMetadata,
1620    },
1621    FlatMap {
1622        f: DebugExpr,
1623        input: Box<HydroNode>,
1624        metadata: HydroIrMetadata,
1625    },
1626    Filter {
1627        f: DebugExpr,
1628        input: Box<HydroNode>,
1629        metadata: HydroIrMetadata,
1630    },
1631    FilterMap {
1632        f: DebugExpr,
1633        input: Box<HydroNode>,
1634        metadata: HydroIrMetadata,
1635    },
1636
1637    DeferTick {
1638        input: Box<HydroNode>,
1639        metadata: HydroIrMetadata,
1640    },
1641    Enumerate {
1642        input: Box<HydroNode>,
1643        metadata: HydroIrMetadata,
1644    },
1645    Inspect {
1646        f: DebugExpr,
1647        input: Box<HydroNode>,
1648        metadata: HydroIrMetadata,
1649    },
1650
1651    Unique {
1652        input: Box<HydroNode>,
1653        metadata: HydroIrMetadata,
1654    },
1655
1656    Sort {
1657        input: Box<HydroNode>,
1658        metadata: HydroIrMetadata,
1659    },
1660    Fold {
1661        init: DebugExpr,
1662        acc: DebugExpr,
1663        input: Box<HydroNode>,
1664        metadata: HydroIrMetadata,
1665    },
1666
1667    Scan {
1668        init: DebugExpr,
1669        acc: DebugExpr,
1670        input: Box<HydroNode>,
1671        metadata: HydroIrMetadata,
1672    },
1673    FoldKeyed {
1674        init: DebugExpr,
1675        acc: DebugExpr,
1676        input: Box<HydroNode>,
1677        metadata: HydroIrMetadata,
1678    },
1679
1680    Reduce {
1681        f: DebugExpr,
1682        input: Box<HydroNode>,
1683        metadata: HydroIrMetadata,
1684    },
1685    ReduceKeyed {
1686        f: DebugExpr,
1687        input: Box<HydroNode>,
1688        metadata: HydroIrMetadata,
1689    },
1690    ReduceKeyedWatermark {
1691        f: DebugExpr,
1692        input: Box<HydroNode>,
1693        watermark: Box<HydroNode>,
1694        metadata: HydroIrMetadata,
1695    },
1696
1697    Network {
1698        name: Option<String>,
1699        serialize_fn: Option<DebugExpr>,
1700        instantiate_fn: DebugInstantiate,
1701        deserialize_fn: Option<DebugExpr>,
1702        input: Box<HydroNode>,
1703        metadata: HydroIrMetadata,
1704    },
1705
1706    ExternalInput {
1707        from_external_key: LocationKey,
1708        from_port_id: ExternalPortId,
1709        from_many: bool,
1710        codec_type: DebugType,
1711        port_hint: NetworkHint,
1712        instantiate_fn: DebugInstantiate,
1713        deserialize_fn: Option<DebugExpr>,
1714        metadata: HydroIrMetadata,
1715    },
1716
1717    Counter {
1718        tag: String,
1719        duration: DebugExpr,
1720        prefix: String,
1721        input: Box<HydroNode>,
1722        metadata: HydroIrMetadata,
1723    },
1724}
1725
1726pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1727pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1728
1729impl HydroNode {
1730    pub fn transform_bottom_up(
1731        &mut self,
1732        transform: &mut impl FnMut(&mut HydroNode),
1733        seen_tees: &mut SeenTees,
1734        check_well_formed: bool,
1735    ) {
1736        self.transform_children(
1737            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1738            seen_tees,
1739        );
1740
1741        transform(self);
1742
1743        let self_location = self.metadata().location_id.root();
1744
1745        if check_well_formed {
1746            match &*self {
1747                HydroNode::Network { .. } => {}
1748                _ => {
1749                    self.input_metadata().iter().for_each(|i| {
1750                        if i.location_id.root() != self_location {
1751                            panic!(
1752                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1753                                i,
1754                                i.location_id.root(),
1755                                self,
1756                                self_location
1757                            )
1758                        }
1759                    });
1760                }
1761            }
1762        }
1763    }
1764
1765    #[inline(always)]
1766    pub fn transform_children(
1767        &mut self,
1768        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1769        seen_tees: &mut SeenTees,
1770    ) {
1771        match self {
1772            HydroNode::Placeholder => {
1773                panic!();
1774            }
1775
1776            HydroNode::Source { .. }
1777            | HydroNode::SingletonSource { .. }
1778            | HydroNode::CycleSource { .. }
1779            | HydroNode::ExternalInput { .. } => {}
1780
1781            HydroNode::Tee { inner, .. } => {
1782                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1783                    *inner = TeeNode(transformed.clone());
1784                } else {
1785                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1786                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1787                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1788                    transform(&mut orig, seen_tees);
1789                    *transformed_cell.borrow_mut() = orig;
1790                    *inner = TeeNode(transformed_cell);
1791                }
1792            }
1793
1794            HydroNode::Cast { inner, .. }
1795            | HydroNode::ObserveNonDet { inner, .. }
1796            | HydroNode::BeginAtomic { inner, .. }
1797            | HydroNode::EndAtomic { inner, .. }
1798            | HydroNode::Batch { inner, .. }
1799            | HydroNode::YieldConcat { inner, .. } => {
1800                transform(inner.as_mut(), seen_tees);
1801            }
1802
1803            HydroNode::Chain { first, second, .. } => {
1804                transform(first.as_mut(), seen_tees);
1805                transform(second.as_mut(), seen_tees);
1806            }
1807
1808            HydroNode::ChainFirst { first, second, .. } => {
1809                transform(first.as_mut(), seen_tees);
1810                transform(second.as_mut(), seen_tees);
1811            }
1812
1813            HydroNode::CrossSingleton { left, right, .. }
1814            | HydroNode::CrossProduct { left, right, .. }
1815            | HydroNode::Join { left, right, .. } => {
1816                transform(left.as_mut(), seen_tees);
1817                transform(right.as_mut(), seen_tees);
1818            }
1819
1820            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1821                transform(pos.as_mut(), seen_tees);
1822                transform(neg.as_mut(), seen_tees);
1823            }
1824
1825            HydroNode::ReduceKeyedWatermark {
1826                input, watermark, ..
1827            } => {
1828                transform(input.as_mut(), seen_tees);
1829                transform(watermark.as_mut(), seen_tees);
1830            }
1831
1832            HydroNode::Map { input, .. }
1833            | HydroNode::ResolveFutures { input, .. }
1834            | HydroNode::ResolveFuturesOrdered { input, .. }
1835            | HydroNode::FlatMap { input, .. }
1836            | HydroNode::Filter { input, .. }
1837            | HydroNode::FilterMap { input, .. }
1838            | HydroNode::Sort { input, .. }
1839            | HydroNode::DeferTick { input, .. }
1840            | HydroNode::Enumerate { input, .. }
1841            | HydroNode::Inspect { input, .. }
1842            | HydroNode::Unique { input, .. }
1843            | HydroNode::Network { input, .. }
1844            | HydroNode::Fold { input, .. }
1845            | HydroNode::Scan { input, .. }
1846            | HydroNode::FoldKeyed { input, .. }
1847            | HydroNode::Reduce { input, .. }
1848            | HydroNode::ReduceKeyed { input, .. }
1849            | HydroNode::Counter { input, .. } => {
1850                transform(input.as_mut(), seen_tees);
1851            }
1852        }
1853    }
1854
1855    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1856        match self {
1857            HydroNode::Placeholder => HydroNode::Placeholder,
1858            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1859                inner: Box::new(inner.deep_clone(seen_tees)),
1860                metadata: metadata.clone(),
1861            },
1862            HydroNode::ObserveNonDet {
1863                inner,
1864                trusted,
1865                metadata,
1866            } => HydroNode::ObserveNonDet {
1867                inner: Box::new(inner.deep_clone(seen_tees)),
1868                trusted: *trusted,
1869                metadata: metadata.clone(),
1870            },
1871            HydroNode::Source { source, metadata } => HydroNode::Source {
1872                source: source.clone(),
1873                metadata: metadata.clone(),
1874            },
1875            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1876                value: value.clone(),
1877                metadata: metadata.clone(),
1878            },
1879            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1880                ident: ident.clone(),
1881                metadata: metadata.clone(),
1882            },
1883            HydroNode::Tee { inner, metadata } => {
1884                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1885                    HydroNode::Tee {
1886                        inner: TeeNode(transformed.clone()),
1887                        metadata: metadata.clone(),
1888                    }
1889                } else {
1890                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1891                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1892                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1893                    *new_rc.borrow_mut() = cloned;
1894                    HydroNode::Tee {
1895                        inner: TeeNode(new_rc),
1896                        metadata: metadata.clone(),
1897                    }
1898                }
1899            }
1900            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1901                inner: Box::new(inner.deep_clone(seen_tees)),
1902                metadata: metadata.clone(),
1903            },
1904            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1905                inner: Box::new(inner.deep_clone(seen_tees)),
1906                metadata: metadata.clone(),
1907            },
1908            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1909                inner: Box::new(inner.deep_clone(seen_tees)),
1910                metadata: metadata.clone(),
1911            },
1912            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1913                inner: Box::new(inner.deep_clone(seen_tees)),
1914                metadata: metadata.clone(),
1915            },
1916            HydroNode::Chain {
1917                first,
1918                second,
1919                metadata,
1920            } => HydroNode::Chain {
1921                first: Box::new(first.deep_clone(seen_tees)),
1922                second: Box::new(second.deep_clone(seen_tees)),
1923                metadata: metadata.clone(),
1924            },
1925            HydroNode::ChainFirst {
1926                first,
1927                second,
1928                metadata,
1929            } => HydroNode::ChainFirst {
1930                first: Box::new(first.deep_clone(seen_tees)),
1931                second: Box::new(second.deep_clone(seen_tees)),
1932                metadata: metadata.clone(),
1933            },
1934            HydroNode::CrossProduct {
1935                left,
1936                right,
1937                metadata,
1938            } => HydroNode::CrossProduct {
1939                left: Box::new(left.deep_clone(seen_tees)),
1940                right: Box::new(right.deep_clone(seen_tees)),
1941                metadata: metadata.clone(),
1942            },
1943            HydroNode::CrossSingleton {
1944                left,
1945                right,
1946                metadata,
1947            } => HydroNode::CrossSingleton {
1948                left: Box::new(left.deep_clone(seen_tees)),
1949                right: Box::new(right.deep_clone(seen_tees)),
1950                metadata: metadata.clone(),
1951            },
1952            HydroNode::Join {
1953                left,
1954                right,
1955                metadata,
1956            } => HydroNode::Join {
1957                left: Box::new(left.deep_clone(seen_tees)),
1958                right: Box::new(right.deep_clone(seen_tees)),
1959                metadata: metadata.clone(),
1960            },
1961            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1962                pos: Box::new(pos.deep_clone(seen_tees)),
1963                neg: Box::new(neg.deep_clone(seen_tees)),
1964                metadata: metadata.clone(),
1965            },
1966            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1967                pos: Box::new(pos.deep_clone(seen_tees)),
1968                neg: Box::new(neg.deep_clone(seen_tees)),
1969                metadata: metadata.clone(),
1970            },
1971            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1972                input: Box::new(input.deep_clone(seen_tees)),
1973                metadata: metadata.clone(),
1974            },
1975            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1976                HydroNode::ResolveFuturesOrdered {
1977                    input: Box::new(input.deep_clone(seen_tees)),
1978                    metadata: metadata.clone(),
1979                }
1980            }
1981            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1982                f: f.clone(),
1983                input: Box::new(input.deep_clone(seen_tees)),
1984                metadata: metadata.clone(),
1985            },
1986            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1987                f: f.clone(),
1988                input: Box::new(input.deep_clone(seen_tees)),
1989                metadata: metadata.clone(),
1990            },
1991            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1992                f: f.clone(),
1993                input: Box::new(input.deep_clone(seen_tees)),
1994                metadata: metadata.clone(),
1995            },
1996            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1997                f: f.clone(),
1998                input: Box::new(input.deep_clone(seen_tees)),
1999                metadata: metadata.clone(),
2000            },
2001            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2002                input: Box::new(input.deep_clone(seen_tees)),
2003                metadata: metadata.clone(),
2004            },
2005            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2006                input: Box::new(input.deep_clone(seen_tees)),
2007                metadata: metadata.clone(),
2008            },
2009            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2010                f: f.clone(),
2011                input: Box::new(input.deep_clone(seen_tees)),
2012                metadata: metadata.clone(),
2013            },
2014            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2015                input: Box::new(input.deep_clone(seen_tees)),
2016                metadata: metadata.clone(),
2017            },
2018            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2019                input: Box::new(input.deep_clone(seen_tees)),
2020                metadata: metadata.clone(),
2021            },
2022            HydroNode::Fold {
2023                init,
2024                acc,
2025                input,
2026                metadata,
2027            } => HydroNode::Fold {
2028                init: init.clone(),
2029                acc: acc.clone(),
2030                input: Box::new(input.deep_clone(seen_tees)),
2031                metadata: metadata.clone(),
2032            },
2033            HydroNode::Scan {
2034                init,
2035                acc,
2036                input,
2037                metadata,
2038            } => HydroNode::Scan {
2039                init: init.clone(),
2040                acc: acc.clone(),
2041                input: Box::new(input.deep_clone(seen_tees)),
2042                metadata: metadata.clone(),
2043            },
2044            HydroNode::FoldKeyed {
2045                init,
2046                acc,
2047                input,
2048                metadata,
2049            } => HydroNode::FoldKeyed {
2050                init: init.clone(),
2051                acc: acc.clone(),
2052                input: Box::new(input.deep_clone(seen_tees)),
2053                metadata: metadata.clone(),
2054            },
2055            HydroNode::ReduceKeyedWatermark {
2056                f,
2057                input,
2058                watermark,
2059                metadata,
2060            } => HydroNode::ReduceKeyedWatermark {
2061                f: f.clone(),
2062                input: Box::new(input.deep_clone(seen_tees)),
2063                watermark: Box::new(watermark.deep_clone(seen_tees)),
2064                metadata: metadata.clone(),
2065            },
2066            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2067                f: f.clone(),
2068                input: Box::new(input.deep_clone(seen_tees)),
2069                metadata: metadata.clone(),
2070            },
2071            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2072                f: f.clone(),
2073                input: Box::new(input.deep_clone(seen_tees)),
2074                metadata: metadata.clone(),
2075            },
2076            HydroNode::Network {
2077                name,
2078                serialize_fn,
2079                instantiate_fn,
2080                deserialize_fn,
2081                input,
2082                metadata,
2083            } => HydroNode::Network {
2084                name: name.clone(),
2085                serialize_fn: serialize_fn.clone(),
2086                instantiate_fn: instantiate_fn.clone(),
2087                deserialize_fn: deserialize_fn.clone(),
2088                input: Box::new(input.deep_clone(seen_tees)),
2089                metadata: metadata.clone(),
2090            },
2091            HydroNode::ExternalInput {
2092                from_external_key,
2093                from_port_id,
2094                from_many,
2095                codec_type,
2096                port_hint,
2097                instantiate_fn,
2098                deserialize_fn,
2099                metadata,
2100            } => HydroNode::ExternalInput {
2101                from_external_key: *from_external_key,
2102                from_port_id: *from_port_id,
2103                from_many: *from_many,
2104                codec_type: codec_type.clone(),
2105                port_hint: *port_hint,
2106                instantiate_fn: instantiate_fn.clone(),
2107                deserialize_fn: deserialize_fn.clone(),
2108                metadata: metadata.clone(),
2109            },
2110            HydroNode::Counter {
2111                tag,
2112                duration,
2113                prefix,
2114                input,
2115                metadata,
2116            } => HydroNode::Counter {
2117                tag: tag.clone(),
2118                duration: duration.clone(),
2119                prefix: prefix.clone(),
2120                input: Box::new(input.deep_clone(seen_tees)),
2121                metadata: metadata.clone(),
2122            },
2123        }
2124    }
2125
2126    #[cfg(feature = "build")]
2127    pub fn emit_core<'a, D: Deploy<'a>>(
2128        &mut self,
2129        builders_or_callback: &mut BuildersOrCallback<
2130            impl FnMut(&mut HydroRoot, &mut usize),
2131            impl FnMut(&mut HydroNode, &mut usize),
2132        >,
2133        seen_tees: &mut SeenTees,
2134        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2135        next_stmt_id: &mut usize,
2136    ) -> syn::Ident {
2137        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2138
2139        self.transform_bottom_up(
2140            &mut |node: &mut HydroNode| {
2141                let out_location = node.metadata().location_id.clone();
2142                match node {
2143                    HydroNode::Placeholder => {
2144                        panic!()
2145                    }
2146
2147                    HydroNode::Cast { .. } => {
2148                        // Cast passes through the input ident unchanged
2149                        // The input ident is already on the stack from processing the child
2150                        match builders_or_callback {
2151                            BuildersOrCallback::Builders(_) => {}
2152                            BuildersOrCallback::Callback(_, node_callback) => {
2153                                node_callback(node, next_stmt_id);
2154                            }
2155                        }
2156
2157                        *next_stmt_id += 1;
2158                        // input_ident stays on stack as output
2159                    }
2160
2161                    HydroNode::ObserveNonDet {
2162                        inner,
2163                        trusted,
2164                        metadata,
2165                        ..
2166                    } => {
2167                        let inner_ident = ident_stack.pop().unwrap();
2168
2169                        let observe_ident =
2170                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2171
2172                        match builders_or_callback {
2173                            BuildersOrCallback::Builders(graph_builders) => {
2174                                graph_builders.observe_nondet(
2175                                    *trusted,
2176                                    &inner.metadata().location_id,
2177                                    inner_ident,
2178                                    &inner.metadata().collection_kind,
2179                                    &observe_ident,
2180                                    &metadata.collection_kind,
2181                                    &metadata.op,
2182                                );
2183                            }
2184                            BuildersOrCallback::Callback(_, node_callback) => {
2185                                node_callback(node, next_stmt_id);
2186                            }
2187                        }
2188
2189                        *next_stmt_id += 1;
2190
2191                        ident_stack.push(observe_ident);
2192                    }
2193
2194                    HydroNode::Batch {
2195                        inner, metadata, ..
2196                    } => {
2197                        let inner_ident = ident_stack.pop().unwrap();
2198
2199                        let batch_ident =
2200                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2201
2202                        match builders_or_callback {
2203                            BuildersOrCallback::Builders(graph_builders) => {
2204                                graph_builders.batch(
2205                                    inner_ident,
2206                                    &inner.metadata().location_id,
2207                                    &inner.metadata().collection_kind,
2208                                    &batch_ident,
2209                                    &out_location,
2210                                    &metadata.op,
2211                                );
2212                            }
2213                            BuildersOrCallback::Callback(_, node_callback) => {
2214                                node_callback(node, next_stmt_id);
2215                            }
2216                        }
2217
2218                        *next_stmt_id += 1;
2219
2220                        ident_stack.push(batch_ident);
2221                    }
2222
2223                    HydroNode::YieldConcat { inner, .. } => {
2224                        let inner_ident = ident_stack.pop().unwrap();
2225
2226                        let yield_ident =
2227                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2228
2229                        match builders_or_callback {
2230                            BuildersOrCallback::Builders(graph_builders) => {
2231                                graph_builders.yield_from_tick(
2232                                    inner_ident,
2233                                    &inner.metadata().location_id,
2234                                    &inner.metadata().collection_kind,
2235                                    &yield_ident,
2236                                    &out_location,
2237                                );
2238                            }
2239                            BuildersOrCallback::Callback(_, node_callback) => {
2240                                node_callback(node, next_stmt_id);
2241                            }
2242                        }
2243
2244                        *next_stmt_id += 1;
2245
2246                        ident_stack.push(yield_ident);
2247                    }
2248
2249                    HydroNode::BeginAtomic { inner, metadata } => {
2250                        let inner_ident = ident_stack.pop().unwrap();
2251
2252                        let begin_ident =
2253                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2254
2255                        match builders_or_callback {
2256                            BuildersOrCallback::Builders(graph_builders) => {
2257                                graph_builders.begin_atomic(
2258                                    inner_ident,
2259                                    &inner.metadata().location_id,
2260                                    &inner.metadata().collection_kind,
2261                                    &begin_ident,
2262                                    &out_location,
2263                                    &metadata.op,
2264                                );
2265                            }
2266                            BuildersOrCallback::Callback(_, node_callback) => {
2267                                node_callback(node, next_stmt_id);
2268                            }
2269                        }
2270
2271                        *next_stmt_id += 1;
2272
2273                        ident_stack.push(begin_ident);
2274                    }
2275
2276                    HydroNode::EndAtomic { inner, .. } => {
2277                        let inner_ident = ident_stack.pop().unwrap();
2278
2279                        let end_ident =
2280                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2281
2282                        match builders_or_callback {
2283                            BuildersOrCallback::Builders(graph_builders) => {
2284                                graph_builders.end_atomic(
2285                                    inner_ident,
2286                                    &inner.metadata().location_id,
2287                                    &inner.metadata().collection_kind,
2288                                    &end_ident,
2289                                );
2290                            }
2291                            BuildersOrCallback::Callback(_, node_callback) => {
2292                                node_callback(node, next_stmt_id);
2293                            }
2294                        }
2295
2296                        *next_stmt_id += 1;
2297
2298                        ident_stack.push(end_ident);
2299                    }
2300
2301                    HydroNode::Source {
2302                        source, metadata, ..
2303                    } => {
2304                        if let HydroSource::ExternalNetwork() = source {
2305                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2306                        } else {
2307                            let source_ident =
2308                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2309
2310                            let source_stmt = match source {
2311                                HydroSource::Stream(expr) => {
2312                                    debug_assert!(metadata.location_id.is_top_level());
2313                                    parse_quote! {
2314                                        #source_ident = source_stream(#expr);
2315                                    }
2316                                }
2317
2318                                HydroSource::ExternalNetwork() => {
2319                                    unreachable!()
2320                                }
2321
2322                                HydroSource::Iter(expr) => {
2323                                    if metadata.location_id.is_top_level() {
2324                                        parse_quote! {
2325                                            #source_ident = source_iter(#expr);
2326                                        }
2327                                    } else {
2328                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
2329                                        parse_quote! {
2330                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2331                                        }
2332                                    }
2333                                }
2334
2335                                HydroSource::Spin() => {
2336                                    debug_assert!(metadata.location_id.is_top_level());
2337                                    parse_quote! {
2338                                        #source_ident = spin();
2339                                    }
2340                                }
2341
2342                                HydroSource::ClusterMembers(location_id) => {
2343                                    debug_assert!(metadata.location_id.is_top_level());
2344
2345                                    let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2346                                        D::cluster_membership_stream(location_id),
2347                                        &(),
2348                                    );
2349
2350                                    parse_quote! {
2351                                        #source_ident = source_stream(#expr);
2352                                    }
2353                                }
2354                            };
2355
2356                            match builders_or_callback {
2357                                BuildersOrCallback::Builders(graph_builders) => {
2358                                    let builder = graph_builders.get_dfir_mut(&out_location);
2359                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2360                                }
2361                                BuildersOrCallback::Callback(_, node_callback) => {
2362                                    node_callback(node, next_stmt_id);
2363                                }
2364                            }
2365
2366                            *next_stmt_id += 1;
2367
2368                            ident_stack.push(source_ident);
2369                        }
2370                    }
2371
2372                    HydroNode::SingletonSource { value, metadata } => {
2373                        let source_ident =
2374                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2375
2376                        match builders_or_callback {
2377                            BuildersOrCallback::Builders(graph_builders) => {
2378                                let builder = graph_builders.get_dfir_mut(&out_location);
2379
2380                                if metadata.location_id.is_top_level()
2381                                    && metadata.collection_kind.is_bounded()
2382                                {
2383                                    builder.add_dfir(
2384                                        parse_quote! {
2385                                            #source_ident = source_iter([#value]);
2386                                        },
2387                                        None,
2388                                        Some(&next_stmt_id.to_string()),
2389                                    );
2390                                } else {
2391                                    builder.add_dfir(
2392                                        parse_quote! {
2393                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2394                                        },
2395                                        None,
2396                                        Some(&next_stmt_id.to_string()),
2397                                    );
2398                                }
2399                            }
2400                            BuildersOrCallback::Callback(_, node_callback) => {
2401                                node_callback(node, next_stmt_id);
2402                            }
2403                        }
2404
2405                        *next_stmt_id += 1;
2406
2407                        ident_stack.push(source_ident);
2408                    }
2409
2410                    HydroNode::CycleSource { ident, .. } => {
2411                        let ident = ident.clone();
2412
2413                        match builders_or_callback {
2414                            BuildersOrCallback::Builders(_) => {}
2415                            BuildersOrCallback::Callback(_, node_callback) => {
2416                                node_callback(node, next_stmt_id);
2417                            }
2418                        }
2419
2420                        // consume a stmt id even though we did not emit anything so that we can instrument this
2421                        *next_stmt_id += 1;
2422
2423                        ident_stack.push(ident);
2424                    }
2425
2426                    HydroNode::Tee { inner, .. } => {
2427                        let ret_ident = if let Some(teed_from) =
2428                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2429                        {
2430                            match builders_or_callback {
2431                                BuildersOrCallback::Builders(_) => {}
2432                                BuildersOrCallback::Callback(_, node_callback) => {
2433                                    node_callback(node, next_stmt_id);
2434                                }
2435                            }
2436
2437                            teed_from.clone()
2438                        } else {
2439                            // The inner node was already processed by transform_bottom_up,
2440                            // so its ident is on the stack
2441                            let inner_ident = ident_stack.pop().unwrap();
2442
2443                            let tee_ident =
2444                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2445
2446                            built_tees.insert(
2447                                inner.0.as_ref() as *const RefCell<HydroNode>,
2448                                tee_ident.clone(),
2449                            );
2450
2451                            match builders_or_callback {
2452                                BuildersOrCallback::Builders(graph_builders) => {
2453                                    let builder = graph_builders.get_dfir_mut(&out_location);
2454                                    builder.add_dfir(
2455                                        parse_quote! {
2456                                            #tee_ident = #inner_ident -> tee();
2457                                        },
2458                                        None,
2459                                        Some(&next_stmt_id.to_string()),
2460                                    );
2461                                }
2462                                BuildersOrCallback::Callback(_, node_callback) => {
2463                                    node_callback(node, next_stmt_id);
2464                                }
2465                            }
2466
2467                            tee_ident
2468                        };
2469
2470                        // we consume a stmt id regardless of if we emit the tee() operator,
2471                        // so that during rewrites we touch all recipients of the tee()
2472
2473                        *next_stmt_id += 1;
2474                        ident_stack.push(ret_ident);
2475                    }
2476
2477                    HydroNode::Chain { .. } => {
2478                        // Children are processed left-to-right, so second is on top
2479                        let second_ident = ident_stack.pop().unwrap();
2480                        let first_ident = ident_stack.pop().unwrap();
2481
2482                        let chain_ident =
2483                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2484
2485                        match builders_or_callback {
2486                            BuildersOrCallback::Builders(graph_builders) => {
2487                                let builder = graph_builders.get_dfir_mut(&out_location);
2488                                builder.add_dfir(
2489                                    parse_quote! {
2490                                        #chain_ident = chain();
2491                                        #first_ident -> [0]#chain_ident;
2492                                        #second_ident -> [1]#chain_ident;
2493                                    },
2494                                    None,
2495                                    Some(&next_stmt_id.to_string()),
2496                                );
2497                            }
2498                            BuildersOrCallback::Callback(_, node_callback) => {
2499                                node_callback(node, next_stmt_id);
2500                            }
2501                        }
2502
2503                        *next_stmt_id += 1;
2504
2505                        ident_stack.push(chain_ident);
2506                    }
2507
2508                    HydroNode::ChainFirst { .. } => {
2509                        let second_ident = ident_stack.pop().unwrap();
2510                        let first_ident = ident_stack.pop().unwrap();
2511
2512                        let chain_ident =
2513                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2514
2515                        match builders_or_callback {
2516                            BuildersOrCallback::Builders(graph_builders) => {
2517                                let builder = graph_builders.get_dfir_mut(&out_location);
2518                                builder.add_dfir(
2519                                    parse_quote! {
2520                                        #chain_ident = chain_first_n(1);
2521                                        #first_ident -> [0]#chain_ident;
2522                                        #second_ident -> [1]#chain_ident;
2523                                    },
2524                                    None,
2525                                    Some(&next_stmt_id.to_string()),
2526                                );
2527                            }
2528                            BuildersOrCallback::Callback(_, node_callback) => {
2529                                node_callback(node, next_stmt_id);
2530                            }
2531                        }
2532
2533                        *next_stmt_id += 1;
2534
2535                        ident_stack.push(chain_ident);
2536                    }
2537
2538                    HydroNode::CrossSingleton { right, .. } => {
2539                        let right_ident = ident_stack.pop().unwrap();
2540                        let left_ident = ident_stack.pop().unwrap();
2541
2542                        let cross_ident =
2543                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2544
2545                        match builders_or_callback {
2546                            BuildersOrCallback::Builders(graph_builders) => {
2547                                let builder = graph_builders.get_dfir_mut(&out_location);
2548
2549                                if right.metadata().location_id.is_top_level()
2550                                    && right.metadata().collection_kind.is_bounded()
2551                                {
2552                                    builder.add_dfir(
2553                                        parse_quote! {
2554                                            #cross_ident = cross_singleton();
2555                                            #left_ident -> [input]#cross_ident;
2556                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2557                                        },
2558                                        None,
2559                                        Some(&next_stmt_id.to_string()),
2560                                    );
2561                                } else {
2562                                    builder.add_dfir(
2563                                        parse_quote! {
2564                                            #cross_ident = cross_singleton();
2565                                            #left_ident -> [input]#cross_ident;
2566                                            #right_ident -> [single]#cross_ident;
2567                                        },
2568                                        None,
2569                                        Some(&next_stmt_id.to_string()),
2570                                    );
2571                                }
2572                            }
2573                            BuildersOrCallback::Callback(_, node_callback) => {
2574                                node_callback(node, next_stmt_id);
2575                            }
2576                        }
2577
2578                        *next_stmt_id += 1;
2579
2580                        ident_stack.push(cross_ident);
2581                    }
2582
2583                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2584                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2585                            parse_quote!(cross_join_multiset)
2586                        } else {
2587                            parse_quote!(join_multiset)
2588                        };
2589
2590                        let (HydroNode::CrossProduct { left, right, .. }
2591                        | HydroNode::Join { left, right, .. }) = node
2592                        else {
2593                            unreachable!()
2594                        };
2595
2596                        let is_top_level = left.metadata().location_id.is_top_level()
2597                            && right.metadata().location_id.is_top_level();
2598                        let left_lifetime = if left.metadata().location_id.is_top_level() {
2599                            quote!('static)
2600                        } else {
2601                            quote!('tick)
2602                        };
2603
2604                        let right_lifetime = if right.metadata().location_id.is_top_level() {
2605                            quote!('static)
2606                        } else {
2607                            quote!('tick)
2608                        };
2609
2610                        let right_ident = ident_stack.pop().unwrap();
2611                        let left_ident = ident_stack.pop().unwrap();
2612
2613                        let stream_ident =
2614                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2615
2616                        match builders_or_callback {
2617                            BuildersOrCallback::Builders(graph_builders) => {
2618                                let builder = graph_builders.get_dfir_mut(&out_location);
2619                                builder.add_dfir(
2620                                    if is_top_level {
2621                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
2622                                        // a multiset_delta() to negate the replay behavior
2623                                        parse_quote! {
2624                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2625                                            #left_ident -> [0]#stream_ident;
2626                                            #right_ident -> [1]#stream_ident;
2627                                        }
2628                                    } else {
2629                                        parse_quote! {
2630                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2631                                            #left_ident -> [0]#stream_ident;
2632                                            #right_ident -> [1]#stream_ident;
2633                                        }
2634                                    }
2635                                    ,
2636                                    None,
2637                                    Some(&next_stmt_id.to_string()),
2638                                );
2639                            }
2640                            BuildersOrCallback::Callback(_, node_callback) => {
2641                                node_callback(node, next_stmt_id);
2642                            }
2643                        }
2644
2645                        *next_stmt_id += 1;
2646
2647                        ident_stack.push(stream_ident);
2648                    }
2649
2650                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2651                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2652                            parse_quote!(difference)
2653                        } else {
2654                            parse_quote!(anti_join)
2655                        };
2656
2657                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2658                            node
2659                        else {
2660                            unreachable!()
2661                        };
2662
2663                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2664                            quote!('static)
2665                        } else {
2666                            quote!('tick)
2667                        };
2668
2669                        let neg_ident = ident_stack.pop().unwrap();
2670                        let pos_ident = ident_stack.pop().unwrap();
2671
2672                        let stream_ident =
2673                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2674
2675                        match builders_or_callback {
2676                            BuildersOrCallback::Builders(graph_builders) => {
2677                                let builder = graph_builders.get_dfir_mut(&out_location);
2678                                builder.add_dfir(
2679                                    parse_quote! {
2680                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
2681                                        #pos_ident -> [pos]#stream_ident;
2682                                        #neg_ident -> [neg]#stream_ident;
2683                                    },
2684                                    None,
2685                                    Some(&next_stmt_id.to_string()),
2686                                );
2687                            }
2688                            BuildersOrCallback::Callback(_, node_callback) => {
2689                                node_callback(node, next_stmt_id);
2690                            }
2691                        }
2692
2693                        *next_stmt_id += 1;
2694
2695                        ident_stack.push(stream_ident);
2696                    }
2697
2698                    HydroNode::ResolveFutures { .. } => {
2699                        let input_ident = ident_stack.pop().unwrap();
2700
2701                        let futures_ident =
2702                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2703
2704                        match builders_or_callback {
2705                            BuildersOrCallback::Builders(graph_builders) => {
2706                                let builder = graph_builders.get_dfir_mut(&out_location);
2707                                builder.add_dfir(
2708                                    parse_quote! {
2709                                        #futures_ident = #input_ident -> resolve_futures();
2710                                    },
2711                                    None,
2712                                    Some(&next_stmt_id.to_string()),
2713                                );
2714                            }
2715                            BuildersOrCallback::Callback(_, node_callback) => {
2716                                node_callback(node, next_stmt_id);
2717                            }
2718                        }
2719
2720                        *next_stmt_id += 1;
2721
2722                        ident_stack.push(futures_ident);
2723                    }
2724
2725                    HydroNode::ResolveFuturesOrdered { .. } => {
2726                        let input_ident = ident_stack.pop().unwrap();
2727
2728                        let futures_ident =
2729                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2730
2731                        match builders_or_callback {
2732                            BuildersOrCallback::Builders(graph_builders) => {
2733                                let builder = graph_builders.get_dfir_mut(&out_location);
2734                                builder.add_dfir(
2735                                    parse_quote! {
2736                                        #futures_ident = #input_ident -> resolve_futures_ordered();
2737                                    },
2738                                    None,
2739                                    Some(&next_stmt_id.to_string()),
2740                                );
2741                            }
2742                            BuildersOrCallback::Callback(_, node_callback) => {
2743                                node_callback(node, next_stmt_id);
2744                            }
2745                        }
2746
2747                        *next_stmt_id += 1;
2748
2749                        ident_stack.push(futures_ident);
2750                    }
2751
2752                    HydroNode::Map { f, .. } => {
2753                        let input_ident = ident_stack.pop().unwrap();
2754
2755                        let map_ident =
2756                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2757
2758                        match builders_or_callback {
2759                            BuildersOrCallback::Builders(graph_builders) => {
2760                                let builder = graph_builders.get_dfir_mut(&out_location);
2761                                builder.add_dfir(
2762                                    parse_quote! {
2763                                        #map_ident = #input_ident -> map(#f);
2764                                    },
2765                                    None,
2766                                    Some(&next_stmt_id.to_string()),
2767                                );
2768                            }
2769                            BuildersOrCallback::Callback(_, node_callback) => {
2770                                node_callback(node, next_stmt_id);
2771                            }
2772                        }
2773
2774                        *next_stmt_id += 1;
2775
2776                        ident_stack.push(map_ident);
2777                    }
2778
2779                    HydroNode::FlatMap { f, .. } => {
2780                        let input_ident = ident_stack.pop().unwrap();
2781
2782                        let flat_map_ident =
2783                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2784
2785                        match builders_or_callback {
2786                            BuildersOrCallback::Builders(graph_builders) => {
2787                                let builder = graph_builders.get_dfir_mut(&out_location);
2788                                builder.add_dfir(
2789                                    parse_quote! {
2790                                        #flat_map_ident = #input_ident -> flat_map(#f);
2791                                    },
2792                                    None,
2793                                    Some(&next_stmt_id.to_string()),
2794                                );
2795                            }
2796                            BuildersOrCallback::Callback(_, node_callback) => {
2797                                node_callback(node, next_stmt_id);
2798                            }
2799                        }
2800
2801                        *next_stmt_id += 1;
2802
2803                        ident_stack.push(flat_map_ident);
2804                    }
2805
2806                    HydroNode::Filter { f, .. } => {
2807                        let input_ident = ident_stack.pop().unwrap();
2808
2809                        let filter_ident =
2810                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2811
2812                        match builders_or_callback {
2813                            BuildersOrCallback::Builders(graph_builders) => {
2814                                let builder = graph_builders.get_dfir_mut(&out_location);
2815                                builder.add_dfir(
2816                                    parse_quote! {
2817                                        #filter_ident = #input_ident -> filter(#f);
2818                                    },
2819                                    None,
2820                                    Some(&next_stmt_id.to_string()),
2821                                );
2822                            }
2823                            BuildersOrCallback::Callback(_, node_callback) => {
2824                                node_callback(node, next_stmt_id);
2825                            }
2826                        }
2827
2828                        *next_stmt_id += 1;
2829
2830                        ident_stack.push(filter_ident);
2831                    }
2832
2833                    HydroNode::FilterMap { f, .. } => {
2834                        let input_ident = ident_stack.pop().unwrap();
2835
2836                        let filter_map_ident =
2837                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2838
2839                        match builders_or_callback {
2840                            BuildersOrCallback::Builders(graph_builders) => {
2841                                let builder = graph_builders.get_dfir_mut(&out_location);
2842                                builder.add_dfir(
2843                                    parse_quote! {
2844                                        #filter_map_ident = #input_ident -> filter_map(#f);
2845                                    },
2846                                    None,
2847                                    Some(&next_stmt_id.to_string()),
2848                                );
2849                            }
2850                            BuildersOrCallback::Callback(_, node_callback) => {
2851                                node_callback(node, next_stmt_id);
2852                            }
2853                        }
2854
2855                        *next_stmt_id += 1;
2856
2857                        ident_stack.push(filter_map_ident);
2858                    }
2859
2860                    HydroNode::Sort { .. } => {
2861                        let input_ident = ident_stack.pop().unwrap();
2862
2863                        let sort_ident =
2864                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2865
2866                        match builders_or_callback {
2867                            BuildersOrCallback::Builders(graph_builders) => {
2868                                let builder = graph_builders.get_dfir_mut(&out_location);
2869                                builder.add_dfir(
2870                                    parse_quote! {
2871                                        #sort_ident = #input_ident -> sort();
2872                                    },
2873                                    None,
2874                                    Some(&next_stmt_id.to_string()),
2875                                );
2876                            }
2877                            BuildersOrCallback::Callback(_, node_callback) => {
2878                                node_callback(node, next_stmt_id);
2879                            }
2880                        }
2881
2882                        *next_stmt_id += 1;
2883
2884                        ident_stack.push(sort_ident);
2885                    }
2886
2887                    HydroNode::DeferTick { .. } => {
2888                        let input_ident = ident_stack.pop().unwrap();
2889
2890                        let defer_tick_ident =
2891                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2892
2893                        match builders_or_callback {
2894                            BuildersOrCallback::Builders(graph_builders) => {
2895                                let builder = graph_builders.get_dfir_mut(&out_location);
2896                                builder.add_dfir(
2897                                    parse_quote! {
2898                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
2899                                    },
2900                                    None,
2901                                    Some(&next_stmt_id.to_string()),
2902                                );
2903                            }
2904                            BuildersOrCallback::Callback(_, node_callback) => {
2905                                node_callback(node, next_stmt_id);
2906                            }
2907                        }
2908
2909                        *next_stmt_id += 1;
2910
2911                        ident_stack.push(defer_tick_ident);
2912                    }
2913
2914                    HydroNode::Enumerate { input, .. } => {
2915                        let input_ident = ident_stack.pop().unwrap();
2916
2917                        let enumerate_ident =
2918                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2919
2920                        match builders_or_callback {
2921                            BuildersOrCallback::Builders(graph_builders) => {
2922                                let builder = graph_builders.get_dfir_mut(&out_location);
2923                                let lifetime = if input.metadata().location_id.is_top_level() {
2924                                    quote!('static)
2925                                } else {
2926                                    quote!('tick)
2927                                };
2928                                builder.add_dfir(
2929                                    parse_quote! {
2930                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2931                                    },
2932                                    None,
2933                                    Some(&next_stmt_id.to_string()),
2934                                );
2935                            }
2936                            BuildersOrCallback::Callback(_, node_callback) => {
2937                                node_callback(node, next_stmt_id);
2938                            }
2939                        }
2940
2941                        *next_stmt_id += 1;
2942
2943                        ident_stack.push(enumerate_ident);
2944                    }
2945
2946                    HydroNode::Inspect { f, .. } => {
2947                        let input_ident = ident_stack.pop().unwrap();
2948
2949                        let inspect_ident =
2950                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2951
2952                        match builders_or_callback {
2953                            BuildersOrCallback::Builders(graph_builders) => {
2954                                let builder = graph_builders.get_dfir_mut(&out_location);
2955                                builder.add_dfir(
2956                                    parse_quote! {
2957                                        #inspect_ident = #input_ident -> inspect(#f);
2958                                    },
2959                                    None,
2960                                    Some(&next_stmt_id.to_string()),
2961                                );
2962                            }
2963                            BuildersOrCallback::Callback(_, node_callback) => {
2964                                node_callback(node, next_stmt_id);
2965                            }
2966                        }
2967
2968                        *next_stmt_id += 1;
2969
2970                        ident_stack.push(inspect_ident);
2971                    }
2972
2973                    HydroNode::Unique { input, .. } => {
2974                        let input_ident = ident_stack.pop().unwrap();
2975
2976                        let unique_ident =
2977                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2978
2979                        match builders_or_callback {
2980                            BuildersOrCallback::Builders(graph_builders) => {
2981                                let builder = graph_builders.get_dfir_mut(&out_location);
2982                                let lifetime = if input.metadata().location_id.is_top_level() {
2983                                    quote!('static)
2984                                } else {
2985                                    quote!('tick)
2986                                };
2987
2988                                builder.add_dfir(
2989                                    parse_quote! {
2990                                        #unique_ident = #input_ident -> unique::<#lifetime>();
2991                                    },
2992                                    None,
2993                                    Some(&next_stmt_id.to_string()),
2994                                );
2995                            }
2996                            BuildersOrCallback::Callback(_, node_callback) => {
2997                                node_callback(node, next_stmt_id);
2998                            }
2999                        }
3000
3001                        *next_stmt_id += 1;
3002
3003                        ident_stack.push(unique_ident);
3004                    }
3005
3006                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3007                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3008                            if input.metadata().location_id.is_top_level()
3009                                && input.metadata().collection_kind.is_bounded()
3010                            {
3011                                parse_quote!(fold_no_replay)
3012                            } else {
3013                                parse_quote!(fold)
3014                            }
3015                        } else if matches!(node, HydroNode::Scan { .. }) {
3016                            parse_quote!(scan)
3017                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3018                            if input.metadata().location_id.is_top_level()
3019                                && input.metadata().collection_kind.is_bounded()
3020                            {
3021                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3022                            } else {
3023                                parse_quote!(fold_keyed)
3024                            }
3025                        } else {
3026                            unreachable!()
3027                        };
3028
3029                        let (HydroNode::Fold { input, .. }
3030                        | HydroNode::FoldKeyed { input, .. }
3031                        | HydroNode::Scan { input, .. }) = node
3032                        else {
3033                            unreachable!()
3034                        };
3035
3036                        let lifetime = if input.metadata().location_id.is_top_level() {
3037                            quote!('static)
3038                        } else {
3039                            quote!('tick)
3040                        };
3041
3042                        let input_ident = ident_stack.pop().unwrap();
3043
3044                        let (HydroNode::Fold { init, acc, .. }
3045                        | HydroNode::FoldKeyed { init, acc, .. }
3046                        | HydroNode::Scan { init, acc, .. }) = &*node
3047                        else {
3048                            unreachable!()
3049                        };
3050
3051                        let fold_ident =
3052                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3053
3054                        match builders_or_callback {
3055                            BuildersOrCallback::Builders(graph_builders) => {
3056                                if matches!(node, HydroNode::Fold { .. })
3057                                    && node.metadata().location_id.is_top_level()
3058                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3059                                    && graph_builders.singleton_intermediates()
3060                                    && !node.metadata().collection_kind.is_bounded()
3061                                {
3062                                    let builder = graph_builders.get_dfir_mut(&out_location);
3063
3064                                    let acc: syn::Expr = parse_quote!({
3065                                        let mut __inner = #acc;
3066                                        move |__state, __value| {
3067                                            __inner(__state, __value);
3068                                            Some(__state.clone())
3069                                        }
3070                                    });
3071
3072                                    builder.add_dfir(
3073                                        parse_quote! {
3074                                            source_iter([(#init)()]) -> [0]#fold_ident;
3075                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3076                                            #fold_ident = chain();
3077                                        },
3078                                        None,
3079                                        Some(&next_stmt_id.to_string()),
3080                                    );
3081                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3082                                    && node.metadata().location_id.is_top_level()
3083                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3084                                    && graph_builders.singleton_intermediates()
3085                                    && !node.metadata().collection_kind.is_bounded()
3086                                {
3087                                    let builder = graph_builders.get_dfir_mut(&out_location);
3088
3089                                    let acc: syn::Expr = parse_quote!({
3090                                        let mut __init = #init;
3091                                        let mut __inner = #acc;
3092                                        move |__state, __kv: (_, _)| {
3093                                            // TODO(shadaj): we can avoid the clone when the entry exists
3094                                            let __state = __state
3095                                                .entry(::std::clone::Clone::clone(&__kv.0))
3096                                                .or_insert_with(|| (__init)());
3097                                            __inner(__state, __kv.1);
3098                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3099                                        }
3100                                    });
3101
3102                                    builder.add_dfir(
3103                                        parse_quote! {
3104                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3105                                        },
3106                                        None,
3107                                        Some(&next_stmt_id.to_string()),
3108                                    );
3109                                } else {
3110                                    let builder = graph_builders.get_dfir_mut(&out_location);
3111                                    builder.add_dfir(
3112                                        parse_quote! {
3113                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3114                                        },
3115                                        None,
3116                                        Some(&next_stmt_id.to_string()),
3117                                    );
3118                                }
3119                            }
3120                            BuildersOrCallback::Callback(_, node_callback) => {
3121                                node_callback(node, next_stmt_id);
3122                            }
3123                        }
3124
3125                        *next_stmt_id += 1;
3126
3127                        ident_stack.push(fold_ident);
3128                    }
3129
3130                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3131                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3132                            if input.metadata().location_id.is_top_level()
3133                                && input.metadata().collection_kind.is_bounded()
3134                            {
3135                                parse_quote!(reduce_no_replay)
3136                            } else {
3137                                parse_quote!(reduce)
3138                            }
3139                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3140                            if input.metadata().location_id.is_top_level()
3141                                && input.metadata().collection_kind.is_bounded()
3142                            {
3143                                todo!(
3144                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3145                                )
3146                            } else {
3147                                parse_quote!(reduce_keyed)
3148                            }
3149                        } else {
3150                            unreachable!()
3151                        };
3152
3153                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3154                        else {
3155                            unreachable!()
3156                        };
3157
3158                        let lifetime = if input.metadata().location_id.is_top_level() {
3159                            quote!('static)
3160                        } else {
3161                            quote!('tick)
3162                        };
3163
3164                        let input_ident = ident_stack.pop().unwrap();
3165
3166                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3167                        else {
3168                            unreachable!()
3169                        };
3170
3171                        let reduce_ident =
3172                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3173
3174                        match builders_or_callback {
3175                            BuildersOrCallback::Builders(graph_builders) => {
3176                                if matches!(node, HydroNode::Reduce { .. })
3177                                    && node.metadata().location_id.is_top_level()
3178                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3179                                    && graph_builders.singleton_intermediates()
3180                                    && !node.metadata().collection_kind.is_bounded()
3181                                {
3182                                    todo!(
3183                                        "Reduce with optional intermediates is not yet supported in simulator"
3184                                    );
3185                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3186                                    && node.metadata().location_id.is_top_level()
3187                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3188                                    && graph_builders.singleton_intermediates()
3189                                    && !node.metadata().collection_kind.is_bounded()
3190                                {
3191                                    todo!(
3192                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3193                                    );
3194                                } else {
3195                                    let builder = graph_builders.get_dfir_mut(&out_location);
3196                                    builder.add_dfir(
3197                                        parse_quote! {
3198                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3199                                        },
3200                                        None,
3201                                        Some(&next_stmt_id.to_string()),
3202                                    );
3203                                }
3204                            }
3205                            BuildersOrCallback::Callback(_, node_callback) => {
3206                                node_callback(node, next_stmt_id);
3207                            }
3208                        }
3209
3210                        *next_stmt_id += 1;
3211
3212                        ident_stack.push(reduce_ident);
3213                    }
3214
3215                    HydroNode::ReduceKeyedWatermark {
3216                        f,
3217                        input,
3218                        metadata,
3219                        ..
3220                    } => {
3221                        let lifetime = if input.metadata().location_id.is_top_level() {
3222                            quote!('static)
3223                        } else {
3224                            quote!('tick)
3225                        };
3226
3227                        // watermark is processed second, so it's on top
3228                        let watermark_ident = ident_stack.pop().unwrap();
3229                        let input_ident = ident_stack.pop().unwrap();
3230
3231                        let chain_ident = syn::Ident::new(
3232                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3233                            Span::call_site(),
3234                        );
3235
3236                        let fold_ident =
3237                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3238
3239                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3240                            && input.metadata().collection_kind.is_bounded()
3241                        {
3242                            parse_quote!(fold_no_replay)
3243                        } else {
3244                            parse_quote!(fold)
3245                        };
3246
3247                        match builders_or_callback {
3248                            BuildersOrCallback::Builders(graph_builders) => {
3249                                if metadata.location_id.is_top_level()
3250                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3251                                    && graph_builders.singleton_intermediates()
3252                                    && !metadata.collection_kind.is_bounded()
3253                                {
3254                                    todo!(
3255                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3256                                    )
3257                                } else {
3258                                    let builder = graph_builders.get_dfir_mut(&out_location);
3259                                    builder.add_dfir(
3260                                        parse_quote! {
3261                                            #chain_ident = chain();
3262                                            #input_ident
3263                                                -> map(|x| (Some(x), None))
3264                                                -> [0]#chain_ident;
3265                                            #watermark_ident
3266                                                -> map(|watermark| (None, Some(watermark)))
3267                                                -> [1]#chain_ident;
3268
3269                                            #fold_ident = #chain_ident
3270                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3271                                                    let __reduce_keyed_fn = #f;
3272                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3273                                                        if let Some((k, v)) = opt_payload {
3274                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3275                                                                if k <= curr_watermark {
3276                                                                    return;
3277                                                                }
3278                                                            }
3279                                                            match map.entry(k) {
3280                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3281                                                                    e.insert(v);
3282                                                                }
3283                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3284                                                                    __reduce_keyed_fn(e.get_mut(), v);
3285                                                                }
3286                                                            }
3287                                                        } else {
3288                                                            let watermark = opt_watermark.unwrap();
3289                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3290                                                                if watermark <= curr_watermark {
3291                                                                    return;
3292                                                                }
3293                                                            }
3294                                                            *opt_curr_watermark = opt_watermark;
3295                                                            map.retain(|k, _| *k > watermark);
3296                                                        }
3297                                                    }
3298                                                })
3299                                                -> flat_map(|(map, _curr_watermark)| map);
3300                                        },
3301                                        None,
3302                                        Some(&next_stmt_id.to_string()),
3303                                    );
3304                                }
3305                            }
3306                            BuildersOrCallback::Callback(_, node_callback) => {
3307                                node_callback(node, next_stmt_id);
3308                            }
3309                        }
3310
3311                        *next_stmt_id += 1;
3312
3313                        ident_stack.push(fold_ident);
3314                    }
3315
3316                    HydroNode::Network {
3317                        serialize_fn: serialize_pipeline,
3318                        instantiate_fn,
3319                        deserialize_fn: deserialize_pipeline,
3320                        input,
3321                        ..
3322                    } => {
3323                        let input_ident = ident_stack.pop().unwrap();
3324
3325                        let receiver_stream_ident =
3326                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3327
3328                        match builders_or_callback {
3329                            BuildersOrCallback::Builders(graph_builders) => {
3330                                let (sink_expr, source_expr) = match instantiate_fn {
3331                                    DebugInstantiate::Building => (
3332                                        syn::parse_quote!(DUMMY_SINK),
3333                                        syn::parse_quote!(DUMMY_SOURCE),
3334                                    ),
3335
3336                                    DebugInstantiate::Finalized(finalized) => {
3337                                        (finalized.sink.clone(), finalized.source.clone())
3338                                    }
3339                                };
3340
3341                                graph_builders.create_network(
3342                                    &input.metadata().location_id,
3343                                    &out_location,
3344                                    input_ident,
3345                                    &receiver_stream_ident,
3346                                    serialize_pipeline.as_ref(),
3347                                    sink_expr,
3348                                    source_expr,
3349                                    deserialize_pipeline.as_ref(),
3350                                    *next_stmt_id,
3351                                );
3352                            }
3353                            BuildersOrCallback::Callback(_, node_callback) => {
3354                                node_callback(node, next_stmt_id);
3355                            }
3356                        }
3357
3358                        *next_stmt_id += 1;
3359
3360                        ident_stack.push(receiver_stream_ident);
3361                    }
3362
3363                    HydroNode::ExternalInput {
3364                        instantiate_fn,
3365                        deserialize_fn: deserialize_pipeline,
3366                        ..
3367                    } => {
3368                        let receiver_stream_ident =
3369                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3370
3371                        match builders_or_callback {
3372                            BuildersOrCallback::Builders(graph_builders) => {
3373                                let (_, source_expr) = match instantiate_fn {
3374                                    DebugInstantiate::Building => (
3375                                        syn::parse_quote!(DUMMY_SINK),
3376                                        syn::parse_quote!(DUMMY_SOURCE),
3377                                    ),
3378
3379                                    DebugInstantiate::Finalized(finalized) => {
3380                                        (finalized.sink.clone(), finalized.source.clone())
3381                                    }
3382                                };
3383
3384                                graph_builders.create_external_source(
3385                                    &out_location,
3386                                    source_expr,
3387                                    &receiver_stream_ident,
3388                                    deserialize_pipeline.as_ref(),
3389                                    *next_stmt_id,
3390                                );
3391                            }
3392                            BuildersOrCallback::Callback(_, node_callback) => {
3393                                node_callback(node, next_stmt_id);
3394                            }
3395                        }
3396
3397                        *next_stmt_id += 1;
3398
3399                        ident_stack.push(receiver_stream_ident);
3400                    }
3401
3402                    HydroNode::Counter {
3403                        tag,
3404                        duration,
3405                        prefix,
3406                        ..
3407                    } => {
3408                        let input_ident = ident_stack.pop().unwrap();
3409
3410                        let counter_ident =
3411                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3412
3413                        match builders_or_callback {
3414                            BuildersOrCallback::Builders(graph_builders) => {
3415                                let builder = graph_builders.get_dfir_mut(&out_location);
3416                                builder.add_dfir(
3417                                    parse_quote! {
3418                                        #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3419                                    },
3420                                    None,
3421                                    Some(&next_stmt_id.to_string()),
3422                                );
3423                            }
3424                            BuildersOrCallback::Callback(_, node_callback) => {
3425                                node_callback(node, next_stmt_id);
3426                            }
3427                        }
3428
3429                        *next_stmt_id += 1;
3430
3431                        ident_stack.push(counter_ident);
3432                    }
3433                }
3434            },
3435            seen_tees,
3436            false,
3437        );
3438
3439        ident_stack
3440            .pop()
3441            .expect("ident_stack should have exactly one element after traversal")
3442    }
3443
3444    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3445        match self {
3446            HydroNode::Placeholder => {
3447                panic!()
3448            }
3449            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3450            HydroNode::Source { source, .. } => match source {
3451                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3452                HydroSource::ExternalNetwork()
3453                | HydroSource::Spin()
3454                | HydroSource::ClusterMembers(_) => {} // TODO: what goes here?
3455            },
3456            HydroNode::SingletonSource { value, .. } => {
3457                transform(value);
3458            }
3459            HydroNode::CycleSource { .. }
3460            | HydroNode::Tee { .. }
3461            | HydroNode::YieldConcat { .. }
3462            | HydroNode::BeginAtomic { .. }
3463            | HydroNode::EndAtomic { .. }
3464            | HydroNode::Batch { .. }
3465            | HydroNode::Chain { .. }
3466            | HydroNode::ChainFirst { .. }
3467            | HydroNode::CrossProduct { .. }
3468            | HydroNode::CrossSingleton { .. }
3469            | HydroNode::ResolveFutures { .. }
3470            | HydroNode::ResolveFuturesOrdered { .. }
3471            | HydroNode::Join { .. }
3472            | HydroNode::Difference { .. }
3473            | HydroNode::AntiJoin { .. }
3474            | HydroNode::DeferTick { .. }
3475            | HydroNode::Enumerate { .. }
3476            | HydroNode::Unique { .. }
3477            | HydroNode::Sort { .. } => {}
3478            HydroNode::Map { f, .. }
3479            | HydroNode::FlatMap { f, .. }
3480            | HydroNode::Filter { f, .. }
3481            | HydroNode::FilterMap { f, .. }
3482            | HydroNode::Inspect { f, .. }
3483            | HydroNode::Reduce { f, .. }
3484            | HydroNode::ReduceKeyed { f, .. }
3485            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3486                transform(f);
3487            }
3488            HydroNode::Fold { init, acc, .. }
3489            | HydroNode::Scan { init, acc, .. }
3490            | HydroNode::FoldKeyed { init, acc, .. } => {
3491                transform(init);
3492                transform(acc);
3493            }
3494            HydroNode::Network {
3495                serialize_fn,
3496                deserialize_fn,
3497                ..
3498            } => {
3499                if let Some(serialize_fn) = serialize_fn {
3500                    transform(serialize_fn);
3501                }
3502                if let Some(deserialize_fn) = deserialize_fn {
3503                    transform(deserialize_fn);
3504                }
3505            }
3506            HydroNode::ExternalInput { deserialize_fn, .. } => {
3507                if let Some(deserialize_fn) = deserialize_fn {
3508                    transform(deserialize_fn);
3509                }
3510            }
3511            HydroNode::Counter { duration, .. } => {
3512                transform(duration);
3513            }
3514        }
3515    }
3516
3517    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3518        &self.metadata().op
3519    }
3520
3521    pub fn metadata(&self) -> &HydroIrMetadata {
3522        match self {
3523            HydroNode::Placeholder => {
3524                panic!()
3525            }
3526            HydroNode::Cast { metadata, .. } => metadata,
3527            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3528            HydroNode::Source { metadata, .. } => metadata,
3529            HydroNode::SingletonSource { metadata, .. } => metadata,
3530            HydroNode::CycleSource { metadata, .. } => metadata,
3531            HydroNode::Tee { metadata, .. } => metadata,
3532            HydroNode::YieldConcat { metadata, .. } => metadata,
3533            HydroNode::BeginAtomic { metadata, .. } => metadata,
3534            HydroNode::EndAtomic { metadata, .. } => metadata,
3535            HydroNode::Batch { metadata, .. } => metadata,
3536            HydroNode::Chain { metadata, .. } => metadata,
3537            HydroNode::ChainFirst { metadata, .. } => metadata,
3538            HydroNode::CrossProduct { metadata, .. } => metadata,
3539            HydroNode::CrossSingleton { metadata, .. } => metadata,
3540            HydroNode::Join { metadata, .. } => metadata,
3541            HydroNode::Difference { metadata, .. } => metadata,
3542            HydroNode::AntiJoin { metadata, .. } => metadata,
3543            HydroNode::ResolveFutures { metadata, .. } => metadata,
3544            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3545            HydroNode::Map { metadata, .. } => metadata,
3546            HydroNode::FlatMap { metadata, .. } => metadata,
3547            HydroNode::Filter { metadata, .. } => metadata,
3548            HydroNode::FilterMap { metadata, .. } => metadata,
3549            HydroNode::DeferTick { metadata, .. } => metadata,
3550            HydroNode::Enumerate { metadata, .. } => metadata,
3551            HydroNode::Inspect { metadata, .. } => metadata,
3552            HydroNode::Unique { metadata, .. } => metadata,
3553            HydroNode::Sort { metadata, .. } => metadata,
3554            HydroNode::Scan { metadata, .. } => metadata,
3555            HydroNode::Fold { metadata, .. } => metadata,
3556            HydroNode::FoldKeyed { metadata, .. } => metadata,
3557            HydroNode::Reduce { metadata, .. } => metadata,
3558            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3559            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3560            HydroNode::ExternalInput { metadata, .. } => metadata,
3561            HydroNode::Network { metadata, .. } => metadata,
3562            HydroNode::Counter { metadata, .. } => metadata,
3563        }
3564    }
3565
3566    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3567        &mut self.metadata_mut().op
3568    }
3569
3570    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3571        match self {
3572            HydroNode::Placeholder => {
3573                panic!()
3574            }
3575            HydroNode::Cast { metadata, .. } => metadata,
3576            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3577            HydroNode::Source { metadata, .. } => metadata,
3578            HydroNode::SingletonSource { metadata, .. } => metadata,
3579            HydroNode::CycleSource { metadata, .. } => metadata,
3580            HydroNode::Tee { metadata, .. } => metadata,
3581            HydroNode::YieldConcat { metadata, .. } => metadata,
3582            HydroNode::BeginAtomic { metadata, .. } => metadata,
3583            HydroNode::EndAtomic { metadata, .. } => metadata,
3584            HydroNode::Batch { metadata, .. } => metadata,
3585            HydroNode::Chain { metadata, .. } => metadata,
3586            HydroNode::ChainFirst { metadata, .. } => metadata,
3587            HydroNode::CrossProduct { metadata, .. } => metadata,
3588            HydroNode::CrossSingleton { metadata, .. } => metadata,
3589            HydroNode::Join { metadata, .. } => metadata,
3590            HydroNode::Difference { metadata, .. } => metadata,
3591            HydroNode::AntiJoin { metadata, .. } => metadata,
3592            HydroNode::ResolveFutures { metadata, .. } => metadata,
3593            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3594            HydroNode::Map { metadata, .. } => metadata,
3595            HydroNode::FlatMap { metadata, .. } => metadata,
3596            HydroNode::Filter { metadata, .. } => metadata,
3597            HydroNode::FilterMap { metadata, .. } => metadata,
3598            HydroNode::DeferTick { metadata, .. } => metadata,
3599            HydroNode::Enumerate { metadata, .. } => metadata,
3600            HydroNode::Inspect { metadata, .. } => metadata,
3601            HydroNode::Unique { metadata, .. } => metadata,
3602            HydroNode::Sort { metadata, .. } => metadata,
3603            HydroNode::Scan { metadata, .. } => metadata,
3604            HydroNode::Fold { metadata, .. } => metadata,
3605            HydroNode::FoldKeyed { metadata, .. } => metadata,
3606            HydroNode::Reduce { metadata, .. } => metadata,
3607            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3608            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3609            HydroNode::ExternalInput { metadata, .. } => metadata,
3610            HydroNode::Network { metadata, .. } => metadata,
3611            HydroNode::Counter { metadata, .. } => metadata,
3612        }
3613    }
3614
3615    pub fn input(&self) -> Vec<&HydroNode> {
3616        match self {
3617            HydroNode::Placeholder => {
3618                panic!()
3619            }
3620            HydroNode::Source { .. }
3621            | HydroNode::SingletonSource { .. }
3622            | HydroNode::ExternalInput { .. }
3623            | HydroNode::CycleSource { .. }
3624            | HydroNode::Tee { .. } => {
3625                // Tee should find its input in separate special ways
3626                vec![]
3627            }
3628            HydroNode::Cast { inner, .. }
3629            | HydroNode::ObserveNonDet { inner, .. }
3630            | HydroNode::YieldConcat { inner, .. }
3631            | HydroNode::BeginAtomic { inner, .. }
3632            | HydroNode::EndAtomic { inner, .. }
3633            | HydroNode::Batch { inner, .. } => {
3634                vec![inner]
3635            }
3636            HydroNode::Chain { first, second, .. } => {
3637                vec![first, second]
3638            }
3639            HydroNode::ChainFirst { first, second, .. } => {
3640                vec![first, second]
3641            }
3642            HydroNode::CrossProduct { left, right, .. }
3643            | HydroNode::CrossSingleton { left, right, .. }
3644            | HydroNode::Join { left, right, .. } => {
3645                vec![left, right]
3646            }
3647            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3648                vec![pos, neg]
3649            }
3650            HydroNode::Map { input, .. }
3651            | HydroNode::FlatMap { input, .. }
3652            | HydroNode::Filter { input, .. }
3653            | HydroNode::FilterMap { input, .. }
3654            | HydroNode::Sort { input, .. }
3655            | HydroNode::DeferTick { input, .. }
3656            | HydroNode::Enumerate { input, .. }
3657            | HydroNode::Inspect { input, .. }
3658            | HydroNode::Unique { input, .. }
3659            | HydroNode::Network { input, .. }
3660            | HydroNode::Counter { input, .. }
3661            | HydroNode::ResolveFutures { input, .. }
3662            | HydroNode::ResolveFuturesOrdered { input, .. }
3663            | HydroNode::Fold { input, .. }
3664            | HydroNode::FoldKeyed { input, .. }
3665            | HydroNode::Reduce { input, .. }
3666            | HydroNode::ReduceKeyed { input, .. }
3667            | HydroNode::Scan { input, .. } => {
3668                vec![input]
3669            }
3670            HydroNode::ReduceKeyedWatermark {
3671                input, watermark, ..
3672            } => {
3673                vec![input, watermark]
3674            }
3675        }
3676    }
3677
3678    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3679        self.input()
3680            .iter()
3681            .map(|input_node| input_node.metadata())
3682            .collect()
3683    }
3684
3685    pub fn print_root(&self) -> String {
3686        match self {
3687            HydroNode::Placeholder => {
3688                panic!()
3689            }
3690            HydroNode::Cast { .. } => "Cast()".to_owned(),
3691            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3692            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3693            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3694            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3695            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3696            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3697            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3698            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3699            HydroNode::Batch { .. } => "Batch()".to_owned(),
3700            HydroNode::Chain { first, second, .. } => {
3701                format!("Chain({}, {})", first.print_root(), second.print_root())
3702            }
3703            HydroNode::ChainFirst { first, second, .. } => {
3704                format!(
3705                    "ChainFirst({}, {})",
3706                    first.print_root(),
3707                    second.print_root()
3708                )
3709            }
3710            HydroNode::CrossProduct { left, right, .. } => {
3711                format!(
3712                    "CrossProduct({}, {})",
3713                    left.print_root(),
3714                    right.print_root()
3715                )
3716            }
3717            HydroNode::CrossSingleton { left, right, .. } => {
3718                format!(
3719                    "CrossSingleton({}, {})",
3720                    left.print_root(),
3721                    right.print_root()
3722                )
3723            }
3724            HydroNode::Join { left, right, .. } => {
3725                format!("Join({}, {})", left.print_root(), right.print_root())
3726            }
3727            HydroNode::Difference { pos, neg, .. } => {
3728                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3729            }
3730            HydroNode::AntiJoin { pos, neg, .. } => {
3731                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3732            }
3733            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3734            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3735            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3736            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3737            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3738            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3739            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3740            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3741            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3742            HydroNode::Unique { .. } => "Unique()".to_owned(),
3743            HydroNode::Sort { .. } => "Sort()".to_owned(),
3744            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3745            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3746            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3747            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3748            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3749            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3750            HydroNode::Network { .. } => "Network()".to_owned(),
3751            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3752            HydroNode::Counter { tag, duration, .. } => {
3753                format!("Counter({:?}, {:?})", tag, duration)
3754            }
3755        }
3756    }
3757}
3758
3759#[cfg(feature = "build")]
3760fn instantiate_network<'a, D>(
3761    from_location: &LocationId,
3762    to_location: &LocationId,
3763    processes: &SparseSecondaryMap<LocationKey, D::Process>,
3764    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
3765) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3766where
3767    D: Deploy<'a>,
3768{
3769    let ((sink, source), connect_fn) = match (from_location, to_location) {
3770        (&LocationId::Process(from), &LocationId::Process(to)) => {
3771            let from_node = processes
3772                .get(from)
3773                .unwrap_or_else(|| {
3774                    panic!("A process used in the graph was not instantiated: {}", from)
3775                })
3776                .clone();
3777            let to_node = processes
3778                .get(to)
3779                .unwrap_or_else(|| {
3780                    panic!("A process used in the graph was not instantiated: {}", to)
3781                })
3782                .clone();
3783
3784            let sink_port = from_node.next_port();
3785            let source_port = to_node.next_port();
3786
3787            (
3788                D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3789                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3790            )
3791        }
3792        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
3793            let from_node = processes
3794                .get(from)
3795                .unwrap_or_else(|| {
3796                    panic!("A process used in the graph was not instantiated: {}", from)
3797                })
3798                .clone();
3799            let to_node = clusters
3800                .get(to)
3801                .unwrap_or_else(|| {
3802                    panic!("A cluster used in the graph was not instantiated: {}", to)
3803                })
3804                .clone();
3805
3806            let sink_port = from_node.next_port();
3807            let source_port = to_node.next_port();
3808
3809            (
3810                D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3811                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3812            )
3813        }
3814        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
3815            let from_node = clusters
3816                .get(from)
3817                .unwrap_or_else(|| {
3818                    panic!("A cluster used in the graph was not instantiated: {}", from)
3819                })
3820                .clone();
3821            let to_node = processes
3822                .get(to)
3823                .unwrap_or_else(|| {
3824                    panic!("A process used in the graph was not instantiated: {}", to)
3825                })
3826                .clone();
3827
3828            let sink_port = from_node.next_port();
3829            let source_port = to_node.next_port();
3830
3831            (
3832                D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3833                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3834            )
3835        }
3836        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
3837            let from_node = clusters
3838                .get(from)
3839                .unwrap_or_else(|| {
3840                    panic!("A cluster used in the graph was not instantiated: {}", from)
3841                })
3842                .clone();
3843            let to_node = clusters
3844                .get(to)
3845                .unwrap_or_else(|| {
3846                    panic!("A cluster used in the graph was not instantiated: {}", to)
3847                })
3848                .clone();
3849
3850            let sink_port = from_node.next_port();
3851            let source_port = to_node.next_port();
3852
3853            (
3854                D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3855                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3856            )
3857        }
3858        (LocationId::Tick(_, _), _) => panic!(),
3859        (_, LocationId::Tick(_, _)) => panic!(),
3860        (LocationId::Atomic(_), _) => panic!(),
3861        (_, LocationId::Atomic(_)) => panic!(),
3862    };
3863    (sink, source, connect_fn)
3864}
3865
3866#[cfg(test)]
3867mod test {
3868    use std::mem::size_of;
3869
3870    use stageleft::{QuotedWithContext, q};
3871
3872    use super::*;
3873
3874    #[test]
3875    #[cfg_attr(
3876        not(feature = "build"),
3877        ignore = "expects inclusion of feature-gated fields"
3878    )]
3879    fn hydro_node_size() {
3880        assert_eq!(size_of::<HydroNode>(), 240);
3881    }
3882
3883    #[test]
3884    #[cfg_attr(
3885        not(feature = "build"),
3886        ignore = "expects inclusion of feature-gated fields"
3887    )]
3888    fn hydro_root_size() {
3889        assert_eq!(size_of::<HydroRoot>(), 136);
3890    }
3891
3892    #[test]
3893    fn test_simplify_q_macro_basic() {
3894        // Test basic non-q! expression
3895        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
3896        let result = simplify_q_macro(simple_expr.clone());
3897        assert_eq!(result, simple_expr);
3898    }
3899
3900    #[test]
3901    fn test_simplify_q_macro_actual_stageleft_call() {
3902        // Test a simplified version of what a real stageleft call might look like
3903        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
3904        let result = simplify_q_macro(stageleft_call);
3905        // This should be processed by our visitor and simplified to q!(...)
3906        // since we detect the stageleft::runtime_support::fn_* pattern
3907        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3908    }
3909
3910    #[test]
3911    fn test_closure_no_pipe_at_start() {
3912        // Test a closure that does not start with a pipe
3913        let stageleft_call = q!({
3914            let foo = 123;
3915            move |b: usize| b + foo
3916        })
3917        .splice_fn1_ctx(&());
3918        let result = simplify_q_macro(stageleft_call);
3919        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3920    }
3921}