Skip to main content

hydro_lang/viz/
render.rs

1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::{Display, Write};
4use std::num::ParseIntError;
5use std::sync::OnceLock;
6
7use auto_impl::auto_impl;
8use slotmap::{Key, SecondaryMap, SlotMap};
9
10pub use super::graphviz::{HydroDot, escape_dot};
11pub use super::json::HydroJson;
12// Re-export specific implementations
13pub use super::mermaid::{HydroMermaid, escape_mermaid};
14use crate::compile::ir::backtrace::Backtrace;
15use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
16use crate::location::dynamic::LocationId;
17use crate::location::{LocationKey, LocationType};
18
/// Label for a graph node - can be either a static string or contain expressions.
///
/// The `Display` implementation prints `Static` labels verbatim and `WithExprs`
/// labels as `op_name(expr, expr, ...)`.
#[derive(Debug, Clone)]
pub enum NodeLabel {
    /// A static string label
    Static(String),
    /// A label with an operation name and expression arguments
    WithExprs {
        /// Operation name, printed in front of the parenthesized arguments.
        op_name: String,
        /// Expression arguments, printed comma-separated inside the parentheses.
        exprs: Vec<DebugExpr>,
    },
}
30
31impl NodeLabel {
32    /// Create a static label
33    pub fn static_label(s: String) -> Self {
34        Self::Static(s)
35    }
36
37    /// Create a label for an operation with multiple expression
38    pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
39        Self::WithExprs { op_name, exprs }
40    }
41}
42
43impl Display for NodeLabel {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            Self::Static(s) => write!(f, "{}", s),
47            Self::WithExprs { op_name, exprs } => {
48                if exprs.is_empty() {
49                    write!(f, "{}()", op_name)
50                } else {
51                    let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
52                    write!(f, "{}({})", op_name, expr_strs.join(", "))
53                }
54            }
55        }
56    }
57}
58
/// Base struct for text-based graph writers that use indentation.
/// Contains common fields shared by DOT and Mermaid writers.
pub struct IndentedGraphWriter<'a, W> {
    /// Underlying sink that receives the rendered text.
    pub write: W,
    /// Current indentation width; `writeln_indented` pads each line by this many spaces.
    pub indent: usize,
    /// Rendering options for this writer.
    pub config: HydroWriteConfig<'a>,
}
66
67impl<'a, W> IndentedGraphWriter<'a, W> {
68    /// Create a new writer with default configuration.
69    pub fn new(write: W) -> Self {
70        Self {
71            write,
72            indent: 0,
73            config: HydroWriteConfig::default(),
74        }
75    }
76
77    /// Create a new writer with the given configuration.
78    pub fn new_with_config(write: W, config: HydroWriteConfig<'a>) -> Self {
79        Self {
80            write,
81            indent: 0,
82            config,
83        }
84    }
85}
86
87impl<W: Write> IndentedGraphWriter<'_, W> {
88    /// Write an indented line using the current indentation level.
89    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
90        writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
91    }
92}
93
/// Common error type used by all graph writers.
///
/// This is an alias for [`std::fmt::Error`], the error returned by `writeln_indented`.
pub type GraphWriteError = std::fmt::Error;
96
/// Trait for writing textual representations of Hydro IR graphs, i.e. mermaid or dot graphs.
///
/// `write_graph_structure` drives an implementation in this order:
/// `write_prologue`, one `write_node_definition` per node, then (only when
/// `show_location_groups` is set) a `write_location_start` / `write_node`* /
/// `write_location_end` group per location, one `write_edge` per edge, and
/// finally `write_epilogue`.
#[auto_impl(&mut, Box)]
pub trait HydroGraphWrite {
    /// Error type emitted by writing.
    type Err: Error;

    /// Begin the graph. First method called.
    fn write_prologue(&mut self) -> Result<(), Self::Err>;

    /// Write a node definition with styling.
    ///
    /// `location_key` / `location_type` are `None` when the node has no
    /// recorded location; `backtrace` is `None` when none was captured.
    fn write_node_definition(
        &mut self,
        node_id: VizNodeKey,
        node_label: &NodeLabel,
        node_type: HydroNodeType,
        location_key: Option<LocationKey>,
        location_type: Option<LocationType>,
        backtrace: Option<&Backtrace>,
    ) -> Result<(), Self::Err>;

    /// Write an edge between nodes with optional labeling.
    ///
    /// `edge_properties` is the set of semantic tags attached to the edge.
    fn write_edge(
        &mut self,
        src_id: VizNodeKey,
        dst_id: VizNodeKey,
        edge_properties: &HashSet<HydroEdgeProp>,
        label: Option<&str>,
    ) -> Result<(), Self::Err>;

    /// Begin writing a location grouping (process/cluster).
    fn write_location_start(
        &mut self,
        location_key: LocationKey,
        location_type: LocationType,
    ) -> Result<(), Self::Err>;

    /// Write a node within a location.
    ///
    /// Called between `write_location_start` and `write_location_end`.
    fn write_node(&mut self, node_id: VizNodeKey) -> Result<(), Self::Err>;

    /// End writing a location grouping.
    fn write_location_end(&mut self) -> Result<(), Self::Err>;

