Skip to main content

hydro_lang/viz/
render.rs

1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::{Display, Write};
4use std::num::ParseIntError;
5use std::sync::OnceLock;
6
7use auto_impl::auto_impl;
8use slotmap::{Key, SecondaryMap, SlotMap};
9
10pub use super::graphviz::{HydroDot, escape_dot};
11pub use super::json::HydroJson;
12// Re-export specific implementations
13pub use super::mermaid::{HydroMermaid, escape_mermaid};
14use crate::compile::ir::backtrace::Backtrace;
15use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
16use crate::location::dynamic::LocationId;
17use crate::location::{LocationKey, LocationType};
18
/// Label for a graph node - can be either a static string or contain expressions.
///
/// The `Display` implementation renders `Static` labels verbatim and `WithExprs`
/// labels as `op_name(expr, expr, ...)`.
#[derive(Debug, Clone)]
pub enum NodeLabel {
    /// A static string label
    Static(String),
    /// A label with an operation name and expression arguments
    WithExprs {
        /// Name of the operation; printed before the parenthesized expression list.
        op_name: String,
        /// Expressions printed comma-separated inside the parentheses (may be empty).
        exprs: Vec<DebugExpr>,
    },
}
30
31impl NodeLabel {
32    /// Create a static label
33    pub fn static_label(s: String) -> Self {
34        Self::Static(s)
35    }
36
37    /// Create a label for an operation with multiple expression
38    pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
39        Self::WithExprs { op_name, exprs }
40    }
41}
42
43impl Display for NodeLabel {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            Self::Static(s) => write!(f, "{}", s),
47            Self::WithExprs { op_name, exprs } => {
48                if exprs.is_empty() {
49                    write!(f, "{}()", op_name)
50                } else {
51                    let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
52                    write!(f, "{}({})", op_name, expr_strs.join(", "))
53                }
54            }
55        }
56    }
57}
58
/// Base struct for text-based graph writers that use indentation.
/// Contains common fields shared by DOT and Mermaid writers.
pub struct IndentedGraphWriter<'a, W> {
    /// Output sink that all text is written into.
    pub write: W,
    /// Current indentation, in spaces, applied by `writeln_indented`.
    pub indent: usize,
    /// Rendering options for this writer.
    pub config: HydroWriteConfig<'a>,
}
66
67impl<'a, W> IndentedGraphWriter<'a, W> {
68    /// Create a new writer with default configuration.
69    pub fn new(write: W) -> Self {
70        Self {
71            write,
72            indent: 0,
73            config: HydroWriteConfig::default(),
74        }
75    }
76
77    /// Create a new writer with the given configuration.
78    pub fn new_with_config(write: W, config: HydroWriteConfig<'a>) -> Self {
79        Self {
80            write,
81            indent: 0,
82            config,
83        }
84    }
85}
86
87impl<W: Write> IndentedGraphWriter<'_, W> {
88    /// Write an indented line using the current indentation level.
89    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
90        writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
91    }
92}
93
/// Common error type used by all graph writers.
///
/// This is an alias for [`std::fmt::Error`], the error returned by the
/// `write!`/`writeln!` macros when writing into a `std::fmt::Write` sink.
pub type GraphWriteError = std::fmt::Error;
96
/// Trait for writing textual representations of Hydro IR graphs, i.e. mermaid or dot graphs.
///
/// `write_graph_structure` in this module drives an implementation in this order:
/// `write_prologue`, one `write_node_definition` per node, then (only when
/// `HydroWriteConfig::show_location_groups` is set) one
/// `write_location_start` / `write_node`... / `write_location_end` group per
/// location, then one `write_edge` per edge, and finally `write_epilogue`.
/// The first error returned by any method aborts the traversal.
///
/// `#[auto_impl(&mut, Box)]` also provides the trait for `&mut T` and `Box<T>`.
#[auto_impl(&mut, Box)]
pub trait HydroGraphWrite {
    /// Error type emitted by writing.
    type Err: Error;

    /// Begin the graph. First method called.
    fn write_prologue(&mut self) -> Result<(), Self::Err>;

    /// Write a node definition with styling.
    ///
    /// - `node_id`: key of the node in the visualization graph.
    /// - `node_label`: label to render for the node.
    /// - `node_type`: category of the node, for styling.
    /// - `location_key`: location the node belongs to, if any.
    /// - `location_type`: type registered for `location_key`, if known.
    /// - `backtrace`: backtrace recorded for the node, if any.
    fn write_node_definition(
        &mut self,
        node_id: VizNodeKey,
        node_label: &NodeLabel,
        node_type: HydroNodeType,
        location_key: Option<LocationKey>,
        location_type: Option<LocationType>,
        backtrace: Option<&Backtrace>,
    ) -> Result<(), Self::Err>;

    /// Write an edge between nodes with optional labeling.
    ///
    /// `edge_properties` carries the semantic tags of the edge and `label` is
    /// an optional text label for it.
    fn write_edge(
        &mut self,
        src_id: VizNodeKey,
        dst_id: VizNodeKey,
        edge_properties: &HashSet<HydroEdgeProp>,
        label: Option<&str>,
    ) -> Result<(), Self::Err>;

    /// Begin writing a location grouping (process/cluster).
    ///
    /// Followed by `write_node` calls for the group's members and then
    /// `write_location_end`.
    fn write_location_start(
        &mut self,
        location_key: LocationKey,
        location_type: LocationType,
    ) -> Result<(), Self::Err>;

    /// Write a node within a location.
    ///
    /// Called between `write_location_start` and `write_location_end`; the
    /// node itself was already defined through `write_node_definition`.
    fn write_node(&mut self, node_id: VizNodeKey) -> Result<(), Self::Err>;

    /// End writing a location grouping.
    fn write_location_end(&mut self) -> Result<(), Self::Err>;

