Skip to main content

hydro_lang/compile/
builder.rs

1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use slotmap::{SecondaryMap, SlotMap};
7
8#[cfg(feature = "build")]
9use super::compiled::CompiledFlow;
10#[cfg(feature = "build")]
11use super::deploy::{DeployFlow, DeployResult};
12#[cfg(feature = "build")]
13use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use super::ir::HydroRoot;
15use crate::location::{Cluster, External, LocationKey, LocationType, Process};
16#[cfg(feature = "sim")]
17#[cfg(stageleft_runtime)]
18use crate::sim::flow::SimFlow;
19use crate::staging_util::Invariant;
20
21#[stageleft::export(ExternalPortId, CycleId, ClockId)]
22crate::newtype_counter! {
23    /// ID for an external output.
24    pub struct ExternalPortId(usize);
25
26    /// ID for a [`crate::location::Location::forward_ref`] cycle.
27    pub struct CycleId(usize);
28
29    /// ID for clocks (ticks).
30    pub struct ClockId(usize);
31}
32
33impl CycleId {
34    #[cfg(feature = "build")]
35    pub(crate) fn as_ident(&self) -> syn::Ident {
36        syn::Ident::new(&format!("cycle_{}", self), proc_macro2::Span::call_site())
37    }
38}
39
40pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
41
42pub(crate) struct FlowStateInner {
43    /// Tracks the roots of the dataflow IR. This is referenced by
44    /// `Stream` and `HfCycle` to build the IR. The inner option will
45    /// be set to `None` when this builder is finalized.
46    roots: Option<Vec<HydroRoot>>,
47
48    /// Counter for generating unique external output identifiers.
49    next_external_port: ExternalPortId,
50
51    /// Counters for generating identifiers for cycles.
52    next_cycle_id: CycleId,
53
54    /// Counters for clock IDs.
55    next_clock_id: ClockId,
56}
57
58impl FlowStateInner {
59    pub fn next_external_port(&mut self) -> ExternalPortId {
60        self.next_external_port.get_and_increment()
61    }
62
63    pub fn next_cycle_id(&mut self) -> CycleId {
64        self.next_cycle_id.get_and_increment()
65    }
66
67    pub fn next_clock_id(&mut self) -> ClockId {
68        self.next_clock_id.get_and_increment()
69    }
70
71    pub fn push_root(&mut self, root: HydroRoot) {
72        self.roots
73            .as_mut()
74            .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
75            .push(root);
76    }
77
78    pub fn try_push_root(&mut self, root: HydroRoot) {
79        if let Some(roots) = self.roots.as_mut() {
80            roots.push(root);
81        }
82    }
83}
84
85pub struct FlowBuilder<'a> {
86    /// Hydro IR and associated counters
87    flow_state: FlowState,
88
89    /// Locations and their type.
90    locations: SlotMap<LocationKey, LocationType>,
91    /// Map from raw location ID to name (including externals).
92    location_names: SecondaryMap<LocationKey, String>,
93
94    /// Application name used in telemetry.
95    #[cfg_attr(
96        not(feature = "build"),
97        expect(dead_code, reason = "unused without build")
98    )]
99    flow_name: String,
100
101    /// Tracks whether this flow has been finalized; it is an error to
102    /// drop without finalizing.
103    finalized: bool,
104
105    /// 'a on a FlowBuilder is used to ensure that staged code does not
106    /// capture more data that it is allowed to; 'a is generated at the
107    /// entrypoint of the staged code and we keep it invariant here
108    /// to enforce the appropriate constraints
109    _phantom: Invariant<'a>,
110}
111
112impl Drop for FlowBuilder<'_> {
113    fn drop(&mut self) {
114        if !self.finalized && !std::thread::panicking() {
115            panic!(
116                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
117            );
118        }
119    }
120}
121
122#[expect(missing_docs, reason = "TODO")]
123impl<'a> FlowBuilder<'a> {
124    /// Creates a new `FlowBuilder` to construct a Hydro program, using the Cargo package name as the program name.
125    #[expect(
126        clippy::new_without_default,
127        reason = "call `new` explicitly, not `default`"
128    )]
129    pub fn new() -> Self {
130        let mut name = std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".to_owned());
131        if let Ok(bin_path) = std::env::current_exe()
132            && let Some(bin_name) = bin_path.file_stem()
133        {
134            name = format!("{}/{}", name, bin_name.display());
135        }
136        Self::with_name(name)
137    }
138
139    /// Creates a new `FlowBuilder` to construct a Hydro program, with the given program name.
140    pub fn with_name(name: impl Into<String>) -> Self {
141        Self {
142            flow_state: Rc::new(RefCell::new(FlowStateInner {
143                roots: Some(vec![]),
144                next_external_port: ExternalPortId::default(),
145                next_cycle_id: CycleId::default(),
146                next_clock_id: ClockId::default(),
147            })),
148            locations: SlotMap::with_key(),
149            location_names: SecondaryMap::new(),
150            flow_name: name.into(),
151            finalized: false,
152            _phantom: PhantomData,
153        }
154    }
155
156    pub(crate) fn flow_state(&self) -> &FlowState {
157        &self.flow_state
158    }
159
160    pub fn process<P>(&mut self) -> Process<'a, P> {
161        let key = self.locations.insert(LocationType::Process);
162        self.location_names.insert(key, type_name::<P>().to_owned());
163        Process {
164            key,
165            flow_state: self.flow_state().clone(),
166            _phantom: PhantomData,
167        }
168    }
169
170    pub fn cluster<C>(&mut self) -> Cluster<'a, C> {
171        let key = self.locations.insert(LocationType::Cluster);
172        self.location_names.insert(key, type_name::<C>().to_owned());
173        Cluster {
174            key,
175            flow_state: self.flow_state().clone(),
176            _phantom: PhantomData,
177        }
178    }
179
180    pub fn external<E>(&mut self) -> External<'a, E> {
181        let key = self.locations.insert(LocationType::External);
182        self.location_names.insert(key, type_name::<E>().to_owned());
183        External {
184            key,
185            flow_state: self.flow_state().clone(),
186            _phantom: PhantomData,
187        }
188    }
189}
190
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
impl<'a> FlowBuilder<'a> {
    /// Finalizes this builder and returns the flow that was built.
    ///
    /// The IR roots are taken out of the shared flow state, leaving `None`
    /// behind, so roots can no longer be pushed through handles that share
    /// that state. The locations, their names and the flow name are moved
    /// into the result.
    ///
    /// # Panics
    /// Panics if the flow state holds no IR roots. A builder created with
    /// [`FlowBuilder::from_built`] is in that state until `replace_ir` is called.
    pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
        // Set this first so `Drop` never reports the builder as un-finalized.
        self.finalized = true;

