1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
/// Newtype around a boxed [`syn::Expr`].
///
/// The wrapper exists so the expression can carry its own `Debug`/`Display`
/// formatting (both are implemented in this file in terms of the expression's
/// token stream) while still dereferencing to the underlying `syn::Expr`.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39 fn from(expr: syn::Expr) -> Self {
40 Self(Box::new(expr))
41 }
42}
43
44impl Deref for DebugExpr {
45 type Target = syn::Expr;
46
47 fn deref(&self) -> &Self::Target {
48 &self.0
49 }
50}
51
52impl ToTokens for DebugExpr {
53 fn to_tokens(&self, tokens: &mut TokenStream) {
54 self.0.to_tokens(tokens);
55 }
56}
57
58impl Debug for DebugExpr {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 write!(f, "{}", self.0.to_token_stream())
61 }
62}
63
64impl Display for DebugExpr {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 let original = self.0.as_ref().clone();
67 let simplified = simplify_q_macro(original);
68
69 write!(f, "q!({})", quote::quote!(#simplified))
72 }
73}
74
75fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77 let mut simplifier = QMacroSimplifier::new();
80 simplifier.visit_expr_mut(&mut expr);
81
82 if let Some(simplified) = simplifier.simplified_result {
84 simplified
85 } else {
86 expr
87 }
88}
89
/// Mutable `syn` visitor that searches an expression for a call into
/// `stageleft::runtime_support::*_type_hint*` and records the closure passed to it.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted closure, set at most once; `None` until a match is found.
    pub simplified_result: Option<syn::Expr>,
}
95
96impl QMacroSimplifier {
97 pub fn new() -> Self {
98 Self::default()
99 }
100}
101
102impl VisitMut for QMacroSimplifier {
103 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104 if self.simplified_result.is_some() {
106 return;
107 }
108
109 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110 && self.is_stageleft_runtime_support_call(&path_expr.path)
112 && let Some(closure) = self.extract_closure_from_args(&call.args)
114 {
115 self.simplified_result = Some(closure);
116 return;
117 }
118
119 syn::visit_mut::visit_expr_mut(self, expr);
122 }
123}
124
125impl QMacroSimplifier {
126 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127 if let Some(last_segment) = path.segments.last() {
129 let fn_name = last_segment.ident.to_string();
130 fn_name.contains("_type_hint")
132 && path.segments.len() > 2
133 && path.segments[0].ident == "stageleft"
134 && path.segments[1].ident == "runtime_support"
135 } else {
136 false
137 }
138 }
139
140 fn extract_closure_from_args(
141 &self,
142 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143 ) -> Option<syn::Expr> {
144 for arg in args {
146 if let syn::Expr::Closure(_) = arg {
147 return Some(arg.clone());
148 }
149 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151 return Some(closure_expr);
152 }
153 }
154 None
155 }
156
157 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158 let mut visitor = ClosureFinder {
159 found_closure: None,
160 prefer_inner_blocks: true,
161 };
162 visitor.visit_expr(expr);
163 visitor.found_closure
164 }
165}
166
/// Read-only `syn` visitor that records the first closure-bearing expression it meets.
struct ClosureFinder {
    /// The expression recorded so far; once set, further visits return immediately.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression is first checked for nested block statements
    /// that contain a closure, and that nested block (not the bare closure) is recorded.
    prefer_inner_blocks: bool,
}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175 if self.found_closure.is_some() {
177 return;
178 }
179
180 match expr {
181 syn::Expr::Closure(_) => {
182 self.found_closure = Some(expr.clone());
183 }
184 syn::Expr::Block(block) if self.prefer_inner_blocks => {
185 for stmt in &block.block.stmts {
187 if let syn::Stmt::Expr(stmt_expr, _) = stmt
188 && let syn::Expr::Block(_) = stmt_expr
189 {
190 let mut inner_visitor = ClosureFinder {
192 found_closure: None,
193 prefer_inner_blocks: false, };
195 inner_visitor.visit_expr(stmt_expr);
196 if inner_visitor.found_closure.is_some() {
197 self.found_closure = Some(stmt_expr.clone());
199 return;
200 }
201 }
202 }
203
204 visit::visit_expr(self, expr);
206
207 if self.found_closure.is_some() {
210 }
212 }
213 _ => {
214 visit::visit_expr(self, expr);
216 }
217 }
218 }
219}
220
/// Newtype around a boxed [`syn::Type`].
///
/// Mirrors `DebugExpr`: it dereferences to the wrapped type and its `Debug`
/// implementation (in this file) prints the type's token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228 fn from(t: syn::Type) -> Self {
229 Self(Box::new(t))
230 }
231}
232
233impl Deref for DebugType {
234 type Target = syn::Type;
235
236 fn deref(&self) -> &Self::Target {
237 &self.0
238 }
239}
240
241impl ToTokens for DebugType {
242 fn to_tokens(&self, tokens: &mut TokenStream) {
243 self.0.to_tokens(tokens);
244 }
245}
246
247impl Debug for DebugType {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 write!(f, "{}", self.0.to_token_stream())
250 }
251}
252
/// Instantiation state of a network-like IR element.
pub enum DebugInstantiate {
    /// Not yet instantiated; `compile_network` replaces this with `Finalized`.
    Building,
    /// Instantiated: holds the generated sink/source expressions and the
    /// pending connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
257
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression used as the sending side when DFIR code is emitted.
    sink: syn::Expr,
    /// Expression used as the receiving side when DFIR code is emitted.
    source: syn::Expr,
    /// One-shot callback that `connect_network` takes out of the option and invokes.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272 fn from(f: DebugInstantiateFinalized) -> Self {
273 Self::Finalized(Box::new(f))
274 }
275}
276
277impl Debug for DebugInstantiate {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 write!(f, "<network instantiate>")
280 }
281}
282
impl Hash for DebugInstantiate {
    /// Deliberately feeds nothing into the hasher, so this value never
    /// contributes to the hash of a structure that contains it.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
288
289impl Clone for DebugInstantiate {
290 fn clone(&self) -> Self {
291 match self {
292 DebugInstantiate::Building => DebugInstantiate::Building,
293 DebugInstantiate::Finalized(_) => {
294 panic!("DebugInstantiate::Finalized should not be cloned")
295 }
296 }
297 }
298}
299
/// The kinds of source an IR source node can read from.
// NOTE(review): variant meanings below are inferred from names and payload types
// only; the code that consumes them is outside this view — confirm before relying on them.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression (presumably evaluating to a stream).
    Stream(DebugExpr),
    /// Source fed by an external network; carries no payload.
    ExternalNetwork(),
    /// Source described by an expression (presumably evaluating to an iterator).
    Iter(DebugExpr),
    /// Payload-free source variant.
    Spin(),
    /// Source tied to the given location (presumably the members of that cluster).
    ClusterMembers(LocationId),
}
309
/// Backend that receives DFIR statements while the Hydro IR is being emitted.
///
/// `emit_core` drives an implementor through these hooks. The implementation
/// for `BTreeMap<usize, FlatGraphBuilder>` in this file maps most hooks to a
/// plain `#out_ident = #in_ident;` statement; other implementors may differ.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Implementor-defined flag.
    // NOTE(review): its consumers are outside this view; presumably controls
    // whether singleton values get intermediate operators — confirm.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder that statements for `location` should be added to.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Connects `in_ident` (at `in_location`) to `out_ident` (at `out_location`)
    /// for a batching operation.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` (at `in_location`) to `out_ident` (at `out_location`)
    /// for a yield-from-tick operation.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Connects `in_ident` to `out_ident` at the start of an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` at the end of an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Connects `in_ident` to `out_ident` at `location` for a non-determinism
    /// observation point; `in_kind`/`out_kind` describe the collection before
    /// and after.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network link: a sender at `from` that feeds
    /// `input_ident` (optionally through `serialize`) into `sink`, and a receiver
    /// at `to` that defines `out_ident` from `source` (optionally through
    /// `deserialize`). `tag_id` identifies the pair.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits a receiver at `on` that defines `out_ident` from `source_expr`,
    /// optionally mapped through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits a sender at `on` that feeds `input_ident` into `sink_expr`,
    /// optionally mapped through `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    );
}
402
403#[cfg(feature = "build")]
404impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
    /// This builder always reports `false`.
    fn singleton_intermediates(&self) -> bool {
        false
    }
