hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{Ordering, Retries, Stream};
11#[cfg(stageleft_runtime)]
12use crate::location::dynamic::DynLocation;
13use crate::location::{Cluster, MemberId, Process};
14use crate::networking::{NetworkFor, TCP};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19 #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
20 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
21 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
22 ///
23 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
24 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
25 /// API allows precise targeting of specific cluster members rather than broadcasting to
26 /// all members.
27 ///
28 /// # Example
29 /// ```rust
30 /// # #[cfg(feature = "deploy")] {
31 /// # use hydro_lang::prelude::*;
32 /// # use futures::StreamExt;
33 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
34 /// let p1 = flow.process::<()>();
35 /// let workers: Cluster<()> = flow.cluster::<()>();
36 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
37 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
38 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
39 /// .into_keyed()
40 /// .demux_bincode(&workers);
41 /// # on_worker.send_bincode(&p2).entries()
42 /// // if there are 4 members in the cluster, each receives one element
43 /// // - MemberId::<()>(0): [0]
44 /// // - MemberId::<()>(1): [1]
45 /// // - MemberId::<()>(2): [2]
46 /// // - MemberId::<()>(3): [3]
47 /// # }, |mut stream| async move {
48 /// # let mut results = Vec::new();
49 /// # for w in 0..4 {
50 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
51 /// # }
52 /// # results.sort();
53 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
54 /// # }));
55 /// # }
56 /// ```
57 pub fn demux_bincode(
58 self,
59 other: &Cluster<'a, L2>,
60 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
61 where
62 T: Serialize + DeserializeOwned,
63 {
64 self.demux(other, TCP.bincode())
65 }
66
67 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
68 /// identifying the recipient for each group and using the configuration in `via` to set up the
69 /// message transport.
70 ///
71 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
72 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
73 /// API allows precise targeting of specific cluster members rather than broadcasting to
74 /// all members.
75 ///
76 /// # Example
77 /// ```rust
78 /// # #[cfg(feature = "deploy")] {
79 /// # use hydro_lang::prelude::*;
80 /// # use futures::StreamExt;
81 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
82 /// let p1 = flow.process::<()>();
83 /// let workers: Cluster<()> = flow.cluster::<()>();
84 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
85 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
86 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
87 /// .into_keyed()
88 /// .demux(&workers, TCP.bincode());
89 /// # on_worker.send(&p2, TCP.bincode()).entries()
90 /// // if there are 4 members in the cluster, each receives one element
91 /// // - MemberId::<()>(0): [0]
92 /// // - MemberId::<()>(1): [1]
93 /// // - MemberId::<()>(2): [2]
94 /// // - MemberId::<()>(3): [3]
95 /// # }, |mut stream| async move {
96 /// # let mut results = Vec::new();
97 /// # for w in 0..4 {
98 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
99 /// # }
100 /// # results.sort();
101 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
102 /// # }));
103 /// # }
104 /// ```
105 pub fn demux<N: NetworkFor<T>>(
106 self,
107 to: &Cluster<'a, L2>,
108 via: N,
109 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
110 where
111 T: Serialize + DeserializeOwned,
112 {
113 let serialize_pipeline = Some(N::serialize_thunk(true));
114
115 let deserialize_pipeline = Some(N::deserialize_thunk(None));
116
117 let name = via.name();
118 if to.multiversioned() && name.is_none() {
119 panic!(
120 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
121 );
122 }
123
124 Stream::new(
125 to.clone(),
126 HydroNode::Network {
127 name: name.map(ToOwned::to_owned),
128 serialize_fn: serialize_pipeline.map(|e| e.into()),
129 instantiate_fn: DebugInstantiate::Building,
130 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
131 input: Box::new(self.ir_node.into_inner()),
132 metadata: to.new_node_metadata(
133 Stream::<T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
134 ),
135 },
136 )
137 }
138}
139
140impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
141 KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
142{
143 #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
144 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
145 /// compound key where the first element is the recipient's [`MemberId`] and the second element
146 /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
147 /// messages.
148 ///
149 /// # Example
150 /// ```rust
151 /// # #[cfg(feature = "deploy")] {
152 /// # use hydro_lang::prelude::*;
153 /// # use futures::StreamExt;
154 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
155 /// let p1 = flow.process::<()>();
156 /// let workers: Cluster<()> = flow.cluster::<()>();
157 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
158 /// .source_iter(q!(vec![0, 1, 2, 3]))
159 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
160 /// .into_keyed();
161 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
162 /// # on_worker.entries().send_bincode(&p2).entries()
163 /// // if there are 4 members in the cluster, each receives one element
164 /// // - MemberId::<()>(0): { 0: [123] }
165 /// // - MemberId::<()>(1): { 1: [124] }
166 /// // - ...
167 /// # }, |mut stream| async move {
168 /// # let mut results = Vec::new();
169 /// # for w in 0..4 {
170 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
171 /// # }
172 /// # results.sort();
173 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
174 /// # }));
175 /// # }
176 /// ```
177 pub fn demux_bincode(
178 self,
179 other: &Cluster<'a, L2>,
180 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
181 where
182 K: Serialize + DeserializeOwned,
183 T: Serialize + DeserializeOwned,
184 {
185 self.demux(other, TCP.bincode())
186 }
187
188 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
189 /// compound key where the first element is the recipient's [`MemberId`] and the second element
190 /// is a key that will be sent along with the value, using the configuration in `via` to set up
191 /// the message transport.
192 ///
193 /// # Example
194 /// ```rust
195 /// # #[cfg(feature = "deploy")] {
196 /// # use hydro_lang::prelude::*;
197 /// # use futures::StreamExt;
198 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
199 /// let p1 = flow.process::<()>();
200 /// let workers: Cluster<()> = flow.cluster::<()>();
201 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
202 /// .source_iter(q!(vec![0, 1, 2, 3]))
203 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
204 /// .into_keyed();
205 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.bincode());
206 /// # on_worker.entries().send(&p2, TCP.bincode()).entries()
207 /// // if there are 4 members in the cluster, each receives one element
208 /// // - MemberId::<()>(0): { 0: [123] }
209 /// // - MemberId::<()>(1): { 1: [124] }
210 /// // - ...
211 /// # }, |mut stream| async move {
212 /// # let mut results = Vec::new();
213 /// # for w in 0..4 {
214 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
215 /// # }
216 /// # results.sort();
217 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
218 /// # }));
219 /// # }
220 /// ```
221 pub fn demux<N: NetworkFor<(K, T)>>(
222 self,
223 to: &Cluster<'a, L2>,
224 via: N,
225 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
226 where
227 K: Serialize + DeserializeOwned,
228 T: Serialize + DeserializeOwned,
229 {
230 let serialize_pipeline = Some(N::serialize_thunk(true));
231
232 let deserialize_pipeline = Some(N::deserialize_thunk(None));
233
234 let name = via.name();
235 if to.multiversioned() && name.is_none() {
236 panic!(
237 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
238 );
239 }
240
241 KeyedStream::new(
242 to.clone(),
243 HydroNode::Network {
244 name: name.map(ToOwned::to_owned),
245 serialize_fn: serialize_pipeline.map(|e| e.into()),
246 instantiate_fn: DebugInstantiate::Building,
247 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
248 input: Box::new(
249 self.entries()
250 .map(q!(|((id, k), v)| (id, (k, v))))
251 .ir_node
252 .into_inner(),
253 ),
254 metadata: to.new_node_metadata(
255 KeyedStream::<K, T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
256 ),
257 },
258 )
259 }
260}
261
262impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
263 KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
264{
265 #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
266 /// Sends each group of this stream at each source member to a specific member of a destination
267 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
268 /// [`bincode`] to serialize/deserialize messages.
269 ///
270 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
271 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
272 /// API allows precise targeting of specific cluster members rather than broadcasting to all
273 /// members.
274 ///
275 /// Each cluster member sends its local stream elements, and they are collected at each
276 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
277 ///
278 /// # Example
279 /// ```rust
280 /// # #[cfg(feature = "deploy")] {
281 /// # use hydro_lang::prelude::*;
282 /// # use futures::StreamExt;
283 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
284 /// # type Source = ();
285 /// # type Destination = ();
286 /// let source: Cluster<Source> = flow.cluster::<Source>();
287 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
288 /// .source_iter(q!(vec![0, 1, 2, 3]))
289 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
290 /// .into_keyed();
291 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
292 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
293 /// # all_received.entries().send_bincode(&p2).entries()
294 /// # }, |mut stream| async move {
295 /// // if there are 4 members in the destination cluster, each receives one message from each source member
296 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
297 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
298 /// // - ...
299 /// # let mut results = Vec::new();
300 /// # for w in 0..16 {
301 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
302 /// # }
303 /// # results.sort();
304 /// # assert_eq!(results, vec![
305 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
306 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
307 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
308 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
309 /// # ]);
310 /// # }));
311 /// # }
312 /// ```
313 pub fn demux_bincode(
314 self,
315 other: &Cluster<'a, L2>,
316 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
317 where
318 T: Serialize + DeserializeOwned,
319 {
320 self.demux(other, TCP.bincode())
321 }
322
323 /// Sends each group of this stream at each source member to a specific member of a destination
324 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
325 /// configuration in `via` to set up the message transport.
326 ///
327 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
328 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
329 /// API allows precise targeting of specific cluster members rather than broadcasting to all
330 /// members.
331 ///
332 /// Each cluster member sends its local stream elements, and they are collected at each
333 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
334 ///
335 /// # Example
336 /// ```rust
337 /// # #[cfg(feature = "deploy")] {
338 /// # use hydro_lang::prelude::*;
339 /// # use futures::StreamExt;
340 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
341 /// # type Source = ();
342 /// # type Destination = ();
343 /// let source: Cluster<Source> = flow.cluster::<Source>();
344 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
345 /// .source_iter(q!(vec![0, 1, 2, 3]))
346 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
347 /// .into_keyed();
348 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
349 /// let all_received = to_send.demux(&destination, TCP.bincode()); // KeyedStream<MemberId<Source>, i32, ...>
350 /// # all_received.entries().send(&p2, TCP.bincode()).entries()
351 /// # }, |mut stream| async move {
352 /// // if there are 4 members in the destination cluster, each receives one message from each source member
353 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
354 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
355 /// // - ...
356 /// # let mut results = Vec::new();
357 /// # for w in 0..16 {
358 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
359 /// # }
360 /// # results.sort();
361 /// # assert_eq!(results, vec![
362 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
363 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
364 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
365 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
366 /// # ]);
367 /// # }));
368 /// # }
369 /// ```
370 pub fn demux<N: NetworkFor<T>>(
371 self,
372 to: &Cluster<'a, L2>,
373 via: N,
374 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
375 where
376 T: Serialize + DeserializeOwned,
377 {
378 let serialize_pipeline = Some(N::serialize_thunk(true));
379
380 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
381
382 let name = via.name();
383 if to.multiversioned() && name.is_none() {
384 panic!(
385 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
386 );
387 }
388
389 let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
390 to.clone(),
391 HydroNode::Network {
392 name: name.map(ToOwned::to_owned),
393 serialize_fn: serialize_pipeline.map(|e| e.into()),
394 instantiate_fn: DebugInstantiate::Building,
395 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
396 input: Box::new(self.ir_node.into_inner()),
397 metadata: to.new_node_metadata(Stream::<
398 (MemberId<L>, T),
399 Cluster<'a, L2>,
400 Unbounded,
401 O,
402 R,
403 >::collection_kind()),
404 },
405 );
406
407 raw_stream.into_keyed()
408 }
409}
410
411impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
412 KeyedStream<K, V, Cluster<'a, L>, B, O, R>
413{
414 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
415 #[deprecated = "use KeyedStream::send(..., TCP.bincode()) instead"]
416 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
417 /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
418 /// has a compound key where the first element is the sender's [`MemberId`] and the second
419 /// element is the original key.
420 ///
421 /// # Example
422 /// ```rust
423 /// # #[cfg(feature = "deploy")] {
424 /// # use hydro_lang::prelude::*;
425 /// # use futures::StreamExt;
426 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
427 /// # type Source = ();
428 /// # type Destination = ();
429 /// let source: Cluster<Source> = flow.cluster::<Source>();
430 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
431 /// .source_iter(q!(vec![0, 1, 2, 3]))
432 /// .map(q!(|x| (x, x + 123)))
433 /// .into_keyed();
434 /// let destination_process = flow.process::<Destination>();
435 /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
436 /// # all_received.entries().send_bincode(&p2)
437 /// # }, |mut stream| async move {
438 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
439 /// // {
440 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
441 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
442 /// // ...
443 /// // }
444 /// # let mut results = Vec::new();
445 /// # for w in 0..16 {
446 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
447 /// # }
448 /// # results.sort();
449 /// # assert_eq!(results, vec![
450 /// # "((MemberId::<()>(0), 0), 123)",
451 /// # "((MemberId::<()>(0), 1), 124)",
452 /// # "((MemberId::<()>(0), 2), 125)",
453 /// # "((MemberId::<()>(0), 3), 126)",
454 /// # "((MemberId::<()>(1), 0), 123)",
455 /// # "((MemberId::<()>(1), 1), 124)",
456 /// # "((MemberId::<()>(1), 2), 125)",
457 /// # "((MemberId::<()>(1), 3), 126)",
458 /// # "((MemberId::<()>(2), 0), 123)",
459 /// # "((MemberId::<()>(2), 1), 124)",
460 /// # "((MemberId::<()>(2), 2), 125)",
461 /// # "((MemberId::<()>(2), 3), 126)",
462 /// # "((MemberId::<()>(3), 0), 123)",
463 /// # "((MemberId::<()>(3), 1), 124)",
464 /// # "((MemberId::<()>(3), 2), 125)",
465 /// # "((MemberId::<()>(3), 3), 126)",
466 /// # ]);
467 /// # }));
468 /// # }
469 /// ```
470 pub fn send_bincode<L2>(
471 self,
472 other: &Process<'a, L2>,
473 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
474 where
475 K: Serialize + DeserializeOwned,
476 V: Serialize + DeserializeOwned,
477 {
478 self.send(other, TCP.bincode())
479 }
480
481 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
482 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
483 /// network, using the configuration in `via` to set up the message transport. The resulting
484 /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
485 /// the second element is the original key.
486 ///
487 /// # Example
488 /// ```rust
489 /// # #[cfg(feature = "deploy")] {
490 /// # use hydro_lang::prelude::*;
491 /// # use futures::StreamExt;
492 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
493 /// # type Source = ();
494 /// # type Destination = ();
495 /// let source: Cluster<Source> = flow.cluster::<Source>();
496 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
497 /// .source_iter(q!(vec![0, 1, 2, 3]))
498 /// .map(q!(|x| (x, x + 123)))
499 /// .into_keyed();
500 /// let destination_process = flow.process::<Destination>();
501 /// let all_received = to_send.send(&destination_process, TCP.bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
502 /// # all_received.entries().send(&p2, TCP.bincode())
503 /// # }, |mut stream| async move {
504 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
505 /// // {
506 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
507 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
508 /// // ...
509 /// // }
510 /// # let mut results = Vec::new();
511 /// # for w in 0..16 {
512 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
513 /// # }
514 /// # results.sort();
515 /// # assert_eq!(results, vec![
516 /// # "((MemberId::<()>(0), 0), 123)",
517 /// # "((MemberId::<()>(0), 1), 124)",
518 /// # "((MemberId::<()>(0), 2), 125)",
519 /// # "((MemberId::<()>(0), 3), 126)",
520 /// # "((MemberId::<()>(1), 0), 123)",
521 /// # "((MemberId::<()>(1), 1), 124)",
522 /// # "((MemberId::<()>(1), 2), 125)",
523 /// # "((MemberId::<()>(1), 3), 126)",
524 /// # "((MemberId::<()>(2), 0), 123)",
525 /// # "((MemberId::<()>(2), 1), 124)",
526 /// # "((MemberId::<()>(2), 2), 125)",
527 /// # "((MemberId::<()>(2), 3), 126)",
528 /// # "((MemberId::<()>(3), 0), 123)",
529 /// # "((MemberId::<()>(3), 1), 124)",
530 /// # "((MemberId::<()>(3), 2), 125)",
531 /// # "((MemberId::<()>(3), 3), 126)",
532 /// # ]);
533 /// # }));
534 /// # }
535 /// ```
536 pub fn send<L2, N: NetworkFor<(K, V)>>(
537 self,
538 to: &Process<'a, L2>,
539 via: N,
540 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
541 where
542 K: Serialize + DeserializeOwned,
543 V: Serialize + DeserializeOwned,
544 {
545 let serialize_pipeline = Some(N::serialize_thunk(false));
546
547 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
548
549 let name = via.name();
550 if to.multiversioned() && name.is_none() {
551 panic!(
552 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
553 );
554 }
555
556 let raw_stream: Stream<(MemberId<L>, (K, V)), Process<'a, L2>, Unbounded, O, R> =
557 Stream::new(
558 to.clone(),
559 HydroNode::Network {
560 name: name.map(ToOwned::to_owned),
561 serialize_fn: serialize_pipeline.map(|e| e.into()),
562 instantiate_fn: DebugInstantiate::Building,
563 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
564 input: Box::new(self.ir_node.into_inner()),
565 metadata: to.new_node_metadata(Stream::<
566 (MemberId<L>, (K, V)),
567 Cluster<'a, L2>,
568 Unbounded,
569 O,
570 R,
571 >::collection_kind()),
572 },
573 );
574
575 raw_stream
576 .map(q!(|(sender, (k, v))| ((sender, k), v)))
577 .into_keyed()
578 }
579}