1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::{Debug, Display};
5use std::hash::{Hash, Hasher};
6use std::ops::Deref;
7use std::rc::Rc;
8
9#[cfg(feature = "build")]
10use dfir_lang::graph::FlatGraphBuilder;
11#[cfg(feature = "build")]
12use proc_macro2::Span;
13use proc_macro2::TokenStream;
14use quote::ToTokens;
15#[cfg(feature = "build")]
16use quote::quote;
17#[cfg(feature = "build")]
18use slotmap::{SecondaryMap, SparseSecondaryMap};
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::ExternalPortId;
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::dynamic::LocationId;
28use crate::location::{LocationKey, NetworkHint};
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
33#[derive(Clone, Hash)]
37pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40 fn from(expr: syn::Expr) -> Self {
41 Self(Box::new(expr))
42 }
43}
44
45impl Deref for DebugExpr {
46 type Target = syn::Expr;
47
48 fn deref(&self) -> &Self::Target {
49 &self.0
50 }
51}
52
53impl ToTokens for DebugExpr {
54 fn to_tokens(&self, tokens: &mut TokenStream) {
55 self.0.to_tokens(tokens);
56 }
57}
58
59impl Debug for DebugExpr {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, "{}", self.0.to_token_stream())
62 }
63}
64
65impl Display for DebugExpr {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 let original = self.0.as_ref().clone();
68 let simplified = simplify_q_macro(original);
69
70 write!(f, "q!({})", quote::quote!(#simplified))
73 }
74}
75
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78 let mut simplifier = QMacroSimplifier::new();
81 simplifier.visit_expr_mut(&mut expr);
82
83 if let Some(simplified) = simplifier.simplified_result {
85 simplified
86 } else {
87 expr
88 }
89}
90
91#[derive(Default)]
93pub struct QMacroSimplifier {
94 pub simplified_result: Option<syn::Expr>,
95}
96
97impl QMacroSimplifier {
98 pub fn new() -> Self {
99 Self::default()
100 }
101}
102
103impl VisitMut for QMacroSimplifier {
104 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105 if self.simplified_result.is_some() {
107 return;
108 }
109
110 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111 && self.is_stageleft_runtime_support_call(&path_expr.path)
113 && let Some(closure) = self.extract_closure_from_args(&call.args)
115 {
116 self.simplified_result = Some(closure);
117 return;
118 }
119
120 syn::visit_mut::visit_expr_mut(self, expr);
123 }
124}
125
126impl QMacroSimplifier {
127 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128 if let Some(last_segment) = path.segments.last() {
130 let fn_name = last_segment.ident.to_string();
131 fn_name.contains("_type_hint")
133 && path.segments.len() > 2
134 && path.segments[0].ident == "stageleft"
135 && path.segments[1].ident == "runtime_support"
136 } else {
137 false
138 }
139 }
140
141 fn extract_closure_from_args(
142 &self,
143 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144 ) -> Option<syn::Expr> {
145 for arg in args {
147 if let syn::Expr::Closure(_) = arg {
148 return Some(arg.clone());
149 }
150 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152 return Some(closure_expr);
153 }
154 }
155 None
156 }
157
158 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159 let mut visitor = ClosureFinder {
160 found_closure: None,
161 prefer_inner_blocks: true,
162 };
163 visitor.visit_expr(expr);
164 visitor.found_closure
165 }
166}
167
168struct ClosureFinder {
170 found_closure: Option<syn::Expr>,
171 prefer_inner_blocks: bool,
172}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176 if self.found_closure.is_some() {
178 return;
179 }
180
181 match expr {
182 syn::Expr::Closure(_) => {
183 self.found_closure = Some(expr.clone());
184 }
185 syn::Expr::Block(block) if self.prefer_inner_blocks => {
186 for stmt in &block.block.stmts {
188 if let syn::Stmt::Expr(stmt_expr, _) = stmt
189 && let syn::Expr::Block(_) = stmt_expr
190 {
191 let mut inner_visitor = ClosureFinder {
193 found_closure: None,
194 prefer_inner_blocks: false, };
196 inner_visitor.visit_expr(stmt_expr);
197 if inner_visitor.found_closure.is_some() {
198 self.found_closure = Some(stmt_expr.clone());
200 return;
201 }
202 }
203 }
204
205 visit::visit_expr(self, expr);
207
208 if self.found_closure.is_some() {
211 }
213 }
214 _ => {
215 visit::visit_expr(self, expr);
217 }
218 }
219 }
220}
221
222#[derive(Clone, PartialEq, Eq, Hash)]
226pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229 fn from(t: syn::Type) -> Self {
230 Self(Box::new(t))
231 }
232}
233
234impl Deref for DebugType {
235 type Target = syn::Type;
236
237 fn deref(&self) -> &Self::Target {
238 &self.0
239 }
240}
241
242impl ToTokens for DebugType {
243 fn to_tokens(&self, tokens: &mut TokenStream) {
244 self.0.to_tokens(tokens);
245 }
246}
247
248impl Debug for DebugType {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 write!(f, "{}", self.0.to_token_stream())
251 }
252}
253
254pub enum DebugInstantiate {
255 Building,
256 Finalized(Box<DebugInstantiateFinalized>),
257}
258
259#[cfg_attr(
260 not(feature = "build"),
261 expect(
262 dead_code,
263 reason = "sink, source unused without `feature = \"build\"`."
264 )
265)]
266pub struct DebugInstantiateFinalized {
267 sink: syn::Expr,
268 source: syn::Expr,
269 connect_fn: Option<Box<dyn FnOnce()>>,
270}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273 fn from(f: DebugInstantiateFinalized) -> Self {
274 Self::Finalized(Box::new(f))
275 }
276}
277
278impl Debug for DebugInstantiate {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 write!(f, "<network instantiate>")
281 }
282}
283
284impl Hash for DebugInstantiate {
285 fn hash<H: Hasher>(&self, _state: &mut H) {
286 }
288}
289
290impl Clone for DebugInstantiate {
291 fn clone(&self) -> Self {
292 match self {
293 DebugInstantiate::Building => DebugInstantiate::Building,
294 DebugInstantiate::Finalized(_) => {
295 panic!("DebugInstantiate::Finalized should not be cloned")
296 }
297 }
298 }
299}
300
301#[derive(Debug, Hash, Clone)]
303pub enum HydroSource {
304 Stream(DebugExpr),
305 ExternalNetwork(),
306 Iter(DebugExpr),
307 Spin(),
308 ClusterMembers(LocationId),
309}
310
311#[cfg(feature = "build")]
312pub trait DfirBuilder {
318 fn singleton_intermediates(&self) -> bool;
320
321 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
323
324 fn batch(
325 &mut self,
326 in_ident: syn::Ident,
327 in_location: &LocationId,
328 in_kind: &CollectionKind,
329 out_ident: &syn::Ident,
330 out_location: &LocationId,
331 op_meta: &HydroIrOpMetadata,
332 );
333 fn yield_from_tick(
334 &mut self,
335 in_ident: syn::Ident,
336 in_location: &LocationId,
337 in_kind: &CollectionKind,
338 out_ident: &syn::Ident,
339 out_location: &LocationId,
340 );
341
342 fn begin_atomic(
343 &mut self,
344 in_ident: syn::Ident,
345 in_location: &LocationId,
346 in_kind: &CollectionKind,
347 out_ident: &syn::Ident,
348 out_location: &LocationId,
349 op_meta: &HydroIrOpMetadata,
350 );
351 fn end_atomic(
352 &mut self,
353 in_ident: syn::Ident,
354 in_location: &LocationId,
355 in_kind: &CollectionKind,
356 out_ident: &syn::Ident,
357 );
358
359 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
360 fn observe_nondet(
361 &mut self,
362 trusted: bool,
363 location: &LocationId,
364 in_ident: syn::Ident,
365 in_kind: &CollectionKind,
366 out_ident: &syn::Ident,
367 out_kind: &CollectionKind,
368 op_meta: &HydroIrOpMetadata,
369 );
370
371 #[expect(clippy::too_many_arguments, reason = "TODO")]
372 fn create_network(
373 &mut self,
374 from: &LocationId,
375 to: &LocationId,
376 input_ident: syn::Ident,
377 out_ident: &syn::Ident,
378 serialize: Option<&DebugExpr>,
379 sink: syn::Expr,
380 source: syn::Expr,
381 deserialize: Option<&DebugExpr>,
382 tag_id: usize,
383 );
384
385 fn create_external_source(
386 &mut self,
387 on: &LocationId,
388 source_expr: syn::Expr,
389 out_ident: &syn::Ident,
390 deserialize: Option<&DebugExpr>,
391 tag_id: usize,
392 );
393
394 fn create_external_output(
395 &mut self,
396 on: &LocationId,
397 sink_expr: syn::Expr,
398 input_ident: &syn::Ident,
399 serialize: Option<&DebugExpr>,
400 tag_id: usize,
401 );
402}
403
404#[cfg(feature = "build")]
405impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
406 fn singleton_intermediates(&self) -> bool {
407 false
408 }
409
410 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
411 self.entry(location.root().key())
412 .expect("location was removed")
413 .or_default()
414 }
415
416 fn batch(
417 &mut self,
418 in_ident: syn::Ident,
419 in_location: &LocationId,
420 in_kind: &CollectionKind,
421 out_ident: &syn::Ident,
422 _out_location: &LocationId,
423 _op_meta: &HydroIrOpMetadata,
424 ) {
425 let builder = self.get_dfir_mut(in_location.root());
426 if in_kind.is_bounded()
427 && matches!(
428 in_kind,
429 CollectionKind::Singleton { .. }
430 | CollectionKind::Optional { .. }
431 | CollectionKind::KeyedSingleton { .. }
432 )
433 {
434 assert!(in_location.is_top_level());
435 builder.add_dfir(
436 parse_quote! {
437 #out_ident = #in_ident -> persist::<'static>();
438 },
439 None,
440 None,
441 );
442 } else {
443 builder.add_dfir(
444 parse_quote! {
445 #out_ident = #in_ident;
446 },
447 None,
448 None,
449 );
450 }
451 }
452
453 fn yield_from_tick(
454 &mut self,
455 in_ident: syn::Ident,
456 in_location: &LocationId,
457 _in_kind: &CollectionKind,
458 out_ident: &syn::Ident,
459 _out_location: &LocationId,
460 ) {
461 let builder = self.get_dfir_mut(in_location.root());
462 builder.add_dfir(
463 parse_quote! {
464 #out_ident = #in_ident;
465 },
466 None,
467 None,
468 );
469 }
470
471 fn begin_atomic(
472 &mut self,
473 in_ident: syn::Ident,
474 in_location: &LocationId,
475 _in_kind: &CollectionKind,
476 out_ident: &syn::Ident,
477 _out_location: &LocationId,
478 _op_meta: &HydroIrOpMetadata,
479 ) {
480 let builder = self.get_dfir_mut(in_location.root());
481 builder.add_dfir(
482 parse_quote! {
483 #out_ident = #in_ident;
484 },
485 None,
486 None,
487 );
488 }
489
490 fn end_atomic(
491 &mut self,
492 in_ident: syn::Ident,
493 in_location: &LocationId,
494 _in_kind: &CollectionKind,
495 out_ident: &syn::Ident,
496 ) {
497 let builder = self.get_dfir_mut(in_location.root());
498 builder.add_dfir(
499 parse_quote! {
500 #out_ident = #in_ident;
501 },
502 None,
503 None,
504 );
505 }
506
507 fn observe_nondet(
508 &mut self,
509 _trusted: bool,
510 location: &LocationId,
511 in_ident: syn::Ident,
512 _in_kind: &CollectionKind,
513 out_ident: &syn::Ident,
514 _out_kind: &CollectionKind,
515 _op_meta: &HydroIrOpMetadata,
516 ) {
517 let builder = self.get_dfir_mut(location);
518 builder.add_dfir(
519 parse_quote! {
520 #out_ident = #in_ident;
521 },
522 None,
523 None,
524 );
525 }
526
527 fn create_network(
528 &mut self,
529 from: &LocationId,
530 to: &LocationId,
531 input_ident: syn::Ident,
532 out_ident: &syn::Ident,
533 serialize: Option<&DebugExpr>,
534 sink: syn::Expr,
535 source: syn::Expr,
536 deserialize: Option<&DebugExpr>,
537 tag_id: usize,
538 ) {
539 let sender_builder = self.get_dfir_mut(from);
540 if let Some(serialize_pipeline) = serialize {
541 sender_builder.add_dfir(
542 parse_quote! {
543 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
544 },
545 None,
546 Some(&format!("send{}", tag_id)),
548 );
549 } else {
550 sender_builder.add_dfir(
551 parse_quote! {
552 #input_ident -> dest_sink(#sink);
553 },
554 None,
555 Some(&format!("send{}", tag_id)),
556 );
557 }
558
559 let receiver_builder = self.get_dfir_mut(to);
560 if let Some(deserialize_pipeline) = deserialize {
561 receiver_builder.add_dfir(
562 parse_quote! {
563 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
564 },
565 None,
566 Some(&format!("recv{}", tag_id)),
567 );
568 } else {
569 receiver_builder.add_dfir(
570 parse_quote! {
571 #out_ident = source_stream(#source);
572 },
573 None,
574 Some(&format!("recv{}", tag_id)),
575 );
576 }
577 }
578
579 fn create_external_source(
580 &mut self,
581 on: &LocationId,
582 source_expr: syn::Expr,
583 out_ident: &syn::Ident,
584 deserialize: Option<&DebugExpr>,
585 tag_id: usize,
586 ) {
587 let receiver_builder = self.get_dfir_mut(on);
588 if let Some(deserialize_pipeline) = deserialize {
589 receiver_builder.add_dfir(
590 parse_quote! {
591 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
592 },
593 None,
594 Some(&format!("recv{}", tag_id)),
595 );
596 } else {
597 receiver_builder.add_dfir(
598 parse_quote! {
599 #out_ident = source_stream(#source_expr);
600 },
601 None,
602 Some(&format!("recv{}", tag_id)),
603 );
604 }
605 }
606
607 fn create_external_output(
608 &mut self,
609 on: &LocationId,
610 sink_expr: syn::Expr,
611 input_ident: &syn::Ident,
612 serialize: Option<&DebugExpr>,
613 tag_id: usize,
614 ) {
615 let sender_builder = self.get_dfir_mut(on);
616 if let Some(serialize_fn) = serialize {
617 sender_builder.add_dfir(
618 parse_quote! {
619 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
620 },
621 None,
622 Some(&format!("send{}", tag_id)),
624 );
625 } else {
626 sender_builder.add_dfir(
627 parse_quote! {
628 #input_ident -> dest_sink(#sink_expr);
629 },
630 None,
631 Some(&format!("send{}", tag_id)),
632 );
633 }
634 }
635}
636
637#[cfg(feature = "build")]
638pub enum BuildersOrCallback<'a, L, N>
639where
640 L: FnMut(&mut HydroRoot, &mut usize),
641 N: FnMut(&mut HydroNode, &mut usize),
642{
643 Builders(&'a mut dyn DfirBuilder),
644 Callback(L, N),
645}
646
647#[derive(Debug, Hash)]
651pub enum HydroRoot {
652 ForEach {
653 f: DebugExpr,
654 input: Box<HydroNode>,
655 op_metadata: HydroIrOpMetadata,
656 },
657 SendExternal {
658 to_external_key: LocationKey,
659 to_port_id: ExternalPortId,
660 to_many: bool,
661 unpaired: bool,
662 serialize_fn: Option<DebugExpr>,
663 instantiate_fn: DebugInstantiate,
664 input: Box<HydroNode>,
665 op_metadata: HydroIrOpMetadata,
666 },
667 DestSink {
668 sink: DebugExpr,
669 input: Box<HydroNode>,
670 op_metadata: HydroIrOpMetadata,
671 },
672 CycleSink {
673 ident: syn::Ident,
674 input: Box<HydroNode>,
675 op_metadata: HydroIrOpMetadata,
676 },
677}
678
679impl HydroRoot {
680 #[cfg(feature = "build")]
681 pub fn compile_network<'a, D>(
682 &mut self,
683 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
684 seen_tees: &mut SeenTees,
685 processes: &SparseSecondaryMap<LocationKey, D::Process>,
686 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
687 externals: &SparseSecondaryMap<LocationKey, D::External>,
688 ) where
689 D: Deploy<'a>,
690 {
691 let refcell_extra_stmts = RefCell::new(extra_stmts);
692 self.transform_bottom_up(
693 &mut |l| {
694 if let HydroRoot::SendExternal {
695 input,
696 to_external_key,
697 to_port_id,
698 to_many,
699 unpaired,
700 instantiate_fn,
701 ..
702 } = l
703 {
704 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
705 DebugInstantiate::Building => {
706 let to_node = externals
707 .get(*to_external_key)
708 .unwrap_or_else(|| {
709 panic!("A external used in the graph was not instantiated: {}", to_external_key)
710 })
711 .clone();
712
713 match input.metadata().location_id.root() {
714 &LocationId::Process(process_key) => {
715 if *to_many {
716 (
717 (
718 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
719 parse_quote!(DUMMY),
720 ),
721 Box::new(|| {}) as Box<dyn FnOnce()>,
722 )
723 } else {
724 let from_node = processes
725 .get(process_key)
726 .unwrap_or_else(|| {
727 panic!("A process used in the graph was not instantiated: {}", process_key)
728 })
729 .clone();
730
731 let sink_port = from_node.next_port();
732 let source_port = to_node.next_port();
733
734 if *unpaired {
735 use stageleft::quote_type;
736 use tokio_util::codec::LengthDelimitedCodec;
737
738 to_node.register(*to_port_id, source_port.clone());
739
740 let _ = D::e2o_source(
741 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
742 &to_node, &source_port,
743 &from_node, &sink_port,
744 "e_type::<LengthDelimitedCodec>(),
745 format!("{}_{}", *to_external_key, *to_port_id)
746 );
747 }
748
749 (
750 (
751 D::o2e_sink(
752 &from_node,
753 &sink_port,
754 &to_node,
755 &source_port,
756 format!("{}_{}", *to_external_key, *to_port_id)
757 ),
758 parse_quote!(DUMMY),
759 ),
760 if *unpaired {
761 D::e2o_connect(
762 &to_node,
763 &source_port,
764 &from_node,
765 &sink_port,
766 *to_many,
767 NetworkHint::Auto,
768 )
769 } else {
770 Box::new(|| {}) as Box<dyn FnOnce()>
771 },
772 )
773 }
774 }
775 LocationId::Cluster(_) => todo!(),
776 _ => panic!()
777 }
778 },
779
780 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
781 };
782
783 *instantiate_fn = DebugInstantiateFinalized {
784 sink: sink_expr,
785 source: source_expr,
786 connect_fn: Some(connect_fn),
787 }
788 .into();
789 }
790 },
791 &mut |n| {
792 if let HydroNode::Network {
793 input,
794 instantiate_fn,
795 metadata,
796 ..
797 } = n
798 {
799 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
800 DebugInstantiate::Building => instantiate_network::<D>(
801 input.metadata().location_id.root(),
802 metadata.location_id.root(),
803 processes,
804 clusters,
805 ),
806
807 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
808 };
809
810 *instantiate_fn = DebugInstantiateFinalized {
811 sink: sink_expr,
812 source: source_expr,
813 connect_fn: Some(connect_fn),
814 }
815 .into();
816 } else if let HydroNode::ExternalInput {
817 from_external_key,
818 from_port_id,
819 from_many,
820 codec_type,
821 port_hint,
822 instantiate_fn,
823 metadata,
824 ..
825 } = n
826 {
827 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
828 DebugInstantiate::Building => {
829 let from_node = externals
830 .get(*from_external_key)
831 .unwrap_or_else(|| {
832 panic!(
833 "A external used in the graph was not instantiated: {}",
834 from_external_key,
835 )
836 })
837 .clone();
838
839 match metadata.location_id.root() {
840 &LocationId::Process(process_key) => {
841 let to_node = processes
842 .get(process_key)
843 .unwrap_or_else(|| {
844 panic!("A process used in the graph was not instantiated: {}", process_key)
845 })
846 .clone();
847
848 let sink_port = from_node.next_port();
849 let source_port = to_node.next_port();
850
851 from_node.register(*from_port_id, sink_port.clone());
852
853 (
854 (
855 parse_quote!(DUMMY),
856 if *from_many {
857 D::e2o_many_source(
858 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
859 &to_node, &source_port,
860 codec_type.0.as_ref(),
861 format!("{}_{}", *from_external_key, *from_port_id)
862 )
863 } else {
864 D::e2o_source(
865 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
866 &from_node, &sink_port,
867 &to_node, &source_port,
868 codec_type.0.as_ref(),
869 format!("{}_{}", *from_external_key, *from_port_id)
870 )
871 },
872 ),
873 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
874 )
875 }
876 LocationId::Cluster(_) => todo!(),
877 _ => panic!()
878 }
879 },
880
881 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
882 };
883
884 *instantiate_fn = DebugInstantiateFinalized {
885 sink: sink_expr,
886 source: source_expr,
887 connect_fn: Some(connect_fn),
888 }
889 .into();
890 }
891 },
892 seen_tees,
893 false,
894 );
895 }
896
897 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
898 self.transform_bottom_up(
899 &mut |l| {
900 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
901 match instantiate_fn {
902 DebugInstantiate::Building => panic!("network not built"),
903
904 DebugInstantiate::Finalized(finalized) => {
905 (finalized.connect_fn.take().unwrap())();
906 }
907 }
908 }
909 },
910 &mut |n| {
911 if let HydroNode::Network { instantiate_fn, .. }
912 | HydroNode::ExternalInput { instantiate_fn, .. } = n
913 {
914 match instantiate_fn {
915 DebugInstantiate::Building => panic!("network not built"),
916
917 DebugInstantiate::Finalized(finalized) => {
918 (finalized.connect_fn.take().unwrap())();
919 }
920 }
921 }
922 },
923 seen_tees,
924 false,
925 );
926 }
927
928 pub fn transform_bottom_up(
929 &mut self,
930 transform_root: &mut impl FnMut(&mut HydroRoot),
931 transform_node: &mut impl FnMut(&mut HydroNode),
932 seen_tees: &mut SeenTees,
933 check_well_formed: bool,
934 ) {
935 self.transform_children(
936 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
937 seen_tees,
938 );
939
940 transform_root(self);
941 }
942
943 pub fn transform_children(
944 &mut self,
945 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
946 seen_tees: &mut SeenTees,
947 ) {
948 match self {
949 HydroRoot::ForEach { input, .. }
950 | HydroRoot::SendExternal { input, .. }
951 | HydroRoot::DestSink { input, .. }
952 | HydroRoot::CycleSink { input, .. } => {
953 transform(input, seen_tees);
954 }
955 }
956 }
957
958 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
959 match self {
960 HydroRoot::ForEach {
961 f,
962 input,
963 op_metadata,
964 } => HydroRoot::ForEach {
965 f: f.clone(),
966 input: Box::new(input.deep_clone(seen_tees)),
967 op_metadata: op_metadata.clone(),
968 },
969 HydroRoot::SendExternal {
970 to_external_key,
971 to_port_id,
972 to_many,
973 unpaired,
974 serialize_fn,
975 instantiate_fn,
976 input,
977 op_metadata,
978 } => HydroRoot::SendExternal {
979 to_external_key: *to_external_key,
980 to_port_id: *to_port_id,
981 to_many: *to_many,
982 unpaired: *unpaired,
983 serialize_fn: serialize_fn.clone(),
984 instantiate_fn: instantiate_fn.clone(),
985 input: Box::new(input.deep_clone(seen_tees)),
986 op_metadata: op_metadata.clone(),
987 },
988 HydroRoot::DestSink {
989 sink,
990 input,
991 op_metadata,
992 } => HydroRoot::DestSink {
993 sink: sink.clone(),
994 input: Box::new(input.deep_clone(seen_tees)),
995 op_metadata: op_metadata.clone(),
996 },
997 HydroRoot::CycleSink {
998 ident,
999 input,
1000 op_metadata,
1001 } => HydroRoot::CycleSink {
1002 ident: ident.clone(),
1003 input: Box::new(input.deep_clone(seen_tees)),
1004 op_metadata: op_metadata.clone(),
1005 },
1006 }
1007 }
1008
1009 #[cfg(feature = "build")]
1010 pub fn emit<'a, D: Deploy<'a>>(
1011 &mut self,
1012 graph_builders: &mut dyn DfirBuilder,
1013 seen_tees: &mut SeenTees,
1014 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1015 next_stmt_id: &mut usize,
1016 ) {
1017 self.emit_core::<D>(
1018 &mut BuildersOrCallback::<
1019 fn(&mut HydroRoot, &mut usize),
1020 fn(&mut HydroNode, &mut usize),
1021 >::Builders(graph_builders),
1022 seen_tees,
1023 built_tees,
1024 next_stmt_id,
1025 );
1026 }
1027
1028 #[cfg(feature = "build")]
1029 pub fn emit_core<'a, D: Deploy<'a>>(
1030 &mut self,
1031 builders_or_callback: &mut BuildersOrCallback<
1032 impl FnMut(&mut HydroRoot, &mut usize),
1033 impl FnMut(&mut HydroNode, &mut usize),
1034 >,
1035 seen_tees: &mut SeenTees,
1036 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1037 next_stmt_id: &mut usize,
1038 ) {
1039 match self {
1040 HydroRoot::ForEach { f, input, .. } => {
1041 let input_ident =
1042 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1043
1044 match builders_or_callback {
1045 BuildersOrCallback::Builders(graph_builders) => {
1046 graph_builders
1047 .get_dfir_mut(&input.metadata().location_id)
1048 .add_dfir(
1049 parse_quote! {
1050 #input_ident -> for_each(#f);
1051 },
1052 None,
1053 Some(&next_stmt_id.to_string()),
1054 );
1055 }
1056 BuildersOrCallback::Callback(leaf_callback, _) => {
1057 leaf_callback(self, next_stmt_id);
1058 }
1059 }
1060
1061 *next_stmt_id += 1;
1062 }
1063
1064 HydroRoot::SendExternal {
1065 serialize_fn,
1066 instantiate_fn,
1067 input,
1068 ..
1069 } => {
1070 let input_ident =
1071 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1072
1073 match builders_or_callback {
1074 BuildersOrCallback::Builders(graph_builders) => {
1075 let (sink_expr, _) = match instantiate_fn {
1076 DebugInstantiate::Building => (
1077 syn::parse_quote!(DUMMY_SINK),
1078 syn::parse_quote!(DUMMY_SOURCE),
1079 ),
1080
1081 DebugInstantiate::Finalized(finalized) => {
1082 (finalized.sink.clone(), finalized.source.clone())
1083 }
1084 };
1085
1086 graph_builders.create_external_output(
1087 &input.metadata().location_id,
1088 sink_expr,
1089 &input_ident,
1090 serialize_fn.as_ref(),
1091 *next_stmt_id,
1092 );
1093 }
1094 BuildersOrCallback::Callback(leaf_callback, _) => {
1095 leaf_callback(self, next_stmt_id);
1096 }
1097 }
1098
1099 *next_stmt_id += 1;
1100 }
1101
1102 HydroRoot::DestSink { sink, input, .. } => {
1103 let input_ident =
1104 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1105
1106 match builders_or_callback {
1107 BuildersOrCallback::Builders(graph_builders) => {
1108 graph_builders
1109 .get_dfir_mut(&input.metadata().location_id)
1110 .add_dfir(
1111 parse_quote! {
1112 #input_ident -> dest_sink(#sink);
1113 },
1114 None,
1115 Some(&next_stmt_id.to_string()),
1116 );
1117 }
1118 BuildersOrCallback::Callback(leaf_callback, _) => {
1119 leaf_callback(self, next_stmt_id);
1120 }
1121 }
1122
1123 *next_stmt_id += 1;
1124 }
1125
1126 HydroRoot::CycleSink { ident, input, .. } => {
1127 let input_ident =
1128 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1129
1130 match builders_or_callback {
1131 BuildersOrCallback::Builders(graph_builders) => {
1132 let elem_type: syn::Type = match &input.metadata().collection_kind {
1133 CollectionKind::KeyedSingleton {
1134 key_type,
1135 value_type,
1136 ..
1137 }
1138 | CollectionKind::KeyedStream {
1139 key_type,
1140 value_type,
1141 ..
1142 } => {
1143 parse_quote!((#key_type, #value_type))
1144 }
1145 CollectionKind::Stream { element_type, .. }
1146 | CollectionKind::Singleton { element_type, .. }
1147 | CollectionKind::Optional { element_type, .. } => {
1148 parse_quote!(#element_type)
1149 }
1150 };
1151
1152 graph_builders
1153 .get_dfir_mut(&input.metadata().location_id)
1154 .add_dfir(
1155 parse_quote! {
1156 #ident = #input_ident -> identity::<#elem_type>();
1157 },
1158 None,
1159 None,
1160 );
1161 }
1162 BuildersOrCallback::Callback(_, _) => {}
1164 }
1165 }
1166 }
1167 }
1168
1169 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1170 match self {
1171 HydroRoot::ForEach { op_metadata, .. }
1172 | HydroRoot::SendExternal { op_metadata, .. }
1173 | HydroRoot::DestSink { op_metadata, .. }
1174 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1175 }
1176 }
1177
1178 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1179 match self {
1180 HydroRoot::ForEach { op_metadata, .. }
1181 | HydroRoot::SendExternal { op_metadata, .. }
1182 | HydroRoot::DestSink { op_metadata, .. }
1183 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1184 }
1185 }
1186
1187 pub fn input(&self) -> &HydroNode {
1188 match self {
1189 HydroRoot::ForEach { input, .. }
1190 | HydroRoot::SendExternal { input, .. }
1191 | HydroRoot::DestSink { input, .. }
1192 | HydroRoot::CycleSink { input, .. } => input,
1193 }
1194 }
1195
1196 pub fn input_metadata(&self) -> &HydroIrMetadata {
1197 self.input().metadata()
1198 }
1199
1200 pub fn print_root(&self) -> String {
1201 match self {
1202 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1203 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1204 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1205 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1206 }
1207 }
1208
1209 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1210 match self {
1211 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1212 transform(f);
1213 }
1214 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1215 }
1216 }
1217}
1218
1219#[cfg(feature = "build")]
1220pub fn emit<'a, D: Deploy<'a>>(
1221 ir: &mut Vec<HydroRoot>,
1222) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1223 let mut builders = SecondaryMap::new();
1224 let mut seen_tees = HashMap::new();
1225 let mut built_tees = HashMap::new();
1226 let mut next_stmt_id = 0;
1227 for leaf in ir {
1228 leaf.emit::<D>(
1229 &mut builders,
1230 &mut seen_tees,
1231 &mut built_tees,
1232 &mut next_stmt_id,
1233 );
1234 }
1235 builders
1236}
1237
1238#[cfg(feature = "build")]
1239pub fn traverse_dfir<'a, D: Deploy<'a>>(
1240 ir: &mut [HydroRoot],
1241 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1242 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1243) {
1244 let mut seen_tees = HashMap::new();
1245 let mut built_tees = HashMap::new();
1246 let mut next_stmt_id = 0;
1247 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1248 ir.iter_mut().for_each(|leaf| {
1249 leaf.emit_core::<D>(
1250 &mut callback,
1251 &mut seen_tees,
1252 &mut built_tees,
1253 &mut next_stmt_id,
1254 );
1255 });
1256}
1257
1258pub fn transform_bottom_up(
1259 ir: &mut [HydroRoot],
1260 transform_root: &mut impl FnMut(&mut HydroRoot),
1261 transform_node: &mut impl FnMut(&mut HydroNode),
1262 check_well_formed: bool,
1263) {
1264 let mut seen_tees = HashMap::new();
1265 ir.iter_mut().for_each(|leaf| {
1266 leaf.transform_bottom_up(
1267 transform_root,
1268 transform_node,
1269 &mut seen_tees,
1270 check_well_formed,
1271 );
1272 });
1273}
1274
1275pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1276 let mut seen_tees = HashMap::new();
1277 ir.iter()
1278 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1279 .collect()
1280}
1281
1282type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1283thread_local! {
1284 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1285}
1286
1287pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1288 PRINTED_TEES.with(|printed_tees| {
1289 let mut printed_tees_mut = printed_tees.borrow_mut();
1290 *printed_tees_mut = Some((0, HashMap::new()));
1291 drop(printed_tees_mut);
1292
1293 let ret = f();
1294
1295 let mut printed_tees_mut = printed_tees.borrow_mut();
1296 *printed_tees_mut = None;
1297
1298 ret
1299 })
1300}
1301
1302pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1303
1304impl TeeNode {
1305 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1306 Rc::as_ptr(&self.0)
1307 }
1308}
1309
1310impl Debug for TeeNode {
1311 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1312 PRINTED_TEES.with(|printed_tees| {
1313 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1314 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1315
1316 if let Some(printed_tees_mut) = printed_tees_mut {
1317 if let Some(existing) = printed_tees_mut
1318 .1
1319 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1320 {
1321 write!(f, "<tee {}>", existing)
1322 } else {
1323 let next_id = printed_tees_mut.0;
1324 printed_tees_mut.0 += 1;
1325 printed_tees_mut
1326 .1
1327 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1328 drop(printed_tees_mut_borrow);
1329 write!(f, "<tee {}>: ", next_id)?;
1330 Debug::fmt(&self.0.borrow(), f)
1331 }
1332 } else {
1333 drop(printed_tees_mut_borrow);
1334 write!(f, "<tee>: ")?;
1335 Debug::fmt(&self.0.borrow(), f)
1336 }
1337 })
1338 }
1339}
1340
1341impl Hash for TeeNode {
1342 fn hash<H: Hasher>(&self, state: &mut H) {
1343 self.0.borrow_mut().hash(state);
1344 }
1345}
1346
1347#[derive(Clone, PartialEq, Eq, Debug)]
1348pub enum BoundKind {
1349 Unbounded,
1350 Bounded,
1351}
1352
1353#[derive(Clone, PartialEq, Eq, Debug)]
1354pub enum StreamOrder {
1355 NoOrder,
1356 TotalOrder,
1357}
1358
1359#[derive(Clone, PartialEq, Eq, Debug)]
1360pub enum StreamRetry {
1361 AtLeastOnce,
1362 ExactlyOnce,
1363}
1364
1365#[derive(Clone, PartialEq, Eq, Debug)]
1366pub enum KeyedSingletonBoundKind {
1367 Unbounded,
1368 BoundedValue,
1369 Bounded,
1370}
1371
1372#[derive(Clone, PartialEq, Eq, Debug)]
1373pub enum CollectionKind {
1374 Stream {
1375 bound: BoundKind,
1376 order: StreamOrder,
1377 retry: StreamRetry,
1378 element_type: DebugType,
1379 },
1380 Singleton {
1381 bound: BoundKind,
1382 element_type: DebugType,
1383 },
1384 Optional {
1385 bound: BoundKind,
1386 element_type: DebugType,
1387 },
1388 KeyedStream {
1389 bound: BoundKind,
1390 value_order: StreamOrder,
1391 value_retry: StreamRetry,
1392 key_type: DebugType,
1393 value_type: DebugType,
1394 },
1395 KeyedSingleton {
1396 bound: KeyedSingletonBoundKind,
1397 key_type: DebugType,
1398 value_type: DebugType,
1399 },
1400}
1401
1402impl CollectionKind {
1403 pub fn is_bounded(&self) -> bool {
1404 matches!(
1405 self,
1406 CollectionKind::Stream {
1407 bound: BoundKind::Bounded,
1408 ..
1409 } | CollectionKind::Singleton {
1410 bound: BoundKind::Bounded,
1411 ..
1412 } | CollectionKind::Optional {
1413 bound: BoundKind::Bounded,
1414 ..
1415 } | CollectionKind::KeyedStream {
1416 bound: BoundKind::Bounded,
1417 ..
1418 } | CollectionKind::KeyedSingleton {
1419 bound: KeyedSingletonBoundKind::Bounded,
1420 ..
1421 }
1422 )
1423 }
1424}
1425
1426#[derive(Clone)]
1427pub struct HydroIrMetadata {
1428 pub location_id: LocationId,
1429 pub collection_kind: CollectionKind,
1430 pub cardinality: Option<usize>,
1431 pub tag: Option<String>,
1432 pub op: HydroIrOpMetadata,
1433}
1434
1435impl Hash for HydroIrMetadata {
1437 fn hash<H: Hasher>(&self, _: &mut H) {}
1438}
1439
1440impl PartialEq for HydroIrMetadata {
1441 fn eq(&self, _: &Self) -> bool {
1442 true
1443 }
1444}
1445
1446impl Eq for HydroIrMetadata {}
1447
1448impl Debug for HydroIrMetadata {
1449 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1450 f.debug_struct("HydroIrMetadata")
1451 .field("location_id", &self.location_id)
1452 .field("collection_kind", &self.collection_kind)
1453 .finish()
1454 }
1455}
1456
1457#[derive(Clone)]
1460pub struct HydroIrOpMetadata {
1461 pub backtrace: Backtrace,
1462 pub cpu_usage: Option<f64>,
1463 pub network_recv_cpu_usage: Option<f64>,
1464 pub id: Option<usize>,
1465}
1466
1467impl HydroIrOpMetadata {
1468 #[expect(
1469 clippy::new_without_default,
1470 reason = "explicit calls to new ensure correct backtrace bounds"
1471 )]
1472 pub fn new() -> HydroIrOpMetadata {
1473 Self::new_with_skip(1)
1474 }
1475
1476 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1477 HydroIrOpMetadata {
1478 backtrace: Backtrace::get_backtrace(2 + skip_count),
1479 cpu_usage: None,
1480 network_recv_cpu_usage: None,
1481 id: None,
1482 }
1483 }
1484}
1485
1486impl Debug for HydroIrOpMetadata {
1487 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1488 f.debug_struct("HydroIrOpMetadata").finish()
1489 }
1490}
1491
1492impl Hash for HydroIrOpMetadata {
1493 fn hash<H: Hasher>(&self, _: &mut H) {}
1494}
1495
1496#[derive(Debug, Hash)]
1499pub enum HydroNode {
1500 Placeholder,
1501
1502 Cast {
1510 inner: Box<HydroNode>,
1511 metadata: HydroIrMetadata,
1512 },
1513
1514 ObserveNonDet {
1520 inner: Box<HydroNode>,
1521 trusted: bool, metadata: HydroIrMetadata,
1523 },
1524
1525 Source {
1526 source: HydroSource,
1527 metadata: HydroIrMetadata,
1528 },
1529
1530 SingletonSource {
1531 value: DebugExpr,
1532 metadata: HydroIrMetadata,
1533 },
1534
1535 CycleSource {
1536 ident: syn::Ident,
1537 metadata: HydroIrMetadata,
1538 },
1539
1540 Tee {
1541 inner: TeeNode,
1542 metadata: HydroIrMetadata,
1543 },
1544
1545 BeginAtomic {
1546 inner: Box<HydroNode>,
1547 metadata: HydroIrMetadata,
1548 },
1549
1550 EndAtomic {
1551 inner: Box<HydroNode>,
1552 metadata: HydroIrMetadata,
1553 },
1554
1555 Batch {
1556 inner: Box<HydroNode>,
1557 metadata: HydroIrMetadata,
1558 },
1559
1560 YieldConcat {
1561 inner: Box<HydroNode>,
1562 metadata: HydroIrMetadata,
1563 },
1564
1565 Chain {
1566 first: Box<HydroNode>,
1567 second: Box<HydroNode>,
1568 metadata: HydroIrMetadata,
1569 },
1570
1571 ChainFirst {
1572 first: Box<HydroNode>,
1573 second: Box<HydroNode>,
1574 metadata: HydroIrMetadata,
1575 },
1576
1577 CrossProduct {
1578 left: Box<HydroNode>,
1579 right: Box<HydroNode>,
1580 metadata: HydroIrMetadata,
1581 },
1582
1583 CrossSingleton {
1584 left: Box<HydroNode>,
1585 right: Box<HydroNode>,
1586 metadata: HydroIrMetadata,
1587 },
1588
1589 Join {
1590 left: Box<HydroNode>,
1591 right: Box<HydroNode>,
1592 metadata: HydroIrMetadata,
1593 },
1594
1595 Difference {
1596 pos: Box<HydroNode>,
1597 neg: Box<HydroNode>,
1598 metadata: HydroIrMetadata,
1599 },
1600
1601 AntiJoin {
1602 pos: Box<HydroNode>,
1603 neg: Box<HydroNode>,
1604 metadata: HydroIrMetadata,
1605 },
1606
1607 ResolveFutures {
1608 input: Box<HydroNode>,
1609 metadata: HydroIrMetadata,
1610 },
1611 ResolveFuturesOrdered {
1612 input: Box<HydroNode>,
1613 metadata: HydroIrMetadata,
1614 },
1615
1616 Map {
1617 f: DebugExpr,
1618 input: Box<HydroNode>,
1619 metadata: HydroIrMetadata,
1620 },
1621 FlatMap {
1622 f: DebugExpr,
1623 input: Box<HydroNode>,
1624 metadata: HydroIrMetadata,
1625 },
1626 Filter {
1627 f: DebugExpr,
1628 input: Box<HydroNode>,
1629 metadata: HydroIrMetadata,
1630 },
1631 FilterMap {
1632 f: DebugExpr,
1633 input: Box<HydroNode>,
1634 metadata: HydroIrMetadata,
1635 },
1636
1637 DeferTick {
1638 input: Box<HydroNode>,
1639 metadata: HydroIrMetadata,
1640 },
1641 Enumerate {
1642 input: Box<HydroNode>,
1643 metadata: HydroIrMetadata,
1644 },
1645 Inspect {
1646 f: DebugExpr,
1647 input: Box<HydroNode>,
1648 metadata: HydroIrMetadata,
1649 },
1650
1651 Unique {
1652 input: Box<HydroNode>,
1653 metadata: HydroIrMetadata,
1654 },
1655
1656 Sort {
1657 input: Box<HydroNode>,
1658 metadata: HydroIrMetadata,
1659 },
1660 Fold {
1661 init: DebugExpr,
1662 acc: DebugExpr,
1663 input: Box<HydroNode>,
1664 metadata: HydroIrMetadata,
1665 },
1666
1667 Scan {
1668 init: DebugExpr,
1669 acc: DebugExpr,
1670 input: Box<HydroNode>,
1671 metadata: HydroIrMetadata,
1672 },
1673 FoldKeyed {
1674 init: DebugExpr,
1675 acc: DebugExpr,
1676 input: Box<HydroNode>,
1677 metadata: HydroIrMetadata,
1678 },
1679
1680 Reduce {
1681 f: DebugExpr,
1682 input: Box<HydroNode>,
1683 metadata: HydroIrMetadata,
1684 },
1685 ReduceKeyed {
1686 f: DebugExpr,
1687 input: Box<HydroNode>,
1688 metadata: HydroIrMetadata,
1689 },
1690 ReduceKeyedWatermark {
1691 f: DebugExpr,
1692 input: Box<HydroNode>,
1693 watermark: Box<HydroNode>,
1694 metadata: HydroIrMetadata,
1695 },
1696
1697 Network {
1698 name: Option<String>,
1699 serialize_fn: Option<DebugExpr>,
1700 instantiate_fn: DebugInstantiate,
1701 deserialize_fn: Option<DebugExpr>,
1702 input: Box<HydroNode>,
1703 metadata: HydroIrMetadata,
1704 },
1705
1706 ExternalInput {
1707 from_external_key: LocationKey,
1708 from_port_id: ExternalPortId,
1709 from_many: bool,
1710 codec_type: DebugType,
1711 port_hint: NetworkHint,
1712 instantiate_fn: DebugInstantiate,
1713 deserialize_fn: Option<DebugExpr>,
1714 metadata: HydroIrMetadata,
1715 },
1716
1717 Counter {
1718 tag: String,
1719 duration: DebugExpr,
1720 prefix: String,
1721 input: Box<HydroNode>,
1722 metadata: HydroIrMetadata,
1723 },
1724}
1725
1726pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1727pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1728
1729impl HydroNode {
1730 pub fn transform_bottom_up(
1731 &mut self,
1732 transform: &mut impl FnMut(&mut HydroNode),
1733 seen_tees: &mut SeenTees,
1734 check_well_formed: bool,
1735 ) {
1736 self.transform_children(
1737 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1738 seen_tees,
1739 );
1740
1741 transform(self);
1742
1743 let self_location = self.metadata().location_id.root();
1744
1745 if check_well_formed {
1746 match &*self {
1747 HydroNode::Network { .. } => {}
1748 _ => {
1749 self.input_metadata().iter().for_each(|i| {
1750 if i.location_id.root() != self_location {
1751 panic!(
1752 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1753 i,
1754 i.location_id.root(),
1755 self,
1756 self_location
1757 )
1758 }
1759 });
1760 }
1761 }
1762 }
1763 }
1764
1765 #[inline(always)]
1766 pub fn transform_children(
1767 &mut self,
1768 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1769 seen_tees: &mut SeenTees,
1770 ) {
1771 match self {
1772 HydroNode::Placeholder => {
1773 panic!();
1774 }
1775
1776 HydroNode::Source { .. }
1777 | HydroNode::SingletonSource { .. }
1778 | HydroNode::CycleSource { .. }
1779 | HydroNode::ExternalInput { .. } => {}
1780
1781 HydroNode::Tee { inner, .. } => {
1782 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1783 *inner = TeeNode(transformed.clone());
1784 } else {
1785 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1786 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1787 let mut orig = inner.0.replace(HydroNode::Placeholder);
1788 transform(&mut orig, seen_tees);
1789 *transformed_cell.borrow_mut() = orig;
1790 *inner = TeeNode(transformed_cell);
1791 }
1792 }
1793
1794 HydroNode::Cast { inner, .. }
1795 | HydroNode::ObserveNonDet { inner, .. }
1796 | HydroNode::BeginAtomic { inner, .. }
1797 | HydroNode::EndAtomic { inner, .. }
1798 | HydroNode::Batch { inner, .. }
1799 | HydroNode::YieldConcat { inner, .. } => {
1800 transform(inner.as_mut(), seen_tees);
1801 }
1802
1803 HydroNode::Chain { first, second, .. } => {
1804 transform(first.as_mut(), seen_tees);
1805 transform(second.as_mut(), seen_tees);
1806 }
1807
1808 HydroNode::ChainFirst { first, second, .. } => {
1809 transform(first.as_mut(), seen_tees);
1810 transform(second.as_mut(), seen_tees);
1811 }
1812
1813 HydroNode::CrossSingleton { left, right, .. }
1814 | HydroNode::CrossProduct { left, right, .. }
1815 | HydroNode::Join { left, right, .. } => {
1816 transform(left.as_mut(), seen_tees);
1817 transform(right.as_mut(), seen_tees);
1818 }
1819
1820 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1821 transform(pos.as_mut(), seen_tees);
1822 transform(neg.as_mut(), seen_tees);
1823 }
1824
1825 HydroNode::ReduceKeyedWatermark {
1826 input, watermark, ..
1827 } => {
1828 transform(input.as_mut(), seen_tees);
1829 transform(watermark.as_mut(), seen_tees);
1830 }
1831
1832 HydroNode::Map { input, .. }
1833 | HydroNode::ResolveFutures { input, .. }
1834 | HydroNode::ResolveFuturesOrdered { input, .. }
1835 | HydroNode::FlatMap { input, .. }
1836 | HydroNode::Filter { input, .. }
1837 | HydroNode::FilterMap { input, .. }
1838 | HydroNode::Sort { input, .. }
1839 | HydroNode::DeferTick { input, .. }
1840 | HydroNode::Enumerate { input, .. }
1841 | HydroNode::Inspect { input, .. }
1842 | HydroNode::Unique { input, .. }
1843 | HydroNode::Network { input, .. }
1844 | HydroNode::Fold { input, .. }
1845 | HydroNode::Scan { input, .. }
1846 | HydroNode::FoldKeyed { input, .. }
1847 | HydroNode::Reduce { input, .. }
1848 | HydroNode::ReduceKeyed { input, .. }
1849 | HydroNode::Counter { input, .. } => {
1850 transform(input.as_mut(), seen_tees);
1851 }
1852 }
1853 }
1854
1855 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1856 match self {
1857 HydroNode::Placeholder => HydroNode::Placeholder,
1858 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1859 inner: Box::new(inner.deep_clone(seen_tees)),
1860 metadata: metadata.clone(),
1861 },
1862 HydroNode::ObserveNonDet {
1863 inner,
1864 trusted,
1865 metadata,
1866 } => HydroNode::ObserveNonDet {
1867 inner: Box::new(inner.deep_clone(seen_tees)),
1868 trusted: *trusted,
1869 metadata: metadata.clone(),
1870 },
1871 HydroNode::Source { source, metadata } => HydroNode::Source {
1872 source: source.clone(),
1873 metadata: metadata.clone(),
1874 },
1875 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1876 value: value.clone(),
1877 metadata: metadata.clone(),
1878 },
1879 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1880 ident: ident.clone(),
1881 metadata: metadata.clone(),
1882 },
1883 HydroNode::Tee { inner, metadata } => {
1884 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1885 HydroNode::Tee {
1886 inner: TeeNode(transformed.clone()),
1887 metadata: metadata.clone(),
1888 }
1889 } else {
1890 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1891 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1892 let cloned = inner.0.borrow().deep_clone(seen_tees);
1893 *new_rc.borrow_mut() = cloned;
1894 HydroNode::Tee {
1895 inner: TeeNode(new_rc),
1896 metadata: metadata.clone(),
1897 }
1898 }
1899 }
1900 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1901 inner: Box::new(inner.deep_clone(seen_tees)),
1902 metadata: metadata.clone(),
1903 },
1904 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1905 inner: Box::new(inner.deep_clone(seen_tees)),
1906 metadata: metadata.clone(),
1907 },
1908 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1909 inner: Box::new(inner.deep_clone(seen_tees)),
1910 metadata: metadata.clone(),
1911 },
1912 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1913 inner: Box::new(inner.deep_clone(seen_tees)),
1914 metadata: metadata.clone(),
1915 },
1916 HydroNode::Chain {
1917 first,
1918 second,
1919 metadata,
1920 } => HydroNode::Chain {
1921 first: Box::new(first.deep_clone(seen_tees)),
1922 second: Box::new(second.deep_clone(seen_tees)),
1923 metadata: metadata.clone(),
1924 },
1925 HydroNode::ChainFirst {
1926 first,
1927 second,
1928 metadata,
1929 } => HydroNode::ChainFirst {
1930 first: Box::new(first.deep_clone(seen_tees)),
1931 second: Box::new(second.deep_clone(seen_tees)),
1932 metadata: metadata.clone(),
1933 },
1934 HydroNode::CrossProduct {
1935 left,
1936 right,
1937 metadata,
1938 } => HydroNode::CrossProduct {
1939 left: Box::new(left.deep_clone(seen_tees)),
1940 right: Box::new(right.deep_clone(seen_tees)),
1941 metadata: metadata.clone(),
1942 },
1943 HydroNode::CrossSingleton {
1944 left,
1945 right,
1946 metadata,
1947 } => HydroNode::CrossSingleton {
1948 left: Box::new(left.deep_clone(seen_tees)),
1949 right: Box::new(right.deep_clone(seen_tees)),
1950 metadata: metadata.clone(),
1951 },
1952 HydroNode::Join {
1953 left,
1954 right,
1955 metadata,
1956 } => HydroNode::Join {
1957 left: Box::new(left.deep_clone(seen_tees)),
1958 right: Box::new(right.deep_clone(seen_tees)),
1959 metadata: metadata.clone(),
1960 },
1961 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1962 pos: Box::new(pos.deep_clone(seen_tees)),
1963 neg: Box::new(neg.deep_clone(seen_tees)),
1964 metadata: metadata.clone(),
1965 },
1966 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1967 pos: Box::new(pos.deep_clone(seen_tees)),
1968 neg: Box::new(neg.deep_clone(seen_tees)),
1969 metadata: metadata.clone(),
1970 },
1971 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1972 input: Box::new(input.deep_clone(seen_tees)),
1973 metadata: metadata.clone(),
1974 },
1975 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1976 HydroNode::ResolveFuturesOrdered {
1977 input: Box::new(input.deep_clone(seen_tees)),
1978 metadata: metadata.clone(),
1979 }
1980 }
1981 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1982 f: f.clone(),
1983 input: Box::new(input.deep_clone(seen_tees)),
1984 metadata: metadata.clone(),
1985 },
1986 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1987 f: f.clone(),
1988 input: Box::new(input.deep_clone(seen_tees)),
1989 metadata: metadata.clone(),
1990 },
1991 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1992 f: f.clone(),
1993 input: Box::new(input.deep_clone(seen_tees)),
1994 metadata: metadata.clone(),
1995 },
1996 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1997 f: f.clone(),
1998 input: Box::new(input.deep_clone(seen_tees)),
1999 metadata: metadata.clone(),
2000 },
2001 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2002 input: Box::new(input.deep_clone(seen_tees)),
2003 metadata: metadata.clone(),
2004 },
2005 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2006 input: Box::new(input.deep_clone(seen_tees)),
2007 metadata: metadata.clone(),
2008 },
2009 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2010 f: f.clone(),
2011 input: Box::new(input.deep_clone(seen_tees)),
2012 metadata: metadata.clone(),
2013 },
2014 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2015 input: Box::new(input.deep_clone(seen_tees)),
2016 metadata: metadata.clone(),
2017 },
2018 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2019 input: Box::new(input.deep_clone(seen_tees)),
2020 metadata: metadata.clone(),
2021 },
2022 HydroNode::Fold {
2023 init,
2024 acc,
2025 input,
2026 metadata,
2027 } => HydroNode::Fold {
2028 init: init.clone(),
2029 acc: acc.clone(),
2030 input: Box::new(input.deep_clone(seen_tees)),
2031 metadata: metadata.clone(),
2032 },
2033 HydroNode::Scan {
2034 init,
2035 acc,
2036 input,
2037 metadata,
2038 } => HydroNode::Scan {
2039 init: init.clone(),
2040 acc: acc.clone(),
2041 input: Box::new(input.deep_clone(seen_tees)),
2042 metadata: metadata.clone(),
2043 },
2044 HydroNode::FoldKeyed {
2045 init,
2046 acc,
2047 input,
2048 metadata,
2049 } => HydroNode::FoldKeyed {
2050 init: init.clone(),
2051 acc: acc.clone(),
2052 input: Box::new(input.deep_clone(seen_tees)),
2053 metadata: metadata.clone(),
2054 },
2055 HydroNode::ReduceKeyedWatermark {
2056 f,
2057 input,
2058 watermark,
2059 metadata,
2060 } => HydroNode::ReduceKeyedWatermark {
2061 f: f.clone(),
2062 input: Box::new(input.deep_clone(seen_tees)),
2063 watermark: Box::new(watermark.deep_clone(seen_tees)),
2064 metadata: metadata.clone(),
2065 },
2066 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2067 f: f.clone(),
2068 input: Box::new(input.deep_clone(seen_tees)),
2069 metadata: metadata.clone(),
2070 },
2071 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2072 f: f.clone(),
2073 input: Box::new(input.deep_clone(seen_tees)),
2074 metadata: metadata.clone(),
2075 },
2076 HydroNode::Network {
2077 name,
2078 serialize_fn,
2079 instantiate_fn,
2080 deserialize_fn,
2081 input,
2082 metadata,
2083 } => HydroNode::Network {
2084 name: name.clone(),
2085 serialize_fn: serialize_fn.clone(),
2086 instantiate_fn: instantiate_fn.clone(),
2087 deserialize_fn: deserialize_fn.clone(),
2088 input: Box::new(input.deep_clone(seen_tees)),
2089 metadata: metadata.clone(),
2090 },
2091 HydroNode::ExternalInput {
2092 from_external_key,
2093 from_port_id,
2094 from_many,
2095 codec_type,
2096 port_hint,
2097 instantiate_fn,
2098 deserialize_fn,
2099 metadata,
2100 } => HydroNode::ExternalInput {
2101 from_external_key: *from_external_key,
2102 from_port_id: *from_port_id,
2103 from_many: *from_many,
2104 codec_type: codec_type.clone(),
2105 port_hint: *port_hint,
2106 instantiate_fn: instantiate_fn.clone(),
2107 deserialize_fn: deserialize_fn.clone(),
2108 metadata: metadata.clone(),
2109 },
2110 HydroNode::Counter {
2111 tag,
2112 duration,
2113 prefix,
2114 input,
2115 metadata,
2116 } => HydroNode::Counter {
2117 tag: tag.clone(),
2118 duration: duration.clone(),
2119 prefix: prefix.clone(),
2120 input: Box::new(input.deep_clone(seen_tees)),
2121 metadata: metadata.clone(),
2122 },
2123 }
2124 }
2125
2126 #[cfg(feature = "build")]
2127 pub fn emit_core<'a, D: Deploy<'a>>(
2128 &mut self,
2129 builders_or_callback: &mut BuildersOrCallback<
2130 impl FnMut(&mut HydroRoot, &mut usize),
2131 impl FnMut(&mut HydroNode, &mut usize),
2132 >,
2133 seen_tees: &mut SeenTees,
2134 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2135 next_stmt_id: &mut usize,
2136 ) -> syn::Ident {
2137 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2138
2139 self.transform_bottom_up(
2140 &mut |node: &mut HydroNode| {
2141 let out_location = node.metadata().location_id.clone();
2142 match node {
2143 HydroNode::Placeholder => {
2144 panic!()
2145 }
2146
2147 HydroNode::Cast { .. } => {
2148 match builders_or_callback {
2151 BuildersOrCallback::Builders(_) => {}
2152 BuildersOrCallback::Callback(_, node_callback) => {
2153 node_callback(node, next_stmt_id);
2154 }
2155 }
2156
2157 *next_stmt_id += 1;
2158 }
2160
2161 HydroNode::ObserveNonDet {
2162 inner,
2163 trusted,
2164 metadata,
2165 ..
2166 } => {
2167 let inner_ident = ident_stack.pop().unwrap();
2168
2169 let observe_ident =
2170 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2171
2172 match builders_or_callback {
2173 BuildersOrCallback::Builders(graph_builders) => {
2174 graph_builders.observe_nondet(
2175 *trusted,
2176 &inner.metadata().location_id,
2177 inner_ident,
2178 &inner.metadata().collection_kind,
2179 &observe_ident,
2180 &metadata.collection_kind,
2181 &metadata.op,
2182 );
2183 }
2184 BuildersOrCallback::Callback(_, node_callback) => {
2185 node_callback(node, next_stmt_id);
2186 }
2187 }
2188
2189 *next_stmt_id += 1;
2190
2191 ident_stack.push(observe_ident);
2192 }
2193
2194 HydroNode::Batch {
2195 inner, metadata, ..
2196 } => {
2197 let inner_ident = ident_stack.pop().unwrap();
2198
2199 let batch_ident =
2200 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2201
2202 match builders_or_callback {
2203 BuildersOrCallback::Builders(graph_builders) => {
2204 graph_builders.batch(
2205 inner_ident,
2206 &inner.metadata().location_id,
2207 &inner.metadata().collection_kind,
2208 &batch_ident,
2209 &out_location,
2210 &metadata.op,
2211 );
2212 }
2213 BuildersOrCallback::Callback(_, node_callback) => {
2214 node_callback(node, next_stmt_id);
2215 }
2216 }
2217
2218 *next_stmt_id += 1;
2219
2220 ident_stack.push(batch_ident);
2221 }
2222
2223 HydroNode::YieldConcat { inner, .. } => {
2224 let inner_ident = ident_stack.pop().unwrap();
2225
2226 let yield_ident =
2227 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2228
2229 match builders_or_callback {
2230 BuildersOrCallback::Builders(graph_builders) => {
2231 graph_builders.yield_from_tick(
2232 inner_ident,
2233 &inner.metadata().location_id,
2234 &inner.metadata().collection_kind,
2235 &yield_ident,
2236 &out_location,
2237 );
2238 }
2239 BuildersOrCallback::Callback(_, node_callback) => {
2240 node_callback(node, next_stmt_id);
2241 }
2242 }
2243
2244 *next_stmt_id += 1;
2245
2246 ident_stack.push(yield_ident);
2247 }
2248
2249 HydroNode::BeginAtomic { inner, metadata } => {
2250 let inner_ident = ident_stack.pop().unwrap();
2251
2252 let begin_ident =
2253 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2254
2255 match builders_or_callback {
2256 BuildersOrCallback::Builders(graph_builders) => {
2257 graph_builders.begin_atomic(
2258 inner_ident,
2259 &inner.metadata().location_id,
2260 &inner.metadata().collection_kind,
2261 &begin_ident,
2262 &out_location,
2263 &metadata.op,
2264 );
2265 }
2266 BuildersOrCallback::Callback(_, node_callback) => {
2267 node_callback(node, next_stmt_id);
2268 }
2269 }
2270
2271 *next_stmt_id += 1;
2272
2273 ident_stack.push(begin_ident);
2274 }
2275
2276 HydroNode::EndAtomic { inner, .. } => {
2277 let inner_ident = ident_stack.pop().unwrap();
2278
2279 let end_ident =
2280 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2281
2282 match builders_or_callback {
2283 BuildersOrCallback::Builders(graph_builders) => {
2284 graph_builders.end_atomic(
2285 inner_ident,
2286 &inner.metadata().location_id,
2287 &inner.metadata().collection_kind,
2288 &end_ident,
2289 );
2290 }
2291 BuildersOrCallback::Callback(_, node_callback) => {
2292 node_callback(node, next_stmt_id);
2293 }
2294 }
2295
2296 *next_stmt_id += 1;
2297
2298 ident_stack.push(end_ident);
2299 }
2300
2301 HydroNode::Source {
2302 source, metadata, ..
2303 } => {
2304 if let HydroSource::ExternalNetwork() = source {
2305 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2306 } else {
2307 let source_ident =
2308 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2309
2310 let source_stmt = match source {
2311 HydroSource::Stream(expr) => {
2312 debug_assert!(metadata.location_id.is_top_level());
2313 parse_quote! {
2314 #source_ident = source_stream(#expr);
2315 }
2316 }
2317
2318 HydroSource::ExternalNetwork() => {
2319 unreachable!()
2320 }
2321
2322 HydroSource::Iter(expr) => {
2323 if metadata.location_id.is_top_level() {
2324 parse_quote! {
2325 #source_ident = source_iter(#expr);
2326 }
2327 } else {
2328 parse_quote! {
2330 #source_ident = source_iter(#expr) -> persist::<'static>();
2331 }
2332 }
2333 }
2334
2335 HydroSource::Spin() => {
2336 debug_assert!(metadata.location_id.is_top_level());
2337 parse_quote! {
2338 #source_ident = spin();
2339 }
2340 }
2341
2342 HydroSource::ClusterMembers(location_id) => {
2343 debug_assert!(metadata.location_id.is_top_level());
2344
2345 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2346 D::cluster_membership_stream(location_id),
2347 &(),
2348 );
2349
2350 parse_quote! {
2351 #source_ident = source_stream(#expr);
2352 }
2353 }
2354 };
2355
2356 match builders_or_callback {
2357 BuildersOrCallback::Builders(graph_builders) => {
2358 let builder = graph_builders.get_dfir_mut(&out_location);
2359 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2360 }
2361 BuildersOrCallback::Callback(_, node_callback) => {
2362 node_callback(node, next_stmt_id);
2363 }
2364 }
2365
2366 *next_stmt_id += 1;
2367
2368 ident_stack.push(source_ident);
2369 }
2370 }
2371
2372 HydroNode::SingletonSource { value, metadata } => {
2373 let source_ident =
2374 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2375
2376 match builders_or_callback {
2377 BuildersOrCallback::Builders(graph_builders) => {
2378 let builder = graph_builders.get_dfir_mut(&out_location);
2379
2380 if metadata.location_id.is_top_level()
2381 && metadata.collection_kind.is_bounded()
2382 {
2383 builder.add_dfir(
2384 parse_quote! {
2385 #source_ident = source_iter([#value]);
2386 },
2387 None,
2388 Some(&next_stmt_id.to_string()),
2389 );
2390 } else {
2391 builder.add_dfir(
2392 parse_quote! {
2393 #source_ident = source_iter([#value]) -> persist::<'static>();
2394 },
2395 None,
2396 Some(&next_stmt_id.to_string()),
2397 );
2398 }
2399 }
2400 BuildersOrCallback::Callback(_, node_callback) => {
2401 node_callback(node, next_stmt_id);
2402 }
2403 }
2404
2405 *next_stmt_id += 1;
2406
2407 ident_stack.push(source_ident);
2408 }
2409
2410 HydroNode::CycleSource { ident, .. } => {
2411 let ident = ident.clone();
2412
2413 match builders_or_callback {
2414 BuildersOrCallback::Builders(_) => {}
2415 BuildersOrCallback::Callback(_, node_callback) => {
2416 node_callback(node, next_stmt_id);
2417 }
2418 }
2419
2420 *next_stmt_id += 1;
2422
2423 ident_stack.push(ident);
2424 }
2425
2426 HydroNode::Tee { inner, .. } => {
2427 let ret_ident = if let Some(teed_from) =
2428 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2429 {
2430 match builders_or_callback {
2431 BuildersOrCallback::Builders(_) => {}
2432 BuildersOrCallback::Callback(_, node_callback) => {
2433 node_callback(node, next_stmt_id);
2434 }
2435 }
2436
2437 teed_from.clone()
2438 } else {
2439 let inner_ident = ident_stack.pop().unwrap();
2442
2443 let tee_ident =
2444 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2445
2446 built_tees.insert(
2447 inner.0.as_ref() as *const RefCell<HydroNode>,
2448 tee_ident.clone(),
2449 );
2450
2451 match builders_or_callback {
2452 BuildersOrCallback::Builders(graph_builders) => {
2453 let builder = graph_builders.get_dfir_mut(&out_location);
2454 builder.add_dfir(
2455 parse_quote! {
2456 #tee_ident = #inner_ident -> tee();
2457 },
2458 None,
2459 Some(&next_stmt_id.to_string()),
2460 );
2461 }
2462 BuildersOrCallback::Callback(_, node_callback) => {
2463 node_callback(node, next_stmt_id);
2464 }
2465 }
2466
2467 tee_ident
2468 };
2469
2470 *next_stmt_id += 1;
2474 ident_stack.push(ret_ident);
2475 }
2476
2477 HydroNode::Chain { .. } => {
2478 let second_ident = ident_stack.pop().unwrap();
2480 let first_ident = ident_stack.pop().unwrap();
2481
2482 let chain_ident =
2483 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2484
2485 match builders_or_callback {
2486 BuildersOrCallback::Builders(graph_builders) => {
2487 let builder = graph_builders.get_dfir_mut(&out_location);
2488 builder.add_dfir(
2489 parse_quote! {
2490 #chain_ident = chain();
2491 #first_ident -> [0]#chain_ident;
2492 #second_ident -> [1]#chain_ident;
2493 },
2494 None,
2495 Some(&next_stmt_id.to_string()),
2496 );
2497 }
2498 BuildersOrCallback::Callback(_, node_callback) => {
2499 node_callback(node, next_stmt_id);
2500 }
2501 }
2502
2503 *next_stmt_id += 1;
2504
2505 ident_stack.push(chain_ident);
2506 }
2507
2508 HydroNode::ChainFirst { .. } => {
2509 let second_ident = ident_stack.pop().unwrap();
2510 let first_ident = ident_stack.pop().unwrap();
2511
2512 let chain_ident =
2513 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2514
2515 match builders_or_callback {
2516 BuildersOrCallback::Builders(graph_builders) => {
2517 let builder = graph_builders.get_dfir_mut(&out_location);
2518 builder.add_dfir(
2519 parse_quote! {
2520 #chain_ident = chain_first_n(1);
2521 #first_ident -> [0]#chain_ident;
2522 #second_ident -> [1]#chain_ident;
2523 },
2524 None,
2525 Some(&next_stmt_id.to_string()),
2526 );
2527 }
2528 BuildersOrCallback::Callback(_, node_callback) => {
2529 node_callback(node, next_stmt_id);
2530 }
2531 }
2532
2533 *next_stmt_id += 1;
2534
2535 ident_stack.push(chain_ident);
2536 }
2537
2538 HydroNode::CrossSingleton { right, .. } => {
2539 let right_ident = ident_stack.pop().unwrap();
2540 let left_ident = ident_stack.pop().unwrap();
2541
2542 let cross_ident =
2543 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2544
2545 match builders_or_callback {
2546 BuildersOrCallback::Builders(graph_builders) => {
2547 let builder = graph_builders.get_dfir_mut(&out_location);
2548
2549 if right.metadata().location_id.is_top_level()
2550 && right.metadata().collection_kind.is_bounded()
2551 {
2552 builder.add_dfir(
2553 parse_quote! {
2554 #cross_ident = cross_singleton();
2555 #left_ident -> [input]#cross_ident;
2556 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2557 },
2558 None,
2559 Some(&next_stmt_id.to_string()),
2560 );
2561 } else {
2562 builder.add_dfir(
2563 parse_quote! {
2564 #cross_ident = cross_singleton();
2565 #left_ident -> [input]#cross_ident;
2566 #right_ident -> [single]#cross_ident;
2567 },
2568 None,
2569 Some(&next_stmt_id.to_string()),
2570 );
2571 }
2572 }
2573 BuildersOrCallback::Callback(_, node_callback) => {
2574 node_callback(node, next_stmt_id);
2575 }
2576 }
2577
2578 *next_stmt_id += 1;
2579
2580 ident_stack.push(cross_ident);
2581 }
2582
2583 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2584 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2585 parse_quote!(cross_join_multiset)
2586 } else {
2587 parse_quote!(join_multiset)
2588 };
2589
2590 let (HydroNode::CrossProduct { left, right, .. }
2591 | HydroNode::Join { left, right, .. }) = node
2592 else {
2593 unreachable!()
2594 };
2595
2596 let is_top_level = left.metadata().location_id.is_top_level()
2597 && right.metadata().location_id.is_top_level();
2598 let left_lifetime = if left.metadata().location_id.is_top_level() {
2599 quote!('static)
2600 } else {
2601 quote!('tick)
2602 };
2603
2604 let right_lifetime = if right.metadata().location_id.is_top_level() {
2605 quote!('static)
2606 } else {
2607 quote!('tick)
2608 };
2609
2610 let right_ident = ident_stack.pop().unwrap();
2611 let left_ident = ident_stack.pop().unwrap();
2612
2613 let stream_ident =
2614 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2615
2616 match builders_or_callback {
2617 BuildersOrCallback::Builders(graph_builders) => {
2618 let builder = graph_builders.get_dfir_mut(&out_location);
2619 builder.add_dfir(
2620 if is_top_level {
2621 parse_quote! {
2624 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2625 #left_ident -> [0]#stream_ident;
2626 #right_ident -> [1]#stream_ident;
2627 }
2628 } else {
2629 parse_quote! {
2630 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2631 #left_ident -> [0]#stream_ident;
2632 #right_ident -> [1]#stream_ident;
2633 }
2634 }
2635 ,
2636 None,
2637 Some(&next_stmt_id.to_string()),
2638 );
2639 }
2640 BuildersOrCallback::Callback(_, node_callback) => {
2641 node_callback(node, next_stmt_id);
2642 }
2643 }
2644
2645 *next_stmt_id += 1;
2646
2647 ident_stack.push(stream_ident);
2648 }
2649
2650 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2651 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2652 parse_quote!(difference)
2653 } else {
2654 parse_quote!(anti_join)
2655 };
2656
2657 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2658 node
2659 else {
2660 unreachable!()
2661 };
2662
2663 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2664 quote!('static)
2665 } else {
2666 quote!('tick)
2667 };
2668
2669 let neg_ident = ident_stack.pop().unwrap();
2670 let pos_ident = ident_stack.pop().unwrap();
2671
2672 let stream_ident =
2673 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2674
2675 match builders_or_callback {
2676 BuildersOrCallback::Builders(graph_builders) => {
2677 let builder = graph_builders.get_dfir_mut(&out_location);
2678 builder.add_dfir(
2679 parse_quote! {
2680 #stream_ident = #operator::<'tick, #neg_lifetime>();
2681 #pos_ident -> [pos]#stream_ident;
2682 #neg_ident -> [neg]#stream_ident;
2683 },
2684 None,
2685 Some(&next_stmt_id.to_string()),
2686 );
2687 }
2688 BuildersOrCallback::Callback(_, node_callback) => {
2689 node_callback(node, next_stmt_id);
2690 }
2691 }
2692
2693 *next_stmt_id += 1;
2694
2695 ident_stack.push(stream_ident);
2696 }
2697
2698 HydroNode::ResolveFutures { .. } => {
2699 let input_ident = ident_stack.pop().unwrap();
2700
2701 let futures_ident =
2702 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2703
2704 match builders_or_callback {
2705 BuildersOrCallback::Builders(graph_builders) => {
2706 let builder = graph_builders.get_dfir_mut(&out_location);
2707 builder.add_dfir(
2708 parse_quote! {
2709 #futures_ident = #input_ident -> resolve_futures();
2710 },
2711 None,
2712 Some(&next_stmt_id.to_string()),
2713 );
2714 }
2715 BuildersOrCallback::Callback(_, node_callback) => {
2716 node_callback(node, next_stmt_id);
2717 }
2718 }
2719
2720 *next_stmt_id += 1;
2721
2722 ident_stack.push(futures_ident);
2723 }
2724
2725 HydroNode::ResolveFuturesOrdered { .. } => {
2726 let input_ident = ident_stack.pop().unwrap();
2727
2728 let futures_ident =
2729 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2730
2731 match builders_or_callback {
2732 BuildersOrCallback::Builders(graph_builders) => {
2733 let builder = graph_builders.get_dfir_mut(&out_location);
2734 builder.add_dfir(
2735 parse_quote! {
2736 #futures_ident = #input_ident -> resolve_futures_ordered();
2737 },
2738 None,
2739 Some(&next_stmt_id.to_string()),
2740 );
2741 }
2742 BuildersOrCallback::Callback(_, node_callback) => {
2743 node_callback(node, next_stmt_id);
2744 }
2745 }
2746
2747 *next_stmt_id += 1;
2748
2749 ident_stack.push(futures_ident);
2750 }
2751
2752 HydroNode::Map { f, .. } => {
2753 let input_ident = ident_stack.pop().unwrap();
2754
2755 let map_ident =
2756 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2757
2758 match builders_or_callback {
2759 BuildersOrCallback::Builders(graph_builders) => {
2760 let builder = graph_builders.get_dfir_mut(&out_location);
2761 builder.add_dfir(
2762 parse_quote! {
2763 #map_ident = #input_ident -> map(#f);
2764 },
2765 None,
2766 Some(&next_stmt_id.to_string()),
2767 );
2768 }
2769 BuildersOrCallback::Callback(_, node_callback) => {
2770 node_callback(node, next_stmt_id);
2771 }
2772 }
2773
2774 *next_stmt_id += 1;
2775
2776 ident_stack.push(map_ident);
2777 }
2778
2779 HydroNode::FlatMap { f, .. } => {
2780 let input_ident = ident_stack.pop().unwrap();
2781
2782 let flat_map_ident =
2783 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2784
2785 match builders_or_callback {
2786 BuildersOrCallback::Builders(graph_builders) => {
2787 let builder = graph_builders.get_dfir_mut(&out_location);
2788 builder.add_dfir(
2789 parse_quote! {
2790 #flat_map_ident = #input_ident -> flat_map(#f);
2791 },
2792 None,
2793 Some(&next_stmt_id.to_string()),
2794 );
2795 }
2796 BuildersOrCallback::Callback(_, node_callback) => {
2797 node_callback(node, next_stmt_id);
2798 }
2799 }
2800
2801 *next_stmt_id += 1;
2802
2803 ident_stack.push(flat_map_ident);
2804 }
2805
2806 HydroNode::Filter { f, .. } => {
2807 let input_ident = ident_stack.pop().unwrap();
2808
2809 let filter_ident =
2810 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2811
2812 match builders_or_callback {
2813 BuildersOrCallback::Builders(graph_builders) => {
2814 let builder = graph_builders.get_dfir_mut(&out_location);
2815 builder.add_dfir(
2816 parse_quote! {
2817 #filter_ident = #input_ident -> filter(#f);
2818 },
2819 None,
2820 Some(&next_stmt_id.to_string()),
2821 );
2822 }
2823 BuildersOrCallback::Callback(_, node_callback) => {
2824 node_callback(node, next_stmt_id);
2825 }
2826 }
2827
2828 *next_stmt_id += 1;
2829
2830 ident_stack.push(filter_ident);
2831 }
2832
2833 HydroNode::FilterMap { f, .. } => {
2834 let input_ident = ident_stack.pop().unwrap();
2835
2836 let filter_map_ident =
2837 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2838
2839 match builders_or_callback {
2840 BuildersOrCallback::Builders(graph_builders) => {
2841 let builder = graph_builders.get_dfir_mut(&out_location);
2842 builder.add_dfir(
2843 parse_quote! {
2844 #filter_map_ident = #input_ident -> filter_map(#f);
2845 },
2846 None,
2847 Some(&next_stmt_id.to_string()),
2848 );
2849 }
2850 BuildersOrCallback::Callback(_, node_callback) => {
2851 node_callback(node, next_stmt_id);
2852 }
2853 }
2854
2855 *next_stmt_id += 1;
2856
2857 ident_stack.push(filter_map_ident);
2858 }
2859
2860 HydroNode::Sort { .. } => {
2861 let input_ident = ident_stack.pop().unwrap();
2862
2863 let sort_ident =
2864 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2865
2866 match builders_or_callback {
2867 BuildersOrCallback::Builders(graph_builders) => {
2868 let builder = graph_builders.get_dfir_mut(&out_location);
2869 builder.add_dfir(
2870 parse_quote! {
2871 #sort_ident = #input_ident -> sort();
2872 },
2873 None,
2874 Some(&next_stmt_id.to_string()),
2875 );
2876 }
2877 BuildersOrCallback::Callback(_, node_callback) => {
2878 node_callback(node, next_stmt_id);
2879 }
2880 }
2881
2882 *next_stmt_id += 1;
2883
2884 ident_stack.push(sort_ident);
2885 }
2886
2887 HydroNode::DeferTick { .. } => {
2888 let input_ident = ident_stack.pop().unwrap();
2889
2890 let defer_tick_ident =
2891 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2892
2893 match builders_or_callback {
2894 BuildersOrCallback::Builders(graph_builders) => {
2895 let builder = graph_builders.get_dfir_mut(&out_location);
2896 builder.add_dfir(
2897 parse_quote! {
2898 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2899 },
2900 None,
2901 Some(&next_stmt_id.to_string()),
2902 );
2903 }
2904 BuildersOrCallback::Callback(_, node_callback) => {
2905 node_callback(node, next_stmt_id);
2906 }
2907 }
2908
2909 *next_stmt_id += 1;
2910
2911 ident_stack.push(defer_tick_ident);
2912 }
2913
2914 HydroNode::Enumerate { input, .. } => {
2915 let input_ident = ident_stack.pop().unwrap();
2916
2917 let enumerate_ident =
2918 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2919
2920 match builders_or_callback {
2921 BuildersOrCallback::Builders(graph_builders) => {
2922 let builder = graph_builders.get_dfir_mut(&out_location);
2923 let lifetime = if input.metadata().location_id.is_top_level() {
2924 quote!('static)
2925 } else {
2926 quote!('tick)
2927 };
2928 builder.add_dfir(
2929 parse_quote! {
2930 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2931 },
2932 None,
2933 Some(&next_stmt_id.to_string()),
2934 );
2935 }
2936 BuildersOrCallback::Callback(_, node_callback) => {
2937 node_callback(node, next_stmt_id);
2938 }
2939 }
2940
2941 *next_stmt_id += 1;
2942
2943 ident_stack.push(enumerate_ident);
2944 }
2945
2946 HydroNode::Inspect { f, .. } => {
2947 let input_ident = ident_stack.pop().unwrap();
2948
2949 let inspect_ident =
2950 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2951
2952 match builders_or_callback {
2953 BuildersOrCallback::Builders(graph_builders) => {
2954 let builder = graph_builders.get_dfir_mut(&out_location);
2955 builder.add_dfir(
2956 parse_quote! {
2957 #inspect_ident = #input_ident -> inspect(#f);
2958 },
2959 None,
2960 Some(&next_stmt_id.to_string()),
2961 );
2962 }
2963 BuildersOrCallback::Callback(_, node_callback) => {
2964 node_callback(node, next_stmt_id);
2965 }
2966 }
2967
2968 *next_stmt_id += 1;
2969
2970 ident_stack.push(inspect_ident);
2971 }
2972
2973 HydroNode::Unique { input, .. } => {
2974 let input_ident = ident_stack.pop().unwrap();
2975
2976 let unique_ident =
2977 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2978
2979 match builders_or_callback {
2980 BuildersOrCallback::Builders(graph_builders) => {
2981 let builder = graph_builders.get_dfir_mut(&out_location);
2982 let lifetime = if input.metadata().location_id.is_top_level() {
2983 quote!('static)
2984 } else {
2985 quote!('tick)
2986 };
2987
2988 builder.add_dfir(
2989 parse_quote! {
2990 #unique_ident = #input_ident -> unique::<#lifetime>();
2991 },
2992 None,
2993 Some(&next_stmt_id.to_string()),
2994 );
2995 }
2996 BuildersOrCallback::Callback(_, node_callback) => {
2997 node_callback(node, next_stmt_id);
2998 }
2999 }
3000
3001 *next_stmt_id += 1;
3002
3003 ident_stack.push(unique_ident);
3004 }
3005
3006 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3007 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3008 if input.metadata().location_id.is_top_level()
3009 && input.metadata().collection_kind.is_bounded()
3010 {
3011 parse_quote!(fold_no_replay)
3012 } else {
3013 parse_quote!(fold)
3014 }
3015 } else if matches!(node, HydroNode::Scan { .. }) {
3016 parse_quote!(scan)
3017 } else if let HydroNode::FoldKeyed { input, .. } = node {
3018 if input.metadata().location_id.is_top_level()
3019 && input.metadata().collection_kind.is_bounded()
3020 {
3021 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3022 } else {
3023 parse_quote!(fold_keyed)
3024 }
3025 } else {
3026 unreachable!()
3027 };
3028
3029 let (HydroNode::Fold { input, .. }
3030 | HydroNode::FoldKeyed { input, .. }
3031 | HydroNode::Scan { input, .. }) = node
3032 else {
3033 unreachable!()
3034 };
3035
3036 let lifetime = if input.metadata().location_id.is_top_level() {
3037 quote!('static)
3038 } else {
3039 quote!('tick)
3040 };
3041
3042 let input_ident = ident_stack.pop().unwrap();
3043
3044 let (HydroNode::Fold { init, acc, .. }
3045 | HydroNode::FoldKeyed { init, acc, .. }
3046 | HydroNode::Scan { init, acc, .. }) = &*node
3047 else {
3048 unreachable!()
3049 };
3050
3051 let fold_ident =
3052 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3053
3054 match builders_or_callback {
3055 BuildersOrCallback::Builders(graph_builders) => {
3056 if matches!(node, HydroNode::Fold { .. })
3057 && node.metadata().location_id.is_top_level()
3058 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3059 && graph_builders.singleton_intermediates()
3060 && !node.metadata().collection_kind.is_bounded()
3061 {
3062 let builder = graph_builders.get_dfir_mut(&out_location);
3063
3064 let acc: syn::Expr = parse_quote!({
3065 let mut __inner = #acc;
3066 move |__state, __value| {
3067 __inner(__state, __value);
3068 Some(__state.clone())
3069 }
3070 });
3071
3072 builder.add_dfir(
3073 parse_quote! {
3074 source_iter([(#init)()]) -> [0]#fold_ident;
3075 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3076 #fold_ident = chain();
3077 },
3078 None,
3079 Some(&next_stmt_id.to_string()),
3080 );
3081 } else if matches!(node, HydroNode::FoldKeyed { .. })
3082 && node.metadata().location_id.is_top_level()
3083 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3084 && graph_builders.singleton_intermediates()
3085 && !node.metadata().collection_kind.is_bounded()
3086 {
3087 let builder = graph_builders.get_dfir_mut(&out_location);
3088
3089 let acc: syn::Expr = parse_quote!({
3090 let mut __init = #init;
3091 let mut __inner = #acc;
3092 move |__state, __kv: (_, _)| {
3093 let __state = __state
3095 .entry(::std::clone::Clone::clone(&__kv.0))
3096 .or_insert_with(|| (__init)());
3097 __inner(__state, __kv.1);
3098 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3099 }
3100 });
3101
3102 builder.add_dfir(
3103 parse_quote! {
3104 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3105 },
3106 None,
3107 Some(&next_stmt_id.to_string()),
3108 );
3109 } else {
3110 let builder = graph_builders.get_dfir_mut(&out_location);
3111 builder.add_dfir(
3112 parse_quote! {
3113 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3114 },
3115 None,
3116 Some(&next_stmt_id.to_string()),
3117 );
3118 }
3119 }
3120 BuildersOrCallback::Callback(_, node_callback) => {
3121 node_callback(node, next_stmt_id);
3122 }
3123 }
3124
3125 *next_stmt_id += 1;
3126
3127 ident_stack.push(fold_ident);
3128 }
3129
3130 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3131 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3132 if input.metadata().location_id.is_top_level()
3133 && input.metadata().collection_kind.is_bounded()
3134 {
3135 parse_quote!(reduce_no_replay)
3136 } else {
3137 parse_quote!(reduce)
3138 }
3139 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3140 if input.metadata().location_id.is_top_level()
3141 && input.metadata().collection_kind.is_bounded()
3142 {
3143 todo!(
3144 "Calling keyed reduce on a top-level bounded collection is not supported"
3145 )
3146 } else {
3147 parse_quote!(reduce_keyed)
3148 }
3149 } else {
3150 unreachable!()
3151 };
3152
3153 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3154 else {
3155 unreachable!()
3156 };
3157
3158 let lifetime = if input.metadata().location_id.is_top_level() {
3159 quote!('static)
3160 } else {
3161 quote!('tick)
3162 };
3163
3164 let input_ident = ident_stack.pop().unwrap();
3165
3166 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3167 else {
3168 unreachable!()
3169 };
3170
3171 let reduce_ident =
3172 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3173
3174 match builders_or_callback {
3175 BuildersOrCallback::Builders(graph_builders) => {
3176 if matches!(node, HydroNode::Reduce { .. })
3177 && node.metadata().location_id.is_top_level()
3178 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3179 && graph_builders.singleton_intermediates()
3180 && !node.metadata().collection_kind.is_bounded()
3181 {
3182 todo!(
3183 "Reduce with optional intermediates is not yet supported in simulator"
3184 );
3185 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3186 && node.metadata().location_id.is_top_level()
3187 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3188 && graph_builders.singleton_intermediates()
3189 && !node.metadata().collection_kind.is_bounded()
3190 {
3191 todo!(
3192 "Reduce keyed with optional intermediates is not yet supported in simulator"
3193 );
3194 } else {
3195 let builder = graph_builders.get_dfir_mut(&out_location);
3196 builder.add_dfir(
3197 parse_quote! {
3198 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3199 },
3200 None,
3201 Some(&next_stmt_id.to_string()),
3202 );
3203 }
3204 }
3205 BuildersOrCallback::Callback(_, node_callback) => {
3206 node_callback(node, next_stmt_id);
3207 }
3208 }
3209
3210 *next_stmt_id += 1;
3211
3212 ident_stack.push(reduce_ident);
3213 }
3214
3215 HydroNode::ReduceKeyedWatermark {
3216 f,
3217 input,
3218 metadata,
3219 ..
3220 } => {
3221 let lifetime = if input.metadata().location_id.is_top_level() {
3222 quote!('static)
3223 } else {
3224 quote!('tick)
3225 };
3226
3227 let watermark_ident = ident_stack.pop().unwrap();
3229 let input_ident = ident_stack.pop().unwrap();
3230
3231 let chain_ident = syn::Ident::new(
3232 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3233 Span::call_site(),
3234 );
3235
3236 let fold_ident =
3237 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3238
3239 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3240 && input.metadata().collection_kind.is_bounded()
3241 {
3242 parse_quote!(fold_no_replay)
3243 } else {
3244 parse_quote!(fold)
3245 };
3246
3247 match builders_or_callback {
3248 BuildersOrCallback::Builders(graph_builders) => {
3249 if metadata.location_id.is_top_level()
3250 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3251 && graph_builders.singleton_intermediates()
3252 && !metadata.collection_kind.is_bounded()
3253 {
3254 todo!(
3255 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3256 )
3257 } else {
3258 let builder = graph_builders.get_dfir_mut(&out_location);
3259 builder.add_dfir(
3260 parse_quote! {
3261 #chain_ident = chain();
3262 #input_ident
3263 -> map(|x| (Some(x), None))
3264 -> [0]#chain_ident;
3265 #watermark_ident
3266 -> map(|watermark| (None, Some(watermark)))
3267 -> [1]#chain_ident;
3268
3269 #fold_ident = #chain_ident
3270 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3271 let __reduce_keyed_fn = #f;
3272 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3273 if let Some((k, v)) = opt_payload {
3274 if let Some(curr_watermark) = *opt_curr_watermark {
3275 if k <= curr_watermark {
3276 return;
3277 }
3278 }
3279 match map.entry(k) {
3280 ::std::collections::hash_map::Entry::Vacant(e) => {
3281 e.insert(v);
3282 }
3283 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3284 __reduce_keyed_fn(e.get_mut(), v);
3285 }
3286 }
3287 } else {
3288 let watermark = opt_watermark.unwrap();
3289 if let Some(curr_watermark) = *opt_curr_watermark {
3290 if watermark <= curr_watermark {
3291 return;
3292 }
3293 }
3294 *opt_curr_watermark = opt_watermark;
3295 map.retain(|k, _| *k > watermark);
3296 }
3297 }
3298 })
3299 -> flat_map(|(map, _curr_watermark)| map);
3300 },
3301 None,
3302 Some(&next_stmt_id.to_string()),
3303 );
3304 }
3305 }
3306 BuildersOrCallback::Callback(_, node_callback) => {
3307 node_callback(node, next_stmt_id);
3308 }
3309 }
3310
3311 *next_stmt_id += 1;
3312
3313 ident_stack.push(fold_ident);
3314 }
3315
3316 HydroNode::Network {
3317 serialize_fn: serialize_pipeline,
3318 instantiate_fn,
3319 deserialize_fn: deserialize_pipeline,
3320 input,
3321 ..
3322 } => {
3323 let input_ident = ident_stack.pop().unwrap();
3324
3325 let receiver_stream_ident =
3326 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3327
3328 match builders_or_callback {
3329 BuildersOrCallback::Builders(graph_builders) => {
3330 let (sink_expr, source_expr) = match instantiate_fn {
3331 DebugInstantiate::Building => (
3332 syn::parse_quote!(DUMMY_SINK),
3333 syn::parse_quote!(DUMMY_SOURCE),
3334 ),
3335
3336 DebugInstantiate::Finalized(finalized) => {
3337 (finalized.sink.clone(), finalized.source.clone())
3338 }
3339 };
3340
3341 graph_builders.create_network(
3342 &input.metadata().location_id,
3343 &out_location,
3344 input_ident,
3345 &receiver_stream_ident,
3346 serialize_pipeline.as_ref(),
3347 sink_expr,
3348 source_expr,
3349 deserialize_pipeline.as_ref(),
3350 *next_stmt_id,
3351 );
3352 }
3353 BuildersOrCallback::Callback(_, node_callback) => {
3354 node_callback(node, next_stmt_id);
3355 }
3356 }
3357
3358 *next_stmt_id += 1;
3359
3360 ident_stack.push(receiver_stream_ident);
3361 }
3362
3363 HydroNode::ExternalInput {
3364 instantiate_fn,
3365 deserialize_fn: deserialize_pipeline,
3366 ..
3367 } => {
3368 let receiver_stream_ident =
3369 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3370
3371 match builders_or_callback {
3372 BuildersOrCallback::Builders(graph_builders) => {
3373 let (_, source_expr) = match instantiate_fn {
3374 DebugInstantiate::Building => (
3375 syn::parse_quote!(DUMMY_SINK),
3376 syn::parse_quote!(DUMMY_SOURCE),
3377 ),
3378
3379 DebugInstantiate::Finalized(finalized) => {
3380 (finalized.sink.clone(), finalized.source.clone())
3381 }
3382 };
3383
3384 graph_builders.create_external_source(
3385 &out_location,
3386 source_expr,
3387 &receiver_stream_ident,
3388 deserialize_pipeline.as_ref(),
3389 *next_stmt_id,
3390 );
3391 }
3392 BuildersOrCallback::Callback(_, node_callback) => {
3393 node_callback(node, next_stmt_id);
3394 }
3395 }
3396
3397 *next_stmt_id += 1;
3398
3399 ident_stack.push(receiver_stream_ident);
3400 }
3401
3402 HydroNode::Counter {
3403 tag,
3404 duration,
3405 prefix,
3406 ..
3407 } => {
3408 let input_ident = ident_stack.pop().unwrap();
3409
3410 let counter_ident =
3411 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3412
3413 match builders_or_callback {
3414 BuildersOrCallback::Builders(graph_builders) => {
3415 let builder = graph_builders.get_dfir_mut(&out_location);
3416 builder.add_dfir(
3417 parse_quote! {
3418 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3419 },
3420 None,
3421 Some(&next_stmt_id.to_string()),
3422 );
3423 }
3424 BuildersOrCallback::Callback(_, node_callback) => {
3425 node_callback(node, next_stmt_id);
3426 }
3427 }
3428
3429 *next_stmt_id += 1;
3430
3431 ident_stack.push(counter_ident);
3432 }
3433 }
3434 },
3435 seen_tees,
3436 false,
3437 );
3438
3439 ident_stack
3440 .pop()
3441 .expect("ident_stack should have exactly one element after traversal")
3442 }
3443
3444 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3445 match self {
3446 HydroNode::Placeholder => {
3447 panic!()
3448 }
3449 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3450 HydroNode::Source { source, .. } => match source {
3451 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3452 HydroSource::ExternalNetwork()
3453 | HydroSource::Spin()
3454 | HydroSource::ClusterMembers(_) => {} },
3456 HydroNode::SingletonSource { value, .. } => {
3457 transform(value);
3458 }
3459 HydroNode::CycleSource { .. }
3460 | HydroNode::Tee { .. }
3461 | HydroNode::YieldConcat { .. }
3462 | HydroNode::BeginAtomic { .. }
3463 | HydroNode::EndAtomic { .. }
3464 | HydroNode::Batch { .. }
3465 | HydroNode::Chain { .. }
3466 | HydroNode::ChainFirst { .. }
3467 | HydroNode::CrossProduct { .. }
3468 | HydroNode::CrossSingleton { .. }
3469 | HydroNode::ResolveFutures { .. }
3470 | HydroNode::ResolveFuturesOrdered { .. }
3471 | HydroNode::Join { .. }
3472 | HydroNode::Difference { .. }
3473 | HydroNode::AntiJoin { .. }
3474 | HydroNode::DeferTick { .. }
3475 | HydroNode::Enumerate { .. }
3476 | HydroNode::Unique { .. }
3477 | HydroNode::Sort { .. } => {}
3478 HydroNode::Map { f, .. }
3479 | HydroNode::FlatMap { f, .. }
3480 | HydroNode::Filter { f, .. }
3481 | HydroNode::FilterMap { f, .. }
3482 | HydroNode::Inspect { f, .. }
3483 | HydroNode::Reduce { f, .. }
3484 | HydroNode::ReduceKeyed { f, .. }
3485 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3486 transform(f);
3487 }
3488 HydroNode::Fold { init, acc, .. }
3489 | HydroNode::Scan { init, acc, .. }
3490 | HydroNode::FoldKeyed { init, acc, .. } => {
3491 transform(init);
3492 transform(acc);
3493 }
3494 HydroNode::Network {
3495 serialize_fn,
3496 deserialize_fn,
3497 ..
3498 } => {
3499 if let Some(serialize_fn) = serialize_fn {
3500 transform(serialize_fn);
3501 }
3502 if let Some(deserialize_fn) = deserialize_fn {
3503 transform(deserialize_fn);
3504 }
3505 }
3506 HydroNode::ExternalInput { deserialize_fn, .. } => {
3507 if let Some(deserialize_fn) = deserialize_fn {
3508 transform(deserialize_fn);
3509 }
3510 }
3511 HydroNode::Counter { duration, .. } => {
3512 transform(duration);
3513 }
3514 }
3515 }
3516
3517 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3518 &self.metadata().op
3519 }
3520
3521 pub fn metadata(&self) -> &HydroIrMetadata {
3522 match self {
3523 HydroNode::Placeholder => {
3524 panic!()
3525 }
3526 HydroNode::Cast { metadata, .. } => metadata,
3527 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3528 HydroNode::Source { metadata, .. } => metadata,
3529 HydroNode::SingletonSource { metadata, .. } => metadata,
3530 HydroNode::CycleSource { metadata, .. } => metadata,
3531 HydroNode::Tee { metadata, .. } => metadata,
3532 HydroNode::YieldConcat { metadata, .. } => metadata,
3533 HydroNode::BeginAtomic { metadata, .. } => metadata,
3534 HydroNode::EndAtomic { metadata, .. } => metadata,
3535 HydroNode::Batch { metadata, .. } => metadata,
3536 HydroNode::Chain { metadata, .. } => metadata,
3537 HydroNode::ChainFirst { metadata, .. } => metadata,
3538 HydroNode::CrossProduct { metadata, .. } => metadata,
3539 HydroNode::CrossSingleton { metadata, .. } => metadata,
3540 HydroNode::Join { metadata, .. } => metadata,
3541 HydroNode::Difference { metadata, .. } => metadata,
3542 HydroNode::AntiJoin { metadata, .. } => metadata,
3543 HydroNode::ResolveFutures { metadata, .. } => metadata,
3544 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3545 HydroNode::Map { metadata, .. } => metadata,
3546 HydroNode::FlatMap { metadata, .. } => metadata,
3547 HydroNode::Filter { metadata, .. } => metadata,
3548 HydroNode::FilterMap { metadata, .. } => metadata,
3549 HydroNode::DeferTick { metadata, .. } => metadata,
3550 HydroNode::Enumerate { metadata, .. } => metadata,
3551 HydroNode::Inspect { metadata, .. } => metadata,
3552 HydroNode::Unique { metadata, .. } => metadata,
3553 HydroNode::Sort { metadata, .. } => metadata,
3554 HydroNode::Scan { metadata, .. } => metadata,
3555 HydroNode::Fold { metadata, .. } => metadata,
3556 HydroNode::FoldKeyed { metadata, .. } => metadata,
3557 HydroNode::Reduce { metadata, .. } => metadata,
3558 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3559 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3560 HydroNode::ExternalInput { metadata, .. } => metadata,
3561 HydroNode::Network { metadata, .. } => metadata,
3562 HydroNode::Counter { metadata, .. } => metadata,
3563 }
3564 }
3565
3566 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3567 &mut self.metadata_mut().op
3568 }
3569
3570 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3571 match self {
3572 HydroNode::Placeholder => {
3573 panic!()
3574 }
3575 HydroNode::Cast { metadata, .. } => metadata,
3576 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3577 HydroNode::Source { metadata, .. } => metadata,
3578 HydroNode::SingletonSource { metadata, .. } => metadata,
3579 HydroNode::CycleSource { metadata, .. } => metadata,
3580 HydroNode::Tee { metadata, .. } => metadata,
3581 HydroNode::YieldConcat { metadata, .. } => metadata,
3582 HydroNode::BeginAtomic { metadata, .. } => metadata,
3583 HydroNode::EndAtomic { metadata, .. } => metadata,
3584 HydroNode::Batch { metadata, .. } => metadata,
3585 HydroNode::Chain { metadata, .. } => metadata,
3586 HydroNode::ChainFirst { metadata, .. } => metadata,
3587 HydroNode::CrossProduct { metadata, .. } => metadata,
3588 HydroNode::CrossSingleton { metadata, .. } => metadata,
3589 HydroNode::Join { metadata, .. } => metadata,
3590 HydroNode::Difference { metadata, .. } => metadata,
3591 HydroNode::AntiJoin { metadata, .. } => metadata,
3592 HydroNode::ResolveFutures { metadata, .. } => metadata,
3593 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3594 HydroNode::Map { metadata, .. } => metadata,
3595 HydroNode::FlatMap { metadata, .. } => metadata,
3596 HydroNode::Filter { metadata, .. } => metadata,
3597 HydroNode::FilterMap { metadata, .. } => metadata,
3598 HydroNode::DeferTick { metadata, .. } => metadata,
3599 HydroNode::Enumerate { metadata, .. } => metadata,
3600 HydroNode::Inspect { metadata, .. } => metadata,
3601 HydroNode::Unique { metadata, .. } => metadata,
3602 HydroNode::Sort { metadata, .. } => metadata,
3603 HydroNode::Scan { metadata, .. } => metadata,
3604 HydroNode::Fold { metadata, .. } => metadata,
3605 HydroNode::FoldKeyed { metadata, .. } => metadata,
3606 HydroNode::Reduce { metadata, .. } => metadata,
3607 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3608 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3609 HydroNode::ExternalInput { metadata, .. } => metadata,
3610 HydroNode::Network { metadata, .. } => metadata,
3611 HydroNode::Counter { metadata, .. } => metadata,
3612 }
3613 }
3614
3615 pub fn input(&self) -> Vec<&HydroNode> {
3616 match self {
3617 HydroNode::Placeholder => {
3618 panic!()
3619 }
3620 HydroNode::Source { .. }
3621 | HydroNode::SingletonSource { .. }
3622 | HydroNode::ExternalInput { .. }
3623 | HydroNode::CycleSource { .. }
3624 | HydroNode::Tee { .. } => {
3625 vec![]
3627 }
3628 HydroNode::Cast { inner, .. }
3629 | HydroNode::ObserveNonDet { inner, .. }
3630 | HydroNode::YieldConcat { inner, .. }
3631 | HydroNode::BeginAtomic { inner, .. }
3632 | HydroNode::EndAtomic { inner, .. }
3633 | HydroNode::Batch { inner, .. } => {
3634 vec![inner]
3635 }
3636 HydroNode::Chain { first, second, .. } => {
3637 vec![first, second]
3638 }
3639 HydroNode::ChainFirst { first, second, .. } => {
3640 vec![first, second]
3641 }
3642 HydroNode::CrossProduct { left, right, .. }
3643 | HydroNode::CrossSingleton { left, right, .. }
3644 | HydroNode::Join { left, right, .. } => {
3645 vec![left, right]
3646 }
3647 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3648 vec![pos, neg]
3649 }
3650 HydroNode::Map { input, .. }
3651 | HydroNode::FlatMap { input, .. }
3652 | HydroNode::Filter { input, .. }
3653 | HydroNode::FilterMap { input, .. }
3654 | HydroNode::Sort { input, .. }
3655 | HydroNode::DeferTick { input, .. }
3656 | HydroNode::Enumerate { input, .. }
3657 | HydroNode::Inspect { input, .. }
3658 | HydroNode::Unique { input, .. }
3659 | HydroNode::Network { input, .. }
3660 | HydroNode::Counter { input, .. }
3661 | HydroNode::ResolveFutures { input, .. }
3662 | HydroNode::ResolveFuturesOrdered { input, .. }
3663 | HydroNode::Fold { input, .. }
3664 | HydroNode::FoldKeyed { input, .. }
3665 | HydroNode::Reduce { input, .. }
3666 | HydroNode::ReduceKeyed { input, .. }
3667 | HydroNode::Scan { input, .. } => {
3668 vec![input]
3669 }
3670 HydroNode::ReduceKeyedWatermark {
3671 input, watermark, ..
3672 } => {
3673 vec![input, watermark]
3674 }
3675 }
3676 }
3677
3678 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3679 self.input()
3680 .iter()
3681 .map(|input_node| input_node.metadata())
3682 .collect()
3683 }
3684
3685 pub fn print_root(&self) -> String {
3686 match self {
3687 HydroNode::Placeholder => {
3688 panic!()
3689 }
3690 HydroNode::Cast { .. } => "Cast()".to_owned(),
3691 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3692 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3693 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3694 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3695 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3696 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3697 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3698 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3699 HydroNode::Batch { .. } => "Batch()".to_owned(),
3700 HydroNode::Chain { first, second, .. } => {
3701 format!("Chain({}, {})", first.print_root(), second.print_root())
3702 }
3703 HydroNode::ChainFirst { first, second, .. } => {
3704 format!(
3705 "ChainFirst({}, {})",
3706 first.print_root(),
3707 second.print_root()
3708 )
3709 }
3710 HydroNode::CrossProduct { left, right, .. } => {
3711 format!(
3712 "CrossProduct({}, {})",
3713 left.print_root(),
3714 right.print_root()
3715 )
3716 }
3717 HydroNode::CrossSingleton { left, right, .. } => {
3718 format!(
3719 "CrossSingleton({}, {})",
3720 left.print_root(),
3721 right.print_root()
3722 )
3723 }
3724 HydroNode::Join { left, right, .. } => {
3725 format!("Join({}, {})", left.print_root(), right.print_root())
3726 }
3727 HydroNode::Difference { pos, neg, .. } => {
3728 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3729 }
3730 HydroNode::AntiJoin { pos, neg, .. } => {
3731 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3732 }
3733 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3734 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3735 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3736 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3737 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3738 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3739 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3740 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3741 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3742 HydroNode::Unique { .. } => "Unique()".to_owned(),
3743 HydroNode::Sort { .. } => "Sort()".to_owned(),
3744 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3745 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3746 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3747 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3748 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3749 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3750 HydroNode::Network { .. } => "Network()".to_owned(),
3751 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3752 HydroNode::Counter { tag, duration, .. } => {
3753 format!("Counter({:?}, {:?})", tag, duration)
3754 }
3755 }
3756 }
3757}
3758
3759#[cfg(feature = "build")]
3760fn instantiate_network<'a, D>(
3761 from_location: &LocationId,
3762 to_location: &LocationId,
3763 processes: &SparseSecondaryMap<LocationKey, D::Process>,
3764 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
3765) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3766where
3767 D: Deploy<'a>,
3768{
3769 let ((sink, source), connect_fn) = match (from_location, to_location) {
3770 (&LocationId::Process(from), &LocationId::Process(to)) => {
3771 let from_node = processes
3772 .get(from)
3773 .unwrap_or_else(|| {
3774 panic!("A process used in the graph was not instantiated: {}", from)
3775 })
3776 .clone();
3777 let to_node = processes
3778 .get(to)
3779 .unwrap_or_else(|| {
3780 panic!("A process used in the graph was not instantiated: {}", to)
3781 })
3782 .clone();
3783
3784 let sink_port = from_node.next_port();
3785 let source_port = to_node.next_port();
3786
3787 (
3788 D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3789 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3790 )
3791 }
3792 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
3793 let from_node = processes
3794 .get(from)
3795 .unwrap_or_else(|| {
3796 panic!("A process used in the graph was not instantiated: {}", from)
3797 })
3798 .clone();
3799 let to_node = clusters
3800 .get(to)
3801 .unwrap_or_else(|| {
3802 panic!("A cluster used in the graph was not instantiated: {}", to)
3803 })
3804 .clone();
3805
3806 let sink_port = from_node.next_port();
3807 let source_port = to_node.next_port();
3808
3809 (
3810 D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3811 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3812 )
3813 }
3814 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
3815 let from_node = clusters
3816 .get(from)
3817 .unwrap_or_else(|| {
3818 panic!("A cluster used in the graph was not instantiated: {}", from)
3819 })
3820 .clone();
3821 let to_node = processes
3822 .get(to)
3823 .unwrap_or_else(|| {
3824 panic!("A process used in the graph was not instantiated: {}", to)
3825 })
3826 .clone();
3827
3828 let sink_port = from_node.next_port();
3829 let source_port = to_node.next_port();
3830
3831 (
3832 D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3833 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3834 )
3835 }
3836 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
3837 let from_node = clusters
3838 .get(from)
3839 .unwrap_or_else(|| {
3840 panic!("A cluster used in the graph was not instantiated: {}", from)
3841 })
3842 .clone();
3843 let to_node = clusters
3844 .get(to)
3845 .unwrap_or_else(|| {
3846 panic!("A cluster used in the graph was not instantiated: {}", to)
3847 })
3848 .clone();
3849
3850 let sink_port = from_node.next_port();
3851 let source_port = to_node.next_port();
3852
3853 (
3854 D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3855 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3856 )
3857 }
3858 (LocationId::Tick(_, _), _) => panic!(),
3859 (_, LocationId::Tick(_, _)) => panic!(),
3860 (LocationId::Atomic(_), _) => panic!(),
3861 (_, LocationId::Atomic(_)) => panic!(),
3862 };
3863 (sink, source, connect_fn)
3864}
3865
3866#[cfg(test)]
3867mod test {
3868 use std::mem::size_of;
3869
3870 use stageleft::{QuotedWithContext, q};
3871
3872 use super::*;
3873
3874 #[test]
3875 #[cfg_attr(
3876 not(feature = "build"),
3877 ignore = "expects inclusion of feature-gated fields"
3878 )]
3879 fn hydro_node_size() {
3880 assert_eq!(size_of::<HydroNode>(), 240);
3881 }
3882
3883 #[test]
3884 #[cfg_attr(
3885 not(feature = "build"),
3886 ignore = "expects inclusion of feature-gated fields"
3887 )]
3888 fn hydro_root_size() {
3889 assert_eq!(size_of::<HydroRoot>(), 136);
3890 }
3891
3892 #[test]
3893 fn test_simplify_q_macro_basic() {
3894 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
3896 let result = simplify_q_macro(simple_expr.clone());
3897 assert_eq!(result, simple_expr);
3898 }
3899
3900 #[test]
3901 fn test_simplify_q_macro_actual_stageleft_call() {
3902 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
3904 let result = simplify_q_macro(stageleft_call);
3905 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3908 }
3909
3910 #[test]
3911 fn test_closure_no_pipe_at_start() {
3912 let stageleft_call = q!({
3914 let foo = 123;
3915 move |b: usize| b + foo
3916 })
3917 .splice_fn1_ctx(&());
3918 let result = simplify_q_macro(stageleft_call);
3919 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3920 }
3921}