Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33    membership.fold(
34        q!(|| false),
35        q!(|present, event| {
36            match event {
37                MembershipEvent::Joined => *present = true,
38                MembershipEvent::Left => *present = false,
39            }
40        }),
41    )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45    let root = get_this_crate();
46
47    if is_demux {
48        parse_quote! {
49            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50                |(id, data)| {
51                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52                }
53            )
54        }
55    } else {
56        parse_quote! {
57            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58                |data| {
59                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
60                }
61            )
62        }
63    }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67    serialize_bincode_with_type(is_demux, &quote_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71    let root = get_this_crate();
72    if let Some(c_type) = tagged {
73        parse_quote! {
74            |res| {
75                let (id, b) = res.unwrap();
76                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77            }
78        }
79    } else {
80        parse_quote! {
81            |res| {
82                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83            }
84        }
85    }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89    deserialize_bincode_with_type(tagged, &quote_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
94    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95    /// using [`bincode`] to serialize/deserialize messages.
96    ///
97    /// The returned stream captures the elements received at the destination, where values will
98    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
100    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
101    /// dropped no further messages will be sent.
102    ///
103    /// # Example
104    /// ```rust
105    /// # #[cfg(feature = "deploy")] {
106    /// # use hydro_lang::prelude::*;
107    /// # use futures::StreamExt;
108    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109    /// let p1 = flow.process::<()>();
110    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111    /// let p2 = flow.process::<()>();
112    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113    /// // 1, 2, 3
114    /// # on_p2.send_bincode(&p_out)
115    /// # }, |mut stream| async move {
116    /// # for w in 1..=3 {
117    /// #     assert_eq!(stream.next().await, Some(w));
118    /// # }
119    /// # }));
120    /// # }
121    /// ```
122    pub fn send_bincode<L2>(
123        self,
124        other: &Process<'a, L2>,
125    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126    where
127        T: Serialize + DeserializeOwned,
128    {
129        self.send(other, TCP.fail_stop().bincode())
130    }
131
132    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133    /// using the configuration in `via` to set up the message transport.
134    ///
135    /// The returned stream captures the elements received at the destination, where values will
136    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
138    /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
139    /// dropped no further messages will be sent.
140    ///
141    /// # Example
142    /// ```rust
143    /// # #[cfg(feature = "deploy")] {
144    /// # use hydro_lang::prelude::*;
145    /// # use futures::StreamExt;
146    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147    /// let p1 = flow.process::<()>();
148    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149    /// let p2 = flow.process::<()>();
150    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
151    /// // 1, 2, 3
152    /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
153    /// # }, |mut stream| async move {
154    /// # for w in 1..=3 {
155    /// #     assert_eq!(stream.next().await, Some(w));
156    /// # }
157    /// # }));
158    /// # }
159    /// ```
160    pub fn send<L2, N: NetworkFor<T>>(
161        self,
162        to: &Process<'a, L2>,
163        via: N,
164    ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
165    where
166        T: Serialize + DeserializeOwned,
167        O: MinOrder<N::OrderingGuarantee>,
168    {
169        let serialize_pipeline = Some(N::serialize_thunk(false));
170        let deserialize_pipeline = Some(N::deserialize_thunk(None));
171
172        let name = via.name();
173        if to.multiversioned() && name.is_none() {
174            panic!(
175                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
176            );
177        }
178
179        Stream::new(
180            to.clone(),
181            HydroNode::Network {
182                name: name.map(ToOwned::to_owned),
183                networking_info: N::networking_info(),
184                serialize_fn: serialize_pipeline.map(|e| e.into()),
185                instantiate_fn: DebugInstantiate::Building,
186                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
187                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
188                metadata: to.new_node_metadata(Stream::<
189                    T,
190                    Process<'a, L2>,
191                    Unbounded,
192                    <O as MinOrder<N::OrderingGuarantee>>::Min,
193                    R,
194                >::collection_kind()),
195            },
196        )
197    }
198
199    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
200    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
201    /// using [`bincode`] to serialize/deserialize messages.
202    ///
203    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
204    /// membership information. This is a common pattern in distributed systems for broadcasting data to
205    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
206    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
207    /// each element to all cluster members.
208    ///
209    /// # Non-Determinism
210    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
211    /// to the current cluster members _at that point in time_. Depending on when we are notified of
212    /// membership changes, we will broadcast each element to different members.
213    ///
214    /// # Example
215    /// ```rust
216    /// # #[cfg(feature = "deploy")] {
217    /// # use hydro_lang::prelude::*;
218    /// # use futures::StreamExt;
219    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
220    /// let p1 = flow.process::<()>();
221    /// let workers: Cluster<()> = flow.cluster::<()>();
222    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
223    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
224    /// # on_worker.send_bincode(&p2).entries()
225    /// // if there are 4 members in the cluster, each receives one element
226    /// // - MemberId::<()>(0): [123]
227    /// // - MemberId::<()>(1): [123]
228    /// // - MemberId::<()>(2): [123]
229    /// // - MemberId::<()>(3): [123]
230    /// # }, |mut stream| async move {
231    /// # let mut results = Vec::new();
232    /// # for w in 0..4 {
233    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
234    /// # }
235    /// # results.sort();
236    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
237    /// # }));
238    /// # }
239    /// ```
240    pub fn broadcast_bincode<L2: 'a>(
241        self,
242        other: &Cluster<'a, L2>,
243        nondet_membership: NonDet,
244    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
245    where
246        T: Clone + Serialize + DeserializeOwned,
247    {
248        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
249    }
250
251    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
252    /// using the configuration in `via` to set up the message transport.
253    ///
254    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
255    /// membership information. This is a common pattern in distributed systems for broadcasting data to
256    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
257    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
258    /// each element to all cluster members.
259    ///
260    /// # Non-Determinism
261    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
262    /// to the current cluster members _at that point in time_. Depending on when we are notified of
263    /// membership changes, we will broadcast each element to different members.
264    ///
265    /// # Example
266    /// ```rust
267    /// # #[cfg(feature = "deploy")] {
268    /// # use hydro_lang::prelude::*;
269    /// # use futures::StreamExt;
270    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
271    /// let p1 = flow.process::<()>();
272    /// let workers: Cluster<()> = flow.cluster::<()>();
273    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
274    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
275    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
276    /// // if there are 4 members in the cluster, each receives one element
277    /// // - MemberId::<()>(0): [123]
278    /// // - MemberId::<()>(1): [123]
279    /// // - MemberId::<()>(2): [123]
280    /// // - MemberId::<()>(3): [123]
281    /// # }, |mut stream| async move {
282    /// # let mut results = Vec::new();
283    /// # for w in 0..4 {
284    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
285    /// # }
286    /// # results.sort();
287    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
288    /// # }));
289    /// # }
290    /// ```
291    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
292        self,
293        to: &Cluster<'a, L2>,
294        via: N,
295        nondet_membership: NonDet,
296    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
297    where
298        T: Clone + Serialize + DeserializeOwned,
299        O: MinOrder<N::OrderingGuarantee>,
300    {
301        let ids = track_membership(self.location.source_cluster_members(to));
302        sliced! {
303            let members_snapshot = use(ids, nondet_membership);
304            let elements = use(self, nondet_membership);
305
306            let current_members = members_snapshot.filter(q!(|b| *b));
307            elements.repeat_with_keys(current_members)
308        }
309        .demux(to, via)
310    }
311
312    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
313    /// serialization. The external process can receive these elements by establishing a TCP
314    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
315    ///
316    /// # Example
317    /// ```rust
318    /// # #[cfg(feature = "deploy")] {
319    /// # use hydro_lang::prelude::*;
320    /// # use futures::StreamExt;
321    /// # tokio_test::block_on(async move {
322    /// let mut flow = FlowBuilder::new();
323    /// let process = flow.process::<()>();
324    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
325    /// let external = flow.external::<()>();
326    /// let external_handle = numbers.send_bincode_external(&external);
327    ///
328    /// let mut deployment = hydro_deploy::Deployment::new();
329    /// let nodes = flow
330    ///     .with_process(&process, deployment.Localhost())
331    ///     .with_external(&external, deployment.Localhost())
332    ///     .deploy(&mut deployment);
333    ///
334    /// deployment.deploy().await.unwrap();
335    /// // establish the TCP connection
336    /// let mut external_recv_stream = nodes.connect(external_handle).await;
337    /// deployment.start().await.unwrap();
338    ///
339    /// for w in 1..=3 {
340    ///     assert_eq!(external_recv_stream.next().await, Some(w));
341    /// }
342    /// # });
343    /// # }
344    /// ```
345    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
346    where
347        T: Serialize + DeserializeOwned,
348    {
349        let serialize_pipeline = Some(serialize_bincode::<T>(false));
350
351        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
352
353        let external_port_id = flow_state_borrow.next_external_port();
354
355        flow_state_borrow.push_root(HydroRoot::SendExternal {
356            to_external_key: other.key,
357            to_port_id: external_port_id,
358            to_many: false,
359            unpaired: true,
360            serialize_fn: serialize_pipeline.map(|e| e.into()),
361            instantiate_fn: DebugInstantiate::Building,
362            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
363            op_metadata: HydroIrOpMetadata::new(),
364        });
365
366        ExternalBincodeStream {
367            process_key: other.key,
368            port_id: external_port_id,
369            _phantom: PhantomData,
370        }
371    }
372
373    #[cfg(feature = "sim")]
374    /// Sets up a simulation output port for this stream, allowing test code to receive elements
375    /// sent to this stream during simulation.
376    pub fn sim_output(self) -> SimReceiver<T, O, R>
377    where
378        T: Serialize + DeserializeOwned,
379    {
380        let external_location: External<'a, ()> = External {
381            key: LocationKey::FIRST,
382            flow_state: self.location.flow_state().clone(),
383            _phantom: PhantomData,
384        };
385
386        let external = self.send_bincode_external(&external_location);
387
388        SimReceiver(external.port_id, PhantomData)
389    }
390}
391
392impl<'a, T, L: Location<'a> + NoTick, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
393    /// Creates an external output for embedded deployment mode.
394    ///
395    /// The `name` parameter specifies the name of the field in the generated
396    /// `EmbeddedOutputs` struct that will receive elements from this stream.
397    /// The generated function will accept an `EmbeddedOutputs` struct with an
398    /// `impl FnMut(T)` field with this name.
399    pub fn embedded_output(self, name: impl Into<String>) {
400        let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
401
402        self.location
403            .flow_state()
404            .borrow_mut()
405            .push_root(HydroRoot::EmbeddedOutput {
406                ident,
407                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
408                op_metadata: HydroIrOpMetadata::new(),
409            });
410    }
411}
412
413impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
414    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
415{
416    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
417    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
418    /// using [`bincode`] to serialize/deserialize messages.
419    ///
420    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
421    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
422    /// this API allows precise targeting of specific cluster members rather than broadcasting to
423    /// all members.
424    ///
425    /// # Example
426    /// ```rust
427    /// # #[cfg(feature = "deploy")] {
428    /// # use hydro_lang::prelude::*;
429    /// # use futures::StreamExt;
430    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
431    /// let p1 = flow.process::<()>();
432    /// let workers: Cluster<()> = flow.cluster::<()>();
433    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
434    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
435    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
436    ///     .demux_bincode(&workers);
437    /// # on_worker.send_bincode(&p2).entries()
438    /// // if there are 4 members in the cluster, each receives one element
439    /// // - MemberId::<()>(0): [0]
440    /// // - MemberId::<()>(1): [1]
441    /// // - MemberId::<()>(2): [2]
442    /// // - MemberId::<()>(3): [3]
443    /// # }, |mut stream| async move {
444    /// # let mut results = Vec::new();
445    /// # for w in 0..4 {
446    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
447    /// # }
448    /// # results.sort();
449    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
450    /// # }));
451    /// # }
452    /// ```
453    pub fn demux_bincode(
454        self,
455        other: &Cluster<'a, L2>,
456    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
457    where
458        T: Serialize + DeserializeOwned,
459    {
460        self.demux(other, TCP.fail_stop().bincode())
461    }
462
463    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
464    /// using the configuration in `via` to set up the message transport.
465    ///
466    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
467    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
468    /// this API allows precise targeting of specific cluster members rather than broadcasting to
469    /// all members.
470    ///
471    /// # Example
472    /// ```rust
473    /// # #[cfg(feature = "deploy")] {
474    /// # use hydro_lang::prelude::*;
475    /// # use futures::StreamExt;
476    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
477    /// let p1 = flow.process::<()>();
478    /// let workers: Cluster<()> = flow.cluster::<()>();
479    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
480    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
481    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
482    ///     .demux(&workers, TCP.fail_stop().bincode());
483    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
484    /// // if there are 4 members in the cluster, each receives one element
485    /// // - MemberId::<()>(0): [0]
486    /// // - MemberId::<()>(1): [1]
487    /// // - MemberId::<()>(2): [2]
488    /// // - MemberId::<()>(3): [3]
489    /// # }, |mut stream| async move {
490    /// # let mut results = Vec::new();
491    /// # for w in 0..4 {
492    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
493    /// # }
494    /// # results.sort();
495    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
496    /// # }));
497    /// # }
498    /// ```
499    pub fn demux<N: NetworkFor<T>>(
500        self,
501        to: &Cluster<'a, L2>,
502        via: N,
503    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
504    where
505        T: Serialize + DeserializeOwned,
506        O: MinOrder<N::OrderingGuarantee>,
507    {
508        self.into_keyed().demux(to, via)
509    }
510}
511
512impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
513    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
514    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
515    /// [`bincode`] to serialize/deserialize messages.
516    ///
517    /// This provides load balancing by evenly distributing work across cluster members. The
518    /// distribution is deterministic based on element order - the first element goes to member 0,
519    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
520    ///
521    /// # Non-Determinism
522    /// The set of cluster members may asynchronously change over time. Each element is distributed
523    /// based on the current cluster membership _at that point in time_. Depending on when cluster
524    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
525    /// membership is stable, the order of members in the round-robin pattern may change across runs.
526    ///
527    /// # Ordering Requirements
528    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
529    /// order of messages and retries affects the round-robin pattern.
530    ///
531    /// # Example
532    /// ```rust
533    /// # #[cfg(feature = "deploy")] {
534    /// # use hydro_lang::prelude::*;
535    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
536    /// # use futures::StreamExt;
537    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
538    /// let p1 = flow.process::<()>();
539    /// let workers: Cluster<()> = flow.cluster::<()>();
540    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
541    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
542    /// on_worker.send_bincode(&p2)
543    /// # .first().values() // we use first to assert that each member gets one element
544    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
545    /// // - MemberId::<()>(?): [1]
546    /// // - MemberId::<()>(?): [2]
547    /// // - MemberId::<()>(?): [3]
548    /// // - MemberId::<()>(?): [4]
549    /// # }, |mut stream| async move {
550    /// # let mut results = Vec::new();
551    /// # for w in 0..4 {
552    /// #     results.push(stream.next().await.unwrap());
553    /// # }
554    /// # results.sort();
555    /// # assert_eq!(results, vec![1, 2, 3, 4]);
556    /// # }));
557    /// # }
558    /// ```
559    pub fn round_robin_bincode<L2: 'a>(
560        self,
561        other: &Cluster<'a, L2>,
562        nondet_membership: NonDet,
563    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
564    where
565        T: Serialize + DeserializeOwned,
566    {
567        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
568    }
569
570    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
571    /// the configuration in `via` to set up the message transport.
572    ///
573    /// This provides load balancing by evenly distributing work across cluster members. The
574    /// distribution is deterministic based on element order - the first element goes to member 0,
575    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
576    ///
577    /// # Non-Determinism
578    /// The set of cluster members may asynchronously change over time. Each element is distributed
579    /// based on the current cluster membership _at that point in time_. Depending on when cluster
580    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
581    /// membership is stable, the order of members in the round-robin pattern may change across runs.
582    ///
583    /// # Ordering Requirements
584    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
585    /// order of messages and retries affects the round-robin pattern.
586    ///
587    /// # Example
588    /// ```rust
589    /// # #[cfg(feature = "deploy")] {
590    /// # use hydro_lang::prelude::*;
591    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
592    /// # use futures::StreamExt;
593    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
594    /// let p1 = flow.process::<()>();
595    /// let workers: Cluster<()> = flow.cluster::<()>();
596    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
597    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
598    /// on_worker.send(&p2, TCP.fail_stop().bincode())
599    /// # .first().values() // we use first to assert that each member gets one element
600    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
601    /// // - MemberId::<()>(?): [1]
602    /// // - MemberId::<()>(?): [2]
603    /// // - MemberId::<()>(?): [3]
604    /// // - MemberId::<()>(?): [4]
605    /// # }, |mut stream| async move {
606    /// # let mut results = Vec::new();
607    /// # for w in 0..4 {
608    /// #     results.push(stream.next().await.unwrap());
609    /// # }
610    /// # results.sort();
611    /// # assert_eq!(results, vec![1, 2, 3, 4]);
612    /// # }));
613    /// # }
614    /// ```
615    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
616        self,
617        to: &Cluster<'a, L2>,
618        via: N,
619        nondet_membership: NonDet,
620    ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
621    where
622        T: Serialize + DeserializeOwned,
623    {
624        let ids = track_membership(self.location.source_cluster_members(to));
625        sliced! {
626            let members_snapshot = use(ids, nondet_membership);
627            let elements = use(self.enumerate(), nondet_membership);
628
629            let current_members = members_snapshot
630                .filter(q!(|b| *b))
631                .keys()
632                .assume_ordering::<TotalOrder>(nondet_membership)
633                .collect_vec();
634
635            elements
636                .cross_singleton(current_members)
637                .map(q!(|(data, members)| (
638                    members[data.0 % members.len()].clone(),
639                    data.1
640                )))
641        }
642        .demux(to, via)
643    }
644}
645
646impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
647    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
648    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
649    /// [`bincode`] to serialize/deserialize messages.
650    ///
651    /// This provides load balancing by evenly distributing work across cluster members. The
652    /// distribution is deterministic based on element order - the first element goes to member 0,
653    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
654    ///
655    /// # Non-Determinism
656    /// The set of cluster members may asynchronously change over time. Each element is distributed
657    /// based on the current cluster membership _at that point in time_. Depending on when cluster
658    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
659    /// membership is stable, the order of members in the round-robin pattern may change across runs.
660    ///
661    /// # Ordering Requirements
662    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
663    /// order of messages and retries affects the round-robin pattern.
664    ///
665    /// # Example
666    /// ```rust
667    /// # #[cfg(feature = "deploy")] {
668    /// # use hydro_lang::prelude::*;
669    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
670    /// # use hydro_lang::location::MemberId;
671    /// # use futures::StreamExt;
672    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
673    /// let p1 = flow.process::<()>();
674    /// let workers1: Cluster<()> = flow.cluster::<()>();
675    /// let workers2: Cluster<()> = flow.cluster::<()>();
676    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
677    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
678    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
679    /// on_worker2.send_bincode(&p2)
680    /// # .entries()
681    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
682    /// # }, |mut stream| async move {
683    /// # let mut results = Vec::new();
684    /// # let mut locations = std::collections::HashSet::new();
685    /// # for w in 0..=16 {
686    /// #     let (location, v) = stream.next().await.unwrap();
687    /// #     locations.insert(location);
688    /// #     results.push(v);
689    /// # }
690    /// # results.sort();
691    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
692    /// # assert_eq!(locations.len(), 16);
693    /// # }));
694    /// # }
695    /// ```
696    pub fn round_robin_bincode<L2: 'a>(
697        self,
698        other: &Cluster<'a, L2>,
699        nondet_membership: NonDet,
700    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
701    where
702        T: Serialize + DeserializeOwned,
703    {
704        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
705    }
706
707    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
708    /// the configuration in `via` to set up the message transport.
709    ///
710    /// This provides load balancing by evenly distributing work across cluster members. The
711    /// distribution is deterministic based on element order - the first element goes to member 0,
712    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
713    ///
714    /// # Non-Determinism
715    /// The set of cluster members may asynchronously change over time. Each element is distributed
716    /// based on the current cluster membership _at that point in time_. Depending on when cluster
717    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
718    /// membership is stable, the order of members in the round-robin pattern may change across runs.
719    ///
720    /// # Ordering Requirements
721    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
722    /// order of messages and retries affects the round-robin pattern.
723    ///
724    /// # Example
725    /// ```rust
726    /// # #[cfg(feature = "deploy")] {
727    /// # use hydro_lang::prelude::*;
728    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
729    /// # use hydro_lang::location::MemberId;
730    /// # use futures::StreamExt;
731    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
732    /// let p1 = flow.process::<()>();
733    /// let workers1: Cluster<()> = flow.cluster::<()>();
734    /// let workers2: Cluster<()> = flow.cluster::<()>();
735    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
736    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
737    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
738    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
739    /// # .entries()
740    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
741    /// # }, |mut stream| async move {
742    /// # let mut results = Vec::new();
743    /// # let mut locations = std::collections::HashSet::new();
744    /// # for w in 0..=16 {
745    /// #     let (location, v) = stream.next().await.unwrap();
746    /// #     locations.insert(location);
747    /// #     results.push(v);
748    /// # }
749    /// # results.sort();
750    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
751    /// # assert_eq!(locations.len(), 16);
752    /// # }));
753    /// # }
754    /// ```
755    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
756        self,
757        to: &Cluster<'a, L2>,
758        via: N,
759        nondet_membership: NonDet,
760    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
761    where
762        T: Serialize + DeserializeOwned,
763    {
764        let ids = track_membership(self.location.source_cluster_members(to));
765        sliced! {
766            let members_snapshot = use(ids, nondet_membership);
767            let elements = use(self.enumerate(), nondet_membership);
768
769            let current_members = members_snapshot
770                .filter(q!(|b| *b))
771                .keys()
772                .assume_ordering::<TotalOrder>(nondet_membership)
773                .collect_vec();
774
775            elements
776                .cross_singleton(current_members)
777                .map(q!(|(data, members)| (
778                    members[data.0 % members.len()].clone(),
779                    data.1
780                )))
781        }
782        .demux(to, via)
783    }
784}
785
786impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
787    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
788    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
789    /// using [`bincode`] to serialize/deserialize messages.
790    ///
791    /// Each cluster member sends its local stream elements, and they are collected at the destination
792    /// as a [`KeyedStream`] where keys identify the source cluster member.
793    ///
794    /// # Example
795    /// ```rust
796    /// # #[cfg(feature = "deploy")] {
797    /// # use hydro_lang::prelude::*;
798    /// # use futures::StreamExt;
799    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
800    /// let workers: Cluster<()> = flow.cluster::<()>();
801    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
802    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
803    /// # all_received.entries()
804    /// # }, |mut stream| async move {
805    /// // if there are 4 members in the cluster, we should receive 4 elements
806    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
807    /// # let mut results = Vec::new();
808    /// # for w in 0..4 {
809    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
810    /// # }
811    /// # results.sort();
812    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
813    /// # }));
814    /// # }
815    /// ```
816    ///
817    /// If you don't need to know the source for each element, you can use `.values()`
818    /// to get just the data:
819    /// ```rust
820    /// # #[cfg(feature = "deploy")] {
821    /// # use hydro_lang::prelude::*;
822    /// # use hydro_lang::live_collections::stream::NoOrder;
823    /// # use futures::StreamExt;
824    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
825    /// # let workers: Cluster<()> = flow.cluster::<()>();
826    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
827    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
828    /// # values
829    /// # }, |mut stream| async move {
830    /// # let mut results = Vec::new();
831    /// # for w in 0..4 {
832    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
833    /// # }
834    /// # results.sort();
835    /// // if there are 4 members in the cluster, we should receive 4 elements
836    /// // 1, 1, 1, 1
837    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
838    /// # }));
839    /// # }
840    /// ```
841    pub fn send_bincode<L2>(
842        self,
843        other: &Process<'a, L2>,
844    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
845    where
846        T: Serialize + DeserializeOwned,
847    {
848        self.send(other, TCP.fail_stop().bincode())
849    }
850
851    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
852    /// using the configuration in `via` to set up the message transport.
853    ///
854    /// Each cluster member sends its local stream elements, and they are collected at the destination
855    /// as a [`KeyedStream`] where keys identify the source cluster member.
856    ///
857    /// # Example
858    /// ```rust
859    /// # #[cfg(feature = "deploy")] {
860    /// # use hydro_lang::prelude::*;
861    /// # use futures::StreamExt;
862    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
863    /// let workers: Cluster<()> = flow.cluster::<()>();
864    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
865    /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
866    /// # all_received.entries()
867    /// # }, |mut stream| async move {
868    /// // if there are 4 members in the cluster, we should receive 4 elements
869    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
870    /// # let mut results = Vec::new();
871    /// # for w in 0..4 {
872    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
873    /// # }
874    /// # results.sort();
875    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
876    /// # }));
877    /// # }
878    /// ```
879    ///
880    /// If you don't need to know the source for each element, you can use `.values()`
881    /// to get just the data:
882    /// ```rust
883    /// # #[cfg(feature = "deploy")] {
884    /// # use hydro_lang::prelude::*;
885    /// # use hydro_lang::live_collections::stream::NoOrder;
886    /// # use futures::StreamExt;
887    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
888    /// # let workers: Cluster<()> = flow.cluster::<()>();
889    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
890    /// let values: Stream<i32, _, _, NoOrder> =
891    ///     numbers.send(&process, TCP.fail_stop().bincode()).values();
892    /// # values
893    /// # }, |mut stream| async move {
894    /// # let mut results = Vec::new();
895    /// # for w in 0..4 {
896    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
897    /// # }
898    /// # results.sort();
899    /// // if there are 4 members in the cluster, we should receive 4 elements
900    /// // 1, 1, 1, 1
901    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
902    /// # }));
903    /// # }
904    /// ```
905    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
906    pub fn send<L2, N: NetworkFor<T>>(
907        self,
908        to: &Process<'a, L2>,
909        via: N,
910    ) -> KeyedStream<
911        MemberId<L>,
912        T,
913        Process<'a, L2>,
914        Unbounded,
915        <O as MinOrder<N::OrderingGuarantee>>::Min,
916        R,
917    >
918    where
919        T: Serialize + DeserializeOwned,
920        O: MinOrder<N::OrderingGuarantee>,
921    {
922        let serialize_pipeline = Some(N::serialize_thunk(false));
923
924        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
925
926        let name = via.name();
927        if to.multiversioned() && name.is_none() {
928            panic!(
929                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
930            );
931        }
932
933        let raw_stream: Stream<
934            (MemberId<L>, T),
935            Process<'a, L2>,
936            Unbounded,
937            <O as MinOrder<N::OrderingGuarantee>>::Min,
938            R,
939        > = Stream::new(
940            to.clone(),
941            HydroNode::Network {
942                name: name.map(ToOwned::to_owned),
943                networking_info: N::networking_info(),
944                serialize_fn: serialize_pipeline.map(|e| e.into()),
945                instantiate_fn: DebugInstantiate::Building,
946                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
947                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
948                metadata: to.new_node_metadata(Stream::<
949                    (MemberId<L>, T),
950                    Process<'a, L2>,
951                    Unbounded,
952                    <O as MinOrder<N::OrderingGuarantee>>::Min,
953                    R,
954                >::collection_kind()),
955            },
956        );
957
958        raw_stream.into_keyed()
959    }
960
961    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
962    /// Broadcasts elements of this stream at each source member to all members of a destination
963    /// cluster, using [`bincode`] to serialize/deserialize messages.
964    ///
965    /// Each source member sends each of its stream elements to **every** member of the cluster
966    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
967    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
968    /// **only data elements** and sends each element to all cluster members.
969    ///
970    /// # Non-Determinism
971    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
972    /// to the current cluster members known _at that point in time_ at the source member. Depending
973    /// on when each source member is notified of membership changes, it will broadcast each element
974    /// to different members.
975    ///
976    /// # Example
977    /// ```rust
978    /// # #[cfg(feature = "deploy")] {
979    /// # use hydro_lang::prelude::*;
980    /// # use hydro_lang::location::MemberId;
981    /// # use futures::StreamExt;
982    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
983    /// # type Source = ();
984    /// # type Destination = ();
985    /// let source: Cluster<Source> = flow.cluster::<Source>();
986    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
987    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
988    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
989    /// # on_destination.entries().send_bincode(&p2).entries()
990    /// // if there are 4 members in the desination, each receives one element from each source member
991    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
992    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
993    /// // - ...
994    /// # }, |mut stream| async move {
995    /// # let mut results = Vec::new();
996    /// # for w in 0..16 {
997    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
998    /// # }
999    /// # results.sort();
1000    /// # assert_eq!(results, vec![
1001    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1002    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1003    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1004    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1005    /// # ]);
1006    /// # }));
1007    /// # }
1008    /// ```
1009    pub fn broadcast_bincode<L2: 'a>(
1010        self,
1011        other: &Cluster<'a, L2>,
1012        nondet_membership: NonDet,
1013    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1014    where
1015        T: Clone + Serialize + DeserializeOwned,
1016    {
1017        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1018    }
1019
1020    /// Broadcasts elements of this stream at each source member to all members of a destination
1021    /// cluster, using the configuration in `via` to set up the message transport.
1022    ///
1023    /// Each source member sends each of its stream elements to **every** member of the cluster
1024    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1025    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1026    /// **only data elements** and sends each element to all cluster members.
1027    ///
1028    /// # Non-Determinism
1029    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1030    /// to the current cluster members known _at that point in time_ at the source member. Depending
1031    /// on when each source member is notified of membership changes, it will broadcast each element
1032    /// to different members.
1033    ///
1034    /// # Example
1035    /// ```rust
1036    /// # #[cfg(feature = "deploy")] {
1037    /// # use hydro_lang::prelude::*;
1038    /// # use hydro_lang::location::MemberId;
1039    /// # use futures::StreamExt;
1040    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1041    /// # type Source = ();
1042    /// # type Destination = ();
1043    /// let source: Cluster<Source> = flow.cluster::<Source>();
1044    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1045    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1046    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1047    /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1048    /// // if there are 4 members in the desination, each receives one element from each source member
1049    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1050    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1051    /// // - ...
1052    /// # }, |mut stream| async move {
1053    /// # let mut results = Vec::new();
1054    /// # for w in 0..16 {
1055    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1056    /// # }
1057    /// # results.sort();
1058    /// # assert_eq!(results, vec![
1059    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1060    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1061    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1062    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1063    /// # ]);
1064    /// # }));
1065    /// # }
1066    /// ```
1067    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1068    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1069        self,
1070        to: &Cluster<'a, L2>,
1071        via: N,
1072        nondet_membership: NonDet,
1073    ) -> KeyedStream<
1074        MemberId<L>,
1075        T,
1076        Cluster<'a, L2>,
1077        Unbounded,
1078        <O as MinOrder<N::OrderingGuarantee>>::Min,
1079        R,
1080    >
1081    where
1082        T: Clone + Serialize + DeserializeOwned,
1083        O: MinOrder<N::OrderingGuarantee>,
1084    {
1085        let ids = track_membership(self.location.source_cluster_members(to));
1086        sliced! {
1087            let members_snapshot = use(ids, nondet_membership);
1088            let elements = use(self, nondet_membership);
1089
1090            let current_members = members_snapshot.filter(q!(|b| *b));
1091            elements.repeat_with_keys(current_members)
1092        }
1093        .demux(to, via)
1094    }
1095}
1096
1097impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1098    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1099{
1100    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1101    /// Sends elements of this stream at each source member to specific members of a destination
1102    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1103    ///
1104    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1105    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1106    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1107    /// all members.
1108    ///
1109    /// Each cluster member sends its local stream elements, and they are collected at each
1110    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1111    ///
1112    /// # Example
1113    /// ```rust
1114    /// # #[cfg(feature = "deploy")] {
1115    /// # use hydro_lang::prelude::*;
1116    /// # use futures::StreamExt;
1117    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1118    /// # type Source = ();
1119    /// # type Destination = ();
1120    /// let source: Cluster<Source> = flow.cluster::<Source>();
1121    /// let to_send: Stream<_, Cluster<_>, _> = source
1122    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1123    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1124    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1125    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1126    /// # all_received.entries().send_bincode(&p2).entries()
1127    /// # }, |mut stream| async move {
1128    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1129    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1130    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1131    /// // - ...
1132    /// # let mut results = Vec::new();
1133    /// # for w in 0..16 {
1134    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1135    /// # }
1136    /// # results.sort();
1137    /// # assert_eq!(results, vec![
1138    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1139    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1140    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1141    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1142    /// # ]);
1143    /// # }));
1144    /// # }
1145    /// ```
1146    pub fn demux_bincode(
1147        self,
1148        other: &Cluster<'a, L2>,
1149    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1150    where
1151        T: Serialize + DeserializeOwned,
1152    {
1153        self.demux(other, TCP.fail_stop().bincode())
1154    }
1155
1156    /// Sends elements of this stream at each source member to specific members of a destination
1157    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1158    /// message transport.
1159    ///
1160    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1161    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1162    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1163    /// all members.
1164    ///
1165    /// Each cluster member sends its local stream elements, and they are collected at each
1166    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1167    ///
1168    /// # Example
1169    /// ```rust
1170    /// # #[cfg(feature = "deploy")] {
1171    /// # use hydro_lang::prelude::*;
1172    /// # use futures::StreamExt;
1173    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1174    /// # type Source = ();
1175    /// # type Destination = ();
1176    /// let source: Cluster<Source> = flow.cluster::<Source>();
1177    /// let to_send: Stream<_, Cluster<_>, _> = source
1178    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1179    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1180    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1181    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1182    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1183    /// # }, |mut stream| async move {
1184    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1185    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1186    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1187    /// // - ...
1188    /// # let mut results = Vec::new();
1189    /// # for w in 0..16 {
1190    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1191    /// # }
1192    /// # results.sort();
1193    /// # assert_eq!(results, vec![
1194    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1195    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1196    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1197    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1198    /// # ]);
1199    /// # }));
1200    /// # }
1201    /// ```
1202    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1203    pub fn demux<N: NetworkFor<T>>(
1204        self,
1205        to: &Cluster<'a, L2>,
1206        via: N,
1207    ) -> KeyedStream<
1208        MemberId<L>,
1209        T,
1210        Cluster<'a, L2>,
1211        Unbounded,
1212        <O as MinOrder<N::OrderingGuarantee>>::Min,
1213        R,
1214    >
1215    where
1216        T: Serialize + DeserializeOwned,
1217        O: MinOrder<N::OrderingGuarantee>,
1218    {
1219        self.into_keyed().demux(to, via)
1220    }
1221}
1222
1223#[cfg(test)]
1224mod tests {
1225    #[cfg(feature = "sim")]
1226    use stageleft::q;
1227
1228    #[cfg(feature = "sim")]
1229    use crate::live_collections::sliced::sliced;
1230    #[cfg(feature = "sim")]
1231    use crate::location::{Location, MemberId};
1232    #[cfg(feature = "sim")]
1233    use crate::networking::TCP;
1234    #[cfg(feature = "sim")]
1235    use crate::nondet::nondet;
1236    #[cfg(feature = "sim")]
1237    use crate::prelude::FlowBuilder;
1238
1239    #[cfg(feature = "sim")]
1240    #[test]
1241    fn sim_send_bincode_o2o() {
1242        use crate::networking::TCP;
1243
1244        let mut flow = FlowBuilder::new();
1245        let node = flow.process::<()>();
1246        let node2 = flow.process::<()>();
1247
1248        let (in_send, input) = node.sim_input();
1249
1250        let out_recv = input
1251            .send(&node2, TCP.fail_stop().bincode())
1252            .batch(&node2.tick(), nondet!(/** test */))
1253            .count()
1254            .all_ticks()
1255            .sim_output();
1256
1257        let instances = flow.sim().exhaustive(async || {
1258            in_send.send(());
1259            in_send.send(());
1260            in_send.send(());
1261
1262            let received = out_recv.collect::<Vec<_>>().await;
1263            assert!(received.into_iter().sum::<usize>() == 3);
1264        });
1265
1266        assert_eq!(instances, 4); // 2^{3 - 1}
1267    }
1268
1269    #[cfg(feature = "sim")]
1270    #[test]
1271    fn sim_send_bincode_m2o() {
1272        let mut flow = FlowBuilder::new();
1273        let cluster = flow.cluster::<()>();
1274        let node = flow.process::<()>();
1275
1276        let input = cluster.source_iter(q!(vec![1]));
1277
1278        let out_recv = input
1279            .send(&node, TCP.fail_stop().bincode())
1280            .entries()
1281            .batch(&node.tick(), nondet!(/** test */))
1282            .all_ticks()
1283            .sim_output();
1284
1285        let instances = flow
1286            .sim()
1287            .with_cluster_size(&cluster, 4)
1288            .exhaustive(async || {
1289                out_recv
1290                    .assert_yields_only_unordered(vec![
1291                        (MemberId::from_raw_id(0), 1),
1292                        (MemberId::from_raw_id(1), 1),
1293                        (MemberId::from_raw_id(2), 1),
1294                        (MemberId::from_raw_id(3), 1),
1295                    ])
1296                    .await
1297            });
1298
1299        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1300    }
1301
1302    #[cfg(feature = "sim")]
1303    #[test]
1304    fn sim_send_bincode_multiple_m2o() {
1305        let mut flow = FlowBuilder::new();
1306        let cluster1 = flow.cluster::<()>();
1307        let cluster2 = flow.cluster::<()>();
1308        let node = flow.process::<()>();
1309
1310        let out_recv_1 = cluster1
1311            .source_iter(q!(vec![1]))
1312            .send(&node, TCP.fail_stop().bincode())
1313            .entries()
1314            .sim_output();
1315
1316        let out_recv_2 = cluster2
1317            .source_iter(q!(vec![2]))
1318            .send(&node, TCP.fail_stop().bincode())
1319            .entries()
1320            .sim_output();
1321
1322        let instances = flow
1323            .sim()
1324            .with_cluster_size(&cluster1, 3)
1325            .with_cluster_size(&cluster2, 4)
1326            .exhaustive(async || {
1327                out_recv_1
1328                    .assert_yields_only_unordered(vec![
1329                        (MemberId::from_raw_id(0), 1),
1330                        (MemberId::from_raw_id(1), 1),
1331                        (MemberId::from_raw_id(2), 1),
1332                    ])
1333                    .await;
1334
1335                out_recv_2
1336                    .assert_yields_only_unordered(vec![
1337                        (MemberId::from_raw_id(0), 2),
1338                        (MemberId::from_raw_id(1), 2),
1339                        (MemberId::from_raw_id(2), 2),
1340                        (MemberId::from_raw_id(3), 2),
1341                    ])
1342                    .await;
1343            });
1344
1345        assert_eq!(instances, 1);
1346    }
1347
1348    #[cfg(feature = "sim")]
1349    #[test]
1350    fn sim_send_bincode_o2m() {
1351        let mut flow = FlowBuilder::new();
1352        let cluster = flow.cluster::<()>();
1353        let node = flow.process::<()>();
1354
1355        let input = node.source_iter(q!(vec![
1356            (MemberId::from_raw_id(0), 123),
1357            (MemberId::from_raw_id(1), 456),
1358        ]));
1359
1360        let out_recv = input
1361            .demux(&cluster, TCP.fail_stop().bincode())
1362            .map(q!(|x| x + 1))
1363            .send(&node, TCP.fail_stop().bincode())
1364            .entries()
1365            .sim_output();
1366
1367        flow.sim()
1368            .with_cluster_size(&cluster, 4)
1369            .exhaustive(async || {
1370                out_recv
1371                    .assert_yields_only_unordered(vec![
1372                        (MemberId::from_raw_id(0), 124),
1373                        (MemberId::from_raw_id(1), 457),
1374                    ])
1375                    .await
1376            });
1377    }
1378
1379    #[cfg(feature = "sim")]
1380    #[test]
1381    fn sim_broadcast_bincode_o2m() {
1382        let mut flow = FlowBuilder::new();
1383        let cluster = flow.cluster::<()>();
1384        let node = flow.process::<()>();
1385
1386        let input = node.source_iter(q!(vec![123, 456]));
1387
1388        let out_recv = input
1389            .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1390            .map(q!(|x| x + 1))
1391            .send(&node, TCP.fail_stop().bincode())
1392            .entries()
1393            .sim_output();
1394
1395        let mut c_1_produced = false;
1396        let mut c_2_produced = false;
1397
1398        flow.sim()
1399            .with_cluster_size(&cluster, 2)
1400            .exhaustive(async || {
1401                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1402
1403                // check that order is preserved
1404                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1405                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1406                    c_1_produced = true;
1407                }
1408
1409                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1410                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1411                    c_2_produced = true;
1412                }
1413            });
1414
1415        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1416    }
1417
1418    #[cfg(feature = "sim")]
1419    #[test]
1420    fn sim_send_bincode_m2m() {
1421        let mut flow = FlowBuilder::new();
1422        let cluster = flow.cluster::<()>();
1423        let node = flow.process::<()>();
1424
1425        let input = node.source_iter(q!(vec![
1426            (MemberId::from_raw_id(0), 123),
1427            (MemberId::from_raw_id(1), 456),
1428        ]));
1429
1430        let out_recv = input
1431            .demux(&cluster, TCP.fail_stop().bincode())
1432            .map(q!(|x| x + 1))
1433            .flat_map_ordered(q!(|x| vec![
1434                (MemberId::from_raw_id(0), x),
1435                (MemberId::from_raw_id(1), x),
1436            ]))
1437            .demux(&cluster, TCP.fail_stop().bincode())
1438            .entries()
1439            .send(&node, TCP.fail_stop().bincode())
1440            .entries()
1441            .sim_output();
1442
1443        flow.sim()
1444            .with_cluster_size(&cluster, 4)
1445            .exhaustive(async || {
1446                out_recv
1447                    .assert_yields_only_unordered(vec![
1448                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1449                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1450                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1451                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1452                    ])
1453                    .await
1454            });
1455    }
1456
1457    #[cfg(feature = "sim")]
1458    #[test]
1459    fn sim_lossy_delayed_forever_o2o() {
1460        use std::collections::HashSet;
1461
1462        use crate::properties::manual_proof;
1463
1464        let mut flow = FlowBuilder::new();
1465        let node = flow.process::<()>();
1466        let node2 = flow.process::<()>();
1467
1468        let received = node
1469            .source_iter(q!(0..3_u32))
1470            .send(&node2, TCP.lossy_delayed_forever().bincode())
1471            .fold(
1472                q!(|| std::collections::HashSet::<u32>::new()),
1473                q!(
1474                    |set, v| {
1475                        set.insert(v);
1476                    },
1477                    commutative = manual_proof!(/** set insert is commutative */)
1478                ),
1479            );
1480
1481        let out_recv = sliced! {
1482            let snapshot = use(received, nondet!(/** test */));
1483            snapshot.into_stream()
1484        }
1485        .sim_output();
1486
1487        let mut saw_non_contiguous = false;
1488
1489        flow.sim().test_safety_only().exhaustive(async || {
1490            let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;
1491
1492            // Check each individual snapshot for a non-contiguous subset.
1493            for set in &snapshots {
1494                #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
1495                if set.len() >= 2 && set.len() < 3 {
1496                    let min = *set.iter().min().unwrap();
1497                    let max = *set.iter().max().unwrap();
1498                    if set.len() < (max - min + 1) as usize {
1499                        saw_non_contiguous = true;
1500                    }
1501                }
1502            }
1503        });
1504
1505        assert!(
1506            saw_non_contiguous,
1507            "Expected at least one execution with a non-contiguous subset of inputs"
1508        );
1509    }
1510}