    /// End the graph. Last method called.
    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
}
142
143/// Node type utilities - centralized handling of HydroNodeType operations
144pub mod node_type_utils {
145    use super::HydroNodeType;
146
147    /// All node types with their string names
148    const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
149        (HydroNodeType::Source, "Source"),
150        (HydroNodeType::Transform, "Transform"),
151        (HydroNodeType::Join, "Join"),
152        (HydroNodeType::Aggregation, "Aggregation"),
153        (HydroNodeType::Network, "Network"),
154        (HydroNodeType::Sink, "Sink"),
155        (HydroNodeType::Tee, "Tee"),
156        (HydroNodeType::NonDeterministic, "NonDeterministic"),
157    ];
158
159    /// Convert HydroNodeType to string representation (used by JSON format)
160    pub fn to_string(node_type: HydroNodeType) -> &'static str {
161        NODE_TYPE_DATA
162            .iter()
163            .find(|(nt, _)| *nt == node_type)
164            .map(|(_, name)| *name)
165            .unwrap_or("Unknown")
166    }
167
168    /// Get all node types with their string representations (used by JSON format)
169    pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
170        NODE_TYPE_DATA.to_vec()
171    }
172}
173
/// Types of nodes in Hydro IR for styling purposes.
///
/// Each variant has a string name in `node_type_utils` (identical to the
/// variant identifier), which is what the JSON format uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HydroNodeType {
    Source,
    Transform,
    Join,
    Aggregation,
    Network,
    /// Assigned to every node created for a `HydroRoot` by
    /// `HydroRoot::build_graph_structure`.
    Sink,
    Tee,
    NonDeterministic,
}
186
/// Types of edges in Hydro IR representing stream properties.
///
/// An edge carries a *set* of these tags; `get_unified_edge_style` maps the
/// set onto visual channels.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HydroEdgeProp {
    /// The collection is bounded (rendered without a halo).
    Bounded,
    /// The collection is unbounded (rendered with a light-blue halo).
    Unbounded,
    /// Elements are totally ordered (rendered as a straight line).
    TotalOrder,
    /// Elements carry no ordering guarantee (rendered as a wavy line).
    NoOrder,
    /// The collection is keyed (rendered with hash marks on the line).
    Keyed,
    // Collection type tags for styling
    /// Stream collection: filled triangle arrowhead, blue.
    Stream,
    /// Keyed singleton collection: filled triangle arrowhead, black.
    KeyedSingleton,
    /// Keyed stream collection: filled triangle arrowhead, blue.
    KeyedStream,
    /// Singleton collection: filled circle arrowhead, black.
    Singleton,
    /// Optional collection: open diamond arrowhead, gray.
    Optional,
    /// The edge crosses between different root locations (dashed, animated).
    Network,
    // NOTE(review): `Cycle` is not produced or styled by the code visible in
    // this part of the file; presumably it marks cycle edges - confirm at the
    // use sites.
    Cycle,
}
204
/// Unified edge style representation for all graph formats.
/// This intermediate format allows consistent styling across JSON, DOT, and Mermaid.
///
/// The `Default` value is a solid, static, width-1 line with the default
/// arrowhead, no halo, no waviness, no hash marks, and color `#666666`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnifiedEdgeStyle {
    /// Line pattern (solid, dotted, dashed)
    pub line_pattern: LinePattern,
    /// Line width (1 = thin, 3 = thick)
    pub line_width: u8,
    /// Arrowhead style
    pub arrowhead: ArrowheadStyle,
    /// Line style (single plain line, or line with hash marks/dots for keyed streams)
    pub line_style: LineStyle,
    /// Halo/background effect for boundedness
    pub halo: HaloStyle,
    /// Line waviness for ordering information
    pub waviness: WavinessStyle,
    /// Whether animation is enabled (JSON only)
    pub animation: AnimationStyle,
    /// Color for the edge, as a `#rrggbb` hex string
    pub color: &'static str,
}
226
/// Stroke pattern of an edge line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LinePattern {
    /// Continuous line; the default, used for non-network edges.
    Solid,
    // NOTE(review): not selected by `get_unified_edge_style` in the visible code.
    Dotted,
    /// Dashed line; used for network edges.
    Dashed,
}
233
/// Arrowhead shape of an edge, encoding the collection type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArrowheadStyle {
    /// Used for `Stream`, `KeyedStream` and `KeyedSingleton` edges.
    TriangleFilled,
    /// Used for `Singleton` edges.
    CircleFilled,
    /// Used for `Optional` edges.
    DiamondOpen,
    /// Used when no collection type tag is present.
    Default,
}
241
/// Decoration of an edge line, encoding keyedness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineStyle {
    /// Plain single line
    Single,
    /// Single line with hash marks/dots (for keyed streams)
    HashMarks,
}
249
/// Halo/background effect of an edge, encoding boundedness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HaloStyle {
    /// No halo; used when the edge is not tagged `Unbounded`.
    None,
    /// Light-blue halo; used for `Unbounded` edges.
    LightBlue,
}
255
/// Waviness of an edge line, encoding ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WavinessStyle {
    /// Straight line; the default, also used for `TotalOrder` edges.
    None,
    /// Wavy line; used for `NoOrder` edges.
    Wavy,
}
261
/// Whether an edge is animated (JSON only, per `UnifiedEdgeStyle::animation`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnimationStyle {
    /// Not animated; used for non-network edges.
    Static,
    /// Animated; used for network edges.
    Animated,
}
267
268impl Default for UnifiedEdgeStyle {
269    fn default() -> Self {
270        Self {
271            line_pattern: LinePattern::Solid,
272            line_width: 1,
273            arrowhead: ArrowheadStyle::Default,
274            line_style: LineStyle::Single,
275            halo: HaloStyle::None,
276            waviness: WavinessStyle::None,
277            animation: AnimationStyle::Static,
278            color: "#666666",
279        }
280    }
281}
282
283/// Convert HydroEdgeType properties to unified edge style.
284/// This is the core logic for determining edge visual properties.
285///
286/// # Visual Encoding Mapping
287///
288/// | Semantic Property | Visual Channel | Values |
289/// |------------------|----------------|---------|
290/// | Network | Line Pattern + Animation | Local (solid, static), Network (dashed, animated) |
291/// | Ordering | Waviness | TotalOrder (straight), NoOrder (wavy) |
292/// | Boundedness | Halo | Bounded (none), Unbounded (light-blue transparent) |
293/// | Keyedness | Line Style | NotKeyed (plain line), Keyed (line with hash marks/dots) |
294/// | Collection Type | Color + Arrowhead | Stream (blue #2563eb, triangle), Singleton (black, circle), Optional (gray, diamond) |
295pub fn get_unified_edge_style(
296    edge_properties: &HashSet<HydroEdgeProp>,
297    src_location: Option<usize>,
298    dst_location: Option<usize>,
299) -> UnifiedEdgeStyle {
300    let mut style = UnifiedEdgeStyle::default();
301
302    // Network communication group - controls line pattern AND animation
303    let is_network = edge_properties.contains(&HydroEdgeProp::Network)
304        || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
305
306    if is_network {
307        style.line_pattern = LinePattern::Dashed;
308        style.animation = AnimationStyle::Animated;
309    } else {
310        style.line_pattern = LinePattern::Solid;
311        style.animation = AnimationStyle::Static;
312    }
313
314    // Boundedness group - controls halo
315    if edge_properties.contains(&HydroEdgeProp::Unbounded) {
316        style.halo = HaloStyle::LightBlue;
317    } else {
318        style.halo = HaloStyle::None;
319    }
320
321    // Collection type group - controls arrowhead and color
322    if edge_properties.contains(&HydroEdgeProp::Stream) {
323        style.arrowhead = ArrowheadStyle::TriangleFilled;
324        style.color = "#2563eb"; // Bright blue for Stream
325    } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
326        style.arrowhead = ArrowheadStyle::TriangleFilled;
327        style.color = "#2563eb"; // Bright blue for Stream (keyed variant)
328    } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
329        style.arrowhead = ArrowheadStyle::TriangleFilled;
330        style.color = "#000000"; // Black for Singleton (keyed variant)
331    } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
332        style.arrowhead = ArrowheadStyle::CircleFilled;
333        style.color = "#000000"; // Black for Singleton
334    } else if edge_properties.contains(&HydroEdgeProp::Optional) {
335        style.arrowhead = ArrowheadStyle::DiamondOpen;
336        style.color = "#6b7280"; // Gray for Optional
337    }
338
339    // Keyedness group - controls hash marks on the line
340    if edge_properties.contains(&HydroEdgeProp::Keyed) {
341        style.line_style = LineStyle::HashMarks; // Renders as hash marks/dots on the line in hydroscope
342    } else {
343        style.line_style = LineStyle::Single;
344    }
345
346    // Ordering group - waviness channel
347    if edge_properties.contains(&HydroEdgeProp::NoOrder) {
348        style.waviness = WavinessStyle::Wavy;
349    } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
350        style.waviness = WavinessStyle::None;
351    }
352
353    style
354}
355
356/// Extract semantic edge properties from CollectionKind metadata.
357/// This function analyzes the collection type and extracts relevant semantic tags
358/// for visualization purposes.
359pub fn extract_edge_properties_from_collection_kind(
360    collection_kind: &crate::compile::ir::CollectionKind,
361) -> HashSet<HydroEdgeProp> {
362    use crate::compile::ir::CollectionKind;
363
364    let mut properties = HashSet::new();
365
366    match collection_kind {
367        CollectionKind::Stream { bound, order, .. } => {
368            properties.insert(HydroEdgeProp::Stream);
369            add_bound_property(&mut properties, bound);
370            add_order_property(&mut properties, order);
371        }
372        CollectionKind::KeyedStream {
373            bound, value_order, ..
374        } => {
375            properties.insert(HydroEdgeProp::KeyedStream);
376            properties.insert(HydroEdgeProp::Keyed);
377            add_bound_property(&mut properties, bound);
378            add_order_property(&mut properties, value_order);
379        }
380        CollectionKind::Singleton { bound, .. } => {
381            properties.insert(HydroEdgeProp::Singleton);
382            add_bound_property(&mut properties, bound);
383            // Singletons have implicit TotalOrder
384            properties.insert(HydroEdgeProp::TotalOrder);
385        }
386        CollectionKind::Optional { bound, .. } => {
387            properties.insert(HydroEdgeProp::Optional);
388            add_bound_property(&mut properties, bound);
389            // Optionals have implicit TotalOrder
390            properties.insert(HydroEdgeProp::TotalOrder);
391        }
392        CollectionKind::KeyedSingleton { bound, .. } => {
393            properties.insert(HydroEdgeProp::Singleton);
394            properties.insert(HydroEdgeProp::Keyed);
395            // KeyedSingletons boundedness depends on the bound kind
396            add_keyed_singleton_bound_property(&mut properties, bound);
397            properties.insert(HydroEdgeProp::TotalOrder);
398        }
399    }
400
401    properties
402}
403
404/// Helper function to add bound property based on BoundKind.
405fn add_bound_property(
406    properties: &mut HashSet<HydroEdgeProp>,
407    bound: &crate::compile::ir::BoundKind,
408) {
409    use crate::compile::ir::BoundKind;
410
411    match bound {
412        BoundKind::Bounded => {
413            properties.insert(HydroEdgeProp::Bounded);
414        }
415        BoundKind::Unbounded => {
416            properties.insert(HydroEdgeProp::Unbounded);
417        }
418    }
419}
420
421/// Helper function to add bound property for KeyedSingleton based on KeyedSingletonBoundKind.
422fn add_keyed_singleton_bound_property(
423    properties: &mut HashSet<HydroEdgeProp>,
424    bound: &crate::compile::ir::KeyedSingletonBoundKind,
425) {
426    use crate::compile::ir::KeyedSingletonBoundKind;
427
428    match bound {
429        KeyedSingletonBoundKind::Bounded | KeyedSingletonBoundKind::BoundedValue => {
430            properties.insert(HydroEdgeProp::Bounded);
431        }
432        KeyedSingletonBoundKind::Unbounded => {
433            properties.insert(HydroEdgeProp::Unbounded);
434        }
435    }
436}
437
438/// Helper function to add order property based on StreamOrder.
439fn add_order_property(
440    properties: &mut HashSet<HydroEdgeProp>,
441    order: &crate::compile::ir::StreamOrder,
442) {
443    use crate::compile::ir::StreamOrder;
444
445    match order {
446        StreamOrder::TotalOrder => {
447            properties.insert(HydroEdgeProp::TotalOrder);
448        }
449        StreamOrder::NoOrder => {
450            properties.insert(HydroEdgeProp::NoOrder);
451        }
452    }
453}
454
455/// Detect if an edge crosses network boundaries by comparing source and destination locations.
456/// Returns true if the edge represents network communication between different locations.
457pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
458    // Compare the root locations to determine if they differ
459    src_location.root() != dst_location.root()
460}
461
462/// Add network edge tag if source and destination locations differ.
463pub fn add_network_edge_tag(
464    properties: &mut HashSet<HydroEdgeProp>,
465    src_location: &LocationId,
466    dst_location: &LocationId,
467) {
468    if is_network_edge(src_location, dst_location) {
469        properties.insert(HydroEdgeProp::Network);
470    }
471}
472
/// Configuration for graph writing.
///
/// The `Default` value disables `show_metadata`, enables
/// `show_location_groups` and `use_short_labels`, and uses an empty
/// `location_names` map.
#[derive(Debug, Clone, Copy)]
pub struct HydroWriteConfig<'a> {
    // NOTE(review): consumers of this flag are not visible in this part of
    // the file; presumably it toggles extra metadata in the output - confirm.
    pub show_metadata: bool,
    /// When set, `write_graph_structure` emits a location grouping
    /// (start / member nodes / end) for each location that has nodes.
    pub show_location_groups: bool,
    // NOTE(review): consumers not visible here; presumably selects shortened
    // node labels (see `extract_short_label`) - confirm.
    pub use_short_labels: bool,
    /// Names associated with location keys.
    pub location_names: &'a SecondaryMap<LocationKey, String>,
}
481
482impl Default for HydroWriteConfig<'_> {
483    fn default() -> Self {
484        static EMPTY: OnceLock<SecondaryMap<LocationKey, String>> = OnceLock::new();
485        Self {
486            show_metadata: false,
487            show_location_groups: true,
488            use_short_labels: true, // Default to short labels for all renderers
489            location_names: EMPTY.get_or_init(SecondaryMap::new),
490        }
491    }
492}
493
/// Node information in the Hydro graph.
#[derive(Clone)]
pub struct HydroGraphNode {
    /// Label rendered for the node.
    pub label: NodeLabel,
    /// Category of the node, for styling.
    pub node_type: HydroNodeType,
    /// Key of the location the node belongs to, if any.
    pub location_key: Option<LocationKey>,
    /// Backtrace recorded for the node, if any.
    pub backtrace: Option<Backtrace>,
}
502
slotmap::new_key_type! {
    /// Unique identifier for nodes in the visualization graph.
    ///
    /// This is counted/allocated separately from any other IDs within `hydro_lang`.
    ///
    /// Its `Display` form is `viz` followed by the debug form of the key data
    /// (e.g. `viz1v1`), and `FromStr` parses that form back.
    pub struct VizNodeKey;
}
509
510impl Display for VizNodeKey {
511    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512        write!(f, "viz{:?}", self.data()) // `"viz1v1"``
513    }
514}
515
516/// This is used by the visualizer
517/// TODO(mingwei): Make this more robust?
518impl std::str::FromStr for VizNodeKey {
519    type Err = Option<ParseIntError>;
520
521    fn from_str(s: &str) -> Result<Self, Self::Err> {
522        let nvn = s.strip_prefix("viz").ok_or(None)?;
523        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
524        let idx: u64 = idx.parse()?;
525        let ver: u64 = ver.parse()?;
526        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
527    }
528}
529
/// Fixed keys for use in tests (only compiled under `cfg(test)`).
///
/// The packed value holds the version (0x8f = 143) in the upper 32 bits and
/// the index in the lower 32 bits.
impl VizNodeKey {
    /// A key for testing with index 1.
    #[cfg(test)]
    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x0000008f00000001)); // `1v143`

    /// A key for testing with index 2.
    #[cfg(test)]
    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x0000008f00000002)); // `2v143`
}
539
/// Edge information in the Hydro graph.
#[derive(Debug, Clone)]
pub struct HydroGraphEdge {
    /// Key of the node the edge starts at.
    pub src: VizNodeKey,
    /// Key of the node the edge ends at.
    pub dst: VizNodeKey,
    /// Semantic tags of the edge, used for styling.
    pub edge_properties: HashSet<HydroEdgeProp>,
    /// Optional text label for the edge.
    pub label: Option<String>,
}
548
/// Graph structure tracker for Hydro IR rendering.
#[derive(Default)]
pub struct HydroGraphStructure {
    /// All nodes of the graph; inserting a node allocates its `VizNodeKey`.
    pub nodes: SlotMap<VizNodeKey, HydroGraphNode>,
    /// All edges of the graph, in insertion order.
    pub edges: Vec<HydroGraphEdge>,
    /// Type of each location that has been registered via `add_location`.
    pub locations: SecondaryMap<LocationKey, LocationType>,
}
556
557impl HydroGraphStructure {
558    pub fn new() -> Self {
559        Self::default()
560    }
561
562    pub fn add_node(
563        &mut self,
564        label: NodeLabel,
565        node_type: HydroNodeType,
566        location_key: Option<LocationKey>,
567    ) -> VizNodeKey {
568        self.add_node_with_backtrace(label, node_type, location_key, None)
569    }
570
571    pub fn add_node_with_backtrace(
572        &mut self,
573        label: NodeLabel,
574        node_type: HydroNodeType,
575        location_key: Option<LocationKey>,
576        backtrace: Option<Backtrace>,
577    ) -> VizNodeKey {
578        self.nodes.insert(HydroGraphNode {
579            label,
580            node_type,
581            location_key,
582            backtrace,
583        })
584    }
585
586    /// Add a node with metadata, extracting backtrace automatically
587    pub fn add_node_with_metadata(
588        &mut self,
589        label: NodeLabel,
590        node_type: HydroNodeType,
591        metadata: &HydroIrMetadata,
592    ) -> VizNodeKey {
593        let location_key = Some(setup_location(self, metadata));
594        let backtrace = Some(metadata.op.backtrace.clone());
595        self.add_node_with_backtrace(label, node_type, location_key, backtrace)
596    }
597
598    pub fn add_edge(
599        &mut self,
600        src: VizNodeKey,
601        dst: VizNodeKey,
602        edge_properties: HashSet<HydroEdgeProp>,
603        label: Option<String>,
604    ) {
605        self.edges.push(HydroGraphEdge {
606            src,
607            dst,
608            edge_properties,
609            label,
610        });
611    }
612
613    // Legacy method for backward compatibility
614    pub fn add_edge_single(
615        &mut self,
616        src: VizNodeKey,
617        dst: VizNodeKey,
618        edge_type: HydroEdgeProp,
619        label: Option<String>,
620    ) {
621        let mut properties = HashSet::new();
622        properties.insert(edge_type);
623        self.edges.push(HydroGraphEdge {
624            src,
625            dst,
626            edge_properties: properties,
627            label,
628        });
629    }
630
631    pub fn add_location(&mut self, location_key: LocationKey, location_type: LocationType) {
632        self.locations.insert(location_key, location_type);
633    }
634}
635
/// Extracts the operation name from a `print_root()` style label for use in labels.
///
/// Returns everything before the first `(` (or the whole label when there is
/// no `(`), converted to lowercase.
pub fn extract_op_name(full_label: String) -> String {
    let op_name = match full_label.find('(') {
        Some(paren_at) => &full_label[..paren_at],
        None => full_label.as_str(),
    };
    op_name.to_lowercase()
}
644
/// Extract a short, readable label from the full token stream label using print_root() style naming.
///
/// The base name is everything before the first `(` (or the whole label when
/// there is no `(`), lowercased. Two base names are refined for UI display by
/// looking for markers anywhere in the full label:
///
/// - `source`: `Iter` -> `source_iter`, `Stream` -> `source_stream`,
///   `ExternalNetwork` -> `external_network`, `Spin` -> `spin`, otherwise `source`.
/// - `network`: `deser` -> `network(recv)`, `ser` -> `network(send)`, otherwise `network`.
///
/// Every other label yields its lowercased base name (same as `extract_op_name`).
pub fn extract_short_label(full_label: &str) -> String {
    // The text before the first '(' always exists (it is the whole label when
    // there is no '('), so no fallback path is needed.
    let op_name = full_label
        .split_once('(')
        .map_or(full_label, |(head, _)| head);
    let base_name = op_name.to_lowercase();

    // Handle special cases for UI display
    let special_case: Option<&'static str> = match base_name.as_str() {
        "source" => Some(if full_label.contains("Iter") {
            "source_iter"
        } else if full_label.contains("Stream") {
            "source_stream"
        } else if full_label.contains("ExternalNetwork") {
            "external_network"
        } else if full_label.contains("Spin") {
            "spin"
        } else {
            "source"
        }),
        // "deser" must be tested first because it also contains "ser".
        "network" => Some(if full_label.contains("deser") {
            "network(recv)"
        } else if full_label.contains("ser") {
            "network(send)"
        } else {
            "network"
        }),
        _ => None,
    };

    // For all other cases, just use the lowercase base name
    special_case.map_or(base_name, str::to_owned)
}
686
687/// Helper function to set up location in structure from metadata.
688fn setup_location(structure: &mut HydroGraphStructure, metadata: &HydroIrMetadata) -> LocationKey {
689    let root = metadata.location_id.root();
690    let location_key = root.key();
691    let location_type = root.location_type().unwrap();
692    structure.add_location(location_key, location_type);
693    location_key
694}
695
696/// Helper function to add an edge with semantic tags extracted from metadata.
697/// This function combines collection kind extraction with network detection.
698fn add_edge_with_metadata(
699    structure: &mut HydroGraphStructure,
700    src_id: VizNodeKey,
701    dst_id: VizNodeKey,
702    src_metadata: Option<&HydroIrMetadata>,
703    dst_metadata: Option<&HydroIrMetadata>,
704    label: Option<String>,
705) {
706    let mut properties = HashSet::new();
707
708    // Extract semantic tags from source metadata's collection kind
709    if let Some(metadata) = src_metadata {
710        properties.extend(extract_edge_properties_from_collection_kind(
711            &metadata.collection_kind,
712        ));
713    }
714
715    // Add network edge tag if locations differ
716    if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
717        add_network_edge_tag(
718            &mut properties,
719            &src_meta.location_id,
720            &dst_meta.location_id,
721        );
722    }
723
724    // If no properties were extracted, default to Stream
725    if properties.is_empty() {
726        properties.insert(HydroEdgeProp::Stream);
727    }
728
729    structure.add_edge(src_id, dst_id, properties, label);
730}
731
732/// Helper function to write a graph structure using any GraphWrite implementation
733fn write_graph_structure<W>(
734    structure: &HydroGraphStructure,
735    graph_write: W,
736    config: HydroWriteConfig<'_>,
737) -> Result<(), W::Err>
738where
739    W: HydroGraphWrite,
740{
741    let mut graph_write = graph_write;
742    // Write the graph
743    graph_write.write_prologue()?;
744
745    // Write node definitions
746    for (node_id, node) in structure.nodes.iter() {
747        let location_type = node
748            .location_key
749            .and_then(|loc_key| structure.locations.get(loc_key))
750            .copied();
751
752        graph_write.write_node_definition(
753            node_id,
754            &node.label,
755            node.node_type,
756            node.location_key,
757            location_type,
758            node.backtrace.as_ref(),
759        )?;
760    }
761
762    // Group nodes by location if requested
763    if config.show_location_groups {
764        let mut nodes_by_location = SecondaryMap::<LocationKey, Vec<VizNodeKey>>::new();
765        for (node_id, node) in structure.nodes.iter() {
766            if let Some(location_key) = node.location_key {
767                nodes_by_location
768                    .entry(location_key)
769                    .expect("location was removed")
770                    .or_default()
771                    .push(node_id);
772            }
773        }
774
775        for (location_key, node_ids) in nodes_by_location.iter() {
776            if let Some(&location_type) = structure.locations.get(location_key) {
777                graph_write.write_location_start(location_key, location_type)?;
778                for &node_id in node_ids.iter() {
779                    graph_write.write_node(node_id)?;
780                }
781                graph_write.write_location_end()?;
782            }
783        }
784    }
785
786    // Write edges
787    for edge in structure.edges.iter() {
788        graph_write.write_edge(
789            edge.src,
790            edge.dst,
791            &edge.edge_properties,
792            edge.label.as_deref(),
793        )?;
794    }
795
796    graph_write.write_epilogue()?;
797    Ok(())
798}
799
800impl HydroRoot {
801    /// Build the graph structure by traversing the IR tree.
802    pub fn build_graph_structure(
803        &self,
804        structure: &mut HydroGraphStructure,
805        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
806        config: HydroWriteConfig<'_>,
807    ) -> VizNodeKey {
808        // Helper function for sink nodes to reduce duplication
809        fn build_sink_node(
810            structure: &mut HydroGraphStructure,
811            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
812            config: HydroWriteConfig<'_>,
813            input: &HydroNode,
814            sink_metadata: Option<&HydroIrMetadata>,
815            label: NodeLabel,
816        ) -> VizNodeKey {
817            let input_id = input.build_graph_structure(structure, seen_tees, config);
818
819            // If no explicit metadata is provided, extract it from the input node
820            let effective_metadata = if let Some(meta) = sink_metadata {
821                Some(meta)
822            } else {
823                match input {
824                    HydroNode::Placeholder => None,
825                    // All other variants have metadata
826                    _ => Some(input.metadata()),
827                }
828            };
829
830            let location_key = effective_metadata.map(|m| setup_location(structure, m));
831            let sink_id = structure.add_node_with_backtrace(
832                label,
833                HydroNodeType::Sink,
834                location_key,
835                effective_metadata.map(|m| m.op.backtrace.clone()),
836            );
837
838            // Extract semantic tags from input metadata
839            let input_metadata = input.metadata();
840            add_edge_with_metadata(
841                structure,
842                input_id,
843                sink_id,
844                Some(input_metadata),
845                sink_metadata,
846                None,
847            );
848
849            sink_id
850        }
851
852        match self {
853            // Sink operations - semantic tags extracted from input metadata
854            HydroRoot::ForEach { f, input, .. } => build_sink_node(
855                structure,
856                seen_tees,
857                config,
858                input,
859                None,
860                NodeLabel::with_exprs("for_each".to_owned(), vec![f.clone()]),
861            ),
862
863            HydroRoot::SendExternal {
864                to_external_key,
865                to_port_id,
866                input,
867                ..
868            } => build_sink_node(
869                structure,
870                seen_tees,
871                config,
872                input,
873                None,
874                NodeLabel::with_exprs(
875                    format!("send_external({}:{})", to_external_key, to_port_id),
876                    vec![],
877                ),
878            ),
879
880            HydroRoot::DestSink { sink, input, .. } => build_sink_node(
881                structure,
882                seen_tees,
883                config,
884                input,
885                None,
886                NodeLabel::with_exprs("dest_sink".to_owned(), vec![sink.clone()]),
887            ),
888
889            HydroRoot::CycleSink {
890                cycle_id, input, ..
891            } => build_sink_node(
892                structure,
893                seen_tees,
894                config,
895                input,
896                None,
897                NodeLabel::static_label(format!("cycle_sink({})", cycle_id)),
898            ),
899
900            HydroRoot::EmbeddedOutput { ident, input, .. } => build_sink_node(
901                structure,
902                seen_tees,
903                config,
904                input,
905                None,
906                NodeLabel::static_label(format!("embedded_output({})", ident)),
907            ),
908
909            HydroRoot::Null { input, .. } => build_sink_node(
910                structure,
911                seen_tees,
912                config,
913                input,
914                None,
915                NodeLabel::static_label("null".to_owned()),
916            ),
917        }
918    }
919}
920
921impl HydroNode {
922    /// Build the graph structure recursively for this node.
923    pub fn build_graph_structure(
924        &self,
925        structure: &mut HydroGraphStructure,
926        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
927        config: HydroWriteConfig<'_>,
928    ) -> VizNodeKey {
929        // Helper functions to reduce duplication, categorized by input/expression patterns
930
931        /// Common parameters for transform builder functions to reduce argument count
932        struct TransformParams<'a> {
933            structure: &'a mut HydroGraphStructure,
934            seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
935            config: HydroWriteConfig<'a>,
936            input: &'a HydroNode,
937            metadata: &'a HydroIrMetadata,
938            op_name: String,
939            node_type: HydroNodeType,
940        }
941
942        // Single-input transform with no expressions
943        fn build_simple_transform(params: TransformParams) -> VizNodeKey {
944            let input_id = params.input.build_graph_structure(
945                params.structure,
946                params.seen_tees,
947                params.config,
948            );
949            let node_id = params.structure.add_node_with_metadata(
950                NodeLabel::Static(params.op_name.to_string()),
951                params.node_type,
952                params.metadata,
953            );
954
955            // Extract semantic tags from input metadata
956            let input_metadata = params.input.metadata();
957            add_edge_with_metadata(
958                params.structure,
959                input_id,
960                node_id,
961                Some(input_metadata),
962                Some(params.metadata),
963                None,
964            );
965
966            node_id
967        }
968
969        // Single-input transform with one expression
970        fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> VizNodeKey {
971            let input_id = params.input.build_graph_structure(
972                params.structure,
973                params.seen_tees,
974                params.config,
975            );
976            let node_id = params.structure.add_node_with_metadata(
977                NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
978                params.node_type,
979                params.metadata,
980            );
981
982            // Extract semantic tags from input metadata
983            let input_metadata = params.input.metadata();
984            add_edge_with_metadata(
985                params.structure,
986                input_id,
987                node_id,
988                Some(input_metadata),
989                Some(params.metadata),
990                None,
991            );
992
993            node_id
994        }
995
996        // Single-input transform with two expressions
997        fn build_dual_expr_transform(
998            params: TransformParams,
999            expr1: &DebugExpr,
1000            expr2: &DebugExpr,
1001        ) -> VizNodeKey {
1002            let input_id = params.input.build_graph_structure(
1003                params.structure,
1004                params.seen_tees,
1005                params.config,
1006            );
1007            let node_id = params.structure.add_node_with_metadata(
1008                NodeLabel::with_exprs(
1009                    params.op_name.to_string(),
1010                    vec![expr1.clone(), expr2.clone()],
1011                ),
1012                params.node_type,
1013                params.metadata,
1014            );
1015
1016            // Extract semantic tags from input metadata
1017            let input_metadata = params.input.metadata();
1018            add_edge_with_metadata(
1019                params.structure,
1020                input_id,
1021                node_id,
1022                Some(input_metadata),
1023                Some(params.metadata),
1024                None,
1025            );
1026
1027            node_id
1028        }
1029
1030        // Helper function for source nodes
1031        fn build_source_node(
1032            structure: &mut HydroGraphStructure,
1033            metadata: &HydroIrMetadata,
1034            label: String,
1035        ) -> VizNodeKey {
1036            structure.add_node_with_metadata(
1037                NodeLabel::Static(label),
1038                HydroNodeType::Source,
1039                metadata,
1040            )
1041        }
1042
1043        match self {
1044            HydroNode::Placeholder => structure.add_node(
1045                NodeLabel::Static("PLACEHOLDER".to_owned()),
1046                HydroNodeType::Transform,
1047                None,
1048            ),
1049
1050            HydroNode::Source {
1051                source, metadata, ..
1052            } => {
1053                let label = match source {
1054                    HydroSource::Stream(expr) => format!("source_stream({})", expr),
1055                    HydroSource::ExternalNetwork() => "external_network()".to_owned(),
1056                    HydroSource::Iter(expr) => format!("source_iter({})", expr),
1057                    HydroSource::Spin() => "spin()".to_owned(),
1058                    HydroSource::ClusterMembers(location_id, _) => {
1059                        format!(
1060                            "source_stream(cluster_membership_stream({:?}))",
1061                            location_id
1062                        )
1063                    }
1064                    HydroSource::Embedded(ident) => {
1065                        format!("embedded_input({})", ident)
1066                    }
1067                    HydroSource::EmbeddedSingleton(ident) => {
1068                        format!("embedded_singleton_input({})", ident)
1069                    }
1070                };
1071                build_source_node(structure, metadata, label)
1072            }
1073
1074            HydroNode::SingletonSource {
1075                value,
1076                first_tick_only,
1077                metadata,
1078            } => {
1079                let label = if *first_tick_only {
1080                    format!("singleton_first_tick({})", value)
1081                } else {
1082                    format!("singleton({})", value)
1083                };
1084                build_source_node(structure, metadata, label)
1085            }
1086
1087            HydroNode::ExternalInput {
1088                from_external_key,
1089                from_port_id,
1090                metadata,
1091                ..
1092            } => build_source_node(
1093                structure,
1094                metadata,
1095                format!("external_input({}:{})", from_external_key, from_port_id),
1096            ),
1097
1098            HydroNode::CycleSource {
1099                cycle_id, metadata, ..
1100            } => build_source_node(structure, metadata, format!("cycle_source({})", cycle_id)),
1101
1102            HydroNode::Tee { inner, metadata } => {
1103                let ptr = inner.as_ptr();
1104                if let Some(&existing_id) = seen_tees.get(&ptr) {
1105                    return existing_id;
1106                }
1107
1108                let input_id = inner
1109                    .0
1110                    .borrow()
1111                    .build_graph_structure(structure, seen_tees, config);
1112                let tee_id = structure.add_node_with_metadata(
1113                    NodeLabel::Static(extract_op_name(self.print_root())),
1114                    HydroNodeType::Tee,
1115                    metadata,
1116                );
1117
1118                seen_tees.insert(ptr, tee_id);
1119
1120                // Extract semantic tags from input
1121                let inner_borrow = inner.0.borrow();
1122                let input_metadata = inner_borrow.metadata();
1123                add_edge_with_metadata(
1124                    structure,
1125                    input_id,
1126                    tee_id,
1127                    Some(input_metadata),
1128                    Some(metadata),
1129                    None,
1130                );
1131                drop(inner_borrow);
1132
1133                tee_id
1134            }
1135
1136            HydroNode::Partition {
1137                inner, metadata, ..
1138            } => {
1139                let ptr = inner.as_ptr();
1140                if let Some(&existing_id) = seen_tees.get(&ptr) {
1141                    return existing_id;
1142                }
1143
1144                let input_id = inner
1145                    .0
1146                    .borrow()
1147                    .build_graph_structure(structure, seen_tees, config);
1148                let partition_id = structure.add_node_with_metadata(
1149                    NodeLabel::Static(extract_op_name(self.print_root())),
1150                    HydroNodeType::Tee,
1151                    metadata,
1152                );
1153
1154                seen_tees.insert(ptr, partition_id);
1155
1156                // Extract semantic tags from input
1157                let inner_borrow = inner.0.borrow();
1158                let input_metadata = inner_borrow.metadata();
1159                add_edge_with_metadata(
1160                    structure,
1161                    input_id,
1162                    partition_id,
1163                    Some(input_metadata),
1164                    Some(metadata),
1165                    None,
1166                );
1167                drop(inner_borrow);
1168
1169                partition_id
1170            }
1171
1172            // Non-deterministic operation
1173            HydroNode::ObserveNonDet {
1174                inner, metadata, ..
1175            } => build_simple_transform(TransformParams {
1176                structure,
1177                seen_tees,
1178                config,
1179                input: inner,
1180                metadata,
1181                op_name: extract_op_name(self.print_root()),
1182                node_type: HydroNodeType::NonDeterministic,
1183            }),
1184
1185            // Transform operations with Stream edges - grouped by node/edge type
1186            HydroNode::Cast { inner, metadata }
1187            | HydroNode::DeferTick {
1188                input: inner,
1189                metadata,
1190            }
1191            | HydroNode::Enumerate {
1192                input: inner,
1193                metadata,
1194                ..
1195            }
1196            | HydroNode::Unique {
1197                input: inner,
1198                metadata,
1199            }
1200            | HydroNode::ResolveFutures {
1201                input: inner,
1202                metadata,
1203            }
1204            | HydroNode::ResolveFuturesOrdered {
1205                input: inner,
1206                metadata,
1207            } => build_simple_transform(TransformParams {
1208                structure,
1209                seen_tees,
1210                config,
1211                input: inner,
1212                metadata,
1213                op_name: extract_op_name(self.print_root()),
1214                node_type: HydroNodeType::Transform,
1215            }),
1216
1217            // Aggregation operation - semantic tags extracted from metadata
1218            HydroNode::Sort {
1219                input: inner,
1220                metadata,
1221            } => build_simple_transform(TransformParams {
1222                structure,
1223                seen_tees,
1224                config,
1225                input: inner,
1226                metadata,
1227                op_name: extract_op_name(self.print_root()),
1228                node_type: HydroNodeType::Aggregation,
1229            }),
1230
1231            // Single-expression Transform operations - grouped by node type
1232            HydroNode::Map { f, input, metadata }
1233            | HydroNode::Filter { f, input, metadata }
1234            | HydroNode::FlatMap { f, input, metadata }
1235            | HydroNode::FilterMap { f, input, metadata }
1236            | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1237                TransformParams {
1238                    structure,
1239                    seen_tees,
1240                    config,
1241                    input,
1242                    metadata,
1243                    op_name: extract_op_name(self.print_root()),
1244                    node_type: HydroNodeType::Transform,
1245                },
1246                f,
1247            ),
1248
1249            // Single-expression Aggregation operations - grouped by node type
1250            HydroNode::Reduce { f, input, metadata }
1251            | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1252                TransformParams {
1253                    structure,
1254                    seen_tees,
1255                    config,
1256                    input,
1257                    metadata,
1258                    op_name: extract_op_name(self.print_root()),
1259                    node_type: HydroNodeType::Aggregation,
1260                },
1261                f,
1262            ),
1263
1264            // Join-like operations with left/right edge labels - grouped by edge labeling
1265            HydroNode::Join {
1266                left,
1267                right,
1268                metadata,
1269            }
1270            | HydroNode::CrossProduct {
1271                left,
1272                right,
1273                metadata,
1274            }
1275            | HydroNode::CrossSingleton {
1276                left,
1277                right,
1278                metadata,
1279            } => {
1280                let left_id = left.build_graph_structure(structure, seen_tees, config);
1281                let right_id = right.build_graph_structure(structure, seen_tees, config);
1282                let node_id = structure.add_node_with_metadata(
1283                    NodeLabel::Static(extract_op_name(self.print_root())),
1284                    HydroNodeType::Join,
1285                    metadata,
1286                );
1287
1288                // Extract semantic tags for left edge
1289                let left_metadata = left.metadata();
1290                add_edge_with_metadata(
1291                    structure,
1292                    left_id,
1293                    node_id,
1294                    Some(left_metadata),
1295                    Some(metadata),
1296                    Some("left".to_owned()),
1297                );
1298
1299                // Extract semantic tags for right edge
1300                let right_metadata = right.metadata();
1301                add_edge_with_metadata(
1302                    structure,
1303                    right_id,
1304                    node_id,
1305                    Some(right_metadata),
1306                    Some(metadata),
1307                    Some("right".to_owned()),
1308                );
1309
1310                node_id
1311            }
1312
1313            // Join-like operations with pos/neg edge labels - grouped by edge labeling
1314            HydroNode::Difference {
1315                pos: left,
1316                neg: right,
1317                metadata,
1318            }
1319            | HydroNode::AntiJoin {
1320                pos: left,
1321                neg: right,
1322                metadata,
1323            } => {
1324                let left_id = left.build_graph_structure(structure, seen_tees, config);
1325                let right_id = right.build_graph_structure(structure, seen_tees, config);
1326                let node_id = structure.add_node_with_metadata(
1327                    NodeLabel::Static(extract_op_name(self.print_root())),
1328                    HydroNodeType::Join,
1329                    metadata,
1330                );
1331
1332                // Extract semantic tags for pos edge
1333                let left_metadata = left.metadata();
1334                add_edge_with_metadata(
1335                    structure,
1336                    left_id,
1337                    node_id,
1338                    Some(left_metadata),
1339                    Some(metadata),
1340                    Some("pos".to_owned()),
1341                );
1342
1343                // Extract semantic tags for neg edge
1344                let right_metadata = right.metadata();
1345                add_edge_with_metadata(
1346                    structure,
1347                    right_id,
1348                    node_id,
1349                    Some(right_metadata),
1350                    Some(metadata),
1351                    Some("neg".to_owned()),
1352                );
1353
1354                node_id
1355            }
1356
1357            // Dual expression transforms - consolidated using pattern matching
1358            HydroNode::Fold {
1359                init,
1360                acc,
1361                input,
1362                metadata,
1363            }
1364            | HydroNode::FoldKeyed {
1365                init,
1366                acc,
1367                input,
1368                metadata,
1369            }
1370            | HydroNode::Scan {
1371                init,
1372                acc,
1373                input,
1374                metadata,
1375            } => {
1376                let node_type = HydroNodeType::Aggregation; // All are aggregation operations
1377
1378                build_dual_expr_transform(
1379                    TransformParams {
1380                        structure,
1381                        seen_tees,
1382                        config,
1383                        input,
1384                        metadata,
1385                        op_name: extract_op_name(self.print_root()),
1386                        node_type,
1387                    },
1388                    init,
1389                    acc,
1390                )
1391            }
1392
1393            // Combination of join and transform
1394            HydroNode::ReduceKeyedWatermark {
1395                f,
1396                input,
1397                watermark,
1398                metadata,
1399            } => {
1400                let input_id = input.build_graph_structure(structure, seen_tees, config);
1401                let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1402                let location_key = Some(setup_location(structure, metadata));
1403                let join_node_id = structure.add_node_with_backtrace(
1404                    NodeLabel::Static(extract_op_name(self.print_root())),
1405                    HydroNodeType::Join,
1406                    location_key,
1407                    Some(metadata.op.backtrace.clone()),
1408                );
1409
1410                // Extract semantic tags for input edge
1411                let input_metadata = input.metadata();
1412                add_edge_with_metadata(
1413                    structure,
1414                    input_id,
1415                    join_node_id,
1416                    Some(input_metadata),
1417                    Some(metadata),
1418                    Some("input".to_owned()),
1419                );
1420
1421                // Extract semantic tags for watermark edge
1422                let watermark_metadata = watermark.metadata();
1423                add_edge_with_metadata(
1424                    structure,
1425                    watermark_id,
1426                    join_node_id,
1427                    Some(watermark_metadata),
1428                    Some(metadata),
1429                    Some("watermark".to_owned()),
1430                );
1431
1432                let node_id = structure.add_node_with_backtrace(
1433                    NodeLabel::with_exprs(extract_op_name(self.print_root()), vec![f.clone()]),
1434                    HydroNodeType::Aggregation,
1435                    location_key,
1436                    Some(metadata.op.backtrace.clone()),
1437                );
1438
1439                // Edge from join to aggregation node
1440                let join_metadata = metadata; // Use the same metadata
1441                add_edge_with_metadata(
1442                    structure,
1443                    join_node_id,
1444                    node_id,
1445                    Some(join_metadata),
1446                    Some(metadata),
1447                    None,
1448                );
1449
1450                node_id
1451            }
1452
1453            HydroNode::Network {
1454                serialize_fn,
1455                deserialize_fn,
1456                input,
1457                metadata,
1458                ..
1459            } => {
1460                let input_id = input.build_graph_structure(structure, seen_tees, config);
1461                let _from_location_key = setup_location(structure, metadata);
1462
1463                let root = metadata.location_id.root();
1464                let to_location_key = root.key();
1465                let to_location_type = root.location_type().unwrap();
1466                structure.add_location(to_location_key, to_location_type);
1467
1468                let mut label = "network(".to_owned();
1469                if serialize_fn.is_some() {
1470                    label.push_str("send");
1471                }
1472                if deserialize_fn.is_some() {
1473                    if serialize_fn.is_some() {
1474                        label.push_str(" + ");
1475                    }
1476                    label.push_str("recv");
1477                }
1478                label.push(')');
1479
1480                let network_id = structure.add_node_with_backtrace(
1481                    NodeLabel::Static(label),
1482                    HydroNodeType::Network,
1483                    Some(to_location_key),
1484                    Some(metadata.op.backtrace.clone()),
1485                );
1486
1487                // Extract semantic tags for network edge
1488                let input_metadata = input.metadata();
1489                add_edge_with_metadata(
1490                    structure,
1491                    input_id,
1492                    network_id,
1493                    Some(input_metadata),
1494                    Some(metadata),
1495                    Some(format!("to {:?}({})", to_location_type, to_location_key)),
1496                );
1497
1498                network_id
1499            }
1500
1501            // Non-deterministic batch operation
1502            HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
1503                structure,
1504                seen_tees,
1505                config,
1506                input: inner,
1507                metadata,
1508                op_name: extract_op_name(self.print_root()),
1509                node_type: HydroNodeType::NonDeterministic,
1510            }),
1511
1512            HydroNode::YieldConcat { inner, .. } => {
1513                // Unpersist is typically optimized away, just pass through
1514                inner.build_graph_structure(structure, seen_tees, config)
1515            }
1516
1517            HydroNode::BeginAtomic { inner, .. } => {
1518                inner.build_graph_structure(structure, seen_tees, config)
1519            }
1520
1521            HydroNode::EndAtomic { inner, .. } => {
1522                inner.build_graph_structure(structure, seen_tees, config)
1523            }
1524
1525            HydroNode::Chain {
1526                first,
1527                second,
1528                metadata,
1529            } => {
1530                let first_id = first.build_graph_structure(structure, seen_tees, config);
1531                let second_id = second.build_graph_structure(structure, seen_tees, config);
1532                let location_key = Some(setup_location(structure, metadata));
1533                let chain_id = structure.add_node_with_backtrace(
1534                    NodeLabel::Static(extract_op_name(self.print_root())),
1535                    HydroNodeType::Transform,
1536                    location_key,
1537                    Some(metadata.op.backtrace.clone()),
1538                );
1539
1540                // Extract semantic tags for first edge
1541                let first_metadata = first.metadata();
1542                add_edge_with_metadata(
1543                    structure,
1544                    first_id,
1545                    chain_id,
1546                    Some(first_metadata),
1547                    Some(metadata),
1548                    Some("first".to_owned()),
1549                );
1550
1551                // Extract semantic tags for second edge
1552                let second_metadata = second.metadata();
1553                add_edge_with_metadata(
1554                    structure,
1555                    second_id,
1556                    chain_id,
1557                    Some(second_metadata),
1558                    Some(metadata),
1559                    Some("second".to_owned()),
1560                );
1561
1562                chain_id
1563            }
1564
1565            HydroNode::ChainFirst {
1566                first,
1567                second,
1568                metadata,
1569            } => {
1570                let first_id = first.build_graph_structure(structure, seen_tees, config);
1571                let second_id = second.build_graph_structure(structure, seen_tees, config);
1572                let location_key = Some(setup_location(structure, metadata));
1573                let chain_id = structure.add_node_with_backtrace(
1574                    NodeLabel::Static(extract_op_name(self.print_root())),
1575                    HydroNodeType::Transform,
1576                    location_key,
1577                    Some(metadata.op.backtrace.clone()),
1578                );
1579
1580                // Extract semantic tags for first edge
1581                let first_metadata = first.metadata();
1582                add_edge_with_metadata(
1583                    structure,
1584                    first_id,
1585                    chain_id,
1586                    Some(first_metadata),
1587                    Some(metadata),
1588                    Some("first".to_owned()),
1589                );
1590
1591                // Extract semantic tags for second edge
1592                let second_metadata = second.metadata();
1593                add_edge_with_metadata(
1594                    structure,
1595                    second_id,
1596                    chain_id,
1597                    Some(second_metadata),
1598                    Some(metadata),
1599                    Some("second".to_owned()),
1600                );
1601
1602                chain_id
1603            }
1604
1605            HydroNode::Counter {
1606                tag: _,
1607                prefix: _,
1608                duration,
1609                input,
1610                metadata,
1611            } => build_single_expr_transform(
1612                TransformParams {
1613                    structure,
1614                    seen_tees,
1615                    config,
1616                    input,
1617                    metadata,
1618                    op_name: extract_op_name(self.print_root()),
1619                    node_type: HydroNodeType::Transform,
1620                },
1621                duration,
1622            ),
1623        }
1624    }
1625}
1626
// Utility functions for rendering multiple roots as a single graph.

