hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::DynLocation;
19use crate::location::external_process::ExternalBincodeStream;
20use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21use crate::nondet::NonDet;
22#[cfg(feature = "sim")]
23use crate::sim::SimReceiver;
24use crate::staging_util::get_this_crate;
25
26// same as the one in `hydro_std`, but internal use only
27fn track_membership<'a, C, L: Location<'a> + NoTick>(
28    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
29) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
30    membership.fold(
31        q!(|| false),
32        q!(|present, event| {
33            match event {
34                MembershipEvent::Joined => *present = true,
35                MembershipEvent::Left => *present = false,
36            }
37        }),
38    )
39}
40
41fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
42    let root = get_this_crate();
43
44    if is_demux {
45        parse_quote! {
46            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
47                |(id, data)| {
48                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
49                }
50            )
51        }
52    } else {
53        parse_quote! {
54            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
55                |data| {
56                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
57                }
58            )
59        }
60    }
61}
62
63pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
64    serialize_bincode_with_type(is_demux, &quote_type::<T>())
65}
66
67fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
68    let root = get_this_crate();
69
70    if let Some(c_type) = tagged {
71        parse_quote! {
72            |res| {
73                let (id, b) = res.unwrap();
74                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
75            }
76        }
77    } else {
78        parse_quote! {
79            |res| {
80                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
81            }
82        }
83    }
84}
85
86pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
87    deserialize_bincode_with_type(tagged, &quote_type::<T>())
88}
89
90impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
91    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
92    /// using [`bincode`] to serialize/deserialize messages.
93    ///
94    /// The returned stream captures the elements received at the destination, where values will
95    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
96    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
97    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
98    /// dropped no further messages will be sent.
99    ///
100    /// # Example
101    /// ```rust
102    /// # #[cfg(feature = "deploy")] {
103    /// # use hydro_lang::prelude::*;
104    /// # use futures::StreamExt;
105    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
106    /// let p1 = flow.process::<()>();
107    /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
108    /// let p2 = flow.process::<()>();
109    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
110    /// // 1, 2, 3
111    /// # on_p2.send_bincode(&p_out)
112    /// # }, |mut stream| async move {
113    /// # for w in 1..=3 {
114    /// #     assert_eq!(stream.next().await, Some(w));
115    /// # }
116    /// # }));
117    /// # }
118    /// ```
119    pub fn send_bincode<L2>(
120        self,
121        other: &Process<'a, L2>,
122    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
123    where
124        T: Serialize + DeserializeOwned,
125    {
126        let serialize_pipeline = Some(serialize_bincode::<T>(false));
127
128        let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
129
130        Stream::new(
131            other.clone(),
132            HydroNode::Network {
133                serialize_fn: serialize_pipeline.map(|e| e.into()),
134                instantiate_fn: DebugInstantiate::Building,
135                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
136                input: Box::new(self.ir_node.into_inner()),
137                metadata: other.new_node_metadata(
138                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
139                ),
140            },
141        )
142    }
143
144    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
145    /// using [`bincode`] to serialize/deserialize messages.
146    ///
147    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
148    /// membership information. This is a common pattern in distributed systems for broadcasting data to
149    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
150    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
151    /// each element to all cluster members.
152    ///
153    /// # Non-Determinism
154    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
155    /// to the current cluster members _at that point in time_. Depending on when we are notified of
156    /// membership changes, we will broadcast each element to different members.
157    ///
158    /// # Example
159    /// ```rust
160    /// # #[cfg(feature = "deploy")] {
161    /// # use hydro_lang::prelude::*;
162    /// # use futures::StreamExt;
163    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
164    /// let p1 = flow.process::<()>();
165    /// let workers: Cluster<()> = flow.cluster::<()>();
166    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
167    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
168    /// # on_worker.send_bincode(&p2).entries()
169    /// // if there are 4 members in the cluster, each receives one element
170    /// // - MemberId::<()>(0): [123]
171    /// // - MemberId::<()>(1): [123]
172    /// // - MemberId::<()>(2): [123]
173    /// // - MemberId::<()>(3): [123]
174    /// # }, |mut stream| async move {
175    /// # let mut results = Vec::new();
176    /// # for w in 0..4 {
177    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
178    /// # }
179    /// # results.sort();
180    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
181    /// # }));
182    /// # }
183    /// ```
184    pub fn broadcast_bincode<L2: 'a>(
185        self,
186        other: &Cluster<'a, L2>,
187        nondet_membership: NonDet,
188    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
189    where
190        T: Clone + Serialize + DeserializeOwned,
191    {
192        let ids = track_membership(self.location.source_cluster_members(other));
193        sliced! {
194            let members_snapshot = use(ids, nondet_membership);
195            let elements = use(self, nondet_membership);
196
197            let current_members = members_snapshot.filter(q!(|b| *b));
198            elements.repeat_with_keys(current_members)
199        }
200        .demux_bincode(other)
201    }
202
203    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
204    /// serialization. The external process can receive these elements by establishing a TCP
205    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
206    ///
207    /// # Example
208    /// ```rust
209    /// # #[cfg(feature = "deploy")] {
210    /// # use hydro_lang::prelude::*;
211    /// # use futures::StreamExt;
212    /// # tokio_test::block_on(async move {
213    /// let flow = FlowBuilder::new();
214    /// let process = flow.process::<()>();
215    /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
216    /// let external = flow.external::<()>();
217    /// let external_handle = numbers.send_bincode_external(&external);
218    ///
219    /// let mut deployment = hydro_deploy::Deployment::new();
220    /// let nodes = flow
221    ///     .with_process(&process, deployment.Localhost())
222    ///     .with_external(&external, deployment.Localhost())
223    ///     .deploy(&mut deployment);
224    ///
225    /// deployment.deploy().await.unwrap();
226    /// // establish the TCP connection
227    /// let mut external_recv_stream = nodes.connect(external_handle).await;
228    /// deployment.start().await.unwrap();
229    ///
230    /// for w in 1..=3 {
231    ///     assert_eq!(external_recv_stream.next().await, Some(w));
232    /// }
233    /// # });
234    /// # }
235    /// ```
236    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
237    where
238        T: Serialize + DeserializeOwned,
239    {
240        let serialize_pipeline = Some(serialize_bincode::<T>(false));
241
242        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
243
244        let external_key = flow_state_borrow.next_external_out;
245        flow_state_borrow.next_external_out += 1;
246
247        flow_state_borrow.push_root(HydroRoot::SendExternal {
248            to_external_id: other.id,
249            to_key: external_key,
250            to_many: false,
251            unpaired: true,
252            serialize_fn: serialize_pipeline.map(|e| e.into()),
253            instantiate_fn: DebugInstantiate::Building,
254            input: Box::new(self.ir_node.into_inner()),
255            op_metadata: HydroIrOpMetadata::new(),
256        });
257
258        ExternalBincodeStream {
259            process_id: other.id,
260            port_id: external_key,
261            _phantom: PhantomData,
262        }
263    }
264
265    #[cfg(feature = "sim")]
266    /// Sets up a simulation output port for this stream, allowing test code to receive elements
267    /// sent to this stream during simulation.
268    pub fn sim_output(self) -> SimReceiver<T, O, R>
269    where
270        T: Serialize + DeserializeOwned,
271    {
272        let external_location: External<'a, ()> = External {
273            id: 0,
274            flow_state: self.location.flow_state().clone(),
275            _phantom: PhantomData,
276        };
277
278        let external = self.send_bincode_external(&external_location);
279
280        SimReceiver(external.port_id, PhantomData)
281    }
282}
283
284impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
285    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
286{
287    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
288    /// using [`bincode`] to serialize/deserialize messages.
289    ///
290    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
291    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
292    /// this API allows precise targeting of specific cluster members rather than broadcasting to
293    /// all members.
294    ///
295    /// # Example
296    /// ```rust
297    /// # #[cfg(feature = "deploy")] {
298    /// # use hydro_lang::prelude::*;
299    /// # use futures::StreamExt;
300    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
301    /// let p1 = flow.process::<()>();
302    /// let workers: Cluster<()> = flow.cluster::<()>();
303    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
304    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
305    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
306    ///     .demux_bincode(&workers);
307    /// # on_worker.send_bincode(&p2).entries()
308    /// // if there are 4 members in the cluster, each receives one element
309    /// // - MemberId::<()>(0): [0]
310    /// // - MemberId::<()>(1): [1]
311    /// // - MemberId::<()>(2): [2]
312    /// // - MemberId::<()>(3): [3]
313    /// # }, |mut stream| async move {
314    /// # let mut results = Vec::new();
315    /// # for w in 0..4 {
316    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
317    /// # }
318    /// # results.sort();
319    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
320    /// # }));
321    /// # }
322    /// ```
323    pub fn demux_bincode(
324        self,
325        other: &Cluster<'a, L2>,
326    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
327    where
328        T: Serialize + DeserializeOwned,
329    {
330        self.into_keyed().demux_bincode(other)
331    }
332}
333
334impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
335    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
336    /// [`bincode`] to serialize/deserialize messages.
337    ///
338    /// This provides load balancing by evenly distributing work across cluster members. The
339    /// distribution is deterministic based on element order - the first element goes to member 0,
340    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
341    ///
342    /// # Non-Determinism
343    /// The set of cluster members may asynchronously change over time. Each element is distributed
344    /// based on the current cluster membership _at that point in time_. Depending on when cluster
345    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
346    /// membership is stable, the order of members in the round-robin pattern may change across runs.
347    ///
348    /// # Ordering Requirements
349    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
350    /// order of messages and retries affects the round-robin pattern.
351    ///
352    /// # Example
353    /// ```rust
354    /// # #[cfg(feature = "deploy")] {
355    /// # use hydro_lang::prelude::*;
356    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
357    /// # use futures::StreamExt;
358    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
359    /// let p1 = flow.process::<()>();
360    /// let workers: Cluster<()> = flow.cluster::<()>();
361    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
362    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
363    /// on_worker.send_bincode(&p2)
364    /// # .first().values() // we use first to assert that each member gets one element
365    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
366    /// // - MemberId::<()>(?): [1]
367    /// // - MemberId::<()>(?): [2]
368    /// // - MemberId::<()>(?): [3]
369    /// // - MemberId::<()>(?): [4]
370    /// # }, |mut stream| async move {
371    /// # let mut results = Vec::new();
372    /// # for w in 0..4 {
373    /// #     results.push(stream.next().await.unwrap());
374    /// # }
375    /// # results.sort();
376    /// # assert_eq!(results, vec![1, 2, 3, 4]);
377    /// # }));
378    /// # }
379    /// ```
380    pub fn round_robin_bincode<L2: 'a>(
381        self,
382        other: &Cluster<'a, L2>,
383        nondet_membership: NonDet,
384    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
385    where
386        T: Serialize + DeserializeOwned,
387    {
388        let ids = track_membership(self.location.source_cluster_members(other));
389        sliced! {
390            let members_snapshot = use(ids, nondet_membership);
391            let elements = use(self.enumerate(), nondet_membership);
392
393            let current_members = members_snapshot
394                .filter(q!(|b| *b))
395                .keys()
396                .assume_ordering(nondet_membership)
397                .collect_vec();
398
399            elements
400                .cross_singleton(current_members)
401                .map(q!(|(data, members)| (
402                    members[data.0 % members.len()].clone(),
403                    data.1
404                )))
405        }
406        .demux_bincode(other)
407    }
408}
409
410impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
411    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
412    /// [`bincode`] to serialize/deserialize messages.
413    ///
414    /// This provides load balancing by evenly distributing work across cluster members. The
415    /// distribution is deterministic based on element order - the first element goes to member 0,
416    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
417    ///
418    /// # Non-Determinism
419    /// The set of cluster members may asynchronously change over time. Each element is distributed
420    /// based on the current cluster membership _at that point in time_. Depending on when cluster
421    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
422    /// membership is stable, the order of members in the round-robin pattern may change across runs.
423    ///
424    /// # Ordering Requirements
425    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
426    /// order of messages and retries affects the round-robin pattern.
427    ///
428    /// # Example
429    /// ```rust
430    /// # #[cfg(feature = "deploy")] {
431    /// # use hydro_lang::prelude::*;
432    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
433    /// # use hydro_lang::location::MemberId;
434    /// # use futures::StreamExt;
435    /// # std::thread::spawn(|| {
436    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
437    /// let p1 = flow.process::<()>();
438    /// let workers1: Cluster<()> = flow.cluster::<()>();
439    /// let workers2: Cluster<()> = flow.cluster::<()>();
440    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
441    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
442    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
443    /// on_worker2.send_bincode(&p2)
444    /// # .entries()
445    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
446    /// # }, |mut stream| async move {
447    /// # let mut results = Vec::new();
448    /// # let mut locations = std::collections::HashSet::new();
449    /// # for w in 0..=16 {
450    /// #     let (location, v) = stream.next().await.unwrap();
451    /// #     locations.insert(location);
452    /// #     results.push(v);
453    /// # }
454    /// # results.sort();
455    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
456    /// # assert_eq!(locations.len(), 16);
457    /// # }));
458    /// # }).join().unwrap();
459    /// # }
460    /// ```
461    pub fn round_robin_bincode<L2: 'a>(
462        self,
463        other: &Cluster<'a, L2>,
464        nondet_membership: NonDet,
465    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
466    where
467        T: Serialize + DeserializeOwned,
468    {
469        let ids = track_membership(self.location.source_cluster_members(other));
470        sliced! {
471            let members_snapshot = use(ids, nondet_membership);
472            let elements = use(self.enumerate(), nondet_membership);
473
474            let current_members = members_snapshot
475                .filter(q!(|b| *b))
476                .keys()
477                .assume_ordering(nondet_membership)
478                .collect_vec();
479
480            elements
481                .cross_singleton(current_members)
482                .map(q!(|(data, members)| (
483                    members[data.0 % members.len()].clone(),
484                    data.1
485                )))
486        }
487        .demux_bincode(other)
488    }
489}
490
491impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
492    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
493    /// using [`bincode`] to serialize/deserialize messages.
494    ///
495    /// Each cluster member sends its local stream elements, and they are collected at the destination
496    /// as a [`KeyedStream`] where keys identify the source cluster member.
497    ///
498    /// # Example
499    /// ```rust
500    /// # #[cfg(feature = "deploy")] {
501    /// # use hydro_lang::prelude::*;
502    /// # use futures::StreamExt;
503    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
504    /// let workers: Cluster<()> = flow.cluster::<()>();
505    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
506    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
507    /// # all_received.entries()
508    /// # }, |mut stream| async move {
509    /// // if there are 4 members in the cluster, we should receive 4 elements
510    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
511    /// # let mut results = Vec::new();
512    /// # for w in 0..4 {
513    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
514    /// # }
515    /// # results.sort();
516    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
517    /// # }));
518    /// # }
519    /// ```
520    ///
521    /// If you don't need to know the source for each element, you can use `.values()`
522    /// to get just the data:
523    /// ```rust
524    /// # #[cfg(feature = "deploy")] {
525    /// # use hydro_lang::prelude::*;
526    /// # use hydro_lang::live_collections::stream::NoOrder;
527    /// # use futures::StreamExt;
528    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
529    /// # let workers: Cluster<()> = flow.cluster::<()>();
530    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
531    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
532    /// # values
533    /// # }, |mut stream| async move {
534    /// # let mut results = Vec::new();
535    /// # for w in 0..4 {
536    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
537    /// # }
538    /// # results.sort();
539    /// // if there are 4 members in the cluster, we should receive 4 elements
540    /// // 1, 1, 1, 1
541    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
542    /// # }));
543    /// # }
544    /// ```
545    pub fn send_bincode<L2>(
546        self,
547        other: &Process<'a, L2>,
548    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
549    where
550        T: Serialize + DeserializeOwned,
551    {
552        let serialize_pipeline = Some(serialize_bincode::<T>(false));
553
554        let deserialize_pipeline = Some(deserialize_bincode::<T>(Some(&quote_type::<L>())));
555
556        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
557            other.clone(),
558            HydroNode::Network {
559                serialize_fn: serialize_pipeline.map(|e| e.into()),
560                instantiate_fn: DebugInstantiate::Building,
561                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
562                input: Box::new(self.ir_node.into_inner()),
563                metadata: other.new_node_metadata(Stream::<
564                    (MemberId<L>, T),
565                    Process<'a, L2>,
566                    Unbounded,
567                    O,
568                    R,
569                >::collection_kind()),
570            },
571        );
572
573        raw_stream.into_keyed()
574    }
575
576    /// Broadcasts elements of this stream at each source member to all members of a destination
577    /// cluster, using [`bincode`] to serialize/deserialize messages.
578    ///
579    /// Each source member sends each of its stream elements to **every** member of the cluster
580    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
581    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
582    /// **only data elements** and sends each element to all cluster members.
583    ///
584    /// # Non-Determinism
585    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
586    /// to the current cluster members known _at that point in time_ at the source member. Depending
587    /// on when each source member is notified of membership changes, it will broadcast each element
588    /// to different members.
589    ///
590    /// # Example
591    /// ```rust
592    /// # #[cfg(feature = "deploy")] {
593    /// # use hydro_lang::prelude::*;
594    /// # use hydro_lang::location::MemberId;
595    /// # use futures::StreamExt;
596    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
597    /// # type Source = ();
598    /// # type Destination = ();
599    /// let source: Cluster<Source> = flow.cluster::<Source>();
600    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
601    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
602    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
603    /// # on_destination.entries().send_bincode(&p2).entries()
604    /// // if there are 4 members in the desination, each receives one element from each source member
605    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
606    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
607    /// // - ...
608    /// # }, |mut stream| async move {
609    /// # let mut results = Vec::new();
610    /// # for w in 0..16 {
611    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
612    /// # }
613    /// # results.sort();
614    /// # assert_eq!(results, vec![
615    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
616    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
617    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
618    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
619    /// # ]);
620    /// # }));
621    /// # }
622    /// ```
623    pub fn broadcast_bincode<L2: 'a>(
624        self,
625        other: &Cluster<'a, L2>,
626        nondet_membership: NonDet,
627    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
628    where
629        T: Clone + Serialize + DeserializeOwned,
630    {
631        let ids = track_membership(self.location.source_cluster_members(other));
632        sliced! {
633            let members_snapshot = use(ids, nondet_membership);
634            let elements = use(self, nondet_membership);
635
636            let current_members = members_snapshot.filter(q!(|b| *b));
637            elements.repeat_with_keys(current_members)
638        }
639        .demux_bincode(other)
640    }
641}
642
643impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
644    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
645{
646    /// Sends elements of this stream at each source member to specific members of a destination
647    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
648    ///
649    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
650    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
651    /// this API allows precise targeting of specific cluster members rather than broadcasting to
652    /// all members.
653    ///
654    /// Each cluster member sends its local stream elements, and they are collected at each
655    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
656    ///
657    /// # Example
658    /// ```rust
659    /// # #[cfg(feature = "deploy")] {
660    /// # use hydro_lang::prelude::*;
661    /// # use futures::StreamExt;
662    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
663    /// # type Source = ();
664    /// # type Destination = ();
665    /// let source: Cluster<Source> = flow.cluster::<Source>();
666    /// let to_send: Stream<_, Cluster<_>, _> = source
667    ///     .source_iter(q!(vec![0, 1, 2, 3]))
668    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
669    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
670    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
671    /// # all_received.entries().send_bincode(&p2).entries()
672    /// # }, |mut stream| async move {
673    /// // if there are 4 members in the destination cluster, each receives one message from each source member
674    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
675    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
676    /// // - ...
677    /// # let mut results = Vec::new();
678    /// # for w in 0..16 {
679    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
680    /// # }
681    /// # results.sort();
682    /// # assert_eq!(results, vec![
683    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
684    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
685    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
686    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
687    /// # ]);
688    /// # }));
689    /// # }
690    /// ```
691    pub fn demux_bincode(
692        self,
693        other: &Cluster<'a, L2>,
694    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
695    where
696        T: Serialize + DeserializeOwned,
697    {
698        self.into_keyed().demux_bincode(other)
699    }
700}
701
702#[cfg(test)]
703mod tests {
704    #[cfg(feature = "sim")]
705    use stageleft::q;
706
707    #[cfg(feature = "sim")]
708    use crate::location::{Location, MemberId};
709    #[cfg(feature = "sim")]
710    use crate::nondet::nondet;
711    #[cfg(feature = "sim")]
712    use crate::prelude::FlowBuilder;
713
714    #[cfg(feature = "sim")]
715    #[test]
716    fn sim_send_bincode_o2o() {
717        let flow = FlowBuilder::new();
718        let node = flow.process::<()>();
719        let node2 = flow.process::<()>();
720
721        let (in_send, input) = node.sim_input();
722
723        let out_recv = input
724            .send_bincode(&node2)
725            .batch(&node2.tick(), nondet!(/** test */))
726            .count()
727            .all_ticks()
728            .sim_output();
729
730        let instances = flow.sim().exhaustive(async || {
731            in_send.send(());
732            in_send.send(());
733            in_send.send(());
734
735            let received = out_recv.collect::<Vec<_>>().await;
736            assert!(received.into_iter().sum::<usize>() == 3);
737        });
738
739        assert_eq!(instances, 4); // 2^{3 - 1}
740    }
741
742    #[cfg(feature = "sim")]
743    #[test]
744    fn sim_send_bincode_m2o() {
745        let flow = FlowBuilder::new();
746        let cluster = flow.cluster::<()>();
747        let node = flow.process::<()>();
748
749        let input = cluster.source_iter(q!(vec![1]));
750
751        let out_recv = input
752            .send_bincode(&node)
753            .entries()
754            .batch(&node.tick(), nondet!(/** test */))
755            .all_ticks()
756            .sim_output();
757
758        let instances = flow
759            .sim()
760            .with_cluster_size(&cluster, 4)
761            .exhaustive(async || {
762                out_recv
763                    .assert_yields_only_unordered(vec![
764                        (MemberId::from_raw_id(0), 1),
765                        (MemberId::from_raw_id(1), 1),
766                        (MemberId::from_raw_id(2), 1),
767                        (MemberId::from_raw_id(3), 1),
768                    ])
769                    .await
770            });
771
772        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
773    }
774
775    #[cfg(feature = "sim")]
776    #[test]
777    fn sim_send_bincode_multiple_m2o() {
778        let flow = FlowBuilder::new();
779        let cluster1 = flow.cluster::<()>();
780        let cluster2 = flow.cluster::<()>();
781        let node = flow.process::<()>();
782
783        let out_recv_1 = cluster1
784            .source_iter(q!(vec![1]))
785            .send_bincode(&node)
786            .entries()
787            .sim_output();
788
789        let out_recv_2 = cluster2
790            .source_iter(q!(vec![2]))
791            .send_bincode(&node)
792            .entries()
793            .sim_output();
794
795        let instances = flow
796            .sim()
797            .with_cluster_size(&cluster1, 3)
798            .with_cluster_size(&cluster2, 4)
799            .exhaustive(async || {
800                out_recv_1
801                    .assert_yields_only_unordered(vec![
802                        (MemberId::from_raw_id(0), 1),
803                        (MemberId::from_raw_id(1), 1),
804                        (MemberId::from_raw_id(2), 1),
805                    ])
806                    .await;
807
808                out_recv_2
809                    .assert_yields_only_unordered(vec![
810                        (MemberId::from_raw_id(0), 2),
811                        (MemberId::from_raw_id(1), 2),
812                        (MemberId::from_raw_id(2), 2),
813                        (MemberId::from_raw_id(3), 2),
814                    ])
815                    .await;
816            });
817
818        assert_eq!(instances, 1);
819    }
820
821    #[cfg(feature = "sim")]
822    #[test]
823    fn sim_send_bincode_o2m() {
824        let flow = FlowBuilder::new();
825        let cluster = flow.cluster::<()>();
826        let node = flow.process::<()>();
827
828        let input = node.source_iter(q!(vec![
829            (MemberId::from_raw_id(0), 123),
830            (MemberId::from_raw_id(1), 456),
831        ]));
832
833        let out_recv = input
834            .demux_bincode(&cluster)
835            .map(q!(|x| x + 1))
836            .send_bincode(&node)
837            .entries()
838            .sim_output();
839
840        flow.sim()
841            .with_cluster_size(&cluster, 4)
842            .exhaustive(async || {
843                out_recv
844                    .assert_yields_only_unordered(vec![
845                        (MemberId::from_raw_id(0), 124),
846                        (MemberId::from_raw_id(1), 457),
847                    ])
848                    .await
849            });
850    }
851
852    #[cfg(feature = "sim")]
853    #[test]
854    fn sim_broadcast_bincode_o2m() {
855        let flow = FlowBuilder::new();
856        let cluster = flow.cluster::<()>();
857        let node = flow.process::<()>();
858
859        let input = node.source_iter(q!(vec![123, 456]));
860
861        let out_recv = input
862            .broadcast_bincode(&cluster, nondet!(/** test */))
863            .map(q!(|x| x + 1))
864            .send_bincode(&node)
865            .entries()
866            .sim_output();
867
868        let mut c_1_produced = false;
869        let mut c_2_produced = false;
870
871        flow.sim()
872            .with_cluster_size(&cluster, 2)
873            .exhaustive(async || {
874                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
875
876                // check that order is preserved
877                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
878                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
879                    c_1_produced = true;
880                }
881
882                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
883                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
884                    c_2_produced = true;
885                }
886            });
887
888        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
889    }
890
891    #[cfg(feature = "sim")]
892    #[test]
893    fn sim_send_bincode_m2m() {
894        let flow = FlowBuilder::new();
895        let cluster = flow.cluster::<()>();
896        let node = flow.process::<()>();
897
898        let input = node.source_iter(q!(vec![
899            (MemberId::from_raw_id(0), 123),
900            (MemberId::from_raw_id(1), 456),
901        ]));
902
903        let out_recv = input
904            .demux_bincode(&cluster)
905            .map(q!(|x| x + 1))
906            .flat_map_ordered(q!(|x| vec![
907                (MemberId::from_raw_id(0), x),
908                (MemberId::from_raw_id(1), x),
909            ]))
910            .demux_bincode(&cluster)
911            .entries()
912            .send_bincode(&node)
913            .entries()
914            .sim_output();
915
916        flow.sim()
917            .with_cluster_size(&cluster, 4)
918            .exhaustive(async || {
919                out_recv
920                    .assert_yields_only_unordered(vec![
921                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
922                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
923                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
924                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
925                    ])
926                    .await
927            });
928    }
929}