408
409 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
410 self.entry(location.root().raw_id()).or_default()
411 }
412
413 fn batch(
414 &mut self,
415 in_ident: syn::Ident,
416 in_location: &LocationId,
417 _in_kind: &CollectionKind,
418 out_ident: &syn::Ident,
419 _out_location: &LocationId,
420 _op_meta: &HydroIrOpMetadata,
421 ) {
422 let builder = self.get_dfir_mut(in_location.root());
423 builder.add_dfir(
424 parse_quote! {
425 #out_ident = #in_ident;
426 },
427 None,
428 None,
429 );
430 }
431
432 fn yield_from_tick(
433 &mut self,
434 in_ident: syn::Ident,
435 in_location: &LocationId,
436 _in_kind: &CollectionKind,
437 out_ident: &syn::Ident,
438 _out_location: &LocationId,
439 ) {
440 let builder = self.get_dfir_mut(in_location.root());
441 builder.add_dfir(
442 parse_quote! {
443 #out_ident = #in_ident;
444 },
445 None,
446 None,
447 );
448 }
449
450 fn begin_atomic(
451 &mut self,
452 in_ident: syn::Ident,
453 in_location: &LocationId,
454 _in_kind: &CollectionKind,
455 out_ident: &syn::Ident,
456 _out_location: &LocationId,
457 _op_meta: &HydroIrOpMetadata,
458 ) {
459 let builder = self.get_dfir_mut(in_location.root());
460 builder.add_dfir(
461 parse_quote! {
462 #out_ident = #in_ident;
463 },
464 None,
465 None,
466 );
467 }
468
469 fn end_atomic(
470 &mut self,
471 in_ident: syn::Ident,
472 in_location: &LocationId,
473 _in_kind: &CollectionKind,
474 out_ident: &syn::Ident,
475 ) {
476 let builder = self.get_dfir_mut(in_location.root());
477 builder.add_dfir(
478 parse_quote! {
479 #out_ident = #in_ident;
480 },
481 None,
482 None,
483 );
484 }
485
486 fn observe_nondet(
487 &mut self,
488 _trusted: bool,
489 location: &LocationId,
490 in_ident: syn::Ident,
491 _in_kind: &CollectionKind,
492 out_ident: &syn::Ident,
493 _out_kind: &CollectionKind,
494 _op_meta: &HydroIrOpMetadata,
495 ) {
496 let builder = self.get_dfir_mut(location);
497 builder.add_dfir(
498 parse_quote! {
499 #out_ident = #in_ident;
500 },
501 None,
502 None,
503 );
504 }
505
506 fn create_network(
507 &mut self,
508 from: &LocationId,
509 to: &LocationId,
510 input_ident: syn::Ident,
511 out_ident: &syn::Ident,
512 serialize: &Option<DebugExpr>,
513 sink: syn::Expr,
514 source: syn::Expr,
515 deserialize: &Option<DebugExpr>,
516 tag_id: usize,
517 ) {
518 let sender_builder = self.get_dfir_mut(from);
519 if let Some(serialize_pipeline) = serialize {
520 sender_builder.add_dfir(
521 parse_quote! {
522 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
523 },
524 None,
525 Some(&format!("send{}", tag_id)),
527 );
528 } else {
529 sender_builder.add_dfir(
530 parse_quote! {
531 #input_ident -> dest_sink(#sink);
532 },
533 None,
534 Some(&format!("send{}", tag_id)),
535 );
536 }
537
538 let receiver_builder = self.get_dfir_mut(to);
539 if let Some(deserialize_pipeline) = deserialize {
540 receiver_builder.add_dfir(
541 parse_quote! {
542 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
543 },
544 None,
545 Some(&format!("recv{}", tag_id)),
546 );
547 } else {
548 receiver_builder.add_dfir(
549 parse_quote! {
550 #out_ident = source_stream(#source);
551 },
552 None,
553 Some(&format!("recv{}", tag_id)),
554 );
555 }
556 }
557
558 fn create_external_source(
559 &mut self,
560 on: &LocationId,
561 source_expr: syn::Expr,
562 out_ident: &syn::Ident,
563 deserialize: &Option<DebugExpr>,
564 tag_id: usize,
565 ) {
566 let receiver_builder = self.get_dfir_mut(on);
567 if let Some(deserialize_pipeline) = deserialize {
568 receiver_builder.add_dfir(
569 parse_quote! {
570 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
571 },
572 None,
573 Some(&format!("recv{}", tag_id)),
574 );
575 } else {
576 receiver_builder.add_dfir(
577 parse_quote! {
578 #out_ident = source_stream(#source_expr);
579 },
580 None,
581 Some(&format!("recv{}", tag_id)),
582 );
583 }
584 }
585
586 fn create_external_output(
587 &mut self,
588 on: &LocationId,
589 sink_expr: syn::Expr,
590 input_ident: &syn::Ident,
591 serialize: &Option<DebugExpr>,
592 tag_id: usize,
593 ) {
594 let sender_builder = self.get_dfir_mut(on);
595 if let Some(serialize_fn) = serialize {
596 sender_builder.add_dfir(
597 parse_quote! {
598 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
599 },
600 None,
601 Some(&format!("send{}", tag_id)),
603 );
604 } else {
605 sender_builder.add_dfir(
606 parse_quote! {
607 #input_ident -> dest_sink(#sink_expr);
608 },
609 None,
610 Some(&format!("send{}", tag_id)),
611 );
612 }
613 }
614}
615
/// Selects what `emit_core` does at each IR element: either generate DFIR into
/// a [`DfirBuilder`], or hand the element to user callbacks instead.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Generate DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback (`L`) / node callback (`N`) with the element
    /// and the current statement id counter.
    Callback(L, N),
}
625
/// A root (terminal) element of the Hydro IR: each variant consumes one input
/// [`HydroNode`] and produces no further IR output.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Emitted as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to an external; emitted through
    /// `DfirBuilder::create_external_output`.
    SendExternal {
        /// Key into the `externals` map given to `compile_network`.
        to_external_id: usize,
        /// Combined with `to_external_id` to name the port (`"{id}_{key}"`).
        to_key: usize,
        /// Selects the `e2o_many_sink` path in `compile_network`.
        to_many: bool,
        /// When set, `compile_network` also registers the port on the external
        /// and wires up a connection for it.
        unpaired: bool,
        /// Optional mapping applied before the sink.
        serialize_fn: Option<DebugExpr>,
        /// Instantiation state; `Building` until `compile_network` runs.
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `ident = input -> identity::<T>()`, binding the input to `ident`.
    CycleSink {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
657
658impl HydroRoot {
659 #[cfg(feature = "build")]
660 pub fn compile_network<'a, D>(
661 &mut self,
662 extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
663 seen_tees: &mut SeenTees,
664 processes: &HashMap<usize, D::Process>,
665 clusters: &HashMap<usize, D::Cluster>,
666 externals: &HashMap<usize, D::External>,
667 ) where
668 D: Deploy<'a>,
669 {
670 let refcell_extra_stmts = RefCell::new(extra_stmts);
671 self.transform_bottom_up(
672 &mut |l| {
673 if let HydroRoot::SendExternal {
674 input,
675 to_external_id,
676 to_key,
677 to_many,
678 unpaired,
679 instantiate_fn,
680 ..
681 } = l
682 {
683 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
684 DebugInstantiate::Building => {
685 let to_node = externals
686 .get(to_external_id)
687 .unwrap_or_else(|| {
688 panic!("A external used in the graph was not instantiated: {}", to_external_id)
689 })
690 .clone();
691
692 match input.metadata().location_kind.root() {
693 LocationId::Process(process_id) => {
694 if *to_many {
695 (
696 (
697 D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
698 parse_quote!(DUMMY),
699 ),
700 Box::new(|| {}) as Box<dyn FnOnce()>,
701 )
702 } else {
703 let from_node = processes
704 .get(process_id)
705 .unwrap_or_else(|| {
706 panic!("A process used in the graph was not instantiated: {}", process_id)
707 })
708 .clone();
709
710 let sink_port = D::allocate_process_port(&from_node);
711 let source_port: <D as Deploy<'a>>::Port = D::allocate_external_port(&to_node);
712
713 if *unpaired {
714 use stageleft::quote_type;
715 use tokio_util::codec::LengthDelimitedCodec;
716
717 to_node.register(*to_key, source_port.clone());
718
719 let _ = D::e2o_source(
720 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
721 &to_node, &source_port,
722 &from_node, &sink_port,
723 "e_type::<LengthDelimitedCodec>(),
724 format!("{}_{}", *to_external_id, *to_key)
725 );
726 }
727
728 (
729 (
730 D::o2e_sink(
731 &from_node,
732 &sink_port,
733 &to_node,
734 &source_port,
735 format!("{}_{}", *to_external_id, *to_key)
736 ),
737 parse_quote!(DUMMY),
738 ),
739 if *unpaired {
740 D::e2o_connect(
741 &to_node,
742 &source_port,
743 &from_node,
744 &sink_port,
745 *to_many,
746 NetworkHint::Auto,
747 )
748 } else {
749 Box::new(|| {}) as Box<dyn FnOnce()>
750 },
751 )
752 }
753 }
754 LocationId::Cluster(_) => todo!(),
755 _ => panic!()
756 }
757 },
758
759 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
760 };
761
762 *instantiate_fn = DebugInstantiateFinalized {
763 sink: sink_expr,
764 source: source_expr,
765 connect_fn: Some(connect_fn),
766 }
767 .into();
768 }
769 },
770 &mut |n| {
771 if let HydroNode::Network {
772 input,
773 instantiate_fn,
774 metadata,
775 ..
776 } = n
777 {
778 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
779 DebugInstantiate::Building => instantiate_network::<D>(
780 input.metadata().location_kind.root(),
781 metadata.location_kind.root(),
782 processes,
783 clusters,
784 ),
785
786 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
787 };
788
789 *instantiate_fn = DebugInstantiateFinalized {
790 sink: sink_expr,
791 source: source_expr,
792 connect_fn: Some(connect_fn),
793 }
794 .into();
795 } else if let HydroNode::ExternalInput {
796 from_external_id,
797 from_key,
798 from_many,
799 codec_type,
800 port_hint,
801 instantiate_fn,
802 metadata,
803 ..
804 } = n
805 {
806 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
807 DebugInstantiate::Building => {
808 let from_node = externals
809 .get(from_external_id)
810 .unwrap_or_else(|| {
811 panic!(
812 "A external used in the graph was not instantiated: {}",
813 from_external_id
814 )
815 })
816 .clone();
817
818 match metadata.location_kind.root() {
819 LocationId::Process(process_id) => {
820 let to_node = processes
821 .get(process_id)
822 .unwrap_or_else(|| {
823 panic!("A process used in the graph was not instantiated: {}", process_id)
824 })
825 .clone();
826
827 let sink_port = D::allocate_external_port(&from_node);
828 let source_port = D::allocate_process_port(&to_node);
829
830 from_node.register(*from_key, sink_port.clone());
831
832 (
833 (
834 parse_quote!(DUMMY),
835 if *from_many {
836 D::e2o_many_source(
837 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
838 &to_node, &source_port,
839 codec_type.0.as_ref(),
840 format!("{}_{}", *from_external_id, *from_key)
841 )
842 } else {
843 D::e2o_source(
844 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
845 &from_node, &sink_port,
846 &to_node, &source_port,
847 codec_type.0.as_ref(),
848 format!("{}_{}", *from_external_id, *from_key)
849 )
850 },
851 ),
852 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
853 )
854 }
855 LocationId::Cluster(_) => todo!(),
856 _ => panic!()
857 }
858 },
859
860 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
861 };
862
863 *instantiate_fn = DebugInstantiateFinalized {
864 sink: sink_expr,
865 source: source_expr,
866 connect_fn: Some(connect_fn),
867 }
868 .into();
869 }
870 },
871 seen_tees,
872 false,
873 );
874 }
875
876 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
877 self.transform_bottom_up(
878 &mut |l| {
879 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
880 match instantiate_fn {
881 DebugInstantiate::Building => panic!("network not built"),
882
883 DebugInstantiate::Finalized(finalized) => {
884 (finalized.connect_fn.take().unwrap())();
885 }
886 }
887 }
888 },
889 &mut |n| {
890 if let HydroNode::Network { instantiate_fn, .. }
891 | HydroNode::ExternalInput { instantiate_fn, .. } = n
892 {
893 match instantiate_fn {
894 DebugInstantiate::Building => panic!("network not built"),
895
896 DebugInstantiate::Finalized(finalized) => {
897 (finalized.connect_fn.take().unwrap())();
898 }
899 }
900 }
901 },
902 seen_tees,
903 false,
904 );
905 }
906
907 pub fn transform_bottom_up(
908 &mut self,
909 transform_root: &mut impl FnMut(&mut HydroRoot),
910 transform_node: &mut impl FnMut(&mut HydroNode),
911 seen_tees: &mut SeenTees,
912 check_well_formed: bool,
913 ) {
914 self.transform_children(
915 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
916 seen_tees,
917 );
918
919 transform_root(self);
920 }
921
922 pub fn transform_children(
923 &mut self,
924 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
925 seen_tees: &mut SeenTees,
926 ) {
927 match self {
928 HydroRoot::ForEach { input, .. }
929 | HydroRoot::SendExternal { input, .. }
930 | HydroRoot::DestSink { input, .. }
931 | HydroRoot::CycleSink { input, .. } => {
932 transform(input, seen_tees);
933 }
934 }
935 }
936
937 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
938 match self {
939 HydroRoot::ForEach {
940 f,
941 input,
942 op_metadata,
943 } => HydroRoot::ForEach {
944 f: f.clone(),
945 input: Box::new(input.deep_clone(seen_tees)),
946 op_metadata: op_metadata.clone(),
947 },
948 HydroRoot::SendExternal {
949 to_external_id,
950 to_key,
951 to_many,
952 unpaired,
953 serialize_fn,
954 instantiate_fn,
955 input,
956 op_metadata,
957 } => HydroRoot::SendExternal {
958 to_external_id: *to_external_id,
959 to_key: *to_key,
960 to_many: *to_many,
961 unpaired: *unpaired,
962 serialize_fn: serialize_fn.clone(),
963 instantiate_fn: instantiate_fn.clone(),
964 input: Box::new(input.deep_clone(seen_tees)),
965 op_metadata: op_metadata.clone(),
966 },
967 HydroRoot::DestSink {
968 sink,
969 input,
970 op_metadata,
971 } => HydroRoot::DestSink {
972 sink: sink.clone(),
973 input: Box::new(input.deep_clone(seen_tees)),
974 op_metadata: op_metadata.clone(),
975 },
976 HydroRoot::CycleSink {
977 ident,
978 input,
979 op_metadata,
980 } => HydroRoot::CycleSink {
981 ident: ident.clone(),
982 input: Box::new(input.deep_clone(seen_tees)),
983 op_metadata: op_metadata.clone(),
984 },
985 }
986 }
987
988 #[cfg(feature = "build")]
989 pub fn emit<'a, D: Deploy<'a>>(
990 &mut self,
991 graph_builders: &mut dyn DfirBuilder,
992 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
993 next_stmt_id: &mut usize,
994 ) {
995 self.emit_core::<D>(
996 &mut BuildersOrCallback::Builders::<
997 fn(&mut HydroRoot, &mut usize),
998 fn(&mut HydroNode, &mut usize),
999 >(graph_builders),
1000 built_tees,
1001 next_stmt_id,
1002 );
1003 }
1004
    /// Emits this root, either as DFIR statements or via the root callback.
    ///
    /// The input subtree is always emitted first (through
    /// `HydroNode::emit_core`), yielding the identifier that names its output.
    /// Then, depending on `builders_or_callback`:
    ///
    /// * `Builders` – a DFIR statement for this root is added to the builder
    ///   for the input's location.
    /// * `Callback` – the root callback is invoked with `self` and the current
    ///   statement id (not for `CycleSink`, see below).
    ///
    /// `next_stmt_id` is incremented once for `ForEach`, `SendExternal` and
    /// `DestSink`; `CycleSink` neither invokes the callback nor increments it.
    #[cfg(feature = "build")]
    pub fn emit_core<'a, D: Deploy<'a>>(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_kind)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                // The statement id doubles as the operator tag.
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink half is needed for an output; if the network has
                        // not been finalized yet, placeholder identifiers are emitted.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_kind,
                            sink_expr,
                            &input_ident,
                            serialize_fn,
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_kind)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::CycleSink { ident, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Element type used to annotate the `identity` operator:
                        // keyed collections flow as `(key, value)` tuples.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        graph_builders
                            .get_dfir_mut(&input.metadata().location_kind)
                            .add_dfir(
                                parse_quote! {
                                    #ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // Cycle sinks are not reported to the callback and do not
                    // consume a statement id.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }
        }
    }
1144
1145 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1146 match self {
1147 HydroRoot::ForEach { op_metadata, .. }
1148 | HydroRoot::SendExternal { op_metadata, .. }
1149 | HydroRoot::DestSink { op_metadata, .. }
1150 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1151 }
1152 }
1153
1154 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1155 match self {
1156 HydroRoot::ForEach { op_metadata, .. }
1157 | HydroRoot::SendExternal { op_metadata, .. }
1158 | HydroRoot::DestSink { op_metadata, .. }
1159 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1160 }
1161 }
1162
1163 pub fn input(&self) -> &HydroNode {
1164 match self {
1165 HydroRoot::ForEach { input, .. }
1166 | HydroRoot::SendExternal { input, .. }
1167 | HydroRoot::DestSink { input, .. }
1168 | HydroRoot::CycleSink { input, .. } => input,
1169 }
1170 }
1171
1172 pub fn input_metadata(&self) -> &HydroIrMetadata {
1173 self.input().metadata()
1174 }
1175
1176 pub fn print_root(&self) -> String {
1177 match self {
1178 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1179 HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1180 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1181 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1182 }
1183 }
1184
1185 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1186 match self {
1187 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1188 transform(f);
1189 }
1190 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1191 }
1192 }
1193}
1194
/// Emits DFIR for every root in `ir` and returns the resulting graph builders,
/// keyed the way `DfirBuilder::get_dfir_mut` for `BTreeMap` keys them.
///
/// Tee bookkeeping and the statement id counter are shared across all roots.
#[cfg(feature = "build")]
pub fn emit<'a, D: Deploy<'a>>(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders: BTreeMap<usize, FlatGraphBuilder> = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit::<D>(&mut builders, &mut built_tees, &mut next_stmt_id);
    });

    builders
}
1205
/// Walks every root in `ir` in emission order without generating DFIR,
/// invoking `transform_root` / `transform_node` with each element and the
/// running statement id instead.
#[cfg(feature = "build")]
pub fn traverse_dfir<'a, D: Deploy<'a>>(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callbacks = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core::<D>(&mut callbacks, &mut seen_tees, &mut next_stmt_id);
    }
}
1219
1220pub fn transform_bottom_up(
1221 ir: &mut [HydroRoot],
1222 transform_root: &mut impl FnMut(&mut HydroRoot),
1223 transform_node: &mut impl FnMut(&mut HydroNode),
1224 check_well_formed: bool,
1225) {
1226 let mut seen_tees = HashMap::new();
1227 ir.iter_mut().for_each(|leaf| {
1228 leaf.transform_bottom_up(
1229 transform_root,
1230 transform_node,
1231 &mut seen_tees,
1232 check_well_formed,
1233 );
1234 });
1235}
1236
1237pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1238 let mut seen_tees = HashMap::new();
1239 ir.iter()
1240 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1241 .collect()
1242}
1243
/// State used to de-duplicate tee nodes while debug-printing.
///
/// `None` means de-duplication is off. `Some((next_id, ids))` holds the next
/// id to hand out and the ids already assigned to tee nodes, keyed by the
/// address of the shared cell.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    /// Per-thread de-duplication state; enabled for the duration of `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1248
1249pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1250 PRINTED_TEES.with(|printed_tees| {
1251 let mut printed_tees_mut = printed_tees.borrow_mut();
1252 *printed_tees_mut = Some((0, HashMap::new()));
1253 drop(printed_tees_mut);
1254
1255 let ret = f();
1256
1257 let mut printed_tees_mut = printed_tees.borrow_mut();
1258 *printed_tees_mut = None;
1259
1260 ret
1261 })
1262}
1263
/// A shared, interior-mutable handle to a [`HydroNode`]; clones of the inner
/// `Rc` refer to the same node.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1265
1266impl TeeNode {
1267 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1268 Rc::as_ptr(&self.0)
1269 }
1270}
1271
1272impl Debug for TeeNode {
1273 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1274 PRINTED_TEES.with(|printed_tees| {
1275 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1276 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1277
1278 if let Some(printed_tees_mut) = printed_tees_mut {
1279 if let Some(existing) = printed_tees_mut
1280 .1
1281 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1282 {
1283 write!(f, "<tee {}>", existing)
1284 } else {
1285 let next_id = printed_tees_mut.0;
1286 printed_tees_mut.0 += 1;
1287 printed_tees_mut
1288 .1
1289 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1290 drop(printed_tees_mut_borrow);
1291 write!(f, "<tee {}>: ", next_id)?;
1292 Debug::fmt(&self.0.borrow(), f)
1293 }
1294 } else {
1295 drop(printed_tees_mut_borrow);
1296 write!(f, "<tee>: ")?;
1297 Debug::fmt(&self.0.borrow(), f)
1298 }
1299 })
1300 }
1301}
1302
1303impl Hash for TeeNode {
1304 fn hash<H: Hasher>(&self, state: &mut H) {
1305 self.0.borrow_mut().hash(state);
1306 }
1307}
1308
/// Boundedness marker carried by the stream, singleton, optional and keyed
/// stream variants of [`CollectionKind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1314
/// Ordering marker carried by the stream variants of [`CollectionKind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1320
/// Retry/delivery marker carried by the stream variants of [`CollectionKind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1326
/// Boundedness marker for [`CollectionKind::KeyedSingleton`]; unlike
/// `BoundKind` it has a third, intermediate state.
// NOTE(review): `BoundedValue` presumably means the values are bounded while
// the key set is not — confirm against the code that produces it.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1333
/// Describes the kind of collection an IR node produces, together with its
/// element (or key/value) types.
///
/// When emitting DFIR for a cycle sink, keyed kinds are treated as
/// `(key_type, value_type)` tuples and the other kinds as `element_type`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of `element_type`.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// An optional of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1363
/// Metadata attached to IR nodes through their `metadata` field.
///
/// The trait impls in this file make the metadata invisible to hashing and comparison:
/// `Hash` writes nothing and `PartialEq` always returns `true`. The `Debug` output shows
/// only `location_kind` and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location of the node; `transform_bottom_up` compares the `root()` of this value
    /// between a node and its inputs.
    pub location_kind: LocationId,
    /// Kind of collection associated with the node.
    pub collection_kind: CollectionKind,
    /// Optional cardinality value.
    /// NOTE(review): where this gets populated is not visible here — confirm.
    pub cardinality: Option<usize>,
    /// Optional free-form tag.
    pub tag: Option<String>,
    /// Per-operator metadata (backtrace and optional measurements).
    pub op: HydroIrOpMetadata,
}
1372
1373impl Hash for HydroIrMetadata {
1375 fn hash<H: Hasher>(&self, _: &mut H) {}
1376}
1377
1378impl PartialEq for HydroIrMetadata {
1379 fn eq(&self, _: &Self) -> bool {
1380 true
1381 }
1382}
1383
// Marker impl: the `PartialEq` impl always returns `true`, which is a total equivalence.
impl Eq for HydroIrMetadata {}
1385
1386impl Debug for HydroIrMetadata {
1387 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1388 f.debug_struct("HydroIrMetadata")
1389 .field("location_kind", &self.location_kind)
1390 .field("collection_kind", &self.collection_kind)
1391 .finish()
1392 }
1393}
1394
/// Per-operator metadata stored in `HydroIrMetadata::op`.
///
/// The trait impls in this file hide all fields: `Debug` prints an empty struct and
/// `Hash` writes nothing.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was constructed.
    pub backtrace: Backtrace,
    /// Optional CPU usage figure; `None` on construction.
    /// NOTE(review): units and the code that fills this in are not visible here — confirm.
    pub cpu_usage: Option<f64>,
    /// Optional CPU usage figure for network receive; `None` on construction.
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional numeric id; `None` on construction.
    pub id: Option<usize>,
}
1404
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and with `cpu_usage`,
    /// `network_recv_cpu_usage` and `id` all unset.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // NOTE(review): the skip count of 1 presumably hides this `new` frame from the
        // captured backtrace — confirm against `Backtrace::get_backtrace`.
        Self::new_with_skip(1)
    }

    /// Builds the metadata, capturing a backtrace with `2 + skip_count` passed to
    /// `Backtrace::get_backtrace`.
    ///
    /// NOTE(review): the constant 2 presumably accounts for frames introduced by this
    /// function and the capture itself; do not restructure the call chain without
    /// re-checking the skip counts.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1423
