// hydro_lang/compile/deploy_provider.rs

use std::future::Future;
use std::io::Error;
use std::pin::Pin;

use bytes::{Bytes, BytesMut};
use dfir_lang::graph::DfirGraph;
use futures::{Sink, Stream};
use serde::Serialize;
use serde::de::DeserializeOwned;
use stageleft::QuotedWithContext;

use crate::compile::builder::ExternalPortId;
use crate::location::dynamic::LocationId;
use crate::location::member_id::TaglessMemberId;
use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17    type Meta: Default;
18    type InstantiateEnv;
19
20    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23        + RegisterPort<'a, Self>;
24
25    /// Generates the source and sink expressions when connecting a [`Self::Process`] to another
26    /// [`Self::Process`].
27    ///
28    /// The [`Self::InstantiateEnv`] can be used to record metadata about the created channel. The
29    /// provided `name` is the user-configured channel name from the network IR node.
30    fn o2o_sink_source(
31        env: &mut Self::InstantiateEnv,
32        p1: &Self::Process,
33        p1_port: &<Self::Process as Node>::Port,
34        p2: &Self::Process,
35        p2_port: &<Self::Process as Node>::Port,
36        name: Option<&str>,
37        networking_info: &crate::networking::NetworkingInfo,
38    ) -> (syn::Expr, syn::Expr);
39
40    /// Performs any runtime wiring needed after code generation for a
41    /// [`Self::Process`]-to-[`Self::Process`] channel.
42    ///
43    /// The returned closure is executed once all locations have been instantiated.
44    fn o2o_connect(
45        p1: &Self::Process,
46        p1_port: &<Self::Process as Node>::Port,
47        p2: &Self::Process,
48        p2_port: &<Self::Process as Node>::Port,
49    ) -> Box<dyn FnOnce()>;
50
51    /// Generates the source and sink expressions when connecting a [`Self::Process`] to a
52    /// [`Self::Cluster`] (one-to-many).
53    ///
54    /// The sink expression is used on the sending process and the source expression on each
55    /// receiving cluster member. The [`Self::InstantiateEnv`] can be used to record metadata
56    /// about the created channel. The provided `name` is the user-configured channel name
57    /// from the network IR node.
58    fn o2m_sink_source(
59        env: &mut Self::InstantiateEnv,
60        p1: &Self::Process,
61        p1_port: &<Self::Process as Node>::Port,
62        c2: &Self::Cluster,
63        c2_port: &<Self::Cluster as Node>::Port,
64        name: Option<&str>,
65        networking_info: &crate::networking::NetworkingInfo,
66    ) -> (syn::Expr, syn::Expr);
67
68    /// Performs any runtime wiring needed after code generation for a
69    /// [`Self::Process`]-to-[`Self::Cluster`] channel.
70    ///
71    /// The returned closure is executed once all locations have been instantiated.
72    fn o2m_connect(
73        p1: &Self::Process,
74        p1_port: &<Self::Process as Node>::Port,
75        c2: &Self::Cluster,
76        c2_port: &<Self::Cluster as Node>::Port,
77    ) -> Box<dyn FnOnce()>;
78
79    /// Generates the source and sink expressions when connecting a [`Self::Cluster`] to a
80    /// [`Self::Process`] (many-to-one).
81    ///
82    /// The sink expression is used on each sending cluster member and the source expression
83    /// on the receiving process. The [`Self::InstantiateEnv`] can be used to record metadata
84    /// about the created channel. The provided `name` is the user-configured channel name
85    /// from the network IR node.
86    fn m2o_sink_source(
87        env: &mut Self::InstantiateEnv,
88        c1: &Self::Cluster,
89        c1_port: &<Self::Cluster as Node>::Port,
90        p2: &Self::Process,
91        p2_port: &<Self::Process as Node>::Port,
92        name: Option<&str>,
93        networking_info: &crate::networking::NetworkingInfo,
94    ) -> (syn::Expr, syn::Expr);
95
96    /// Performs any runtime wiring needed after code generation for a
97    /// [`Self::Cluster`]-to-[`Self::Process`] channel.
98    ///
99    /// The returned closure is executed once all locations have been instantiated.
100    fn m2o_connect(
101        c1: &Self::Cluster,
102        c1_port: &<Self::Cluster as Node>::Port,
103        p2: &Self::Process,
104        p2_port: &<Self::Process as Node>::Port,
105    ) -> Box<dyn FnOnce()>;
106
107    /// Generates the source and sink expressions when connecting a [`Self::Cluster`] to another
108    /// [`Self::Cluster`] (many-to-many).
109    ///
110    /// The sink expression is used on each sending cluster member and the source expression
111    /// on each receiving cluster member. The [`Self::InstantiateEnv`] can be used to record
112    /// metadata about the created channel. The provided `name` is the user-configured channel
113    /// name from the network IR node.
114    fn m2m_sink_source(
115        env: &mut Self::InstantiateEnv,
116        c1: &Self::Cluster,
117        c1_port: &<Self::Cluster as Node>::Port,
118        c2: &Self::Cluster,
119        c2_port: &<Self::Cluster as Node>::Port,
120        name: Option<&str>,
121        networking_info: &crate::networking::NetworkingInfo,
122    ) -> (syn::Expr, syn::Expr);
123
124    /// Performs any runtime wiring needed after code generation for a
125    /// [`Self::Cluster`]-to-[`Self::Cluster`] channel.
126    ///
127    /// The returned closure is executed once all locations have been instantiated.
128    fn m2m_connect(
129        c1: &Self::Cluster,
130        c1_port: &<Self::Cluster as Node>::Port,
131        c2: &Self::Cluster,
132        c2_port: &<Self::Cluster as Node>::Port,
133    ) -> Box<dyn FnOnce()>;
134
135    fn e2o_many_source(
136        extra_stmts: &mut Vec<syn::Stmt>,
137        p2: &Self::Process,
138        p2_port: &<Self::Process as Node>::Port,
139        codec_type: &syn::Type,
140        shared_handle: String,
141    ) -> syn::Expr;
142    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
143
144    fn e2o_source(
145        extra_stmts: &mut Vec<syn::Stmt>,
146        p1: &Self::External,
147        p1_port: &<Self::External as Node>::Port,
148        p2: &Self::Process,
149        p2_port: &<Self::Process as Node>::Port,
150        codec_type: &syn::Type,
151        shared_handle: String,
152    ) -> syn::Expr;
153    fn e2o_connect(
154        p1: &Self::External,
155        p1_port: &<Self::External as Node>::Port,
156        p2: &Self::Process,
157        p2_port: &<Self::Process as Node>::Port,
158        many: bool,
159        server_hint: NetworkHint,
160    ) -> Box<dyn FnOnce()>;
161
162    fn o2e_sink(
163        p1: &Self::Process,
164        p1_port: &<Self::Process as Node>::Port,
165        p2: &Self::External,
166        p2_port: &<Self::External as Node>::Port,
167        shared_handle: String,
168    ) -> syn::Expr;
169
170    fn cluster_ids(
171        of_cluster: LocationKey,
172    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
173
174    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
175
176    fn cluster_membership_stream(
177        env: &mut Self::InstantiateEnv,
178        at_location: &LocationId,
179        location_id: &LocationId,
180    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
181
182    /// Registers an embedded stream input for the given ident and element type.
183    ///
184    /// Only meaningful for the embedded deployment backend. The default
185    /// implementation panics.
186    fn register_embedded_stream_input(
187        _env: &mut Self::InstantiateEnv,
188        _location_key: LocationKey,
189        _ident: &syn::Ident,
190        _element_type: &syn::Type,
191    ) {
192        panic!("register_embedded_stream_input is only supported by EmbeddedDeploy");
193    }
194
195    /// Registers an embedded singleton input for the given ident and element type.
196    ///
197    /// Only meaningful for the embedded deployment backend. The default
198    /// implementation panics.
199    fn register_embedded_singleton_input(
200        _env: &mut Self::InstantiateEnv,
201        _location_key: LocationKey,
202        _ident: &syn::Ident,
203        _element_type: &syn::Type,
204    ) {
205        panic!("register_embedded_singleton_input is only supported by EmbeddedDeploy");
206    }
207
208    /// Registers an embedded output for the given ident and element type.
209    ///
210    /// Only meaningful for the embedded deployment backend. The default
211    /// implementation panics.
212    fn register_embedded_output(
213        _env: &mut Self::InstantiateEnv,
214        _location_key: LocationKey,
215        _ident: &syn::Ident,
216        _element_type: &syn::Type,
217    ) {
218        panic!("register_embedded_output is only supported by EmbeddedDeploy");
219    }
220}
221
222pub trait ProcessSpec<'a, D>
223where
224    D: Deploy<'a> + ?Sized,
225{
226    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
227}
228
229pub trait IntoProcessSpec<'a, D>
230where
231    D: Deploy<'a> + ?Sized,
232{
233    type ProcessSpec: ProcessSpec<'a, D>;
234    fn into_process_spec(self) -> Self::ProcessSpec;
235}
236
237impl<'a, D, T> IntoProcessSpec<'a, D> for T
238where
239    D: Deploy<'a> + ?Sized,
240    T: ProcessSpec<'a, D>,
241{
242    type ProcessSpec = T;
243    fn into_process_spec(self) -> Self::ProcessSpec {
244        self
245    }
246}
247
248pub trait ClusterSpec<'a, D>
249where
250    D: Deploy<'a> + ?Sized,
251{
252    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
253}
254
255pub trait ExternalSpec<'a, D>
256where
257    D: Deploy<'a> + ?Sized,
258{
259    fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
260}
261
262pub trait Node {
263    /// A logical communication endpoint for this node.
264    ///
265    /// Implementors are free to choose the concrete representation (for example,
266    /// a handle or identifier), but it must be `Clone` so that a single logical
267    /// port can be duplicated and passed to multiple consumers. New ports are
268    /// allocated via [`Self::next_port`].
269    type Port: Clone;
270    type Meta: Default;
271    type InstantiateEnv;
272
273    /// Allocates and returns a new port.
274    fn next_port(&self) -> Self::Port;
275
276    fn update_meta(&self, meta: &Self::Meta);
277
278    fn instantiate(
279        &self,
280        env: &mut Self::InstantiateEnv,
281        meta: &mut Self::Meta,
282        graph: DfirGraph,
283        extra_stmts: &[syn::Stmt],
284        sidecars: &[syn::Expr],
285    );
286}
287
288pub type DynSourceSink<Out, In, InErr> = (
289    Pin<Box<dyn Stream<Item = Out>>>,
290    Pin<Box<dyn Sink<In, Error = InErr>>>,
291);
292
293pub trait RegisterPort<'a, D>: Node + Clone
294where
295    D: Deploy<'a> + ?Sized,
296{
297    fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
298
299    fn as_bytes_bidi(
300        &self,
301        external_port_id: ExternalPortId,
302    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
303
304    fn as_bincode_bidi<InT, OutT>(
305        &self,
306        external_port_id: ExternalPortId,
307    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
308    where
309        InT: Serialize + 'static,
310        OutT: DeserializeOwned + 'static;
311
312    fn as_bincode_sink<T>(
313        &self,
314        external_port_id: ExternalPortId,
315    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
316    where
317        T: Serialize + 'static;
318
319    fn as_bincode_source<T>(
320        &self,
321        external_port_id: ExternalPortId,
322    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
323    where
324        T: DeserializeOwned + 'static;
325}