hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
/// Wrapper that displays only the tokens of a parsed expr.
///
/// Boxes the wrapped `syn::Expr` (a large type, on the order of ~240 bytes)
/// so that this wrapper itself stays pointer-sized.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39    fn from(expr: syn::Expr) -> Self {
40        Self(Box::new(expr))
41    }
42}
43
44impl Deref for DebugExpr {
45    type Target = syn::Expr;
46
47    fn deref(&self) -> &Self::Target {
48        &self.0
49    }
50}
51
52impl ToTokens for DebugExpr {
53    fn to_tokens(&self, tokens: &mut TokenStream) {
54        self.0.to_tokens(tokens);
55    }
56}
57
58impl Debug for DebugExpr {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(f, "{}", self.0.to_token_stream())
61    }
62}
63
64impl Display for DebugExpr {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        let original = self.0.as_ref().clone();
67        let simplified = simplify_q_macro(original);
68
69        // For now, just use quote formatting without trying to parse as a statement
70        // This avoids the syn::parse_quote! issues entirely
71        write!(f, "q!({})", quote::quote!(#simplified))
72    }
73}
74
75/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77    // Try to parse the token string as a syn::Expr
78    // Use a visitor to simplify q! macro expansions
79    let mut simplifier = QMacroSimplifier::new();
80    simplifier.visit_expr_mut(&mut expr);
81
82    // If we found and simplified a q! macro, return the simplified version
83    if let Some(simplified) = simplifier.simplified_result {
84        simplified
85    } else {
86        expr
87    }
88}
89
/// AST visitor that simplifies q! macro expansions
///
/// The visitor stops at the first matching call it encounters and records
/// what it extracted in [`QMacroSimplifier::simplified_result`].
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The expression extracted from the first matching call, or `None` if
    /// no match has been found (yet).
    pub simplified_result: Option<syn::Expr>,
}
95
96impl QMacroSimplifier {
97    pub fn new() -> Self {
98        Self::default()
99    }
100}
101
102impl VisitMut for QMacroSimplifier {
103    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104        // Check if we already found a result to avoid further processing
105        if self.simplified_result.is_some() {
106            return;
107        }
108
109        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110            // Look for calls to stageleft::runtime_support::fn*
111            && self.is_stageleft_runtime_support_call(&path_expr.path)
112            // Try to extract the closure from the arguments
113            && let Some(closure) = self.extract_closure_from_args(&call.args)
114        {
115            self.simplified_result = Some(closure);
116            return;
117        }
118
119        // Continue visiting child expressions using the default implementation
120        // Use the default visitor to avoid infinite recursion
121        syn::visit_mut::visit_expr_mut(self, expr);
122    }
123}
124
125impl QMacroSimplifier {
126    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127        // Check if this is a call to stageleft::runtime_support::fn*
128        if let Some(last_segment) = path.segments.last() {
129            let fn_name = last_segment.ident.to_string();
130            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
131            fn_name.contains("_type_hint")
132                && path.segments.len() > 2
133                && path.segments[0].ident == "stageleft"
134                && path.segments[1].ident == "runtime_support"
135        } else {
136            false
137        }
138    }
139
140    fn extract_closure_from_args(
141        &self,
142        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143    ) -> Option<syn::Expr> {
144        // Look through the arguments for a closure expression
145        for arg in args {
146            if let syn::Expr::Closure(_) = arg {
147                return Some(arg.clone());
148            }
149            // Also check for closures nested in other expressions (like blocks)
150            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151                return Some(closure_expr);
152            }
153        }
154        None
155    }
156
157    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158        let mut visitor = ClosureFinder {
159            found_closure: None,
160            prefer_inner_blocks: true,
161        };
162        visitor.visit_expr(expr);
163        visitor.found_closure
164    }
165}
166
/// Visitor that finds closures in expressions with special block handling
struct ClosureFinder {
    /// The first match, if any. This is either a closure expression or, when
    /// `prefer_inner_blocks` is set, a nested block that contains a closure.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression is first checked for directly nested
    /// block statements containing a closure; such a nested block is returned
    /// in preference to the bare closure.
    prefer_inner_blocks: bool,
}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175        // If we already found a closure, don't continue searching
176        if self.found_closure.is_some() {
177            return;
178        }
179
180        match expr {
181            syn::Expr::Closure(_) => {
182                self.found_closure = Some(expr.clone());
183            }
184            syn::Expr::Block(block) if self.prefer_inner_blocks => {
185                // Special handling for blocks - look for inner blocks that contain closures
186                for stmt in &block.block.stmts {
187                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
188                        && let syn::Expr::Block(_) = stmt_expr
189                    {
190                        // Check if this nested block contains a closure
191                        let mut inner_visitor = ClosureFinder {
192                            found_closure: None,
193                            prefer_inner_blocks: false, // Avoid infinite recursion
194                        };
195                        inner_visitor.visit_expr(stmt_expr);
196                        if inner_visitor.found_closure.is_some() {
197                            // Found a closure in an inner block, return that block
198                            self.found_closure = Some(stmt_expr.clone());
199                            return;
200                        }
201                    }
202                }
203
204                // If no inner block with closure found, continue with normal visitation
205                visit::visit_expr(self, expr);
206
207                // If we found a closure, just return the closure itself, not the whole block
208                // unless we're in the special case where we want the containing block
209                if self.found_closure.is_some() {
210                    // The closure was found during visitation, no need to wrap in block
211                }
212            }
213            _ => {
214                // Use default visitor behavior for all other expressions
215                visit::visit_expr(self, expr);
216            }
217        }
218    }
219}
220
/// Wrapper around a parsed type whose `Debug` output is just the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes, so the wrapper itself stays
/// pointer-sized.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228    fn from(t: syn::Type) -> Self {
229        Self(Box::new(t))
230    }
231}
232
233impl Deref for DebugType {
234    type Target = syn::Type;
235
236    fn deref(&self) -> &Self::Target {
237        &self.0
238    }
239}
240
241impl ToTokens for DebugType {
242    fn to_tokens(&self, tokens: &mut TokenStream) {
243        self.0.to_tokens(tokens);
244    }
245}
246
247impl Debug for DebugType {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        write!(f, "{}", self.0.to_token_stream())
250    }
251}
252
/// Instantiation state of a network-like edge in the IR.
///
/// Edges start out as [`DebugInstantiate::Building`] and are replaced by
/// [`DebugInstantiate::Finalized`] when `compile_network` runs.
pub enum DebugInstantiate {
    /// The edge has not been instantiated yet.
    Building,
    /// The edge has been instantiated; the payload holds the generated sink
    /// and source expressions plus the pending connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
257
/// Result of instantiating a network-like edge.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the edge.
    sink: syn::Expr,
    /// Expression for the receiving side of the edge.
    source: syn::Expr,
    /// One-shot callback run by `connect_network`; it is `take`n when
    /// invoked, so it is `None` afterwards.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272    fn from(f: DebugInstantiateFinalized) -> Self {
273        Self::Finalized(Box::new(f))
274    }
275}
276
277impl Debug for DebugInstantiate {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        write!(f, "<network instantiate>")
280    }
281}
282
283impl Hash for DebugInstantiate {
284    fn hash<H: Hasher>(&self, _state: &mut H) {
285        // Do nothing
286    }
287}
288
289impl Clone for DebugInstantiate {
290    fn clone(&self) -> Self {
291        match self {
292            DebugInstantiate::Building => DebugInstantiate::Building,
293            DebugInstantiate::Finalized(_) => {
294                panic!("DebugInstantiate::Finalized should not be cloned")
295            }
296        }
297    }
298}
299
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression (presumably one evaluating to an
    /// async stream; confirm against the code-gen for this variant).
    Stream(DebugExpr),
    /// Source fed by an external network connection; carries no data here.
    ExternalNetwork(),
    /// Source described by an expression (presumably one evaluating to an
    /// iterable; confirm against the code-gen for this variant).
    Iter(DebugExpr),
    /// Source with no payload. NOTE(review): the name suggests it emits
    /// continuously; confirm against the code-gen for this variant.
    Spin(),
    /// Source tied to the given location (by name, that cluster's membership).
    ClusterMembers(LocationId),
}
309
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
///
/// The `BTreeMap<usize, FlatGraphBuilder>` implementation in this module is the plain variant:
/// it keeps one builder per root location id and lowers most hooks to `out_ident = in_ident`.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code connecting `in_ident` (at `in_location`) to `out_ident` (at `out_location`)
    /// for a batch operation.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` (at `in_location`) to `out_ident` (at `out_location`)
    /// for a value yielded out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code connecting `in_ident` to `out_ident` where an atomic section begins.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` to `out_ident` where an atomic section ends.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code connecting `in_ident` to `out_ident` at a point where non-determinism is
    /// observed; `trusted` and the collection kinds are available to implementations that need
    /// them (the plain `BTreeMap` implementation ignores them).
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network edge: on `from`, `input_ident` is (optionally mapped
    /// through `serialize` and) written to `sink`; on `to`, `out_ident` is read from `source`
    /// (and optionally mapped through `deserialize`). `tag_id` distinguishes the generated
    /// operators.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits, on location `on`, a source reading from `source_expr` into `out_ident`,
    /// optionally mapped through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits, on location `on`, a sink writing `input_ident` to `sink_expr`, optionally mapped
    /// through `serialize` first.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    );
}
402
/// Adds the identity statement `out_ident = in_ident;` to `builder`.
///
/// Shared by every hook that the plain (non-simulated) builder lowers to a
/// simple passthrough.
#[cfg(feature = "build")]
fn add_passthrough_edge(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// Plain DFIR builder: one [`FlatGraphBuilder`] per root location, keyed by
/// the root location's raw id.
///
/// Batching, tick yields, atomic boundaries and non-determinism observation
/// are all lowered to `out_ident = in_ident`; network and external edges are
/// lowered to `dest_sink` / `source_stream` operators.
#[cfg(feature = "build")]
impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        self.entry(location.root().raw_id()).or_default()
    }

    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_edge(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        add_passthrough_edge(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_edge(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        add_passthrough_edge(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_edge(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        // A network edge is exactly a tagged sink on the sender plus a tagged
        // source on the receiver, so reuse the two external-edge emitters
        // (sender first, then receiver).
        self.create_external_output(from, sink, &input_ident, serialize, tag_id);
        self.create_external_source(to, source, out_ident, deserialize, tag_id);
    }

    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        // The operator tag separates send and receive, which otherwise have
        // the same next_stmt_id.
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);
        if let Some(deserialize_pipeline) = deserialize {
            receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                },
                None,
                Some(recv_tag.as_str()),
            );
        } else {
            receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source_expr);
                },
                None,
                Some(recv_tag.as_str()),
            );
        }
    }

    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        // The operator tag separates send and receive, which otherwise have
        // the same next_stmt_id.
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);
        if let Some(serialize_fn) = serialize {
            sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                },
                None,
                Some(send_tag.as_str()),
            );
        } else {
            sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> dest_sink(#sink_expr);
                },
                None,
                Some(send_tag.as_str()),
            );
        }
    }
}
615
/// Either a DFIR builder to emit code into, or a pair of callbacks to run on
/// the IR instead.
///
/// `L` is invoked with a mutable root and `N` with a mutable node; both also
/// receive a mutable `usize` (presumably the running statement-id counter;
/// confirm against the traversal that drives them).
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR through the borrowed builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Run the root callback and the node callback instead of emitting code.
    Callback(L, N),
}
625
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Applies the expression `f` to the output of `input`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the output of `input` to an external.
    ///
    /// `to_external_id` and `to_key` identify the external and the port
    /// registered on it. `instantiate_fn` starts as `Building` and is
    /// finalized by `compile_network`.
    SendExternal {
        to_external_id: usize,
        to_key: usize,
        /// Selects the "many" sink variant when the network is compiled.
        to_many: bool,
        /// When set (and `to_many` is not), compiling the network also
        /// registers the port on the external and sets up the matching
        /// source side and connection.
        unpaired: bool,
        /// Optional expression mapped over the input before it is sent.
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Writes the output of `input` to the sink expression `sink`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the output of `input` into the cycle named `ident`.
    CycleSink {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
657
658impl HydroRoot {
659    #[cfg(feature = "build")]
660    pub fn compile_network<'a, D>(
661        &mut self,
662        extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
663        seen_tees: &mut SeenTees,
664        processes: &HashMap<usize, D::Process>,
665        clusters: &HashMap<usize, D::Cluster>,
666        externals: &HashMap<usize, D::External>,
667    ) where
668        D: Deploy<'a>,
669    {
670        let refcell_extra_stmts = RefCell::new(extra_stmts);
671        self.transform_bottom_up(
672            &mut |l| {
673                if let HydroRoot::SendExternal {
674                    input,
675                    to_external_id,
676                    to_key,
677                    to_many,
678                    unpaired,
679                    instantiate_fn,
680                    ..
681                } = l
682                {
683                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
684                        DebugInstantiate::Building => {
685                            let to_node = externals
686                                .get(to_external_id)
687                                .unwrap_or_else(|| {
688                                    panic!("A external used in the graph was not instantiated: {}", to_external_id)
689                                })
690                                .clone();
691
692                            match input.metadata().location_kind.root() {
693                                LocationId::Process(process_id) => {
694                                    if *to_many {
695                                        (
696                                            (
697                                                D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
698                                                parse_quote!(DUMMY),
699                                            ),
700                                            Box::new(|| {}) as Box<dyn FnOnce()>,
701                                        )
702                                    } else {
703                                        let from_node = processes
704                                            .get(process_id)
705                                            .unwrap_or_else(|| {
706                                                panic!("A process used in the graph was not instantiated: {}", process_id)
707                                            })
708                                            .clone();
709
710                                        let sink_port = D::allocate_process_port(&from_node);
711                                        let source_port: <D as Deploy<'a>>::Port = D::allocate_external_port(&to_node);
712
713                                        if *unpaired {
714                                            use stageleft::quote_type;
715                                            use tokio_util::codec::LengthDelimitedCodec;
716
717                                            to_node.register(*to_key, source_port.clone());
718
719                                            let _ = D::e2o_source(
720                                                refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
721                                                &to_node, &source_port,
722                                                &from_node, &sink_port,
723                                                &quote_type::<LengthDelimitedCodec>(),
724                                                format!("{}_{}", *to_external_id, *to_key)
725                                            );
726                                        }
727
728                                        (
729                                            (
730                                                D::o2e_sink(
731                                                    &from_node,
732                                                    &sink_port,
733                                                    &to_node,
734                                                    &source_port,
735                                                    format!("{}_{}", *to_external_id, *to_key)
736                                                ),
737                                                parse_quote!(DUMMY),
738                                            ),
739                                            if *unpaired {
740                                                D::e2o_connect(
741                                                    &to_node,
742                                                    &source_port,
743                                                    &from_node,
744                                                    &sink_port,
745                                                    *to_many,
746                                                    NetworkHint::Auto,
747                                                )
748                                            } else {
749                                                Box::new(|| {}) as Box<dyn FnOnce()>
750                                            },
751                                        )
752                                    }
753                                }
754                                LocationId::Cluster(_) => todo!(),
755                                _ => panic!()
756                            }
757                        },
758
759                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
760                    };
761
762                    *instantiate_fn = DebugInstantiateFinalized {
763                        sink: sink_expr,
764                        source: source_expr,
765                        connect_fn: Some(connect_fn),
766                    }
767                    .into();
768                }
769            },
770            &mut |n| {
771                if let HydroNode::Network {
772                    input,
773                    instantiate_fn,
774                    metadata,
775                    ..
776                } = n
777                {
778                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
779                        DebugInstantiate::Building => instantiate_network::<D>(
780                            input.metadata().location_kind.root(),
781                            metadata.location_kind.root(),
782                            processes,
783                            clusters,
784                        ),
785
786                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
787                    };
788
789                    *instantiate_fn = DebugInstantiateFinalized {
790                        sink: sink_expr,
791                        source: source_expr,
792                        connect_fn: Some(connect_fn),
793                    }
794                    .into();
795                } else if let HydroNode::ExternalInput {
796                    from_external_id,
797                    from_key,
798                    from_many,
799                    codec_type,
800                    port_hint,
801                    instantiate_fn,
802                    metadata,
803                    ..
804                } = n
805                {
806                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
807                        DebugInstantiate::Building => {
808                            let from_node = externals
809                                .get(from_external_id)
810                                .unwrap_or_else(|| {
811                                    panic!(
812                                        "A external used in the graph was not instantiated: {}",
813                                        from_external_id
814                                    )
815                                })
816                                .clone();
817
818                            match metadata.location_kind.root() {
819                                LocationId::Process(process_id) => {
820                                    let to_node = processes
821                                        .get(process_id)
822                                        .unwrap_or_else(|| {
823                                            panic!("A process used in the graph was not instantiated: {}", process_id)
824                                        })
825                                        .clone();
826
827                                    let sink_port = D::allocate_external_port(&from_node);
828                                    let source_port = D::allocate_process_port(&to_node);
829
830                                    from_node.register(*from_key, sink_port.clone());
831
832                                    (
833                                        (
834                                            parse_quote!(DUMMY),
835                                            if *from_many {
836                                                D::e2o_many_source(
837                                                    refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
838                                                    &to_node, &source_port,
839                                                    codec_type.0.as_ref(),
840                                                    format!("{}_{}", *from_external_id, *from_key)
841                                                )
842                                            } else {
843                                                D::e2o_source(
844                                                    refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
845                                                    &from_node, &sink_port,
846                                                    &to_node, &source_port,
847                                                    codec_type.0.as_ref(),
848                                                    format!("{}_{}", *from_external_id, *from_key)
849                                                )
850                                            },
851                                        ),
852                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
853                                    )
854                                }
855                                LocationId::Cluster(_) => todo!(),
856                                _ => panic!()
857                            }
858                        },
859
860                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
861                    };
862
863                    *instantiate_fn = DebugInstantiateFinalized {
864                        sink: sink_expr,
865                        source: source_expr,
866                        connect_fn: Some(connect_fn),
867                    }
868                    .into();
869                }
870            },
871            seen_tees,
872            false,
873        );
874    }
875
876    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
877        self.transform_bottom_up(
878            &mut |l| {
879                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
880                    match instantiate_fn {
881                        DebugInstantiate::Building => panic!("network not built"),
882
883                        DebugInstantiate::Finalized(finalized) => {
884                            (finalized.connect_fn.take().unwrap())();
885                        }
886                    }
887                }
888            },
889            &mut |n| {
890                if let HydroNode::Network { instantiate_fn, .. }
891                | HydroNode::ExternalInput { instantiate_fn, .. } = n
892                {
893                    match instantiate_fn {
894                        DebugInstantiate::Building => panic!("network not built"),
895
896                        DebugInstantiate::Finalized(finalized) => {
897                            (finalized.connect_fn.take().unwrap())();
898                        }
899                    }
900                }
901            },
902            seen_tees,
903            false,
904        );
905    }
906
907    pub fn transform_bottom_up(
908        &mut self,
909        transform_root: &mut impl FnMut(&mut HydroRoot),
910        transform_node: &mut impl FnMut(&mut HydroNode),
911        seen_tees: &mut SeenTees,
912        check_well_formed: bool,
913    ) {
914        self.transform_children(
915            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
916            seen_tees,
917        );
918
919        transform_root(self);
920    }
921
922    pub fn transform_children(
923        &mut self,
924        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
925        seen_tees: &mut SeenTees,
926    ) {
927        match self {
928            HydroRoot::ForEach { input, .. }
929            | HydroRoot::SendExternal { input, .. }
930            | HydroRoot::DestSink { input, .. }
931            | HydroRoot::CycleSink { input, .. } => {
932                transform(input, seen_tees);
933            }
934        }
935    }
936
937    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
938        match self {
939            HydroRoot::ForEach {
940                f,
941                input,
942                op_metadata,
943            } => HydroRoot::ForEach {
944                f: f.clone(),
945                input: Box::new(input.deep_clone(seen_tees)),
946                op_metadata: op_metadata.clone(),
947            },
948            HydroRoot::SendExternal {
949                to_external_id,
950                to_key,
951                to_many,
952                unpaired,
953                serialize_fn,
954                instantiate_fn,
955                input,
956                op_metadata,
957            } => HydroRoot::SendExternal {
958                to_external_id: *to_external_id,
959                to_key: *to_key,
960                to_many: *to_many,
961                unpaired: *unpaired,
962                serialize_fn: serialize_fn.clone(),
963                instantiate_fn: instantiate_fn.clone(),
964                input: Box::new(input.deep_clone(seen_tees)),
965                op_metadata: op_metadata.clone(),
966            },
967            HydroRoot::DestSink {
968                sink,
969                input,
970                op_metadata,
971            } => HydroRoot::DestSink {
972                sink: sink.clone(),
973                input: Box::new(input.deep_clone(seen_tees)),
974                op_metadata: op_metadata.clone(),
975            },
976            HydroRoot::CycleSink {
977                ident,
978                input,
979                op_metadata,
980            } => HydroRoot::CycleSink {
981                ident: ident.clone(),
982                input: Box::new(input.deep_clone(seen_tees)),
983                op_metadata: op_metadata.clone(),
984            },
985        }
986    }
987
988    #[cfg(feature = "build")]
989    pub fn emit<'a, D: Deploy<'a>>(
990        &mut self,
991        graph_builders: &mut dyn DfirBuilder,
992        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
993        next_stmt_id: &mut usize,
994    ) {
995        self.emit_core::<D>(
996            &mut BuildersOrCallback::Builders::<
997                fn(&mut HydroRoot, &mut usize),
998                fn(&mut HydroNode, &mut usize),
999            >(graph_builders),
1000            built_tees,
1001            next_stmt_id,
1002        );
1003    }
1004
1005    #[cfg(feature = "build")]
1006    pub fn emit_core<'a, D: Deploy<'a>>(
1007        &mut self,
1008        builders_or_callback: &mut BuildersOrCallback<
1009            impl FnMut(&mut HydroRoot, &mut usize),
1010            impl FnMut(&mut HydroNode, &mut usize),
1011        >,
1012        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1013        next_stmt_id: &mut usize,
1014    ) {
1015        match self {
1016            HydroRoot::ForEach { f, input, .. } => {
1017                let input_ident =
1018                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
1019
1020                match builders_or_callback {
1021                    BuildersOrCallback::Builders(graph_builders) => {
1022                        graph_builders
1023                            .get_dfir_mut(&input.metadata().location_kind)
1024                            .add_dfir(
1025                                parse_quote! {
1026                                    #input_ident -> for_each(#f);
1027                                },
1028                                None,
1029                                Some(&next_stmt_id.to_string()),
1030                            );
1031                    }
1032                    BuildersOrCallback::Callback(leaf_callback, _) => {
1033                        leaf_callback(self, next_stmt_id);
1034                    }
1035                }
1036
1037                *next_stmt_id += 1;
1038            }
1039
1040            HydroRoot::SendExternal {
1041                serialize_fn,
1042                instantiate_fn,
1043                input,
1044                ..
1045            } => {
1046                let input_ident =
1047                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
1048
1049                match builders_or_callback {
1050                    BuildersOrCallback::Builders(graph_builders) => {
1051                        let (sink_expr, _) = match instantiate_fn {
1052                            DebugInstantiate::Building => (
1053                                syn::parse_quote!(DUMMY_SINK),
1054                                syn::parse_quote!(DUMMY_SOURCE),
1055                            ),
1056
1057                            DebugInstantiate::Finalized(finalized) => {
1058                                (finalized.sink.clone(), finalized.source.clone())
1059                            }
1060                        };
1061
1062                        graph_builders.create_external_output(
1063                            &input.metadata().location_kind,
1064                            sink_expr,
1065                            &input_ident,
1066                            serialize_fn,
1067                            *next_stmt_id,
1068                        );
1069                    }
1070                    BuildersOrCallback::Callback(leaf_callback, _) => {
1071                        leaf_callback(self, next_stmt_id);
1072                    }
1073                }
1074
1075                *next_stmt_id += 1;
1076            }
1077
1078            HydroRoot::DestSink { sink, input, .. } => {
1079                let input_ident =
1080                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
1081
1082                match builders_or_callback {
1083                    BuildersOrCallback::Builders(graph_builders) => {
1084                        graph_builders
1085                            .get_dfir_mut(&input.metadata().location_kind)
1086                            .add_dfir(
1087                                parse_quote! {
1088                                    #input_ident -> dest_sink(#sink);
1089                                },
1090                                None,
1091                                Some(&next_stmt_id.to_string()),
1092                            );
1093                    }
1094                    BuildersOrCallback::Callback(leaf_callback, _) => {
1095                        leaf_callback(self, next_stmt_id);
1096                    }
1097                }
1098
1099                *next_stmt_id += 1;
1100            }
1101
1102            HydroRoot::CycleSink { ident, input, .. } => {
1103                let input_ident =
1104                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
1105
1106                match builders_or_callback {
1107                    BuildersOrCallback::Builders(graph_builders) => {
1108                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1109                            CollectionKind::KeyedSingleton {
1110                                key_type,
1111                                value_type,
1112                                ..
1113                            }
1114                            | CollectionKind::KeyedStream {
1115                                key_type,
1116                                value_type,
1117                                ..
1118                            } => {
1119                                parse_quote!((#key_type, #value_type))
1120                            }
1121                            CollectionKind::Stream { element_type, .. }
1122                            | CollectionKind::Singleton { element_type, .. }
1123                            | CollectionKind::Optional { element_type, .. } => {
1124                                parse_quote!(#element_type)
1125                            }
1126                        };
1127
1128                        graph_builders
1129                            .get_dfir_mut(&input.metadata().location_kind)
1130                            .add_dfir(
1131                                parse_quote! {
1132                                    #ident = #input_ident -> identity::<#elem_type>();
1133                                },
1134                                None,
1135                                None,
1136                            );
1137                    }
1138                    // No ID, no callback
1139                    BuildersOrCallback::Callback(_, _) => {}
1140                }
1141            }
1142        }
1143    }
1144
1145    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1146        match self {
1147            HydroRoot::ForEach { op_metadata, .. }
1148            | HydroRoot::SendExternal { op_metadata, .. }
1149            | HydroRoot::DestSink { op_metadata, .. }
1150            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1151        }
1152    }
1153
1154    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1155        match self {
1156            HydroRoot::ForEach { op_metadata, .. }
1157            | HydroRoot::SendExternal { op_metadata, .. }
1158            | HydroRoot::DestSink { op_metadata, .. }
1159            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1160        }
1161    }
1162
1163    pub fn input(&self) -> &HydroNode {
1164        match self {
1165            HydroRoot::ForEach { input, .. }
1166            | HydroRoot::SendExternal { input, .. }
1167            | HydroRoot::DestSink { input, .. }
1168            | HydroRoot::CycleSink { input, .. } => input,
1169        }
1170    }
1171
1172    pub fn input_metadata(&self) -> &HydroIrMetadata {
1173        self.input().metadata()
1174    }
1175
1176    pub fn print_root(&self) -> String {
1177        match self {
1178            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1179            HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1180            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1181            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1182        }
1183    }
1184
1185    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1186        match self {
1187            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1188                transform(f);
1189            }
1190            HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1191        }
1192    }
1193}
1194
/// Emits every root in `ir` and returns the resulting builders.
///
/// The statement counter and the tee map are shared across all roots.
#[cfg(feature = "build")]
pub fn emit<'a, D: Deploy<'a>>(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders: BTreeMap<usize, FlatGraphBuilder> = BTreeMap::new();
    let mut next_stmt_id = 0;
    let mut built_tees = HashMap::new();

    ir.iter_mut()
        .for_each(|root| root.emit::<D>(&mut builders, &mut built_tees, &mut next_stmt_id));

    builders
}
1205
/// Walks every root in `ir` through `emit_core` in callback mode, so `transform_root` and
/// `transform_node` are invoked instead of any DFIR being built.
///
/// The statement counter and the tee map are shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir<'a, D: Deploy<'a>>(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut next_stmt_id = 0;
    let mut seen_tees = HashMap::new();

    for root in ir.iter_mut() {
        root.emit_core::<D>(&mut callback, &mut seen_tees, &mut next_stmt_id);
    }
}
1219
1220pub fn transform_bottom_up(
1221    ir: &mut [HydroRoot],
1222    transform_root: &mut impl FnMut(&mut HydroRoot),
1223    transform_node: &mut impl FnMut(&mut HydroNode),
1224    check_well_formed: bool,
1225) {
1226    let mut seen_tees = HashMap::new();
1227    ir.iter_mut().for_each(|leaf| {
1228        leaf.transform_bottom_up(
1229            transform_root,
1230            transform_node,
1231            &mut seen_tees,
1232            check_well_formed,
1233        );
1234    });
1235}
1236
1237pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1238    let mut seen_tees = HashMap::new();
1239    ir.iter()
1240        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1241        .collect()
1242}
1243
// State used by `Debug for TeeNode` to print each shared tee only once.
// `None` means deduplication is off; `Some((next_id, ids))` holds the next id to hand out
// and the id already assigned to each tee, keyed by the address of its shared cell.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Per-thread deduplication state; starts out disabled (`None`).
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1248
1249pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1250    PRINTED_TEES.with(|printed_tees| {
1251        let mut printed_tees_mut = printed_tees.borrow_mut();
1252        *printed_tees_mut = Some((0, HashMap::new()));
1253        drop(printed_tees_mut);
1254
1255        let ret = f();
1256
1257        let mut printed_tees_mut = printed_tees.borrow_mut();
1258        *printed_tees_mut = None;
1259
1260        ret
1261    })
1262}
1263
/// A reference-counted, interior-mutable handle to a `HydroNode`, used as the `inner` of
/// `HydroNode::Tee`. Clones of the inner `Rc` refer to the same node.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1265
1266impl TeeNode {
1267    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1268        Rc::as_ptr(&self.0)
1269    }
1270}
1271
1272impl Debug for TeeNode {
1273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1274        PRINTED_TEES.with(|printed_tees| {
1275            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1276            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1277
1278            if let Some(printed_tees_mut) = printed_tees_mut {
1279                if let Some(existing) = printed_tees_mut
1280                    .1
1281                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1282                {
1283                    write!(f, "<tee {}>", existing)
1284                } else {
1285                    let next_id = printed_tees_mut.0;
1286                    printed_tees_mut.0 += 1;
1287                    printed_tees_mut
1288                        .1
1289                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1290                    drop(printed_tees_mut_borrow);
1291                    write!(f, "<tee {}>: ", next_id)?;
1292                    Debug::fmt(&self.0.borrow(), f)
1293                }
1294            } else {
1295                drop(printed_tees_mut_borrow);
1296                write!(f, "<tee>: ")?;
1297                Debug::fmt(&self.0.borrow(), f)
1298            }
1299        })
1300    }
1301}
1302
1303impl Hash for TeeNode {
1304    fn hash<H: Hasher>(&self, state: &mut H) {
1305        self.0.borrow_mut().hash(state);
1306    }
1307}
1308
/// Boundedness marker stored in the `bound` field of the `Stream`, `Singleton`,
/// `Optional` and `KeyedStream` variants of [`CollectionKind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1314
/// Ordering marker stored as `order` on [`CollectionKind::Stream`] and as `value_order`
/// on [`CollectionKind::KeyedStream`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1320
/// Retry marker stored as `retry` on [`CollectionKind::Stream`] and as `value_retry` on
/// [`CollectionKind::KeyedStream`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1326
/// Boundedness marker stored in the `bound` field of [`CollectionKind::KeyedSingleton`].
///
/// NOTE(review): `BoundedValue` presumably means only the per-key values are bounded while
/// the key set is not; confirm against the code that produces it.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1333
/// Describes the kind of collection a node produces, together with its element type(s).
///
/// Stored as `collection_kind` in [`HydroIrMetadata`]. `HydroRoot::emit_core` reads it to
/// rebuild the element type for cycle sinks: keyed kinds become `(key_type, value_type)`,
/// the others use `element_type`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// Un-keyed stream of `element_type` with ordering and retry markers.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// Singleton holding an `element_type`.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// Optional holding an `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// Keyed stream; ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// Keyed singleton, with its own three-state boundedness marker.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1363
/// Metadata attached to a `HydroNode`.
///
/// This type deliberately does not take part in hashing or equality: its `Hash` impl
/// writes nothing and its `PartialEq` impl always returns `true`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location of the node; used to pick the DFIR builder and for well-formedness checks.
    pub location_kind: LocationId,
    /// Kind and element type(s) of the collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): `cardinality` and `tag` are not read in this part of the file;
    // presumably filled in by analysis or profiling passes. Confirm with their writers.
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Operator-level metadata (see `HydroIrOpMetadata`).
    pub op: HydroIrOpMetadata,
}
1372
1373// HydroIrMetadata shouldn't be used to hash or compare
1374impl Hash for HydroIrMetadata {
1375    fn hash<H: Hasher>(&self, _: &mut H) {}
1376}
1377
1378impl PartialEq for HydroIrMetadata {
1379    fn eq(&self, _: &Self) -> bool {
1380        true
1381    }
1382}
1383
// Marker impl: the always-true `PartialEq` above is treated as a full equivalence.
impl Eq for HydroIrMetadata {}
1385
1386impl Debug for HydroIrMetadata {
1387    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1388        f.debug_struct("HydroIrMetadata")
1389            .field("location_kind", &self.location_kind)
1390            .field("collection_kind", &self.collection_kind)
1391            .finish()
1392    }
1393}
1394
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created (see `HydroIrOpMetadata::new`).
    pub backtrace: Backtrace,
    // NOTE(review): the fields below start as `None` and are not written in this part of
    // the file; presumably populated by later profiling/analysis passes. Confirm.
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
1404
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and every other field
    /// set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // NOTE(review): the skip of 1 presumably accounts for this `new` frame; the skip
        // arithmetic depends on the exact call chain, so keep this call structure.
        Self::new_with_skip(1)
    }

    /// Like `new`, but passes `2 + skip_count` to `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            // NOTE(review): the constant 2 presumably covers this function plus
            // `get_backtrace` itself; confirm against `Backtrace::get_backtrace`.
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1423
1424impl Debug for HydroIrOpMetadata {
1425    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1426        f.debug_struct("HydroIrOpMetadata").finish()
1427    }
1428}
1429
1430impl Hash for HydroIrOpMetadata {
1431    fn hash<H: Hasher>(&self, _: &mut H) {}
1432}
1433
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Every variant except `Placeholder` carries a `metadata: HydroIrMetadata` field.
/// Child nodes are owned through `Box<HydroNode>`, except for `Tee`, whose child is a
/// shared `TeeNode`.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in left behind while a tee's node is moved out for processing
    /// (see `transform_children`, which panics if asked to traverse one).
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    /// Leaf node described by a `HydroSource`; it has no child nodes.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node carrying a single `value` expression; it has no child nodes.
    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    /// Leaf node naming a cycle by `ident`; `HydroRoot::CycleSink` carries an `ident` too.
    // NOTE(review): presumably the read side of the cycle whose write side is the
    // `CycleSink` with the same ident; confirm in the node-level emit code.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// Node whose child is a shared, reference-counted `TeeNode`. Traversals use a
    /// `SeenTees` map keyed by the shared cell's address so each shared child is
    /// processed once.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    // Single-child wrappers around `inner`.
    // NOTE(review): their runtime semantics are not visible in this part of the file.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Two-child nodes; traversals visit `first` before `second`.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Two-child nodes; traversals visit `left` before `right`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Two-child nodes; traversals visit `pos` before `neg`.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Single-child nodes reading from `input`.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Single-child nodes parameterized by one expression `f`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // Single-child nodes parameterized by an `init` and an `acc` expression.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed` in shape, with an additional `watermark` child; traversals visit
    /// `input` before `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Network hop with one `input` child. `instantiate_fn` starts as
    /// `DebugInstantiate::Building` and is later finalized; `HydroRoot::connect_network`
    /// then runs its stored `connect_fn`. This is the only node kind exempt from the
    /// location well-formedness check in `transform_bottom_up`.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node fed from an external process, identified by `from_external_id` and
    /// `from_key`. Like `Network`, it carries a `DebugInstantiate` that
    /// `HydroRoot::connect_network` connects.
    ExternalInput {
        from_external_id: usize,
        from_key: usize,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Single-child node carrying a `tag`, a `duration` expression and a `prefix`.
    // NOTE(review): presumably reports element counts periodically; confirm in the
    // node-level emit code.
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1662
/// Maps the address of an original tee cell to the replacement cell already created for
/// it, so traversals and deep clones handle each shared tee child exactly once.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a `LocationId`.
// NOTE(review): not used in this part of the file; confirm its purpose with its users.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1665
1666impl HydroNode {
1667    pub fn transform_bottom_up(
1668        &mut self,
1669        transform: &mut impl FnMut(&mut HydroNode),
1670        seen_tees: &mut SeenTees,
1671        check_well_formed: bool,
1672    ) {
1673        self.transform_children(
1674            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1675            seen_tees,
1676        );
1677
1678        transform(self);
1679
1680        let self_location = self.metadata().location_kind.root();
1681
1682        if check_well_formed {
1683            match &*self {
1684                HydroNode::Network { .. } => {}
1685                _ => {
1686                    self.input_metadata().iter().for_each(|i| {
1687                        if i.location_kind.root() != self_location {
1688                            panic!(
1689                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1690                                i,
1691                                i.location_kind.root(),
1692                                self,
1693                                self_location
1694                            )
1695                        }
1696                    });
1697                }
1698            }
1699        }
1700    }
1701
1702    #[inline(always)]
1703    pub fn transform_children(
1704        &mut self,
1705        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1706        seen_tees: &mut SeenTees,
1707    ) {
1708        match self {
1709            HydroNode::Placeholder => {
1710                panic!();
1711            }
1712
1713            HydroNode::Source { .. }
1714            | HydroNode::SingletonSource { .. }
1715            | HydroNode::CycleSource { .. }
1716            | HydroNode::ExternalInput { .. } => {}
1717
1718            HydroNode::Tee { inner, .. } => {
1719                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1720                    *inner = TeeNode(transformed.clone());
1721                } else {
1722                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1723                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1724                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1725                    transform(&mut orig, seen_tees);
1726                    *transformed_cell.borrow_mut() = orig;
1727                    *inner = TeeNode(transformed_cell);
1728                }
1729            }
1730
1731            HydroNode::Cast { inner, .. }
1732            | HydroNode::ObserveNonDet { inner, .. }
1733            | HydroNode::BeginAtomic { inner, .. }
1734            | HydroNode::EndAtomic { inner, .. }
1735            | HydroNode::Batch { inner, .. }
1736            | HydroNode::YieldConcat { inner, .. } => {
1737                transform(inner.as_mut(), seen_tees);
1738            }
1739
1740            HydroNode::Chain { first, second, .. } => {
1741                transform(first.as_mut(), seen_tees);
1742                transform(second.as_mut(), seen_tees);
1743            }
1744
1745            HydroNode::ChainFirst { first, second, .. } => {
1746                transform(first.as_mut(), seen_tees);
1747                transform(second.as_mut(), seen_tees);
1748            }
1749
1750            HydroNode::CrossSingleton { left, right, .. }
1751            | HydroNode::CrossProduct { left, right, .. }
1752            | HydroNode::Join { left, right, .. } => {
1753                transform(left.as_mut(), seen_tees);
1754                transform(right.as_mut(), seen_tees);
1755            }
1756
1757            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1758                transform(pos.as_mut(), seen_tees);
1759                transform(neg.as_mut(), seen_tees);
1760            }
1761
1762            HydroNode::ReduceKeyedWatermark {
1763                input, watermark, ..
1764            } => {
1765                transform(input.as_mut(), seen_tees);
1766                transform(watermark.as_mut(), seen_tees);
1767            }
1768
1769            HydroNode::Map { input, .. }
1770            | HydroNode::ResolveFutures { input, .. }
1771            | HydroNode::ResolveFuturesOrdered { input, .. }
1772            | HydroNode::FlatMap { input, .. }
1773            | HydroNode::Filter { input, .. }
1774            | HydroNode::FilterMap { input, .. }
1775            | HydroNode::Sort { input, .. }
1776            | HydroNode::DeferTick { input, .. }
1777            | HydroNode::Enumerate { input, .. }
1778            | HydroNode::Inspect { input, .. }
1779            | HydroNode::Unique { input, .. }
1780            | HydroNode::Network { input, .. }
1781            | HydroNode::Fold { input, .. }
1782            | HydroNode::Scan { input, .. }
1783            | HydroNode::FoldKeyed { input, .. }
1784            | HydroNode::Reduce { input, .. }
1785            | HydroNode::ReduceKeyed { input, .. }
1786            | HydroNode::Counter { input, .. } => {
1787                transform(input.as_mut(), seen_tees);
1788            }
1789        }
1790    }
1791
1792    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1793        match self {
1794            HydroNode::Placeholder => HydroNode::Placeholder,
1795            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1796                inner: Box::new(inner.deep_clone(seen_tees)),
1797                metadata: metadata.clone(),
1798            },
1799            HydroNode::ObserveNonDet {
1800                inner,
1801                trusted,
1802                metadata,
1803            } => HydroNode::ObserveNonDet {
1804                inner: Box::new(inner.deep_clone(seen_tees)),
1805                trusted: *trusted,
1806                metadata: metadata.clone(),
1807            },
1808            HydroNode::Source { source, metadata } => HydroNode::Source {
1809                source: source.clone(),
1810                metadata: metadata.clone(),
1811            },
1812            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1813                value: value.clone(),
1814                metadata: metadata.clone(),
1815            },
1816            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1817                ident: ident.clone(),
1818                metadata: metadata.clone(),
1819            },
1820            HydroNode::Tee { inner, metadata } => {
1821                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1822                    HydroNode::Tee {
1823                        inner: TeeNode(transformed.clone()),
1824                        metadata: metadata.clone(),
1825                    }
1826                } else {
1827                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1828                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1829                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1830                    *new_rc.borrow_mut() = cloned;
1831                    HydroNode::Tee {
1832                        inner: TeeNode(new_rc),
1833                        metadata: metadata.clone(),
1834                    }
1835                }
1836            }
1837            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1838                inner: Box::new(inner.deep_clone(seen_tees)),
1839                metadata: metadata.clone(),
1840            },
1841            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1842                inner: Box::new(inner.deep_clone(seen_tees)),
1843                metadata: metadata.clone(),
1844            },
1845            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1846                inner: Box::new(inner.deep_clone(seen_tees)),
1847                metadata: metadata.clone(),
1848            },
1849            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1850                inner: Box::new(inner.deep_clone(seen_tees)),
1851                metadata: metadata.clone(),
1852            },
1853            HydroNode::Chain {
1854                first,
1855                second,
1856                metadata,
1857            } => HydroNode::Chain {
1858                first: Box::new(first.deep_clone(seen_tees)),
1859                second: Box::new(second.deep_clone(seen_tees)),
1860                metadata: metadata.clone(),
1861            },
1862            HydroNode::ChainFirst {
1863                first,
1864                second,
1865                metadata,
1866            } => HydroNode::ChainFirst {
1867                first: Box::new(first.deep_clone(seen_tees)),
1868                second: Box::new(second.deep_clone(seen_tees)),
1869                metadata: metadata.clone(),
1870            },
1871            HydroNode::CrossProduct {
1872                left,
1873                right,
1874                metadata,
1875            } => HydroNode::CrossProduct {
1876                left: Box::new(left.deep_clone(seen_tees)),
1877                right: Box::new(right.deep_clone(seen_tees)),
1878                metadata: metadata.clone(),
1879            },
1880            HydroNode::CrossSingleton {
1881                left,
1882                right,
1883                metadata,
1884            } => HydroNode::CrossSingleton {
1885                left: Box::new(left.deep_clone(seen_tees)),
1886                right: Box::new(right.deep_clone(seen_tees)),
1887                metadata: metadata.clone(),
1888            },
1889            HydroNode::Join {
1890                left,
1891                right,
1892                metadata,
1893            } => HydroNode::Join {
1894                left: Box::new(left.deep_clone(seen_tees)),
1895                right: Box::new(right.deep_clone(seen_tees)),
1896                metadata: metadata.clone(),
1897            },
1898            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1899                pos: Box::new(pos.deep_clone(seen_tees)),
1900                neg: Box::new(neg.deep_clone(seen_tees)),
1901                metadata: metadata.clone(),
1902            },
1903            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1904                pos: Box::new(pos.deep_clone(seen_tees)),
1905                neg: Box::new(neg.deep_clone(seen_tees)),
1906                metadata: metadata.clone(),
1907            },
1908            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1909                input: Box::new(input.deep_clone(seen_tees)),
1910                metadata: metadata.clone(),
1911            },
1912            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1913                HydroNode::ResolveFuturesOrdered {
1914                    input: Box::new(input.deep_clone(seen_tees)),
1915                    metadata: metadata.clone(),
1916                }
1917            }
1918            HydroNode::Map { f, input, metadata } => HydroNode::Map {
1919                f: f.clone(),
1920                input: Box::new(input.deep_clone(seen_tees)),
1921                metadata: metadata.clone(),
1922            },
1923            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1924                f: f.clone(),
1925                input: Box::new(input.deep_clone(seen_tees)),
1926                metadata: metadata.clone(),
1927            },
1928            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1929                f: f.clone(),
1930                input: Box::new(input.deep_clone(seen_tees)),
1931                metadata: metadata.clone(),
1932            },
1933            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1934                f: f.clone(),
1935                input: Box::new(input.deep_clone(seen_tees)),
1936                metadata: metadata.clone(),
1937            },
1938            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1939                input: Box::new(input.deep_clone(seen_tees)),
1940                metadata: metadata.clone(),
1941            },
1942            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1943                input: Box::new(input.deep_clone(seen_tees)),
1944                metadata: metadata.clone(),
1945            },
1946            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1947                f: f.clone(),
1948                input: Box::new(input.deep_clone(seen_tees)),
1949                metadata: metadata.clone(),
1950            },
1951            HydroNode::Unique { input, metadata } => HydroNode::Unique {
1952                input: Box::new(input.deep_clone(seen_tees)),
1953                metadata: metadata.clone(),
1954            },
1955            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1956                input: Box::new(input.deep_clone(seen_tees)),
1957                metadata: metadata.clone(),
1958            },
1959            HydroNode::Fold {
1960                init,
1961                acc,
1962                input,
1963                metadata,
1964            } => HydroNode::Fold {
1965                init: init.clone(),
1966                acc: acc.clone(),
1967                input: Box::new(input.deep_clone(seen_tees)),
1968                metadata: metadata.clone(),
1969            },
1970            HydroNode::Scan {
1971                init,
1972                acc,
1973                input,
1974                metadata,
1975            } => HydroNode::Scan {
1976                init: init.clone(),
1977                acc: acc.clone(),
1978                input: Box::new(input.deep_clone(seen_tees)),
1979                metadata: metadata.clone(),
1980            },
1981            HydroNode::FoldKeyed {
1982                init,
1983                acc,
1984                input,
1985                metadata,
1986            } => HydroNode::FoldKeyed {
1987                init: init.clone(),
1988                acc: acc.clone(),
1989                input: Box::new(input.deep_clone(seen_tees)),
1990                metadata: metadata.clone(),
1991            },
1992            HydroNode::ReduceKeyedWatermark {
1993                f,
1994                input,
1995                watermark,
1996                metadata,
1997            } => HydroNode::ReduceKeyedWatermark {
1998                f: f.clone(),
1999                input: Box::new(input.deep_clone(seen_tees)),
2000                watermark: Box::new(watermark.deep_clone(seen_tees)),
2001                metadata: metadata.clone(),
2002            },
2003            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2004                f: f.clone(),
2005                input: Box::new(input.deep_clone(seen_tees)),
2006                metadata: metadata.clone(),
2007            },
2008            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2009                f: f.clone(),
2010                input: Box::new(input.deep_clone(seen_tees)),
2011                metadata: metadata.clone(),
2012            },
2013            HydroNode::Network {
2014                serialize_fn,
2015                instantiate_fn,
2016                deserialize_fn,
2017                input,
2018                metadata,
2019            } => HydroNode::Network {
2020                serialize_fn: serialize_fn.clone(),
2021                instantiate_fn: instantiate_fn.clone(),
2022                deserialize_fn: deserialize_fn.clone(),
2023                input: Box::new(input.deep_clone(seen_tees)),
2024                metadata: metadata.clone(),
2025            },
2026            HydroNode::ExternalInput {
2027                from_external_id,
2028                from_key,
2029                from_many,
2030                codec_type,
2031                port_hint,
2032                instantiate_fn,
2033                deserialize_fn,
2034                metadata,
2035            } => HydroNode::ExternalInput {
2036                from_external_id: *from_external_id,
2037                from_key: *from_key,
2038                from_many: *from_many,
2039                codec_type: codec_type.clone(),
2040                port_hint: *port_hint,
2041                instantiate_fn: instantiate_fn.clone(),
2042                deserialize_fn: deserialize_fn.clone(),
2043                metadata: metadata.clone(),
2044            },
2045            HydroNode::Counter {
2046                tag,
2047                duration,
2048                prefix,
2049                input,
2050                metadata,
2051            } => HydroNode::Counter {
2052                tag: tag.clone(),
2053                duration: duration.clone(),
2054                prefix: prefix.clone(),
2055                input: Box::new(input.deep_clone(seen_tees)),
2056                metadata: metadata.clone(),
2057            },
2058        }
2059    }
2060
2061    #[cfg(feature = "build")]
2062    pub fn emit_core<'a, D: Deploy<'a>>(
2063        &mut self,
2064        builders_or_callback: &mut BuildersOrCallback<
2065            impl FnMut(&mut HydroRoot, &mut usize),
2066            impl FnMut(&mut HydroNode, &mut usize),
2067        >,
2068        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2069        next_stmt_id: &mut usize,
2070    ) -> syn::Ident {
2071        let out_location = self.metadata().location_kind.clone();
2072        match self {
2073            HydroNode::Placeholder => {
2074                panic!()
2075            }
2076
2077            HydroNode::Cast { inner, .. } => {
2078                let inner_ident =
2079                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2080
2081                match builders_or_callback {
2082                    BuildersOrCallback::Builders(_) => {}
2083                    BuildersOrCallback::Callback(_, node_callback) => {
2084                        node_callback(self, next_stmt_id);
2085                    }
2086                }
2087
2088                *next_stmt_id += 1;
2089
2090                inner_ident
2091            }
2092
2093            HydroNode::ObserveNonDet {
2094                inner,
2095                trusted,
2096                metadata,
2097                ..
2098            } => {
2099                let inner_ident =
2100                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2101
2102                let observe_ident =
2103                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2104
2105                match builders_or_callback {
2106                    BuildersOrCallback::Builders(graph_builders) => {
2107                        graph_builders.observe_nondet(
2108                            *trusted,
2109                            &inner.metadata().location_kind,
2110                            inner_ident,
2111                            &inner.metadata().collection_kind,
2112                            &observe_ident,
2113                            &metadata.collection_kind,
2114                            &metadata.op,
2115                        );
2116                    }
2117                    BuildersOrCallback::Callback(_, node_callback) => {
2118                        node_callback(self, next_stmt_id);
2119                    }
2120                }
2121
2122                *next_stmt_id += 1;
2123
2124                observe_ident
2125            }
2126
2127            HydroNode::Batch {
2128                inner, metadata, ..
2129            } => {
2130                let inner_ident =
2131                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2132
2133                let batch_ident =
2134                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2135
2136                match builders_or_callback {
2137                    BuildersOrCallback::Builders(graph_builders) => {
2138                        graph_builders.batch(
2139                            inner_ident,
2140                            &inner.metadata().location_kind,
2141                            &inner.metadata().collection_kind,
2142                            &batch_ident,
2143                            &out_location,
2144                            &metadata.op,
2145                        );
2146                    }
2147                    BuildersOrCallback::Callback(_, node_callback) => {
2148                        node_callback(self, next_stmt_id);
2149                    }
2150                }
2151
2152                *next_stmt_id += 1;
2153
2154                batch_ident
2155            }
2156
2157            HydroNode::YieldConcat { inner, .. } => {
2158                let inner_ident =
2159                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2160
2161                let yield_ident =
2162                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2163
2164                match builders_or_callback {
2165                    BuildersOrCallback::Builders(graph_builders) => {
2166                        graph_builders.yield_from_tick(
2167                            inner_ident,
2168                            &inner.metadata().location_kind,
2169                            &inner.metadata().collection_kind,
2170                            &yield_ident,
2171                            &out_location,
2172                        );
2173                    }
2174                    BuildersOrCallback::Callback(_, node_callback) => {
2175                        node_callback(self, next_stmt_id);
2176                    }
2177                }
2178
2179                *next_stmt_id += 1;
2180
2181                yield_ident
2182            }
2183
2184            HydroNode::BeginAtomic { inner, metadata } => {
2185                let inner_ident =
2186                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2187
2188                let begin_ident =
2189                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2190
2191                match builders_or_callback {
2192                    BuildersOrCallback::Builders(graph_builders) => {
2193                        graph_builders.begin_atomic(
2194                            inner_ident,
2195                            &inner.metadata().location_kind,
2196                            &inner.metadata().collection_kind,
2197                            &begin_ident,
2198                            &out_location,
2199                            &metadata.op,
2200                        );
2201                    }
2202                    BuildersOrCallback::Callback(_, node_callback) => {
2203                        node_callback(self, next_stmt_id);
2204                    }
2205                }
2206
2207                *next_stmt_id += 1;
2208
2209                begin_ident
2210            }
2211
2212            HydroNode::EndAtomic { inner, .. } => {
2213                let inner_ident =
2214                    inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2215
2216                let end_ident =
2217                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2218
2219                match builders_or_callback {
2220                    BuildersOrCallback::Builders(graph_builders) => {
2221                        graph_builders.end_atomic(
2222                            inner_ident,
2223                            &inner.metadata().location_kind,
2224                            &inner.metadata().collection_kind,
2225                            &end_ident,
2226                        );
2227                    }
2228                    BuildersOrCallback::Callback(_, node_callback) => {
2229                        node_callback(self, next_stmt_id);
2230                    }
2231                }
2232
2233                *next_stmt_id += 1;
2234
2235                end_ident
2236            }
2237
2238            HydroNode::Source {
2239                source, metadata, ..
2240            } => {
2241                if let HydroSource::ExternalNetwork() = source {
2242                    syn::Ident::new("DUMMY", Span::call_site())
2243                } else {
2244                    let source_ident =
2245                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2246
2247                    let source_stmt = match source {
2248                        HydroSource::Stream(expr) => {
2249                            debug_assert!(metadata.location_kind.is_top_level());
2250                            parse_quote! {
2251                                #source_ident = source_stream(#expr);
2252                            }
2253                        }
2254
2255                        HydroSource::ExternalNetwork() => {
2256                            unreachable!()
2257                        }
2258
2259                        HydroSource::Iter(expr) => {
2260                            if metadata.location_kind.is_top_level() {
2261                                parse_quote! {
2262                                    #source_ident = source_iter(#expr);
2263                                }
2264                            } else {
2265                                // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
2266                                parse_quote! {
2267                                    #source_ident = source_iter(#expr) -> persist::<'static>();
2268                                }
2269                            }
2270                        }
2271
2272                        HydroSource::Spin() => {
2273                            debug_assert!(metadata.location_kind.is_top_level());
2274                            parse_quote! {
2275                                #source_ident = spin();
2276                            }
2277                        }
2278
2279                        HydroSource::ClusterMembers(location_id) => {
2280                            debug_assert!(metadata.location_kind.is_top_level());
2281
2282                            let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2283                                D::cluster_membership_stream(location_id),
2284                                &(),
2285                            );
2286
2287                            parse_quote! {
2288                                #source_ident = source_stream(#expr);
2289                            }
2290                        }
2291                    };
2292
2293                    match builders_or_callback {
2294                        BuildersOrCallback::Builders(graph_builders) => {
2295                            let builder = graph_builders.get_dfir_mut(&out_location);
2296                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2297                        }
2298                        BuildersOrCallback::Callback(_, node_callback) => {
2299                            node_callback(self, next_stmt_id);
2300                        }
2301                    }
2302
2303                    *next_stmt_id += 1;
2304
2305                    source_ident
2306                }
2307            }
2308
2309            HydroNode::SingletonSource { value, metadata } => {
2310                let source_ident =
2311                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2312
2313                match builders_or_callback {
2314                    BuildersOrCallback::Builders(graph_builders) => {
2315                        let should_replay = !graph_builders.singleton_intermediates();
2316                        let builder = graph_builders.get_dfir_mut(&out_location);
2317
2318                        if should_replay || !metadata.location_kind.is_top_level() {
2319                            builder.add_dfir(
2320                                parse_quote! {
2321                                    #source_ident = source_iter([#value]) -> persist::<'static>();
2322                                },
2323                                None,
2324                                Some(&next_stmt_id.to_string()),
2325                            );
2326                        } else {
2327                            builder.add_dfir(
2328                                parse_quote! {
2329                                    #source_ident = source_iter([#value]);
2330                                },
2331                                None,
2332                                Some(&next_stmt_id.to_string()),
2333                            );
2334                        }
2335                    }
2336                    BuildersOrCallback::Callback(_, node_callback) => {
2337                        node_callback(self, next_stmt_id);
2338                    }
2339                }
2340
2341                *next_stmt_id += 1;
2342
2343                source_ident
2344            }
2345
2346            HydroNode::CycleSource { ident, .. } => {
2347                let ident = ident.clone();
2348
2349                match builders_or_callback {
2350                    BuildersOrCallback::Builders(_) => {}
2351                    BuildersOrCallback::Callback(_, node_callback) => {
2352                        node_callback(self, next_stmt_id);
2353                    }
2354                }
2355
2356                // consume a stmt id even though we did not emit anything so that we can instrument this
2357                *next_stmt_id += 1;
2358
2359                ident
2360            }
2361
2362            HydroNode::Tee { inner, .. } => {
2363                let ret_ident = if let Some(teed_from) =
2364                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2365                {
2366                    match builders_or_callback {
2367                        BuildersOrCallback::Builders(_) => {}
2368                        BuildersOrCallback::Callback(_, node_callback) => {
2369                            node_callback(self, next_stmt_id);
2370                        }
2371                    }
2372
2373                    teed_from.clone()
2374                } else {
2375                    let inner_ident = inner.0.borrow_mut().emit_core::<D>(
2376                        builders_or_callback,
2377                        built_tees,
2378                        next_stmt_id,
2379                    );
2380
2381                    let tee_ident =
2382                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2383
2384                    built_tees.insert(
2385                        inner.0.as_ref() as *const RefCell<HydroNode>,
2386                        tee_ident.clone(),
2387                    );
2388
2389                    match builders_or_callback {
2390                        BuildersOrCallback::Builders(graph_builders) => {
2391                            let builder = graph_builders.get_dfir_mut(&out_location);
2392                            builder.add_dfir(
2393                                parse_quote! {
2394                                    #tee_ident = #inner_ident -> tee();
2395                                },
2396                                None,
2397                                Some(&next_stmt_id.to_string()),
2398                            );
2399                        }
2400                        BuildersOrCallback::Callback(_, node_callback) => {
2401                            node_callback(self, next_stmt_id);
2402                        }
2403                    }
2404
2405                    tee_ident
2406                };
2407
2408                // we consume a stmt id regardless of if we emit the tee() operator,
2409                // so that during rewrites we touch all recipients of the tee()
2410
2411                *next_stmt_id += 1;
2412                ret_ident
2413            }
2414
2415            HydroNode::Chain { first, second, .. } => {
2416                let first_ident =
2417                    first.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2418                let second_ident =
2419                    second.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2420
2421                let chain_ident =
2422                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2423
2424                match builders_or_callback {
2425                    BuildersOrCallback::Builders(graph_builders) => {
2426                        let builder = graph_builders.get_dfir_mut(&out_location);
2427                        builder.add_dfir(
2428                            parse_quote! {
2429                                #chain_ident = chain();
2430                                #first_ident -> [0]#chain_ident;
2431                                #second_ident -> [1]#chain_ident;
2432                            },
2433                            None,
2434                            Some(&next_stmt_id.to_string()),
2435                        );
2436                    }
2437                    BuildersOrCallback::Callback(_, node_callback) => {
2438                        node_callback(self, next_stmt_id);
2439                    }
2440                }
2441
2442                *next_stmt_id += 1;
2443
2444                chain_ident
2445            }
2446
2447            HydroNode::ChainFirst { first, second, .. } => {
2448                let first_ident =
2449                    first.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2450                let second_ident =
2451                    second.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2452
2453                let chain_ident =
2454                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2455
2456                match builders_or_callback {
2457                    BuildersOrCallback::Builders(graph_builders) => {
2458                        let builder = graph_builders.get_dfir_mut(&out_location);
2459                        builder.add_dfir(
2460                            parse_quote! {
2461                                #chain_ident = chain_first_n(1);
2462                                #first_ident -> [0]#chain_ident;
2463                                #second_ident -> [1]#chain_ident;
2464                            },
2465                            None,
2466                            Some(&next_stmt_id.to_string()),
2467                        );
2468                    }
2469                    BuildersOrCallback::Callback(_, node_callback) => {
2470                        node_callback(self, next_stmt_id);
2471                    }
2472                }
2473
2474                *next_stmt_id += 1;
2475
2476                chain_ident
2477            }
2478
2479            HydroNode::CrossSingleton { left, right, .. } => {
2480                let left_ident =
2481                    left.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2482                let right_ident =
2483                    right.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2484
2485                let cross_ident =
2486                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2487
2488                match builders_or_callback {
2489                    BuildersOrCallback::Builders(graph_builders) => {
2490                        let builder = graph_builders.get_dfir_mut(&out_location);
2491                        builder.add_dfir(
2492                            parse_quote! {
2493                                #cross_ident = cross_singleton();
2494                                #left_ident -> [input]#cross_ident;
2495                                #right_ident -> [single]#cross_ident;
2496                            },
2497                            None,
2498                            Some(&next_stmt_id.to_string()),
2499                        );
2500                    }
2501                    BuildersOrCallback::Callback(_, node_callback) => {
2502                        node_callback(self, next_stmt_id);
2503                    }
2504                }
2505
2506                *next_stmt_id += 1;
2507
2508                cross_ident
2509            }
2510
2511            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2512                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
2513                    parse_quote!(cross_join_multiset)
2514                } else {
2515                    parse_quote!(join_multiset)
2516                };
2517
2518                let (HydroNode::CrossProduct { left, right, .. }
2519                | HydroNode::Join { left, right, .. }) = self
2520                else {
2521                    unreachable!()
2522                };
2523
2524                let is_top_level = left.metadata().location_kind.is_top_level()
2525                    && right.metadata().location_kind.is_top_level();
2526                let (left_inner, left_lifetime) = if left.metadata().location_kind.is_top_level() {
2527                    (left, quote!('static))
2528                } else {
2529                    (left, quote!('tick))
2530                };
2531
2532                let (right_inner, right_lifetime) = if right.metadata().location_kind.is_top_level()
2533                {
2534                    (right, quote!('static))
2535                } else {
2536                    (right, quote!('tick))
2537                };
2538
2539                let left_ident =
2540                    left_inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2541                let right_ident =
2542                    right_inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2543
2544                let stream_ident =
2545                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2546
2547                match builders_or_callback {
2548                    BuildersOrCallback::Builders(graph_builders) => {
2549                        let builder = graph_builders.get_dfir_mut(&out_location);
2550                        builder.add_dfir(
2551                            if is_top_level {
2552                                // if both inputs are root, the output is expected to have streamy semantics, so we need
2553                                // a multiset_delta() to negate the replay behavior
2554                                parse_quote! {
2555                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2556                                    #left_ident -> [0]#stream_ident;
2557                                    #right_ident -> [1]#stream_ident;
2558                                }
2559                            } else {
2560                                parse_quote! {
2561                                    #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2562                                    #left_ident -> [0]#stream_ident;
2563                                    #right_ident -> [1]#stream_ident;
2564                                }
2565                            }
2566                            ,
2567                            None,
2568                            Some(&next_stmt_id.to_string()),
2569                        );
2570                    }
2571                    BuildersOrCallback::Callback(_, node_callback) => {
2572                        node_callback(self, next_stmt_id);
2573                    }
2574                }
2575
2576                *next_stmt_id += 1;
2577
2578                stream_ident
2579            }
2580
2581            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2582                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2583                    parse_quote!(difference)
2584                } else {
2585                    parse_quote!(anti_join)
2586                };
2587
2588                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2589                    self
2590                else {
2591                    unreachable!()
2592                };
2593
2594                let (neg, neg_lifetime) = if neg.metadata().location_kind.is_top_level() {
2595                    (neg, quote!('static))
2596                } else {
2597                    (neg, quote!('tick))
2598                };
2599
2600                let pos_ident = pos.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2601                let neg_ident = neg.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2602
2603                let stream_ident =
2604                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2605
2606                match builders_or_callback {
2607                    BuildersOrCallback::Builders(graph_builders) => {
2608                        let builder = graph_builders.get_dfir_mut(&out_location);
2609                        builder.add_dfir(
2610                            parse_quote! {
2611                                #stream_ident = #operator::<'tick, #neg_lifetime>();
2612                                #pos_ident -> [pos]#stream_ident;
2613                                #neg_ident -> [neg]#stream_ident;
2614                            },
2615                            None,
2616                            Some(&next_stmt_id.to_string()),
2617                        );
2618                    }
2619                    BuildersOrCallback::Callback(_, node_callback) => {
2620                        node_callback(self, next_stmt_id);
2621                    }
2622                }
2623
2624                *next_stmt_id += 1;
2625
2626                stream_ident
2627            }
2628
2629            HydroNode::ResolveFutures { input, .. } => {
2630                let input_ident =
2631                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2632
2633                let futures_ident =
2634                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2635
2636                match builders_or_callback {
2637                    BuildersOrCallback::Builders(graph_builders) => {
2638                        let builder = graph_builders.get_dfir_mut(&out_location);
2639                        builder.add_dfir(
2640                            parse_quote! {
2641                                #futures_ident = #input_ident -> resolve_futures();
2642                            },
2643                            None,
2644                            Some(&next_stmt_id.to_string()),
2645                        );
2646                    }
2647                    BuildersOrCallback::Callback(_, node_callback) => {
2648                        node_callback(self, next_stmt_id);
2649                    }
2650                }
2651
2652                *next_stmt_id += 1;
2653
2654                futures_ident
2655            }
2656
2657            HydroNode::ResolveFuturesOrdered { input, .. } => {
2658                let input_ident =
2659                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2660
2661                let futures_ident =
2662                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2663
2664                match builders_or_callback {
2665                    BuildersOrCallback::Builders(graph_builders) => {
2666                        let builder = graph_builders.get_dfir_mut(&out_location);
2667                        builder.add_dfir(
2668                            parse_quote! {
2669                                #futures_ident = #input_ident -> resolve_futures_ordered();
2670                            },
2671                            None,
2672                            Some(&next_stmt_id.to_string()),
2673                        );
2674                    }
2675                    BuildersOrCallback::Callback(_, node_callback) => {
2676                        node_callback(self, next_stmt_id);
2677                    }
2678                }
2679
2680                *next_stmt_id += 1;
2681
2682                futures_ident
2683            }
2684
2685            HydroNode::Map { f, input, .. } => {
2686                let input_ident =
2687                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2688
2689                let map_ident =
2690                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2691
2692                match builders_or_callback {
2693                    BuildersOrCallback::Builders(graph_builders) => {
2694                        let builder = graph_builders.get_dfir_mut(&out_location);
2695                        builder.add_dfir(
2696                            parse_quote! {
2697                                #map_ident = #input_ident -> map(#f);
2698                            },
2699                            None,
2700                            Some(&next_stmt_id.to_string()),
2701                        );
2702                    }
2703                    BuildersOrCallback::Callback(_, node_callback) => {
2704                        node_callback(self, next_stmt_id);
2705                    }
2706                }
2707
2708                *next_stmt_id += 1;
2709
2710                map_ident
2711            }
2712
2713            HydroNode::FlatMap { f, input, .. } => {
2714                let input_ident =
2715                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2716
2717                let flat_map_ident =
2718                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2719
2720                match builders_or_callback {
2721                    BuildersOrCallback::Builders(graph_builders) => {
2722                        let builder = graph_builders.get_dfir_mut(&out_location);
2723                        builder.add_dfir(
2724                            parse_quote! {
2725                                #flat_map_ident = #input_ident -> flat_map(#f);
2726                            },
2727                            None,
2728                            Some(&next_stmt_id.to_string()),
2729                        );
2730                    }
2731                    BuildersOrCallback::Callback(_, node_callback) => {
2732                        node_callback(self, next_stmt_id);
2733                    }
2734                }
2735
2736                *next_stmt_id += 1;
2737
2738                flat_map_ident
2739            }
2740
2741            HydroNode::Filter { f, input, .. } => {
2742                let input_ident =
2743                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2744
2745                let filter_ident =
2746                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2747
2748                match builders_or_callback {
2749                    BuildersOrCallback::Builders(graph_builders) => {
2750                        let builder = graph_builders.get_dfir_mut(&out_location);
2751                        builder.add_dfir(
2752                            parse_quote! {
2753                                #filter_ident = #input_ident -> filter(#f);
2754                            },
2755                            None,
2756                            Some(&next_stmt_id.to_string()),
2757                        );
2758                    }
2759                    BuildersOrCallback::Callback(_, node_callback) => {
2760                        node_callback(self, next_stmt_id);
2761                    }
2762                }
2763
2764                *next_stmt_id += 1;
2765
2766                filter_ident
2767            }
2768
2769            HydroNode::FilterMap { f, input, .. } => {
2770                let input_ident =
2771                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2772
2773                let filter_map_ident =
2774                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2775
2776                match builders_or_callback {
2777                    BuildersOrCallback::Builders(graph_builders) => {
2778                        let builder = graph_builders.get_dfir_mut(&out_location);
2779                        builder.add_dfir(
2780                            parse_quote! {
2781                                #filter_map_ident = #input_ident -> filter_map(#f);
2782                            },
2783                            None,
2784                            Some(&next_stmt_id.to_string()),
2785                        );
2786                    }
2787                    BuildersOrCallback::Callback(_, node_callback) => {
2788                        node_callback(self, next_stmt_id);
2789                    }
2790                }
2791
2792                *next_stmt_id += 1;
2793
2794                filter_map_ident
2795            }
2796
2797            HydroNode::Sort { input, .. } => {
2798                let input_ident =
2799                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2800
2801                let sort_ident =
2802                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2803
2804                match builders_or_callback {
2805                    BuildersOrCallback::Builders(graph_builders) => {
2806                        let builder = graph_builders.get_dfir_mut(&out_location);
2807                        builder.add_dfir(
2808                            parse_quote! {
2809                                #sort_ident = #input_ident -> sort();
2810                            },
2811                            None,
2812                            Some(&next_stmt_id.to_string()),
2813                        );
2814                    }
2815                    BuildersOrCallback::Callback(_, node_callback) => {
2816                        node_callback(self, next_stmt_id);
2817                    }
2818                }
2819
2820                *next_stmt_id += 1;
2821
2822                sort_ident
2823            }
2824
2825            HydroNode::DeferTick { input, .. } => {
2826                let input_ident =
2827                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2828
2829                let defer_tick_ident =
2830                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2831
2832                match builders_or_callback {
2833                    BuildersOrCallback::Builders(graph_builders) => {
2834                        let builder = graph_builders.get_dfir_mut(&out_location);
2835                        builder.add_dfir(
2836                            parse_quote! {
2837                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
2838                            },
2839                            None,
2840                            Some(&next_stmt_id.to_string()),
2841                        );
2842                    }
2843                    BuildersOrCallback::Callback(_, node_callback) => {
2844                        node_callback(self, next_stmt_id);
2845                    }
2846                }
2847
2848                *next_stmt_id += 1;
2849
2850                defer_tick_ident
2851            }
2852
2853            HydroNode::Enumerate { input, .. } => {
2854                let input_ident =
2855                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2856
2857                let enumerate_ident =
2858                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2859
2860                match builders_or_callback {
2861                    BuildersOrCallback::Builders(graph_builders) => {
2862                        let builder = graph_builders.get_dfir_mut(&out_location);
2863                        let lifetime = if input.metadata().location_kind.is_top_level() {
2864                            quote!('static)
2865                        } else {
2866                            quote!('tick)
2867                        };
2868                        builder.add_dfir(
2869                            parse_quote! {
2870                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2871                            },
2872                            None,
2873                            Some(&next_stmt_id.to_string()),
2874                        );
2875                    }
2876                    BuildersOrCallback::Callback(_, node_callback) => {
2877                        node_callback(self, next_stmt_id);
2878                    }
2879                }
2880
2881                *next_stmt_id += 1;
2882
2883                enumerate_ident
2884            }
2885
2886            HydroNode::Inspect { f, input, .. } => {
2887                let input_ident =
2888                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2889
2890                let inspect_ident =
2891                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2892
2893                match builders_or_callback {
2894                    BuildersOrCallback::Builders(graph_builders) => {
2895                        let builder = graph_builders.get_dfir_mut(&out_location);
2896                        builder.add_dfir(
2897                            parse_quote! {
2898                                #inspect_ident = #input_ident -> inspect(#f);
2899                            },
2900                            None,
2901                            Some(&next_stmt_id.to_string()),
2902                        );
2903                    }
2904                    BuildersOrCallback::Callback(_, node_callback) => {
2905                        node_callback(self, next_stmt_id);
2906                    }
2907                }
2908
2909                *next_stmt_id += 1;
2910
2911                inspect_ident
2912            }
2913
2914            HydroNode::Unique { input, .. } => {
2915                let input_ident =
2916                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2917
2918                let unique_ident =
2919                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2920
2921                match builders_or_callback {
2922                    BuildersOrCallback::Builders(graph_builders) => {
2923                        let builder = graph_builders.get_dfir_mut(&out_location);
2924                        let lifetime = if input.metadata().location_kind.is_top_level() {
2925                            quote!('static)
2926                        } else {
2927                            quote!('tick)
2928                        };
2929
2930                        builder.add_dfir(
2931                            parse_quote! {
2932                                #unique_ident = #input_ident -> unique::<#lifetime>();
2933                            },
2934                            None,
2935                            Some(&next_stmt_id.to_string()),
2936                        );
2937                    }
2938                    BuildersOrCallback::Callback(_, node_callback) => {
2939                        node_callback(self, next_stmt_id);
2940                    }
2941                }
2942
2943                *next_stmt_id += 1;
2944
2945                unique_ident
2946            }
2947
2948            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2949                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2950                    parse_quote!(fold)
2951                } else if matches!(self, HydroNode::Scan { .. }) {
2952                    parse_quote!(scan)
2953                } else {
2954                    parse_quote!(fold_keyed)
2955                };
2956
2957                let (HydroNode::Fold { input, .. }
2958                | HydroNode::FoldKeyed { input, .. }
2959                | HydroNode::Scan { input, .. }) = self
2960                else {
2961                    unreachable!()
2962                };
2963
2964                let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
2965                    (input, quote!('static))
2966                } else {
2967                    (input, quote!('tick))
2968                };
2969
2970                let input_ident =
2971                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2972
2973                let (HydroNode::Fold { init, acc, .. }
2974                | HydroNode::FoldKeyed { init, acc, .. }
2975                | HydroNode::Scan { init, acc, .. }) = &*self
2976                else {
2977                    unreachable!()
2978                };
2979
2980                let fold_ident =
2981                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2982
2983                match builders_or_callback {
2984                    BuildersOrCallback::Builders(graph_builders) => {
2985                        if matches!(self, HydroNode::Fold { .. })
2986                            && self.metadata().location_kind.is_top_level()
2987                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2988                            && graph_builders.singleton_intermediates()
2989                        {
2990                            let builder = graph_builders.get_dfir_mut(&out_location);
2991
2992                            let acc: syn::Expr = parse_quote!({
2993                                let mut __inner = #acc;
2994                                move |__state, __value| {
2995                                    __inner(__state, __value);
2996                                    Some(__state.clone())
2997                                }
2998                            });
2999
3000                            builder.add_dfir(
3001                                parse_quote! {
3002                                    source_iter([(#init)()]) -> [0]#fold_ident;
3003                                    #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3004                                    #fold_ident = chain();
3005                                },
3006                                None,
3007                                Some(&next_stmt_id.to_string()),
3008                            );
3009                        } else if matches!(self, HydroNode::FoldKeyed { .. })
3010                            && self.metadata().location_kind.is_top_level()
3011                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3012                            && graph_builders.singleton_intermediates()
3013                        {
3014                            let builder = graph_builders.get_dfir_mut(&out_location);
3015
3016                            let acc: syn::Expr = parse_quote!({
3017                                let mut __init = #init;
3018                                let mut __inner = #acc;
3019                                move |__state, __kv: (_, _)| {
3020                                    // TODO(shadaj): we can avoid the clone when the entry exists
3021                                    let __state = __state
3022                                        .entry(::std::clone::Clone::clone(&__kv.0))
3023                                        .or_insert_with(|| (__init)());
3024                                    __inner(__state, __kv.1);
3025                                    Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3026                                }
3027                            });
3028
3029                            builder.add_dfir(
3030                                parse_quote! {
3031                                    source_iter([(#init)()]) -> [0]#fold_ident;
3032                                    #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3033                                },
3034                                None,
3035                                Some(&next_stmt_id.to_string()),
3036                            );
3037                        } else {
3038                            let builder = graph_builders.get_dfir_mut(&out_location);
3039                            builder.add_dfir(
3040                                parse_quote! {
3041                                    #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3042                                },
3043                                None,
3044                                Some(&next_stmt_id.to_string()),
3045                            );
3046                        }
3047                    }
3048                    BuildersOrCallback::Callback(_, node_callback) => {
3049                        node_callback(self, next_stmt_id);
3050                    }
3051                }
3052
3053                *next_stmt_id += 1;
3054
3055                fold_ident
3056            }
3057
3058            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3059                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
3060                    parse_quote!(reduce)
3061                } else {
3062                    parse_quote!(reduce_keyed)
3063                };
3064
3065                let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = self
3066                else {
3067                    unreachable!()
3068                };
3069
3070                let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
3071                    (input, quote!('static))
3072                } else {
3073                    (input, quote!('tick))
3074                };
3075
3076                let input_ident =
3077                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3078
3079                let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*self
3080                else {
3081                    unreachable!()
3082                };
3083
3084                let reduce_ident =
3085                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3086
3087                match builders_or_callback {
3088                    BuildersOrCallback::Builders(graph_builders) => {
3089                        if matches!(self, HydroNode::Reduce { .. })
3090                            && self.metadata().location_kind.is_top_level()
3091                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3092                            && graph_builders.singleton_intermediates()
3093                        {
3094                            todo!(
3095                                "Reduce with optional intermediates is not yet supported in simulator"
3096                            );
3097                        } else if matches!(self, HydroNode::ReduceKeyed { .. })
3098                            && self.metadata().location_kind.is_top_level()
3099                            && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3100                            && graph_builders.singleton_intermediates()
3101                        {
3102                            todo!();
3103                        } else {
3104                            let builder = graph_builders.get_dfir_mut(&out_location);
3105                            builder.add_dfir(
3106                                parse_quote! {
3107                                    #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3108                                },
3109                                None,
3110                                Some(&next_stmt_id.to_string()),
3111                            );
3112                        }
3113                    }
3114                    BuildersOrCallback::Callback(_, node_callback) => {
3115                        node_callback(self, next_stmt_id);
3116                    }
3117                }
3118
3119                *next_stmt_id += 1;
3120
3121                reduce_ident
3122            }
3123
3124            HydroNode::ReduceKeyedWatermark {
3125                f,
3126                input,
3127                watermark,
3128                ..
3129            } => {
3130                let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
3131                    (input, quote!('static))
3132                } else {
3133                    (input, quote!('tick))
3134                };
3135
3136                let input_ident =
3137                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3138
3139                let watermark_ident =
3140                    watermark.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3141
3142                let chain_ident = syn::Ident::new(
3143                    &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3144                    Span::call_site(),
3145                );
3146
3147                let fold_ident =
3148                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3149
3150                match builders_or_callback {
3151                    BuildersOrCallback::Builders(graph_builders) => {
3152                        let builder = graph_builders.get_dfir_mut(&out_location);
                        // 1. Drop any incoming (k, v) whose key is <= the current watermark.
                        // 2. If the key is not yet in the HashMap, insert it. Otherwise, call f.
                        //    If the watermark advanced, delete all HashMap entries with a key <= the watermark.
                        // 3. Convert the HashMap back into a stream of (k, v)
3157                        builder.add_dfir(
3158                            parse_quote! {
3159                                #chain_ident = chain();
3160                                #input_ident
3161                                    -> map(|x| (Some(x), None))
3162                                    -> [0]#chain_ident;
3163                                #watermark_ident
3164                                    -> map(|watermark| (None, Some(watermark)))
3165                                    -> [1]#chain_ident;
3166
3167                                #fold_ident = #chain_ident
3168                                    -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3169                                        let __reduce_keyed_fn = #f;
3170                                        move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3171                                            if let Some((k, v)) = opt_payload {
3172                                                if let Some(curr_watermark) = *opt_curr_watermark {
3173                                                    if k <= curr_watermark {
3174                                                        return;
3175                                                    }
3176                                                }
3177                                                match map.entry(k) {
3178                                                    ::std::collections::hash_map::Entry::Vacant(e) => {
3179                                                        e.insert(v);
3180                                                    }
3181                                                    ::std::collections::hash_map::Entry::Occupied(mut e) => {
3182                                                        __reduce_keyed_fn(e.get_mut(), v);
3183                                                    }
3184                                                }
3185                                            } else {
3186                                                let watermark = opt_watermark.unwrap();
3187                                                if let Some(curr_watermark) = *opt_curr_watermark {
3188                                                    if watermark <= curr_watermark {
3189                                                        return;
3190                                                    }
3191                                                }
3192                                                *opt_curr_watermark = opt_watermark;
3193                                                map.retain(|k, _| *k > watermark);
3194                                            }
3195                                        }
3196                                    })
3197                                    -> flat_map(|(map, _curr_watermark)| map);
3198                            },
3199                            None,
3200                            Some(&next_stmt_id.to_string()),
3201                        );
3202                    }
3203                    BuildersOrCallback::Callback(_, node_callback) => {
3204                        node_callback(self, next_stmt_id);
3205                    }
3206                }
3207
3208                *next_stmt_id += 1;
3209
3210                fold_ident
3211            }
3212
3213            HydroNode::Network {
3214                serialize_fn: serialize_pipeline,
3215                instantiate_fn,
3216                deserialize_fn: deserialize_pipeline,
3217                input,
3218                ..
3219            } => {
3220                let input_ident =
3221                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3222
3223                let receiver_stream_ident =
3224                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3225
3226                match builders_or_callback {
3227                    BuildersOrCallback::Builders(graph_builders) => {
3228                        let (sink_expr, source_expr) = match instantiate_fn {
3229                            DebugInstantiate::Building => (
3230                                syn::parse_quote!(DUMMY_SINK),
3231                                syn::parse_quote!(DUMMY_SOURCE),
3232                            ),
3233
3234                            DebugInstantiate::Finalized(finalized) => {
3235                                (finalized.sink.clone(), finalized.source.clone())
3236                            }
3237                        };
3238
3239                        graph_builders.create_network(
3240                            &input.metadata().location_kind,
3241                            &out_location,
3242                            input_ident,
3243                            &receiver_stream_ident,
3244                            serialize_pipeline,
3245                            sink_expr,
3246                            source_expr,
3247                            deserialize_pipeline,
3248                            *next_stmt_id,
3249                        );
3250                    }
3251                    BuildersOrCallback::Callback(_, node_callback) => {
3252                        node_callback(self, next_stmt_id);
3253                    }
3254                }
3255
3256                *next_stmt_id += 1;
3257
3258                receiver_stream_ident
3259            }
3260
3261            HydroNode::ExternalInput {
3262                instantiate_fn,
3263                deserialize_fn: deserialize_pipeline,
3264                ..
3265            } => {
3266                let receiver_stream_ident =
3267                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3268
3269                match builders_or_callback {
3270                    BuildersOrCallback::Builders(graph_builders) => {
3271                        let (_, source_expr) = match instantiate_fn {
3272                            DebugInstantiate::Building => (
3273                                syn::parse_quote!(DUMMY_SINK),
3274                                syn::parse_quote!(DUMMY_SOURCE),
3275                            ),
3276
3277                            DebugInstantiate::Finalized(finalized) => {
3278                                (finalized.sink.clone(), finalized.source.clone())
3279                            }
3280                        };
3281
3282                        graph_builders.create_external_source(
3283                            &out_location,
3284                            source_expr,
3285                            &receiver_stream_ident,
3286                            deserialize_pipeline,
3287                            *next_stmt_id,
3288                        );
3289                    }
3290                    BuildersOrCallback::Callback(_, node_callback) => {
3291                        node_callback(self, next_stmt_id);
3292                    }
3293                }
3294
3295                *next_stmt_id += 1;
3296
3297                receiver_stream_ident
3298            }
3299
3300            HydroNode::Counter {
3301                tag,
3302                duration,
3303                prefix,
3304                input,
3305                ..
3306            } => {
3307                let input_ident =
3308                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3309
3310                let counter_ident =
3311                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3312
3313                match builders_or_callback {
3314                    BuildersOrCallback::Builders(graph_builders) => {
3315                        let builder = graph_builders.get_dfir_mut(&out_location);
3316                        builder.add_dfir(
3317                            parse_quote! {
3318                                #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3319                            },
3320                            None,
3321                            Some(&next_stmt_id.to_string()),
3322                        );
3323                    }
3324                    BuildersOrCallback::Callback(_, node_callback) => {
3325                        node_callback(self, next_stmt_id);
3326                    }
3327                }
3328
3329                *next_stmt_id += 1;
3330
3331                counter_ident
3332            }
3333        }
3334    }
3335
3336    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3337        match self {
3338            HydroNode::Placeholder => {
3339                panic!()
3340            }
3341            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3342            HydroNode::Source { source, .. } => match source {
3343                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3344                HydroSource::ExternalNetwork()
3345                | HydroSource::Spin()
3346                | HydroSource::ClusterMembers(_) => {} // TODO: what goes here?
3347            },
3348            HydroNode::SingletonSource { value, .. } => {
3349                transform(value);
3350            }
3351            HydroNode::CycleSource { .. }
3352            | HydroNode::Tee { .. }
3353            | HydroNode::YieldConcat { .. }
3354            | HydroNode::BeginAtomic { .. }
3355            | HydroNode::EndAtomic { .. }
3356            | HydroNode::Batch { .. }
3357            | HydroNode::Chain { .. }
3358            | HydroNode::ChainFirst { .. }
3359            | HydroNode::CrossProduct { .. }
3360            | HydroNode::CrossSingleton { .. }
3361            | HydroNode::ResolveFutures { .. }
3362            | HydroNode::ResolveFuturesOrdered { .. }
3363            | HydroNode::Join { .. }
3364            | HydroNode::Difference { .. }
3365            | HydroNode::AntiJoin { .. }
3366            | HydroNode::DeferTick { .. }
3367            | HydroNode::Enumerate { .. }
3368            | HydroNode::Unique { .. }
3369            | HydroNode::Sort { .. } => {}
3370            HydroNode::Map { f, .. }
3371            | HydroNode::FlatMap { f, .. }
3372            | HydroNode::Filter { f, .. }
3373            | HydroNode::FilterMap { f, .. }
3374            | HydroNode::Inspect { f, .. }
3375            | HydroNode::Reduce { f, .. }
3376            | HydroNode::ReduceKeyed { f, .. }
3377            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3378                transform(f);
3379            }
3380            HydroNode::Fold { init, acc, .. }
3381            | HydroNode::Scan { init, acc, .. }
3382            | HydroNode::FoldKeyed { init, acc, .. } => {
3383                transform(init);
3384                transform(acc);
3385            }
3386            HydroNode::Network {
3387                serialize_fn,
3388                deserialize_fn,
3389                ..
3390            } => {
3391                if let Some(serialize_fn) = serialize_fn {
3392                    transform(serialize_fn);
3393                }
3394                if let Some(deserialize_fn) = deserialize_fn {
3395                    transform(deserialize_fn);
3396                }
3397            }
3398            HydroNode::ExternalInput { deserialize_fn, .. } => {
3399                if let Some(deserialize_fn) = deserialize_fn {
3400                    transform(deserialize_fn);
3401                }
3402            }
3403            HydroNode::Counter { duration, .. } => {
3404                transform(duration);
3405            }
3406        }
3407    }
3408
3409    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3410        &self.metadata().op
3411    }
3412
3413    pub fn metadata(&self) -> &HydroIrMetadata {
3414        match self {
3415            HydroNode::Placeholder => {
3416                panic!()
3417            }
3418            HydroNode::Cast { metadata, .. } => metadata,
3419            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3420            HydroNode::Source { metadata, .. } => metadata,
3421            HydroNode::SingletonSource { metadata, .. } => metadata,
3422            HydroNode::CycleSource { metadata, .. } => metadata,
3423            HydroNode::Tee { metadata, .. } => metadata,
3424            HydroNode::YieldConcat { metadata, .. } => metadata,
3425            HydroNode::BeginAtomic { metadata, .. } => metadata,
3426            HydroNode::EndAtomic { metadata, .. } => metadata,
3427            HydroNode::Batch { metadata, .. } => metadata,
3428            HydroNode::Chain { metadata, .. } => metadata,
3429            HydroNode::ChainFirst { metadata, .. } => metadata,
3430            HydroNode::CrossProduct { metadata, .. } => metadata,
3431            HydroNode::CrossSingleton { metadata, .. } => metadata,
3432            HydroNode::Join { metadata, .. } => metadata,
3433            HydroNode::Difference { metadata, .. } => metadata,
3434            HydroNode::AntiJoin { metadata, .. } => metadata,
3435            HydroNode::ResolveFutures { metadata, .. } => metadata,
3436            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3437            HydroNode::Map { metadata, .. } => metadata,
3438            HydroNode::FlatMap { metadata, .. } => metadata,
3439            HydroNode::Filter { metadata, .. } => metadata,
3440            HydroNode::FilterMap { metadata, .. } => metadata,
3441            HydroNode::DeferTick { metadata, .. } => metadata,
3442            HydroNode::Enumerate { metadata, .. } => metadata,
3443            HydroNode::Inspect { metadata, .. } => metadata,
3444            HydroNode::Unique { metadata, .. } => metadata,
3445            HydroNode::Sort { metadata, .. } => metadata,
3446            HydroNode::Scan { metadata, .. } => metadata,
3447            HydroNode::Fold { metadata, .. } => metadata,
3448            HydroNode::FoldKeyed { metadata, .. } => metadata,
3449            HydroNode::Reduce { metadata, .. } => metadata,
3450            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3451            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3452            HydroNode::ExternalInput { metadata, .. } => metadata,
3453            HydroNode::Network { metadata, .. } => metadata,
3454            HydroNode::Counter { metadata, .. } => metadata,
3455        }
3456    }
3457
3458    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3459        &mut self.metadata_mut().op
3460    }
3461
3462    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3463        match self {
3464            HydroNode::Placeholder => {
3465                panic!()
3466            }
3467            HydroNode::Cast { metadata, .. } => metadata,
3468            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3469            HydroNode::Source { metadata, .. } => metadata,
3470            HydroNode::SingletonSource { metadata, .. } => metadata,
3471            HydroNode::CycleSource { metadata, .. } => metadata,
3472            HydroNode::Tee { metadata, .. } => metadata,
3473            HydroNode::YieldConcat { metadata, .. } => metadata,
3474            HydroNode::BeginAtomic { metadata, .. } => metadata,
3475            HydroNode::EndAtomic { metadata, .. } => metadata,
3476            HydroNode::Batch { metadata, .. } => metadata,
3477            HydroNode::Chain { metadata, .. } => metadata,
3478            HydroNode::ChainFirst { metadata, .. } => metadata,
3479            HydroNode::CrossProduct { metadata, .. } => metadata,
3480            HydroNode::CrossSingleton { metadata, .. } => metadata,
3481            HydroNode::Join { metadata, .. } => metadata,
3482            HydroNode::Difference { metadata, .. } => metadata,
3483            HydroNode::AntiJoin { metadata, .. } => metadata,
3484            HydroNode::ResolveFutures { metadata, .. } => metadata,
3485            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3486            HydroNode::Map { metadata, .. } => metadata,
3487            HydroNode::FlatMap { metadata, .. } => metadata,
3488            HydroNode::Filter { metadata, .. } => metadata,
3489            HydroNode::FilterMap { metadata, .. } => metadata,
3490            HydroNode::DeferTick { metadata, .. } => metadata,
3491            HydroNode::Enumerate { metadata, .. } => metadata,
3492            HydroNode::Inspect { metadata, .. } => metadata,
3493            HydroNode::Unique { metadata, .. } => metadata,
3494            HydroNode::Sort { metadata, .. } => metadata,
3495            HydroNode::Scan { metadata, .. } => metadata,
3496            HydroNode::Fold { metadata, .. } => metadata,
3497            HydroNode::FoldKeyed { metadata, .. } => metadata,
3498            HydroNode::Reduce { metadata, .. } => metadata,
3499            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3500            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3501            HydroNode::ExternalInput { metadata, .. } => metadata,
3502            HydroNode::Network { metadata, .. } => metadata,
3503            HydroNode::Counter { metadata, .. } => metadata,
3504        }
3505    }
3506
3507    pub fn input(&self) -> Vec<&HydroNode> {
3508        match self {
3509            HydroNode::Placeholder => {
3510                panic!()
3511            }
3512            HydroNode::Source { .. }
3513            | HydroNode::SingletonSource { .. }
3514            | HydroNode::ExternalInput { .. }
3515            | HydroNode::CycleSource { .. }
3516            | HydroNode::Tee { .. } => {
3517                // Tee should find its input in separate special ways
3518                vec![]
3519            }
3520            HydroNode::Cast { inner, .. }
3521            | HydroNode::ObserveNonDet { inner, .. }
3522            | HydroNode::YieldConcat { inner, .. }
3523            | HydroNode::BeginAtomic { inner, .. }
3524            | HydroNode::EndAtomic { inner, .. }
3525            | HydroNode::Batch { inner, .. } => {
3526                vec![inner]
3527            }
3528            HydroNode::Chain { first, second, .. } => {
3529                vec![first, second]
3530            }
3531            HydroNode::ChainFirst { first, second, .. } => {
3532                vec![first, second]
3533            }
3534            HydroNode::CrossProduct { left, right, .. }
3535            | HydroNode::CrossSingleton { left, right, .. }
3536            | HydroNode::Join { left, right, .. } => {
3537                vec![left, right]
3538            }
3539            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3540                vec![pos, neg]
3541            }
3542            HydroNode::Map { input, .. }
3543            | HydroNode::FlatMap { input, .. }
3544            | HydroNode::Filter { input, .. }
3545            | HydroNode::FilterMap { input, .. }
3546            | HydroNode::Sort { input, .. }
3547            | HydroNode::DeferTick { input, .. }
3548            | HydroNode::Enumerate { input, .. }
3549            | HydroNode::Inspect { input, .. }
3550            | HydroNode::Unique { input, .. }
3551            | HydroNode::Network { input, .. }
3552            | HydroNode::Counter { input, .. }
3553            | HydroNode::ResolveFutures { input, .. }
3554            | HydroNode::ResolveFuturesOrdered { input, .. }
3555            | HydroNode::Fold { input, .. }
3556            | HydroNode::FoldKeyed { input, .. }
3557            | HydroNode::Reduce { input, .. }
3558            | HydroNode::ReduceKeyed { input, .. }
3559            | HydroNode::Scan { input, .. } => {
3560                vec![input]
3561            }
3562            HydroNode::ReduceKeyedWatermark {
3563                input, watermark, ..
3564            } => {
3565                vec![input, watermark]
3566            }
3567        }
3568    }
3569
3570    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3571        self.input()
3572            .iter()
3573            .map(|input_node| input_node.metadata())
3574            .collect()
3575    }
3576
3577    pub fn print_root(&self) -> String {
3578        match self {
3579            HydroNode::Placeholder => {
3580                panic!()
3581            }
3582            HydroNode::Cast { .. } => "Cast()".to_string(),
3583            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3584            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3585            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3586            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3587            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3588            HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3589            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3590            HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3591            HydroNode::Batch { .. } => "Batch()".to_string(),
3592            HydroNode::Chain { first, second, .. } => {
3593                format!("Chain({}, {})", first.print_root(), second.print_root())
3594            }
3595            HydroNode::ChainFirst { first, second, .. } => {
3596                format!(
3597                    "ChainFirst({}, {})",
3598                    first.print_root(),
3599                    second.print_root()
3600                )
3601            }
3602            HydroNode::CrossProduct { left, right, .. } => {
3603                format!(
3604                    "CrossProduct({}, {})",
3605                    left.print_root(),
3606                    right.print_root()
3607                )
3608            }
3609            HydroNode::CrossSingleton { left, right, .. } => {
3610                format!(
3611                    "CrossSingleton({}, {})",
3612                    left.print_root(),
3613                    right.print_root()
3614                )
3615            }
3616            HydroNode::Join { left, right, .. } => {
3617                format!("Join({}, {})", left.print_root(), right.print_root())
3618            }
3619            HydroNode::Difference { pos, neg, .. } => {
3620                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3621            }
3622            HydroNode::AntiJoin { pos, neg, .. } => {
3623                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3624            }
3625            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3626            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3627            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3628            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3629            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3630            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3631            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3632            HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3633            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3634            HydroNode::Unique { .. } => "Unique()".to_string(),
3635            HydroNode::Sort { .. } => "Sort()".to_string(),
3636            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3637            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3638            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3639            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3640            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3641            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3642            HydroNode::Network { .. } => "Network()".to_string(),
3643            HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3644            HydroNode::Counter { tag, duration, .. } => {
3645                format!("Counter({:?}, {:?})", tag, duration)
3646            }
3647        }
3648    }
3649}
3650
/// Allocates a sink port on `from_location` and a source port on `to_location`, and
/// builds the pieces needed to connect them.
///
/// The sender and receiver are looked up by id in `processes` / `clusters` depending
/// on the kind of each location. The sink port is always allocated before the source
/// port. The matching `Deploy` functions (`o2o_*`, `o2m_*`, `m2o_*` or `m2m_*`) are
/// then called, `*_sink_source` first and `*_connect` second.
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by
/// `*_sink_source` and the boxed closure produced by `*_connect`.
/// NOTE(review): `connect_fn` presumably wires the two ports together when invoked;
/// confirm against the `Deploy` implementation.
///
/// # Panics
///
/// Panics if a referenced process or cluster id is missing from the corresponding
/// map, or if either location is a tick or atomic location rather than a process or
/// cluster.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Looks up (and clones) an instantiated process, panicking if the id is unknown.
    let process = |id: &usize| {
        processes
            .get(id)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", id))
            .clone()
    };
    // Looks up (and clones) an instantiated cluster, panicking if the id is unknown.
    let cluster = |id: &usize| {
        clusters
            .get(id)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", id))
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = process(from);
            let to_node = process(to);

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = process(from);
            let to_node = cluster(to);

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = cluster(from);
            let to_node = process(to);

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = cluster(from);
            let to_node = cluster(to);

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only processes and clusters can be network endpoints; say which side was
        // wrong instead of panicking without a message.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _) => panic!(
            "Cannot instantiate a network from a tick or atomic location; \
             expected a process or cluster"
        ),
        (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(
            "Cannot instantiate a network to a tick or atomic location; \
             expected a process or cluster"
        ),
    };
    (sink, source, connect_fn)
}
3757
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so that growth is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 272);
    }

    /// Pins the in-memory size of `HydroRoot` so that growth is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 168);
    }

    /// An expression that is not a `q!` expansion must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let input: syn::Expr = syn::parse_str("x + y").unwrap();
        let expected = input.clone();
        assert_eq!(simplify_q_macro(input), expected);
    }

    /// Runs `simplify_q_macro` on the expression spliced from a real `q!` closure and
    /// compares the resulting tokens against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // The expected (simplified) form is whatever the snapshot records.
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as above, but the quoted code is a block that evaluates to a closure, so
    /// it does not begin with a `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}