    /// End the graph. Last method called.
    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
}
142
143/// Node type utilities - centralized handling of HydroNodeType operations
144pub mod node_type_utils {
145    use super::HydroNodeType;
146
147    /// All node types with their string names
148    const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
149        (HydroNodeType::Source, "Source"),
150        (HydroNodeType::Transform, "Transform"),
151        (HydroNodeType::Join, "Join"),
152        (HydroNodeType::Aggregation, "Aggregation"),
153        (HydroNodeType::Network, "Network"),
154        (HydroNodeType::Sink, "Sink"),
155        (HydroNodeType::Tee, "Tee"),
156        (HydroNodeType::NonDeterministic, "NonDeterministic"),
157    ];
158
159    /// Convert HydroNodeType to string representation (used by JSON format)
160    pub fn to_string(node_type: HydroNodeType) -> &'static str {
161        NODE_TYPE_DATA
162            .iter()
163            .find(|(nt, _)| *nt == node_type)
164            .map(|(_, name)| *name)
165            .unwrap_or("Unknown")
166    }
167
168    /// Get all node types with their string representations (used by JSON format)
169    pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
170        NODE_TYPE_DATA.to_vec()
171    }
172}
173
/// Types of nodes in Hydro IR for styling purposes.
///
/// String names for each variant are provided by `node_type_utils::to_string`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HydroNodeType {
    Source,
    Transform,
    Join,
    Aggregation,
    Network,
    /// Assigned to the nodes created for `HydroRoot` variants
    /// (`for_each`, `send_external`, `dest_sink`, `cycle_sink`).
    Sink,
    Tee,
    NonDeterministic,
}
186
/// Types of edges in Hydro IR representing stream properties.
///
/// Each edge carries a set of these tags; `get_unified_edge_style` maps the
/// set onto visual channels (pattern, halo, arrowhead, color, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HydroEdgeProp {
    /// The collection feeding the edge is bounded.
    Bounded,
    /// The collection feeding the edge is unbounded (rendered with a halo).
    Unbounded,
    /// Elements are totally ordered.
    TotalOrder,
    /// Elements carry no ordering guarantee (rendered as a wavy line).
    NoOrder,
    /// The collection is keyed (rendered with hash marks).
    Keyed,
    // Collection type tags for styling
    Stream,
    KeyedSingleton,
    KeyedStream,
    Singleton,
    Optional,
    /// The edge connects nodes whose root locations differ
    /// (rendered dashed and animated).
    Network,
    Cycle,
}
204
/// Unified edge style representation for all graph formats.
/// This intermediate format allows consistent styling across JSON, DOT, and Mermaid.
///
/// Values are produced by `get_unified_edge_style`; `Default` gives a thin,
/// solid, static, gray (`#666666`) line with the default arrowhead.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnifiedEdgeStyle {
    /// Line pattern (solid, dashed)
    pub line_pattern: LinePattern,
    /// Line width (1 = thin, 3 = thick)
    pub line_width: u8,
    /// Arrowhead style
    pub arrowhead: ArrowheadStyle,
    /// Line style (single plain line, or line with hash marks/dots for keyed streams)
    pub line_style: LineStyle,
    /// Halo/background effect for boundedness
    pub halo: HaloStyle,
    /// Line waviness for ordering information
    pub waviness: WavinessStyle,
    /// Whether animation is enabled (JSON only)
    pub animation: AnimationStyle,
    /// Color for the edge, as a `#rrggbb` hex string
    pub color: &'static str,
}
226
/// Stroke pattern of an edge line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LinePattern {
    /// Continuous line; used for edges that are not network edges.
    Solid,
    /// Dotted line; not assigned by `get_unified_edge_style`.
    Dotted,
    /// Dashed line; used for network edges.
    Dashed,
}
233
/// Arrowhead shape of an edge, chosen from the edge's collection-type tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArrowheadStyle {
    /// Used for `Stream`, `KeyedStream` and `KeyedSingleton` edges.
    TriangleFilled,
    /// Used for `Singleton` edges.
    CircleFilled,
    /// Used for `Optional` edges.
    DiamondOpen,
    /// Used when no collection-type tag is present.
    Default,
}
241
/// Decoration drawn along an edge line; selected by the `Keyed` edge tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineStyle {
    /// Plain single line
    Single,
    /// Single line with hash marks/dots (for keyed streams)
    HashMarks,
}
249
/// Halo/background effect of an edge; encodes boundedness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HaloStyle {
    /// No halo; used when the edge is not tagged `Unbounded`.
    None,
    /// Light-blue halo; used when the edge is tagged `Unbounded`.
    LightBlue,
}
255
/// Waviness of an edge line; encodes ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WavinessStyle {
    /// Straight line; the default, and used for `TotalOrder` edges.
    None,
    /// Wavy line; used for `NoOrder` edges.
    Wavy,
}
261
/// Whether an edge is animated (JSON only, per `UnifiedEdgeStyle::animation`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnimationStyle {
    /// Not animated; used for edges that are not network edges.
    Static,
    /// Animated; used for network edges.
    Animated,
}
267
268impl Default for UnifiedEdgeStyle {
269    fn default() -> Self {
270        Self {
271            line_pattern: LinePattern::Solid,
272            line_width: 1,
273            arrowhead: ArrowheadStyle::Default,
274            line_style: LineStyle::Single,
275            halo: HaloStyle::None,
276            waviness: WavinessStyle::None,
277            animation: AnimationStyle::Static,
278            color: "#666666",
279        }
280    }
281}
282
283/// Convert HydroEdgeType properties to unified edge style.
284/// This is the core logic for determining edge visual properties.
285///
286/// # Visual Encoding Mapping
287///
288/// | Semantic Property | Visual Channel | Values |
289/// |------------------|----------------|---------|
290/// | Network | Line Pattern + Animation | Local (solid, static), Network (dashed, animated) |
291/// | Ordering | Waviness | TotalOrder (straight), NoOrder (wavy) |
292/// | Boundedness | Halo | Bounded (none), Unbounded (light-blue transparent) |
293/// | Keyedness | Line Style | NotKeyed (plain line), Keyed (line with hash marks/dots) |
294/// | Collection Type | Color + Arrowhead | Stream (blue #2563eb, triangle), Singleton (black, circle), Optional (gray, diamond) |
295pub fn get_unified_edge_style(
296    edge_properties: &HashSet<HydroEdgeProp>,
297    src_location: Option<usize>,
298    dst_location: Option<usize>,
299) -> UnifiedEdgeStyle {
300    let mut style = UnifiedEdgeStyle::default();
301
302    // Network communication group - controls line pattern AND animation
303    let is_network = edge_properties.contains(&HydroEdgeProp::Network)
304        || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
305
306    if is_network {
307        style.line_pattern = LinePattern::Dashed;
308        style.animation = AnimationStyle::Animated;
309    } else {
310        style.line_pattern = LinePattern::Solid;
311        style.animation = AnimationStyle::Static;
312    }
313
314    // Boundedness group - controls halo
315    if edge_properties.contains(&HydroEdgeProp::Unbounded) {
316        style.halo = HaloStyle::LightBlue;
317    } else {
318        style.halo = HaloStyle::None;
319    }
320
321    // Collection type group - controls arrowhead and color
322    if edge_properties.contains(&HydroEdgeProp::Stream) {
323        style.arrowhead = ArrowheadStyle::TriangleFilled;
324        style.color = "#2563eb"; // Bright blue for Stream
325    } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
326        style.arrowhead = ArrowheadStyle::TriangleFilled;
327        style.color = "#2563eb"; // Bright blue for Stream (keyed variant)
328    } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
329        style.arrowhead = ArrowheadStyle::TriangleFilled;
330        style.color = "#000000"; // Black for Singleton (keyed variant)
331    } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
332        style.arrowhead = ArrowheadStyle::CircleFilled;
333        style.color = "#000000"; // Black for Singleton
334    } else if edge_properties.contains(&HydroEdgeProp::Optional) {
335        style.arrowhead = ArrowheadStyle::DiamondOpen;
336        style.color = "#6b7280"; // Gray for Optional
337    }
338
339    // Keyedness group - controls hash marks on the line
340    if edge_properties.contains(&HydroEdgeProp::Keyed) {
341        style.line_style = LineStyle::HashMarks; // Renders as hash marks/dots on the line in hydroscope
342    } else {
343        style.line_style = LineStyle::Single;
344    }
345
346    // Ordering group - waviness channel
347    if edge_properties.contains(&HydroEdgeProp::NoOrder) {
348        style.waviness = WavinessStyle::Wavy;
349    } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
350        style.waviness = WavinessStyle::None;
351    }
352
353    style
354}
355
356/// Extract semantic edge properties from CollectionKind metadata.
357/// This function analyzes the collection type and extracts relevant semantic tags
358/// for visualization purposes.
359pub fn extract_edge_properties_from_collection_kind(
360    collection_kind: &crate::compile::ir::CollectionKind,
361) -> HashSet<HydroEdgeProp> {
362    use crate::compile::ir::CollectionKind;
363
364    let mut properties = HashSet::new();
365
366    match collection_kind {
367        CollectionKind::Stream { bound, order, .. } => {
368            properties.insert(HydroEdgeProp::Stream);
369            add_bound_property(&mut properties, bound);
370            add_order_property(&mut properties, order);
371        }
372        CollectionKind::KeyedStream {
373            bound, value_order, ..
374        } => {
375            properties.insert(HydroEdgeProp::KeyedStream);
376            properties.insert(HydroEdgeProp::Keyed);
377            add_bound_property(&mut properties, bound);
378            add_order_property(&mut properties, value_order);
379        }
380        CollectionKind::Singleton { bound, .. } => {
381            properties.insert(HydroEdgeProp::Singleton);
382            add_bound_property(&mut properties, bound);
383            // Singletons have implicit TotalOrder
384            properties.insert(HydroEdgeProp::TotalOrder);
385        }
386        CollectionKind::Optional { bound, .. } => {
387            properties.insert(HydroEdgeProp::Optional);
388            add_bound_property(&mut properties, bound);
389            // Optionals have implicit TotalOrder
390            properties.insert(HydroEdgeProp::TotalOrder);
391        }
392        CollectionKind::KeyedSingleton { bound, .. } => {
393            properties.insert(HydroEdgeProp::Singleton);
394            properties.insert(HydroEdgeProp::Keyed);
395            // KeyedSingletons boundedness depends on the bound kind
396            add_keyed_singleton_bound_property(&mut properties, bound);
397            properties.insert(HydroEdgeProp::TotalOrder);
398        }
399    }
400
401    properties
402}
403
404/// Helper function to add bound property based on BoundKind.
405fn add_bound_property(
406    properties: &mut HashSet<HydroEdgeProp>,
407    bound: &crate::compile::ir::BoundKind,
408) {
409    use crate::compile::ir::BoundKind;
410
411    match bound {
412        BoundKind::Bounded => {
413            properties.insert(HydroEdgeProp::Bounded);
414        }
415        BoundKind::Unbounded => {
416            properties.insert(HydroEdgeProp::Unbounded);
417        }
418    }
419}
420
421/// Helper function to add bound property for KeyedSingleton based on KeyedSingletonBoundKind.
422fn add_keyed_singleton_bound_property(
423    properties: &mut HashSet<HydroEdgeProp>,
424    bound: &crate::compile::ir::KeyedSingletonBoundKind,
425) {
426    use crate::compile::ir::KeyedSingletonBoundKind;
427
428    match bound {
429        KeyedSingletonBoundKind::Bounded | KeyedSingletonBoundKind::BoundedValue => {
430            properties.insert(HydroEdgeProp::Bounded);
431        }
432        KeyedSingletonBoundKind::Unbounded => {
433            properties.insert(HydroEdgeProp::Unbounded);
434        }
435    }
436}
437
438/// Helper function to add order property based on StreamOrder.
439fn add_order_property(
440    properties: &mut HashSet<HydroEdgeProp>,
441    order: &crate::compile::ir::StreamOrder,
442) {
443    use crate::compile::ir::StreamOrder;
444
445    match order {
446        StreamOrder::TotalOrder => {
447            properties.insert(HydroEdgeProp::TotalOrder);
448        }
449        StreamOrder::NoOrder => {
450            properties.insert(HydroEdgeProp::NoOrder);
451        }
452    }
453}
454
455/// Detect if an edge crosses network boundaries by comparing source and destination locations.
456/// Returns true if the edge represents network communication between different locations.
457pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
458    // Compare the root locations to determine if they differ
459    src_location.root() != dst_location.root()
460}
461
462/// Add network edge tag if source and destination locations differ.
463pub fn add_network_edge_tag(
464    properties: &mut HashSet<HydroEdgeProp>,
465    src_location: &LocationId,
466    dst_location: &LocationId,
467) {
468    if is_network_edge(src_location, dst_location) {
469        properties.insert(HydroEdgeProp::Network);
470    }
471}
472
/// Configuration for graph writing.
///
/// The `Default` impl disables metadata, enables location groups and short
/// labels, and uses an empty location-name map.
#[derive(Debug, Clone, Copy)]
pub struct HydroWriteConfig<'a> {
    /// NOTE(review): presumably toggles extra metadata in the output; not read in this part of the file.
    pub show_metadata: bool,
    /// When set, `write_graph_structure` emits nodes grouped by location.
    pub show_location_groups: bool,
    /// NOTE(review): presumably selects shortened node labels; not read in this part of the file.
    pub use_short_labels: bool,
    /// Names associated with location keys.
    pub location_names: &'a SecondaryMap<LocationKey, String>,
}
481
482impl Default for HydroWriteConfig<'_> {
483    fn default() -> Self {
484        static EMPTY: OnceLock<SecondaryMap<LocationKey, String>> = OnceLock::new();
485        Self {
486            show_metadata: false,
487            show_location_groups: true,
488            use_short_labels: true, // Default to short labels for all renderers
489            location_names: EMPTY.get_or_init(SecondaryMap::new),
490        }
491    }
492}
493
/// Node information in the Hydro graph.
#[derive(Clone)]
pub struct HydroGraphNode {
    /// Text shown for the node.
    pub label: NodeLabel,
    /// Styling category of the node.
    pub node_type: HydroNodeType,
    /// Key of the location the node belongs to, if known.
    pub location_key: Option<LocationKey>,
    /// Backtrace recorded for the node, if any.
    pub backtrace: Option<Backtrace>,
}
502
slotmap::new_key_type! {
    /// Unique identifier for nodes in the visualization graph.
    ///
    /// This is counted/allocated separately from any other IDs within `hydro_lang`.
    ///
    /// Keys are printed via `Display` with a `viz` prefix (e.g. `viz1v1`) and
    /// can be parsed back from that form via `FromStr`.
    pub struct VizNodeKey;
}
509
510impl Display for VizNodeKey {
511    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512        write!(f, "viz{:?}", self.data()) // `"viz1v1"``
513    }
514}
515
516/// This is used by the visualizer
517/// TODO(mingwei): Make this more robust?
518impl std::str::FromStr for VizNodeKey {
519    type Err = Option<ParseIntError>;
520
521    fn from_str(s: &str) -> Result<Self, Self::Err> {
522        let nvn = s.strip_prefix("viz").ok_or(None)?;
523        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
524        let idx: u64 = idx.parse()?;
525        let ver: u64 = ver.parse()?;
526        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
527    }
528}
529
impl VizNodeKey {
    /// A key for testing with index 1.
    ///
    /// The FFI value packs the version (`0x8f` = 143) in the high 32 bits and
    /// the index in the low 32 bits.
    #[cfg(test)]
    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x0000008f00000001)); // `1v143`

    /// A key for testing with index 2.
    ///
    /// Same version (143) as `TEST_KEY_1`, with index 2.
    #[cfg(test)]
    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x0000008f00000002)); // `2v143`
}
539
/// Edge information in the Hydro graph.
#[derive(Debug, Clone)]
pub struct HydroGraphEdge {
    /// Key of the node the edge starts at.
    pub src: VizNodeKey,
    /// Key of the node the edge ends at.
    pub dst: VizNodeKey,
    /// Semantic tags attached to the edge.
    pub edge_properties: HashSet<HydroEdgeProp>,
    /// Optional text label for the edge.
    pub label: Option<String>,
}
548
/// Graph structure tracker for Hydro IR rendering.
#[derive(Default)]
pub struct HydroGraphStructure {
    /// All nodes, keyed by the `VizNodeKey` allocated on insertion.
    pub nodes: SlotMap<VizNodeKey, HydroGraphNode>,
    /// All edges, in insertion order.
    pub edges: Vec<HydroGraphEdge>,
    /// Type of each location registered via `add_location`.
    pub locations: SecondaryMap<LocationKey, LocationType>,
}
556
557impl HydroGraphStructure {
558    pub fn new() -> Self {
559        Self::default()
560    }
561
562    pub fn add_node(
563        &mut self,
564        label: NodeLabel,
565        node_type: HydroNodeType,
566        location_key: Option<LocationKey>,
567    ) -> VizNodeKey {
568        self.add_node_with_backtrace(label, node_type, location_key, None)
569    }
570
571    pub fn add_node_with_backtrace(
572        &mut self,
573        label: NodeLabel,
574        node_type: HydroNodeType,
575        location_key: Option<LocationKey>,
576        backtrace: Option<Backtrace>,
577    ) -> VizNodeKey {
578        self.nodes.insert(HydroGraphNode {
579            label,
580            node_type,
581            location_key,
582            backtrace,
583        })
584    }
585
586    /// Add a node with metadata, extracting backtrace automatically
587    pub fn add_node_with_metadata(
588        &mut self,
589        label: NodeLabel,
590        node_type: HydroNodeType,
591        metadata: &HydroIrMetadata,
592    ) -> VizNodeKey {
593        let location_key = Some(setup_location(self, metadata));
594        let backtrace = Some(metadata.op.backtrace.clone());
595        self.add_node_with_backtrace(label, node_type, location_key, backtrace)
596    }
597
598    pub fn add_edge(
599        &mut self,
600        src: VizNodeKey,
601        dst: VizNodeKey,
602        edge_properties: HashSet<HydroEdgeProp>,
603        label: Option<String>,
604    ) {
605        self.edges.push(HydroGraphEdge {
606            src,
607            dst,
608            edge_properties,
609            label,
610        });
611    }
612
613    // Legacy method for backward compatibility
614    pub fn add_edge_single(
615        &mut self,
616        src: VizNodeKey,
617        dst: VizNodeKey,
618        edge_type: HydroEdgeProp,
619        label: Option<String>,
620    ) {
621        let mut properties = HashSet::new();
622        properties.insert(edge_type);
623        self.edges.push(HydroGraphEdge {
624            src,
625            dst,
626            edge_properties: properties,
627            label,
628        });
629    }
630
631    pub fn add_location(&mut self, location_key: LocationKey, location_type: LocationType) {
632        self.locations.insert(location_key, location_type);
633    }
634}
635
/// Function to extract an op_name from a print_root() result for use in labels.
///
/// Returns the lowercased text that precedes the first `(` in `full_label`,
/// or the whole lowercased label when it contains no `(`.
pub fn extract_op_name(full_label: String) -> String {
    // There is always a (possibly empty) leading segment, so no fallback
    // value is needed.
    let op_part = match full_label.split_once('(') {
        Some((before_paren, _)) => before_paren,
        None => full_label.as_str(),
    };
    op_part.to_lowercase()
}
644
/// Extract a short, readable label from the full token stream label using print_root() style naming
///
/// The base name is the lowercased text before the first `(` (or the whole
/// lowercased label when there is no `(`). Two base names are refined for UI
/// display by looking for markers anywhere in `full_label`:
///
/// - `source`: `Iter` -> `source_iter`, `Stream` -> `source_stream`,
///   `ExternalNetwork` -> `external_network`, `Spin` -> `spin`
///   (first match wins, in that order), otherwise `source`.
/// - `network`: `deser` -> `network(recv)`, else `ser` -> `network(send)`,
///   otherwise `network`.
///
/// Every other base name is returned unchanged.
pub fn extract_short_label(full_label: &str) -> String {
    /// Markers that refine a `source` label, in priority order.
    const SOURCE_KINDS: [(&str, &str); 4] = [
        ("Iter", "source_iter"),
        ("Stream", "source_stream"),
        ("ExternalNetwork", "external_network"),
        ("Spin", "spin"),
    ];

    // A label always has a (possibly empty) segment before the first `(`,
    // so there is no "label doesn't follow the pattern" case to handle.
    let base_name = full_label
        .split_once('(')
        .map_or(full_label, |(before_paren, _)| before_paren)
        .to_lowercase();

    match base_name.as_str() {
        "source" => SOURCE_KINDS
            .iter()
            .find(|&&(marker, _)| full_label.contains(marker))
            .map_or("source", |&(_, short)| short)
            .to_owned(),
        "network" => {
            // "deser" must be checked first because it also contains "ser".
            if full_label.contains("deser") {
                "network(recv)".to_owned()
            } else if full_label.contains("ser") {
                "network(send)".to_owned()
            } else {
                "network".to_owned()
            }
        }
        // For all other cases, just use the lowercase base name (same as extract_op_name)
        _ => base_name,
    }
}
686
687/// Helper function to set up location in structure from metadata.
688fn setup_location(structure: &mut HydroGraphStructure, metadata: &HydroIrMetadata) -> LocationKey {
689    let root = metadata.location_id.root();
690    let location_key = root.key();
691    let location_type = root.location_type().unwrap();
692    structure.add_location(location_key, location_type);
693    location_key
694}
695
696/// Helper function to add an edge with semantic tags extracted from metadata.
697/// This function combines collection kind extraction with network detection.
698fn add_edge_with_metadata(
699    structure: &mut HydroGraphStructure,
700    src_id: VizNodeKey,
701    dst_id: VizNodeKey,
702    src_metadata: Option<&HydroIrMetadata>,
703    dst_metadata: Option<&HydroIrMetadata>,
704    label: Option<String>,
705) {
706    let mut properties = HashSet::new();
707
708    // Extract semantic tags from source metadata's collection kind
709    if let Some(metadata) = src_metadata {
710        properties.extend(extract_edge_properties_from_collection_kind(
711            &metadata.collection_kind,
712        ));
713    }
714
715    // Add network edge tag if locations differ
716    if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
717        add_network_edge_tag(
718            &mut properties,
719            &src_meta.location_id,
720            &dst_meta.location_id,
721        );
722    }
723
724    // If no properties were extracted, default to Stream
725    if properties.is_empty() {
726        properties.insert(HydroEdgeProp::Stream);
727    }
728
729    structure.add_edge(src_id, dst_id, properties, label);
730}
731
732/// Helper function to write a graph structure using any GraphWrite implementation
733fn write_graph_structure<W>(
734    structure: &HydroGraphStructure,
735    graph_write: W,
736    config: HydroWriteConfig<'_>,
737) -> Result<(), W::Err>
738where
739    W: HydroGraphWrite,
740{
741    let mut graph_write = graph_write;
742    // Write the graph
743    graph_write.write_prologue()?;
744
745    // Write node definitions
746    for (node_id, node) in structure.nodes.iter() {
747        let location_type = node
748            .location_key
749            .and_then(|loc_key| structure.locations.get(loc_key))
750            .copied();
751
752        graph_write.write_node_definition(
753            node_id,
754            &node.label,
755            node.node_type,
756            node.location_key,
757            location_type,
758            node.backtrace.as_ref(),
759        )?;
760    }
761
762    // Group nodes by location if requested
763    if config.show_location_groups {
764        let mut nodes_by_location = SecondaryMap::<LocationKey, Vec<VizNodeKey>>::new();
765        for (node_id, node) in structure.nodes.iter() {
766            if let Some(location_key) = node.location_key {
767                nodes_by_location
768                    .entry(location_key)
769                    .expect("location was removed")
770                    .or_default()
771                    .push(node_id);
772            }
773        }
774
775        for (location_key, node_ids) in nodes_by_location.iter() {
776            if let Some(&location_type) = structure.locations.get(location_key) {
777                graph_write.write_location_start(location_key, location_type)?;
778                for &node_id in node_ids.iter() {
779                    graph_write.write_node(node_id)?;
780                }
781                graph_write.write_location_end()?;
782            }
783        }
784    }
785
786    // Write edges
787    for edge in structure.edges.iter() {
788        graph_write.write_edge(
789            edge.src,
790            edge.dst,
791            &edge.edge_properties,
792            edge.label.as_deref(),
793        )?;
794    }
795
796    graph_write.write_epilogue()?;
797    Ok(())
798}
799
impl HydroRoot {
    /// Build the graph structure by traversing the IR tree.
    ///
    /// Each root variant becomes a `Sink` node: its input subtree is built
    /// first, then the sink node is added and connected to the input by an
    /// edge. Returns the key of the sink node.
    ///
    /// `seen_tees` maps tee cells (by pointer) to node keys and is passed
    /// through to the input's traversal.
    pub fn build_graph_structure(
        &self,
        structure: &mut HydroGraphStructure,
        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
        config: HydroWriteConfig<'_>,
    ) -> VizNodeKey {
        // Helper function for sink nodes to reduce duplication
        fn build_sink_node(
            structure: &mut HydroGraphStructure,
            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
            config: HydroWriteConfig<'_>,
            input: &HydroNode,
            sink_metadata: Option<&HydroIrMetadata>,
            label: NodeLabel,
        ) -> VizNodeKey {
            // Build the upstream subtree first so the input node exists before the sink.
            let input_id = input.build_graph_structure(structure, seen_tees, config);

            // If no explicit metadata is provided, extract it from the input node
            let effective_metadata = if let Some(meta) = sink_metadata {
                Some(meta)
            } else {
                match input {
                    HydroNode::Placeholder => None,
                    // All other variants have metadata
                    _ => Some(input.metadata()),
                }
            };

            // The sink takes its location and backtrace from the effective metadata, if any.
            let location_key = effective_metadata.map(|m| setup_location(structure, m));
            let sink_id = structure.add_node_with_backtrace(
                label,
                HydroNodeType::Sink,
                location_key,
                effective_metadata.map(|m| m.op.backtrace.clone()),
            );

            // Extract semantic tags from input metadata
            // NOTE(review): `metadata()` is called unconditionally here, unlike the
            // `Placeholder` guard above - confirm a Placeholder input cannot reach this point.
            let input_metadata = input.metadata();
            add_edge_with_metadata(
                structure,
                input_id,
                sink_id,
                Some(input_metadata),
                sink_metadata,
                None,
            );

            sink_id
        }

        // Every variant passes `None` as the sink metadata, so the sink node's
        // location/backtrace come from the input node.
        match self {
            // Sink operations - semantic tags extracted from input metadata
            HydroRoot::ForEach { f, input, .. } => build_sink_node(
                structure,
                seen_tees,
                config,
                input,
                None,
                NodeLabel::with_exprs("for_each".to_owned(), vec![f.clone()]),
            ),

            HydroRoot::SendExternal {
                to_external_key,
                to_port_id,
                input,
                ..
            } => build_sink_node(
                structure,
                seen_tees,
                config,
                input,
                None,
                NodeLabel::with_exprs(
                    format!("send_external({}:{})", to_external_key, to_port_id),
                    vec![],
                ),
            ),

            HydroRoot::DestSink { sink, input, .. } => build_sink_node(
                structure,
                seen_tees,
                config,
                input,
                None,
                NodeLabel::with_exprs("dest_sink".to_owned(), vec![sink.clone()]),
            ),

            HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
                structure,
                seen_tees,
                config,
                input,
                None,
                NodeLabel::static_label(format!("cycle_sink({})", ident)),
            ),
        }
    }
}
900
901impl HydroNode {
902    /// Build the graph structure recursively for this node.
903    pub fn build_graph_structure(
904        &self,
905        structure: &mut HydroGraphStructure,
906        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
907        config: HydroWriteConfig<'_>,
908    ) -> VizNodeKey {
909        // Helper functions to reduce duplication, categorized by input/expression patterns
910
911        /// Common parameters for transform builder functions to reduce argument count
912        struct TransformParams<'a> {
913            structure: &'a mut HydroGraphStructure,
914            seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
915            config: HydroWriteConfig<'a>,
916            input: &'a HydroNode,
917            metadata: &'a HydroIrMetadata,
918            op_name: String,
919            node_type: HydroNodeType,
920        }
921
922        // Single-input transform with no expressions
923        fn build_simple_transform(params: TransformParams) -> VizNodeKey {
924            let input_id = params.input.build_graph_structure(
925                params.structure,
926                params.seen_tees,
927                params.config,
928            );
929            let node_id = params.structure.add_node_with_metadata(
930                NodeLabel::Static(params.op_name.to_string()),
931                params.node_type,
932                params.metadata,
933            );
934
935            // Extract semantic tags from input metadata
936            let input_metadata = params.input.metadata();
937            add_edge_with_metadata(
938                params.structure,
939                input_id,
940                node_id,
941                Some(input_metadata),
942                Some(params.metadata),
943                None,
944            );
945
946            node_id
947        }
948
949        // Single-input transform with one expression
950        fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> VizNodeKey {
951            let input_id = params.input.build_graph_structure(
952                params.structure,
953                params.seen_tees,
954                params.config,
955            );
956            let node_id = params.structure.add_node_with_metadata(
957                NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
958                params.node_type,
959                params.metadata,
960            );
961
962            // Extract semantic tags from input metadata
963            let input_metadata = params.input.metadata();
964            add_edge_with_metadata(
965                params.structure,
966                input_id,
967                node_id,
968                Some(input_metadata),
969                Some(params.metadata),
970                None,
971            );
972
973            node_id
974        }
975
976        // Single-input transform with two expressions
977        fn build_dual_expr_transform(
978            params: TransformParams,
979            expr1: &DebugExpr,
980            expr2: &DebugExpr,
981        ) -> VizNodeKey {
982            let input_id = params.input.build_graph_structure(
983                params.structure,
984                params.seen_tees,
985                params.config,
986            );
987            let node_id = params.structure.add_node_with_metadata(
988                NodeLabel::with_exprs(
989                    params.op_name.to_string(),
990                    vec![expr1.clone(), expr2.clone()],
991                ),
992                params.node_type,
993                params.metadata,
994            );
995
996            // Extract semantic tags from input metadata
997            let input_metadata = params.input.metadata();
998            add_edge_with_metadata(
999                params.structure,
1000                input_id,
1001                node_id,
1002                Some(input_metadata),
1003                Some(params.metadata),
1004                None,
1005            );
1006
1007            node_id
1008        }
1009
1010        // Helper function for source nodes
1011        fn build_source_node(
1012            structure: &mut HydroGraphStructure,
1013            metadata: &HydroIrMetadata,
1014            label: String,
1015        ) -> VizNodeKey {
1016            structure.add_node_with_metadata(
1017                NodeLabel::Static(label),
1018                HydroNodeType::Source,
1019                metadata,
1020            )
1021        }
1022
1023        match self {
1024            HydroNode::Placeholder => structure.add_node(
1025                NodeLabel::Static("PLACEHOLDER".to_owned()),
1026                HydroNodeType::Transform,
1027                None,
1028            ),
1029
1030            HydroNode::Source {
1031                source, metadata, ..
1032            } => {
1033                let label = match source {
1034                    HydroSource::Stream(expr) => format!("source_stream({})", expr),
1035                    HydroSource::ExternalNetwork() => "external_network()".to_owned(),
1036                    HydroSource::Iter(expr) => format!("source_iter({})", expr),
1037                    HydroSource::Spin() => "spin()".to_owned(),
1038                    HydroSource::ClusterMembers(location_id) => {
1039                        format!(
1040                            "source_stream(cluster_membership_stream({:?}))",
1041                            location_id
1042                        )
1043                    }
1044                };
1045                build_source_node(structure, metadata, label)
1046            }
1047
1048            HydroNode::SingletonSource { value, metadata } => {
1049                let label = format!("singleton({})", value);
1050                build_source_node(structure, metadata, label)
1051            }
1052
1053            HydroNode::ExternalInput {
1054                from_external_key,
1055                from_port_id,
1056                metadata,
1057                ..
1058            } => build_source_node(
1059                structure,
1060                metadata,
1061                format!("external_input({}:{})", from_external_key, from_port_id),
1062            ),
1063
1064            HydroNode::CycleSource {
1065                ident, metadata, ..
1066            } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
1067
1068            HydroNode::Tee { inner, metadata } => {
1069                let ptr = inner.as_ptr();
1070                if let Some(&existing_id) = seen_tees.get(&ptr) {
1071                    return existing_id;
1072                }
1073
1074                let input_id = inner
1075                    .0
1076                    .borrow()
1077                    .build_graph_structure(structure, seen_tees, config);
1078                let tee_id = structure.add_node_with_metadata(
1079                    NodeLabel::Static(extract_op_name(self.print_root())),
1080                    HydroNodeType::Tee,
1081                    metadata,
1082                );
1083
1084                seen_tees.insert(ptr, tee_id);
1085
1086                // Extract semantic tags from input
1087                let inner_borrow = inner.0.borrow();
1088                let input_metadata = inner_borrow.metadata();
1089                add_edge_with_metadata(
1090                    structure,
1091                    input_id,
1092                    tee_id,
1093                    Some(input_metadata),
1094                    Some(metadata),
1095                    None,
1096                );
1097                drop(inner_borrow);
1098
1099                tee_id
1100            }
1101
1102            // Non-deterministic operation
1103            HydroNode::ObserveNonDet {
1104                inner, metadata, ..
1105            } => build_simple_transform(TransformParams {
1106                structure,
1107                seen_tees,
1108                config,
1109                input: inner,
1110                metadata,
1111                op_name: extract_op_name(self.print_root()),
1112                node_type: HydroNodeType::NonDeterministic,
1113            }),
1114
1115            // Transform operations with Stream edges - grouped by node/edge type
1116            HydroNode::Cast { inner, metadata }
1117            | HydroNode::DeferTick {
1118                input: inner,
1119                metadata,
1120            }
1121            | HydroNode::Enumerate {
1122                input: inner,
1123                metadata,
1124                ..
1125            }
1126            | HydroNode::Unique {
1127                input: inner,
1128                metadata,
1129            }
1130            | HydroNode::ResolveFutures {
1131                input: inner,
1132                metadata,
1133            }
1134            | HydroNode::ResolveFuturesOrdered {
1135                input: inner,
1136                metadata,
1137            } => build_simple_transform(TransformParams {
1138                structure,
1139                seen_tees,
1140                config,
1141                input: inner,
1142                metadata,
1143                op_name: extract_op_name(self.print_root()),
1144                node_type: HydroNodeType::Transform,
1145            }),
1146
1147            // Aggregation operation - semantic tags extracted from metadata
1148            HydroNode::Sort {
1149                input: inner,
1150                metadata,
1151            } => build_simple_transform(TransformParams {
1152                structure,
1153                seen_tees,
1154                config,
1155                input: inner,
1156                metadata,
1157                op_name: extract_op_name(self.print_root()),
1158                node_type: HydroNodeType::Aggregation,
1159            }),
1160
1161            // Single-expression Transform operations - grouped by node type
1162            HydroNode::Map { f, input, metadata }
1163            | HydroNode::Filter { f, input, metadata }
1164            | HydroNode::FlatMap { f, input, metadata }
1165            | HydroNode::FilterMap { f, input, metadata }
1166            | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1167                TransformParams {
1168                    structure,
1169                    seen_tees,
1170                    config,
1171                    input,
1172                    metadata,
1173                    op_name: extract_op_name(self.print_root()),
1174                    node_type: HydroNodeType::Transform,
1175                },
1176                f,
1177            ),
1178
1179            // Single-expression Aggregation operations - grouped by node type
1180            HydroNode::Reduce { f, input, metadata }
1181            | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1182                TransformParams {
1183                    structure,
1184                    seen_tees,
1185                    config,
1186                    input,
1187                    metadata,
1188                    op_name: extract_op_name(self.print_root()),
1189                    node_type: HydroNodeType::Aggregation,
1190                },
1191                f,
1192            ),
1193
1194            // Join-like operations with left/right edge labels - grouped by edge labeling
1195            HydroNode::Join {
1196                left,
1197                right,
1198                metadata,
1199            }
1200            | HydroNode::CrossProduct {
1201                left,
1202                right,
1203                metadata,
1204            }
1205            | HydroNode::CrossSingleton {
1206                left,
1207                right,
1208                metadata,
1209            } => {
1210                let left_id = left.build_graph_structure(structure, seen_tees, config);
1211                let right_id = right.build_graph_structure(structure, seen_tees, config);
1212                let node_id = structure.add_node_with_metadata(
1213                    NodeLabel::Static(extract_op_name(self.print_root())),
1214                    HydroNodeType::Join,
1215                    metadata,
1216                );
1217
1218                // Extract semantic tags for left edge
1219                let left_metadata = left.metadata();
1220                add_edge_with_metadata(
1221                    structure,
1222                    left_id,
1223                    node_id,
1224                    Some(left_metadata),
1225                    Some(metadata),
1226                    Some("left".to_owned()),
1227                );
1228
1229                // Extract semantic tags for right edge
1230                let right_metadata = right.metadata();
1231                add_edge_with_metadata(
1232                    structure,
1233                    right_id,
1234                    node_id,
1235                    Some(right_metadata),
1236                    Some(metadata),
1237                    Some("right".to_owned()),
1238                );
1239
1240                node_id
1241            }
1242
1243            // Join-like operations with pos/neg edge labels - grouped by edge labeling
1244            HydroNode::Difference {
1245                pos: left,
1246                neg: right,
1247                metadata,
1248            }
1249            | HydroNode::AntiJoin {
1250                pos: left,
1251                neg: right,
1252                metadata,
1253            } => {
1254                let left_id = left.build_graph_structure(structure, seen_tees, config);
1255                let right_id = right.build_graph_structure(structure, seen_tees, config);
1256                let node_id = structure.add_node_with_metadata(
1257                    NodeLabel::Static(extract_op_name(self.print_root())),
1258                    HydroNodeType::Join,
1259                    metadata,
1260                );
1261
1262                // Extract semantic tags for pos edge
1263                let left_metadata = left.metadata();
1264                add_edge_with_metadata(
1265                    structure,
1266                    left_id,
1267                    node_id,
1268                    Some(left_metadata),
1269                    Some(metadata),
1270                    Some("pos".to_owned()),
1271                );
1272
1273                // Extract semantic tags for neg edge
1274                let right_metadata = right.metadata();
1275                add_edge_with_metadata(
1276                    structure,
1277                    right_id,
1278                    node_id,
1279                    Some(right_metadata),
1280                    Some(metadata),
1281                    Some("neg".to_owned()),
1282                );
1283
1284                node_id
1285            }
1286
1287            // Dual expression transforms - consolidated using pattern matching
1288            HydroNode::Fold {
1289                init,
1290                acc,
1291                input,
1292                metadata,
1293            }
1294            | HydroNode::FoldKeyed {
1295                init,
1296                acc,
1297                input,
1298                metadata,
1299            }
1300            | HydroNode::Scan {
1301                init,
1302                acc,
1303                input,
1304                metadata,
1305            } => {
1306                let node_type = HydroNodeType::Aggregation; // All are aggregation operations
1307
1308                build_dual_expr_transform(
1309                    TransformParams {
1310                        structure,
1311                        seen_tees,
1312                        config,
1313                        input,
1314                        metadata,
1315                        op_name: extract_op_name(self.print_root()),
1316                        node_type,
1317                    },
1318                    init,
1319                    acc,
1320                )
1321            }
1322
1323            // Combination of join and transform
1324            HydroNode::ReduceKeyedWatermark {
1325                f,
1326                input,
1327                watermark,
1328                metadata,
1329            } => {
1330                let input_id = input.build_graph_structure(structure, seen_tees, config);
1331                let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1332                let location_key = Some(setup_location(structure, metadata));
1333                let join_node_id = structure.add_node_with_backtrace(
1334                    NodeLabel::Static(extract_op_name(self.print_root())),
1335                    HydroNodeType::Join,
1336                    location_key,
1337                    Some(metadata.op.backtrace.clone()),
1338                );
1339
1340                // Extract semantic tags for input edge
1341                let input_metadata = input.metadata();
1342                add_edge_with_metadata(
1343                    structure,
1344                    input_id,
1345                    join_node_id,
1346                    Some(input_metadata),
1347                    Some(metadata),
1348                    Some("input".to_owned()),
1349                );
1350
1351                // Extract semantic tags for watermark edge
1352                let watermark_metadata = watermark.metadata();
1353                add_edge_with_metadata(
1354                    structure,
1355                    watermark_id,
1356                    join_node_id,
1357                    Some(watermark_metadata),
1358                    Some(metadata),
1359                    Some("watermark".to_owned()),
1360                );
1361
1362                let node_id = structure.add_node_with_backtrace(
1363                    NodeLabel::with_exprs(extract_op_name(self.print_root()), vec![f.clone()]),
1364                    HydroNodeType::Aggregation,
1365                    location_key,
1366                    Some(metadata.op.backtrace.clone()),
1367                );
1368
1369                // Edge from join to aggregation node
1370                let join_metadata = metadata; // Use the same metadata
1371                add_edge_with_metadata(
1372                    structure,
1373                    join_node_id,
1374                    node_id,
1375                    Some(join_metadata),
1376                    Some(metadata),
1377                    None,
1378                );
1379
1380                node_id
1381            }
1382
1383            HydroNode::Network {
1384                serialize_fn,
1385                deserialize_fn,
1386                input,
1387                metadata,
1388                ..
1389            } => {
1390                let input_id = input.build_graph_structure(structure, seen_tees, config);
1391                let _from_location_key = setup_location(structure, metadata);
1392
1393                let root = metadata.location_id.root();
1394                let to_location_key = root.key();
1395                let to_location_type = root.location_type().unwrap();
1396                structure.add_location(to_location_key, to_location_type);
1397
1398                let mut label = "network(".to_owned();
1399                if serialize_fn.is_some() {
1400                    label.push_str("send");
1401                }
1402                if deserialize_fn.is_some() {
1403                    if serialize_fn.is_some() {
1404                        label.push_str(" + ");
1405                    }
1406                    label.push_str("recv");
1407                }
1408                label.push(')');
1409
1410                let network_id = structure.add_node_with_backtrace(
1411                    NodeLabel::Static(label),
1412                    HydroNodeType::Network,
1413                    Some(to_location_key),
1414                    Some(metadata.op.backtrace.clone()),
1415                );
1416
1417                // Extract semantic tags for network edge
1418                let input_metadata = input.metadata();
1419                add_edge_with_metadata(
1420                    structure,
1421                    input_id,
1422                    network_id,
1423                    Some(input_metadata),
1424                    Some(metadata),
1425                    Some(format!("to {:?}({})", to_location_type, to_location_key)),
1426                );
1427
1428                network_id
1429            }
1430
1431            // Non-deterministic batch operation
1432            HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
1433                structure,
1434                seen_tees,
1435                config,
1436                input: inner,
1437                metadata,
1438                op_name: extract_op_name(self.print_root()),
1439                node_type: HydroNodeType::NonDeterministic,
1440            }),
1441
1442            HydroNode::YieldConcat { inner, .. } => {
1443                // Unpersist is typically optimized away, just pass through
1444                inner.build_graph_structure(structure, seen_tees, config)
1445            }
1446
1447            HydroNode::BeginAtomic { inner, .. } => {
1448                inner.build_graph_structure(structure, seen_tees, config)
1449            }
1450
1451            HydroNode::EndAtomic { inner, .. } => {
1452                inner.build_graph_structure(structure, seen_tees, config)
1453            }
1454
1455            HydroNode::Chain {
1456                first,
1457                second,
1458                metadata,
1459            } => {
1460                let first_id = first.build_graph_structure(structure, seen_tees, config);
1461                let second_id = second.build_graph_structure(structure, seen_tees, config);
1462                let location_key = Some(setup_location(structure, metadata));
1463                let chain_id = structure.add_node_with_backtrace(
1464                    NodeLabel::Static(extract_op_name(self.print_root())),
1465                    HydroNodeType::Transform,
1466                    location_key,
1467                    Some(metadata.op.backtrace.clone()),
1468                );
1469
1470                // Extract semantic tags for first edge
1471                let first_metadata = first.metadata();
1472                add_edge_with_metadata(
1473                    structure,
1474                    first_id,
1475                    chain_id,
1476                    Some(first_metadata),
1477                    Some(metadata),
1478                    Some("first".to_owned()),
1479                );
1480
1481                // Extract semantic tags for second edge
1482                let second_metadata = second.metadata();
1483                add_edge_with_metadata(
1484                    structure,
1485                    second_id,
1486                    chain_id,
1487                    Some(second_metadata),
1488                    Some(metadata),
1489                    Some("second".to_owned()),
1490                );
1491
1492                chain_id
1493            }
1494
1495            HydroNode::ChainFirst {
1496                first,
1497                second,
1498                metadata,
1499            } => {
1500                let first_id = first.build_graph_structure(structure, seen_tees, config);
1501                let second_id = second.build_graph_structure(structure, seen_tees, config);
1502                let location_key = Some(setup_location(structure, metadata));
1503                let chain_id = structure.add_node_with_backtrace(
1504                    NodeLabel::Static(extract_op_name(self.print_root())),
1505                    HydroNodeType::Transform,
1506                    location_key,
1507                    Some(metadata.op.backtrace.clone()),
1508                );
1509
1510                // Extract semantic tags for first edge
1511                let first_metadata = first.metadata();
1512                add_edge_with_metadata(
1513                    structure,
1514                    first_id,
1515                    chain_id,
1516                    Some(first_metadata),
1517                    Some(metadata),
1518                    Some("first".to_owned()),
1519                );
1520
1521                // Extract semantic tags for second edge
1522                let second_metadata = second.metadata();
1523                add_edge_with_metadata(
1524                    structure,
1525                    second_id,
1526                    chain_id,
1527                    Some(second_metadata),
1528                    Some(metadata),
1529                    Some("second".to_owned()),
1530                );
1531
1532                chain_id
1533            }
1534
1535            HydroNode::Counter {
1536                tag: _,
1537                prefix: _,
1538                duration,
1539                input,
1540                metadata,
1541            } => build_single_expr_transform(
1542                TransformParams {
1543                    structure,
1544                    seen_tees,
1545                    config,
1546                    input,
1547                    metadata,
1548                    op_name: extract_op_name(self.print_root()),
1549                    node_type: HydroNodeType::Transform,
1550                },
1551                duration,
1552            ),
1553        }
1554    }
1555}
1556
/// Generates a `pub fn $name(roots, config) -> String` that renders the given
/// roots into a freshly allocated `String` by delegating to `$write_fn`.
///
/// The generated function panics if `$write_fn` returns an error.
macro_rules! render_hydro_ir {
    ($name:ident, $write_fn:ident) => {
        pub fn $name(roots: &[HydroRoot], config: HydroWriteConfig<'_>) -> String {
            let mut rendered = String::new();
            let status = $write_fn(&mut rendered, roots, config);
            status.unwrap();
            rendered
        }
    };
}
1568
/// Generates a `pub fn $name(output, roots, config) -> std::fmt::Result` that
/// constructs a writer of type `$writer_type` via `$constructor(output, config)`
/// and hands it, together with `roots` and `config`, to `write_hydro_ir_graph`.
macro_rules! write_hydro_ir {
    ($name:ident, $writer_type:ty, $constructor:expr) => {
        pub fn $name(
            output: impl std::fmt::Write,
            roots: &[HydroRoot],
            config: HydroWriteConfig<'_>,
        ) -> std::fmt::Result {
            // The explicit type annotation pins the writer's generic parameters.
            let mut backend: $writer_type = $constructor(output, config);
            write_hydro_ir_graph(&mut backend, roots, config)
        }
    };
}
1582
// Mermaid rendering: `render_hydro_ir_mermaid` returns a `String`,
// `write_hydro_ir_mermaid` writes through a `HydroMermaid` writer.
render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
write_hydro_ir!(
    write_hydro_ir_mermaid,
    HydroMermaid<_>,
    HydroMermaid::new_with_config
);

// Graphviz DOT rendering: same pair of functions, backed by `HydroDot`.
render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);

// Legacy hydroscope function - now uses HydroJson for consistency
// (it delegates to `write_hydro_ir_json`, exactly like `render_hydro_ir_json`).
render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);

// JSON rendering
render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1599
1600fn write_hydro_ir_graph<W>(
1601    graph_write: W,
1602    roots: &[HydroRoot],
1603    config: HydroWriteConfig<'_>,
1604) -> Result<(), W::Err>
1605where
1606    W: HydroGraphWrite,
1607{
1608    let mut structure = HydroGraphStructure::new();
1609    let mut seen_tees = HashMap::new();
1610
1611    // Build the graph structure for all roots
1612    for leaf in roots {
1613        leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1614    }
1615
1616    write_graph_structure(&structure, graph_write, config)
1617}