hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{MinOrder, Ordering, Retries, Stream};
11#[cfg(stageleft_runtime)]
12use crate::location::dynamic::DynLocation;
13use crate::location::{Cluster, MemberId, Process};
14use crate::networking::{NetworkFor, TCP};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
20 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
21 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
22 ///
23 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
24 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
25 /// API allows precise targeting of specific cluster members rather than broadcasting to
26 /// all members.
27 ///
28 /// # Example
29 /// ```rust
30 /// # #[cfg(feature = "deploy")] {
31 /// # use hydro_lang::prelude::*;
32 /// # use futures::StreamExt;
33 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
34 /// let p1 = flow.process::<()>();
35 /// let workers: Cluster<()> = flow.cluster::<()>();
36 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
37 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
38 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
39 /// .into_keyed()
40 /// .demux_bincode(&workers);
41 /// # on_worker.send_bincode(&p2).entries()
42 /// // if there are 4 members in the cluster, each receives one element
43 /// // - MemberId::<()>(0): [0]
44 /// // - MemberId::<()>(1): [1]
45 /// // - MemberId::<()>(2): [2]
46 /// // - MemberId::<()>(3): [3]
47 /// # }, |mut stream| async move {
48 /// # let mut results = Vec::new();
49 /// # for w in 0..4 {
50 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
51 /// # }
52 /// # results.sort();
53 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
54 /// # }));
55 /// # }
56 /// ```
57 pub fn demux_bincode(
58 self,
59 other: &Cluster<'a, L2>,
60 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
61 where
62 T: Serialize + DeserializeOwned,
63 {
64 self.demux(other, TCP.fail_stop().bincode())
65 }
66
67 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
68 /// identifying the recipient for each group and using the configuration in `via` to set up the
69 /// message transport.
70 ///
71 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
72 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
73 /// API allows precise targeting of specific cluster members rather than broadcasting to
74 /// all members.
75 ///
76 /// # Example
77 /// ```rust
78 /// # #[cfg(feature = "deploy")] {
79 /// # use hydro_lang::prelude::*;
80 /// # use futures::StreamExt;
81 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
82 /// let p1 = flow.process::<()>();
83 /// let workers: Cluster<()> = flow.cluster::<()>();
84 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
85 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
86 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
87 /// .into_keyed()
88 /// .demux(&workers, TCP.fail_stop().bincode());
89 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
90 /// // if there are 4 members in the cluster, each receives one element
91 /// // - MemberId::<()>(0): [0]
92 /// // - MemberId::<()>(1): [1]
93 /// // - MemberId::<()>(2): [2]
94 /// // - MemberId::<()>(3): [3]
95 /// # }, |mut stream| async move {
96 /// # let mut results = Vec::new();
97 /// # for w in 0..4 {
98 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
99 /// # }
100 /// # results.sort();
101 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
102 /// # }));
103 /// # }
104 /// ```
105 pub fn demux<N: NetworkFor<T>>(
106 self,
107 to: &Cluster<'a, L2>,
108 via: N,
109 ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
110 where
111 T: Serialize + DeserializeOwned,
112 O: MinOrder<N::OrderingGuarantee>,
113 {
114 let serialize_pipeline = Some(N::serialize_thunk(true));
115
116 let deserialize_pipeline = Some(N::deserialize_thunk(None));
117
118 let name = via.name();
119 if to.multiversioned() && name.is_none() {
120 panic!(
121 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
122 );
123 }
124
125 Stream::new(
126 to.clone(),
127 HydroNode::Network {
128 name: name.map(ToOwned::to_owned),
129 networking_info: N::networking_info(),
130 serialize_fn: serialize_pipeline.map(|e| e.into()),
131 instantiate_fn: DebugInstantiate::Building,
132 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
133 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
134 metadata: to.new_node_metadata(Stream::<
135 T,
136 Cluster<'a, L2>,
137 Unbounded,
138 <O as MinOrder<N::OrderingGuarantee>>::Min,
139 R,
140 >::collection_kind()),
141 },
142 )
143 }
144}
145
146impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
147 KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
148{
149 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
150 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
151 /// compound key where the first element is the recipient's [`MemberId`] and the second element
152 /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
153 /// messages.
154 ///
155 /// # Example
156 /// ```rust
157 /// # #[cfg(feature = "deploy")] {
158 /// # use hydro_lang::prelude::*;
159 /// # use futures::StreamExt;
160 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
161 /// let p1 = flow.process::<()>();
162 /// let workers: Cluster<()> = flow.cluster::<()>();
163 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
164 /// .source_iter(q!(vec![0, 1, 2, 3]))
165 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
166 /// .into_keyed();
167 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
168 /// # on_worker.entries().send_bincode(&p2).entries()
169 /// // if there are 4 members in the cluster, each receives one element
170 /// // - MemberId::<()>(0): { 0: [123] }
171 /// // - MemberId::<()>(1): { 1: [124] }
172 /// // - ...
173 /// # }, |mut stream| async move {
174 /// # let mut results = Vec::new();
175 /// # for w in 0..4 {
176 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
177 /// # }
178 /// # results.sort();
179 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
180 /// # }));
181 /// # }
182 /// ```
183 pub fn demux_bincode(
184 self,
185 other: &Cluster<'a, L2>,
186 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
187 where
188 K: Serialize + DeserializeOwned,
189 T: Serialize + DeserializeOwned,
190 {
191 self.demux(other, TCP.fail_stop().bincode())
192 }
193
194 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
195 /// compound key where the first element is the recipient's [`MemberId`] and the second element
196 /// is a key that will be sent along with the value, using the configuration in `via` to set up
197 /// the message transport.
198 ///
199 /// # Example
200 /// ```rust
201 /// # #[cfg(feature = "deploy")] {
202 /// # use hydro_lang::prelude::*;
203 /// # use futures::StreamExt;
204 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
205 /// let p1 = flow.process::<()>();
206 /// let workers: Cluster<()> = flow.cluster::<()>();
207 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
208 /// .source_iter(q!(vec![0, 1, 2, 3]))
209 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
210 /// .into_keyed();
211 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
212 /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
213 /// // if there are 4 members in the cluster, each receives one element
214 /// // - MemberId::<()>(0): { 0: [123] }
215 /// // - MemberId::<()>(1): { 1: [124] }
216 /// // - ...
217 /// # }, |mut stream| async move {
218 /// # let mut results = Vec::new();
219 /// # for w in 0..4 {
220 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
221 /// # }
222 /// # results.sort();
223 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
224 /// # }));
225 /// # }
226 /// ```
227 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
228 pub fn demux<N: NetworkFor<(K, T)>>(
229 self,
230 to: &Cluster<'a, L2>,
231 via: N,
232 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
233 where
234 K: Serialize + DeserializeOwned,
235 T: Serialize + DeserializeOwned,
236 O: MinOrder<N::OrderingGuarantee>,
237 {
238 let serialize_pipeline = Some(N::serialize_thunk(true));
239
240 let deserialize_pipeline = Some(N::deserialize_thunk(None));
241
242 let name = via.name();
243 if to.multiversioned() && name.is_none() {
244 panic!(
245 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
246 );
247 }
248
249 KeyedStream::new(
250 to.clone(),
251 HydroNode::Network {
252 name: name.map(ToOwned::to_owned),
253 networking_info: N::networking_info(),
254 serialize_fn: serialize_pipeline.map(|e| e.into()),
255 instantiate_fn: DebugInstantiate::Building,
256 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
257 input: Box::new(
258 self.entries()
259 .map(q!(|((id, k), v)| (id, (k, v))))
260 .ir_node
261 .replace(HydroNode::Placeholder),
262 ),
263 metadata: to.new_node_metadata(KeyedStream::<
264 K,
265 T,
266 Cluster<'a, L2>,
267 Unbounded,
268 <O as MinOrder<N::OrderingGuarantee>>::Min,
269 R,
270 >::collection_kind()),
271 },
272 )
273 }
274}
275
276impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
277 KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
278{
279 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
280 /// Sends each group of this stream at each source member to a specific member of a destination
281 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
282 /// [`bincode`] to serialize/deserialize messages.
283 ///
284 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
285 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
286 /// API allows precise targeting of specific cluster members rather than broadcasting to all
287 /// members.
288 ///
289 /// Each cluster member sends its local stream elements, and they are collected at each
290 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
291 ///
292 /// # Example
293 /// ```rust
294 /// # #[cfg(feature = "deploy")] {
295 /// # use hydro_lang::prelude::*;
296 /// # use futures::StreamExt;
297 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
298 /// # type Source = ();
299 /// # type Destination = ();
300 /// let source: Cluster<Source> = flow.cluster::<Source>();
301 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
302 /// .source_iter(q!(vec![0, 1, 2, 3]))
303 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
304 /// .into_keyed();
305 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
306 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
307 /// # all_received.entries().send_bincode(&p2).entries()
308 /// # }, |mut stream| async move {
309 /// // if there are 4 members in the destination cluster, each receives one message from each source member
310 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
311 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
312 /// // - ...
313 /// # let mut results = Vec::new();
314 /// # for w in 0..16 {
315 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
316 /// # }
317 /// # results.sort();
318 /// # assert_eq!(results, vec![
319 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
320 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
321 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
322 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
323 /// # ]);
324 /// # }));
325 /// # }
326 /// ```
327 pub fn demux_bincode(
328 self,
329 other: &Cluster<'a, L2>,
330 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
331 where
332 T: Serialize + DeserializeOwned,
333 {
334 self.demux(other, TCP.fail_stop().bincode())
335 }
336
337 /// Sends each group of this stream at each source member to a specific member of a destination
338 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
339 /// configuration in `via` to set up the message transport.
340 ///
341 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
342 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
343 /// API allows precise targeting of specific cluster members rather than broadcasting to all
344 /// members.
345 ///
346 /// Each cluster member sends its local stream elements, and they are collected at each
347 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
348 ///
349 /// # Example
350 /// ```rust
351 /// # #[cfg(feature = "deploy")] {
352 /// # use hydro_lang::prelude::*;
353 /// # use futures::StreamExt;
354 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
355 /// # type Source = ();
356 /// # type Destination = ();
357 /// let source: Cluster<Source> = flow.cluster::<Source>();
358 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
359 /// .source_iter(q!(vec![0, 1, 2, 3]))
360 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
361 /// .into_keyed();
362 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
363 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
364 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
365 /// # }, |mut stream| async move {
366 /// // if there are 4 members in the destination cluster, each receives one message from each source member
367 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
368 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
369 /// // - ...
370 /// # let mut results = Vec::new();
371 /// # for w in 0..16 {
372 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
373 /// # }
374 /// # results.sort();
375 /// # assert_eq!(results, vec![
376 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
377 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
378 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
379 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
380 /// # ]);
381 /// # }));
382 /// # }
383 /// ```
384 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
385 pub fn demux<N: NetworkFor<T>>(
386 self,
387 to: &Cluster<'a, L2>,
388 via: N,
389 ) -> KeyedStream<
390 MemberId<L>,
391 T,
392 Cluster<'a, L2>,
393 Unbounded,
394 <O as MinOrder<N::OrderingGuarantee>>::Min,
395 R,
396 >
397 where
398 T: Serialize + DeserializeOwned,
399 O: MinOrder<N::OrderingGuarantee>,
400 {
401 let serialize_pipeline = Some(N::serialize_thunk(true));
402
403 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
404
405 let name = via.name();
406 if to.multiversioned() && name.is_none() {
407 panic!(
408 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
409 );
410 }
411
412 let raw_stream: Stream<
413 (MemberId<L>, T),
414 Cluster<'a, L2>,
415 Unbounded,
416 <O as MinOrder<N::OrderingGuarantee>>::Min,
417 R,
418 > = Stream::new(
419 to.clone(),
420 HydroNode::Network {
421 name: name.map(ToOwned::to_owned),
422 networking_info: N::networking_info(),
423 serialize_fn: serialize_pipeline.map(|e| e.into()),
424 instantiate_fn: DebugInstantiate::Building,
425 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
426 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
427 metadata: to.new_node_metadata(Stream::<
428 (MemberId<L>, T),
429 Cluster<'a, L2>,
430 Unbounded,
431 <O as MinOrder<N::OrderingGuarantee>>::Min,
432 R,
433 >::collection_kind()),
434 },
435 );
436
437 raw_stream.into_keyed()
438 }
439}
440
441impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
442 KeyedStream<K, V, Cluster<'a, L>, B, O, R>
443{
444 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
445 #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
446 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
447 /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
448 /// has a compound key where the first element is the sender's [`MemberId`] and the second
449 /// element is the original key.
450 ///
451 /// # Example
452 /// ```rust
453 /// # #[cfg(feature = "deploy")] {
454 /// # use hydro_lang::prelude::*;
455 /// # use futures::StreamExt;
456 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
457 /// # type Source = ();
458 /// # type Destination = ();
459 /// let source: Cluster<Source> = flow.cluster::<Source>();
460 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
461 /// .source_iter(q!(vec![0, 1, 2, 3]))
462 /// .map(q!(|x| (x, x + 123)))
463 /// .into_keyed();
464 /// let destination_process = flow.process::<Destination>();
465 /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
466 /// # all_received.entries().send_bincode(&p2)
467 /// # }, |mut stream| async move {
468 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
469 /// // {
470 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
471 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
472 /// // ...
473 /// // }
474 /// # let mut results = Vec::new();
475 /// # for w in 0..16 {
476 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
477 /// # }
478 /// # results.sort();
479 /// # assert_eq!(results, vec![
480 /// # "((MemberId::<()>(0), 0), 123)",
481 /// # "((MemberId::<()>(0), 1), 124)",
482 /// # "((MemberId::<()>(0), 2), 125)",
483 /// # "((MemberId::<()>(0), 3), 126)",
484 /// # "((MemberId::<()>(1), 0), 123)",
485 /// # "((MemberId::<()>(1), 1), 124)",
486 /// # "((MemberId::<()>(1), 2), 125)",
487 /// # "((MemberId::<()>(1), 3), 126)",
488 /// # "((MemberId::<()>(2), 0), 123)",
489 /// # "((MemberId::<()>(2), 1), 124)",
490 /// # "((MemberId::<()>(2), 2), 125)",
491 /// # "((MemberId::<()>(2), 3), 126)",
492 /// # "((MemberId::<()>(3), 0), 123)",
493 /// # "((MemberId::<()>(3), 1), 124)",
494 /// # "((MemberId::<()>(3), 2), 125)",
495 /// # "((MemberId::<()>(3), 3), 126)",
496 /// # ]);
497 /// # }));
498 /// # }
499 /// ```
500 pub fn send_bincode<L2>(
501 self,
502 other: &Process<'a, L2>,
503 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
504 where
505 K: Serialize + DeserializeOwned,
506 V: Serialize + DeserializeOwned,
507 {
508 self.send(other, TCP.fail_stop().bincode())
509 }
510
511 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
512 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
513 /// network, using the configuration in `via` to set up the message transport. The resulting
514 /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
515 /// the second element is the original key.
516 ///
517 /// # Example
518 /// ```rust
519 /// # #[cfg(feature = "deploy")] {
520 /// # use hydro_lang::prelude::*;
521 /// # use futures::StreamExt;
522 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
523 /// # type Source = ();
524 /// # type Destination = ();
525 /// let source: Cluster<Source> = flow.cluster::<Source>();
526 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
527 /// .source_iter(q!(vec![0, 1, 2, 3]))
528 /// .map(q!(|x| (x, x + 123)))
529 /// .into_keyed();
530 /// let destination_process = flow.process::<Destination>();
531 /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
532 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
533 /// # }, |mut stream| async move {
534 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
535 /// // {
536 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
537 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
538 /// // ...
539 /// // }
540 /// # let mut results = Vec::new();
541 /// # for w in 0..16 {
542 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
543 /// # }
544 /// # results.sort();
545 /// # assert_eq!(results, vec![
546 /// # "((MemberId::<()>(0), 0), 123)",
547 /// # "((MemberId::<()>(0), 1), 124)",
548 /// # "((MemberId::<()>(0), 2), 125)",
549 /// # "((MemberId::<()>(0), 3), 126)",
550 /// # "((MemberId::<()>(1), 0), 123)",
551 /// # "((MemberId::<()>(1), 1), 124)",
552 /// # "((MemberId::<()>(1), 2), 125)",
553 /// # "((MemberId::<()>(1), 3), 126)",
554 /// # "((MemberId::<()>(2), 0), 123)",
555 /// # "((MemberId::<()>(2), 1), 124)",
556 /// # "((MemberId::<()>(2), 2), 125)",
557 /// # "((MemberId::<()>(2), 3), 126)",
558 /// # "((MemberId::<()>(3), 0), 123)",
559 /// # "((MemberId::<()>(3), 1), 124)",
560 /// # "((MemberId::<()>(3), 2), 125)",
561 /// # "((MemberId::<()>(3), 3), 126)",
562 /// # ]);
563 /// # }));
564 /// # }
565 /// ```
566 pub fn send<L2, N: NetworkFor<(K, V)>>(
567 self,
568 to: &Process<'a, L2>,
569 via: N,
570 ) -> KeyedStream<
571 (MemberId<L>, K),
572 V,
573 Process<'a, L2>,
574 Unbounded,
575 <O as MinOrder<N::OrderingGuarantee>>::Min,
576 R,
577 >
578 where
579 K: Serialize + DeserializeOwned,
580 V: Serialize + DeserializeOwned,
581 O: MinOrder<N::OrderingGuarantee>,
582 {
583 let serialize_pipeline = Some(N::serialize_thunk(false));
584
585 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
586
587 let name = via.name();
588 if to.multiversioned() && name.is_none() {
589 panic!(
590 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
591 );
592 }
593
594 let raw_stream: Stream<
595 (MemberId<L>, (K, V)),
596 Process<'a, L2>,
597 Unbounded,
598 <O as MinOrder<N::OrderingGuarantee>>::Min,
599 R,
600 > = Stream::new(
601 to.clone(),
602 HydroNode::Network {
603 name: name.map(ToOwned::to_owned),
604 networking_info: N::networking_info(),
605 serialize_fn: serialize_pipeline.map(|e| e.into()),
606 instantiate_fn: DebugInstantiate::Building,
607 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
608 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
609 metadata: to.new_node_metadata(Stream::<
610 (MemberId<L>, (K, V)),
611 Cluster<'a, L2>,
612 Unbounded,
613 <O as MinOrder<N::OrderingGuarantee>>::Min,
614 R,
615 >::collection_kind()),
616 },
617 );
618
619 raw_stream
620 .map(q!(|(sender, (k, v))| ((sender, k), v)))
621 .into_keyed()
622 }
623}