Skip to main content

hydro_lang/compile/
builder.rs

1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use slotmap::{SecondaryMap, SlotMap};
7
8#[cfg(feature = "build")]
9use super::compiled::CompiledFlow;
10#[cfg(feature = "build")]
11use super::deploy::{DeployFlow, DeployResult};
12#[cfg(feature = "build")]
13use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use super::ir::HydroRoot;
15use crate::location::{Cluster, External, LocationKey, LocationType, Process};
16#[cfg(feature = "sim")]
17#[cfg(stageleft_runtime)]
18use crate::sim::flow::SimFlow;
19use crate::staging_util::Invariant;
20
21crate::newtype_counter! {
22    /// Counter for generating unique external output identifiers.
23    pub struct ExternalPortId(usize);
24}
25
26pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
27
28pub(crate) struct FlowStateInner {
29    /// Tracks the roots of the dataflow IR. This is referenced by
30    /// `Stream` and `HfCycle` to build the IR. The inner option will
31    /// be set to `None` when this builder is finalized.
32    pub(crate) roots: Option<Vec<HydroRoot>>,
33
34    /// Counter for generating unique external output identifiers.
35    pub(crate) next_external_port: ExternalPortId,
36
37    /// Counters for generating identifiers for cycles.
38    pub(crate) cycle_counts: usize,
39
40    /// Counters for clock IDs.
41    pub(crate) next_clock_id: usize,
42}
43
44impl FlowStateInner {
45    pub fn next_cycle_id(&mut self) -> usize {
46        let id = self.cycle_counts;
47        self.cycle_counts += 1;
48        id
49    }
50
51    pub fn push_root(&mut self, root: HydroRoot) {
52        self.roots
53            .as_mut()
54            .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
55            .push(root);
56    }
57}
58
59pub struct FlowBuilder<'a> {
60    /// Hydro IR and associated counters
61    flow_state: FlowState,
62
63    /// Locations and their type.
64    locations: SlotMap<LocationKey, LocationType>,
65    /// Map from raw location ID to name (including externals).
66    location_names: SecondaryMap<LocationKey, String>,
67
68    /// Application name used in telemetry.
69    #[cfg_attr(
70        not(feature = "build"),
71        expect(dead_code, reason = "unused without build")
72    )]
73    flow_name: String,
74
75    /// Tracks whether this flow has been finalized; it is an error to
76    /// drop without finalizing.
77    finalized: bool,
78
79    /// 'a on a FlowBuilder is used to ensure that staged code does not
80    /// capture more data that it is allowed to; 'a is generated at the
81    /// entrypoint of the staged code and we keep it invariant here
82    /// to enforce the appropriate constraints
83    _phantom: Invariant<'a>,
84}
85
86impl Drop for FlowBuilder<'_> {
87    fn drop(&mut self) {
88        if !self.finalized && !std::thread::panicking() {
89            panic!(
90                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
91            );
92        }
93    }
94}
95
96#[expect(missing_docs, reason = "TODO")]
97impl<'a> FlowBuilder<'a> {
98    /// Creates a new `FlowBuilder` to construct a Hydro program, using the Cargo package name as the program name.
99    #[expect(
100        clippy::new_without_default,
101        reason = "call `new` explicitly, not `default`"
102    )]
103    pub fn new() -> Self {
104        let mut name = std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".to_owned());
105        if let Ok(bin_path) = std::env::current_exe()
106            && let Some(bin_name) = bin_path.file_stem()
107        {
108            name = format!("{}/{}", name, bin_name.display());
109        }
110        Self::with_name(name)
111    }
112
113    /// Creates a new `FlowBuilder` to construct a Hydro program, with the given program name.
114    pub fn with_name(name: impl Into<String>) -> Self {
115        Self {
116            flow_state: Rc::new(RefCell::new(FlowStateInner {
117                roots: Some(vec![]),
118                next_external_port: ExternalPortId(0),
119                cycle_counts: 0,
120                next_clock_id: 0,
121            })),
122            locations: SlotMap::with_key(),
123            location_names: SecondaryMap::new(),
124            flow_name: name.into(),
125            finalized: false,
126            _phantom: PhantomData,
127        }
128    }
129
130    pub(crate) fn flow_state(&self) -> &FlowState {
131        &self.flow_state
132    }
133
134    pub fn process<P>(&mut self) -> Process<'a, P> {
135        let key = self.locations.insert(LocationType::Process);
136        self.location_names.insert(key, type_name::<P>().to_owned());
137        Process {
138            key,
139            flow_state: self.flow_state().clone(),
140            _phantom: PhantomData,
141        }
142    }
143
144    pub fn cluster<C>(&mut self) -> Cluster<'a, C> {
145        let key = self.locations.insert(LocationType::Cluster);
146        self.location_names.insert(key, type_name::<C>().to_owned());
147        Cluster {
148            key,
149            flow_state: self.flow_state().clone(),
150            _phantom: PhantomData,
151        }
152    }
153
154    pub fn external<E>(&mut self) -> External<'a, E> {
155        let key = self.locations.insert(LocationType::External);
156        self.location_names.insert(key, type_name::<E>().to_owned());
157        External {
158            key,
159            flow_state: self.flow_state().clone(),
160            _phantom: PhantomData,
161        }
162    }
163}
164
165#[cfg(feature = "build")]
166#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
167#[expect(missing_docs, reason = "TODO")]
168impl<'a> FlowBuilder<'a> {
169    pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
170        self.finalized = true;
171
172        super::built::BuiltFlow {
173            ir: self.flow_state.borrow_mut().roots.take().unwrap(),
174            locations: std::mem::take(&mut self.locations),
175            location_names: std::mem::take(&mut self.location_names),
176            flow_name: std::mem::take(&mut self.flow_name),
177            _phantom: PhantomData,
178        }
179    }
180
181    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
182        self.finalize().with_default_optimize()
183    }
184
185    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
186        self.finalize().optimize_with(f)
187    }
188
189    pub fn with_process<P, D: Deploy<'a>>(
190        self,
191        process: &Process<P>,
192        spec: impl IntoProcessSpec<'a, D>,
193    ) -> DeployFlow<'a, D> {
194        self.with_default_optimize().with_process(process, spec)
195    }
196
197    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
198        self,
199        spec: impl Fn() -> S,
200    ) -> DeployFlow<'a, D> {
201        self.with_default_optimize().with_remaining_processes(spec)
202    }
203
204    pub fn with_external<P, D: Deploy<'a>>(
205        self,
206        process: &External<P>,
207        spec: impl ExternalSpec<'a, D>,
208    ) -> DeployFlow<'a, D> {
209        self.with_default_optimize().with_external(process, spec)
210    }
211
212    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
213        self,
214        spec: impl Fn() -> S,
215    ) -> DeployFlow<'a, D> {
216        self.with_default_optimize().with_remaining_externals(spec)
217    }
218
219    pub fn with_cluster<C, D: Deploy<'a>>(
220        self,
221        cluster: &Cluster<C>,
222        spec: impl ClusterSpec<'a, D>,
223    ) -> DeployFlow<'a, D> {
224        self.with_default_optimize().with_cluster(cluster, spec)
225    }
226
227    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
228        self,
229        spec: impl Fn() -> S,
230    ) -> DeployFlow<'a, D> {
231        self.with_default_optimize().with_remaining_clusters(spec)
232    }
233
234    pub fn compile<D: Deploy<'a>>(self) -> CompiledFlow<'a> {
235        self.with_default_optimize::<D>().compile()
236    }
237
238    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
239        self.with_default_optimize().deploy(env)
240    }
241
242    #[cfg(feature = "sim")]
243    /// Creates a simulation for this builder, which can be used to run deterministic simulations
244    /// of the Hydro program.
245    pub fn sim(self) -> SimFlow<'a> {
246        self.finalize().sim()
247    }
248
249    pub fn from_built<'b>(built: &super::built::BuiltFlow) -> FlowBuilder<'b> {
250        FlowBuilder {
251            flow_state: Rc::new(RefCell::new(FlowStateInner {
252                roots: None,
253                next_external_port: ExternalPortId(0),
254                cycle_counts: 0,
255                next_clock_id: 0,
256            })),
257            locations: built.locations.clone(),
258            location_names: built.location_names.clone(),
259            flow_name: built.flow_name.clone(),
260            finalized: false,
261            _phantom: PhantomData,
262        }
263    }
264
265    #[doc(hidden)] // TODO(mingwei): This is an unstable API for now
266    pub fn replace_ir(&mut self, roots: Vec<HydroRoot>) {
267        self.flow_state.borrow_mut().roots = Some(roots);
268    }
269}