1use std::collections::HashMap;
2use std::io::Error;
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Sink, Stream};
8use proc_macro2::Span;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
12use stageleft::QuotedWithContext;
13
14use super::built::build_inner;
15use super::compiled::CompiledFlow;
16use super::deploy_provider::{
17 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
18};
19use super::ir::HydroRoot;
20use crate::live_collections::stream::{Ordering, Retries};
21use crate::location::dynamic::LocationId;
22use crate::location::external_process::{
23 ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
24};
25use crate::location::{Cluster, External, Location, LocationKey, LocationType, Process};
26use crate::staging_util::Invariant;
27use crate::telemetry::Sidecar;
28
29pub struct DeployFlow<'a, D>
30where
31 D: Deploy<'a>,
32{
33 pub(super) ir: Vec<HydroRoot>,
34
35 pub(super) locations: SlotMap<LocationKey, LocationType>,
36 pub(super) location_names: SecondaryMap<LocationKey, String>,
37
38 pub(super) processes: SparseSecondaryMap<LocationKey, D::Process>,
40 pub(super) clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
41 pub(super) externals: SparseSecondaryMap<LocationKey, D::External>,
42
43 pub(super) sidecars: SparseSecondaryMap<LocationKey, Vec<syn::Expr>>,
46
47 pub(super) flow_name: String,
49
50 pub(super) _phantom: Invariant<'a, D>,
51}
52
53impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
54 pub fn ir(&self) -> &Vec<HydroRoot> {
55 &self.ir
56 }
57
58 pub fn flow_name(&self) -> &str {
60 &self.flow_name
61 }
62
63 pub fn with_process<P>(
64 mut self,
65 process: &Process<P>,
66 spec: impl IntoProcessSpec<'a, D>,
67 ) -> Self {
68 self.processes.insert(
69 process.key,
70 spec.into_process_spec()
71 .build(process.key, &self.location_names[process.key]),
72 );
73 self
74 }
75
76 #[doc(hidden)]
78 pub fn with_process_erased(
79 mut self,
80 process_loc_key: LocationKey,
81 spec: impl IntoProcessSpec<'a, D>,
82 ) -> Self {
83 assert_eq!(
84 Some(&LocationType::Process),
85 self.locations.get(process_loc_key),
86 "No process with the given `LocationKey` was found."
87 );
88 self.processes.insert(
89 process_loc_key,
90 spec.into_process_spec()
91 .build(process_loc_key, &self.location_names[process_loc_key]),
92 );
93 self
94 }
95
96 pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
97 mut self,
98 spec: impl Fn() -> S,
99 ) -> Self {
100 for (location_key, &location_type) in self.locations.iter() {
101 if LocationType::Process == location_type {
102 self.processes
103 .entry(location_key)
104 .expect("location was removed")
105 .or_insert_with(|| {
106 spec()
107 .into_process_spec()
108 .build(location_key, &self.location_names[location_key])
109 });
110 }
111 }
112 self
113 }
114
115 pub fn with_cluster<C>(mut self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
116 self.clusters.insert(
117 cluster.key,
118 spec.build(cluster.key, &self.location_names[cluster.key]),
119 );
120 self
121 }
122
123 #[doc(hidden)]
125 pub fn with_cluster_erased(
126 mut self,
127 cluster_loc_key: LocationKey,
128 spec: impl ClusterSpec<'a, D>,
129 ) -> Self {
130 assert_eq!(
131 Some(&LocationType::Cluster),
132 self.locations.get(cluster_loc_key),
133 "No cluster with the given `LocationKey` was found."
134 );
135 self.clusters.insert(
136 cluster_loc_key,
137 spec.build(cluster_loc_key, &self.location_names[cluster_loc_key]),
138 );
139 self
140 }
141
142 pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
143 mut self,
144 spec: impl Fn() -> S,
145 ) -> Self {
146 for (location_key, &location_type) in self.locations.iter() {
147 if LocationType::Cluster == location_type {
148 self.clusters
149 .entry(location_key)
150 .expect("location was removed")
151 .or_insert_with(|| {
152 spec().build(location_key, &self.location_names[location_key])
153 });
154 }
155 }
156 self
157 }
158
159 pub fn with_external<P>(
160 mut self,
161 external: &External<P>,
162 spec: impl ExternalSpec<'a, D>,
163 ) -> Self {
164 self.externals.insert(
165 external.key,
166 spec.build(external.key, &self.location_names[external.key]),
167 );
168 self
169 }
170
171 pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
172 mut self,
173 spec: impl Fn() -> S,
174 ) -> Self {
175 for (location_key, &location_type) in self.locations.iter() {
176 if LocationType::External == location_type {
177 self.externals
178 .entry(location_key)
179 .expect("location was removed")
180 .or_insert_with(|| {
181 spec().build(location_key, &self.location_names[location_key])
182 });
183 }
184 }
185 self
186 }
187
188 pub fn with_sidecar_all(mut self, sidecar: &impl Sidecar) -> Self {
190 for (location_key, &location_type) in self.locations.iter() {
191 if !matches!(location_type, LocationType::Process | LocationType::Cluster) {
192 continue;
193 }
194
195 let location_name = &self.location_names[location_key];
196
197 let sidecar = sidecar.to_expr(
198 self.flow_name(),
199 location_key,
200 location_type,
201 location_name,
202 "e::format_ident!("{}", super::DFIR_IDENT),
203 );
204 self.sidecars
205 .entry(location_key)
206 .expect("location was removed")
207 .or_default()
208 .push(sidecar);
209 }
210
211 self
212 }
213
214 pub fn with_sidecar_internal(
216 mut self,
217 location_key: LocationKey,
218 sidecar: &impl Sidecar,
219 ) -> Self {
220 let location_type = self.locations[location_key];
221 let location_name = &self.location_names[location_key];
222 let sidecar = sidecar.to_expr(
223 self.flow_name(),
224 location_key,
225 location_type,
226 location_name,
227 "e::format_ident!("{}", super::DFIR_IDENT),
228 );
229 self.sidecars
230 .entry(location_key)
231 .expect("location was removed")
232 .or_default()
233 .push(sidecar);
234 self
235 }
236
237 pub fn with_sidecar_process(self, process: &Process<()>, sidecar: &impl Sidecar) -> Self {
239 self.with_sidecar_internal(process.key, sidecar)
240 }
241
242 pub fn with_sidecar_cluster(self, cluster: &Cluster<()>, sidecar: &impl Sidecar) -> Self {
244 self.with_sidecar_internal(cluster.key, sidecar)
245 }
246
247 pub fn preview_compile(&mut self) -> CompiledFlow<'a> {
252 CompiledFlow {
255 dfir: build_inner::<D>(&mut self.ir),
256 extra_stmts: SparseSecondaryMap::new(),
257 sidecars: SparseSecondaryMap::new(),
258 _phantom: PhantomData,
259 }
260 }
261
262 pub fn compile(mut self) -> CompiledFlow<'a> {
266 self.compile_internal()
267 }
268
269 fn compile_internal(&mut self) -> CompiledFlow<'a> {
273 let mut seen_tees: HashMap<_, _> = HashMap::new();
274 let mut extra_stmts = SparseSecondaryMap::new();
275 for leaf in self.ir.iter_mut() {
276 leaf.compile_network::<D>(
277 &mut extra_stmts,
278 &mut seen_tees,
279 &self.processes,
280 &self.clusters,
281 &self.externals,
282 );
283 }
284
285 CompiledFlow {
286 dfir: build_inner::<D>(&mut self.ir),
287 extra_stmts,
288 sidecars: std::mem::take(&mut self.sidecars),
289 _phantom: PhantomData,
290 }
291 }
292
293 fn cluster_id_stmts(&self, extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>) {
295 #[expect(
296 clippy::disallowed_methods,
297 reason = "nondeterministic iteration order, will be sorted"
298 )]
299 let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
300 all_clusters_sorted.sort();
301
302 for cluster_key in all_clusters_sorted {
303 let self_id_ident = syn::Ident::new(
304 &format!("__hydro_lang_cluster_self_id_{}", cluster_key),
305 Span::call_site(),
306 );
307 let self_id_expr = D::cluster_self_id().splice_untyped();
308 extra_stmts
309 .entry(cluster_key)
310 .expect("location was removed")
311 .or_default()
312 .push(syn::parse_quote! {
313 let #self_id_ident = &*Box::leak(Box::new(#self_id_expr));
314 });
315
316 let process_cluster_locations = self.location_names.keys().filter(|&location_key| {
317 self.processes.contains_key(location_key)
318 || self.clusters.contains_key(location_key)
319 });
320 for other_location in process_cluster_locations {
321 let other_id_ident = syn::Ident::new(
322 &format!("__hydro_lang_cluster_ids_{}", cluster_key),
323 Span::call_site(),
324 );
325 let other_id_expr = D::cluster_ids(cluster_key).splice_untyped();
326 extra_stmts
327 .entry(other_location)
328 .expect("location was removed")
329 .or_default()
330 .push(syn::parse_quote! {
331 let #other_id_ident = #other_id_expr;
332 });
333 }
334 }
335 }
336
337 #[must_use]
345 pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
346 let CompiledFlow {
347 dfir,
348 mut extra_stmts,
349 mut sidecars,
350 _phantom,
351 } = self.compile_internal();
352
353 let mut compiled = dfir;
354 self.cluster_id_stmts(&mut extra_stmts);
355 let mut meta = D::Meta::default();
356
357 let (processes, clusters, externals) = (
358 self.processes
359 .into_iter()
360 .filter(|&(node_key, ref node)| {
361 if let Some(ir) = compiled.remove(node_key) {
362 node.instantiate(
363 env,
364 &mut meta,
365 ir,
366 extra_stmts.remove(node_key).as_deref().unwrap_or_default(),
367 sidecars.remove(node_key).as_deref().unwrap_or_default(),
368 );
369 true
370 } else {
371 false
372 }
373 })
374 .collect::<SparseSecondaryMap<_, _>>(),
375 self.clusters
376 .into_iter()
377 .filter(|&(cluster_key, ref cluster)| {
378 if let Some(ir) = compiled.remove(cluster_key) {
379 cluster.instantiate(
380 env,
381 &mut meta,
382 ir,
383 extra_stmts
384 .remove(cluster_key)
385 .as_deref()
386 .unwrap_or_default(),
387 sidecars.remove(cluster_key).as_deref().unwrap_or_default(),
388 );
389 true
390 } else {
391 false
392 }
393 })
394 .collect::<SparseSecondaryMap<_, _>>(),
395 self.externals
396 .into_iter()
397 .inspect(|&(external_key, ref external)| {
398 assert!(!extra_stmts.contains_key(external_key));
399 assert!(!sidecars.contains_key(external_key));
400 external.instantiate(env, &mut meta, Default::default(), &[], &[]);
401 })
402 .collect::<SparseSecondaryMap<_, _>>(),
403 );
404
405 for location_key in self.locations.keys() {
406 if let Some(node) = processes.get(location_key) {
407 node.update_meta(&meta);
408 } else if let Some(cluster) = clusters.get(location_key) {
409 cluster.update_meta(&meta);
410 } else if let Some(external) = externals.get(location_key) {
411 external.update_meta(&meta);
412 }
413 }
414
415 let mut seen_tees_connect = HashMap::new();
416 for leaf in self.ir.iter_mut() {
417 leaf.connect_network(&mut seen_tees_connect);
418 }
419
420 DeployResult {
421 location_names: self.location_names,
422 processes,
423 clusters,
424 externals,
425 }
426 }
427}
428
429pub struct DeployResult<'a, D: Deploy<'a>> {
430 location_names: SecondaryMap<LocationKey, String>,
431 processes: SparseSecondaryMap<LocationKey, D::Process>,
432 clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
433 externals: SparseSecondaryMap<LocationKey, D::External>,
434}
435
436impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
437 pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
438 let LocationId::Process(location_key) = p.id() else {
439 panic!("Process ID expected")
440 };
441 self.processes.get(location_key).unwrap()
442 }
443
444 pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
445 let LocationId::Cluster(location_key) = c.id() else {
446 panic!("Cluster ID expected")
447 };
448 self.clusters.get(location_key).unwrap()
449 }
450
451 pub fn get_external<P>(&self, e: &External<P>) -> &D::External {
452 self.externals.get(e.key).unwrap()
453 }
454
455 pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, &str, &D::Process)> {
456 self.location_names
457 .iter()
458 .filter_map(|(location_key, location_name)| {
459 self.processes
460 .get(location_key)
461 .map(|process| (LocationId::Process(location_key), &**location_name, process))
462 })
463 }
464
465 pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, &str, &D::Cluster)> {
466 self.location_names
467 .iter()
468 .filter_map(|(location_key, location_name)| {
469 self.clusters
470 .get(location_key)
471 .map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
472 })
473 }
474
475 #[deprecated(note = "use `connect` instead")]
476 pub async fn connect_bytes<M>(
477 &self,
478 port: ExternalBytesPort<M>,
479 ) -> (
480 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
481 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
482 ) {
483 self.connect(port).await
484 }
485
486 #[deprecated(note = "use `connect` instead")]
487 pub async fn connect_sink_bytes<M>(
488 &self,
489 port: ExternalBytesPort<M>,
490 ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
491 self.connect(port).await.1
492 }
493
494 pub async fn connect_bincode<
495 InT: Serialize + 'static,
496 OutT: DeserializeOwned + 'static,
497 Many,
498 >(
499 &self,
500 port: ExternalBincodeBidi<InT, OutT, Many>,
501 ) -> (
502 Pin<Box<dyn Stream<Item = OutT>>>,
503 Pin<Box<dyn Sink<InT, Error = Error>>>,
504 ) {
505 self.externals
506 .get(port.process_key)
507 .unwrap()
508 .as_bincode_bidi(port.port_id)
509 .await
510 }
511
512 #[deprecated(note = "use `connect` instead")]
513 pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
514 &self,
515 port: ExternalBincodeSink<T, Many>,
516 ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
517 self.connect(port).await
518 }
519
520 #[deprecated(note = "use `connect` instead")]
521 pub async fn connect_source_bytes(
522 &self,
523 port: ExternalBytesPort,
524 ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
525 self.connect(port).await.0
526 }
527
528 #[deprecated(note = "use `connect` instead")]
529 pub async fn connect_source_bincode<
530 T: Serialize + DeserializeOwned + 'static,
531 O: Ordering,
532 R: Retries,
533 >(
534 &self,
535 port: ExternalBincodeStream<T, O, R>,
536 ) -> Pin<Box<dyn Stream<Item = T>>> {
537 self.connect(port).await
538 }
539
540 pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
541 &'b self,
542 port: P,
543 ) -> <P as ConnectableAsync<&'b Self>>::Output {
544 port.connect(self).await
545 }
546}
547
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
impl DeployResult<'_, crate::deploy::HydroDeploy> {
    /// Returns the raw client port behind `port` on its external node.
    ///
    /// # Panics
    /// Panics if no external node was deployed for the port's location.
    pub fn raw_port<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> hydro_deploy::custom_service::CustomClientPort {
        let external = self.externals.get(port.process_key).unwrap();
        external.raw_port(port.port_id)
    }
}
563
/// A port that can be connected asynchronously, given a context of type `Ctx`.
pub trait ConnectableAsync<Ctx> {
    /// What a successful connection yields (for example a stream, a sink, or both).
    type Output;

    /// Consumes the port and connects it using `ctx`.
    fn connect(self, ctx: Ctx) -> impl std::future::Future<Output = Self::Output>;
}
569
570impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
571 type Output = (
572 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
573 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
574 );
575
576 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
577 ctx.externals
578 .get(self.process_key)
579 .unwrap()
580 .as_bytes_bidi(self.port_id)
581 .await
582 }
583}
584
585impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
586 ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
587{
588 type Output = Pin<Box<dyn Stream<Item = T>>>;
589
590 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
591 ctx.externals
592 .get(self.process_key)
593 .unwrap()
594 .as_bincode_source(self.port_id)
595 .await
596 }
597}
598
599impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
600 for ExternalBincodeSink<T, Many>
601{
602 type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
603
604 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
605 ctx.externals
606 .get(self.process_key)
607 .unwrap()
608 .as_bincode_sink(self.port_id)
609 .await
610 }
611}