1424impl Debug for HydroIrOpMetadata {
1425 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1426 f.debug_struct("HydroIrOpMetadata").finish()
1427 }
1428}
1429
1430impl Hash for HydroIrOpMetadata {
1431 fn hash<H: Hasher>(&self, _: &mut H) {}
1432}
1433
/// A node of the Hydro IR.
///
/// Apart from [`HydroNode::Placeholder`], every variant carries a `metadata` field.
/// Operator variants own their inputs as boxed child nodes; [`HydroNode::Tee`] is the
/// exception, holding a shared [`TeeNode`] so that several consumers can point at the
/// same upstream node.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in without metadata. `transform_children` and `deep_clone` use it
    /// as the initial content of freshly allocated tee cells; `transform_children` and
    /// `emit_core` panic when asked to process it.
    Placeholder,

    /// Wraps `inner` without producing an operator of its own: `emit_core` returns the
    /// inner node's identifier unchanged.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Non-determinism observation over `inner`; `emit_core` forwards `trusted` to the
    /// builders' `observe_nondet`.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// A leaf node described by a [`HydroSource`].
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// A leaf node producing the single expression `value`; emitted as
    /// `source_iter([value])`, optionally followed by `persist::<'static>()`.
    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    /// A leaf node referring to a cycle by identifier; `emit_core` returns `ident` as the
    /// stream identifier.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// A shared input. `emit_core` emits the inner node and a `tee()` only for the first
    /// `Tee` that reaches a given cell and reuses the identifier afterwards.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Forwarded by `emit_core` to the builders' `begin_atomic`.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Forwarded by `emit_core` to the builders' `end_atomic`.
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Forwarded by `emit_core` to the builders' `batch`.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Forwarded by `emit_core` to the builders' `yield_from_tick`.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `chain()` with `first` on port `0` and `second` on port `1`.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `chain_first_n(1)` with `first` on port `0` and `second` on port `1`.
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `cross_join_multiset` with `left` on port `0` and `right` on port `1`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `cross_singleton()` with `left` on port `input` and `right` on port
    /// `single`.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `join_multiset` with `left` on port `0` and `right` on port `1`.
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `difference` with `pos` and `neg` on the ports of the same names.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `anti_join` with `pos` and `neg` on the ports of the same names.
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `resolve_futures()` applied to `input`.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input counterpart of `ResolveFutures`.
    /// NOTE(review): presumably preserves input order — its emission is not visible here.
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator parameterized by the expression `f`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expression `f`.
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expression `f`.
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expression `f`.
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator without parameters.
    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator without parameters.
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expression `f`.
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator without parameters.
    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator without parameters.
    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by an initial-value expression `init` and an
    /// accumulator expression `acc`.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator parameterized by `init` and `acc` expressions.
    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by `init` and `acc` expressions.
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator parameterized by the expression `f`.
    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expression `f`.
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, with an additional `watermark` input node.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Network operator over `input` with optional serialize/deserialize expressions.
    /// It is the only variant exempt from the location check in `transform_bottom_up`.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Input arriving from an external endpoint; it has no child nodes and is treated as
    /// a leaf by `transform_children`.
    ExternalInput {
        from_external_id: usize,
        from_key: usize,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator carrying a `tag`, a `duration` expression and a `prefix`.
    /// NOTE(review): presumably reports element counts periodically — confirm at emission.
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1662
/// Maps the address of a tee's original shared cell to the cell that replaces it, so
/// that all `HydroNode::Tee`s pointing at the same cell are transformed
/// (`transform_children`) or cloned (`deep_clone`) once and keep sharing one result.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee's shared cell to a `LocationId`.
/// NOTE(review): not used in this part of the file — confirm the intended use at the
/// call sites.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1665
1666impl HydroNode {
1667 pub fn transform_bottom_up(
1668 &mut self,
1669 transform: &mut impl FnMut(&mut HydroNode),
1670 seen_tees: &mut SeenTees,
1671 check_well_formed: bool,
1672 ) {
1673 self.transform_children(
1674 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1675 seen_tees,
1676 );
1677
1678 transform(self);
1679
1680 let self_location = self.metadata().location_kind.root();
1681
1682 if check_well_formed {
1683 match &*self {
1684 HydroNode::Network { .. } => {}
1685 _ => {
1686 self.input_metadata().iter().for_each(|i| {
1687 if i.location_kind.root() != self_location {
1688 panic!(
1689 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1690 i,
1691 i.location_kind.root(),
1692 self,
1693 self_location
1694 )
1695 }
1696 });
1697 }
1698 }
1699 }
1700 }
1701
1702 #[inline(always)]
1703 pub fn transform_children(
1704 &mut self,
1705 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1706 seen_tees: &mut SeenTees,
1707 ) {
1708 match self {
1709 HydroNode::Placeholder => {
1710 panic!();
1711 }
1712
1713 HydroNode::Source { .. }
1714 | HydroNode::SingletonSource { .. }
1715 | HydroNode::CycleSource { .. }
1716 | HydroNode::ExternalInput { .. } => {}
1717
1718 HydroNode::Tee { inner, .. } => {
1719 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1720 *inner = TeeNode(transformed.clone());
1721 } else {
1722 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1723 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1724 let mut orig = inner.0.replace(HydroNode::Placeholder);
1725 transform(&mut orig, seen_tees);
1726 *transformed_cell.borrow_mut() = orig;
1727 *inner = TeeNode(transformed_cell);
1728 }
1729 }
1730
1731 HydroNode::Cast { inner, .. }
1732 | HydroNode::ObserveNonDet { inner, .. }
1733 | HydroNode::BeginAtomic { inner, .. }
1734 | HydroNode::EndAtomic { inner, .. }
1735 | HydroNode::Batch { inner, .. }
1736 | HydroNode::YieldConcat { inner, .. } => {
1737 transform(inner.as_mut(), seen_tees);
1738 }
1739
1740 HydroNode::Chain { first, second, .. } => {
1741 transform(first.as_mut(), seen_tees);
1742 transform(second.as_mut(), seen_tees);
1743 }
1744
1745 HydroNode::ChainFirst { first, second, .. } => {
1746 transform(first.as_mut(), seen_tees);
1747 transform(second.as_mut(), seen_tees);
1748 }
1749
1750 HydroNode::CrossSingleton { left, right, .. }
1751 | HydroNode::CrossProduct { left, right, .. }
1752 | HydroNode::Join { left, right, .. } => {
1753 transform(left.as_mut(), seen_tees);
1754 transform(right.as_mut(), seen_tees);
1755 }
1756
1757 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1758 transform(pos.as_mut(), seen_tees);
1759 transform(neg.as_mut(), seen_tees);
1760 }
1761
1762 HydroNode::ReduceKeyedWatermark {
1763 input, watermark, ..
1764 } => {
1765 transform(input.as_mut(), seen_tees);
1766 transform(watermark.as_mut(), seen_tees);
1767 }
1768
1769 HydroNode::Map { input, .. }
1770 | HydroNode::ResolveFutures { input, .. }
1771 | HydroNode::ResolveFuturesOrdered { input, .. }
1772 | HydroNode::FlatMap { input, .. }
1773 | HydroNode::Filter { input, .. }
1774 | HydroNode::FilterMap { input, .. }
1775 | HydroNode::Sort { input, .. }
1776 | HydroNode::DeferTick { input, .. }
1777 | HydroNode::Enumerate { input, .. }
1778 | HydroNode::Inspect { input, .. }
1779 | HydroNode::Unique { input, .. }
1780 | HydroNode::Network { input, .. }
1781 | HydroNode::Fold { input, .. }
1782 | HydroNode::Scan { input, .. }
1783 | HydroNode::FoldKeyed { input, .. }
1784 | HydroNode::Reduce { input, .. }
1785 | HydroNode::ReduceKeyed { input, .. }
1786 | HydroNode::Counter { input, .. } => {
1787 transform(input.as_mut(), seen_tees);
1788 }
1789 }
1790 }
1791
1792 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1793 match self {
1794 HydroNode::Placeholder => HydroNode::Placeholder,
1795 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1796 inner: Box::new(inner.deep_clone(seen_tees)),
1797 metadata: metadata.clone(),
1798 },
1799 HydroNode::ObserveNonDet {
1800 inner,
1801 trusted,
1802 metadata,
1803 } => HydroNode::ObserveNonDet {
1804 inner: Box::new(inner.deep_clone(seen_tees)),
1805 trusted: *trusted,
1806 metadata: metadata.clone(),
1807 },
1808 HydroNode::Source { source, metadata } => HydroNode::Source {
1809 source: source.clone(),
1810 metadata: metadata.clone(),
1811 },
1812 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1813 value: value.clone(),
1814 metadata: metadata.clone(),
1815 },
1816 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1817 ident: ident.clone(),
1818 metadata: metadata.clone(),
1819 },
1820 HydroNode::Tee { inner, metadata } => {
1821 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1822 HydroNode::Tee {
1823 inner: TeeNode(transformed.clone()),
1824 metadata: metadata.clone(),
1825 }
1826 } else {
1827 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1828 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1829 let cloned = inner.0.borrow().deep_clone(seen_tees);
1830 *new_rc.borrow_mut() = cloned;
1831 HydroNode::Tee {
1832 inner: TeeNode(new_rc),
1833 metadata: metadata.clone(),
1834 }
1835 }
1836 }
1837 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1838 inner: Box::new(inner.deep_clone(seen_tees)),
1839 metadata: metadata.clone(),
1840 },
1841 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1842 inner: Box::new(inner.deep_clone(seen_tees)),
1843 metadata: metadata.clone(),
1844 },
1845 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1846 inner: Box::new(inner.deep_clone(seen_tees)),
1847 metadata: metadata.clone(),
1848 },
1849 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1850 inner: Box::new(inner.deep_clone(seen_tees)),
1851 metadata: metadata.clone(),
1852 },
1853 HydroNode::Chain {
1854 first,
1855 second,
1856 metadata,
1857 } => HydroNode::Chain {
1858 first: Box::new(first.deep_clone(seen_tees)),
1859 second: Box::new(second.deep_clone(seen_tees)),
1860 metadata: metadata.clone(),
1861 },
1862 HydroNode::ChainFirst {
1863 first,
1864 second,
1865 metadata,
1866 } => HydroNode::ChainFirst {
1867 first: Box::new(first.deep_clone(seen_tees)),
1868 second: Box::new(second.deep_clone(seen_tees)),
1869 metadata: metadata.clone(),
1870 },
1871 HydroNode::CrossProduct {
1872 left,
1873 right,
1874 metadata,
1875 } => HydroNode::CrossProduct {
1876 left: Box::new(left.deep_clone(seen_tees)),
1877 right: Box::new(right.deep_clone(seen_tees)),
1878 metadata: metadata.clone(),
1879 },
1880 HydroNode::CrossSingleton {
1881 left,
1882 right,
1883 metadata,
1884 } => HydroNode::CrossSingleton {
1885 left: Box::new(left.deep_clone(seen_tees)),
1886 right: Box::new(right.deep_clone(seen_tees)),
1887 metadata: metadata.clone(),
1888 },
1889 HydroNode::Join {
1890 left,
1891 right,
1892 metadata,
1893 } => HydroNode::Join {
1894 left: Box::new(left.deep_clone(seen_tees)),
1895 right: Box::new(right.deep_clone(seen_tees)),
1896 metadata: metadata.clone(),
1897 },
1898 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1899 pos: Box::new(pos.deep_clone(seen_tees)),
1900 neg: Box::new(neg.deep_clone(seen_tees)),
1901 metadata: metadata.clone(),
1902 },
1903 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1904 pos: Box::new(pos.deep_clone(seen_tees)),
1905 neg: Box::new(neg.deep_clone(seen_tees)),
1906 metadata: metadata.clone(),
1907 },
1908 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1909 input: Box::new(input.deep_clone(seen_tees)),
1910 metadata: metadata.clone(),
1911 },
1912 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1913 HydroNode::ResolveFuturesOrdered {
1914 input: Box::new(input.deep_clone(seen_tees)),
1915 metadata: metadata.clone(),
1916 }
1917 }
1918 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1919 f: f.clone(),
1920 input: Box::new(input.deep_clone(seen_tees)),
1921 metadata: metadata.clone(),
1922 },
1923 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1924 f: f.clone(),
1925 input: Box::new(input.deep_clone(seen_tees)),
1926 metadata: metadata.clone(),
1927 },
1928 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1929 f: f.clone(),
1930 input: Box::new(input.deep_clone(seen_tees)),
1931 metadata: metadata.clone(),
1932 },
1933 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1934 f: f.clone(),
1935 input: Box::new(input.deep_clone(seen_tees)),
1936 metadata: metadata.clone(),
1937 },
1938 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1939 input: Box::new(input.deep_clone(seen_tees)),
1940 metadata: metadata.clone(),
1941 },
1942 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1943 input: Box::new(input.deep_clone(seen_tees)),
1944 metadata: metadata.clone(),
1945 },
1946 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1947 f: f.clone(),
1948 input: Box::new(input.deep_clone(seen_tees)),
1949 metadata: metadata.clone(),
1950 },
1951 HydroNode::Unique { input, metadata } => HydroNode::Unique {
1952 input: Box::new(input.deep_clone(seen_tees)),
1953 metadata: metadata.clone(),
1954 },
1955 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1956 input: Box::new(input.deep_clone(seen_tees)),
1957 metadata: metadata.clone(),
1958 },
1959 HydroNode::Fold {
1960 init,
1961 acc,
1962 input,
1963 metadata,
1964 } => HydroNode::Fold {
1965 init: init.clone(),
1966 acc: acc.clone(),
1967 input: Box::new(input.deep_clone(seen_tees)),
1968 metadata: metadata.clone(),
1969 },
1970 HydroNode::Scan {
1971 init,
1972 acc,
1973 input,
1974 metadata,
1975 } => HydroNode::Scan {
1976 init: init.clone(),
1977 acc: acc.clone(),
1978 input: Box::new(input.deep_clone(seen_tees)),
1979 metadata: metadata.clone(),
1980 },
1981 HydroNode::FoldKeyed {
1982 init,
1983 acc,
1984 input,
1985 metadata,
1986 } => HydroNode::FoldKeyed {
1987 init: init.clone(),
1988 acc: acc.clone(),
1989 input: Box::new(input.deep_clone(seen_tees)),
1990 metadata: metadata.clone(),
1991 },
1992 HydroNode::ReduceKeyedWatermark {
1993 f,
1994 input,
1995 watermark,
1996 metadata,
1997 } => HydroNode::ReduceKeyedWatermark {
1998 f: f.clone(),
1999 input: Box::new(input.deep_clone(seen_tees)),
2000 watermark: Box::new(watermark.deep_clone(seen_tees)),
2001 metadata: metadata.clone(),
2002 },
2003 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2004 f: f.clone(),
2005 input: Box::new(input.deep_clone(seen_tees)),
2006 metadata: metadata.clone(),
2007 },
2008 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2009 f: f.clone(),
2010 input: Box::new(input.deep_clone(seen_tees)),
2011 metadata: metadata.clone(),
2012 },
2013 HydroNode::Network {
2014 serialize_fn,
2015 instantiate_fn,
2016 deserialize_fn,
2017 input,
2018 metadata,
2019 } => HydroNode::Network {
2020 serialize_fn: serialize_fn.clone(),
2021 instantiate_fn: instantiate_fn.clone(),
2022 deserialize_fn: deserialize_fn.clone(),
2023 input: Box::new(input.deep_clone(seen_tees)),
2024 metadata: metadata.clone(),
2025 },
2026 HydroNode::ExternalInput {
2027 from_external_id,
2028 from_key,
2029 from_many,
2030 codec_type,
2031 port_hint,
2032 instantiate_fn,
2033 deserialize_fn,
2034 metadata,
2035 } => HydroNode::ExternalInput {
2036 from_external_id: *from_external_id,
2037 from_key: *from_key,
2038 from_many: *from_many,
2039 codec_type: codec_type.clone(),
2040 port_hint: *port_hint,
2041 instantiate_fn: instantiate_fn.clone(),
2042 deserialize_fn: deserialize_fn.clone(),
2043 metadata: metadata.clone(),
2044 },
2045 HydroNode::Counter {
2046 tag,
2047 duration,
2048 prefix,
2049 input,
2050 metadata,
2051 } => HydroNode::Counter {
2052 tag: tag.clone(),
2053 duration: duration.clone(),
2054 prefix: prefix.clone(),
2055 input: Box::new(input.deep_clone(seen_tees)),
2056 metadata: metadata.clone(),
2057 },
2058 }
2059 }
2060
2061 #[cfg(feature = "build")]
2062 pub fn emit_core<'a, D: Deploy<'a>>(
2063 &mut self,
2064 builders_or_callback: &mut BuildersOrCallback<
2065 impl FnMut(&mut HydroRoot, &mut usize),
2066 impl FnMut(&mut HydroNode, &mut usize),
2067 >,
2068 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2069 next_stmt_id: &mut usize,
2070 ) -> syn::Ident {
2071 let out_location = self.metadata().location_kind.clone();
2072 match self {
2073 HydroNode::Placeholder => {
2074 panic!()
2075 }
2076
2077 HydroNode::Cast { inner, .. } => {
2078 let inner_ident =
2079 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2080
2081 match builders_or_callback {
2082 BuildersOrCallback::Builders(_) => {}
2083 BuildersOrCallback::Callback(_, node_callback) => {
2084 node_callback(self, next_stmt_id);
2085 }
2086 }
2087
2088 *next_stmt_id += 1;
2089
2090 inner_ident
2091 }
2092
2093 HydroNode::ObserveNonDet {
2094 inner,
2095 trusted,
2096 metadata,
2097 ..
2098 } => {
2099 let inner_ident =
2100 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2101
2102 let observe_ident =
2103 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2104
2105 match builders_or_callback {
2106 BuildersOrCallback::Builders(graph_builders) => {
2107 graph_builders.observe_nondet(
2108 *trusted,
2109 &inner.metadata().location_kind,
2110 inner_ident,
2111 &inner.metadata().collection_kind,
2112 &observe_ident,
2113 &metadata.collection_kind,
2114 &metadata.op,
2115 );
2116 }
2117 BuildersOrCallback::Callback(_, node_callback) => {
2118 node_callback(self, next_stmt_id);
2119 }
2120 }
2121
2122 *next_stmt_id += 1;
2123
2124 observe_ident
2125 }
2126
2127 HydroNode::Batch {
2128 inner, metadata, ..
2129 } => {
2130 let inner_ident =
2131 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2132
2133 let batch_ident =
2134 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2135
2136 match builders_or_callback {
2137 BuildersOrCallback::Builders(graph_builders) => {
2138 graph_builders.batch(
2139 inner_ident,
2140 &inner.metadata().location_kind,
2141 &inner.metadata().collection_kind,
2142 &batch_ident,
2143 &out_location,
2144 &metadata.op,
2145 );
2146 }
2147 BuildersOrCallback::Callback(_, node_callback) => {
2148 node_callback(self, next_stmt_id);
2149 }
2150 }
2151
2152 *next_stmt_id += 1;
2153
2154 batch_ident
2155 }
2156
2157 HydroNode::YieldConcat { inner, .. } => {
2158 let inner_ident =
2159 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2160
2161 let yield_ident =
2162 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2163
2164 match builders_or_callback {
2165 BuildersOrCallback::Builders(graph_builders) => {
2166 graph_builders.yield_from_tick(
2167 inner_ident,
2168 &inner.metadata().location_kind,
2169 &inner.metadata().collection_kind,
2170 &yield_ident,
2171 &out_location,
2172 );
2173 }
2174 BuildersOrCallback::Callback(_, node_callback) => {
2175 node_callback(self, next_stmt_id);
2176 }
2177 }
2178
2179 *next_stmt_id += 1;
2180
2181 yield_ident
2182 }
2183
2184 HydroNode::BeginAtomic { inner, metadata } => {
2185 let inner_ident =
2186 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2187
2188 let begin_ident =
2189 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2190
2191 match builders_or_callback {
2192 BuildersOrCallback::Builders(graph_builders) => {
2193 graph_builders.begin_atomic(
2194 inner_ident,
2195 &inner.metadata().location_kind,
2196 &inner.metadata().collection_kind,
2197 &begin_ident,
2198 &out_location,
2199 &metadata.op,
2200 );
2201 }
2202 BuildersOrCallback::Callback(_, node_callback) => {
2203 node_callback(self, next_stmt_id);
2204 }
2205 }
2206
2207 *next_stmt_id += 1;
2208
2209 begin_ident
2210 }
2211
2212 HydroNode::EndAtomic { inner, .. } => {
2213 let inner_ident =
2214 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2215
2216 let end_ident =
2217 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2218
2219 match builders_or_callback {
2220 BuildersOrCallback::Builders(graph_builders) => {
2221 graph_builders.end_atomic(
2222 inner_ident,
2223 &inner.metadata().location_kind,
2224 &inner.metadata().collection_kind,
2225 &end_ident,
2226 );
2227 }
2228 BuildersOrCallback::Callback(_, node_callback) => {
2229 node_callback(self, next_stmt_id);
2230 }
2231 }
2232
2233 *next_stmt_id += 1;
2234
2235 end_ident
2236 }
2237
2238 HydroNode::Source {
2239 source, metadata, ..
2240 } => {
2241 if let HydroSource::ExternalNetwork() = source {
2242 syn::Ident::new("DUMMY", Span::call_site())
2243 } else {
2244 let source_ident =
2245 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2246
2247 let source_stmt = match source {
2248 HydroSource::Stream(expr) => {
2249 debug_assert!(metadata.location_kind.is_top_level());
2250 parse_quote! {
2251 #source_ident = source_stream(#expr);
2252 }
2253 }
2254
2255 HydroSource::ExternalNetwork() => {
2256 unreachable!()
2257 }
2258
2259 HydroSource::Iter(expr) => {
2260 if metadata.location_kind.is_top_level() {
2261 parse_quote! {
2262 #source_ident = source_iter(#expr);
2263 }
2264 } else {
2265 parse_quote! {
2267 #source_ident = source_iter(#expr) -> persist::<'static>();
2268 }
2269 }
2270 }
2271
2272 HydroSource::Spin() => {
2273 debug_assert!(metadata.location_kind.is_top_level());
2274 parse_quote! {
2275 #source_ident = spin();
2276 }
2277 }
2278
2279 HydroSource::ClusterMembers(location_id) => {
2280 debug_assert!(metadata.location_kind.is_top_level());
2281
2282 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2283 D::cluster_membership_stream(location_id),
2284 &(),
2285 );
2286
2287 parse_quote! {
2288 #source_ident = source_stream(#expr);
2289 }
2290 }
2291 };
2292
2293 match builders_or_callback {
2294 BuildersOrCallback::Builders(graph_builders) => {
2295 let builder = graph_builders.get_dfir_mut(&out_location);
2296 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2297 }
2298 BuildersOrCallback::Callback(_, node_callback) => {
2299 node_callback(self, next_stmt_id);
2300 }
2301 }
2302
2303 *next_stmt_id += 1;
2304
2305 source_ident
2306 }
2307 }
2308
2309 HydroNode::SingletonSource { value, metadata } => {
2310 let source_ident =
2311 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2312
2313 match builders_or_callback {
2314 BuildersOrCallback::Builders(graph_builders) => {
2315 let should_replay = !graph_builders.singleton_intermediates();
2316 let builder = graph_builders.get_dfir_mut(&out_location);
2317
2318 if should_replay || !metadata.location_kind.is_top_level() {
2319 builder.add_dfir(
2320 parse_quote! {
2321 #source_ident = source_iter([#value]) -> persist::<'static>();
2322 },
2323 None,
2324 Some(&next_stmt_id.to_string()),
2325 );
2326 } else {
2327 builder.add_dfir(
2328 parse_quote! {
2329 #source_ident = source_iter([#value]);
2330 },
2331 None,
2332 Some(&next_stmt_id.to_string()),
2333 );
2334 }
2335 }
2336 BuildersOrCallback::Callback(_, node_callback) => {
2337 node_callback(self, next_stmt_id);
2338 }
2339 }
2340
2341 *next_stmt_id += 1;
2342
2343 source_ident
2344 }
2345
2346 HydroNode::CycleSource { ident, .. } => {
2347 let ident = ident.clone();
2348
2349 match builders_or_callback {
2350 BuildersOrCallback::Builders(_) => {}
2351 BuildersOrCallback::Callback(_, node_callback) => {
2352 node_callback(self, next_stmt_id);
2353 }
2354 }
2355
2356 *next_stmt_id += 1;
2358
2359 ident
2360 }
2361
2362 HydroNode::Tee { inner, .. } => {
2363 let ret_ident = if let Some(teed_from) =
2364 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2365 {
2366 match builders_or_callback {
2367 BuildersOrCallback::Builders(_) => {}
2368 BuildersOrCallback::Callback(_, node_callback) => {
2369 node_callback(self, next_stmt_id);
2370 }
2371 }
2372
2373 teed_from.clone()
2374 } else {
2375 let inner_ident = inner.0.borrow_mut().emit_core::<D>(
2376 builders_or_callback,
2377 built_tees,
2378 next_stmt_id,
2379 );
2380
2381 let tee_ident =
2382 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2383
2384 built_tees.insert(
2385 inner.0.as_ref() as *const RefCell<HydroNode>,
2386 tee_ident.clone(),
2387 );
2388
2389 match builders_or_callback {
2390 BuildersOrCallback::Builders(graph_builders) => {
2391 let builder = graph_builders.get_dfir_mut(&out_location);
2392 builder.add_dfir(
2393 parse_quote! {
2394 #tee_ident = #inner_ident -> tee();
2395 },
2396 None,
2397 Some(&next_stmt_id.to_string()),
2398 );
2399 }
2400 BuildersOrCallback::Callback(_, node_callback) => {
2401 node_callback(self, next_stmt_id);
2402 }
2403 }
2404
2405 tee_ident
2406 };
2407
2408 *next_stmt_id += 1;
2412 ret_ident
2413 }
2414
2415 HydroNode::Chain { first, second, .. } => {
2416 let first_ident =
2417 first.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2418 let second_ident =
2419 second.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2420
2421 let chain_ident =
2422 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2423
2424 match builders_or_callback {
2425 BuildersOrCallback::Builders(graph_builders) => {
2426 let builder = graph_builders.get_dfir_mut(&out_location);
2427 builder.add_dfir(
2428 parse_quote! {
2429 #chain_ident = chain();
2430 #first_ident -> [0]#chain_ident;
2431 #second_ident -> [1]#chain_ident;
2432 },
2433 None,
2434 Some(&next_stmt_id.to_string()),
2435 );
2436 }
2437 BuildersOrCallback::Callback(_, node_callback) => {
2438 node_callback(self, next_stmt_id);
2439 }
2440 }
2441
2442 *next_stmt_id += 1;
2443
2444 chain_ident
2445 }
2446
2447 HydroNode::ChainFirst { first, second, .. } => {
2448 let first_ident =
2449 first.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2450 let second_ident =
2451 second.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2452
2453 let chain_ident =
2454 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2455
2456 match builders_or_callback {
2457 BuildersOrCallback::Builders(graph_builders) => {
2458 let builder = graph_builders.get_dfir_mut(&out_location);
2459 builder.add_dfir(
2460 parse_quote! {
2461 #chain_ident = chain_first_n(1);
2462 #first_ident -> [0]#chain_ident;
2463 #second_ident -> [1]#chain_ident;
2464 },
2465 None,
2466 Some(&next_stmt_id.to_string()),
2467 );
2468 }
2469 BuildersOrCallback::Callback(_, node_callback) => {
2470 node_callback(self, next_stmt_id);
2471 }
2472 }
2473
2474 *next_stmt_id += 1;
2475
2476 chain_ident
2477 }
2478
2479 HydroNode::CrossSingleton { left, right, .. } => {
2480 let left_ident =
2481 left.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2482 let right_ident =
2483 right.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2484
2485 let cross_ident =
2486 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2487
2488 match builders_or_callback {
2489 BuildersOrCallback::Builders(graph_builders) => {
2490 let builder = graph_builders.get_dfir_mut(&out_location);
2491 builder.add_dfir(
2492 parse_quote! {
2493 #cross_ident = cross_singleton();
2494 #left_ident -> [input]#cross_ident;
2495 #right_ident -> [single]#cross_ident;
2496 },
2497 None,
2498 Some(&next_stmt_id.to_string()),
2499 );
2500 }
2501 BuildersOrCallback::Callback(_, node_callback) => {
2502 node_callback(self, next_stmt_id);
2503 }
2504 }
2505
2506 *next_stmt_id += 1;
2507
2508 cross_ident
2509 }
2510
2511 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2512 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
2513 parse_quote!(cross_join_multiset)
2514 } else {
2515 parse_quote!(join_multiset)
2516 };
2517
2518 let (HydroNode::CrossProduct { left, right, .. }
2519 | HydroNode::Join { left, right, .. }) = self
2520 else {
2521 unreachable!()
2522 };
2523
2524 let is_top_level = left.metadata().location_kind.is_top_level()
2525 && right.metadata().location_kind.is_top_level();
2526 let (left_inner, left_lifetime) = if left.metadata().location_kind.is_top_level() {
2527 (left, quote!('static))
2528 } else {
2529 (left, quote!('tick))
2530 };
2531
2532 let (right_inner, right_lifetime) = if right.metadata().location_kind.is_top_level()
2533 {
2534 (right, quote!('static))
2535 } else {
2536 (right, quote!('tick))
2537 };
2538
2539 let left_ident =
2540 left_inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2541 let right_ident =
2542 right_inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2543
2544 let stream_ident =
2545 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2546
2547 match builders_or_callback {
2548 BuildersOrCallback::Builders(graph_builders) => {
2549 let builder = graph_builders.get_dfir_mut(&out_location);
2550 builder.add_dfir(
2551 if is_top_level {
2552 parse_quote! {
2555 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2556 #left_ident -> [0]#stream_ident;
2557 #right_ident -> [1]#stream_ident;
2558 }
2559 } else {
2560 parse_quote! {
2561 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2562 #left_ident -> [0]#stream_ident;
2563 #right_ident -> [1]#stream_ident;
2564 }
2565 }
2566 ,
2567 None,
2568 Some(&next_stmt_id.to_string()),
2569 );
2570 }
2571 BuildersOrCallback::Callback(_, node_callback) => {
2572 node_callback(self, next_stmt_id);
2573 }
2574 }
2575
2576 *next_stmt_id += 1;
2577
2578 stream_ident
2579 }
2580
2581 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2582 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2583 parse_quote!(difference)
2584 } else {
2585 parse_quote!(anti_join)
2586 };
2587
2588 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2589 self
2590 else {
2591 unreachable!()
2592 };
2593
2594 let (neg, neg_lifetime) = if neg.metadata().location_kind.is_top_level() {
2595 (neg, quote!('static))
2596 } else {
2597 (neg, quote!('tick))
2598 };
2599
2600 let pos_ident = pos.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2601 let neg_ident = neg.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2602
2603 let stream_ident =
2604 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2605
2606 match builders_or_callback {
2607 BuildersOrCallback::Builders(graph_builders) => {
2608 let builder = graph_builders.get_dfir_mut(&out_location);
2609 builder.add_dfir(
2610 parse_quote! {
2611 #stream_ident = #operator::<'tick, #neg_lifetime>();
2612 #pos_ident -> [pos]#stream_ident;
2613 #neg_ident -> [neg]#stream_ident;
2614 },
2615 None,
2616 Some(&next_stmt_id.to_string()),
2617 );
2618 }
2619 BuildersOrCallback::Callback(_, node_callback) => {
2620 node_callback(self, next_stmt_id);
2621 }
2622 }
2623
2624 *next_stmt_id += 1;
2625
2626 stream_ident
2627 }
2628
2629 HydroNode::ResolveFutures { input, .. } => {
2630 let input_ident =
2631 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2632
2633 let futures_ident =
2634 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2635
2636 match builders_or_callback {
2637 BuildersOrCallback::Builders(graph_builders) => {
2638 let builder = graph_builders.get_dfir_mut(&out_location);
2639 builder.add_dfir(
2640 parse_quote! {
2641 #futures_ident = #input_ident -> resolve_futures();
2642 },
2643 None,
2644 Some(&next_stmt_id.to_string()),
2645 );
2646 }
2647 BuildersOrCallback::Callback(_, node_callback) => {
2648 node_callback(self, next_stmt_id);
2649 }
2650 }
2651
2652 *next_stmt_id += 1;
2653
2654 futures_ident
2655 }
2656
2657 HydroNode::ResolveFuturesOrdered { input, .. } => {
2658 let input_ident =
2659 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2660
2661 let futures_ident =
2662 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2663
2664 match builders_or_callback {
2665 BuildersOrCallback::Builders(graph_builders) => {
2666 let builder = graph_builders.get_dfir_mut(&out_location);
2667 builder.add_dfir(
2668 parse_quote! {
2669 #futures_ident = #input_ident -> resolve_futures_ordered();
2670 },
2671 None,
2672 Some(&next_stmt_id.to_string()),
2673 );
2674 }
2675 BuildersOrCallback::Callback(_, node_callback) => {
2676 node_callback(self, next_stmt_id);
2677 }
2678 }
2679
2680 *next_stmt_id += 1;
2681
2682 futures_ident
2683 }
2684
2685 HydroNode::Map { f, input, .. } => {
2686 let input_ident =
2687 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2688
2689 let map_ident =
2690 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2691
2692 match builders_or_callback {
2693 BuildersOrCallback::Builders(graph_builders) => {
2694 let builder = graph_builders.get_dfir_mut(&out_location);
2695 builder.add_dfir(
2696 parse_quote! {
2697 #map_ident = #input_ident -> map(#f);
2698 },
2699 None,
2700 Some(&next_stmt_id.to_string()),
2701 );
2702 }
2703 BuildersOrCallback::Callback(_, node_callback) => {
2704 node_callback(self, next_stmt_id);
2705 }
2706 }
2707
2708 *next_stmt_id += 1;
2709
2710 map_ident
2711 }
2712
2713 HydroNode::FlatMap { f, input, .. } => {
2714 let input_ident =
2715 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2716
2717 let flat_map_ident =
2718 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2719
2720 match builders_or_callback {
2721 BuildersOrCallback::Builders(graph_builders) => {
2722 let builder = graph_builders.get_dfir_mut(&out_location);
2723 builder.add_dfir(
2724 parse_quote! {
2725 #flat_map_ident = #input_ident -> flat_map(#f);
2726 },
2727 None,
2728 Some(&next_stmt_id.to_string()),
2729 );
2730 }
2731 BuildersOrCallback::Callback(_, node_callback) => {
2732 node_callback(self, next_stmt_id);
2733 }
2734 }
2735
2736 *next_stmt_id += 1;
2737
2738 flat_map_ident
2739 }
2740
2741 HydroNode::Filter { f, input, .. } => {
2742 let input_ident =
2743 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2744
2745 let filter_ident =
2746 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2747
2748 match builders_or_callback {
2749 BuildersOrCallback::Builders(graph_builders) => {
2750 let builder = graph_builders.get_dfir_mut(&out_location);
2751 builder.add_dfir(
2752 parse_quote! {
2753 #filter_ident = #input_ident -> filter(#f);
2754 },
2755 None,
2756 Some(&next_stmt_id.to_string()),
2757 );
2758 }
2759 BuildersOrCallback::Callback(_, node_callback) => {
2760 node_callback(self, next_stmt_id);
2761 }
2762 }
2763
2764 *next_stmt_id += 1;
2765
2766 filter_ident
2767 }
2768
2769 HydroNode::FilterMap { f, input, .. } => {
2770 let input_ident =
2771 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2772
2773 let filter_map_ident =
2774 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2775
2776 match builders_or_callback {
2777 BuildersOrCallback::Builders(graph_builders) => {
2778 let builder = graph_builders.get_dfir_mut(&out_location);
2779 builder.add_dfir(
2780 parse_quote! {
2781 #filter_map_ident = #input_ident -> filter_map(#f);
2782 },
2783 None,
2784 Some(&next_stmt_id.to_string()),
2785 );
2786 }
2787 BuildersOrCallback::Callback(_, node_callback) => {
2788 node_callback(self, next_stmt_id);
2789 }
2790 }
2791
2792 *next_stmt_id += 1;
2793
2794 filter_map_ident
2795 }
2796
2797 HydroNode::Sort { input, .. } => {
2798 let input_ident =
2799 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2800
2801 let sort_ident =
2802 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2803
2804 match builders_or_callback {
2805 BuildersOrCallback::Builders(graph_builders) => {
2806 let builder = graph_builders.get_dfir_mut(&out_location);
2807 builder.add_dfir(
2808 parse_quote! {
2809 #sort_ident = #input_ident -> sort();
2810 },
2811 None,
2812 Some(&next_stmt_id.to_string()),
2813 );
2814 }
2815 BuildersOrCallback::Callback(_, node_callback) => {
2816 node_callback(self, next_stmt_id);
2817 }
2818 }
2819
2820 *next_stmt_id += 1;
2821
2822 sort_ident
2823 }
2824
2825 HydroNode::DeferTick { input, .. } => {
2826 let input_ident =
2827 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2828
2829 let defer_tick_ident =
2830 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2831
2832 match builders_or_callback {
2833 BuildersOrCallback::Builders(graph_builders) => {
2834 let builder = graph_builders.get_dfir_mut(&out_location);
2835 builder.add_dfir(
2836 parse_quote! {
2837 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2838 },
2839 None,
2840 Some(&next_stmt_id.to_string()),
2841 );
2842 }
2843 BuildersOrCallback::Callback(_, node_callback) => {
2844 node_callback(self, next_stmt_id);
2845 }
2846 }
2847
2848 *next_stmt_id += 1;
2849
2850 defer_tick_ident
2851 }
2852
2853 HydroNode::Enumerate { input, .. } => {
2854 let input_ident =
2855 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2856
2857 let enumerate_ident =
2858 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2859
2860 match builders_or_callback {
2861 BuildersOrCallback::Builders(graph_builders) => {
2862 let builder = graph_builders.get_dfir_mut(&out_location);
2863 let lifetime = if input.metadata().location_kind.is_top_level() {
2864 quote!('static)
2865 } else {
2866 quote!('tick)
2867 };
2868 builder.add_dfir(
2869 parse_quote! {
2870 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2871 },
2872 None,
2873 Some(&next_stmt_id.to_string()),
2874 );
2875 }
2876 BuildersOrCallback::Callback(_, node_callback) => {
2877 node_callback(self, next_stmt_id);
2878 }
2879 }
2880
2881 *next_stmt_id += 1;
2882
2883 enumerate_ident
2884 }
2885
2886 HydroNode::Inspect { f, input, .. } => {
2887 let input_ident =
2888 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2889
2890 let inspect_ident =
2891 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2892
2893 match builders_or_callback {
2894 BuildersOrCallback::Builders(graph_builders) => {
2895 let builder = graph_builders.get_dfir_mut(&out_location);
2896 builder.add_dfir(
2897 parse_quote! {
2898 #inspect_ident = #input_ident -> inspect(#f);
2899 },
2900 None,
2901 Some(&next_stmt_id.to_string()),
2902 );
2903 }
2904 BuildersOrCallback::Callback(_, node_callback) => {
2905 node_callback(self, next_stmt_id);
2906 }
2907 }
2908
2909 *next_stmt_id += 1;
2910
2911 inspect_ident
2912 }
2913
2914 HydroNode::Unique { input, .. } => {
2915 let input_ident =
2916 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2917
2918 let unique_ident =
2919 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2920
2921 match builders_or_callback {
2922 BuildersOrCallback::Builders(graph_builders) => {
2923 let builder = graph_builders.get_dfir_mut(&out_location);
2924 let lifetime = if input.metadata().location_kind.is_top_level() {
2925 quote!('static)
2926 } else {
2927 quote!('tick)
2928 };
2929
2930 builder.add_dfir(
2931 parse_quote! {
2932 #unique_ident = #input_ident -> unique::<#lifetime>();
2933 },
2934 None,
2935 Some(&next_stmt_id.to_string()),
2936 );
2937 }
2938 BuildersOrCallback::Callback(_, node_callback) => {
2939 node_callback(self, next_stmt_id);
2940 }
2941 }
2942
2943 *next_stmt_id += 1;
2944
2945 unique_ident
2946 }
2947
2948 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2949 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2950 parse_quote!(fold)
2951 } else if matches!(self, HydroNode::Scan { .. }) {
2952 parse_quote!(scan)
2953 } else {
2954 parse_quote!(fold_keyed)
2955 };
2956
2957 let (HydroNode::Fold { input, .. }
2958 | HydroNode::FoldKeyed { input, .. }
2959 | HydroNode::Scan { input, .. }) = self
2960 else {
2961 unreachable!()
2962 };
2963
2964 let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
2965 (input, quote!('static))
2966 } else {
2967 (input, quote!('tick))
2968 };
2969
2970 let input_ident =
2971 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2972
2973 let (HydroNode::Fold { init, acc, .. }
2974 | HydroNode::FoldKeyed { init, acc, .. }
2975 | HydroNode::Scan { init, acc, .. }) = &*self
2976 else {
2977 unreachable!()
2978 };
2979
2980 let fold_ident =
2981 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2982
2983 match builders_or_callback {
2984 BuildersOrCallback::Builders(graph_builders) => {
2985 if matches!(self, HydroNode::Fold { .. })
2986 && self.metadata().location_kind.is_top_level()
2987 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2988 && graph_builders.singleton_intermediates()
2989 {
2990 let builder = graph_builders.get_dfir_mut(&out_location);
2991
2992 let acc: syn::Expr = parse_quote!({
2993 let mut __inner = #acc;
2994 move |__state, __value| {
2995 __inner(__state, __value);
2996 Some(__state.clone())
2997 }
2998 });
2999
3000 builder.add_dfir(
3001 parse_quote! {
3002 source_iter([(#init)()]) -> [0]#fold_ident;
3003 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3004 #fold_ident = chain();
3005 },
3006 None,
3007 Some(&next_stmt_id.to_string()),
3008 );
3009 } else if matches!(self, HydroNode::FoldKeyed { .. })
3010 && self.metadata().location_kind.is_top_level()
3011 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3012 && graph_builders.singleton_intermediates()
3013 {
3014 let builder = graph_builders.get_dfir_mut(&out_location);
3015
3016 let acc: syn::Expr = parse_quote!({
3017 let mut __init = #init;
3018 let mut __inner = #acc;
3019 move |__state, __kv: (_, _)| {
3020 let __state = __state
3022 .entry(::std::clone::Clone::clone(&__kv.0))
3023 .or_insert_with(|| (__init)());
3024 __inner(__state, __kv.1);
3025 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3026 }
3027 });
3028
3029 builder.add_dfir(
3030 parse_quote! {
3031 source_iter([(#init)()]) -> [0]#fold_ident;
3032 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3033 },
3034 None,
3035 Some(&next_stmt_id.to_string()),
3036 );
3037 } else {
3038 let builder = graph_builders.get_dfir_mut(&out_location);
3039 builder.add_dfir(
3040 parse_quote! {
3041 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3042 },
3043 None,
3044 Some(&next_stmt_id.to_string()),
3045 );
3046 }
3047 }
3048 BuildersOrCallback::Callback(_, node_callback) => {
3049 node_callback(self, next_stmt_id);
3050 }
3051 }
3052
3053 *next_stmt_id += 1;
3054
3055 fold_ident
3056 }
3057
3058 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3059 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
3060 parse_quote!(reduce)
3061 } else {
3062 parse_quote!(reduce_keyed)
3063 };
3064
3065 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = self
3066 else {
3067 unreachable!()
3068 };
3069
3070 let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
3071 (input, quote!('static))
3072 } else {
3073 (input, quote!('tick))
3074 };
3075
3076 let input_ident =
3077 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3078
3079 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*self
3080 else {
3081 unreachable!()
3082 };
3083
3084 let reduce_ident =
3085 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3086
3087 match builders_or_callback {
3088 BuildersOrCallback::Builders(graph_builders) => {
3089 if matches!(self, HydroNode::Reduce { .. })
3090 && self.metadata().location_kind.is_top_level()
3091 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3092 && graph_builders.singleton_intermediates()
3093 {
3094 todo!(
3095 "Reduce with optional intermediates is not yet supported in simulator"
3096 );
3097 } else if matches!(self, HydroNode::ReduceKeyed { .. })
3098 && self.metadata().location_kind.is_top_level()
3099 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3100 && graph_builders.singleton_intermediates()
3101 {
3102 todo!();
3103 } else {
3104 let builder = graph_builders.get_dfir_mut(&out_location);
3105 builder.add_dfir(
3106 parse_quote! {
3107 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3108 },
3109 None,
3110 Some(&next_stmt_id.to_string()),
3111 );
3112 }
3113 }
3114 BuildersOrCallback::Callback(_, node_callback) => {
3115 node_callback(self, next_stmt_id);
3116 }
3117 }
3118
3119 *next_stmt_id += 1;
3120
3121 reduce_ident
3122 }
3123
3124 HydroNode::ReduceKeyedWatermark {
3125 f,
3126 input,
3127 watermark,
3128 ..
3129 } => {
3130 let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
3131 (input, quote!('static))
3132 } else {
3133 (input, quote!('tick))
3134 };
3135
3136 let input_ident =
3137 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3138
3139 let watermark_ident =
3140 watermark.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3141
3142 let chain_ident = syn::Ident::new(
3143 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3144 Span::call_site(),
3145 );
3146
3147 let fold_ident =
3148 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3149
3150 match builders_or_callback {
3151 BuildersOrCallback::Builders(graph_builders) => {
3152 let builder = graph_builders.get_dfir_mut(&out_location);
3153 builder.add_dfir(
3158 parse_quote! {
3159 #chain_ident = chain();
3160 #input_ident
3161 -> map(|x| (Some(x), None))
3162 -> [0]#chain_ident;
3163 #watermark_ident
3164 -> map(|watermark| (None, Some(watermark)))
3165 -> [1]#chain_ident;
3166
3167 #fold_ident = #chain_ident
3168 -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3169 let __reduce_keyed_fn = #f;
3170 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3171 if let Some((k, v)) = opt_payload {
3172 if let Some(curr_watermark) = *opt_curr_watermark {
3173 if k <= curr_watermark {
3174 return;
3175 }
3176 }
3177 match map.entry(k) {
3178 ::std::collections::hash_map::Entry::Vacant(e) => {
3179 e.insert(v);
3180 }
3181 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3182 __reduce_keyed_fn(e.get_mut(), v);
3183 }
3184 }
3185 } else {
3186 let watermark = opt_watermark.unwrap();
3187 if let Some(curr_watermark) = *opt_curr_watermark {
3188 if watermark <= curr_watermark {
3189 return;
3190 }
3191 }
3192 *opt_curr_watermark = opt_watermark;
3193 map.retain(|k, _| *k > watermark);
3194 }
3195 }
3196 })
3197 -> flat_map(|(map, _curr_watermark)| map);
3198 },
3199 None,
3200 Some(&next_stmt_id.to_string()),
3201 );
3202 }
3203 BuildersOrCallback::Callback(_, node_callback) => {
3204 node_callback(self, next_stmt_id);
3205 }
3206 }
3207
3208 *next_stmt_id += 1;
3209
3210 fold_ident
3211 }
3212
3213 HydroNode::Network {
3214 serialize_fn: serialize_pipeline,
3215 instantiate_fn,
3216 deserialize_fn: deserialize_pipeline,
3217 input,
3218 ..
3219 } => {
3220 let input_ident =
3221 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3222
3223 let receiver_stream_ident =
3224 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3225
3226 match builders_or_callback {
3227 BuildersOrCallback::Builders(graph_builders) => {
3228 let (sink_expr, source_expr) = match instantiate_fn {
3229 DebugInstantiate::Building => (
3230 syn::parse_quote!(DUMMY_SINK),
3231 syn::parse_quote!(DUMMY_SOURCE),
3232 ),
3233
3234 DebugInstantiate::Finalized(finalized) => {
3235 (finalized.sink.clone(), finalized.source.clone())
3236 }
3237 };
3238
3239 graph_builders.create_network(
3240 &input.metadata().location_kind,
3241 &out_location,
3242 input_ident,
3243 &receiver_stream_ident,
3244 serialize_pipeline,
3245 sink_expr,
3246 source_expr,
3247 deserialize_pipeline,
3248 *next_stmt_id,
3249 );
3250 }
3251 BuildersOrCallback::Callback(_, node_callback) => {
3252 node_callback(self, next_stmt_id);
3253 }
3254 }
3255
3256 *next_stmt_id += 1;
3257
3258 receiver_stream_ident
3259 }
3260
3261 HydroNode::ExternalInput {
3262 instantiate_fn,
3263 deserialize_fn: deserialize_pipeline,
3264 ..
3265 } => {
3266 let receiver_stream_ident =
3267 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3268
3269 match builders_or_callback {
3270 BuildersOrCallback::Builders(graph_builders) => {
3271 let (_, source_expr) = match instantiate_fn {
3272 DebugInstantiate::Building => (
3273 syn::parse_quote!(DUMMY_SINK),
3274 syn::parse_quote!(DUMMY_SOURCE),
3275 ),
3276
3277 DebugInstantiate::Finalized(finalized) => {
3278 (finalized.sink.clone(), finalized.source.clone())
3279 }
3280 };
3281
3282 graph_builders.create_external_source(
3283 &out_location,
3284 source_expr,
3285 &receiver_stream_ident,
3286 deserialize_pipeline,
3287 *next_stmt_id,
3288 );
3289 }
3290 BuildersOrCallback::Callback(_, node_callback) => {
3291 node_callback(self, next_stmt_id);
3292 }
3293 }
3294
3295 *next_stmt_id += 1;
3296
3297 receiver_stream_ident
3298 }
3299
3300 HydroNode::Counter {
3301 tag,
3302 duration,
3303 prefix,
3304 input,
3305 ..
3306 } => {
3307 let input_ident =
3308 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3309
3310 let counter_ident =
3311 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3312
3313 match builders_or_callback {
3314 BuildersOrCallback::Builders(graph_builders) => {
3315 let builder = graph_builders.get_dfir_mut(&out_location);
3316 builder.add_dfir(
3317 parse_quote! {
3318 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3319 },
3320 None,
3321 Some(&next_stmt_id.to_string()),
3322 );
3323 }
3324 BuildersOrCallback::Callback(_, node_callback) => {
3325 node_callback(self, next_stmt_id);
3326 }
3327 }
3328
3329 *next_stmt_id += 1;
3330
3331 counter_ident
3332 }
3333 }
3334 }
3335
3336 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3337 match self {
3338 HydroNode::Placeholder => {
3339 panic!()
3340 }
3341 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3342 HydroNode::Source { source, .. } => match source {
3343 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3344 HydroSource::ExternalNetwork()
3345 | HydroSource::Spin()
3346 | HydroSource::ClusterMembers(_) => {} },
3348 HydroNode::SingletonSource { value, .. } => {
3349 transform(value);
3350 }
3351 HydroNode::CycleSource { .. }
3352 | HydroNode::Tee { .. }
3353 | HydroNode::YieldConcat { .. }
3354 | HydroNode::BeginAtomic { .. }
3355 | HydroNode::EndAtomic { .. }
3356 | HydroNode::Batch { .. }
3357 | HydroNode::Chain { .. }
3358 | HydroNode::ChainFirst { .. }
3359 | HydroNode::CrossProduct { .. }
3360 | HydroNode::CrossSingleton { .. }
3361 | HydroNode::ResolveFutures { .. }
3362 | HydroNode::ResolveFuturesOrdered { .. }
3363 | HydroNode::Join { .. }
3364 | HydroNode::Difference { .. }
3365 | HydroNode::AntiJoin { .. }
3366 | HydroNode::DeferTick { .. }
3367 | HydroNode::Enumerate { .. }
3368 | HydroNode::Unique { .. }
3369 | HydroNode::Sort { .. } => {}
3370 HydroNode::Map { f, .. }
3371 | HydroNode::FlatMap { f, .. }
3372 | HydroNode::Filter { f, .. }
3373 | HydroNode::FilterMap { f, .. }
3374 | HydroNode::Inspect { f, .. }
3375 | HydroNode::Reduce { f, .. }
3376 | HydroNode::ReduceKeyed { f, .. }
3377 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3378 transform(f);
3379 }
3380 HydroNode::Fold { init, acc, .. }
3381 | HydroNode::Scan { init, acc, .. }
3382 | HydroNode::FoldKeyed { init, acc, .. } => {
3383 transform(init);
3384 transform(acc);
3385 }
3386 HydroNode::Network {
3387 serialize_fn,
3388 deserialize_fn,
3389 ..
3390 } => {
3391 if let Some(serialize_fn) = serialize_fn {
3392 transform(serialize_fn);
3393 }
3394 if let Some(deserialize_fn) = deserialize_fn {
3395 transform(deserialize_fn);
3396 }
3397 }
3398 HydroNode::ExternalInput { deserialize_fn, .. } => {
3399 if let Some(deserialize_fn) = deserialize_fn {
3400 transform(deserialize_fn);
3401 }
3402 }
3403 HydroNode::Counter { duration, .. } => {
3404 transform(duration);
3405 }
3406 }
3407 }
3408
3409 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3410 &self.metadata().op
3411 }
3412
3413 pub fn metadata(&self) -> &HydroIrMetadata {
3414 match self {
3415 HydroNode::Placeholder => {
3416 panic!()
3417 }
3418 HydroNode::Cast { metadata, .. } => metadata,
3419 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3420 HydroNode::Source { metadata, .. } => metadata,
3421 HydroNode::SingletonSource { metadata, .. } => metadata,
3422 HydroNode::CycleSource { metadata, .. } => metadata,
3423 HydroNode::Tee { metadata, .. } => metadata,
3424 HydroNode::YieldConcat { metadata, .. } => metadata,
3425 HydroNode::BeginAtomic { metadata, .. } => metadata,
3426 HydroNode::EndAtomic { metadata, .. } => metadata,
3427 HydroNode::Batch { metadata, .. } => metadata,
3428 HydroNode::Chain { metadata, .. } => metadata,
3429 HydroNode::ChainFirst { metadata, .. } => metadata,
3430 HydroNode::CrossProduct { metadata, .. } => metadata,
3431 HydroNode::CrossSingleton { metadata, .. } => metadata,
3432 HydroNode::Join { metadata, .. } => metadata,
3433 HydroNode::Difference { metadata, .. } => metadata,
3434 HydroNode::AntiJoin { metadata, .. } => metadata,
3435 HydroNode::ResolveFutures { metadata, .. } => metadata,
3436 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3437 HydroNode::Map { metadata, .. } => metadata,
3438 HydroNode::FlatMap { metadata, .. } => metadata,
3439 HydroNode::Filter { metadata, .. } => metadata,
3440 HydroNode::FilterMap { metadata, .. } => metadata,
3441 HydroNode::DeferTick { metadata, .. } => metadata,
3442 HydroNode::Enumerate { metadata, .. } => metadata,
3443 HydroNode::Inspect { metadata, .. } => metadata,
3444 HydroNode::Unique { metadata, .. } => metadata,
3445 HydroNode::Sort { metadata, .. } => metadata,
3446 HydroNode::Scan { metadata, .. } => metadata,
3447 HydroNode::Fold { metadata, .. } => metadata,
3448 HydroNode::FoldKeyed { metadata, .. } => metadata,
3449 HydroNode::Reduce { metadata, .. } => metadata,
3450 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3451 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3452 HydroNode::ExternalInput { metadata, .. } => metadata,
3453 HydroNode::Network { metadata, .. } => metadata,
3454 HydroNode::Counter { metadata, .. } => metadata,
3455 }
3456 }
3457
3458 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3459 &mut self.metadata_mut().op
3460 }
3461
3462 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3463 match self {
3464 HydroNode::Placeholder => {
3465 panic!()
3466 }
3467 HydroNode::Cast { metadata, .. } => metadata,
3468 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3469 HydroNode::Source { metadata, .. } => metadata,
3470 HydroNode::SingletonSource { metadata, .. } => metadata,
3471 HydroNode::CycleSource { metadata, .. } => metadata,
3472 HydroNode::Tee { metadata, .. } => metadata,
3473 HydroNode::YieldConcat { metadata, .. } => metadata,
3474 HydroNode::BeginAtomic { metadata, .. } => metadata,
3475 HydroNode::EndAtomic { metadata, .. } => metadata,
3476 HydroNode::Batch { metadata, .. } => metadata,
3477 HydroNode::Chain { metadata, .. } => metadata,
3478 HydroNode::ChainFirst { metadata, .. } => metadata,
3479 HydroNode::CrossProduct { metadata, .. } => metadata,
3480 HydroNode::CrossSingleton { metadata, .. } => metadata,
3481 HydroNode::Join { metadata, .. } => metadata,
3482 HydroNode::Difference { metadata, .. } => metadata,
3483 HydroNode::AntiJoin { metadata, .. } => metadata,
3484 HydroNode::ResolveFutures { metadata, .. } => metadata,
3485 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3486 HydroNode::Map { metadata, .. } => metadata,
3487 HydroNode::FlatMap { metadata, .. } => metadata,
3488 HydroNode::Filter { metadata, .. } => metadata,
3489 HydroNode::FilterMap { metadata, .. } => metadata,
3490 HydroNode::DeferTick { metadata, .. } => metadata,
3491 HydroNode::Enumerate { metadata, .. } => metadata,
3492 HydroNode::Inspect { metadata, .. } => metadata,
3493 HydroNode::Unique { metadata, .. } => metadata,
3494 HydroNode::Sort { metadata, .. } => metadata,
3495 HydroNode::Scan { metadata, .. } => metadata,
3496 HydroNode::Fold { metadata, .. } => metadata,
3497 HydroNode::FoldKeyed { metadata, .. } => metadata,
3498 HydroNode::Reduce { metadata, .. } => metadata,
3499 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3500 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3501 HydroNode::ExternalInput { metadata, .. } => metadata,
3502 HydroNode::Network { metadata, .. } => metadata,
3503 HydroNode::Counter { metadata, .. } => metadata,
3504 }
3505 }
3506
3507 pub fn input(&self) -> Vec<&HydroNode> {
3508 match self {
3509 HydroNode::Placeholder => {
3510 panic!()
3511 }
3512 HydroNode::Source { .. }
3513 | HydroNode::SingletonSource { .. }
3514 | HydroNode::ExternalInput { .. }
3515 | HydroNode::CycleSource { .. }
3516 | HydroNode::Tee { .. } => {
3517 vec![]
3519 }
3520 HydroNode::Cast { inner, .. }
3521 | HydroNode::ObserveNonDet { inner, .. }
3522 | HydroNode::YieldConcat { inner, .. }
3523 | HydroNode::BeginAtomic { inner, .. }
3524 | HydroNode::EndAtomic { inner, .. }
3525 | HydroNode::Batch { inner, .. } => {
3526 vec![inner]
3527 }
3528 HydroNode::Chain { first, second, .. } => {
3529 vec![first, second]
3530 }
3531 HydroNode::ChainFirst { first, second, .. } => {
3532 vec![first, second]
3533 }
3534 HydroNode::CrossProduct { left, right, .. }
3535 | HydroNode::CrossSingleton { left, right, .. }
3536 | HydroNode::Join { left, right, .. } => {
3537 vec![left, right]
3538 }
3539 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3540 vec![pos, neg]
3541 }
3542 HydroNode::Map { input, .. }
3543 | HydroNode::FlatMap { input, .. }
3544 | HydroNode::Filter { input, .. }
3545 | HydroNode::FilterMap { input, .. }
3546 | HydroNode::Sort { input, .. }
3547 | HydroNode::DeferTick { input, .. }
3548 | HydroNode::Enumerate { input, .. }
3549 | HydroNode::Inspect { input, .. }
3550 | HydroNode::Unique { input, .. }
3551 | HydroNode::Network { input, .. }
3552 | HydroNode::Counter { input, .. }
3553 | HydroNode::ResolveFutures { input, .. }
3554 | HydroNode::ResolveFuturesOrdered { input, .. }
3555 | HydroNode::Fold { input, .. }
3556 | HydroNode::FoldKeyed { input, .. }
3557 | HydroNode::Reduce { input, .. }
3558 | HydroNode::ReduceKeyed { input, .. }
3559 | HydroNode::Scan { input, .. } => {
3560 vec![input]
3561 }
3562 HydroNode::ReduceKeyedWatermark {
3563 input, watermark, ..
3564 } => {
3565 vec![input, watermark]
3566 }
3567 }
3568 }
3569
3570 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3571 self.input()
3572 .iter()
3573 .map(|input_node| input_node.metadata())
3574 .collect()
3575 }
3576
3577 pub fn print_root(&self) -> String {
3578 match self {
3579 HydroNode::Placeholder => {
3580 panic!()
3581 }
3582 HydroNode::Cast { .. } => "Cast()".to_string(),
3583 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3584 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3585 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3586 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3587 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3588 HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3589 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3590 HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3591 HydroNode::Batch { .. } => "Batch()".to_string(),
3592 HydroNode::Chain { first, second, .. } => {
3593 format!("Chain({}, {})", first.print_root(), second.print_root())
3594 }
3595 HydroNode::ChainFirst { first, second, .. } => {
3596 format!(
3597 "ChainFirst({}, {})",
3598 first.print_root(),
3599 second.print_root()
3600 )
3601 }
3602 HydroNode::CrossProduct { left, right, .. } => {
3603 format!(
3604 "CrossProduct({}, {})",
3605 left.print_root(),
3606 right.print_root()
3607 )
3608 }
3609 HydroNode::CrossSingleton { left, right, .. } => {
3610 format!(
3611 "CrossSingleton({}, {})",
3612 left.print_root(),
3613 right.print_root()
3614 )
3615 }
3616 HydroNode::Join { left, right, .. } => {
3617 format!("Join({}, {})", left.print_root(), right.print_root())
3618 }
3619 HydroNode::Difference { pos, neg, .. } => {
3620 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3621 }
3622 HydroNode::AntiJoin { pos, neg, .. } => {
3623 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3624 }
3625 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3626 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3627 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3628 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3629 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3630 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3631 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3632 HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3633 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3634 HydroNode::Unique { .. } => "Unique()".to_string(),
3635 HydroNode::Sort { .. } => "Sort()".to_string(),
3636 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3637 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3638 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3639 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3640 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3641 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3642 HydroNode::Network { .. } => "Network()".to_string(),
3643 HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3644 HydroNode::Counter { tag, duration, .. } => {
3645 format!("Counter({:?}, {:?})", tag, duration)
3646 }
3647 }
3648 }
3649}
3650
/// Builds the sink expression, source expression and connection callback for a network
/// edge going from `from_location` to `to_location`.
///
/// The deployed node for each endpoint is looked up in `processes` or `clusters` (keyed by
/// the id carried in the [`LocationId`]), a port is allocated on each node, and the pair of
/// expressions plus the callback are obtained from the matching `Deploy` methods:
///
/// | from    | to      | `Deploy` methods                      |
/// |---------|---------|---------------------------------------|
/// | process | process | `o2o_sink_source` / `o2o_connect`     |
/// | process | cluster | `o2m_sink_source` / `o2m_connect`     |
/// | cluster | process | `m2o_sink_source` / `m2o_connect`     |
/// | cluster | cluster | `m2m_sink_source` / `m2m_connect`     |
///
/// Returns `(sink, source, connect_fn)`. `connect_fn` is returned without being invoked;
/// calling it is left to the caller.
///
/// # Panics
///
/// - If a referenced process or cluster id is missing from `processes` / `clusters`.
/// - If either location is a `Tick` or `Atomic` location; only processes and clusters can
///   be network endpoints here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches an owned handle to the process with the given id, panicking if it was never
    // instantiated.
    let process_node = |id: &usize| {
        processes
            .get(id)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", id))
            .clone()
    };
    // Same as above, for clusters.
    let cluster_node = |id: &usize| {
        clusters
            .get(id)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", id))
            .clone()
    };

    // In every arm the sender is resolved before the receiver and the sender's port is
    // allocated before the receiver's, so port allocation order stays deterministic.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Tick and atomic locations cannot be network endpoints. The arms stay explicit
        // (no `_` catch-all) so that adding a `LocationId` variant is a compile error here.
        (LocationId::Tick(_, _), _) => panic!(
            "Cannot instantiate a network: `from_location` is a tick, expected a process or cluster"
        ),
        (_, LocationId::Tick(_, _)) => panic!(
            "Cannot instantiate a network: `to_location` is a tick, expected a process or cluster"
        ),
        (LocationId::Atomic(_), _) => panic!(
            "Cannot instantiate a network: `from_location` is atomic, expected a process or cluster"
        ),
        (_, LocationId::Atomic(_)) => panic!(
            "Cannot instantiate a network: `to_location` is atomic, expected a process or cluster"
        ),
    };
    (sink, source, connect_fn)
}
3757
// Unit tests for this module: size guards for the IR enums and checks of
// `simplify_q_macro` on plain and stageleft-generated expressions.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins the in-memory size of `HydroNode` so that growth of the enum is noticed.
    // Ignored without the `build` feature because the expected size counts
    // feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 272);
    }

    // Pins the in-memory size of `HydroRoot`; same feature caveat as `hydro_node_size`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 168);
    }

    // An ordinary expression (`x + y`) must come back from `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Runs `simplify_q_macro` on the expression spliced from a real `q!` closure and
    // compares the resulting token string against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Same as above, but the quoted body is a block that starts with a `let` binding and
    // only then yields a `move` closure, so the quoted code does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}