1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26use crate::compile::builder::{CycleId, ExternalPortId};
27#[cfg(feature = "build")]
28use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
29use crate::location::dynamic::LocationId;
30use crate::location::{LocationKey, NetworkHint};
31
32pub mod backtrace;
33use backtrace::Backtrace;
34
35#[derive(Clone, Hash)]
39pub struct DebugExpr(pub Box<syn::Expr>);
40
41impl From<syn::Expr> for DebugExpr {
42 fn from(expr: syn::Expr) -> Self {
43 Self(Box::new(expr))
44 }
45}
46
47impl Deref for DebugExpr {
48 type Target = syn::Expr;
49
50 fn deref(&self) -> &Self::Target {
51 &self.0
52 }
53}
54
55impl ToTokens for DebugExpr {
56 fn to_tokens(&self, tokens: &mut TokenStream) {
57 self.0.to_tokens(tokens);
58 }
59}
60
61impl Debug for DebugExpr {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.0.to_token_stream())
64 }
65}
66
67impl Display for DebugExpr {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 let original = self.0.as_ref().clone();
70 let simplified = simplify_q_macro(original);
71
72 write!(f, "q!({})", quote::quote!(#simplified))
75 }
76}
77
78fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
80 let mut simplifier = QMacroSimplifier::new();
83 simplifier.visit_expr_mut(&mut expr);
84
85 if let Some(simplified) = simplifier.simplified_result {
87 simplified
88 } else {
89 expr
90 }
91}
92
93#[derive(Default)]
95pub struct QMacroSimplifier {
96 pub simplified_result: Option<syn::Expr>,
97}
98
99impl QMacroSimplifier {
100 pub fn new() -> Self {
101 Self::default()
102 }
103}
104
105impl VisitMut for QMacroSimplifier {
106 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
107 if self.simplified_result.is_some() {
109 return;
110 }
111
112 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
113 && self.is_stageleft_runtime_support_call(&path_expr.path)
115 && let Some(closure) = self.extract_closure_from_args(&call.args)
117 {
118 self.simplified_result = Some(closure);
119 return;
120 }
121
122 syn::visit_mut::visit_expr_mut(self, expr);
125 }
126}
127
128impl QMacroSimplifier {
129 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
130 if let Some(last_segment) = path.segments.last() {
132 let fn_name = last_segment.ident.to_string();
133 fn_name.contains("_type_hint")
135 && path.segments.len() > 2
136 && path.segments[0].ident == "stageleft"
137 && path.segments[1].ident == "runtime_support"
138 } else {
139 false
140 }
141 }
142
143 fn extract_closure_from_args(
144 &self,
145 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
146 ) -> Option<syn::Expr> {
147 for arg in args {
149 if let syn::Expr::Closure(_) = arg {
150 return Some(arg.clone());
151 }
152 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
154 return Some(closure_expr);
155 }
156 }
157 None
158 }
159
160 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
161 let mut visitor = ClosureFinder {
162 found_closure: None,
163 prefer_inner_blocks: true,
164 };
165 visitor.visit_expr(expr);
166 visitor.found_closure
167 }
168}
169
170struct ClosureFinder {
172 found_closure: Option<syn::Expr>,
173 prefer_inner_blocks: bool,
174}
175
176impl<'ast> Visit<'ast> for ClosureFinder {
177 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
178 if self.found_closure.is_some() {
180 return;
181 }
182
183 match expr {
184 syn::Expr::Closure(_) => {
185 self.found_closure = Some(expr.clone());
186 }
187 syn::Expr::Block(block) if self.prefer_inner_blocks => {
188 for stmt in &block.block.stmts {
190 if let syn::Stmt::Expr(stmt_expr, _) = stmt
191 && let syn::Expr::Block(_) = stmt_expr
192 {
193 let mut inner_visitor = ClosureFinder {
195 found_closure: None,
196 prefer_inner_blocks: false, };
198 inner_visitor.visit_expr(stmt_expr);
199 if inner_visitor.found_closure.is_some() {
200 self.found_closure = Some(stmt_expr.clone());
202 return;
203 }
204 }
205 }
206
207 visit::visit_expr(self, expr);
209
210 if self.found_closure.is_some() {
213 }
215 }
216 _ => {
217 visit::visit_expr(self, expr);
219 }
220 }
221 }
222}
223
224#[derive(Clone, PartialEq, Eq, Hash)]
228pub struct DebugType(pub Box<syn::Type>);
229
230impl From<syn::Type> for DebugType {
231 fn from(t: syn::Type) -> Self {
232 Self(Box::new(t))
233 }
234}
235
236impl Deref for DebugType {
237 type Target = syn::Type;
238
239 fn deref(&self) -> &Self::Target {
240 &self.0
241 }
242}
243
244impl ToTokens for DebugType {
245 fn to_tokens(&self, tokens: &mut TokenStream) {
246 self.0.to_tokens(tokens);
247 }
248}
249
250impl Debug for DebugType {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 write!(f, "{}", self.0.to_token_stream())
253 }
254}
255
256pub enum DebugInstantiate {
257 Building,
258 Finalized(Box<DebugInstantiateFinalized>),
259}
260
261#[cfg_attr(
262 not(feature = "build"),
263 expect(
264 dead_code,
265 reason = "sink, source unused without `feature = \"build\"`."
266 )
267)]
268pub struct DebugInstantiateFinalized {
269 sink: syn::Expr,
270 source: syn::Expr,
271 connect_fn: Option<Box<dyn FnOnce()>>,
272}
273
274impl From<DebugInstantiateFinalized> for DebugInstantiate {
275 fn from(f: DebugInstantiateFinalized) -> Self {
276 Self::Finalized(Box::new(f))
277 }
278}
279
280impl Debug for DebugInstantiate {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 write!(f, "<network instantiate>")
283 }
284}
285
286impl Hash for DebugInstantiate {
287 fn hash<H: Hasher>(&self, _state: &mut H) {
288 }
290}
291
292impl Clone for DebugInstantiate {
293 fn clone(&self) -> Self {
294 match self {
295 DebugInstantiate::Building => DebugInstantiate::Building,
296 DebugInstantiate::Finalized(_) => {
297 panic!("DebugInstantiate::Finalized should not be cloned")
298 }
299 }
300 }
301}
302
303#[derive(Debug, Hash, Clone)]
312pub enum ClusterMembersState {
313 Uninit,
315 Stream(DebugExpr),
318 Tee(LocationId, LocationId),
322}
323
324#[derive(Debug, Hash, Clone)]
326pub enum HydroSource {
327 Stream(DebugExpr),
328 ExternalNetwork(),
329 Iter(DebugExpr),
330 Spin(),
331 ClusterMembers(LocationId, ClusterMembersState),
332 Embedded(syn::Ident),
333 EmbeddedSingleton(syn::Ident),
334}
335
336#[cfg(feature = "build")]
337pub trait DfirBuilder {
343 fn singleton_intermediates(&self) -> bool;
345
346 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
348
349 fn batch(
350 &mut self,
351 in_ident: syn::Ident,
352 in_location: &LocationId,
353 in_kind: &CollectionKind,
354 out_ident: &syn::Ident,
355 out_location: &LocationId,
356 op_meta: &HydroIrOpMetadata,
357 );
358 fn yield_from_tick(
359 &mut self,
360 in_ident: syn::Ident,
361 in_location: &LocationId,
362 in_kind: &CollectionKind,
363 out_ident: &syn::Ident,
364 out_location: &LocationId,
365 );
366
367 fn begin_atomic(
368 &mut self,
369 in_ident: syn::Ident,
370 in_location: &LocationId,
371 in_kind: &CollectionKind,
372 out_ident: &syn::Ident,
373 out_location: &LocationId,
374 op_meta: &HydroIrOpMetadata,
375 );
376 fn end_atomic(
377 &mut self,
378 in_ident: syn::Ident,
379 in_location: &LocationId,
380 in_kind: &CollectionKind,
381 out_ident: &syn::Ident,
382 );
383
384 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
385 fn observe_nondet(
386 &mut self,
387 trusted: bool,
388 location: &LocationId,
389 in_ident: syn::Ident,
390 in_kind: &CollectionKind,
391 out_ident: &syn::Ident,
392 out_kind: &CollectionKind,
393 op_meta: &HydroIrOpMetadata,
394 );
395
396 #[expect(clippy::too_many_arguments, reason = "TODO")]
397 fn create_network(
398 &mut self,
399 from: &LocationId,
400 to: &LocationId,
401 input_ident: syn::Ident,
402 out_ident: &syn::Ident,
403 serialize: Option<&DebugExpr>,
404 sink: syn::Expr,
405 source: syn::Expr,
406 deserialize: Option<&DebugExpr>,
407 tag_id: usize,
408 networking_info: &crate::networking::NetworkingInfo,
409 );
410
411 fn create_external_source(
412 &mut self,
413 on: &LocationId,
414 source_expr: syn::Expr,
415 out_ident: &syn::Ident,
416 deserialize: Option<&DebugExpr>,
417 tag_id: usize,
418 );
419
420 fn create_external_output(
421 &mut self,
422 on: &LocationId,
423 sink_expr: syn::Expr,
424 input_ident: &syn::Ident,
425 serialize: Option<&DebugExpr>,
426 tag_id: usize,
427 );
428}
429
430#[cfg(feature = "build")]
431impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
432 fn singleton_intermediates(&self) -> bool {
433 false
434 }
435
436 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
437 self.entry(location.root().key())
438 .expect("location was removed")
439 .or_default()
440 }
441
442 fn batch(
443 &mut self,
444 in_ident: syn::Ident,
445 in_location: &LocationId,
446 in_kind: &CollectionKind,
447 out_ident: &syn::Ident,
448 _out_location: &LocationId,
449 _op_meta: &HydroIrOpMetadata,
450 ) {
451 let builder = self.get_dfir_mut(in_location.root());
452 if in_kind.is_bounded()
453 && matches!(
454 in_kind,
455 CollectionKind::Singleton { .. }
456 | CollectionKind::Optional { .. }
457 | CollectionKind::KeyedSingleton { .. }
458 )
459 {
460 assert!(in_location.is_top_level());
461 builder.add_dfir(
462 parse_quote! {
463 #out_ident = #in_ident -> persist::<'static>();
464 },
465 None,
466 None,
467 );
468 } else {
469 builder.add_dfir(
470 parse_quote! {
471 #out_ident = #in_ident;
472 },
473 None,
474 None,
475 );
476 }
477 }
478
479 fn yield_from_tick(
480 &mut self,
481 in_ident: syn::Ident,
482 in_location: &LocationId,
483 _in_kind: &CollectionKind,
484 out_ident: &syn::Ident,
485 _out_location: &LocationId,
486 ) {
487 let builder = self.get_dfir_mut(in_location.root());
488 builder.add_dfir(
489 parse_quote! {
490 #out_ident = #in_ident;
491 },
492 None,
493 None,
494 );
495 }
496
497 fn begin_atomic(
498 &mut self,
499 in_ident: syn::Ident,
500 in_location: &LocationId,
501 _in_kind: &CollectionKind,
502 out_ident: &syn::Ident,
503 _out_location: &LocationId,
504 _op_meta: &HydroIrOpMetadata,
505 ) {
506 let builder = self.get_dfir_mut(in_location.root());
507 builder.add_dfir(
508 parse_quote! {
509 #out_ident = #in_ident;
510 },
511 None,
512 None,
513 );
514 }
515
516 fn end_atomic(
517 &mut self,
518 in_ident: syn::Ident,
519 in_location: &LocationId,
520 _in_kind: &CollectionKind,
521 out_ident: &syn::Ident,
522 ) {
523 let builder = self.get_dfir_mut(in_location.root());
524 builder.add_dfir(
525 parse_quote! {
526 #out_ident = #in_ident;
527 },
528 None,
529 None,
530 );
531 }
532
533 fn observe_nondet(
534 &mut self,
535 _trusted: bool,
536 location: &LocationId,
537 in_ident: syn::Ident,
538 _in_kind: &CollectionKind,
539 out_ident: &syn::Ident,
540 _out_kind: &CollectionKind,
541 _op_meta: &HydroIrOpMetadata,
542 ) {
543 let builder = self.get_dfir_mut(location);
544 builder.add_dfir(
545 parse_quote! {
546 #out_ident = #in_ident;
547 },
548 None,
549 None,
550 );
551 }
552
553 fn create_network(
554 &mut self,
555 from: &LocationId,
556 to: &LocationId,
557 input_ident: syn::Ident,
558 out_ident: &syn::Ident,
559 serialize: Option<&DebugExpr>,
560 sink: syn::Expr,
561 source: syn::Expr,
562 deserialize: Option<&DebugExpr>,
563 tag_id: usize,
564 _networking_info: &crate::networking::NetworkingInfo,
565 ) {
566 let sender_builder = self.get_dfir_mut(from);
567 if let Some(serialize_pipeline) = serialize {
568 sender_builder.add_dfir(
569 parse_quote! {
570 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
571 },
572 None,
573 Some(&format!("send{}", tag_id)),
575 );
576 } else {
577 sender_builder.add_dfir(
578 parse_quote! {
579 #input_ident -> dest_sink(#sink);
580 },
581 None,
582 Some(&format!("send{}", tag_id)),
583 );
584 }
585
586 let receiver_builder = self.get_dfir_mut(to);
587 if let Some(deserialize_pipeline) = deserialize {
588 receiver_builder.add_dfir(
589 parse_quote! {
590 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
591 },
592 None,
593 Some(&format!("recv{}", tag_id)),
594 );
595 } else {
596 receiver_builder.add_dfir(
597 parse_quote! {
598 #out_ident = source_stream(#source);
599 },
600 None,
601 Some(&format!("recv{}", tag_id)),
602 );
603 }
604 }
605
606 fn create_external_source(
607 &mut self,
608 on: &LocationId,
609 source_expr: syn::Expr,
610 out_ident: &syn::Ident,
611 deserialize: Option<&DebugExpr>,
612 tag_id: usize,
613 ) {
614 let receiver_builder = self.get_dfir_mut(on);
615 if let Some(deserialize_pipeline) = deserialize {
616 receiver_builder.add_dfir(
617 parse_quote! {
618 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
619 },
620 None,
621 Some(&format!("recv{}", tag_id)),
622 );
623 } else {
624 receiver_builder.add_dfir(
625 parse_quote! {
626 #out_ident = source_stream(#source_expr);
627 },
628 None,
629 Some(&format!("recv{}", tag_id)),
630 );
631 }
632 }
633
634 fn create_external_output(
635 &mut self,
636 on: &LocationId,
637 sink_expr: syn::Expr,
638 input_ident: &syn::Ident,
639 serialize: Option<&DebugExpr>,
640 tag_id: usize,
641 ) {
642 let sender_builder = self.get_dfir_mut(on);
643 if let Some(serialize_fn) = serialize {
644 sender_builder.add_dfir(
645 parse_quote! {
646 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
647 },
648 None,
649 Some(&format!("send{}", tag_id)),
651 );
652 } else {
653 sender_builder.add_dfir(
654 parse_quote! {
655 #input_ident -> dest_sink(#sink_expr);
656 },
657 None,
658 Some(&format!("send{}", tag_id)),
659 );
660 }
661 }
662}
663
664#[cfg(feature = "build")]
665pub enum BuildersOrCallback<'a, L, N>
666where
667 L: FnMut(&mut HydroRoot, &mut usize),
668 N: FnMut(&mut HydroNode, &mut usize),
669{
670 Builders(&'a mut dyn DfirBuilder),
671 Callback(L, N),
672}
673
674#[derive(Debug, Hash)]
678pub enum HydroRoot {
679 ForEach {
680 f: DebugExpr,
681 input: Box<HydroNode>,
682 op_metadata: HydroIrOpMetadata,
683 },
684 SendExternal {
685 to_external_key: LocationKey,
686 to_port_id: ExternalPortId,
687 to_many: bool,
688 unpaired: bool,
689 serialize_fn: Option<DebugExpr>,
690 instantiate_fn: DebugInstantiate,
691 input: Box<HydroNode>,
692 op_metadata: HydroIrOpMetadata,
693 },
694 DestSink {
695 sink: DebugExpr,
696 input: Box<HydroNode>,
697 op_metadata: HydroIrOpMetadata,
698 },
699 CycleSink {
700 cycle_id: CycleId,
701 input: Box<HydroNode>,
702 op_metadata: HydroIrOpMetadata,
703 },
704 EmbeddedOutput {
705 ident: syn::Ident,
706 input: Box<HydroNode>,
707 op_metadata: HydroIrOpMetadata,
708 },
709 Null {
710 input: Box<HydroNode>,
711 op_metadata: HydroIrOpMetadata,
712 },
713}
714
715impl HydroRoot {
716 #[cfg(feature = "build")]
717 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
718 pub fn compile_network<'a, D>(
719 &mut self,
720 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
721 seen_tees: &mut SeenSharedNodes,
722 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
723 processes: &SparseSecondaryMap<LocationKey, D::Process>,
724 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
725 externals: &SparseSecondaryMap<LocationKey, D::External>,
726 env: &mut D::InstantiateEnv,
727 ) where
728 D: Deploy<'a>,
729 {
730 let refcell_extra_stmts = RefCell::new(extra_stmts);
731 let refcell_env = RefCell::new(env);
732 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
733 self.transform_bottom_up(
734 &mut |l| {
735 if let HydroRoot::SendExternal {
736 input,
737 to_external_key,
738 to_port_id,
739 to_many,
740 unpaired,
741 instantiate_fn,
742 ..
743 } = l
744 {
745 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
746 DebugInstantiate::Building => {
747 let to_node = externals
748 .get(*to_external_key)
749 .unwrap_or_else(|| {
750 panic!("A external used in the graph was not instantiated: {}", to_external_key)
751 })
752 .clone();
753
754 match input.metadata().location_id.root() {
755 &LocationId::Process(process_key) => {
756 if *to_many {
757 (
758 (
759 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
760 parse_quote!(DUMMY),
761 ),
762 Box::new(|| {}) as Box<dyn FnOnce()>,
763 )
764 } else {
765 let from_node = processes
766 .get(process_key)
767 .unwrap_or_else(|| {
768 panic!("A process used in the graph was not instantiated: {}", process_key)
769 })
770 .clone();
771
772 let sink_port = from_node.next_port();
773 let source_port = to_node.next_port();
774
775 if *unpaired {
776 use stageleft::quote_type;
777 use tokio_util::codec::LengthDelimitedCodec;
778
779 to_node.register(*to_port_id, source_port.clone());
780
781 let _ = D::e2o_source(
782 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
783 &to_node, &source_port,
784 &from_node, &sink_port,
785 "e_type::<LengthDelimitedCodec>(),
786 format!("{}_{}", *to_external_key, *to_port_id)
787 );
788 }
789
790 (
791 (
792 D::o2e_sink(
793 &from_node,
794 &sink_port,
795 &to_node,
796 &source_port,
797 format!("{}_{}", *to_external_key, *to_port_id)
798 ),
799 parse_quote!(DUMMY),
800 ),
801 if *unpaired {
802 D::e2o_connect(
803 &to_node,
804 &source_port,
805 &from_node,
806 &sink_port,
807 *to_many,
808 NetworkHint::Auto,
809 )
810 } else {
811 Box::new(|| {}) as Box<dyn FnOnce()>
812 },
813 )
814 }
815 }
816 LocationId::Cluster(_) => todo!("SendExternal from a cluster location is not yet supported"),
817 _ => panic!()
818 }
819 },
820
821 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
822 };
823
824 *instantiate_fn = DebugInstantiateFinalized {
825 sink: sink_expr,
826 source: source_expr,
827 connect_fn: Some(connect_fn),
828 }
829 .into();
830 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
831 let element_type = match &input.metadata().collection_kind {
832 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
833 _ => panic!("Embedded output must have Stream collection kind"),
834 };
835 let location_key = match input.metadata().location_id.root() {
836 LocationId::Process(key) | LocationId::Cluster(key) => *key,
837 _ => panic!("Embedded output must be on a process or cluster"),
838 };
839 D::register_embedded_output(
840 &mut refcell_env.borrow_mut(),
841 location_key,
842 ident,
843 &element_type,
844 );
845 }
846 },
847 &mut |n| {
848 if let HydroNode::Network {
849 name,
850 networking_info,
851 input,
852 instantiate_fn,
853 metadata,
854 ..
855 } = n
856 {
857 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
858 DebugInstantiate::Building => instantiate_network::<D>(
859 &mut refcell_env.borrow_mut(),
860 input.metadata().location_id.root(),
861 metadata.location_id.root(),
862 processes,
863 clusters,
864 name.as_deref(),
865 networking_info,
866 ),
867
868 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
869 };
870
871 *instantiate_fn = DebugInstantiateFinalized {
872 sink: sink_expr,
873 source: source_expr,
874 connect_fn: Some(connect_fn),
875 }
876 .into();
877 } else if let HydroNode::ExternalInput {
878 from_external_key,
879 from_port_id,
880 from_many,
881 codec_type,
882 port_hint,
883 instantiate_fn,
884 metadata,
885 ..
886 } = n
887 {
888 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
889 DebugInstantiate::Building => {
890 let from_node = externals
891 .get(*from_external_key)
892 .unwrap_or_else(|| {
893 panic!(
894 "A external used in the graph was not instantiated: {}",
895 from_external_key,
896 )
897 })
898 .clone();
899
900 match metadata.location_id.root() {
901 &LocationId::Process(process_key) => {
902 let to_node = processes
903 .get(process_key)
904 .unwrap_or_else(|| {
905 panic!("A process used in the graph was not instantiated: {}", process_key)
906 })
907 .clone();
908
909 let sink_port = from_node.next_port();
910 let source_port = to_node.next_port();
911
912 from_node.register(*from_port_id, sink_port.clone());
913
914 (
915 (
916 parse_quote!(DUMMY),
917 if *from_many {
918 D::e2o_many_source(
919 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
920 &to_node, &source_port,
921 codec_type.0.as_ref(),
922 format!("{}_{}", *from_external_key, *from_port_id)
923 )
924 } else {
925 D::e2o_source(
926 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
927 &from_node, &sink_port,
928 &to_node, &source_port,
929 codec_type.0.as_ref(),
930 format!("{}_{}", *from_external_key, *from_port_id)
931 )
932 },
933 ),
934 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
935 )
936 }
937 LocationId::Cluster(_) => todo!("ExternalInput to a cluster location is not yet supported"),
938 _ => panic!()
939 }
940 },
941
942 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
943 };
944
945 *instantiate_fn = DebugInstantiateFinalized {
946 sink: sink_expr,
947 source: source_expr,
948 connect_fn: Some(connect_fn),
949 }
950 .into();
951 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
952 let element_type = match &metadata.collection_kind {
953 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
954 _ => panic!("Embedded source must have Stream collection kind"),
955 };
956 let location_key = match metadata.location_id.root() {
957 LocationId::Process(key) | LocationId::Cluster(key) => *key,
958 _ => panic!("Embedded source must be on a process or cluster"),
959 };
960 D::register_embedded_stream_input(
961 &mut refcell_env.borrow_mut(),
962 location_key,
963 ident,
964 &element_type,
965 );
966 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
967 let element_type = match &metadata.collection_kind {
968 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
969 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
970 };
971 let location_key = match metadata.location_id.root() {
972 LocationId::Process(key) | LocationId::Cluster(key) => *key,
973 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
974 };
975 D::register_embedded_singleton_input(
976 &mut refcell_env.borrow_mut(),
977 location_key,
978 ident,
979 &element_type,
980 );
981 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
982 match state {
983 ClusterMembersState::Uninit => {
984 let at_location = metadata.location_id.root().clone();
985 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
986 if refcell_seen_cluster_members.borrow_mut().insert(key) {
987 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
989 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
990 &(),
991 );
992 *state = ClusterMembersState::Stream(expr.into());
993 } else {
994 *state = ClusterMembersState::Tee(at_location, location_id.clone());
996 }
997 }
998 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
999 panic!("cluster members already finalized");
1000 }
1001 }
1002 }
1003 },
1004 seen_tees,
1005 false,
1006 );
1007 }
1008
1009 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1010 self.transform_bottom_up(
1011 &mut |l| {
1012 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1013 match instantiate_fn {
1014 DebugInstantiate::Building => panic!("network not built"),
1015
1016 DebugInstantiate::Finalized(finalized) => {
1017 (finalized.connect_fn.take().unwrap())();
1018 }
1019 }
1020 }
1021 },
1022 &mut |n| {
1023 if let HydroNode::Network { instantiate_fn, .. }
1024 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1025 {
1026 match instantiate_fn {
1027 DebugInstantiate::Building => panic!("network not built"),
1028
1029 DebugInstantiate::Finalized(finalized) => {
1030 (finalized.connect_fn.take().unwrap())();
1031 }
1032 }
1033 }
1034 },
1035 seen_tees,
1036 false,
1037 );
1038 }
1039
1040 pub fn transform_bottom_up(
1041 &mut self,
1042 transform_root: &mut impl FnMut(&mut HydroRoot),
1043 transform_node: &mut impl FnMut(&mut HydroNode),
1044 seen_tees: &mut SeenSharedNodes,
1045 check_well_formed: bool,
1046 ) {
1047 self.transform_children(
1048 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1049 seen_tees,
1050 );
1051
1052 transform_root(self);
1053 }
1054
1055 pub fn transform_children(
1056 &mut self,
1057 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1058 seen_tees: &mut SeenSharedNodes,
1059 ) {
1060 match self {
1061 HydroRoot::ForEach { input, .. }
1062 | HydroRoot::SendExternal { input, .. }
1063 | HydroRoot::DestSink { input, .. }
1064 | HydroRoot::CycleSink { input, .. }
1065 | HydroRoot::EmbeddedOutput { input, .. }
1066 | HydroRoot::Null { input, .. } => {
1067 transform(input, seen_tees);
1068 }
1069 }
1070 }
1071
1072 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1073 match self {
1074 HydroRoot::ForEach {
1075 f,
1076 input,
1077 op_metadata,
1078 } => HydroRoot::ForEach {
1079 f: f.clone(),
1080 input: Box::new(input.deep_clone(seen_tees)),
1081 op_metadata: op_metadata.clone(),
1082 },
1083 HydroRoot::SendExternal {
1084 to_external_key,
1085 to_port_id,
1086 to_many,
1087 unpaired,
1088 serialize_fn,
1089 instantiate_fn,
1090 input,
1091 op_metadata,
1092 } => HydroRoot::SendExternal {
1093 to_external_key: *to_external_key,
1094 to_port_id: *to_port_id,
1095 to_many: *to_many,
1096 unpaired: *unpaired,
1097 serialize_fn: serialize_fn.clone(),
1098 instantiate_fn: instantiate_fn.clone(),
1099 input: Box::new(input.deep_clone(seen_tees)),
1100 op_metadata: op_metadata.clone(),
1101 },
1102 HydroRoot::DestSink {
1103 sink,
1104 input,
1105 op_metadata,
1106 } => HydroRoot::DestSink {
1107 sink: sink.clone(),
1108 input: Box::new(input.deep_clone(seen_tees)),
1109 op_metadata: op_metadata.clone(),
1110 },
1111 HydroRoot::CycleSink {
1112 cycle_id,
1113 input,
1114 op_metadata,
1115 } => HydroRoot::CycleSink {
1116 cycle_id: *cycle_id,
1117 input: Box::new(input.deep_clone(seen_tees)),
1118 op_metadata: op_metadata.clone(),
1119 },
1120 HydroRoot::EmbeddedOutput {
1121 ident,
1122 input,
1123 op_metadata,
1124 } => HydroRoot::EmbeddedOutput {
1125 ident: ident.clone(),
1126 input: Box::new(input.deep_clone(seen_tees)),
1127 op_metadata: op_metadata.clone(),
1128 },
1129 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1130 input: Box::new(input.deep_clone(seen_tees)),
1131 op_metadata: op_metadata.clone(),
1132 },
1133 }
1134 }
1135
1136 #[cfg(feature = "build")]
1137 pub fn emit(
1138 &mut self,
1139 graph_builders: &mut dyn DfirBuilder,
1140 seen_tees: &mut SeenSharedNodes,
1141 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1142 next_stmt_id: &mut usize,
1143 ) {
1144 self.emit_core(
1145 &mut BuildersOrCallback::<
1146 fn(&mut HydroRoot, &mut usize),
1147 fn(&mut HydroNode, &mut usize),
1148 >::Builders(graph_builders),
1149 seen_tees,
1150 built_tees,
1151 next_stmt_id,
1152 );
1153 }
1154
1155 #[cfg(feature = "build")]
1156 pub fn emit_core(
1157 &mut self,
1158 builders_or_callback: &mut BuildersOrCallback<
1159 impl FnMut(&mut HydroRoot, &mut usize),
1160 impl FnMut(&mut HydroNode, &mut usize),
1161 >,
1162 seen_tees: &mut SeenSharedNodes,
1163 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1164 next_stmt_id: &mut usize,
1165 ) {
1166 match self {
1167 HydroRoot::ForEach { f, input, .. } => {
1168 let input_ident =
1169 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1170
1171 match builders_or_callback {
1172 BuildersOrCallback::Builders(graph_builders) => {
1173 graph_builders
1174 .get_dfir_mut(&input.metadata().location_id)
1175 .add_dfir(
1176 parse_quote! {
1177 #input_ident -> for_each(#f);
1178 },
1179 None,
1180 Some(&next_stmt_id.to_string()),
1181 );
1182 }
1183 BuildersOrCallback::Callback(leaf_callback, _) => {
1184 leaf_callback(self, next_stmt_id);
1185 }
1186 }
1187
1188 *next_stmt_id += 1;
1189 }
1190
1191 HydroRoot::SendExternal {
1192 serialize_fn,
1193 instantiate_fn,
1194 input,
1195 ..
1196 } => {
1197 let input_ident =
1198 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1199
1200 match builders_or_callback {
1201 BuildersOrCallback::Builders(graph_builders) => {
1202 let (sink_expr, _) = match instantiate_fn {
1203 DebugInstantiate::Building => (
1204 syn::parse_quote!(DUMMY_SINK),
1205 syn::parse_quote!(DUMMY_SOURCE),
1206 ),
1207
1208 DebugInstantiate::Finalized(finalized) => {
1209 (finalized.sink.clone(), finalized.source.clone())
1210 }
1211 };
1212
1213 graph_builders.create_external_output(
1214 &input.metadata().location_id,
1215 sink_expr,
1216 &input_ident,
1217 serialize_fn.as_ref(),
1218 *next_stmt_id,
1219 );
1220 }
1221 BuildersOrCallback::Callback(leaf_callback, _) => {
1222 leaf_callback(self, next_stmt_id);
1223 }
1224 }
1225
1226 *next_stmt_id += 1;
1227 }
1228
1229 HydroRoot::DestSink { sink, input, .. } => {
1230 let input_ident =
1231 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1232
1233 match builders_or_callback {
1234 BuildersOrCallback::Builders(graph_builders) => {
1235 graph_builders
1236 .get_dfir_mut(&input.metadata().location_id)
1237 .add_dfir(
1238 parse_quote! {
1239 #input_ident -> dest_sink(#sink);
1240 },
1241 None,
1242 Some(&next_stmt_id.to_string()),
1243 );
1244 }
1245 BuildersOrCallback::Callback(leaf_callback, _) => {
1246 leaf_callback(self, next_stmt_id);
1247 }
1248 }
1249
1250 *next_stmt_id += 1;
1251 }
1252
1253 HydroRoot::CycleSink {
1254 cycle_id, input, ..
1255 } => {
1256 let input_ident =
1257 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1258
1259 match builders_or_callback {
1260 BuildersOrCallback::Builders(graph_builders) => {
1261 let elem_type: syn::Type = match &input.metadata().collection_kind {
1262 CollectionKind::KeyedSingleton {
1263 key_type,
1264 value_type,
1265 ..
1266 }
1267 | CollectionKind::KeyedStream {
1268 key_type,
1269 value_type,
1270 ..
1271 } => {
1272 parse_quote!((#key_type, #value_type))
1273 }
1274 CollectionKind::Stream { element_type, .. }
1275 | CollectionKind::Singleton { element_type, .. }
1276 | CollectionKind::Optional { element_type, .. } => {
1277 parse_quote!(#element_type)
1278 }
1279 };
1280
1281 let cycle_id_ident = cycle_id.as_ident();
1282 graph_builders
1283 .get_dfir_mut(&input.metadata().location_id)
1284 .add_dfir(
1285 parse_quote! {
1286 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1287 },
1288 None,
1289 None,
1290 );
1291 }
1292 BuildersOrCallback::Callback(_, _) => {}
1294 }
1295 }
1296
1297 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1298 let input_ident =
1299 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1300
1301 match builders_or_callback {
1302 BuildersOrCallback::Builders(graph_builders) => {
1303 graph_builders
1304 .get_dfir_mut(&input.metadata().location_id)
1305 .add_dfir(
1306 parse_quote! {
1307 #input_ident -> for_each(&mut #ident);
1308 },
1309 None,
1310 Some(&next_stmt_id.to_string()),
1311 );
1312 }
1313 BuildersOrCallback::Callback(leaf_callback, _) => {
1314 leaf_callback(self, next_stmt_id);
1315 }
1316 }
1317
1318 *next_stmt_id += 1;
1319 }
1320
1321 HydroRoot::Null { input, .. } => {
1322 let input_ident =
1323 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1324
1325 match builders_or_callback {
1326 BuildersOrCallback::Builders(graph_builders) => {
1327 graph_builders
1328 .get_dfir_mut(&input.metadata().location_id)
1329 .add_dfir(
1330 parse_quote! {
1331 #input_ident -> for_each(|_| {});
1332 },
1333 None,
1334 Some(&next_stmt_id.to_string()),
1335 );
1336 }
1337 BuildersOrCallback::Callback(leaf_callback, _) => {
1338 leaf_callback(self, next_stmt_id);
1339 }
1340 }
1341
1342 *next_stmt_id += 1;
1343 }
1344 }
1345 }
1346
1347 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1348 match self {
1349 HydroRoot::ForEach { op_metadata, .. }
1350 | HydroRoot::SendExternal { op_metadata, .. }
1351 | HydroRoot::DestSink { op_metadata, .. }
1352 | HydroRoot::CycleSink { op_metadata, .. }
1353 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1354 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1355 }
1356 }
1357
1358 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1359 match self {
1360 HydroRoot::ForEach { op_metadata, .. }
1361 | HydroRoot::SendExternal { op_metadata, .. }
1362 | HydroRoot::DestSink { op_metadata, .. }
1363 | HydroRoot::CycleSink { op_metadata, .. }
1364 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1365 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1366 }
1367 }
1368
1369 pub fn input(&self) -> &HydroNode {
1370 match self {
1371 HydroRoot::ForEach { input, .. }
1372 | HydroRoot::SendExternal { input, .. }
1373 | HydroRoot::DestSink { input, .. }
1374 | HydroRoot::CycleSink { input, .. }
1375 | HydroRoot::EmbeddedOutput { input, .. }
1376 | HydroRoot::Null { input, .. } => input,
1377 }
1378 }
1379
1380 pub fn input_metadata(&self) -> &HydroIrMetadata {
1381 self.input().metadata()
1382 }
1383
1384 pub fn print_root(&self) -> String {
1385 match self {
1386 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1387 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1388 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1389 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1390 HydroRoot::EmbeddedOutput { ident, .. } => {
1391 format!("EmbeddedOutput({})", ident)
1392 }
1393 HydroRoot::Null { .. } => "Null".to_owned(),
1394 }
1395 }
1396
1397 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1398 match self {
1399 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1400 transform(f);
1401 }
1402 HydroRoot::SendExternal { .. }
1403 | HydroRoot::CycleSink { .. }
1404 | HydroRoot::EmbeddedOutput { .. }
1405 | HydroRoot::Null { .. } => {}
1406 }
1407 }
1408}
1409
1410#[cfg(feature = "build")]
1411pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1412 let mut builders = SecondaryMap::new();
1413 let mut seen_tees = HashMap::new();
1414 let mut built_tees = HashMap::new();
1415 let mut next_stmt_id = 0;
1416 for leaf in ir {
1417 leaf.emit(
1418 &mut builders,
1419 &mut seen_tees,
1420 &mut built_tees,
1421 &mut next_stmt_id,
1422 );
1423 }
1424 builders
1425}
1426
1427#[cfg(feature = "build")]
1428pub fn traverse_dfir(
1429 ir: &mut [HydroRoot],
1430 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1431 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1432) {
1433 let mut seen_tees = HashMap::new();
1434 let mut built_tees = HashMap::new();
1435 let mut next_stmt_id = 0;
1436 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1437 ir.iter_mut().for_each(|leaf| {
1438 leaf.emit_core(
1439 &mut callback,
1440 &mut seen_tees,
1441 &mut built_tees,
1442 &mut next_stmt_id,
1443 );
1444 });
1445}
1446
1447pub fn transform_bottom_up(
1448 ir: &mut [HydroRoot],
1449 transform_root: &mut impl FnMut(&mut HydroRoot),
1450 transform_node: &mut impl FnMut(&mut HydroNode),
1451 check_well_formed: bool,
1452) {
1453 let mut seen_tees = HashMap::new();
1454 ir.iter_mut().for_each(|leaf| {
1455 leaf.transform_bottom_up(
1456 transform_root,
1457 transform_node,
1458 &mut seen_tees,
1459 check_well_formed,
1460 );
1461 });
1462}
1463
1464pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1465 let mut seen_tees = HashMap::new();
1466 ir.iter()
1467 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1468 .collect()
1469}
1470
1471type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1472thread_local! {
1473 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1474}
1475
1476pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1477 PRINTED_TEES.with(|printed_tees| {
1478 let mut printed_tees_mut = printed_tees.borrow_mut();
1479 *printed_tees_mut = Some((0, HashMap::new()));
1480 drop(printed_tees_mut);
1481
1482 let ret = f();
1483
1484 let mut printed_tees_mut = printed_tees.borrow_mut();
1485 *printed_tees_mut = None;
1486
1487 ret
1488 })
1489}
1490
1491pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1492
1493impl SharedNode {
1494 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1495 Rc::as_ptr(&self.0)
1496 }
1497}
1498
1499impl Debug for SharedNode {
1500 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1501 PRINTED_TEES.with(|printed_tees| {
1502 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1503 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1504
1505 if let Some(printed_tees_mut) = printed_tees_mut {
1506 if let Some(existing) = printed_tees_mut
1507 .1
1508 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1509 {
1510 write!(f, "<shared {}>", existing)
1511 } else {
1512 let next_id = printed_tees_mut.0;
1513 printed_tees_mut.0 += 1;
1514 printed_tees_mut
1515 .1
1516 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1517 drop(printed_tees_mut_borrow);
1518 write!(f, "<shared {}>: ", next_id)?;
1519 Debug::fmt(&self.0.borrow(), f)
1520 }
1521 } else {
1522 drop(printed_tees_mut_borrow);
1523 write!(f, "<shared>: ")?;
1524 Debug::fmt(&self.0.borrow(), f)
1525 }
1526 })
1527 }
1528}
1529
1530impl Hash for SharedNode {
1531 fn hash<H: Hasher>(&self, state: &mut H) {
1532 self.0.borrow_mut().hash(state);
1533 }
1534}
1535
1536#[derive(Clone, PartialEq, Eq, Debug)]
1537pub enum BoundKind {
1538 Unbounded,
1539 Bounded,
1540}
1541
1542#[derive(Clone, PartialEq, Eq, Debug)]
1543pub enum StreamOrder {
1544 NoOrder,
1545 TotalOrder,
1546}
1547
1548#[derive(Clone, PartialEq, Eq, Debug)]
1549pub enum StreamRetry {
1550 AtLeastOnce,
1551 ExactlyOnce,
1552}
1553
1554#[derive(Clone, PartialEq, Eq, Debug)]
1555pub enum KeyedSingletonBoundKind {
1556 Unbounded,
1557 BoundedValue,
1558 Bounded,
1559}
1560
1561#[derive(Clone, PartialEq, Eq, Debug)]
1562pub enum CollectionKind {
1563 Stream {
1564 bound: BoundKind,
1565 order: StreamOrder,
1566 retry: StreamRetry,
1567 element_type: DebugType,
1568 },
1569 Singleton {
1570 bound: BoundKind,
1571 element_type: DebugType,
1572 },
1573 Optional {
1574 bound: BoundKind,
1575 element_type: DebugType,
1576 },
1577 KeyedStream {
1578 bound: BoundKind,
1579 value_order: StreamOrder,
1580 value_retry: StreamRetry,
1581 key_type: DebugType,
1582 value_type: DebugType,
1583 },
1584 KeyedSingleton {
1585 bound: KeyedSingletonBoundKind,
1586 key_type: DebugType,
1587 value_type: DebugType,
1588 },
1589}
1590
1591impl CollectionKind {
1592 pub fn is_bounded(&self) -> bool {
1593 matches!(
1594 self,
1595 CollectionKind::Stream {
1596 bound: BoundKind::Bounded,
1597 ..
1598 } | CollectionKind::Singleton {
1599 bound: BoundKind::Bounded,
1600 ..
1601 } | CollectionKind::Optional {
1602 bound: BoundKind::Bounded,
1603 ..
1604 } | CollectionKind::KeyedStream {
1605 bound: BoundKind::Bounded,
1606 ..
1607 } | CollectionKind::KeyedSingleton {
1608 bound: KeyedSingletonBoundKind::Bounded,
1609 ..
1610 }
1611 )
1612 }
1613}
1614
1615#[derive(Clone)]
1616pub struct HydroIrMetadata {
1617 pub location_id: LocationId,
1618 pub collection_kind: CollectionKind,
1619 pub cardinality: Option<usize>,
1620 pub tag: Option<String>,
1621 pub op: HydroIrOpMetadata,
1622}
1623
1624impl Hash for HydroIrMetadata {
1626 fn hash<H: Hasher>(&self, _: &mut H) {}
1627}
1628
1629impl PartialEq for HydroIrMetadata {
1630 fn eq(&self, _: &Self) -> bool {
1631 true
1632 }
1633}
1634
1635impl Eq for HydroIrMetadata {}
1636
1637impl Debug for HydroIrMetadata {
1638 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1639 f.debug_struct("HydroIrMetadata")
1640 .field("location_id", &self.location_id)
1641 .field("collection_kind", &self.collection_kind)
1642 .finish()
1643 }
1644}
1645
1646#[derive(Clone)]
1649pub struct HydroIrOpMetadata {
1650 pub backtrace: Backtrace,
1651 pub cpu_usage: Option<f64>,
1652 pub network_recv_cpu_usage: Option<f64>,
1653 pub id: Option<usize>,
1654}
1655
1656impl HydroIrOpMetadata {
1657 #[expect(
1658 clippy::new_without_default,
1659 reason = "explicit calls to new ensure correct backtrace bounds"
1660 )]
1661 pub fn new() -> HydroIrOpMetadata {
1662 Self::new_with_skip(1)
1663 }
1664
1665 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1666 HydroIrOpMetadata {
1667 backtrace: Backtrace::get_backtrace(2 + skip_count),
1668 cpu_usage: None,
1669 network_recv_cpu_usage: None,
1670 id: None,
1671 }
1672 }
1673}
1674
1675impl Debug for HydroIrOpMetadata {
1676 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1677 f.debug_struct("HydroIrOpMetadata").finish()
1678 }
1679}
1680
1681impl Hash for HydroIrOpMetadata {
1682 fn hash<H: Hasher>(&self, _: &mut H) {}
1683}
1684
1685#[derive(Debug, Hash)]
1688pub enum HydroNode {
1689 Placeholder,
1690
1691 Cast {
1699 inner: Box<HydroNode>,
1700 metadata: HydroIrMetadata,
1701 },
1702
1703 ObserveNonDet {
1709 inner: Box<HydroNode>,
1710 trusted: bool, metadata: HydroIrMetadata,
1712 },
1713
1714 Source {
1715 source: HydroSource,
1716 metadata: HydroIrMetadata,
1717 },
1718
1719 SingletonSource {
1720 value: DebugExpr,
1721 first_tick_only: bool,
1722 metadata: HydroIrMetadata,
1723 },
1724
1725 CycleSource {
1726 cycle_id: CycleId,
1727 metadata: HydroIrMetadata,
1728 },
1729
1730 Tee {
1731 inner: SharedNode,
1732 metadata: HydroIrMetadata,
1733 },
1734
1735 Partition {
1736 inner: SharedNode,
1737 f: DebugExpr,
1738 is_true: bool,
1739 metadata: HydroIrMetadata,
1740 },
1741
1742 BeginAtomic {
1743 inner: Box<HydroNode>,
1744 metadata: HydroIrMetadata,
1745 },
1746
1747 EndAtomic {
1748 inner: Box<HydroNode>,
1749 metadata: HydroIrMetadata,
1750 },
1751
1752 Batch {
1753 inner: Box<HydroNode>,
1754 metadata: HydroIrMetadata,
1755 },
1756
1757 YieldConcat {
1758 inner: Box<HydroNode>,
1759 metadata: HydroIrMetadata,
1760 },
1761
1762 Chain {
1763 first: Box<HydroNode>,
1764 second: Box<HydroNode>,
1765 metadata: HydroIrMetadata,
1766 },
1767
1768 ChainFirst {
1769 first: Box<HydroNode>,
1770 second: Box<HydroNode>,
1771 metadata: HydroIrMetadata,
1772 },
1773
1774 CrossProduct {
1775 left: Box<HydroNode>,
1776 right: Box<HydroNode>,
1777 metadata: HydroIrMetadata,
1778 },
1779
1780 CrossSingleton {
1781 left: Box<HydroNode>,
1782 right: Box<HydroNode>,
1783 metadata: HydroIrMetadata,
1784 },
1785
1786 Join {
1787 left: Box<HydroNode>,
1788 right: Box<HydroNode>,
1789 metadata: HydroIrMetadata,
1790 },
1791
1792 Difference {
1793 pos: Box<HydroNode>,
1794 neg: Box<HydroNode>,
1795 metadata: HydroIrMetadata,
1796 },
1797
1798 AntiJoin {
1799 pos: Box<HydroNode>,
1800 neg: Box<HydroNode>,
1801 metadata: HydroIrMetadata,
1802 },
1803
1804 ResolveFutures {
1805 input: Box<HydroNode>,
1806 metadata: HydroIrMetadata,
1807 },
1808 ResolveFuturesOrdered {
1809 input: Box<HydroNode>,
1810 metadata: HydroIrMetadata,
1811 },
1812
1813 Map {
1814 f: DebugExpr,
1815 input: Box<HydroNode>,
1816 metadata: HydroIrMetadata,
1817 },
1818 FlatMap {
1819 f: DebugExpr,
1820 input: Box<HydroNode>,
1821 metadata: HydroIrMetadata,
1822 },
1823 Filter {
1824 f: DebugExpr,
1825 input: Box<HydroNode>,
1826 metadata: HydroIrMetadata,
1827 },
1828 FilterMap {
1829 f: DebugExpr,
1830 input: Box<HydroNode>,
1831 metadata: HydroIrMetadata,
1832 },
1833
1834 DeferTick {
1835 input: Box<HydroNode>,
1836 metadata: HydroIrMetadata,
1837 },
1838 Enumerate {
1839 input: Box<HydroNode>,
1840 metadata: HydroIrMetadata,
1841 },
1842 Inspect {
1843 f: DebugExpr,
1844 input: Box<HydroNode>,
1845 metadata: HydroIrMetadata,
1846 },
1847
1848 Unique {
1849 input: Box<HydroNode>,
1850 metadata: HydroIrMetadata,
1851 },
1852
1853 Sort {
1854 input: Box<HydroNode>,
1855 metadata: HydroIrMetadata,
1856 },
1857 Fold {
1858 init: DebugExpr,
1859 acc: DebugExpr,
1860 input: Box<HydroNode>,
1861 metadata: HydroIrMetadata,
1862 },
1863
1864 Scan {
1865 init: DebugExpr,
1866 acc: DebugExpr,
1867 input: Box<HydroNode>,
1868 metadata: HydroIrMetadata,
1869 },
1870 FoldKeyed {
1871 init: DebugExpr,
1872 acc: DebugExpr,
1873 input: Box<HydroNode>,
1874 metadata: HydroIrMetadata,
1875 },
1876
1877 Reduce {
1878 f: DebugExpr,
1879 input: Box<HydroNode>,
1880 metadata: HydroIrMetadata,
1881 },
1882 ReduceKeyed {
1883 f: DebugExpr,
1884 input: Box<HydroNode>,
1885 metadata: HydroIrMetadata,
1886 },
1887 ReduceKeyedWatermark {
1888 f: DebugExpr,
1889 input: Box<HydroNode>,
1890 watermark: Box<HydroNode>,
1891 metadata: HydroIrMetadata,
1892 },
1893
1894 Network {
1895 name: Option<String>,
1896 networking_info: crate::networking::NetworkingInfo,
1897 serialize_fn: Option<DebugExpr>,
1898 instantiate_fn: DebugInstantiate,
1899 deserialize_fn: Option<DebugExpr>,
1900 input: Box<HydroNode>,
1901 metadata: HydroIrMetadata,
1902 },
1903
1904 ExternalInput {
1905 from_external_key: LocationKey,
1906 from_port_id: ExternalPortId,
1907 from_many: bool,
1908 codec_type: DebugType,
1909 port_hint: NetworkHint,
1910 instantiate_fn: DebugInstantiate,
1911 deserialize_fn: Option<DebugExpr>,
1912 metadata: HydroIrMetadata,
1913 },
1914
1915 Counter {
1916 tag: String,
1917 duration: DebugExpr,
1918 prefix: String,
1919 input: Box<HydroNode>,
1920 metadata: HydroIrMetadata,
1921 },
1922}
1923
1924pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1925pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1926
1927impl HydroNode {
1928 pub fn transform_bottom_up(
1929 &mut self,
1930 transform: &mut impl FnMut(&mut HydroNode),
1931 seen_tees: &mut SeenSharedNodes,
1932 check_well_formed: bool,
1933 ) {
1934 self.transform_children(
1935 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1936 seen_tees,
1937 );
1938
1939 transform(self);
1940
1941 let self_location = self.metadata().location_id.root();
1942
1943 if check_well_formed {
1944 match &*self {
1945 HydroNode::Network { .. } => {}
1946 _ => {
1947 self.input_metadata().iter().for_each(|i| {
1948 if i.location_id.root() != self_location {
1949 panic!(
1950 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1951 i,
1952 i.location_id.root(),
1953 self,
1954 self_location
1955 )
1956 }
1957 });
1958 }
1959 }
1960 }
1961 }
1962
1963 #[inline(always)]
1964 pub fn transform_children(
1965 &mut self,
1966 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1967 seen_tees: &mut SeenSharedNodes,
1968 ) {
1969 match self {
1970 HydroNode::Placeholder => {
1971 panic!();
1972 }
1973
1974 HydroNode::Source { .. }
1975 | HydroNode::SingletonSource { .. }
1976 | HydroNode::CycleSource { .. }
1977 | HydroNode::ExternalInput { .. } => {}
1978
1979 HydroNode::Tee { inner, .. } => {
1980 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1981 *inner = SharedNode(transformed.clone());
1982 } else {
1983 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1984 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1985 let mut orig = inner.0.replace(HydroNode::Placeholder);
1986 transform(&mut orig, seen_tees);
1987 *transformed_cell.borrow_mut() = orig;
1988 *inner = SharedNode(transformed_cell);
1989 }
1990 }
1991
1992 HydroNode::Partition { inner, .. } => {
1993 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1994 *inner = SharedNode(transformed.clone());
1995 } else {
1996 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1997 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1998 let mut orig = inner.0.replace(HydroNode::Placeholder);
1999 transform(&mut orig, seen_tees);
2000 *transformed_cell.borrow_mut() = orig;
2001 *inner = SharedNode(transformed_cell);
2002 }
2003 }
2004
2005 HydroNode::Cast { inner, .. }
2006 | HydroNode::ObserveNonDet { inner, .. }
2007 | HydroNode::BeginAtomic { inner, .. }
2008 | HydroNode::EndAtomic { inner, .. }
2009 | HydroNode::Batch { inner, .. }
2010 | HydroNode::YieldConcat { inner, .. } => {
2011 transform(inner.as_mut(), seen_tees);
2012 }
2013
2014 HydroNode::Chain { first, second, .. } => {
2015 transform(first.as_mut(), seen_tees);
2016 transform(second.as_mut(), seen_tees);
2017 }
2018
2019 HydroNode::ChainFirst { first, second, .. } => {
2020 transform(first.as_mut(), seen_tees);
2021 transform(second.as_mut(), seen_tees);
2022 }
2023
2024 HydroNode::CrossSingleton { left, right, .. }
2025 | HydroNode::CrossProduct { left, right, .. }
2026 | HydroNode::Join { left, right, .. } => {
2027 transform(left.as_mut(), seen_tees);
2028 transform(right.as_mut(), seen_tees);
2029 }
2030
2031 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2032 transform(pos.as_mut(), seen_tees);
2033 transform(neg.as_mut(), seen_tees);
2034 }
2035
2036 HydroNode::ReduceKeyedWatermark {
2037 input, watermark, ..
2038 } => {
2039 transform(input.as_mut(), seen_tees);
2040 transform(watermark.as_mut(), seen_tees);
2041 }
2042
2043 HydroNode::Map { input, .. }
2044 | HydroNode::ResolveFutures { input, .. }
2045 | HydroNode::ResolveFuturesOrdered { input, .. }
2046 | HydroNode::FlatMap { input, .. }
2047 | HydroNode::Filter { input, .. }
2048 | HydroNode::FilterMap { input, .. }
2049 | HydroNode::Sort { input, .. }
2050 | HydroNode::DeferTick { input, .. }
2051 | HydroNode::Enumerate { input, .. }
2052 | HydroNode::Inspect { input, .. }
2053 | HydroNode::Unique { input, .. }
2054 | HydroNode::Network { input, .. }
2055 | HydroNode::Fold { input, .. }
2056 | HydroNode::Scan { input, .. }
2057 | HydroNode::FoldKeyed { input, .. }
2058 | HydroNode::Reduce { input, .. }
2059 | HydroNode::ReduceKeyed { input, .. }
2060 | HydroNode::Counter { input, .. } => {
2061 transform(input.as_mut(), seen_tees);
2062 }
2063 }
2064 }
2065
2066 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2067 match self {
2068 HydroNode::Placeholder => HydroNode::Placeholder,
2069 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2070 inner: Box::new(inner.deep_clone(seen_tees)),
2071 metadata: metadata.clone(),
2072 },
2073 HydroNode::ObserveNonDet {
2074 inner,
2075 trusted,
2076 metadata,
2077 } => HydroNode::ObserveNonDet {
2078 inner: Box::new(inner.deep_clone(seen_tees)),
2079 trusted: *trusted,
2080 metadata: metadata.clone(),
2081 },
2082 HydroNode::Source { source, metadata } => HydroNode::Source {
2083 source: source.clone(),
2084 metadata: metadata.clone(),
2085 },
2086 HydroNode::SingletonSource {
2087 value,
2088 first_tick_only,
2089 metadata,
2090 } => HydroNode::SingletonSource {
2091 value: value.clone(),
2092 first_tick_only: *first_tick_only,
2093 metadata: metadata.clone(),
2094 },
2095 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2096 cycle_id: *cycle_id,
2097 metadata: metadata.clone(),
2098 },
2099 HydroNode::Tee { inner, metadata } => {
2100 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2101 HydroNode::Tee {
2102 inner: SharedNode(transformed.clone()),
2103 metadata: metadata.clone(),
2104 }
2105 } else {
2106 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2107 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2108 let cloned = inner.0.borrow().deep_clone(seen_tees);
2109 *new_rc.borrow_mut() = cloned;
2110 HydroNode::Tee {
2111 inner: SharedNode(new_rc),
2112 metadata: metadata.clone(),
2113 }
2114 }
2115 }
2116 HydroNode::Partition {
2117 inner,
2118 f,
2119 is_true,
2120 metadata,
2121 } => {
2122 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2123 HydroNode::Partition {
2124 inner: SharedNode(transformed.clone()),
2125 f: f.clone(),
2126 is_true: *is_true,
2127 metadata: metadata.clone(),
2128 }
2129 } else {
2130 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2131 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2132 let cloned = inner.0.borrow().deep_clone(seen_tees);
2133 *new_rc.borrow_mut() = cloned;
2134 HydroNode::Partition {
2135 inner: SharedNode(new_rc),
2136 f: f.clone(),
2137 is_true: *is_true,
2138 metadata: metadata.clone(),
2139 }
2140 }
2141 }
2142 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2143 inner: Box::new(inner.deep_clone(seen_tees)),
2144 metadata: metadata.clone(),
2145 },
2146 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2147 inner: Box::new(inner.deep_clone(seen_tees)),
2148 metadata: metadata.clone(),
2149 },
2150 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2151 inner: Box::new(inner.deep_clone(seen_tees)),
2152 metadata: metadata.clone(),
2153 },
2154 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2155 inner: Box::new(inner.deep_clone(seen_tees)),
2156 metadata: metadata.clone(),
2157 },
2158 HydroNode::Chain {
2159 first,
2160 second,
2161 metadata,
2162 } => HydroNode::Chain {
2163 first: Box::new(first.deep_clone(seen_tees)),
2164 second: Box::new(second.deep_clone(seen_tees)),
2165 metadata: metadata.clone(),
2166 },
2167 HydroNode::ChainFirst {
2168 first,
2169 second,
2170 metadata,
2171 } => HydroNode::ChainFirst {
2172 first: Box::new(first.deep_clone(seen_tees)),
2173 second: Box::new(second.deep_clone(seen_tees)),
2174 metadata: metadata.clone(),
2175 },
2176 HydroNode::CrossProduct {
2177 left,
2178 right,
2179 metadata,
2180 } => HydroNode::CrossProduct {
2181 left: Box::new(left.deep_clone(seen_tees)),
2182 right: Box::new(right.deep_clone(seen_tees)),
2183 metadata: metadata.clone(),
2184 },
2185 HydroNode::CrossSingleton {
2186 left,
2187 right,
2188 metadata,
2189 } => HydroNode::CrossSingleton {
2190 left: Box::new(left.deep_clone(seen_tees)),
2191 right: Box::new(right.deep_clone(seen_tees)),
2192 metadata: metadata.clone(),
2193 },
2194 HydroNode::Join {
2195 left,
2196 right,
2197 metadata,
2198 } => HydroNode::Join {
2199 left: Box::new(left.deep_clone(seen_tees)),
2200 right: Box::new(right.deep_clone(seen_tees)),
2201 metadata: metadata.clone(),
2202 },
2203 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2204 pos: Box::new(pos.deep_clone(seen_tees)),
2205 neg: Box::new(neg.deep_clone(seen_tees)),
2206 metadata: metadata.clone(),
2207 },
2208 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2209 pos: Box::new(pos.deep_clone(seen_tees)),
2210 neg: Box::new(neg.deep_clone(seen_tees)),
2211 metadata: metadata.clone(),
2212 },
2213 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2214 input: Box::new(input.deep_clone(seen_tees)),
2215 metadata: metadata.clone(),
2216 },
2217 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2218 HydroNode::ResolveFuturesOrdered {
2219 input: Box::new(input.deep_clone(seen_tees)),
2220 metadata: metadata.clone(),
2221 }
2222 }
2223 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2224 f: f.clone(),
2225 input: Box::new(input.deep_clone(seen_tees)),
2226 metadata: metadata.clone(),
2227 },
2228 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2229 f: f.clone(),
2230 input: Box::new(input.deep_clone(seen_tees)),
2231 metadata: metadata.clone(),
2232 },
2233 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2234 f: f.clone(),
2235 input: Box::new(input.deep_clone(seen_tees)),
2236 metadata: metadata.clone(),
2237 },
2238 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2239 f: f.clone(),
2240 input: Box::new(input.deep_clone(seen_tees)),
2241 metadata: metadata.clone(),
2242 },
2243 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2244 input: Box::new(input.deep_clone(seen_tees)),
2245 metadata: metadata.clone(),
2246 },
2247 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2248 input: Box::new(input.deep_clone(seen_tees)),
2249 metadata: metadata.clone(),
2250 },
2251 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2252 f: f.clone(),
2253 input: Box::new(input.deep_clone(seen_tees)),
2254 metadata: metadata.clone(),
2255 },
2256 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2257 input: Box::new(input.deep_clone(seen_tees)),
2258 metadata: metadata.clone(),
2259 },
2260 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2261 input: Box::new(input.deep_clone(seen_tees)),
2262 metadata: metadata.clone(),
2263 },
2264 HydroNode::Fold {
2265 init,
2266 acc,
2267 input,
2268 metadata,
2269 } => HydroNode::Fold {
2270 init: init.clone(),
2271 acc: acc.clone(),
2272 input: Box::new(input.deep_clone(seen_tees)),
2273 metadata: metadata.clone(),
2274 },
2275 HydroNode::Scan {
2276 init,
2277 acc,
2278 input,
2279 metadata,
2280 } => HydroNode::Scan {
2281 init: init.clone(),
2282 acc: acc.clone(),
2283 input: Box::new(input.deep_clone(seen_tees)),
2284 metadata: metadata.clone(),
2285 },
2286 HydroNode::FoldKeyed {
2287 init,
2288 acc,
2289 input,
2290 metadata,
2291 } => HydroNode::FoldKeyed {
2292 init: init.clone(),
2293 acc: acc.clone(),
2294 input: Box::new(input.deep_clone(seen_tees)),
2295 metadata: metadata.clone(),
2296 },
2297 HydroNode::ReduceKeyedWatermark {
2298 f,
2299 input,
2300 watermark,
2301 metadata,
2302 } => HydroNode::ReduceKeyedWatermark {
2303 f: f.clone(),
2304 input: Box::new(input.deep_clone(seen_tees)),
2305 watermark: Box::new(watermark.deep_clone(seen_tees)),
2306 metadata: metadata.clone(),
2307 },
2308 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2309 f: f.clone(),
2310 input: Box::new(input.deep_clone(seen_tees)),
2311 metadata: metadata.clone(),
2312 },
2313 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2314 f: f.clone(),
2315 input: Box::new(input.deep_clone(seen_tees)),
2316 metadata: metadata.clone(),
2317 },
2318 HydroNode::Network {
2319 name,
2320 networking_info,
2321 serialize_fn,
2322 instantiate_fn,
2323 deserialize_fn,
2324 input,
2325 metadata,
2326 } => HydroNode::Network {
2327 name: name.clone(),
2328 networking_info: networking_info.clone(),
2329 serialize_fn: serialize_fn.clone(),
2330 instantiate_fn: instantiate_fn.clone(),
2331 deserialize_fn: deserialize_fn.clone(),
2332 input: Box::new(input.deep_clone(seen_tees)),
2333 metadata: metadata.clone(),
2334 },
2335 HydroNode::ExternalInput {
2336 from_external_key,
2337 from_port_id,
2338 from_many,
2339 codec_type,
2340 port_hint,
2341 instantiate_fn,
2342 deserialize_fn,
2343 metadata,
2344 } => HydroNode::ExternalInput {
2345 from_external_key: *from_external_key,
2346 from_port_id: *from_port_id,
2347 from_many: *from_many,
2348 codec_type: codec_type.clone(),
2349 port_hint: *port_hint,
2350 instantiate_fn: instantiate_fn.clone(),
2351 deserialize_fn: deserialize_fn.clone(),
2352 metadata: metadata.clone(),
2353 },
2354 HydroNode::Counter {
2355 tag,
2356 duration,
2357 prefix,
2358 input,
2359 metadata,
2360 } => HydroNode::Counter {
2361 tag: tag.clone(),
2362 duration: duration.clone(),
2363 prefix: prefix.clone(),
2364 input: Box::new(input.deep_clone(seen_tees)),
2365 metadata: metadata.clone(),
2366 },
2367 }
2368 }
2369
2370 #[cfg(feature = "build")]
2371 pub fn emit_core(
2372 &mut self,
2373 builders_or_callback: &mut BuildersOrCallback<
2374 impl FnMut(&mut HydroRoot, &mut usize),
2375 impl FnMut(&mut HydroNode, &mut usize),
2376 >,
2377 seen_tees: &mut SeenSharedNodes,
2378 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2379 next_stmt_id: &mut usize,
2380 ) -> syn::Ident {
2381 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2382
2383 self.transform_bottom_up(
2384 &mut |node: &mut HydroNode| {
2385 let out_location = node.metadata().location_id.clone();
2386 match node {
2387 HydroNode::Placeholder => {
2388 panic!()
2389 }
2390
2391 HydroNode::Cast { .. } => {
2392 match builders_or_callback {
2395 BuildersOrCallback::Builders(_) => {}
2396 BuildersOrCallback::Callback(_, node_callback) => {
2397 node_callback(node, next_stmt_id);
2398 }
2399 }
2400
2401 *next_stmt_id += 1;
2402 }
2404
2405 HydroNode::ObserveNonDet {
2406 inner,
2407 trusted,
2408 metadata,
2409 ..
2410 } => {
2411 let inner_ident = ident_stack.pop().unwrap();
2412
2413 let observe_ident =
2414 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2415
2416 match builders_or_callback {
2417 BuildersOrCallback::Builders(graph_builders) => {
2418 graph_builders.observe_nondet(
2419 *trusted,
2420 &inner.metadata().location_id,
2421 inner_ident,
2422 &inner.metadata().collection_kind,
2423 &observe_ident,
2424 &metadata.collection_kind,
2425 &metadata.op,
2426 );
2427 }
2428 BuildersOrCallback::Callback(_, node_callback) => {
2429 node_callback(node, next_stmt_id);
2430 }
2431 }
2432
2433 *next_stmt_id += 1;
2434
2435 ident_stack.push(observe_ident);
2436 }
2437
2438 HydroNode::Batch {
2439 inner, metadata, ..
2440 } => {
2441 let inner_ident = ident_stack.pop().unwrap();
2442
2443 let batch_ident =
2444 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2445
2446 match builders_or_callback {
2447 BuildersOrCallback::Builders(graph_builders) => {
2448 graph_builders.batch(
2449 inner_ident,
2450 &inner.metadata().location_id,
2451 &inner.metadata().collection_kind,
2452 &batch_ident,
2453 &out_location,
2454 &metadata.op,
2455 );
2456 }
2457 BuildersOrCallback::Callback(_, node_callback) => {
2458 node_callback(node, next_stmt_id);
2459 }
2460 }
2461
2462 *next_stmt_id += 1;
2463
2464 ident_stack.push(batch_ident);
2465 }
2466
2467 HydroNode::YieldConcat { inner, .. } => {
2468 let inner_ident = ident_stack.pop().unwrap();
2469
2470 let yield_ident =
2471 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2472
2473 match builders_or_callback {
2474 BuildersOrCallback::Builders(graph_builders) => {
2475 graph_builders.yield_from_tick(
2476 inner_ident,
2477 &inner.metadata().location_id,
2478 &inner.metadata().collection_kind,
2479 &yield_ident,
2480 &out_location,
2481 );
2482 }
2483 BuildersOrCallback::Callback(_, node_callback) => {
2484 node_callback(node, next_stmt_id);
2485 }
2486 }
2487
2488 *next_stmt_id += 1;
2489
2490 ident_stack.push(yield_ident);
2491 }
2492
2493 HydroNode::BeginAtomic { inner, metadata } => {
2494 let inner_ident = ident_stack.pop().unwrap();
2495
2496 let begin_ident =
2497 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2498
2499 match builders_or_callback {
2500 BuildersOrCallback::Builders(graph_builders) => {
2501 graph_builders.begin_atomic(
2502 inner_ident,
2503 &inner.metadata().location_id,
2504 &inner.metadata().collection_kind,
2505 &begin_ident,
2506 &out_location,
2507 &metadata.op,
2508 );
2509 }
2510 BuildersOrCallback::Callback(_, node_callback) => {
2511 node_callback(node, next_stmt_id);
2512 }
2513 }
2514
2515 *next_stmt_id += 1;
2516
2517 ident_stack.push(begin_ident);
2518 }
2519
2520 HydroNode::EndAtomic { inner, .. } => {
2521 let inner_ident = ident_stack.pop().unwrap();
2522
2523 let end_ident =
2524 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2525
2526 match builders_or_callback {
2527 BuildersOrCallback::Builders(graph_builders) => {
2528 graph_builders.end_atomic(
2529 inner_ident,
2530 &inner.metadata().location_id,
2531 &inner.metadata().collection_kind,
2532 &end_ident,
2533 );
2534 }
2535 BuildersOrCallback::Callback(_, node_callback) => {
2536 node_callback(node, next_stmt_id);
2537 }
2538 }
2539
2540 *next_stmt_id += 1;
2541
2542 ident_stack.push(end_ident);
2543 }
2544
2545 HydroNode::Source {
2546 source, metadata, ..
2547 } => {
2548 if let HydroSource::ExternalNetwork() = source {
2549 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2550 } else {
2551 let source_ident =
2552 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2553
2554 let source_stmt = match source {
2555 HydroSource::Stream(expr) => {
2556 debug_assert!(metadata.location_id.is_top_level());
2557 parse_quote! {
2558 #source_ident = source_stream(#expr);
2559 }
2560 }
2561
2562 HydroSource::ExternalNetwork() => {
2563 unreachable!()
2564 }
2565
2566 HydroSource::Iter(expr) => {
2567 if metadata.location_id.is_top_level() {
2568 parse_quote! {
2569 #source_ident = source_iter(#expr);
2570 }
2571 } else {
2572 parse_quote! {
2574 #source_ident = source_iter(#expr) -> persist::<'static>();
2575 }
2576 }
2577 }
2578
2579 HydroSource::Spin() => {
2580 debug_assert!(metadata.location_id.is_top_level());
2581 parse_quote! {
2582 #source_ident = spin();
2583 }
2584 }
2585
2586 HydroSource::ClusterMembers(target_loc, state) => {
2587 debug_assert!(metadata.location_id.is_top_level());
2588
2589 let members_tee_ident = syn::Ident::new(
2590 &format!(
2591 "__cluster_members_tee_{}_{}",
2592 metadata.location_id.root().key(),
2593 target_loc.key(),
2594 ),
2595 Span::call_site(),
2596 );
2597
2598 match state {
2599 ClusterMembersState::Stream(d) => {
2600 parse_quote! {
2601 #members_tee_ident = source_stream(#d) -> tee();
2602 #source_ident = #members_tee_ident;
2603 }
2604 },
2605 ClusterMembersState::Uninit => syn::parse_quote! {
2606 #source_ident = source_stream(DUMMY);
2607 },
2608 ClusterMembersState::Tee(..) => parse_quote! {
2609 #source_ident = #members_tee_ident;
2610 },
2611 }
2612 }
2613
2614 HydroSource::Embedded(ident) => {
2615 parse_quote! {
2616 #source_ident = source_stream(#ident);
2617 }
2618 }
2619
2620 HydroSource::EmbeddedSingleton(ident) => {
2621 parse_quote! {
2622 #source_ident = source_iter([#ident]);
2623 }
2624 }
2625 };
2626
2627 match builders_or_callback {
2628 BuildersOrCallback::Builders(graph_builders) => {
2629 let builder = graph_builders.get_dfir_mut(&out_location);
2630 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2631 }
2632 BuildersOrCallback::Callback(_, node_callback) => {
2633 node_callback(node, next_stmt_id);
2634 }
2635 }
2636
2637 *next_stmt_id += 1;
2638
2639 ident_stack.push(source_ident);
2640 }
2641 }
2642
2643 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2644 let source_ident =
2645 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2646
2647 match builders_or_callback {
2648 BuildersOrCallback::Builders(graph_builders) => {
2649 let builder = graph_builders.get_dfir_mut(&out_location);
2650
2651 if *first_tick_only {
2652 assert!(
2653 !metadata.location_id.is_top_level(),
2654 "first_tick_only SingletonSource must be inside a tick"
2655 );
2656 }
2657
2658 if *first_tick_only
2659 || (metadata.location_id.is_top_level()
2660 && metadata.collection_kind.is_bounded())
2661 {
2662 builder.add_dfir(
2663 parse_quote! {
2664 #source_ident = source_iter([#value]);
2665 },
2666 None,
2667 Some(&next_stmt_id.to_string()),
2668 );
2669 } else {
2670 builder.add_dfir(
2671 parse_quote! {
2672 #source_ident = source_iter([#value]) -> persist::<'static>();
2673 },
2674 None,
2675 Some(&next_stmt_id.to_string()),
2676 );
2677 }
2678 }
2679 BuildersOrCallback::Callback(_, node_callback) => {
2680 node_callback(node, next_stmt_id);
2681 }
2682 }
2683
2684 *next_stmt_id += 1;
2685
2686 ident_stack.push(source_ident);
2687 }
2688
2689 HydroNode::CycleSource { cycle_id, .. } => {
2690 let ident = cycle_id.as_ident();
2691
2692 match builders_or_callback {
2693 BuildersOrCallback::Builders(_) => {}
2694 BuildersOrCallback::Callback(_, node_callback) => {
2695 node_callback(node, next_stmt_id);
2696 }
2697 }
2698
2699 *next_stmt_id += 1;
2701
2702 ident_stack.push(ident);
2703 }
2704
2705 HydroNode::Tee { inner, .. } => {
2706 let ret_ident = if let Some(built_idents) =
2707 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2708 {
2709 match builders_or_callback {
2710 BuildersOrCallback::Builders(_) => {}
2711 BuildersOrCallback::Callback(_, node_callback) => {
2712 node_callback(node, next_stmt_id);
2713 }
2714 }
2715
2716 built_idents[0].clone()
2717 } else {
2718 let inner_ident = ident_stack.pop().unwrap();
2721
2722 let tee_ident =
2723 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2724
2725 built_tees.insert(
2726 inner.0.as_ref() as *const RefCell<HydroNode>,
2727 vec![tee_ident.clone()],
2728 );
2729
2730 match builders_or_callback {
2731 BuildersOrCallback::Builders(graph_builders) => {
2732 let builder = graph_builders.get_dfir_mut(&out_location);
2733 builder.add_dfir(
2734 parse_quote! {
2735 #tee_ident = #inner_ident -> tee();
2736 },
2737 None,
2738 Some(&next_stmt_id.to_string()),
2739 );
2740 }
2741 BuildersOrCallback::Callback(_, node_callback) => {
2742 node_callback(node, next_stmt_id);
2743 }
2744 }
2745
2746 tee_ident
2747 };
2748
2749 *next_stmt_id += 1;
2753 ident_stack.push(ret_ident);
2754 }
2755
2756 HydroNode::Partition {
2757 inner, f, is_true, ..
2758 } => {
2759 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2761 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2762 match builders_or_callback {
2763 BuildersOrCallback::Builders(_) => {}
2764 BuildersOrCallback::Callback(_, node_callback) => {
2765 node_callback(node, next_stmt_id);
2766 }
2767 }
2768
2769 let idx = if is_true { 0 } else { 1 };
2770 built_idents[idx].clone()
2771 } else {
2772 let inner_ident = ident_stack.pop().unwrap();
2775
2776 let partition_ident = syn::Ident::new(
2777 &format!("stream_{}_partition", *next_stmt_id),
2778 Span::call_site(),
2779 );
2780 let true_ident = syn::Ident::new(
2781 &format!("stream_{}_true", *next_stmt_id),
2782 Span::call_site(),
2783 );
2784 let false_ident = syn::Ident::new(
2785 &format!("stream_{}_false", *next_stmt_id),
2786 Span::call_site(),
2787 );
2788
2789 built_tees.insert(
2790 ptr,
2791 vec![true_ident.clone(), false_ident.clone()],
2792 );
2793
2794 match builders_or_callback {
2795 BuildersOrCallback::Builders(graph_builders) => {
2796 let builder = graph_builders.get_dfir_mut(&out_location);
2797 builder.add_dfir(
2798 parse_quote! {
2799 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2800 #true_ident = #partition_ident[0];
2801 #false_ident = #partition_ident[1];
2802 },
2803 None,
2804 Some(&next_stmt_id.to_string()),
2805 );
2806 }
2807 BuildersOrCallback::Callback(_, node_callback) => {
2808 node_callback(node, next_stmt_id);
2809 }
2810 }
2811
2812 if is_true { true_ident } else { false_ident }
2813 };
2814
2815 *next_stmt_id += 1;
2816 ident_stack.push(ret_ident);
2817 }
2818
2819 HydroNode::Chain { .. } => {
2820 let second_ident = ident_stack.pop().unwrap();
2822 let first_ident = ident_stack.pop().unwrap();
2823
2824 let chain_ident =
2825 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2826
2827 match builders_or_callback {
2828 BuildersOrCallback::Builders(graph_builders) => {
2829 let builder = graph_builders.get_dfir_mut(&out_location);
2830 builder.add_dfir(
2831 parse_quote! {
2832 #chain_ident = chain();
2833 #first_ident -> [0]#chain_ident;
2834 #second_ident -> [1]#chain_ident;
2835 },
2836 None,
2837 Some(&next_stmt_id.to_string()),
2838 );
2839 }
2840 BuildersOrCallback::Callback(_, node_callback) => {
2841 node_callback(node, next_stmt_id);
2842 }
2843 }
2844
2845 *next_stmt_id += 1;
2846
2847 ident_stack.push(chain_ident);
2848 }
2849
2850 HydroNode::ChainFirst { .. } => {
2851 let second_ident = ident_stack.pop().unwrap();
2852 let first_ident = ident_stack.pop().unwrap();
2853
2854 let chain_ident =
2855 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2856
2857 match builders_or_callback {
2858 BuildersOrCallback::Builders(graph_builders) => {
2859 let builder = graph_builders.get_dfir_mut(&out_location);
2860 builder.add_dfir(
2861 parse_quote! {
2862 #chain_ident = chain_first_n(1);
2863 #first_ident -> [0]#chain_ident;
2864 #second_ident -> [1]#chain_ident;
2865 },
2866 None,
2867 Some(&next_stmt_id.to_string()),
2868 );
2869 }
2870 BuildersOrCallback::Callback(_, node_callback) => {
2871 node_callback(node, next_stmt_id);
2872 }
2873 }
2874
2875 *next_stmt_id += 1;
2876
2877 ident_stack.push(chain_ident);
2878 }
2879
2880 HydroNode::CrossSingleton { right, .. } => {
2881 let right_ident = ident_stack.pop().unwrap();
2882 let left_ident = ident_stack.pop().unwrap();
2883
2884 let cross_ident =
2885 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2886
2887 match builders_or_callback {
2888 BuildersOrCallback::Builders(graph_builders) => {
2889 let builder = graph_builders.get_dfir_mut(&out_location);
2890
2891 if right.metadata().location_id.is_top_level()
2892 && right.metadata().collection_kind.is_bounded()
2893 {
2894 builder.add_dfir(
2895 parse_quote! {
2896 #cross_ident = cross_singleton();
2897 #left_ident -> [input]#cross_ident;
2898 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2899 },
2900 None,
2901 Some(&next_stmt_id.to_string()),
2902 );
2903 } else {
2904 builder.add_dfir(
2905 parse_quote! {
2906 #cross_ident = cross_singleton();
2907 #left_ident -> [input]#cross_ident;
2908 #right_ident -> [single]#cross_ident;
2909 },
2910 None,
2911 Some(&next_stmt_id.to_string()),
2912 );
2913 }
2914 }
2915 BuildersOrCallback::Callback(_, node_callback) => {
2916 node_callback(node, next_stmt_id);
2917 }
2918 }
2919
2920 *next_stmt_id += 1;
2921
2922 ident_stack.push(cross_ident);
2923 }
2924
2925 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2926 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2927 parse_quote!(cross_join_multiset)
2928 } else {
2929 parse_quote!(join_multiset)
2930 };
2931
2932 let (HydroNode::CrossProduct { left, right, .. }
2933 | HydroNode::Join { left, right, .. }) = node
2934 else {
2935 unreachable!()
2936 };
2937
2938 let is_top_level = left.metadata().location_id.is_top_level()
2939 && right.metadata().location_id.is_top_level();
2940 let left_lifetime = if left.metadata().location_id.is_top_level() {
2941 quote!('static)
2942 } else {
2943 quote!('tick)
2944 };
2945
2946 let right_lifetime = if right.metadata().location_id.is_top_level() {
2947 quote!('static)
2948 } else {
2949 quote!('tick)
2950 };
2951
2952 let right_ident = ident_stack.pop().unwrap();
2953 let left_ident = ident_stack.pop().unwrap();
2954
2955 let stream_ident =
2956 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2957
2958 match builders_or_callback {
2959 BuildersOrCallback::Builders(graph_builders) => {
2960 let builder = graph_builders.get_dfir_mut(&out_location);
2961 builder.add_dfir(
2962 if is_top_level {
2963 parse_quote! {
2966 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2967 #left_ident -> [0]#stream_ident;
2968 #right_ident -> [1]#stream_ident;
2969 }
2970 } else {
2971 parse_quote! {
2972 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2973 #left_ident -> [0]#stream_ident;
2974 #right_ident -> [1]#stream_ident;
2975 }
2976 }
2977 ,
2978 None,
2979 Some(&next_stmt_id.to_string()),
2980 );
2981 }
2982 BuildersOrCallback::Callback(_, node_callback) => {
2983 node_callback(node, next_stmt_id);
2984 }
2985 }
2986
2987 *next_stmt_id += 1;
2988
2989 ident_stack.push(stream_ident);
2990 }
2991
2992 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2993 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2994 parse_quote!(difference)
2995 } else {
2996 parse_quote!(anti_join)
2997 };
2998
2999 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3000 node
3001 else {
3002 unreachable!()
3003 };
3004
3005 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3006 quote!('static)
3007 } else {
3008 quote!('tick)
3009 };
3010
3011 let neg_ident = ident_stack.pop().unwrap();
3012 let pos_ident = ident_stack.pop().unwrap();
3013
3014 let stream_ident =
3015 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3016
3017 match builders_or_callback {
3018 BuildersOrCallback::Builders(graph_builders) => {
3019 let builder = graph_builders.get_dfir_mut(&out_location);
3020 builder.add_dfir(
3021 parse_quote! {
3022 #stream_ident = #operator::<'tick, #neg_lifetime>();
3023 #pos_ident -> [pos]#stream_ident;
3024 #neg_ident -> [neg]#stream_ident;
3025 },
3026 None,
3027 Some(&next_stmt_id.to_string()),
3028 );
3029 }
3030 BuildersOrCallback::Callback(_, node_callback) => {
3031 node_callback(node, next_stmt_id);
3032 }
3033 }
3034
3035 *next_stmt_id += 1;
3036
3037 ident_stack.push(stream_ident);
3038 }
3039
3040 HydroNode::ResolveFutures { .. } => {
3041 let input_ident = ident_stack.pop().unwrap();
3042
3043 let futures_ident =
3044 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3045
3046 match builders_or_callback {
3047 BuildersOrCallback::Builders(graph_builders) => {
3048 let builder = graph_builders.get_dfir_mut(&out_location);
3049 builder.add_dfir(
3050 parse_quote! {
3051 #futures_ident = #input_ident -> resolve_futures();
3052 },
3053 None,
3054 Some(&next_stmt_id.to_string()),
3055 );
3056 }
3057 BuildersOrCallback::Callback(_, node_callback) => {
3058 node_callback(node, next_stmt_id);
3059 }
3060 }
3061
3062 *next_stmt_id += 1;
3063
3064 ident_stack.push(futures_ident);
3065 }
3066
3067 HydroNode::ResolveFuturesOrdered { .. } => {
3068 let input_ident = ident_stack.pop().unwrap();
3069
3070 let futures_ident =
3071 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3072
3073 match builders_or_callback {
3074 BuildersOrCallback::Builders(graph_builders) => {
3075 let builder = graph_builders.get_dfir_mut(&out_location);
3076 builder.add_dfir(
3077 parse_quote! {
3078 #futures_ident = #input_ident -> resolve_futures_ordered();
3079 },
3080 None,
3081 Some(&next_stmt_id.to_string()),
3082 );
3083 }
3084 BuildersOrCallback::Callback(_, node_callback) => {
3085 node_callback(node, next_stmt_id);
3086 }
3087 }
3088
3089 *next_stmt_id += 1;
3090
3091 ident_stack.push(futures_ident);
3092 }
3093
3094 HydroNode::Map { f, .. } => {
3095 let input_ident = ident_stack.pop().unwrap();
3096
3097 let map_ident =
3098 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3099
3100 match builders_or_callback {
3101 BuildersOrCallback::Builders(graph_builders) => {
3102 let builder = graph_builders.get_dfir_mut(&out_location);
3103 builder.add_dfir(
3104 parse_quote! {
3105 #map_ident = #input_ident -> map(#f);
3106 },
3107 None,
3108 Some(&next_stmt_id.to_string()),
3109 );
3110 }
3111 BuildersOrCallback::Callback(_, node_callback) => {
3112 node_callback(node, next_stmt_id);
3113 }
3114 }
3115
3116 *next_stmt_id += 1;
3117
3118 ident_stack.push(map_ident);
3119 }
3120
3121 HydroNode::FlatMap { f, .. } => {
3122 let input_ident = ident_stack.pop().unwrap();
3123
3124 let flat_map_ident =
3125 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3126
3127 match builders_or_callback {
3128 BuildersOrCallback::Builders(graph_builders) => {
3129 let builder = graph_builders.get_dfir_mut(&out_location);
3130 builder.add_dfir(
3131 parse_quote! {
3132 #flat_map_ident = #input_ident -> flat_map(#f);
3133 },
3134 None,
3135 Some(&next_stmt_id.to_string()),
3136 );
3137 }
3138 BuildersOrCallback::Callback(_, node_callback) => {
3139 node_callback(node, next_stmt_id);
3140 }
3141 }
3142
3143 *next_stmt_id += 1;
3144
3145 ident_stack.push(flat_map_ident);
3146 }
3147
3148 HydroNode::Filter { f, .. } => {
3149 let input_ident = ident_stack.pop().unwrap();
3150
3151 let filter_ident =
3152 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3153
3154 match builders_or_callback {
3155 BuildersOrCallback::Builders(graph_builders) => {
3156 let builder = graph_builders.get_dfir_mut(&out_location);
3157 builder.add_dfir(
3158 parse_quote! {
3159 #filter_ident = #input_ident -> filter(#f);
3160 },
3161 None,
3162 Some(&next_stmt_id.to_string()),
3163 );
3164 }
3165 BuildersOrCallback::Callback(_, node_callback) => {
3166 node_callback(node, next_stmt_id);
3167 }
3168 }
3169
3170 *next_stmt_id += 1;
3171
3172 ident_stack.push(filter_ident);
3173 }
3174
3175 HydroNode::FilterMap { f, .. } => {
3176 let input_ident = ident_stack.pop().unwrap();
3177
3178 let filter_map_ident =
3179 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3180
3181 match builders_or_callback {
3182 BuildersOrCallback::Builders(graph_builders) => {
3183 let builder = graph_builders.get_dfir_mut(&out_location);
3184 builder.add_dfir(
3185 parse_quote! {
3186 #filter_map_ident = #input_ident -> filter_map(#f);
3187 },
3188 None,
3189 Some(&next_stmt_id.to_string()),
3190 );
3191 }
3192 BuildersOrCallback::Callback(_, node_callback) => {
3193 node_callback(node, next_stmt_id);
3194 }
3195 }
3196
3197 *next_stmt_id += 1;
3198
3199 ident_stack.push(filter_map_ident);
3200 }
3201
3202 HydroNode::Sort { .. } => {
3203 let input_ident = ident_stack.pop().unwrap();
3204
3205 let sort_ident =
3206 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3207
3208 match builders_or_callback {
3209 BuildersOrCallback::Builders(graph_builders) => {
3210 let builder = graph_builders.get_dfir_mut(&out_location);
3211 builder.add_dfir(
3212 parse_quote! {
3213 #sort_ident = #input_ident -> sort();
3214 },
3215 None,
3216 Some(&next_stmt_id.to_string()),
3217 );
3218 }
3219 BuildersOrCallback::Callback(_, node_callback) => {
3220 node_callback(node, next_stmt_id);
3221 }
3222 }
3223
3224 *next_stmt_id += 1;
3225
3226 ident_stack.push(sort_ident);
3227 }
3228
3229 HydroNode::DeferTick { .. } => {
3230 let input_ident = ident_stack.pop().unwrap();
3231
3232 let defer_tick_ident =
3233 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3234
3235 match builders_or_callback {
3236 BuildersOrCallback::Builders(graph_builders) => {
3237 let builder = graph_builders.get_dfir_mut(&out_location);
3238 builder.add_dfir(
3239 parse_quote! {
3240 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3241 },
3242 None,
3243 Some(&next_stmt_id.to_string()),
3244 );
3245 }
3246 BuildersOrCallback::Callback(_, node_callback) => {
3247 node_callback(node, next_stmt_id);
3248 }
3249 }
3250
3251 *next_stmt_id += 1;
3252
3253 ident_stack.push(defer_tick_ident);
3254 }
3255
3256 HydroNode::Enumerate { input, .. } => {
3257 let input_ident = ident_stack.pop().unwrap();
3258
3259 let enumerate_ident =
3260 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3261
3262 match builders_or_callback {
3263 BuildersOrCallback::Builders(graph_builders) => {
3264 let builder = graph_builders.get_dfir_mut(&out_location);
3265 let lifetime = if input.metadata().location_id.is_top_level() {
3266 quote!('static)
3267 } else {
3268 quote!('tick)
3269 };
3270 builder.add_dfir(
3271 parse_quote! {
3272 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3273 },
3274 None,
3275 Some(&next_stmt_id.to_string()),
3276 );
3277 }
3278 BuildersOrCallback::Callback(_, node_callback) => {
3279 node_callback(node, next_stmt_id);
3280 }
3281 }
3282
3283 *next_stmt_id += 1;
3284
3285 ident_stack.push(enumerate_ident);
3286 }
3287
3288 HydroNode::Inspect { f, .. } => {
3289 let input_ident = ident_stack.pop().unwrap();
3290
3291 let inspect_ident =
3292 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3293
3294 match builders_or_callback {
3295 BuildersOrCallback::Builders(graph_builders) => {
3296 let builder = graph_builders.get_dfir_mut(&out_location);
3297 builder.add_dfir(
3298 parse_quote! {
3299 #inspect_ident = #input_ident -> inspect(#f);
3300 },
3301 None,
3302 Some(&next_stmt_id.to_string()),
3303 );
3304 }
3305 BuildersOrCallback::Callback(_, node_callback) => {
3306 node_callback(node, next_stmt_id);
3307 }
3308 }
3309
3310 *next_stmt_id += 1;
3311
3312 ident_stack.push(inspect_ident);
3313 }
3314
3315 HydroNode::Unique { input, .. } => {
3316 let input_ident = ident_stack.pop().unwrap();
3317
3318 let unique_ident =
3319 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3320
3321 match builders_or_callback {
3322 BuildersOrCallback::Builders(graph_builders) => {
3323 let builder = graph_builders.get_dfir_mut(&out_location);
3324 let lifetime = if input.metadata().location_id.is_top_level() {
3325 quote!('static)
3326 } else {
3327 quote!('tick)
3328 };
3329
3330 builder.add_dfir(
3331 parse_quote! {
3332 #unique_ident = #input_ident -> unique::<#lifetime>();
3333 },
3334 None,
3335 Some(&next_stmt_id.to_string()),
3336 );
3337 }
3338 BuildersOrCallback::Callback(_, node_callback) => {
3339 node_callback(node, next_stmt_id);
3340 }
3341 }
3342
3343 *next_stmt_id += 1;
3344
3345 ident_stack.push(unique_ident);
3346 }
3347
3348 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3349 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3350 if input.metadata().location_id.is_top_level()
3351 && input.metadata().collection_kind.is_bounded()
3352 {
3353 parse_quote!(fold_no_replay)
3354 } else {
3355 parse_quote!(fold)
3356 }
3357 } else if matches!(node, HydroNode::Scan { .. }) {
3358 parse_quote!(scan)
3359 } else if let HydroNode::FoldKeyed { input, .. } = node {
3360 if input.metadata().location_id.is_top_level()
3361 && input.metadata().collection_kind.is_bounded()
3362 {
3363 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3364 } else {
3365 parse_quote!(fold_keyed)
3366 }
3367 } else {
3368 unreachable!()
3369 };
3370
3371 let (HydroNode::Fold { input, .. }
3372 | HydroNode::FoldKeyed { input, .. }
3373 | HydroNode::Scan { input, .. }) = node
3374 else {
3375 unreachable!()
3376 };
3377
3378 let lifetime = if input.metadata().location_id.is_top_level() {
3379 quote!('static)
3380 } else {
3381 quote!('tick)
3382 };
3383
3384 let input_ident = ident_stack.pop().unwrap();
3385
3386 let (HydroNode::Fold { init, acc, .. }
3387 | HydroNode::FoldKeyed { init, acc, .. }
3388 | HydroNode::Scan { init, acc, .. }) = &*node
3389 else {
3390 unreachable!()
3391 };
3392
3393 let fold_ident =
3394 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3395
3396 match builders_or_callback {
3397 BuildersOrCallback::Builders(graph_builders) => {
3398 if matches!(node, HydroNode::Fold { .. })
3399 && node.metadata().location_id.is_top_level()
3400 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3401 && graph_builders.singleton_intermediates()
3402 && !node.metadata().collection_kind.is_bounded()
3403 {
3404 let builder = graph_builders.get_dfir_mut(&out_location);
3405
3406 let acc: syn::Expr = parse_quote!({
3407 let mut __inner = #acc;
3408 move |__state, __value| {
3409 __inner(__state, __value);
3410 Some(__state.clone())
3411 }
3412 });
3413
3414 builder.add_dfir(
3415 parse_quote! {
3416 source_iter([(#init)()]) -> [0]#fold_ident;
3417 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3418 #fold_ident = chain();
3419 },
3420 None,
3421 Some(&next_stmt_id.to_string()),
3422 );
3423 } else if matches!(node, HydroNode::FoldKeyed { .. })
3424 && node.metadata().location_id.is_top_level()
3425 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3426 && graph_builders.singleton_intermediates()
3427 && !node.metadata().collection_kind.is_bounded()
3428 {
3429 let builder = graph_builders.get_dfir_mut(&out_location);
3430
3431 let acc: syn::Expr = parse_quote!({
3432 let mut __init = #init;
3433 let mut __inner = #acc;
3434 move |__state, __kv: (_, _)| {
3435 let __state = __state
3437 .entry(::std::clone::Clone::clone(&__kv.0))
3438 .or_insert_with(|| (__init)());
3439 __inner(__state, __kv.1);
3440 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3441 }
3442 });
3443
3444 builder.add_dfir(
3445 parse_quote! {
3446 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3447 },
3448 None,
3449 Some(&next_stmt_id.to_string()),
3450 );
3451 } else {
3452 let builder = graph_builders.get_dfir_mut(&out_location);
3453 builder.add_dfir(
3454 parse_quote! {
3455 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3456 },
3457 None,
3458 Some(&next_stmt_id.to_string()),
3459 );
3460 }
3461 }
3462 BuildersOrCallback::Callback(_, node_callback) => {
3463 node_callback(node, next_stmt_id);
3464 }
3465 }
3466
3467 *next_stmt_id += 1;
3468
3469 ident_stack.push(fold_ident);
3470 }
3471
3472 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3473 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3474 if input.metadata().location_id.is_top_level()
3475 && input.metadata().collection_kind.is_bounded()
3476 {
3477 parse_quote!(reduce_no_replay)
3478 } else {
3479 parse_quote!(reduce)
3480 }
3481 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3482 if input.metadata().location_id.is_top_level()
3483 && input.metadata().collection_kind.is_bounded()
3484 {
3485 todo!(
3486 "Calling keyed reduce on a top-level bounded collection is not supported"
3487 )
3488 } else {
3489 parse_quote!(reduce_keyed)
3490 }
3491 } else {
3492 unreachable!()
3493 };
3494
3495 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3496 else {
3497 unreachable!()
3498 };
3499
3500 let lifetime = if input.metadata().location_id.is_top_level() {
3501 quote!('static)
3502 } else {
3503 quote!('tick)
3504 };
3505
3506 let input_ident = ident_stack.pop().unwrap();
3507
3508 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3509 else {
3510 unreachable!()
3511 };
3512
3513 let reduce_ident =
3514 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3515
3516 match builders_or_callback {
3517 BuildersOrCallback::Builders(graph_builders) => {
3518 if matches!(node, HydroNode::Reduce { .. })
3519 && node.metadata().location_id.is_top_level()
3520 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3521 && graph_builders.singleton_intermediates()
3522 && !node.metadata().collection_kind.is_bounded()
3523 {
3524 todo!(
3525 "Reduce with optional intermediates is not yet supported in simulator"
3526 );
3527 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3528 && node.metadata().location_id.is_top_level()
3529 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3530 && graph_builders.singleton_intermediates()
3531 && !node.metadata().collection_kind.is_bounded()
3532 {
3533 todo!(
3534 "Reduce keyed with optional intermediates is not yet supported in simulator"
3535 );
3536 } else {
3537 let builder = graph_builders.get_dfir_mut(&out_location);
3538 builder.add_dfir(
3539 parse_quote! {
3540 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3541 },
3542 None,
3543 Some(&next_stmt_id.to_string()),
3544 );
3545 }
3546 }
3547 BuildersOrCallback::Callback(_, node_callback) => {
3548 node_callback(node, next_stmt_id);
3549 }
3550 }
3551
3552 *next_stmt_id += 1;
3553
3554 ident_stack.push(reduce_ident);
3555 }
3556
3557 HydroNode::ReduceKeyedWatermark {
3558 f,
3559 input,
3560 metadata,
3561 ..
3562 } => {
3563 let lifetime = if input.metadata().location_id.is_top_level() {
3564 quote!('static)
3565 } else {
3566 quote!('tick)
3567 };
3568
3569 let watermark_ident = ident_stack.pop().unwrap();
3571 let input_ident = ident_stack.pop().unwrap();
3572
3573 let chain_ident = syn::Ident::new(
3574 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3575 Span::call_site(),
3576 );
3577
3578 let fold_ident =
3579 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3580
3581 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3582 && input.metadata().collection_kind.is_bounded()
3583 {
3584 parse_quote!(fold_no_replay)
3585 } else {
3586 parse_quote!(fold)
3587 };
3588
3589 match builders_or_callback {
3590 BuildersOrCallback::Builders(graph_builders) => {
3591 if metadata.location_id.is_top_level()
3592 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3593 && graph_builders.singleton_intermediates()
3594 && !metadata.collection_kind.is_bounded()
3595 {
3596 todo!(
3597 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3598 )
3599 } else {
3600 let builder = graph_builders.get_dfir_mut(&out_location);
3601 builder.add_dfir(
3602 parse_quote! {
3603 #chain_ident = chain();
3604 #input_ident
3605 -> map(|x| (Some(x), None))
3606 -> [0]#chain_ident;
3607 #watermark_ident
3608 -> map(|watermark| (None, Some(watermark)))
3609 -> [1]#chain_ident;
3610
3611 #fold_ident = #chain_ident
3612 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3613 let __reduce_keyed_fn = #f;
3614 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3615 if let Some((k, v)) = opt_payload {
3616 if let Some(curr_watermark) = *opt_curr_watermark {
3617 if k < curr_watermark {
3618 return;
3619 }
3620 }
3621 match map.entry(k) {
3622 ::std::collections::hash_map::Entry::Vacant(e) => {
3623 e.insert(v);
3624 }
3625 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3626 __reduce_keyed_fn(e.get_mut(), v);
3627 }
3628 }
3629 } else {
3630 let watermark = opt_watermark.unwrap();
3631 if let Some(curr_watermark) = *opt_curr_watermark {
3632 if watermark <= curr_watermark {
3633 return;
3634 }
3635 }
3636 *opt_curr_watermark = opt_watermark;
3637 map.retain(|k, _| *k >= watermark);
3638 }
3639 }
3640 })
3641 -> flat_map(|(map, _curr_watermark)| map);
3642 },
3643 None,
3644 Some(&next_stmt_id.to_string()),
3645 );
3646 }
3647 }
3648 BuildersOrCallback::Callback(_, node_callback) => {
3649 node_callback(node, next_stmt_id);
3650 }
3651 }
3652
3653 *next_stmt_id += 1;
3654
3655 ident_stack.push(fold_ident);
3656 }
3657
3658 HydroNode::Network {
3659 networking_info,
3660 serialize_fn: serialize_pipeline,
3661 instantiate_fn,
3662 deserialize_fn: deserialize_pipeline,
3663 input,
3664 ..
3665 } => {
3666 let input_ident = ident_stack.pop().unwrap();
3667
3668 let receiver_stream_ident =
3669 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3670
3671 match builders_or_callback {
3672 BuildersOrCallback::Builders(graph_builders) => {
3673 let (sink_expr, source_expr) = match instantiate_fn {
3674 DebugInstantiate::Building => (
3675 syn::parse_quote!(DUMMY_SINK),
3676 syn::parse_quote!(DUMMY_SOURCE),
3677 ),
3678
3679 DebugInstantiate::Finalized(finalized) => {
3680 (finalized.sink.clone(), finalized.source.clone())
3681 }
3682 };
3683
3684 graph_builders.create_network(
3685 &input.metadata().location_id,
3686 &out_location,
3687 input_ident,
3688 &receiver_stream_ident,
3689 serialize_pipeline.as_ref(),
3690 sink_expr,
3691 source_expr,
3692 deserialize_pipeline.as_ref(),
3693 *next_stmt_id,
3694 networking_info,
3695 );
3696 }
3697 BuildersOrCallback::Callback(_, node_callback) => {
3698 node_callback(node, next_stmt_id);
3699 }
3700 }
3701
3702 *next_stmt_id += 1;
3703
3704 ident_stack.push(receiver_stream_ident);
3705 }
3706
3707 HydroNode::ExternalInput {
3708 instantiate_fn,
3709 deserialize_fn: deserialize_pipeline,
3710 ..
3711 } => {
3712 let receiver_stream_ident =
3713 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3714
3715 match builders_or_callback {
3716 BuildersOrCallback::Builders(graph_builders) => {
3717 let (_, source_expr) = match instantiate_fn {
3718 DebugInstantiate::Building => (
3719 syn::parse_quote!(DUMMY_SINK),
3720 syn::parse_quote!(DUMMY_SOURCE),
3721 ),
3722
3723 DebugInstantiate::Finalized(finalized) => {
3724 (finalized.sink.clone(), finalized.source.clone())
3725 }
3726 };
3727
3728 graph_builders.create_external_source(
3729 &out_location,
3730 source_expr,
3731 &receiver_stream_ident,
3732 deserialize_pipeline.as_ref(),
3733 *next_stmt_id,
3734 );
3735 }
3736 BuildersOrCallback::Callback(_, node_callback) => {
3737 node_callback(node, next_stmt_id);
3738 }
3739 }
3740
3741 *next_stmt_id += 1;
3742
3743 ident_stack.push(receiver_stream_ident);
3744 }
3745
3746 HydroNode::Counter {
3747 tag,
3748 duration,
3749 prefix,
3750 ..
3751 } => {
3752 let input_ident = ident_stack.pop().unwrap();
3753
3754 let counter_ident =
3755 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3756
3757 match builders_or_callback {
3758 BuildersOrCallback::Builders(graph_builders) => {
3759 let arg = format!("{}({})", prefix, tag);
3760 let builder = graph_builders.get_dfir_mut(&out_location);
3761 builder.add_dfir(
3762 parse_quote! {
3763 #counter_ident = #input_ident -> _counter(#arg, #duration);
3764 },
3765 None,
3766 Some(&next_stmt_id.to_string()),
3767 );
3768 }
3769 BuildersOrCallback::Callback(_, node_callback) => {
3770 node_callback(node, next_stmt_id);
3771 }
3772 }
3773
3774 *next_stmt_id += 1;
3775
3776 ident_stack.push(counter_ident);
3777 }
3778 }
3779 },
3780 seen_tees,
3781 false,
3782 );
3783
3784 ident_stack
3785 .pop()
3786 .expect("ident_stack should have exactly one element after traversal")
3787 }
3788
3789 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3790 match self {
3791 HydroNode::Placeholder => {
3792 panic!()
3793 }
3794 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3795 HydroNode::Source { source, .. } => match source {
3796 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3797 HydroSource::ExternalNetwork()
3798 | HydroSource::Spin()
3799 | HydroSource::ClusterMembers(_, _)
3800 | HydroSource::Embedded(_)
3801 | HydroSource::EmbeddedSingleton(_) => {} },
3803 HydroNode::SingletonSource { value, .. } => {
3804 transform(value);
3805 }
3806 HydroNode::CycleSource { .. }
3807 | HydroNode::Tee { .. }
3808 | HydroNode::YieldConcat { .. }
3809 | HydroNode::BeginAtomic { .. }
3810 | HydroNode::EndAtomic { .. }
3811 | HydroNode::Batch { .. }
3812 | HydroNode::Chain { .. }
3813 | HydroNode::ChainFirst { .. }
3814 | HydroNode::CrossProduct { .. }
3815 | HydroNode::CrossSingleton { .. }
3816 | HydroNode::ResolveFutures { .. }
3817 | HydroNode::ResolveFuturesOrdered { .. }
3818 | HydroNode::Join { .. }
3819 | HydroNode::Difference { .. }
3820 | HydroNode::AntiJoin { .. }
3821 | HydroNode::DeferTick { .. }
3822 | HydroNode::Enumerate { .. }
3823 | HydroNode::Unique { .. }
3824 | HydroNode::Sort { .. } => {}
3825 HydroNode::Map { f, .. }
3826 | HydroNode::FlatMap { f, .. }
3827 | HydroNode::Filter { f, .. }
3828 | HydroNode::FilterMap { f, .. }
3829 | HydroNode::Inspect { f, .. }
3830 | HydroNode::Partition { f, .. }
3831 | HydroNode::Reduce { f, .. }
3832 | HydroNode::ReduceKeyed { f, .. }
3833 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3834 transform(f);
3835 }
3836 HydroNode::Fold { init, acc, .. }
3837 | HydroNode::Scan { init, acc, .. }
3838 | HydroNode::FoldKeyed { init, acc, .. } => {
3839 transform(init);
3840 transform(acc);
3841 }
3842 HydroNode::Network {
3843 serialize_fn,
3844 deserialize_fn,
3845 ..
3846 } => {
3847 if let Some(serialize_fn) = serialize_fn {
3848 transform(serialize_fn);
3849 }
3850 if let Some(deserialize_fn) = deserialize_fn {
3851 transform(deserialize_fn);
3852 }
3853 }
3854 HydroNode::ExternalInput { deserialize_fn, .. } => {
3855 if let Some(deserialize_fn) = deserialize_fn {
3856 transform(deserialize_fn);
3857 }
3858 }
3859 HydroNode::Counter { duration, .. } => {
3860 transform(duration);
3861 }
3862 }
3863 }
3864
3865 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3866 &self.metadata().op
3867 }
3868
3869 pub fn metadata(&self) -> &HydroIrMetadata {
3870 match self {
3871 HydroNode::Placeholder => {
3872 panic!()
3873 }
3874 HydroNode::Cast { metadata, .. } => metadata,
3875 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3876 HydroNode::Source { metadata, .. } => metadata,
3877 HydroNode::SingletonSource { metadata, .. } => metadata,
3878 HydroNode::CycleSource { metadata, .. } => metadata,
3879 HydroNode::Tee { metadata, .. } => metadata,
3880 HydroNode::Partition { metadata, .. } => metadata,
3881 HydroNode::YieldConcat { metadata, .. } => metadata,
3882 HydroNode::BeginAtomic { metadata, .. } => metadata,
3883 HydroNode::EndAtomic { metadata, .. } => metadata,
3884 HydroNode::Batch { metadata, .. } => metadata,
3885 HydroNode::Chain { metadata, .. } => metadata,
3886 HydroNode::ChainFirst { metadata, .. } => metadata,
3887 HydroNode::CrossProduct { metadata, .. } => metadata,
3888 HydroNode::CrossSingleton { metadata, .. } => metadata,
3889 HydroNode::Join { metadata, .. } => metadata,
3890 HydroNode::Difference { metadata, .. } => metadata,
3891 HydroNode::AntiJoin { metadata, .. } => metadata,
3892 HydroNode::ResolveFutures { metadata, .. } => metadata,
3893 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3894 HydroNode::Map { metadata, .. } => metadata,
3895 HydroNode::FlatMap { metadata, .. } => metadata,
3896 HydroNode::Filter { metadata, .. } => metadata,
3897 HydroNode::FilterMap { metadata, .. } => metadata,
3898 HydroNode::DeferTick { metadata, .. } => metadata,
3899 HydroNode::Enumerate { metadata, .. } => metadata,
3900 HydroNode::Inspect { metadata, .. } => metadata,
3901 HydroNode::Unique { metadata, .. } => metadata,
3902 HydroNode::Sort { metadata, .. } => metadata,
3903 HydroNode::Scan { metadata, .. } => metadata,
3904 HydroNode::Fold { metadata, .. } => metadata,
3905 HydroNode::FoldKeyed { metadata, .. } => metadata,
3906 HydroNode::Reduce { metadata, .. } => metadata,
3907 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3908 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3909 HydroNode::ExternalInput { metadata, .. } => metadata,
3910 HydroNode::Network { metadata, .. } => metadata,
3911 HydroNode::Counter { metadata, .. } => metadata,
3912 }
3913 }
3914
3915 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3916 &mut self.metadata_mut().op
3917 }
3918
3919 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3920 match self {
3921 HydroNode::Placeholder => {
3922 panic!()
3923 }
3924 HydroNode::Cast { metadata, .. } => metadata,
3925 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3926 HydroNode::Source { metadata, .. } => metadata,
3927 HydroNode::SingletonSource { metadata, .. } => metadata,
3928 HydroNode::CycleSource { metadata, .. } => metadata,
3929 HydroNode::Tee { metadata, .. } => metadata,
3930 HydroNode::Partition { metadata, .. } => metadata,
3931 HydroNode::YieldConcat { metadata, .. } => metadata,
3932 HydroNode::BeginAtomic { metadata, .. } => metadata,
3933 HydroNode::EndAtomic { metadata, .. } => metadata,
3934 HydroNode::Batch { metadata, .. } => metadata,
3935 HydroNode::Chain { metadata, .. } => metadata,
3936 HydroNode::ChainFirst { metadata, .. } => metadata,
3937 HydroNode::CrossProduct { metadata, .. } => metadata,
3938 HydroNode::CrossSingleton { metadata, .. } => metadata,
3939 HydroNode::Join { metadata, .. } => metadata,
3940 HydroNode::Difference { metadata, .. } => metadata,
3941 HydroNode::AntiJoin { metadata, .. } => metadata,
3942 HydroNode::ResolveFutures { metadata, .. } => metadata,
3943 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3944 HydroNode::Map { metadata, .. } => metadata,
3945 HydroNode::FlatMap { metadata, .. } => metadata,
3946 HydroNode::Filter { metadata, .. } => metadata,
3947 HydroNode::FilterMap { metadata, .. } => metadata,
3948 HydroNode::DeferTick { metadata, .. } => metadata,
3949 HydroNode::Enumerate { metadata, .. } => metadata,
3950 HydroNode::Inspect { metadata, .. } => metadata,
3951 HydroNode::Unique { metadata, .. } => metadata,
3952 HydroNode::Sort { metadata, .. } => metadata,
3953 HydroNode::Scan { metadata, .. } => metadata,
3954 HydroNode::Fold { metadata, .. } => metadata,
3955 HydroNode::FoldKeyed { metadata, .. } => metadata,
3956 HydroNode::Reduce { metadata, .. } => metadata,
3957 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3958 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3959 HydroNode::ExternalInput { metadata, .. } => metadata,
3960 HydroNode::Network { metadata, .. } => metadata,
3961 HydroNode::Counter { metadata, .. } => metadata,
3962 }
3963 }
3964
3965 pub fn input(&self) -> Vec<&HydroNode> {
3966 match self {
3967 HydroNode::Placeholder => {
3968 panic!()
3969 }
3970 HydroNode::Source { .. }
3971 | HydroNode::SingletonSource { .. }
3972 | HydroNode::ExternalInput { .. }
3973 | HydroNode::CycleSource { .. }
3974 | HydroNode::Tee { .. }
3975 | HydroNode::Partition { .. } => {
3976 vec![]
3978 }
3979 HydroNode::Cast { inner, .. }
3980 | HydroNode::ObserveNonDet { inner, .. }
3981 | HydroNode::YieldConcat { inner, .. }
3982 | HydroNode::BeginAtomic { inner, .. }
3983 | HydroNode::EndAtomic { inner, .. }
3984 | HydroNode::Batch { inner, .. } => {
3985 vec![inner]
3986 }
3987 HydroNode::Chain { first, second, .. } => {
3988 vec![first, second]
3989 }
3990 HydroNode::ChainFirst { first, second, .. } => {
3991 vec![first, second]
3992 }
3993 HydroNode::CrossProduct { left, right, .. }
3994 | HydroNode::CrossSingleton { left, right, .. }
3995 | HydroNode::Join { left, right, .. } => {
3996 vec![left, right]
3997 }
3998 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3999 vec![pos, neg]
4000 }
4001 HydroNode::Map { input, .. }
4002 | HydroNode::FlatMap { input, .. }
4003 | HydroNode::Filter { input, .. }
4004 | HydroNode::FilterMap { input, .. }
4005 | HydroNode::Sort { input, .. }
4006 | HydroNode::DeferTick { input, .. }
4007 | HydroNode::Enumerate { input, .. }
4008 | HydroNode::Inspect { input, .. }
4009 | HydroNode::Unique { input, .. }
4010 | HydroNode::Network { input, .. }
4011 | HydroNode::Counter { input, .. }
4012 | HydroNode::ResolveFutures { input, .. }
4013 | HydroNode::ResolveFuturesOrdered { input, .. }
4014 | HydroNode::Fold { input, .. }
4015 | HydroNode::FoldKeyed { input, .. }
4016 | HydroNode::Reduce { input, .. }
4017 | HydroNode::ReduceKeyed { input, .. }
4018 | HydroNode::Scan { input, .. } => {
4019 vec![input]
4020 }
4021 HydroNode::ReduceKeyedWatermark {
4022 input, watermark, ..
4023 } => {
4024 vec![input, watermark]
4025 }
4026 }
4027 }
4028
4029 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4030 self.input()
4031 .iter()
4032 .map(|input_node| input_node.metadata())
4033 .collect()
4034 }
4035
4036 pub fn is_shared_with_others(&self) -> bool {
4040 match self {
4041 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4042 Rc::strong_count(&inner.0) > 1
4043 }
4044 _ => false,
4045 }
4046 }
4047
4048 pub fn print_root(&self) -> String {
4049 match self {
4050 HydroNode::Placeholder => {
4051 panic!()
4052 }
4053 HydroNode::Cast { .. } => "Cast()".to_owned(),
4054 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4055 HydroNode::Source { source, .. } => format!("Source({:?})", source),
4056 HydroNode::SingletonSource {
4057 value,
4058 first_tick_only,
4059 ..
4060 } => format!(
4061 "SingletonSource({:?}, first_tick_only={})",
4062 value, first_tick_only
4063 ),
4064 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4065 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4066 HydroNode::Partition { f, is_true, .. } => {
4067 format!("Partition({:?}, is_true={})", f, is_true)
4068 }
4069 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4070 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4071 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4072 HydroNode::Batch { .. } => "Batch()".to_owned(),
4073 HydroNode::Chain { first, second, .. } => {
4074 format!("Chain({}, {})", first.print_root(), second.print_root())
4075 }
4076 HydroNode::ChainFirst { first, second, .. } => {
4077 format!(
4078 "ChainFirst({}, {})",
4079 first.print_root(),
4080 second.print_root()
4081 )
4082 }
4083 HydroNode::CrossProduct { left, right, .. } => {
4084 format!(
4085 "CrossProduct({}, {})",
4086 left.print_root(),
4087 right.print_root()
4088 )
4089 }
4090 HydroNode::CrossSingleton { left, right, .. } => {
4091 format!(
4092 "CrossSingleton({}, {})",
4093 left.print_root(),
4094 right.print_root()
4095 )
4096 }
4097 HydroNode::Join { left, right, .. } => {
4098 format!("Join({}, {})", left.print_root(), right.print_root())
4099 }
4100 HydroNode::Difference { pos, neg, .. } => {
4101 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4102 }
4103 HydroNode::AntiJoin { pos, neg, .. } => {
4104 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4105 }
4106 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4107 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4108 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4109 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4110 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4111 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4112 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4113 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4114 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4115 HydroNode::Unique { .. } => "Unique()".to_owned(),
4116 HydroNode::Sort { .. } => "Sort()".to_owned(),
4117 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4118 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4119 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4120 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4121 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4122 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4123 HydroNode::Network { .. } => "Network()".to_owned(),
4124 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4125 HydroNode::Counter { tag, duration, .. } => {
4126 format!("Counter({:?}, {:?})", tag, duration)
4127 }
4128 }
4129 }
4130}
4131
4132#[cfg(feature = "build")]
4133fn instantiate_network<'a, D>(
4134 env: &mut D::InstantiateEnv,
4135 from_location: &LocationId,
4136 to_location: &LocationId,
4137 processes: &SparseSecondaryMap<LocationKey, D::Process>,
4138 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
4139 name: Option<&str>,
4140 networking_info: &crate::networking::NetworkingInfo,
4141) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
4142where
4143 D: Deploy<'a>,
4144{
4145 let ((sink, source), connect_fn) = match (from_location, to_location) {
4146 (&LocationId::Process(from), &LocationId::Process(to)) => {
4147 let from_node = processes
4148 .get(from)
4149 .unwrap_or_else(|| {
4150 panic!("A process used in the graph was not instantiated: {}", from)
4151 })
4152 .clone();
4153 let to_node = processes
4154 .get(to)
4155 .unwrap_or_else(|| {
4156 panic!("A process used in the graph was not instantiated: {}", to)
4157 })
4158 .clone();
4159
4160 let sink_port = from_node.next_port();
4161 let source_port = to_node.next_port();
4162
4163 (
4164 D::o2o_sink_source(
4165 env,
4166 &from_node,
4167 &sink_port,
4168 &to_node,
4169 &source_port,
4170 name,
4171 networking_info,
4172 ),
4173 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
4174 )
4175 }
4176 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
4177 let from_node = processes
4178 .get(from)
4179 .unwrap_or_else(|| {
4180 panic!("A process used in the graph was not instantiated: {}", from)
4181 })
4182 .clone();
4183 let to_node = clusters
4184 .get(to)
4185 .unwrap_or_else(|| {
4186 panic!("A cluster used in the graph was not instantiated: {}", to)
4187 })
4188 .clone();
4189
4190 let sink_port = from_node.next_port();
4191 let source_port = to_node.next_port();
4192
4193 (
4194 D::o2m_sink_source(
4195 env,
4196 &from_node,
4197 &sink_port,
4198 &to_node,
4199 &source_port,
4200 name,
4201 networking_info,
4202 ),
4203 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
4204 )
4205 }
4206 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
4207 let from_node = clusters
4208 .get(from)
4209 .unwrap_or_else(|| {
4210 panic!("A cluster used in the graph was not instantiated: {}", from)
4211 })
4212 .clone();
4213 let to_node = processes
4214 .get(to)
4215 .unwrap_or_else(|| {
4216 panic!("A process used in the graph was not instantiated: {}", to)
4217 })
4218 .clone();
4219
4220 let sink_port = from_node.next_port();
4221 let source_port = to_node.next_port();
4222
4223 (
4224 D::m2o_sink_source(
4225 env,
4226 &from_node,
4227 &sink_port,
4228 &to_node,
4229 &source_port,
4230 name,
4231 networking_info,
4232 ),
4233 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
4234 )
4235 }
4236 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4237 let from_node = clusters
4238 .get(from)
4239 .unwrap_or_else(|| {
4240 panic!("A cluster used in the graph was not instantiated: {}", from)
4241 })
4242 .clone();
4243 let to_node = clusters
4244 .get(to)
4245 .unwrap_or_else(|| {
4246 panic!("A cluster used in the graph was not instantiated: {}", to)
4247 })
4248 .clone();
4249
4250 let sink_port = from_node.next_port();
4251 let source_port = to_node.next_port();
4252
4253 (
4254 D::m2m_sink_source(
4255 env,
4256 &from_node,
4257 &sink_port,
4258 &to_node,
4259 &source_port,
4260 name,
4261 networking_info,
4262 ),
4263 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
4264 )
4265 }
4266 (LocationId::Tick(_, _), _) => panic!(),
4267 (_, LocationId::Tick(_, _)) => panic!(),
4268 (LocationId::Atomic(_), _) => panic!(),
4269 (_, LocationId::Atomic(_)) => panic!(),
4270 };
4271 (sink, source, connect_fn)
4272}
4273
4274#[cfg(test)]
4275mod test {
4276 use std::mem::size_of;
4277
4278 use stageleft::{QuotedWithContext, q};
4279
4280 use super::*;
4281
4282 #[test]
4283 #[cfg_attr(
4284 not(feature = "build"),
4285 ignore = "expects inclusion of feature-gated fields"
4286 )]
4287 fn hydro_node_size() {
4288 assert_eq!(size_of::<HydroNode>(), 248);
4289 }
4290
4291 #[test]
4292 #[cfg_attr(
4293 not(feature = "build"),
4294 ignore = "expects inclusion of feature-gated fields"
4295 )]
4296 fn hydro_root_size() {
4297 assert_eq!(size_of::<HydroRoot>(), 136);
4298 }
4299
4300 #[test]
4301 fn test_simplify_q_macro_basic() {
4302 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
4304 let result = simplify_q_macro(simple_expr.clone());
4305 assert_eq!(result, simple_expr);
4306 }
4307
4308 #[test]
4309 fn test_simplify_q_macro_actual_stageleft_call() {
4310 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
4312 let result = simplify_q_macro(stageleft_call);
4313 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4316 }
4317
4318 #[test]
4319 fn test_closure_no_pipe_at_start() {
4320 let stageleft_call = q!({
4322 let foo = 123;
4323 move |b: usize| b + foo
4324 })
4325 .splice_fn1_ctx(&());
4326 let result = simplify_q_macro(stageleft_call);
4327 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4328 }
4329}