Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33    membership.fold(
34        q!(|| false),
35        q!(|present, event| {
36            match event {
37                MembershipEvent::Joined => *present = true,
38                MembershipEvent::Left => *present = false,
39            }
40        }),
41    )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45    let root = get_this_crate();
46
47    if is_demux {
48        parse_quote! {
49            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50                |(id, data)| {
51                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52                }
53            )
54        }
55    } else {
56        parse_quote! {
57            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58                |data| {
59                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
60                }
61            )
62        }
63    }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67    serialize_bincode_with_type(is_demux, &quote_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71    let root = get_this_crate();
72    if let Some(c_type) = tagged {
73        parse_quote! {
74            |res| {
75                let (id, b) = res.unwrap();
76                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77            }
78        }
79    } else {
80        parse_quote! {
81            |res| {
82                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83            }
84        }
85    }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89    deserialize_bincode_with_type(tagged, &quote_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93    #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
94    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95    /// using [`bincode`] to serialize/deserialize messages.
96    ///
97    /// The returned stream captures the elements received at the destination, where values will
98    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
    /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
101    /// dropped no further messages will be sent.
102    ///
103    /// # Example
104    /// ```rust
105    /// # #[cfg(feature = "deploy")] {
106    /// # use hydro_lang::prelude::*;
107    /// # use futures::StreamExt;
108    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109    /// let p1 = flow.process::<()>();
110    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111    /// let p2 = flow.process::<()>();
112    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113    /// // 1, 2, 3
114    /// # on_p2.send_bincode(&p_out)
115    /// # }, |mut stream| async move {
116    /// # for w in 1..=3 {
117    /// #     assert_eq!(stream.next().await, Some(w));
118    /// # }
119    /// # }));
120    /// # }
121    /// ```
122    pub fn send_bincode<L2>(
123        self,
124        other: &Process<'a, L2>,
125    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126    where
127        T: Serialize + DeserializeOwned,
128    {
129        self.send(other, TCP.bincode())
130    }
131
132    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133    /// using the configuration in `via` to set up the message transport.
134    ///
135    /// The returned stream captures the elements received at the destination, where values will
136    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
    /// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
139    /// dropped no further messages will be sent.
140    ///
141    /// # Example
142    /// ```rust
143    /// # #[cfg(feature = "deploy")] {
144    /// # use hydro_lang::prelude::*;
145    /// # use futures::StreamExt;
146    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147    /// let p1 = flow.process::<()>();
148    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149    /// let p2 = flow.process::<()>();
150    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.bincode());
151    /// // 1, 2, 3
152    /// # on_p2.send(&p_out, TCP.bincode())
153    /// # }, |mut stream| async move {
154    /// # for w in 1..=3 {
155    /// #     assert_eq!(stream.next().await, Some(w));
156    /// # }
157    /// # }));
158    /// # }
159    /// ```
160    pub fn send<L2, N: NetworkFor<T>>(
161        self,
162        to: &Process<'a, L2>,
163        via: N,
164    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
165    where
166        T: Serialize + DeserializeOwned,
167    {
168        let serialize_pipeline = Some(N::serialize_thunk(false));
169        let deserialize_pipeline = Some(N::deserialize_thunk(None));
170
171        let name = via.name();
172        if to.multiversioned() && name.is_none() {
173            panic!(
174                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
175            );
176        }
177
178        Stream::new(
179            to.clone(),
180            HydroNode::Network {
181                name: name.map(ToOwned::to_owned),
182                serialize_fn: serialize_pipeline.map(|e| e.into()),
183                instantiate_fn: DebugInstantiate::Building,
184                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
185                input: Box::new(self.ir_node.into_inner()),
186                metadata: to.new_node_metadata(
187                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
188                ),
189            },
190        )
191    }
192
193    #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
194    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
195    /// using [`bincode`] to serialize/deserialize messages.
196    ///
197    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
198    /// membership information. This is a common pattern in distributed systems for broadcasting data to
199    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
200    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
201    /// each element to all cluster members.
202    ///
203    /// # Non-Determinism
204    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
205    /// to the current cluster members _at that point in time_. Depending on when we are notified of
206    /// membership changes, we will broadcast each element to different members.
207    ///
208    /// # Example
209    /// ```rust
210    /// # #[cfg(feature = "deploy")] {
211    /// # use hydro_lang::prelude::*;
212    /// # use futures::StreamExt;
213    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
214    /// let p1 = flow.process::<()>();
215    /// let workers: Cluster<()> = flow.cluster::<()>();
216    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
217    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
218    /// # on_worker.send_bincode(&p2).entries()
219    /// // if there are 4 members in the cluster, each receives one element
220    /// // - MemberId::<()>(0): [123]
221    /// // - MemberId::<()>(1): [123]
222    /// // - MemberId::<()>(2): [123]
223    /// // - MemberId::<()>(3): [123]
224    /// # }, |mut stream| async move {
225    /// # let mut results = Vec::new();
226    /// # for w in 0..4 {
227    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
228    /// # }
229    /// # results.sort();
230    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
231    /// # }));
232    /// # }
233    /// ```
234    pub fn broadcast_bincode<L2: 'a>(
235        self,
236        other: &Cluster<'a, L2>,
237        nondet_membership: NonDet,
238    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
239    where
240        T: Clone + Serialize + DeserializeOwned,
241    {
242        self.broadcast(other, TCP.bincode(), nondet_membership)
243    }
244
245    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
246    /// using the configuration in `via` to set up the message transport.
247    ///
248    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
249    /// membership information. This is a common pattern in distributed systems for broadcasting data to
250    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
251    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
252    /// each element to all cluster members.
253    ///
254    /// # Non-Determinism
255    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
256    /// to the current cluster members _at that point in time_. Depending on when we are notified of
257    /// membership changes, we will broadcast each element to different members.
258    ///
259    /// # Example
260    /// ```rust
261    /// # #[cfg(feature = "deploy")] {
262    /// # use hydro_lang::prelude::*;
263    /// # use futures::StreamExt;
264    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
265    /// let p1 = flow.process::<()>();
266    /// let workers: Cluster<()> = flow.cluster::<()>();
267    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
268    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
269    /// # on_worker.send(&p2, TCP.bincode()).entries()
270    /// // if there are 4 members in the cluster, each receives one element
271    /// // - MemberId::<()>(0): [123]
272    /// // - MemberId::<()>(1): [123]
273    /// // - MemberId::<()>(2): [123]
274    /// // - MemberId::<()>(3): [123]
275    /// # }, |mut stream| async move {
276    /// # let mut results = Vec::new();
277    /// # for w in 0..4 {
278    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
279    /// # }
280    /// # results.sort();
281    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
282    /// # }));
283    /// # }
284    /// ```
285    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
286        self,
287        to: &Cluster<'a, L2>,
288        via: N,
289        nondet_membership: NonDet,
290    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
291    where
292        T: Clone + Serialize + DeserializeOwned,
293    {
294        let ids = track_membership(self.location.source_cluster_members(to));
295        sliced! {
296            let members_snapshot = use(ids, nondet_membership);
297            let elements = use(self, nondet_membership);
298
299            let current_members = members_snapshot.filter(q!(|b| *b));
300            elements.repeat_with_keys(current_members)
301        }
302        .demux(to, via)
303    }
304
305    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
306    /// serialization. The external process can receive these elements by establishing a TCP
307    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
308    ///
309    /// # Example
310    /// ```rust
311    /// # #[cfg(feature = "deploy")] {
312    /// # use hydro_lang::prelude::*;
313    /// # use futures::StreamExt;
314    /// # tokio_test::block_on(async move {
315    /// let mut flow = FlowBuilder::new();
316    /// let process = flow.process::<()>();
317    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
318    /// let external = flow.external::<()>();
319    /// let external_handle = numbers.send_bincode_external(&external);
320    ///
321    /// let mut deployment = hydro_deploy::Deployment::new();
322    /// let nodes = flow
323    ///     .with_process(&process, deployment.Localhost())
324    ///     .with_external(&external, deployment.Localhost())
325    ///     .deploy(&mut deployment);
326    ///
327    /// deployment.deploy().await.unwrap();
328    /// // establish the TCP connection
329    /// let mut external_recv_stream = nodes.connect(external_handle).await;
330    /// deployment.start().await.unwrap();
331    ///
332    /// for w in 1..=3 {
333    ///     assert_eq!(external_recv_stream.next().await, Some(w));
334    /// }
335    /// # });
336    /// # }
337    /// ```
338    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
339    where
340        T: Serialize + DeserializeOwned,
341    {
342        let serialize_pipeline = Some(serialize_bincode::<T>(false));
343
344        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
345
346        let external_port_id = flow_state_borrow.next_external_port.get_and_increment();
347
348        flow_state_borrow.push_root(HydroRoot::SendExternal {
349            to_external_key: other.key,
350            to_port_id: external_port_id,
351            to_many: false,
352            unpaired: true,
353            serialize_fn: serialize_pipeline.map(|e| e.into()),
354            instantiate_fn: DebugInstantiate::Building,
355            input: Box::new(self.ir_node.into_inner()),
356            op_metadata: HydroIrOpMetadata::new(),
357        });
358
359        ExternalBincodeStream {
360            process_key: other.key,
361            port_id: external_port_id,
362            _phantom: PhantomData,
363        }
364    }
365
366    #[cfg(feature = "sim")]
367    /// Sets up a simulation output port for this stream, allowing test code to receive elements
368    /// sent to this stream during simulation.
369    pub fn sim_output(self) -> SimReceiver<T, O, R>
370    where
371        T: Serialize + DeserializeOwned,
372    {
373        let external_location: External<'a, ()> = External {
374            key: LocationKey::FIRST,
375            flow_state: self.location.flow_state().clone(),
376            _phantom: PhantomData,
377        };
378
379        let external = self.send_bincode_external(&external_location);
380
381        SimReceiver(external.port_id, PhantomData)
382    }
383}
384
385impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
386    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
387{
388    #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
389    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
390    /// using [`bincode`] to serialize/deserialize messages.
391    ///
392    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
393    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
394    /// this API allows precise targeting of specific cluster members rather than broadcasting to
395    /// all members.
396    ///
397    /// # Example
398    /// ```rust
399    /// # #[cfg(feature = "deploy")] {
400    /// # use hydro_lang::prelude::*;
401    /// # use futures::StreamExt;
402    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
403    /// let p1 = flow.process::<()>();
404    /// let workers: Cluster<()> = flow.cluster::<()>();
405    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
406    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
407    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
408    ///     .demux_bincode(&workers);
409    /// # on_worker.send_bincode(&p2).entries()
410    /// // if there are 4 members in the cluster, each receives one element
411    /// // - MemberId::<()>(0): [0]
412    /// // - MemberId::<()>(1): [1]
413    /// // - MemberId::<()>(2): [2]
414    /// // - MemberId::<()>(3): [3]
415    /// # }, |mut stream| async move {
416    /// # let mut results = Vec::new();
417    /// # for w in 0..4 {
418    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
419    /// # }
420    /// # results.sort();
421    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
422    /// # }));
423    /// # }
424    /// ```
425    pub fn demux_bincode(
426        self,
427        other: &Cluster<'a, L2>,
428    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
429    where
430        T: Serialize + DeserializeOwned,
431    {
432        self.demux(other, TCP.bincode())
433    }
434
435    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
436    /// using the configuration in `via` to set up the message transport.
437    ///
438    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
439    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
440    /// this API allows precise targeting of specific cluster members rather than broadcasting to
441    /// all members.
442    ///
443    /// # Example
444    /// ```rust
445    /// # #[cfg(feature = "deploy")] {
446    /// # use hydro_lang::prelude::*;
447    /// # use futures::StreamExt;
448    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
449    /// let p1 = flow.process::<()>();
450    /// let workers: Cluster<()> = flow.cluster::<()>();
451    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
452    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
453    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
454    ///     .demux(&workers, TCP.bincode());
455    /// # on_worker.send(&p2, TCP.bincode()).entries()
456    /// // if there are 4 members in the cluster, each receives one element
457    /// // - MemberId::<()>(0): [0]
458    /// // - MemberId::<()>(1): [1]
459    /// // - MemberId::<()>(2): [2]
460    /// // - MemberId::<()>(3): [3]
461    /// # }, |mut stream| async move {
462    /// # let mut results = Vec::new();
463    /// # for w in 0..4 {
464    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
465    /// # }
466    /// # results.sort();
467    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
468    /// # }));
469    /// # }
470    /// ```
471    pub fn demux<N: NetworkFor<T>>(
472        self,
473        to: &Cluster<'a, L2>,
474        via: N,
475    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
476    where
477        T: Serialize + DeserializeOwned,
478    {
479        self.into_keyed().demux(to, via)
480    }
481}
482
483impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
484    #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
485    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
486    /// [`bincode`] to serialize/deserialize messages.
487    ///
488    /// This provides load balancing by evenly distributing work across cluster members. The
489    /// distribution is deterministic based on element order - the first element goes to member 0,
490    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
491    ///
492    /// # Non-Determinism
493    /// The set of cluster members may asynchronously change over time. Each element is distributed
494    /// based on the current cluster membership _at that point in time_. Depending on when cluster
495    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
496    /// membership is stable, the order of members in the round-robin pattern may change across runs.
497    ///
498    /// # Ordering Requirements
499    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
500    /// order of messages and retries affects the round-robin pattern.
501    ///
502    /// # Example
503    /// ```rust
504    /// # #[cfg(feature = "deploy")] {
505    /// # use hydro_lang::prelude::*;
506    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
507    /// # use futures::StreamExt;
508    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
509    /// let p1 = flow.process::<()>();
510    /// let workers: Cluster<()> = flow.cluster::<()>();
511    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
512    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
513    /// on_worker.send_bincode(&p2)
514    /// # .first().values() // we use first to assert that each member gets one element
515    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
516    /// // - MemberId::<()>(?): [1]
517    /// // - MemberId::<()>(?): [2]
518    /// // - MemberId::<()>(?): [3]
519    /// // - MemberId::<()>(?): [4]
520    /// # }, |mut stream| async move {
521    /// # let mut results = Vec::new();
522    /// # for w in 0..4 {
523    /// #     results.push(stream.next().await.unwrap());
524    /// # }
525    /// # results.sort();
526    /// # assert_eq!(results, vec![1, 2, 3, 4]);
527    /// # }));
528    /// # }
529    /// ```
530    pub fn round_robin_bincode<L2: 'a>(
531        self,
532        other: &Cluster<'a, L2>,
533        nondet_membership: NonDet,
534    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
535    where
536        T: Serialize + DeserializeOwned,
537    {
538        self.round_robin(other, TCP.bincode(), nondet_membership)
539    }
540
541    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
542    /// the configuration in `via` to set up the message transport.
543    ///
544    /// This provides load balancing by evenly distributing work across cluster members. The
545    /// distribution is deterministic based on element order - the first element goes to member 0,
546    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
547    ///
548    /// # Non-Determinism
549    /// The set of cluster members may asynchronously change over time. Each element is distributed
550    /// based on the current cluster membership _at that point in time_. Depending on when cluster
551    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
552    /// membership is stable, the order of members in the round-robin pattern may change across runs.
553    ///
554    /// # Ordering Requirements
555    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
556    /// order of messages and retries affects the round-robin pattern.
557    ///
558    /// # Example
559    /// ```rust
560    /// # #[cfg(feature = "deploy")] {
561    /// # use hydro_lang::prelude::*;
562    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
563    /// # use futures::StreamExt;
564    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
565    /// let p1 = flow.process::<()>();
566    /// let workers: Cluster<()> = flow.cluster::<()>();
567    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
568    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
569    /// on_worker.send(&p2, TCP.bincode())
570    /// # .first().values() // we use first to assert that each member gets one element
571    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
572    /// // - MemberId::<()>(?): [1]
573    /// // - MemberId::<()>(?): [2]
574    /// // - MemberId::<()>(?): [3]
575    /// // - MemberId::<()>(?): [4]
576    /// # }, |mut stream| async move {
577    /// # let mut results = Vec::new();
578    /// # for w in 0..4 {
579    /// #     results.push(stream.next().await.unwrap());
580    /// # }
581    /// # results.sort();
582    /// # assert_eq!(results, vec![1, 2, 3, 4]);
583    /// # }));
584    /// # }
585    /// ```
586    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
587        self,
588        to: &Cluster<'a, L2>,
589        via: N,
590        nondet_membership: NonDet,
591    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
592    where
593        T: Serialize + DeserializeOwned,
594    {
595        let ids = track_membership(self.location.source_cluster_members(to));
596        sliced! {
597            let members_snapshot = use(ids, nondet_membership);
598            let elements = use(self.enumerate(), nondet_membership);
599
600            let current_members = members_snapshot
601                .filter(q!(|b| *b))
602                .keys()
603                .assume_ordering(nondet_membership)
604                .collect_vec();
605
606            elements
607                .cross_singleton(current_members)
608                .map(q!(|(data, members)| (
609                    members[data.0 % members.len()].clone(),
610                    data.1
611                )))
612        }
613        .demux(to, via)
614    }
615}
616
617impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
618    #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
619    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
620    /// [`bincode`] to serialize/deserialize messages.
621    ///
622    /// This provides load balancing by evenly distributing work across cluster members. The
623    /// distribution is deterministic based on element order - the first element goes to member 0,
624    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
625    ///
626    /// # Non-Determinism
627    /// The set of cluster members may asynchronously change over time. Each element is distributed
628    /// based on the current cluster membership _at that point in time_. Depending on when cluster
629    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
630    /// membership is stable, the order of members in the round-robin pattern may change across runs.
631    ///
632    /// # Ordering Requirements
633    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
634    /// order of messages and retries affects the round-robin pattern.
635    ///
636    /// # Example
637    /// ```rust
638    /// # #[cfg(feature = "deploy")] {
639    /// # use hydro_lang::prelude::*;
640    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
641    /// # use hydro_lang::location::MemberId;
642    /// # use futures::StreamExt;
643    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
644    /// let p1 = flow.process::<()>();
645    /// let workers1: Cluster<()> = flow.cluster::<()>();
646    /// let workers2: Cluster<()> = flow.cluster::<()>();
647    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
648    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
649    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
650    /// on_worker2.send_bincode(&p2)
651    /// # .entries()
652    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
653    /// # }, |mut stream| async move {
654    /// # let mut results = Vec::new();
655    /// # let mut locations = std::collections::HashSet::new();
656    /// # for w in 0..=16 {
657    /// #     let (location, v) = stream.next().await.unwrap();
658    /// #     locations.insert(location);
659    /// #     results.push(v);
660    /// # }
661    /// # results.sort();
662    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
663    /// # assert_eq!(locations.len(), 16);
664    /// # }));
665    /// # }
666    /// ```
667    pub fn round_robin_bincode<L2: 'a>(
668        self,
669        other: &Cluster<'a, L2>,
670        nondet_membership: NonDet,
671    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
672    where
673        T: Serialize + DeserializeOwned,
674    {
675        self.round_robin(other, TCP.bincode(), nondet_membership)
676    }
677
678    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
679    /// the configuration in `via` to set up the message transport.
680    ///
681    /// This provides load balancing by evenly distributing work across cluster members. The
682    /// distribution is deterministic based on element order - the first element goes to member 0,
683    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
684    ///
685    /// # Non-Determinism
686    /// The set of cluster members may asynchronously change over time. Each element is distributed
687    /// based on the current cluster membership _at that point in time_. Depending on when cluster
688    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
689    /// membership is stable, the order of members in the round-robin pattern may change across runs.
690    ///
691    /// # Ordering Requirements
692    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
693    /// order of messages and retries affects the round-robin pattern.
694    ///
695    /// # Example
696    /// ```rust
697    /// # #[cfg(feature = "deploy")] {
698    /// # use hydro_lang::prelude::*;
699    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
700    /// # use hydro_lang::location::MemberId;
701    /// # use futures::StreamExt;
702    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
703    /// let p1 = flow.process::<()>();
704    /// let workers1: Cluster<()> = flow.cluster::<()>();
705    /// let workers2: Cluster<()> = flow.cluster::<()>();
706    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
707    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.bincode(), nondet!(/** assuming stable membership */));
708    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
709    /// on_worker2.send(&p2, TCP.bincode())
710    /// # .entries()
711    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
712    /// # }, |mut stream| async move {
713    /// # let mut results = Vec::new();
714    /// # let mut locations = std::collections::HashSet::new();
715    /// # for w in 0..=16 {
716    /// #     let (location, v) = stream.next().await.unwrap();
717    /// #     locations.insert(location);
718    /// #     results.push(v);
719    /// # }
720    /// # results.sort();
721    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
722    /// # assert_eq!(locations.len(), 16);
723    /// # }));
724    /// # }
725    /// ```
726    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
727        self,
728        to: &Cluster<'a, L2>,
729        via: N,
730        nondet_membership: NonDet,
731    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
732    where
733        T: Serialize + DeserializeOwned,
734    {
735        let ids = track_membership(self.location.source_cluster_members(to));
736        sliced! {
737            let members_snapshot = use(ids, nondet_membership);
738            let elements = use(self.enumerate(), nondet_membership);
739
740            let current_members = members_snapshot
741                .filter(q!(|b| *b))
742                .keys()
743                .assume_ordering(nondet_membership)
744                .collect_vec();
745
746            elements
747                .cross_singleton(current_members)
748                .map(q!(|(data, members)| (
749                    members[data.0 % members.len()].clone(),
750                    data.1
751                )))
752        }
753        .demux(to, via)
754    }
755}
756
757impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
758    #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
759    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
760    /// using [`bincode`] to serialize/deserialize messages.
761    ///
762    /// Each cluster member sends its local stream elements, and they are collected at the destination
763    /// as a [`KeyedStream`] where keys identify the source cluster member.
764    ///
765    /// # Example
766    /// ```rust
767    /// # #[cfg(feature = "deploy")] {
768    /// # use hydro_lang::prelude::*;
769    /// # use futures::StreamExt;
770    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
771    /// let workers: Cluster<()> = flow.cluster::<()>();
772    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
773    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
774    /// # all_received.entries()
775    /// # }, |mut stream| async move {
776    /// // if there are 4 members in the cluster, we should receive 4 elements
777    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
778    /// # let mut results = Vec::new();
779    /// # for w in 0..4 {
780    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
781    /// # }
782    /// # results.sort();
783    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
784    /// # }));
785    /// # }
786    /// ```
787    ///
788    /// If you don't need to know the source for each element, you can use `.values()`
789    /// to get just the data:
790    /// ```rust
791    /// # #[cfg(feature = "deploy")] {
792    /// # use hydro_lang::prelude::*;
793    /// # use hydro_lang::live_collections::stream::NoOrder;
794    /// # use futures::StreamExt;
795    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
796    /// # let workers: Cluster<()> = flow.cluster::<()>();
797    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
798    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
799    /// # values
800    /// # }, |mut stream| async move {
801    /// # let mut results = Vec::new();
802    /// # for w in 0..4 {
803    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
804    /// # }
805    /// # results.sort();
806    /// // if there are 4 members in the cluster, we should receive 4 elements
807    /// // 1, 1, 1, 1
808    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
809    /// # }));
810    /// # }
811    /// ```
812    pub fn send_bincode<L2>(
813        self,
814        other: &Process<'a, L2>,
815    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
816    where
817        T: Serialize + DeserializeOwned,
818    {
819        self.send(other, TCP.bincode())
820    }
821
822    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
823    /// using the configuration in `via` to set up the message transport.
824    ///
825    /// Each cluster member sends its local stream elements, and they are collected at the destination
826    /// as a [`KeyedStream`] where keys identify the source cluster member.
827    ///
828    /// # Example
829    /// ```rust
830    /// # #[cfg(feature = "deploy")] {
831    /// # use hydro_lang::prelude::*;
832    /// # use futures::StreamExt;
833    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
834    /// let workers: Cluster<()> = flow.cluster::<()>();
835    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
836    /// let all_received = numbers.send(&process, TCP.bincode()); // KeyedStream<MemberId<()>, i32, ...>
837    /// # all_received.entries()
838    /// # }, |mut stream| async move {
839    /// // if there are 4 members in the cluster, we should receive 4 elements
840    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
841    /// # let mut results = Vec::new();
842    /// # for w in 0..4 {
843    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
844    /// # }
845    /// # results.sort();
846    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
847    /// # }));
848    /// # }
849    /// ```
850    ///
851    /// If you don't need to know the source for each element, you can use `.values()`
852    /// to get just the data:
853    /// ```rust
854    /// # #[cfg(feature = "deploy")] {
855    /// # use hydro_lang::prelude::*;
856    /// # use hydro_lang::live_collections::stream::NoOrder;
857    /// # use futures::StreamExt;
858    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
859    /// # let workers: Cluster<()> = flow.cluster::<()>();
860    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
861    /// let values: Stream<i32, _, _, NoOrder> = numbers.send(&process, TCP.bincode()).values();
862    /// # values
863    /// # }, |mut stream| async move {
864    /// # let mut results = Vec::new();
865    /// # for w in 0..4 {
866    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
867    /// # }
868    /// # results.sort();
869    /// // if there are 4 members in the cluster, we should receive 4 elements
870    /// // 1, 1, 1, 1
871    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
872    /// # }));
873    /// # }
874    /// ```
875    pub fn send<L2, N: NetworkFor<T>>(
876        self,
877        to: &Process<'a, L2>,
878        via: N,
879    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
880    where
881        T: Serialize + DeserializeOwned,
882    {
883        let serialize_pipeline = Some(N::serialize_thunk(false));
884
885        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
886
887        let name = via.name();
888        if to.multiversioned() && name.is_none() {
889            panic!(
890                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
891            );
892        }
893
894        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
895            to.clone(),
896            HydroNode::Network {
897                name: name.map(ToOwned::to_owned),
898                serialize_fn: serialize_pipeline.map(|e| e.into()),
899                instantiate_fn: DebugInstantiate::Building,
900                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
901                input: Box::new(self.ir_node.into_inner()),
902                metadata: to.new_node_metadata(Stream::<
903                    (MemberId<L>, T),
904                    Process<'a, L2>,
905                    Unbounded,
906                    O,
907                    R,
908                >::collection_kind()),
909            },
910        );
911
912        raw_stream.into_keyed()
913    }
914
915    #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
916    /// Broadcasts elements of this stream at each source member to all members of a destination
917    /// cluster, using [`bincode`] to serialize/deserialize messages.
918    ///
919    /// Each source member sends each of its stream elements to **every** member of the cluster
920    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
921    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
922    /// **only data elements** and sends each element to all cluster members.
923    ///
924    /// # Non-Determinism
925    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
926    /// to the current cluster members known _at that point in time_ at the source member. Depending
927    /// on when each source member is notified of membership changes, it will broadcast each element
928    /// to different members.
929    ///
930    /// # Example
931    /// ```rust
932    /// # #[cfg(feature = "deploy")] {
933    /// # use hydro_lang::prelude::*;
934    /// # use hydro_lang::location::MemberId;
935    /// # use futures::StreamExt;
936    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
937    /// # type Source = ();
938    /// # type Destination = ();
939    /// let source: Cluster<Source> = flow.cluster::<Source>();
940    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
941    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
942    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
943    /// # on_destination.entries().send_bincode(&p2).entries()
944    /// // if there are 4 members in the destination, each receives one element from each source member
945    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
946    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
947    /// // - ...
948    /// # }, |mut stream| async move {
949    /// # let mut results = Vec::new();
950    /// # for w in 0..16 {
951    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
952    /// # }
953    /// # results.sort();
954    /// # assert_eq!(results, vec![
955    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
956    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
957    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
958    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
959    /// # ]);
960    /// # }));
961    /// # }
962    /// ```
963    pub fn broadcast_bincode<L2: 'a>(
964        self,
965        other: &Cluster<'a, L2>,
966        nondet_membership: NonDet,
967    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
968    where
969        T: Clone + Serialize + DeserializeOwned,
970    {
971        self.broadcast(other, TCP.bincode(), nondet_membership)
972    }
973
974    /// Broadcasts elements of this stream at each source member to all members of a destination
975    /// cluster, using the configuration in `via` to set up the message transport.
976    ///
977    /// Each source member sends each of its stream elements to **every** member of the cluster
978    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
979    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
980    /// **only data elements** and sends each element to all cluster members.
981    ///
982    /// # Non-Determinism
983    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
984    /// to the current cluster members known _at that point in time_ at the source member. Depending
985    /// on when each source member is notified of membership changes, it will broadcast each element
986    /// to different members.
987    ///
988    /// # Example
989    /// ```rust
990    /// # #[cfg(feature = "deploy")] {
991    /// # use hydro_lang::prelude::*;
992    /// # use hydro_lang::location::MemberId;
993    /// # use futures::StreamExt;
994    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
995    /// # type Source = ();
996    /// # type Destination = ();
997    /// let source: Cluster<Source> = flow.cluster::<Source>();
998    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
999    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1000    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.bincode(), nondet!(/** assuming stable membership */));
1001    /// # on_destination.entries().send(&p2, TCP.bincode()).entries()
1002    /// // if there are 4 members in the destination, each receives one element from each source member
1003    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1004    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1005    /// // - ...
1006    /// # }, |mut stream| async move {
1007    /// # let mut results = Vec::new();
1008    /// # for w in 0..16 {
1009    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1010    /// # }
1011    /// # results.sort();
1012    /// # assert_eq!(results, vec![
1013    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1014    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1015    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1016    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1017    /// # ]);
1018    /// # }));
1019    /// # }
1020    /// ```
1021    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1022        self,
1023        to: &Cluster<'a, L2>,
1024        via: N,
1025        nondet_membership: NonDet,
1026    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1027    where
1028        T: Clone + Serialize + DeserializeOwned,
1029    {
1030        let ids = track_membership(self.location.source_cluster_members(to));
1031        sliced! {
1032            let members_snapshot = use(ids, nondet_membership);
1033            let elements = use(self, nondet_membership);
1034
1035            let current_members = members_snapshot.filter(q!(|b| *b));
1036            elements.repeat_with_keys(current_members)
1037        }
1038        .demux(to, via)
1039    }
1040}
1041
1042impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1043    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1044{
1045    #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
1046    /// Sends elements of this stream at each source member to specific members of a destination
1047    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1048    ///
1049    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1050    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1051    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1052    /// all members.
1053    ///
1054    /// Each cluster member sends its local stream elements, and they are collected at each
1055    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1056    ///
1057    /// # Example
1058    /// ```rust
1059    /// # #[cfg(feature = "deploy")] {
1060    /// # use hydro_lang::prelude::*;
1061    /// # use futures::StreamExt;
1062    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1063    /// # type Source = ();
1064    /// # type Destination = ();
1065    /// let source: Cluster<Source> = flow.cluster::<Source>();
1066    /// let to_send: Stream<_, Cluster<_>, _> = source
1067    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1068    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1069    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1070    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1071    /// # all_received.entries().send_bincode(&p2).entries()
1072    /// # }, |mut stream| async move {
1073    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1074    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1075    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1076    /// // - ...
1077    /// # let mut results = Vec::new();
1078    /// # for w in 0..16 {
1079    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1080    /// # }
1081    /// # results.sort();
1082    /// # assert_eq!(results, vec![
1083    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1084    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1085    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1086    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1087    /// # ]);
1088    /// # }));
1089    /// # }
1090    /// ```
1091    pub fn demux_bincode(
1092        self,
1093        other: &Cluster<'a, L2>,
1094    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1095    where
1096        T: Serialize + DeserializeOwned,
1097    {
1098        self.demux(other, TCP.bincode())
1099    }
1100
1101    /// Sends elements of this stream at each source member to specific members of a destination
1102    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1103    /// message transport.
1104    ///
1105    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1106    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1107    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1108    /// all members.
1109    ///
1110    /// Each cluster member sends its local stream elements, and they are collected at each
1111    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1112    ///
1113    /// # Example
1114    /// ```rust
1115    /// # #[cfg(feature = "deploy")] {
1116    /// # use hydro_lang::prelude::*;
1117    /// # use futures::StreamExt;
1118    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1119    /// # type Source = ();
1120    /// # type Destination = ();
1121    /// let source: Cluster<Source> = flow.cluster::<Source>();
1122    /// let to_send: Stream<_, Cluster<_>, _> = source
1123    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1124    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1125    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1126    /// let all_received = to_send.demux(&destination, TCP.bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1127    /// # all_received.entries().send(&p2, TCP.bincode()).entries()
1128    /// # }, |mut stream| async move {
1129    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1130    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1131    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1132    /// // - ...
1133    /// # let mut results = Vec::new();
1134    /// # for w in 0..16 {
1135    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1136    /// # }
1137    /// # results.sort();
1138    /// # assert_eq!(results, vec![
1139    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1140    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1141    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1142    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1143    /// # ]);
1144    /// # }));
1145    /// # }
1146    /// ```
1147    pub fn demux<N: NetworkFor<T>>(
1148        self,
1149        to: &Cluster<'a, L2>,
1150        via: N,
1151    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1152    where
1153        T: Serialize + DeserializeOwned,
1154    {
1155        self.into_keyed().demux(to, via)
1156    }
1157}
1158
1159#[cfg(test)]
1160mod tests {
1161    #[cfg(feature = "sim")]
1162    use stageleft::q;
1163
1164    #[cfg(feature = "sim")]
1165    use crate::location::{Location, MemberId};
1166    #[cfg(feature = "sim")]
1167    use crate::networking::TCP;
1168    #[cfg(feature = "sim")]
1169    use crate::nondet::nondet;
1170    #[cfg(feature = "sim")]
1171    use crate::prelude::FlowBuilder;
1172
1173    #[cfg(feature = "sim")]
1174    #[test]
1175    fn sim_send_bincode_o2o() {
1176        use crate::networking::TCP;
1177
1178        let mut flow = FlowBuilder::new();
1179        let node = flow.process::<()>();
1180        let node2 = flow.process::<()>();
1181
1182        let (in_send, input) = node.sim_input();
1183
1184        let out_recv = input
1185            .send(&node2, TCP.bincode())
1186            .batch(&node2.tick(), nondet!(/** test */))
1187            .count()
1188            .all_ticks()
1189            .sim_output();
1190
1191        let instances = flow.sim().exhaustive(async || {
1192            in_send.send(());
1193            in_send.send(());
1194            in_send.send(());
1195
1196            let received = out_recv.collect::<Vec<_>>().await;
1197            assert!(received.into_iter().sum::<usize>() == 3);
1198        });
1199
1200        assert_eq!(instances, 4); // 2^{3 - 1}
1201    }
1202
1203    #[cfg(feature = "sim")]
1204    #[test]
1205    fn sim_send_bincode_m2o() {
1206        let mut flow = FlowBuilder::new();
1207        let cluster = flow.cluster::<()>();
1208        let node = flow.process::<()>();
1209
1210        let input = cluster.source_iter(q!(vec![1]));
1211
1212        let out_recv = input
1213            .send(&node, TCP.bincode())
1214            .entries()
1215            .batch(&node.tick(), nondet!(/** test */))
1216            .all_ticks()
1217            .sim_output();
1218
1219        let instances = flow
1220            .sim()
1221            .with_cluster_size(&cluster, 4)
1222            .exhaustive(async || {
1223                out_recv
1224                    .assert_yields_only_unordered(vec![
1225                        (MemberId::from_raw_id(0), 1),
1226                        (MemberId::from_raw_id(1), 1),
1227                        (MemberId::from_raw_id(2), 1),
1228                        (MemberId::from_raw_id(3), 1),
1229                    ])
1230                    .await
1231            });
1232
1233        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1234    }
1235
1236    #[cfg(feature = "sim")]
1237    #[test]
1238    fn sim_send_bincode_multiple_m2o() {
1239        let mut flow = FlowBuilder::new();
1240        let cluster1 = flow.cluster::<()>();
1241        let cluster2 = flow.cluster::<()>();
1242        let node = flow.process::<()>();
1243
1244        let out_recv_1 = cluster1
1245            .source_iter(q!(vec![1]))
1246            .send(&node, TCP.bincode())
1247            .entries()
1248            .sim_output();
1249
1250        let out_recv_2 = cluster2
1251            .source_iter(q!(vec![2]))
1252            .send(&node, TCP.bincode())
1253            .entries()
1254            .sim_output();
1255
1256        let instances = flow
1257            .sim()
1258            .with_cluster_size(&cluster1, 3)
1259            .with_cluster_size(&cluster2, 4)
1260            .exhaustive(async || {
1261                out_recv_1
1262                    .assert_yields_only_unordered(vec![
1263                        (MemberId::from_raw_id(0), 1),
1264                        (MemberId::from_raw_id(1), 1),
1265                        (MemberId::from_raw_id(2), 1),
1266                    ])
1267                    .await;
1268
1269                out_recv_2
1270                    .assert_yields_only_unordered(vec![
1271                        (MemberId::from_raw_id(0), 2),
1272                        (MemberId::from_raw_id(1), 2),
1273                        (MemberId::from_raw_id(2), 2),
1274                        (MemberId::from_raw_id(3), 2),
1275                    ])
1276                    .await;
1277            });
1278
1279        assert_eq!(instances, 1);
1280    }
1281
1282    #[cfg(feature = "sim")]
1283    #[test]
1284    fn sim_send_bincode_o2m() {
1285        let mut flow = FlowBuilder::new();
1286        let cluster = flow.cluster::<()>();
1287        let node = flow.process::<()>();
1288
1289        let input = node.source_iter(q!(vec![
1290            (MemberId::from_raw_id(0), 123),
1291            (MemberId::from_raw_id(1), 456),
1292        ]));
1293
1294        let out_recv = input
1295            .demux(&cluster, TCP.bincode())
1296            .map(q!(|x| x + 1))
1297            .send(&node, TCP.bincode())
1298            .entries()
1299            .sim_output();
1300
1301        flow.sim()
1302            .with_cluster_size(&cluster, 4)
1303            .exhaustive(async || {
1304                out_recv
1305                    .assert_yields_only_unordered(vec![
1306                        (MemberId::from_raw_id(0), 124),
1307                        (MemberId::from_raw_id(1), 457),
1308                    ])
1309                    .await
1310            });
1311    }
1312
1313    #[cfg(feature = "sim")]
1314    #[test]
1315    fn sim_broadcast_bincode_o2m() {
1316        let mut flow = FlowBuilder::new();
1317        let cluster = flow.cluster::<()>();
1318        let node = flow.process::<()>();
1319
1320        let input = node.source_iter(q!(vec![123, 456]));
1321
1322        let out_recv = input
1323            .broadcast(&cluster, TCP.bincode(), nondet!(/** test */))
1324            .map(q!(|x| x + 1))
1325            .send(&node, TCP.bincode())
1326            .entries()
1327            .sim_output();
1328
1329        let mut c_1_produced = false;
1330        let mut c_2_produced = false;
1331
1332        flow.sim()
1333            .with_cluster_size(&cluster, 2)
1334            .exhaustive(async || {
1335                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1336
1337                // check that order is preserved
1338                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1339                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1340                    c_1_produced = true;
1341                }
1342
1343                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1344                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1345                    c_2_produced = true;
1346                }
1347            });
1348
1349        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1350    }
1351
1352    #[cfg(feature = "sim")]
1353    #[test]
1354    fn sim_send_bincode_m2m() {
1355        let mut flow = FlowBuilder::new();
1356        let cluster = flow.cluster::<()>();
1357        let node = flow.process::<()>();
1358
1359        let input = node.source_iter(q!(vec![
1360            (MemberId::from_raw_id(0), 123),
1361            (MemberId::from_raw_id(1), 456),
1362        ]));
1363
1364        let out_recv = input
1365            .demux(&cluster, TCP.bincode())
1366            .map(q!(|x| x + 1))
1367            .flat_map_ordered(q!(|x| vec![
1368                (MemberId::from_raw_id(0), x),
1369                (MemberId::from_raw_id(1), x),
1370            ]))
1371            .demux(&cluster, TCP.bincode())
1372            .entries()
1373            .send(&node, TCP.bincode())
1374            .entries()
1375            .sim_output();
1376
1377        flow.sim()
1378            .with_cluster_size(&cluster, 4)
1379            .exhaustive(async || {
1380                out_recv
1381                    .assert_yields_only_unordered(vec![
1382                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1383                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1384                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1385                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1386                    ])
1387                    .await
1388            });
1389    }
1390}