hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33 membership.fold(
34 q!(|| false),
35 q!(|present, event| {
36 match event {
37 MembershipEvent::Joined => *present = true,
38 MembershipEvent::Left => *present = false,
39 }
40 }),
41 )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45 let root = get_this_crate();
46
47 if is_demux {
48 parse_quote! {
49 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50 |(id, data)| {
51 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52 }
53 )
54 }
55 } else {
56 parse_quote! {
57 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58 |data| {
59 #root::runtime_support::bincode::serialize(&data).unwrap().into()
60 }
61 )
62 }
63 }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67 serialize_bincode_with_type(is_demux, "e_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71 let root = get_this_crate();
72 if let Some(c_type) = tagged {
73 parse_quote! {
74 |res| {
75 let (id, b) = res.unwrap();
76 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77 }
78 }
79 } else {
80 parse_quote! {
81 |res| {
82 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83 }
84 }
85 }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89 deserialize_bincode_with_type(tagged, "e_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93 #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
94 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95 /// using [`bincode`] to serialize/deserialize messages.
96 ///
97 /// The returned stream captures the elements received at the destination, where values will
98 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
    /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
101 /// dropped no further messages will be sent.
102 ///
103 /// # Example
104 /// ```rust
105 /// # #[cfg(feature = "deploy")] {
106 /// # use hydro_lang::prelude::*;
107 /// # use futures::StreamExt;
108 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109 /// let p1 = flow.process::<()>();
110 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111 /// let p2 = flow.process::<()>();
112 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113 /// // 1, 2, 3
114 /// # on_p2.send_bincode(&p_out)
115 /// # }, |mut stream| async move {
116 /// # for w in 1..=3 {
117 /// # assert_eq!(stream.next().await, Some(w));
118 /// # }
119 /// # }));
120 /// # }
121 /// ```
122 pub fn send_bincode<L2>(
123 self,
124 other: &Process<'a, L2>,
125 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126 where
127 T: Serialize + DeserializeOwned,
128 {
129 self.send(other, TCP.bincode())
130 }
131
132 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133 /// using the configuration in `via` to set up the message transport.
134 ///
135 /// The returned stream captures the elements received at the destination, where values will
136 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
    /// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
139 /// dropped no further messages will be sent.
140 ///
141 /// # Example
142 /// ```rust
143 /// # #[cfg(feature = "deploy")] {
144 /// # use hydro_lang::prelude::*;
145 /// # use futures::StreamExt;
146 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147 /// let p1 = flow.process::<()>();
148 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149 /// let p2 = flow.process::<()>();
150 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.bincode());
151 /// // 1, 2, 3
152 /// # on_p2.send(&p_out, TCP.bincode())
153 /// # }, |mut stream| async move {
154 /// # for w in 1..=3 {
155 /// # assert_eq!(stream.next().await, Some(w));
156 /// # }
157 /// # }));
158 /// # }
159 /// ```
160 pub fn send<L2, N: NetworkFor<T>>(
161 self,
162 to: &Process<'a, L2>,
163 via: N,
164 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
165 where
166 T: Serialize + DeserializeOwned,
167 {
168 let serialize_pipeline = Some(N::serialize_thunk(false));
169 let deserialize_pipeline = Some(N::deserialize_thunk(None));
170
171 let name = via.name();
172 if to.multiversioned() && name.is_none() {
173 panic!(
174 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
175 );
176 }
177
178 Stream::new(
179 to.clone(),
180 HydroNode::Network {
181 name: name.map(ToOwned::to_owned),
182 serialize_fn: serialize_pipeline.map(|e| e.into()),
183 instantiate_fn: DebugInstantiate::Building,
184 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
185 input: Box::new(self.ir_node.into_inner()),
186 metadata: to.new_node_metadata(
187 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
188 ),
189 },
190 )
191 }
192
193 #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
194 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
195 /// using [`bincode`] to serialize/deserialize messages.
196 ///
197 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
198 /// membership information. This is a common pattern in distributed systems for broadcasting data to
199 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
200 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
201 /// each element to all cluster members.
202 ///
203 /// # Non-Determinism
204 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
205 /// to the current cluster members _at that point in time_. Depending on when we are notified of
206 /// membership changes, we will broadcast each element to different members.
207 ///
208 /// # Example
209 /// ```rust
210 /// # #[cfg(feature = "deploy")] {
211 /// # use hydro_lang::prelude::*;
212 /// # use futures::StreamExt;
213 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
214 /// let p1 = flow.process::<()>();
215 /// let workers: Cluster<()> = flow.cluster::<()>();
216 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
217 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
218 /// # on_worker.send_bincode(&p2).entries()
219 /// // if there are 4 members in the cluster, each receives one element
220 /// // - MemberId::<()>(0): [123]
221 /// // - MemberId::<()>(1): [123]
222 /// // - MemberId::<()>(2): [123]
223 /// // - MemberId::<()>(3): [123]
224 /// # }, |mut stream| async move {
225 /// # let mut results = Vec::new();
226 /// # for w in 0..4 {
227 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
228 /// # }
229 /// # results.sort();
230 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
231 /// # }));
232 /// # }
233 /// ```
234 pub fn broadcast_bincode<L2: 'a>(
235 self,
236 other: &Cluster<'a, L2>,
237 nondet_membership: NonDet,
238 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
239 where
240 T: Clone + Serialize + DeserializeOwned,
241 {
242 self.broadcast(other, TCP.bincode(), nondet_membership)
243 }
244
245 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
246 /// using the configuration in `via` to set up the message transport.
247 ///
248 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
249 /// membership information. This is a common pattern in distributed systems for broadcasting data to
250 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
251 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
252 /// each element to all cluster members.
253 ///
254 /// # Non-Determinism
255 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
256 /// to the current cluster members _at that point in time_. Depending on when we are notified of
257 /// membership changes, we will broadcast each element to different members.
258 ///
259 /// # Example
260 /// ```rust
261 /// # #[cfg(feature = "deploy")] {
262 /// # use hydro_lang::prelude::*;
263 /// # use futures::StreamExt;
264 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
265 /// let p1 = flow.process::<()>();
266 /// let workers: Cluster<()> = flow.cluster::<()>();
267 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
268 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
269 /// # on_worker.send(&p2, TCP.bincode()).entries()
270 /// // if there are 4 members in the cluster, each receives one element
271 /// // - MemberId::<()>(0): [123]
272 /// // - MemberId::<()>(1): [123]
273 /// // - MemberId::<()>(2): [123]
274 /// // - MemberId::<()>(3): [123]
275 /// # }, |mut stream| async move {
276 /// # let mut results = Vec::new();
277 /// # for w in 0..4 {
278 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
279 /// # }
280 /// # results.sort();
281 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
282 /// # }));
283 /// # }
284 /// ```
285 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
286 self,
287 to: &Cluster<'a, L2>,
288 via: N,
289 nondet_membership: NonDet,
290 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
291 where
292 T: Clone + Serialize + DeserializeOwned,
293 {
294 let ids = track_membership(self.location.source_cluster_members(to));
295 sliced! {
296 let members_snapshot = use(ids, nondet_membership);
297 let elements = use(self, nondet_membership);
298
299 let current_members = members_snapshot.filter(q!(|b| *b));
300 elements.repeat_with_keys(current_members)
301 }
302 .demux(to, via)
303 }
304
305 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
306 /// serialization. The external process can receive these elements by establishing a TCP
307 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
308 ///
309 /// # Example
310 /// ```rust
311 /// # #[cfg(feature = "deploy")] {
312 /// # use hydro_lang::prelude::*;
313 /// # use futures::StreamExt;
314 /// # tokio_test::block_on(async move {
315 /// let mut flow = FlowBuilder::new();
316 /// let process = flow.process::<()>();
317 /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
318 /// let external = flow.external::<()>();
319 /// let external_handle = numbers.send_bincode_external(&external);
320 ///
321 /// let mut deployment = hydro_deploy::Deployment::new();
322 /// let nodes = flow
323 /// .with_process(&process, deployment.Localhost())
324 /// .with_external(&external, deployment.Localhost())
325 /// .deploy(&mut deployment);
326 ///
327 /// deployment.deploy().await.unwrap();
328 /// // establish the TCP connection
329 /// let mut external_recv_stream = nodes.connect(external_handle).await;
330 /// deployment.start().await.unwrap();
331 ///
332 /// for w in 1..=3 {
333 /// assert_eq!(external_recv_stream.next().await, Some(w));
334 /// }
335 /// # });
336 /// # }
337 /// ```
338 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
339 where
340 T: Serialize + DeserializeOwned,
341 {
342 let serialize_pipeline = Some(serialize_bincode::<T>(false));
343
344 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
345
346 let external_port_id = flow_state_borrow.next_external_port.get_and_increment();
347
348 flow_state_borrow.push_root(HydroRoot::SendExternal {
349 to_external_key: other.key,
350 to_port_id: external_port_id,
351 to_many: false,
352 unpaired: true,
353 serialize_fn: serialize_pipeline.map(|e| e.into()),
354 instantiate_fn: DebugInstantiate::Building,
355 input: Box::new(self.ir_node.into_inner()),
356 op_metadata: HydroIrOpMetadata::new(),
357 });
358
359 ExternalBincodeStream {
360 process_key: other.key,
361 port_id: external_port_id,
362 _phantom: PhantomData,
363 }
364 }
365
366 #[cfg(feature = "sim")]
367 /// Sets up a simulation output port for this stream, allowing test code to receive elements
368 /// sent to this stream during simulation.
369 pub fn sim_output(self) -> SimReceiver<T, O, R>
370 where
371 T: Serialize + DeserializeOwned,
372 {
373 let external_location: External<'a, ()> = External {
374 key: LocationKey::FIRST,
375 flow_state: self.location.flow_state().clone(),
376 _phantom: PhantomData,
377 };
378
379 let external = self.send_bincode_external(&external_location);
380
381 SimReceiver(external.port_id, PhantomData)
382 }
383}
384
385impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
386 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
387{
388 #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
389 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
390 /// using [`bincode`] to serialize/deserialize messages.
391 ///
392 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
393 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
394 /// this API allows precise targeting of specific cluster members rather than broadcasting to
395 /// all members.
396 ///
397 /// # Example
398 /// ```rust
399 /// # #[cfg(feature = "deploy")] {
400 /// # use hydro_lang::prelude::*;
401 /// # use futures::StreamExt;
402 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
403 /// let p1 = flow.process::<()>();
404 /// let workers: Cluster<()> = flow.cluster::<()>();
405 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
406 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
407 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
408 /// .demux_bincode(&workers);
409 /// # on_worker.send_bincode(&p2).entries()
410 /// // if there are 4 members in the cluster, each receives one element
411 /// // - MemberId::<()>(0): [0]
412 /// // - MemberId::<()>(1): [1]
413 /// // - MemberId::<()>(2): [2]
414 /// // - MemberId::<()>(3): [3]
415 /// # }, |mut stream| async move {
416 /// # let mut results = Vec::new();
417 /// # for w in 0..4 {
418 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
419 /// # }
420 /// # results.sort();
421 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
422 /// # }));
423 /// # }
424 /// ```
425 pub fn demux_bincode(
426 self,
427 other: &Cluster<'a, L2>,
428 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
429 where
430 T: Serialize + DeserializeOwned,
431 {
432 self.demux(other, TCP.bincode())
433 }
434
435 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
436 /// using the configuration in `via` to set up the message transport.
437 ///
438 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
439 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
440 /// this API allows precise targeting of specific cluster members rather than broadcasting to
441 /// all members.
442 ///
443 /// # Example
444 /// ```rust
445 /// # #[cfg(feature = "deploy")] {
446 /// # use hydro_lang::prelude::*;
447 /// # use futures::StreamExt;
448 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
449 /// let p1 = flow.process::<()>();
450 /// let workers: Cluster<()> = flow.cluster::<()>();
451 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
452 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
453 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
454 /// .demux(&workers, TCP.bincode());
455 /// # on_worker.send(&p2, TCP.bincode()).entries()
456 /// // if there are 4 members in the cluster, each receives one element
457 /// // - MemberId::<()>(0): [0]
458 /// // - MemberId::<()>(1): [1]
459 /// // - MemberId::<()>(2): [2]
460 /// // - MemberId::<()>(3): [3]
461 /// # }, |mut stream| async move {
462 /// # let mut results = Vec::new();
463 /// # for w in 0..4 {
464 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
465 /// # }
466 /// # results.sort();
467 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
468 /// # }));
469 /// # }
470 /// ```
471 pub fn demux<N: NetworkFor<T>>(
472 self,
473 to: &Cluster<'a, L2>,
474 via: N,
475 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
476 where
477 T: Serialize + DeserializeOwned,
478 {
479 self.into_keyed().demux(to, via)
480 }
481}
482
483impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
484 #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
485 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
486 /// [`bincode`] to serialize/deserialize messages.
487 ///
488 /// This provides load balancing by evenly distributing work across cluster members. The
489 /// distribution is deterministic based on element order - the first element goes to member 0,
490 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
491 ///
492 /// # Non-Determinism
493 /// The set of cluster members may asynchronously change over time. Each element is distributed
494 /// based on the current cluster membership _at that point in time_. Depending on when cluster
495 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
496 /// membership is stable, the order of members in the round-robin pattern may change across runs.
497 ///
498 /// # Ordering Requirements
499 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
500 /// order of messages and retries affects the round-robin pattern.
501 ///
502 /// # Example
503 /// ```rust
504 /// # #[cfg(feature = "deploy")] {
505 /// # use hydro_lang::prelude::*;
506 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
507 /// # use futures::StreamExt;
508 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
509 /// let p1 = flow.process::<()>();
510 /// let workers: Cluster<()> = flow.cluster::<()>();
511 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
512 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
513 /// on_worker.send_bincode(&p2)
514 /// # .first().values() // we use first to assert that each member gets one element
515 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
516 /// // - MemberId::<()>(?): [1]
517 /// // - MemberId::<()>(?): [2]
518 /// // - MemberId::<()>(?): [3]
519 /// // - MemberId::<()>(?): [4]
520 /// # }, |mut stream| async move {
521 /// # let mut results = Vec::new();
522 /// # for w in 0..4 {
523 /// # results.push(stream.next().await.unwrap());
524 /// # }
525 /// # results.sort();
526 /// # assert_eq!(results, vec![1, 2, 3, 4]);
527 /// # }));
528 /// # }
529 /// ```
530 pub fn round_robin_bincode<L2: 'a>(
531 self,
532 other: &Cluster<'a, L2>,
533 nondet_membership: NonDet,
534 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
535 where
536 T: Serialize + DeserializeOwned,
537 {
538 self.round_robin(other, TCP.bincode(), nondet_membership)
539 }
540
541 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
542 /// the configuration in `via` to set up the message transport.
543 ///
544 /// This provides load balancing by evenly distributing work across cluster members. The
545 /// distribution is deterministic based on element order - the first element goes to member 0,
546 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
547 ///
548 /// # Non-Determinism
549 /// The set of cluster members may asynchronously change over time. Each element is distributed
550 /// based on the current cluster membership _at that point in time_. Depending on when cluster
551 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
552 /// membership is stable, the order of members in the round-robin pattern may change across runs.
553 ///
554 /// # Ordering Requirements
555 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
556 /// order of messages and retries affects the round-robin pattern.
557 ///
558 /// # Example
559 /// ```rust
560 /// # #[cfg(feature = "deploy")] {
561 /// # use hydro_lang::prelude::*;
562 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
563 /// # use futures::StreamExt;
564 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
565 /// let p1 = flow.process::<()>();
566 /// let workers: Cluster<()> = flow.cluster::<()>();
567 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
568 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
569 /// on_worker.send(&p2, TCP.bincode())
570 /// # .first().values() // we use first to assert that each member gets one element
571 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
572 /// // - MemberId::<()>(?): [1]
573 /// // - MemberId::<()>(?): [2]
574 /// // - MemberId::<()>(?): [3]
575 /// // - MemberId::<()>(?): [4]
576 /// # }, |mut stream| async move {
577 /// # let mut results = Vec::new();
578 /// # for w in 0..4 {
579 /// # results.push(stream.next().await.unwrap());
580 /// # }
581 /// # results.sort();
582 /// # assert_eq!(results, vec![1, 2, 3, 4]);
583 /// # }));
584 /// # }
585 /// ```
586 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
587 self,
588 to: &Cluster<'a, L2>,
589 via: N,
590 nondet_membership: NonDet,
591 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
592 where
593 T: Serialize + DeserializeOwned,
594 {
595 let ids = track_membership(self.location.source_cluster_members(to));
596 sliced! {
597 let members_snapshot = use(ids, nondet_membership);
598 let elements = use(self.enumerate(), nondet_membership);
599
600 let current_members = members_snapshot
601 .filter(q!(|b| *b))
602 .keys()
603 .assume_ordering(nondet_membership)
604 .collect_vec();
605
606 elements
607 .cross_singleton(current_members)
608 .map(q!(|(data, members)| (
609 members[data.0 % members.len()].clone(),
610 data.1
611 )))
612 }
613 .demux(to, via)
614 }
615}
616
617impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
618 #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
619 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
620 /// [`bincode`] to serialize/deserialize messages.
621 ///
622 /// This provides load balancing by evenly distributing work across cluster members. The
623 /// distribution is deterministic based on element order - the first element goes to member 0,
624 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
625 ///
626 /// # Non-Determinism
627 /// The set of cluster members may asynchronously change over time. Each element is distributed
628 /// based on the current cluster membership _at that point in time_. Depending on when cluster
629 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
630 /// membership is stable, the order of members in the round-robin pattern may change across runs.
631 ///
632 /// # Ordering Requirements
633 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
634 /// order of messages and retries affects the round-robin pattern.
635 ///
636 /// # Example
637 /// ```rust
638 /// # #[cfg(feature = "deploy")] {
639 /// # use hydro_lang::prelude::*;
640 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
641 /// # use hydro_lang::location::MemberId;
642 /// # use futures::StreamExt;
643 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
644 /// let p1 = flow.process::<()>();
645 /// let workers1: Cluster<()> = flow.cluster::<()>();
646 /// let workers2: Cluster<()> = flow.cluster::<()>();
647 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
648 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
649 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
650 /// on_worker2.send_bincode(&p2)
651 /// # .entries()
652 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
653 /// # }, |mut stream| async move {
654 /// # let mut results = Vec::new();
655 /// # let mut locations = std::collections::HashSet::new();
656 /// # for w in 0..=16 {
657 /// # let (location, v) = stream.next().await.unwrap();
658 /// # locations.insert(location);
659 /// # results.push(v);
660 /// # }
661 /// # results.sort();
662 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
663 /// # assert_eq!(locations.len(), 16);
664 /// # }));
665 /// # }
666 /// ```
667 pub fn round_robin_bincode<L2: 'a>(
668 self,
669 other: &Cluster<'a, L2>,
670 nondet_membership: NonDet,
671 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
672 where
673 T: Serialize + DeserializeOwned,
674 {
675 self.round_robin(other, TCP.bincode(), nondet_membership)
676 }
677
678 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
679 /// the configuration in `via` to set up the message transport.
680 ///
681 /// This provides load balancing by evenly distributing work across cluster members. The
682 /// distribution is deterministic based on element order - the first element goes to member 0,
683 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
684 ///
685 /// # Non-Determinism
686 /// The set of cluster members may asynchronously change over time. Each element is distributed
687 /// based on the current cluster membership _at that point in time_. Depending on when cluster
688 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
689 /// membership is stable, the order of members in the round-robin pattern may change across runs.
690 ///
691 /// # Ordering Requirements
692 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
693 /// order of messages and retries affects the round-robin pattern.
694 ///
695 /// # Example
696 /// ```rust
697 /// # #[cfg(feature = "deploy")] {
698 /// # use hydro_lang::prelude::*;
699 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
700 /// # use hydro_lang::location::MemberId;
701 /// # use futures::StreamExt;
702 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
703 /// let p1 = flow.process::<()>();
704 /// let workers1: Cluster<()> = flow.cluster::<()>();
705 /// let workers2: Cluster<()> = flow.cluster::<()>();
706 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
707 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.bincode(), nondet!(/** assuming stable membership */));
708 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
709 /// on_worker2.send(&p2, TCP.bincode())
710 /// # .entries()
711 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
712 /// # }, |mut stream| async move {
713 /// # let mut results = Vec::new();
714 /// # let mut locations = std::collections::HashSet::new();
715 /// # for w in 0..=16 {
716 /// # let (location, v) = stream.next().await.unwrap();
717 /// # locations.insert(location);
718 /// # results.push(v);
719 /// # }
720 /// # results.sort();
721 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
722 /// # assert_eq!(locations.len(), 16);
723 /// # }));
724 /// # }
725 /// ```
726 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
727 self,
728 to: &Cluster<'a, L2>,
729 via: N,
730 nondet_membership: NonDet,
731 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
732 where
733 T: Serialize + DeserializeOwned,
734 {
735 let ids = track_membership(self.location.source_cluster_members(to));
736 sliced! {
737 let members_snapshot = use(ids, nondet_membership);
738 let elements = use(self.enumerate(), nondet_membership);
739
740 let current_members = members_snapshot
741 .filter(q!(|b| *b))
742 .keys()
743 .assume_ordering(nondet_membership)
744 .collect_vec();
745
746 elements
747 .cross_singleton(current_members)
748 .map(q!(|(data, members)| (
749 members[data.0 % members.len()].clone(),
750 data.1
751 )))
752 }
753 .demux(to, via)
754 }
755}
756
757impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
758 #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
759 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
760 /// using [`bincode`] to serialize/deserialize messages.
761 ///
762 /// Each cluster member sends its local stream elements, and they are collected at the destination
763 /// as a [`KeyedStream`] where keys identify the source cluster member.
764 ///
765 /// # Example
766 /// ```rust
767 /// # #[cfg(feature = "deploy")] {
768 /// # use hydro_lang::prelude::*;
769 /// # use futures::StreamExt;
770 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
771 /// let workers: Cluster<()> = flow.cluster::<()>();
772 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
773 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
774 /// # all_received.entries()
775 /// # }, |mut stream| async move {
776 /// // if there are 4 members in the cluster, we should receive 4 elements
777 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
778 /// # let mut results = Vec::new();
779 /// # for w in 0..4 {
780 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
781 /// # }
782 /// # results.sort();
783 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
784 /// # }));
785 /// # }
786 /// ```
787 ///
788 /// If you don't need to know the source for each element, you can use `.values()`
789 /// to get just the data:
790 /// ```rust
791 /// # #[cfg(feature = "deploy")] {
792 /// # use hydro_lang::prelude::*;
793 /// # use hydro_lang::live_collections::stream::NoOrder;
794 /// # use futures::StreamExt;
795 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
796 /// # let workers: Cluster<()> = flow.cluster::<()>();
797 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
798 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
799 /// # values
800 /// # }, |mut stream| async move {
801 /// # let mut results = Vec::new();
802 /// # for w in 0..4 {
803 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
804 /// # }
805 /// # results.sort();
806 /// // if there are 4 members in the cluster, we should receive 4 elements
807 /// // 1, 1, 1, 1
808 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
809 /// # }));
810 /// # }
811 /// ```
812 pub fn send_bincode<L2>(
813 self,
814 other: &Process<'a, L2>,
815 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
816 where
817 T: Serialize + DeserializeOwned,
818 {
819 self.send(other, TCP.bincode())
820 }
821
822 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
823 /// using the configuration in `via` to set up the message transport.
824 ///
825 /// Each cluster member sends its local stream elements, and they are collected at the destination
826 /// as a [`KeyedStream`] where keys identify the source cluster member.
827 ///
828 /// # Example
829 /// ```rust
830 /// # #[cfg(feature = "deploy")] {
831 /// # use hydro_lang::prelude::*;
832 /// # use futures::StreamExt;
833 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
834 /// let workers: Cluster<()> = flow.cluster::<()>();
835 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
836 /// let all_received = numbers.send(&process, TCP.bincode()); // KeyedStream<MemberId<()>, i32, ...>
837 /// # all_received.entries()
838 /// # }, |mut stream| async move {
839 /// // if there are 4 members in the cluster, we should receive 4 elements
840 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
841 /// # let mut results = Vec::new();
842 /// # for w in 0..4 {
843 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
844 /// # }
845 /// # results.sort();
846 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
847 /// # }));
848 /// # }
849 /// ```
850 ///
851 /// If you don't need to know the source for each element, you can use `.values()`
852 /// to get just the data:
853 /// ```rust
854 /// # #[cfg(feature = "deploy")] {
855 /// # use hydro_lang::prelude::*;
856 /// # use hydro_lang::live_collections::stream::NoOrder;
857 /// # use futures::StreamExt;
858 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
859 /// # let workers: Cluster<()> = flow.cluster::<()>();
860 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
861 /// let values: Stream<i32, _, _, NoOrder> = numbers.send(&process, TCP.bincode()).values();
862 /// # values
863 /// # }, |mut stream| async move {
864 /// # let mut results = Vec::new();
865 /// # for w in 0..4 {
866 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
867 /// # }
868 /// # results.sort();
869 /// // if there are 4 members in the cluster, we should receive 4 elements
870 /// // 1, 1, 1, 1
871 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
872 /// # }));
873 /// # }
874 /// ```
875 pub fn send<L2, N: NetworkFor<T>>(
876 self,
877 to: &Process<'a, L2>,
878 via: N,
879 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
880 where
881 T: Serialize + DeserializeOwned,
882 {
883 let serialize_pipeline = Some(N::serialize_thunk(false));
884
885 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
886
887 let name = via.name();
888 if to.multiversioned() && name.is_none() {
889 panic!(
890 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
891 );
892 }
893
894 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
895 to.clone(),
896 HydroNode::Network {
897 name: name.map(ToOwned::to_owned),
898 serialize_fn: serialize_pipeline.map(|e| e.into()),
899 instantiate_fn: DebugInstantiate::Building,
900 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
901 input: Box::new(self.ir_node.into_inner()),
902 metadata: to.new_node_metadata(Stream::<
903 (MemberId<L>, T),
904 Process<'a, L2>,
905 Unbounded,
906 O,
907 R,
908 >::collection_kind()),
909 },
910 );
911
912 raw_stream.into_keyed()
913 }
914
915 #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
916 /// Broadcasts elements of this stream at each source member to all members of a destination
917 /// cluster, using [`bincode`] to serialize/deserialize messages.
918 ///
919 /// Each source member sends each of its stream elements to **every** member of the cluster
920 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
921 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
922 /// **only data elements** and sends each element to all cluster members.
923 ///
924 /// # Non-Determinism
925 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
926 /// to the current cluster members known _at that point in time_ at the source member. Depending
927 /// on when each source member is notified of membership changes, it will broadcast each element
928 /// to different members.
929 ///
930 /// # Example
931 /// ```rust
932 /// # #[cfg(feature = "deploy")] {
933 /// # use hydro_lang::prelude::*;
934 /// # use hydro_lang::location::MemberId;
935 /// # use futures::StreamExt;
936 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
937 /// # type Source = ();
938 /// # type Destination = ();
939 /// let source: Cluster<Source> = flow.cluster::<Source>();
940 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
941 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
942 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
943 /// # on_destination.entries().send_bincode(&p2).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
945 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
946 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
947 /// // - ...
948 /// # }, |mut stream| async move {
949 /// # let mut results = Vec::new();
950 /// # for w in 0..16 {
951 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
952 /// # }
953 /// # results.sort();
954 /// # assert_eq!(results, vec![
955 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
956 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
957 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
958 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
959 /// # ]);
960 /// # }));
961 /// # }
962 /// ```
963 pub fn broadcast_bincode<L2: 'a>(
964 self,
965 other: &Cluster<'a, L2>,
966 nondet_membership: NonDet,
967 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
968 where
969 T: Clone + Serialize + DeserializeOwned,
970 {
971 self.broadcast(other, TCP.bincode(), nondet_membership)
972 }
973
974 /// Broadcasts elements of this stream at each source member to all members of a destination
975 /// cluster, using the configuration in `via` to set up the message transport.
976 ///
977 /// Each source member sends each of its stream elements to **every** member of the cluster
978 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
979 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
980 /// **only data elements** and sends each element to all cluster members.
981 ///
982 /// # Non-Determinism
983 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
984 /// to the current cluster members known _at that point in time_ at the source member. Depending
985 /// on when each source member is notified of membership changes, it will broadcast each element
986 /// to different members.
987 ///
988 /// # Example
989 /// ```rust
990 /// # #[cfg(feature = "deploy")] {
991 /// # use hydro_lang::prelude::*;
992 /// # use hydro_lang::location::MemberId;
993 /// # use futures::StreamExt;
994 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
995 /// # type Source = ();
996 /// # type Destination = ();
997 /// let source: Cluster<Source> = flow.cluster::<Source>();
998 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
999 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1000 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.bincode(), nondet!(/** assuming stable membership */));
1001 /// # on_destination.entries().send(&p2, TCP.bincode()).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
1003 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1004 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1005 /// // - ...
1006 /// # }, |mut stream| async move {
1007 /// # let mut results = Vec::new();
1008 /// # for w in 0..16 {
1009 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1010 /// # }
1011 /// # results.sort();
1012 /// # assert_eq!(results, vec![
1013 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1014 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1015 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1016 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1017 /// # ]);
1018 /// # }));
1019 /// # }
1020 /// ```
1021 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1022 self,
1023 to: &Cluster<'a, L2>,
1024 via: N,
1025 nondet_membership: NonDet,
1026 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1027 where
1028 T: Clone + Serialize + DeserializeOwned,
1029 {
1030 let ids = track_membership(self.location.source_cluster_members(to));
1031 sliced! {
1032 let members_snapshot = use(ids, nondet_membership);
1033 let elements = use(self, nondet_membership);
1034
1035 let current_members = members_snapshot.filter(q!(|b| *b));
1036 elements.repeat_with_keys(current_members)
1037 }
1038 .demux(to, via)
1039 }
1040}
1041
1042impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1043 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1044{
1045 #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
1046 /// Sends elements of this stream at each source member to specific members of a destination
1047 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1048 ///
1049 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1050 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1051 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1052 /// all members.
1053 ///
1054 /// Each cluster member sends its local stream elements, and they are collected at each
1055 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1056 ///
1057 /// # Example
1058 /// ```rust
1059 /// # #[cfg(feature = "deploy")] {
1060 /// # use hydro_lang::prelude::*;
1061 /// # use futures::StreamExt;
1062 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1063 /// # type Source = ();
1064 /// # type Destination = ();
1065 /// let source: Cluster<Source> = flow.cluster::<Source>();
1066 /// let to_send: Stream<_, Cluster<_>, _> = source
1067 /// .source_iter(q!(vec![0, 1, 2, 3]))
1068 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1069 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1070 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1071 /// # all_received.entries().send_bincode(&p2).entries()
1072 /// # }, |mut stream| async move {
1073 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1074 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1075 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1076 /// // - ...
1077 /// # let mut results = Vec::new();
1078 /// # for w in 0..16 {
1079 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1080 /// # }
1081 /// # results.sort();
1082 /// # assert_eq!(results, vec![
1083 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1084 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1085 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1086 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1087 /// # ]);
1088 /// # }));
1089 /// # }
1090 /// ```
1091 pub fn demux_bincode(
1092 self,
1093 other: &Cluster<'a, L2>,
1094 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1095 where
1096 T: Serialize + DeserializeOwned,
1097 {
1098 self.demux(other, TCP.bincode())
1099 }
1100
1101 /// Sends elements of this stream at each source member to specific members of a destination
1102 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1103 /// message transport.
1104 ///
1105 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1106 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1107 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1108 /// all members.
1109 ///
1110 /// Each cluster member sends its local stream elements, and they are collected at each
1111 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1112 ///
1113 /// # Example
1114 /// ```rust
1115 /// # #[cfg(feature = "deploy")] {
1116 /// # use hydro_lang::prelude::*;
1117 /// # use futures::StreamExt;
1118 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1119 /// # type Source = ();
1120 /// # type Destination = ();
1121 /// let source: Cluster<Source> = flow.cluster::<Source>();
1122 /// let to_send: Stream<_, Cluster<_>, _> = source
1123 /// .source_iter(q!(vec![0, 1, 2, 3]))
1124 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1125 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1126 /// let all_received = to_send.demux(&destination, TCP.bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1127 /// # all_received.entries().send(&p2, TCP.bincode()).entries()
1128 /// # }, |mut stream| async move {
1129 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1130 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1131 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1132 /// // - ...
1133 /// # let mut results = Vec::new();
1134 /// # for w in 0..16 {
1135 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1136 /// # }
1137 /// # results.sort();
1138 /// # assert_eq!(results, vec![
1139 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1140 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1141 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1142 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1143 /// # ]);
1144 /// # }));
1145 /// # }
1146 /// ```
1147 pub fn demux<N: NetworkFor<T>>(
1148 self,
1149 to: &Cluster<'a, L2>,
1150 via: N,
1151 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1152 where
1153 T: Serialize + DeserializeOwned,
1154 {
1155 self.into_keyed().demux(to, via)
1156 }
1157}
1158
1159#[cfg(test)]
1160mod tests {
1161 #[cfg(feature = "sim")]
1162 use stageleft::q;
1163
1164 #[cfg(feature = "sim")]
1165 use crate::location::{Location, MemberId};
1166 #[cfg(feature = "sim")]
1167 use crate::networking::TCP;
1168 #[cfg(feature = "sim")]
1169 use crate::nondet::nondet;
1170 #[cfg(feature = "sim")]
1171 use crate::prelude::FlowBuilder;
1172
1173 #[cfg(feature = "sim")]
1174 #[test]
1175 fn sim_send_bincode_o2o() {
1176 use crate::networking::TCP;
1177
1178 let mut flow = FlowBuilder::new();
1179 let node = flow.process::<()>();
1180 let node2 = flow.process::<()>();
1181
1182 let (in_send, input) = node.sim_input();
1183
1184 let out_recv = input
1185 .send(&node2, TCP.bincode())
1186 .batch(&node2.tick(), nondet!(/** test */))
1187 .count()
1188 .all_ticks()
1189 .sim_output();
1190
1191 let instances = flow.sim().exhaustive(async || {
1192 in_send.send(());
1193 in_send.send(());
1194 in_send.send(());
1195
1196 let received = out_recv.collect::<Vec<_>>().await;
1197 assert!(received.into_iter().sum::<usize>() == 3);
1198 });
1199
1200 assert_eq!(instances, 4); // 2^{3 - 1}
1201 }
1202
1203 #[cfg(feature = "sim")]
1204 #[test]
1205 fn sim_send_bincode_m2o() {
1206 let mut flow = FlowBuilder::new();
1207 let cluster = flow.cluster::<()>();
1208 let node = flow.process::<()>();
1209
1210 let input = cluster.source_iter(q!(vec![1]));
1211
1212 let out_recv = input
1213 .send(&node, TCP.bincode())
1214 .entries()
1215 .batch(&node.tick(), nondet!(/** test */))
1216 .all_ticks()
1217 .sim_output();
1218
1219 let instances = flow
1220 .sim()
1221 .with_cluster_size(&cluster, 4)
1222 .exhaustive(async || {
1223 out_recv
1224 .assert_yields_only_unordered(vec![
1225 (MemberId::from_raw_id(0), 1),
1226 (MemberId::from_raw_id(1), 1),
1227 (MemberId::from_raw_id(2), 1),
1228 (MemberId::from_raw_id(3), 1),
1229 ])
1230 .await
1231 });
1232
1233 assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1234 }
1235
1236 #[cfg(feature = "sim")]
1237 #[test]
1238 fn sim_send_bincode_multiple_m2o() {
1239 let mut flow = FlowBuilder::new();
1240 let cluster1 = flow.cluster::<()>();
1241 let cluster2 = flow.cluster::<()>();
1242 let node = flow.process::<()>();
1243
1244 let out_recv_1 = cluster1
1245 .source_iter(q!(vec![1]))
1246 .send(&node, TCP.bincode())
1247 .entries()
1248 .sim_output();
1249
1250 let out_recv_2 = cluster2
1251 .source_iter(q!(vec![2]))
1252 .send(&node, TCP.bincode())
1253 .entries()
1254 .sim_output();
1255
1256 let instances = flow
1257 .sim()
1258 .with_cluster_size(&cluster1, 3)
1259 .with_cluster_size(&cluster2, 4)
1260 .exhaustive(async || {
1261 out_recv_1
1262 .assert_yields_only_unordered(vec![
1263 (MemberId::from_raw_id(0), 1),
1264 (MemberId::from_raw_id(1), 1),
1265 (MemberId::from_raw_id(2), 1),
1266 ])
1267 .await;
1268
1269 out_recv_2
1270 .assert_yields_only_unordered(vec![
1271 (MemberId::from_raw_id(0), 2),
1272 (MemberId::from_raw_id(1), 2),
1273 (MemberId::from_raw_id(2), 2),
1274 (MemberId::from_raw_id(3), 2),
1275 ])
1276 .await;
1277 });
1278
1279 assert_eq!(instances, 1);
1280 }
1281
1282 #[cfg(feature = "sim")]
1283 #[test]
1284 fn sim_send_bincode_o2m() {
1285 let mut flow = FlowBuilder::new();
1286 let cluster = flow.cluster::<()>();
1287 let node = flow.process::<()>();
1288
1289 let input = node.source_iter(q!(vec![
1290 (MemberId::from_raw_id(0), 123),
1291 (MemberId::from_raw_id(1), 456),
1292 ]));
1293
1294 let out_recv = input
1295 .demux(&cluster, TCP.bincode())
1296 .map(q!(|x| x + 1))
1297 .send(&node, TCP.bincode())
1298 .entries()
1299 .sim_output();
1300
1301 flow.sim()
1302 .with_cluster_size(&cluster, 4)
1303 .exhaustive(async || {
1304 out_recv
1305 .assert_yields_only_unordered(vec![
1306 (MemberId::from_raw_id(0), 124),
1307 (MemberId::from_raw_id(1), 457),
1308 ])
1309 .await
1310 });
1311 }
1312
1313 #[cfg(feature = "sim")]
1314 #[test]
1315 fn sim_broadcast_bincode_o2m() {
1316 let mut flow = FlowBuilder::new();
1317 let cluster = flow.cluster::<()>();
1318 let node = flow.process::<()>();
1319
1320 let input = node.source_iter(q!(vec![123, 456]));
1321
1322 let out_recv = input
1323 .broadcast(&cluster, TCP.bincode(), nondet!(/** test */))
1324 .map(q!(|x| x + 1))
1325 .send(&node, TCP.bincode())
1326 .entries()
1327 .sim_output();
1328
1329 let mut c_1_produced = false;
1330 let mut c_2_produced = false;
1331
1332 flow.sim()
1333 .with_cluster_size(&cluster, 2)
1334 .exhaustive(async || {
1335 let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1336
1337 // check that order is preserved
1338 if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1339 assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1340 c_1_produced = true;
1341 }
1342
1343 if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1344 assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1345 c_2_produced = true;
1346 }
1347 });
1348
1349 assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1350 }
1351
1352 #[cfg(feature = "sim")]
1353 #[test]
1354 fn sim_send_bincode_m2m() {
1355 let mut flow = FlowBuilder::new();
1356 let cluster = flow.cluster::<()>();
1357 let node = flow.process::<()>();
1358
1359 let input = node.source_iter(q!(vec![
1360 (MemberId::from_raw_id(0), 123),
1361 (MemberId::from_raw_id(1), 456),
1362 ]));
1363
1364 let out_recv = input
1365 .demux(&cluster, TCP.bincode())
1366 .map(q!(|x| x + 1))
1367 .flat_map_ordered(q!(|x| vec![
1368 (MemberId::from_raw_id(0), x),
1369 (MemberId::from_raw_id(1), x),
1370 ]))
1371 .demux(&cluster, TCP.bincode())
1372 .entries()
1373 .send(&node, TCP.bincode())
1374 .entries()
1375 .sim_output();
1376
1377 flow.sim()
1378 .with_cluster_size(&cluster, 4)
1379 .exhaustive(async || {
1380 out_recv
1381 .assert_yields_only_unordered(vec![
1382 (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1383 (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1384 (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1385 (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1386 ])
1387 .await
1388 });
1389 }
1390}