hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::DynLocation;
19use crate::location::external_process::ExternalBincodeStream;
20use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21use crate::nondet::NonDet;
22#[cfg(feature = "sim")]
23use crate::sim::SimReceiver;
24use crate::staging_util::get_this_crate;
25
26// same as the one in `hydro_std`, but internal use only
27fn track_membership<'a, C, L: Location<'a> + NoTick>(
28 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
29) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
30 membership.fold(
31 q!(|| false),
32 q!(|present, event| {
33 match event {
34 MembershipEvent::Joined => *present = true,
35 MembershipEvent::Left => *present = false,
36 }
37 }),
38 )
39}
40
41fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
42 let root = get_this_crate();
43
44 if is_demux {
45 parse_quote! {
46 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
47 |(id, data)| {
48 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
49 }
50 )
51 }
52 } else {
53 parse_quote! {
54 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
55 |data| {
56 #root::runtime_support::bincode::serialize(&data).unwrap().into()
57 }
58 )
59 }
60 }
61}
62
63pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
64 serialize_bincode_with_type(is_demux, "e_type::<T>())
65}
66
67fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
68 let root = get_this_crate();
69
70 if let Some(c_type) = tagged {
71 parse_quote! {
72 |res| {
73 let (id, b) = res.unwrap();
74 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
75 }
76 }
77 } else {
78 parse_quote! {
79 |res| {
80 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
81 }
82 }
83 }
84}
85
86pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
87 deserialize_bincode_with_type(tagged, "e_type::<T>())
88}
89
90impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
91 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
92 /// using [`bincode`] to serialize/deserialize messages.
93 ///
94 /// The returned stream captures the elements received at the destination, where values will
95 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
96 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
97 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
98 /// dropped no further messages will be sent.
99 ///
100 /// # Example
101 /// ```rust
102 /// # #[cfg(feature = "deploy")] {
103 /// # use hydro_lang::prelude::*;
104 /// # use futures::StreamExt;
105 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
106 /// let p1 = flow.process::<()>();
107 /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
108 /// let p2 = flow.process::<()>();
109 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
110 /// // 1, 2, 3
111 /// # on_p2.send_bincode(&p_out)
112 /// # }, |mut stream| async move {
113 /// # for w in 1..=3 {
114 /// # assert_eq!(stream.next().await, Some(w));
115 /// # }
116 /// # }));
117 /// # }
118 /// ```
119 pub fn send_bincode<L2>(
120 self,
121 other: &Process<'a, L2>,
122 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
123 where
124 T: Serialize + DeserializeOwned,
125 {
126 let serialize_pipeline = Some(serialize_bincode::<T>(false));
127
128 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
129
130 Stream::new(
131 other.clone(),
132 HydroNode::Network {
133 serialize_fn: serialize_pipeline.map(|e| e.into()),
134 instantiate_fn: DebugInstantiate::Building,
135 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
136 input: Box::new(self.ir_node.into_inner()),
137 metadata: other.new_node_metadata(
138 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
139 ),
140 },
141 )
142 }
143
144 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
145 /// using [`bincode`] to serialize/deserialize messages.
146 ///
147 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
148 /// membership information. This is a common pattern in distributed systems for broadcasting data to
149 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
150 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
151 /// each element to all cluster members.
152 ///
153 /// # Non-Determinism
154 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
155 /// to the current cluster members _at that point in time_. Depending on when we are notified of
156 /// membership changes, we will broadcast each element to different members.
157 ///
158 /// # Example
159 /// ```rust
160 /// # #[cfg(feature = "deploy")] {
161 /// # use hydro_lang::prelude::*;
162 /// # use futures::StreamExt;
163 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
164 /// let p1 = flow.process::<()>();
165 /// let workers: Cluster<()> = flow.cluster::<()>();
166 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
167 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
168 /// # on_worker.send_bincode(&p2).entries()
169 /// // if there are 4 members in the cluster, each receives one element
170 /// // - MemberId::<()>(0): [123]
171 /// // - MemberId::<()>(1): [123]
172 /// // - MemberId::<()>(2): [123]
173 /// // - MemberId::<()>(3): [123]
174 /// # }, |mut stream| async move {
175 /// # let mut results = Vec::new();
176 /// # for w in 0..4 {
177 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
178 /// # }
179 /// # results.sort();
180 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
181 /// # }));
182 /// # }
183 /// ```
184 pub fn broadcast_bincode<L2: 'a>(
185 self,
186 other: &Cluster<'a, L2>,
187 nondet_membership: NonDet,
188 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
189 where
190 T: Clone + Serialize + DeserializeOwned,
191 {
192 let ids = track_membership(self.location.source_cluster_members(other));
193 sliced! {
194 let members_snapshot = use(ids, nondet_membership);
195 let elements = use(self, nondet_membership);
196
197 let current_members = members_snapshot.filter(q!(|b| *b));
198 elements.repeat_with_keys(current_members)
199 }
200 .demux_bincode(other)
201 }
202
203 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
204 /// serialization. The external process can receive these elements by establishing a TCP
205 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
206 ///
207 /// # Example
208 /// ```rust
209 /// # #[cfg(feature = "deploy")] {
210 /// # use hydro_lang::prelude::*;
211 /// # use futures::StreamExt;
212 /// # tokio_test::block_on(async move {
213 /// let flow = FlowBuilder::new();
214 /// let process = flow.process::<()>();
215 /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
216 /// let external = flow.external::<()>();
217 /// let external_handle = numbers.send_bincode_external(&external);
218 ///
219 /// let mut deployment = hydro_deploy::Deployment::new();
220 /// let nodes = flow
221 /// .with_process(&process, deployment.Localhost())
222 /// .with_external(&external, deployment.Localhost())
223 /// .deploy(&mut deployment);
224 ///
225 /// deployment.deploy().await.unwrap();
226 /// // establish the TCP connection
227 /// let mut external_recv_stream = nodes.connect(external_handle).await;
228 /// deployment.start().await.unwrap();
229 ///
230 /// for w in 1..=3 {
231 /// assert_eq!(external_recv_stream.next().await, Some(w));
232 /// }
233 /// # });
234 /// # }
235 /// ```
236 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
237 where
238 T: Serialize + DeserializeOwned,
239 {
240 let serialize_pipeline = Some(serialize_bincode::<T>(false));
241
242 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
243
244 let external_key = flow_state_borrow.next_external_out;
245 flow_state_borrow.next_external_out += 1;
246
247 flow_state_borrow.push_root(HydroRoot::SendExternal {
248 to_external_id: other.id,
249 to_key: external_key,
250 to_many: false,
251 unpaired: true,
252 serialize_fn: serialize_pipeline.map(|e| e.into()),
253 instantiate_fn: DebugInstantiate::Building,
254 input: Box::new(self.ir_node.into_inner()),
255 op_metadata: HydroIrOpMetadata::new(),
256 });
257
258 ExternalBincodeStream {
259 process_id: other.id,
260 port_id: external_key,
261 _phantom: PhantomData,
262 }
263 }
264
265 #[cfg(feature = "sim")]
266 /// Sets up a simulation output port for this stream, allowing test code to receive elements
267 /// sent to this stream during simulation.
268 pub fn sim_output(self) -> SimReceiver<T, O, R>
269 where
270 T: Serialize + DeserializeOwned,
271 {
272 let external_location: External<'a, ()> = External {
273 id: 0,
274 flow_state: self.location.flow_state().clone(),
275 _phantom: PhantomData,
276 };
277
278 let external = self.send_bincode_external(&external_location);
279
280 SimReceiver(external.port_id, PhantomData)
281 }
282}
283
284impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
285 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
286{
287 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
288 /// using [`bincode`] to serialize/deserialize messages.
289 ///
290 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
291 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
292 /// this API allows precise targeting of specific cluster members rather than broadcasting to
293 /// all members.
294 ///
295 /// # Example
296 /// ```rust
297 /// # #[cfg(feature = "deploy")] {
298 /// # use hydro_lang::prelude::*;
299 /// # use futures::StreamExt;
300 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
301 /// let p1 = flow.process::<()>();
302 /// let workers: Cluster<()> = flow.cluster::<()>();
303 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
304 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
305 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
306 /// .demux_bincode(&workers);
307 /// # on_worker.send_bincode(&p2).entries()
308 /// // if there are 4 members in the cluster, each receives one element
309 /// // - MemberId::<()>(0): [0]
310 /// // - MemberId::<()>(1): [1]
311 /// // - MemberId::<()>(2): [2]
312 /// // - MemberId::<()>(3): [3]
313 /// # }, |mut stream| async move {
314 /// # let mut results = Vec::new();
315 /// # for w in 0..4 {
316 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
317 /// # }
318 /// # results.sort();
319 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
320 /// # }));
321 /// # }
322 /// ```
323 pub fn demux_bincode(
324 self,
325 other: &Cluster<'a, L2>,
326 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
327 where
328 T: Serialize + DeserializeOwned,
329 {
330 self.into_keyed().demux_bincode(other)
331 }
332}
333
334impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
335 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
336 /// [`bincode`] to serialize/deserialize messages.
337 ///
338 /// This provides load balancing by evenly distributing work across cluster members. The
339 /// distribution is deterministic based on element order - the first element goes to member 0,
340 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
341 ///
342 /// # Non-Determinism
343 /// The set of cluster members may asynchronously change over time. Each element is distributed
344 /// based on the current cluster membership _at that point in time_. Depending on when cluster
345 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
346 /// membership is stable, the order of members in the round-robin pattern may change across runs.
347 ///
348 /// # Ordering Requirements
349 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
350 /// order of messages and retries affects the round-robin pattern.
351 ///
352 /// # Example
353 /// ```rust
354 /// # #[cfg(feature = "deploy")] {
355 /// # use hydro_lang::prelude::*;
356 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
357 /// # use futures::StreamExt;
358 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
359 /// let p1 = flow.process::<()>();
360 /// let workers: Cluster<()> = flow.cluster::<()>();
361 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
362 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
363 /// on_worker.send_bincode(&p2)
364 /// # .first().values() // we use first to assert that each member gets one element
365 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
366 /// // - MemberId::<()>(?): [1]
367 /// // - MemberId::<()>(?): [2]
368 /// // - MemberId::<()>(?): [3]
369 /// // - MemberId::<()>(?): [4]
370 /// # }, |mut stream| async move {
371 /// # let mut results = Vec::new();
372 /// # for w in 0..4 {
373 /// # results.push(stream.next().await.unwrap());
374 /// # }
375 /// # results.sort();
376 /// # assert_eq!(results, vec![1, 2, 3, 4]);
377 /// # }));
378 /// # }
379 /// ```
380 pub fn round_robin_bincode<L2: 'a>(
381 self,
382 other: &Cluster<'a, L2>,
383 nondet_membership: NonDet,
384 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
385 where
386 T: Serialize + DeserializeOwned,
387 {
388 let ids = track_membership(self.location.source_cluster_members(other));
389 sliced! {
390 let members_snapshot = use(ids, nondet_membership);
391 let elements = use(self.enumerate(), nondet_membership);
392
393 let current_members = members_snapshot
394 .filter(q!(|b| *b))
395 .keys()
396 .assume_ordering(nondet_membership)
397 .collect_vec();
398
399 elements
400 .cross_singleton(current_members)
401 .map(q!(|(data, members)| (
402 members[data.0 % members.len()].clone(),
403 data.1
404 )))
405 }
406 .demux_bincode(other)
407 }
408}
409
410impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
411 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
412 /// [`bincode`] to serialize/deserialize messages.
413 ///
414 /// This provides load balancing by evenly distributing work across cluster members. The
415 /// distribution is deterministic based on element order - the first element goes to member 0,
416 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
417 ///
418 /// # Non-Determinism
419 /// The set of cluster members may asynchronously change over time. Each element is distributed
420 /// based on the current cluster membership _at that point in time_. Depending on when cluster
421 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
422 /// membership is stable, the order of members in the round-robin pattern may change across runs.
423 ///
424 /// # Ordering Requirements
425 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
426 /// order of messages and retries affects the round-robin pattern.
427 ///
428 /// # Example
429 /// ```rust
430 /// # #[cfg(feature = "deploy")] {
431 /// # use hydro_lang::prelude::*;
432 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
433 /// # use hydro_lang::location::MemberId;
434 /// # use futures::StreamExt;
435 /// # std::thread::spawn(|| {
436 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
437 /// let p1 = flow.process::<()>();
438 /// let workers1: Cluster<()> = flow.cluster::<()>();
439 /// let workers2: Cluster<()> = flow.cluster::<()>();
440 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
441 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
442 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
443 /// on_worker2.send_bincode(&p2)
444 /// # .entries()
445 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
446 /// # }, |mut stream| async move {
447 /// # let mut results = Vec::new();
448 /// # let mut locations = std::collections::HashSet::new();
449 /// # for w in 0..=16 {
450 /// # let (location, v) = stream.next().await.unwrap();
451 /// # locations.insert(location);
452 /// # results.push(v);
453 /// # }
454 /// # results.sort();
455 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
456 /// # assert_eq!(locations.len(), 16);
457 /// # }));
458 /// # }).join().unwrap();
459 /// # }
460 /// ```
461 pub fn round_robin_bincode<L2: 'a>(
462 self,
463 other: &Cluster<'a, L2>,
464 nondet_membership: NonDet,
465 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
466 where
467 T: Serialize + DeserializeOwned,
468 {
469 let ids = track_membership(self.location.source_cluster_members(other));
470 sliced! {
471 let members_snapshot = use(ids, nondet_membership);
472 let elements = use(self.enumerate(), nondet_membership);
473
474 let current_members = members_snapshot
475 .filter(q!(|b| *b))
476 .keys()
477 .assume_ordering(nondet_membership)
478 .collect_vec();
479
480 elements
481 .cross_singleton(current_members)
482 .map(q!(|(data, members)| (
483 members[data.0 % members.len()].clone(),
484 data.1
485 )))
486 }
487 .demux_bincode(other)
488 }
489}
490
491impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
492 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
493 /// using [`bincode`] to serialize/deserialize messages.
494 ///
495 /// Each cluster member sends its local stream elements, and they are collected at the destination
496 /// as a [`KeyedStream`] where keys identify the source cluster member.
497 ///
498 /// # Example
499 /// ```rust
500 /// # #[cfg(feature = "deploy")] {
501 /// # use hydro_lang::prelude::*;
502 /// # use futures::StreamExt;
503 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
504 /// let workers: Cluster<()> = flow.cluster::<()>();
505 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
506 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
507 /// # all_received.entries()
508 /// # }, |mut stream| async move {
509 /// // if there are 4 members in the cluster, we should receive 4 elements
510 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
511 /// # let mut results = Vec::new();
512 /// # for w in 0..4 {
513 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
514 /// # }
515 /// # results.sort();
516 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
517 /// # }));
518 /// # }
519 /// ```
520 ///
521 /// If you don't need to know the source for each element, you can use `.values()`
522 /// to get just the data:
523 /// ```rust
524 /// # #[cfg(feature = "deploy")] {
525 /// # use hydro_lang::prelude::*;
526 /// # use hydro_lang::live_collections::stream::NoOrder;
527 /// # use futures::StreamExt;
528 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
529 /// # let workers: Cluster<()> = flow.cluster::<()>();
530 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
531 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
532 /// # values
533 /// # }, |mut stream| async move {
534 /// # let mut results = Vec::new();
535 /// # for w in 0..4 {
536 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
537 /// # }
538 /// # results.sort();
539 /// // if there are 4 members in the cluster, we should receive 4 elements
540 /// // 1, 1, 1, 1
541 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
542 /// # }));
543 /// # }
544 /// ```
545 pub fn send_bincode<L2>(
546 self,
547 other: &Process<'a, L2>,
548 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
549 where
550 T: Serialize + DeserializeOwned,
551 {
552 let serialize_pipeline = Some(serialize_bincode::<T>(false));
553
554 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
555
556 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
557 other.clone(),
558 HydroNode::Network {
559 serialize_fn: serialize_pipeline.map(|e| e.into()),
560 instantiate_fn: DebugInstantiate::Building,
561 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
562 input: Box::new(self.ir_node.into_inner()),
563 metadata: other.new_node_metadata(Stream::<
564 (MemberId<L>, T),
565 Process<'a, L2>,
566 Unbounded,
567 O,
568 R,
569 >::collection_kind()),
570 },
571 );
572
573 raw_stream.into_keyed()
574 }
575
576 /// Broadcasts elements of this stream at each source member to all members of a destination
577 /// cluster, using [`bincode`] to serialize/deserialize messages.
578 ///
579 /// Each source member sends each of its stream elements to **every** member of the cluster
580 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
581 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
582 /// **only data elements** and sends each element to all cluster members.
583 ///
584 /// # Non-Determinism
585 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
586 /// to the current cluster members known _at that point in time_ at the source member. Depending
587 /// on when each source member is notified of membership changes, it will broadcast each element
588 /// to different members.
589 ///
590 /// # Example
591 /// ```rust
592 /// # #[cfg(feature = "deploy")] {
593 /// # use hydro_lang::prelude::*;
594 /// # use hydro_lang::location::MemberId;
595 /// # use futures::StreamExt;
596 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
597 /// # type Source = ();
598 /// # type Destination = ();
599 /// let source: Cluster<Source> = flow.cluster::<Source>();
600 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
601 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
602 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
603 /// # on_destination.entries().send_bincode(&p2).entries()
604 /// // if there are 4 members in the desination, each receives one element from each source member
605 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
606 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
607 /// // - ...
608 /// # }, |mut stream| async move {
609 /// # let mut results = Vec::new();
610 /// # for w in 0..16 {
611 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
612 /// # }
613 /// # results.sort();
614 /// # assert_eq!(results, vec![
615 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
616 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
617 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
618 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
619 /// # ]);
620 /// # }));
621 /// # }
622 /// ```
623 pub fn broadcast_bincode<L2: 'a>(
624 self,
625 other: &Cluster<'a, L2>,
626 nondet_membership: NonDet,
627 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
628 where
629 T: Clone + Serialize + DeserializeOwned,
630 {
631 let ids = track_membership(self.location.source_cluster_members(other));
632 sliced! {
633 let members_snapshot = use(ids, nondet_membership);
634 let elements = use(self, nondet_membership);
635
636 let current_members = members_snapshot.filter(q!(|b| *b));
637 elements.repeat_with_keys(current_members)
638 }
639 .demux_bincode(other)
640 }
641}
642
643impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
644 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
645{
646 /// Sends elements of this stream at each source member to specific members of a destination
647 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
648 ///
649 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
650 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
651 /// this API allows precise targeting of specific cluster members rather than broadcasting to
652 /// all members.
653 ///
654 /// Each cluster member sends its local stream elements, and they are collected at each
655 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
656 ///
657 /// # Example
658 /// ```rust
659 /// # #[cfg(feature = "deploy")] {
660 /// # use hydro_lang::prelude::*;
661 /// # use futures::StreamExt;
662 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
663 /// # type Source = ();
664 /// # type Destination = ();
665 /// let source: Cluster<Source> = flow.cluster::<Source>();
666 /// let to_send: Stream<_, Cluster<_>, _> = source
667 /// .source_iter(q!(vec![0, 1, 2, 3]))
668 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
669 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
670 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
671 /// # all_received.entries().send_bincode(&p2).entries()
672 /// # }, |mut stream| async move {
673 /// // if there are 4 members in the destination cluster, each receives one message from each source member
674 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
675 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
676 /// // - ...
677 /// # let mut results = Vec::new();
678 /// # for w in 0..16 {
679 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
680 /// # }
681 /// # results.sort();
682 /// # assert_eq!(results, vec![
683 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
684 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
685 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
686 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
687 /// # ]);
688 /// # }));
689 /// # }
690 /// ```
691 pub fn demux_bincode(
692 self,
693 other: &Cluster<'a, L2>,
694 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
695 where
696 T: Serialize + DeserializeOwned,
697 {
698 self.into_keyed().demux_bincode(other)
699 }
700}
701
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use crate::location::{Location, MemberId};
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// Process-to-process send: every input arrives, and the simulator explores each way of
    /// batching the three inputs into ticks at the receiver.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_send_bincode_o2o() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let out_recv = input
            .send_bincode(&node2)
            .batch(&node2.tick(), nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            let received = out_recv.collect::<Vec<_>>().await;
            assert_eq!(received.into_iter().sum::<usize>(), 3);
        });

        assert_eq!(instances, 4); // 2^{3 - 1}
    }

    /// Cluster-to-process send: each member's element arrives tagged with its sender id.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_send_bincode_m2o() {
        let flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = cluster.source_iter(q!(vec![1]));

        let out_recv = input
            .send_bincode(&node)
            .entries()
            .batch(&node.tick(), nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instances = flow
            .sim()
            .with_cluster_size(&cluster, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), 1),
                        (MemberId::from_raw_id(1), 1),
                        (MemberId::from_raw_id(2), 1),
                        (MemberId::from_raw_id(3), 1),
                    ])
                    .await
            });

        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
    }

    /// Two independent clusters sending to the same process do not interfere with each other.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_send_bincode_multiple_m2o() {
        let flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv_1 = cluster1
            .source_iter(q!(vec![1]))
            .send_bincode(&node)
            .entries()
            .sim_output();

        let out_recv_2 = cluster2
            .source_iter(q!(vec![2]))
            .send_bincode(&node)
            .entries()
            .sim_output();

        let instances = flow
            .sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async || {
                out_recv_1
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), 1),
                        (MemberId::from_raw_id(1), 1),
                        (MemberId::from_raw_id(2), 1),
                    ])
                    .await;

                out_recv_2
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), 2),
                        (MemberId::from_raw_id(1), 2),
                        (MemberId::from_raw_id(2), 2),
                        (MemberId::from_raw_id(3), 2),
                    ])
                    .await;
            });

        assert_eq!(instances, 1);
    }

    /// Process-to-cluster demux: each element reaches exactly the member it names.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_send_bincode_o2m() {
        let flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = node.source_iter(q!(vec![
            (MemberId::from_raw_id(0), 123),
            (MemberId::from_raw_id(1), 456),
        ]));

        let out_recv = input
            .demux_bincode(&cluster)
            .map(q!(|x| x + 1))
            .send_bincode(&node)
            .entries()
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), 124),
                        (MemberId::from_raw_id(1), 457),
                    ])
                    .await
            });
    }

    /// Process-to-cluster broadcast under changing membership: a member that received the first
    /// broadcast must also receive the second, and each member receives both in some execution.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_broadcast_bincode_o2m() {
        let flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = node.source_iter(q!(vec![123, 456]));

        let out_recv = input
            .broadcast_bincode(&cluster, nondet!(/** test */))
            .map(q!(|x| x + 1))
            .send_bincode(&node)
            .entries()
            .sim_output();

        let mut member_0_received_both = false;
        let mut member_1_received_both = false;

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                let all_out = out_recv.collect_sorted::<Vec<_>>().await;

                // if a member received the first broadcast (124), it must also have received
                // the second one (457)
                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
                    member_0_received_both = true;
                }

                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
                    member_1_received_both = true;
                }
            });

        // in at least one execution each, the cluster member received both messages
        assert!(member_0_received_both && member_1_received_both);
    }

    /// Cluster-to-cluster demux: every source member's element reaches the named destination,
    /// tagged with the sending member.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_send_bincode_m2m() {
        let flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let input = node.source_iter(q!(vec![
            (MemberId::from_raw_id(0), 123),
            (MemberId::from_raw_id(1), 456),
        ]));

        let out_recv = input
            .demux_bincode(&cluster)
            .map(q!(|x| x + 1))
            .flat_map_ordered(q!(|x| vec![
                (MemberId::from_raw_id(0), x),
                (MemberId::from_raw_id(1), x),
            ]))
            .demux_bincode(&cluster)
            .entries()
            .send_bincode(&node)
            .entries()
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
                    ])
                    .await
            });
    }
}