        let ir = self.flow_state.borrow_mut().roots.take().expect(
            "FlowBuilder has no IR roots to finalize; a builder created with `from_built` must be given roots via `replace_ir` first.",
        );

        // `FlowBuilder` implements `Drop`, so its fields cannot be moved out
        // directly; `mem::take` leaves empty defaults in their place.
        super::built::BuiltFlow {
            ir,
            locations: std::mem::take(&mut self.locations),
            location_names: std::mem::take(&mut self.location_names),
            flow_name: std::mem::take(&mut self.flow_name),
            _phantom: PhantomData,
        }
    }

    /// Finalizes this builder and calls `with_default_optimize` on the built
    /// flow, producing a [`DeployFlow`] for the [`Deploy`] implementation `D`.
    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        self.finalize().with_default_optimize()
    }

    /// Finalizes this builder and calls `optimize_with(f)` on the built flow;
    /// `f` receives mutable access to the IR roots.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
        self.finalize().optimize_with(f)
    }

    /// Shorthand for `self.with_default_optimize().with_process(process, spec)`.
    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_process(process, spec)
    }

    /// Shorthand for `self.with_default_optimize().with_remaining_processes(spec)`.
    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_processes(spec)
    }

    /// Shorthand for `self.with_default_optimize().with_external(process, spec)`.
    pub fn with_external<P, D: Deploy<'a>>(
        self,
        process: &External<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_external(process, spec)
    }

    /// Shorthand for `self.with_default_optimize().with_remaining_externals(spec)`.
    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_externals(spec)
    }

    /// Shorthand for `self.with_default_optimize().with_cluster(cluster, spec)`.
    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_cluster(cluster, spec)
    }

    /// Shorthand for `self.with_default_optimize().with_remaining_clusters(spec)`.
    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_clusters(spec)
    }

    /// Shorthand for `self.with_default_optimize::<D>().compile()`, available
    /// when `D` needs no instantiation environment (`InstantiateEnv = ()`).
    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
        self.with_default_optimize::<D>().compile()
    }

    /// Shorthand for `self.with_default_optimize().deploy(env)`.
    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
        self.with_default_optimize().deploy(env)
    }

    /// Creates a simulation for this builder, which can be used to run deterministic simulations
    /// of the Hydro program.
    #[cfg(feature = "sim")]
    pub fn sim(self) -> SimFlow<'a> {
        self.finalize().sim()
    }

    /// Creates a new builder that copies the locations, location names and
    /// flow name of `built`.
    ///
    /// The new builder starts with no IR roots (`None`) and with ID counters
    /// at their default values. Roots must be installed with `replace_ir`
    /// before any root is pushed or the builder is finalized. As with any
    /// builder, it must be finalized before it is dropped.
    pub fn from_built<'b>(built: &super::built::BuiltFlow) -> FlowBuilder<'b> {
        FlowBuilder {
            flow_state: Rc::new(RefCell::new(FlowStateInner {
                roots: None,
                next_external_port: ExternalPortId::default(),
                next_cycle_id: CycleId::default(),
                next_clock_id: ClockId::default(),
            })),
            locations: built.locations.clone(),
            location_names: built.location_names.clone(),
            flow_name: built.flow_name.clone(),
            finalized: false,
            _phantom: PhantomData,
        }
    }

    /// Replaces the IR roots held in the shared flow state with `roots`,
    /// discarding any roots that were there before.
    #[doc(hidden)] // TODO(mingwei): This is an unstable API for now
    pub fn replace_ir(&mut self, roots: Vec<HydroRoot>) {
        self.flow_state.borrow_mut().roots = Some(roots);
    }
}