/// Macro to reduce duplication in render functions.
///
/// `render_hydro_ir!(name, write_fn)` defines `pub fn name(roots, config) -> String`, which
/// calls `write_fn(&mut output, roots, config)` on a fresh `String` and returns that string.
macro_rules! render_hydro_ir {
    ($name:ident, $write_fn:ident) => {
        /// Renders the given Hydro IR roots into a freshly allocated `String`.
        ///
        /// # Panics
        ///
        /// Panics if the underlying write function returns an error.
        pub fn $name(roots: &[HydroRoot], config: HydroWriteConfig<'_>) -> String {
            let mut output = String::new();
            // `expect` rather than `unwrap` so a failure names the operation that broke.
            $write_fn(&mut output, roots, config)
                .expect("failed to render Hydro IR graph into a String");
            output
        }
    };
}
1638
/// Macro to reduce duplication in write functions.
///
/// `write_hydro_ir!(name, WriterType, constructor)` defines
/// `pub fn name(output, roots, config) -> std::fmt::Result`. The generated function builds a
/// `WriterType` with `constructor(output, config)` and hands a mutable reference to it,
/// together with `roots` and `config`, to `write_hydro_ir_graph`.
macro_rules! write_hydro_ir {
    ($fn_name:ident, $writer:ty, $make_writer:expr) => {
        /// Writes the given Hydro IR roots to `output` in this writer's format.
        pub fn $fn_name(
            output: impl std::fmt::Write,
            roots: &[HydroRoot],
            config: HydroWriteConfig<'_>,
        ) -> std::fmt::Result {
            let mut writer: $writer = $make_writer(output, config);
            write_hydro_ir_graph(&mut writer, roots, config)
        }
    };
}
1652
1653render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1654write_hydro_ir!(
1655    write_hydro_ir_mermaid,
1656    HydroMermaid<_>,
1657    HydroMermaid::new_with_config
1658);
1659
1660render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1661write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1662
1663// Legacy hydroscope function - now uses HydroJson for consistency
1664render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);
1665
1666// JSON rendering
1667render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
1668write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1669
1670fn write_hydro_ir_graph<W>(
1671    graph_write: W,
1672    roots: &[HydroRoot],
1673    config: HydroWriteConfig<'_>,
1674) -> Result<(), W::Err>
1675where
1676    W: HydroGraphWrite,
1677{
1678    let mut structure = HydroGraphStructure::new();
1679    let mut seen_tees = HashMap::new();
1680
1681    // Build the graph structure for all roots
1682    for leaf in roots {
1683        leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1684    }
1685
1686    write_graph_structure(&structure, graph_write, config)
1687}