Skip to main content

hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{Ordering, Retries, Stream};
11#[cfg(stageleft_runtime)]
12use crate::location::dynamic::DynLocation;
13use crate::location::{Cluster, MemberId, Process};
14use crate::networking::{NetworkFor, TCP};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19    #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
20    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
21    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
22    ///
23    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
24    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
25    /// API allows precise targeting of specific cluster members rather than broadcasting to
26    /// all members.
27    ///
28    /// # Example
29    /// ```rust
30    /// # #[cfg(feature = "deploy")] {
31    /// # use hydro_lang::prelude::*;
32    /// # use futures::StreamExt;
33    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
34    /// let p1 = flow.process::<()>();
35    /// let workers: Cluster<()> = flow.cluster::<()>();
36    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
37    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
38    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
39    ///     .into_keyed()
40    ///     .demux_bincode(&workers);
41    /// # on_worker.send_bincode(&p2).entries()
42    /// // if there are 4 members in the cluster, each receives one element
43    /// // - MemberId::<()>(0): [0]
44    /// // - MemberId::<()>(1): [1]
45    /// // - MemberId::<()>(2): [2]
46    /// // - MemberId::<()>(3): [3]
47    /// # }, |mut stream| async move {
48    /// # let mut results = Vec::new();
49    /// # for w in 0..4 {
50    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
51    /// # }
52    /// # results.sort();
53    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
54    /// # }));
55    /// # }
56    /// ```
57    pub fn demux_bincode(
58        self,
59        other: &Cluster<'a, L2>,
60    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
61    where
62        T: Serialize + DeserializeOwned,
63    {
64        self.demux(other, TCP.bincode())
65    }
66
67    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
68    /// identifying the recipient for each group and using the configuration in `via` to set up the
69    /// message transport.
70    ///
71    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
72    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
73    /// API allows precise targeting of specific cluster members rather than broadcasting to
74    /// all members.
75    ///
76    /// # Example
77    /// ```rust
78    /// # #[cfg(feature = "deploy")] {
79    /// # use hydro_lang::prelude::*;
80    /// # use futures::StreamExt;
81    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
82    /// let p1 = flow.process::<()>();
83    /// let workers: Cluster<()> = flow.cluster::<()>();
84    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
85    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
86    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
87    ///     .into_keyed()
88    ///     .demux(&workers, TCP.bincode());
89    /// # on_worker.send(&p2, TCP.bincode()).entries()
90    /// // if there are 4 members in the cluster, each receives one element
91    /// // - MemberId::<()>(0): [0]
92    /// // - MemberId::<()>(1): [1]
93    /// // - MemberId::<()>(2): [2]
94    /// // - MemberId::<()>(3): [3]
95    /// # }, |mut stream| async move {
96    /// # let mut results = Vec::new();
97    /// # for w in 0..4 {
98    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
99    /// # }
100    /// # results.sort();
101    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
102    /// # }));
103    /// # }
104    /// ```
105    pub fn demux<N: NetworkFor<T>>(
106        self,
107        to: &Cluster<'a, L2>,
108        via: N,
109    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
110    where
111        T: Serialize + DeserializeOwned,
112    {
113        let serialize_pipeline = Some(N::serialize_thunk(true));
114
115        let deserialize_pipeline = Some(N::deserialize_thunk(None));
116
117        let name = via.name();
118        if to.multiversioned() && name.is_none() {
119            panic!(
120                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
121            );
122        }
123
124        Stream::new(
125            to.clone(),
126            HydroNode::Network {
127                name: name.map(ToOwned::to_owned),
128                serialize_fn: serialize_pipeline.map(|e| e.into()),
129                instantiate_fn: DebugInstantiate::Building,
130                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
131                input: Box::new(self.ir_node.into_inner()),
132                metadata: to.new_node_metadata(
133                    Stream::<T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
134                ),
135            },
136        )
137    }
138}
139
140impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
141    KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
142{
143    #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
144    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
145    /// compound key where the first element is the recipient's [`MemberId`] and the second element
146    /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
147    /// messages.
148    ///
149    /// # Example
150    /// ```rust
151    /// # #[cfg(feature = "deploy")] {
152    /// # use hydro_lang::prelude::*;
153    /// # use futures::StreamExt;
154    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
155    /// let p1 = flow.process::<()>();
156    /// let workers: Cluster<()> = flow.cluster::<()>();
157    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
158    ///     .source_iter(q!(vec![0, 1, 2, 3]))
159    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
160    ///     .into_keyed();
161    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
162    /// # on_worker.entries().send_bincode(&p2).entries()
163    /// // if there are 4 members in the cluster, each receives one element
164    /// // - MemberId::<()>(0): { 0: [123] }
165    /// // - MemberId::<()>(1): { 1: [124] }
166    /// // - ...
167    /// # }, |mut stream| async move {
168    /// # let mut results = Vec::new();
169    /// # for w in 0..4 {
170    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
171    /// # }
172    /// # results.sort();
173    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
174    /// # }));
175    /// # }
176    /// ```
177    pub fn demux_bincode(
178        self,
179        other: &Cluster<'a, L2>,
180    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
181    where
182        K: Serialize + DeserializeOwned,
183        T: Serialize + DeserializeOwned,
184    {
185        self.demux(other, TCP.bincode())
186    }
187
188    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
189    /// compound key where the first element is the recipient's [`MemberId`] and the second element
190    /// is a key that will be sent along with the value, using the configuration in `via` to set up
191    /// the message transport.
192    ///
193    /// # Example
194    /// ```rust
195    /// # #[cfg(feature = "deploy")] {
196    /// # use hydro_lang::prelude::*;
197    /// # use futures::StreamExt;
198    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
199    /// let p1 = flow.process::<()>();
200    /// let workers: Cluster<()> = flow.cluster::<()>();
201    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
202    ///     .source_iter(q!(vec![0, 1, 2, 3]))
203    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
204    ///     .into_keyed();
205    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.bincode());
206    /// # on_worker.entries().send(&p2, TCP.bincode()).entries()
207    /// // if there are 4 members in the cluster, each receives one element
208    /// // - MemberId::<()>(0): { 0: [123] }
209    /// // - MemberId::<()>(1): { 1: [124] }
210    /// // - ...
211    /// # }, |mut stream| async move {
212    /// # let mut results = Vec::new();
213    /// # for w in 0..4 {
214    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
215    /// # }
216    /// # results.sort();
217    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
218    /// # }));
219    /// # }
220    /// ```
221    pub fn demux<N: NetworkFor<(K, T)>>(
222        self,
223        to: &Cluster<'a, L2>,
224        via: N,
225    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
226    where
227        K: Serialize + DeserializeOwned,
228        T: Serialize + DeserializeOwned,
229    {
230        let serialize_pipeline = Some(N::serialize_thunk(true));
231
232        let deserialize_pipeline = Some(N::deserialize_thunk(None));
233
234        let name = via.name();
235        if to.multiversioned() && name.is_none() {
236            panic!(
237                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
238            );
239        }
240
241        KeyedStream::new(
242            to.clone(),
243            HydroNode::Network {
244                name: name.map(ToOwned::to_owned),
245                serialize_fn: serialize_pipeline.map(|e| e.into()),
246                instantiate_fn: DebugInstantiate::Building,
247                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
248                input: Box::new(
249                    self.entries()
250                        .map(q!(|((id, k), v)| (id, (k, v))))
251                        .ir_node
252                        .into_inner(),
253                ),
254                metadata: to.new_node_metadata(
255                    KeyedStream::<K, T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
256                ),
257            },
258        )
259    }
260}
261
262impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
263    KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
264{
265    #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
266    /// Sends each group of this stream at each source member to a specific member of a destination
267    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
268    /// [`bincode`] to serialize/deserialize messages.
269    ///
270    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
271    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
272    /// API allows precise targeting of specific cluster members rather than broadcasting to all
273    /// members.
274    ///
275    /// Each cluster member sends its local stream elements, and they are collected at each
276    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
277    ///
278    /// # Example
279    /// ```rust
280    /// # #[cfg(feature = "deploy")] {
281    /// # use hydro_lang::prelude::*;
282    /// # use futures::StreamExt;
283    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
284    /// # type Source = ();
285    /// # type Destination = ();
286    /// let source: Cluster<Source> = flow.cluster::<Source>();
287    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
288    ///     .source_iter(q!(vec![0, 1, 2, 3]))
289    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
290    ///     .into_keyed();
291    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
292    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
293    /// # all_received.entries().send_bincode(&p2).entries()
294    /// # }, |mut stream| async move {
295    /// // if there are 4 members in the destination cluster, each receives one message from each source member
296    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
297    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
298    /// // - ...
299    /// # let mut results = Vec::new();
300    /// # for w in 0..16 {
301    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
302    /// # }
303    /// # results.sort();
304    /// # assert_eq!(results, vec![
305    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
306    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
307    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
308    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
309    /// # ]);
310    /// # }));
311    /// # }
312    /// ```
313    pub fn demux_bincode(
314        self,
315        other: &Cluster<'a, L2>,
316    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
317    where
318        T: Serialize + DeserializeOwned,
319    {
320        self.demux(other, TCP.bincode())
321    }
322
323    /// Sends each group of this stream at each source member to a specific member of a destination
324    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
325    /// configuration in `via` to set up the message transport.
326    ///
327    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
328    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
329    /// API allows precise targeting of specific cluster members rather than broadcasting to all
330    /// members.
331    ///
332    /// Each cluster member sends its local stream elements, and they are collected at each
333    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
334    ///
335    /// # Example
336    /// ```rust
337    /// # #[cfg(feature = "deploy")] {
338    /// # use hydro_lang::prelude::*;
339    /// # use futures::StreamExt;
340    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
341    /// # type Source = ();
342    /// # type Destination = ();
343    /// let source: Cluster<Source> = flow.cluster::<Source>();
344    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
345    ///     .source_iter(q!(vec![0, 1, 2, 3]))
346    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
347    ///     .into_keyed();
348    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
349    /// let all_received = to_send.demux(&destination, TCP.bincode()); // KeyedStream<MemberId<Source>, i32, ...>
350    /// # all_received.entries().send(&p2, TCP.bincode()).entries()
351    /// # }, |mut stream| async move {
352    /// // if there are 4 members in the destination cluster, each receives one message from each source member
353    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
354    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
355    /// // - ...
356    /// # let mut results = Vec::new();
357    /// # for w in 0..16 {
358    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
359    /// # }
360    /// # results.sort();
361    /// # assert_eq!(results, vec![
362    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
363    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
364    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
365    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
366    /// # ]);
367    /// # }));
368    /// # }
369    /// ```
370    pub fn demux<N: NetworkFor<T>>(
371        self,
372        to: &Cluster<'a, L2>,
373        via: N,
374    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
375    where
376        T: Serialize + DeserializeOwned,
377    {
378        let serialize_pipeline = Some(N::serialize_thunk(true));
379
380        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
381
382        let name = via.name();
383        if to.multiversioned() && name.is_none() {
384            panic!(
385                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
386            );
387        }
388
389        let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
390            to.clone(),
391            HydroNode::Network {
392                name: name.map(ToOwned::to_owned),
393                serialize_fn: serialize_pipeline.map(|e| e.into()),
394                instantiate_fn: DebugInstantiate::Building,
395                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
396                input: Box::new(self.ir_node.into_inner()),
397                metadata: to.new_node_metadata(Stream::<
398                    (MemberId<L>, T),
399                    Cluster<'a, L2>,
400                    Unbounded,
401                    O,
402                    R,
403                >::collection_kind()),
404            },
405        );
406
407        raw_stream.into_keyed()
408    }
409}
410
411impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
412    KeyedStream<K, V, Cluster<'a, L>, B, O, R>
413{
414    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
415    #[deprecated = "use KeyedStream::send(..., TCP.bincode()) instead"]
416    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
417    /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
418    /// has a compound key where the first element is the sender's [`MemberId`] and the second
419    /// element is the original key.
420    ///
421    /// # Example
422    /// ```rust
423    /// # #[cfg(feature = "deploy")] {
424    /// # use hydro_lang::prelude::*;
425    /// # use futures::StreamExt;
426    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
427    /// # type Source = ();
428    /// # type Destination = ();
429    /// let source: Cluster<Source> = flow.cluster::<Source>();
430    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
431    ///     .source_iter(q!(vec![0, 1, 2, 3]))
432    ///     .map(q!(|x| (x, x + 123)))
433    ///     .into_keyed();
434    /// let destination_process = flow.process::<Destination>();
435    /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
436    /// # all_received.entries().send_bincode(&p2)
437    /// # }, |mut stream| async move {
438    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
439    /// // {
440    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
441    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
442    /// //     ...
443    /// // }
444    /// # let mut results = Vec::new();
445    /// # for w in 0..16 {
446    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
447    /// # }
448    /// # results.sort();
449    /// # assert_eq!(results, vec![
450    /// #   "((MemberId::<()>(0), 0), 123)",
451    /// #   "((MemberId::<()>(0), 1), 124)",
452    /// #   "((MemberId::<()>(0), 2), 125)",
453    /// #   "((MemberId::<()>(0), 3), 126)",
454    /// #   "((MemberId::<()>(1), 0), 123)",
455    /// #   "((MemberId::<()>(1), 1), 124)",
456    /// #   "((MemberId::<()>(1), 2), 125)",
457    /// #   "((MemberId::<()>(1), 3), 126)",
458    /// #   "((MemberId::<()>(2), 0), 123)",
459    /// #   "((MemberId::<()>(2), 1), 124)",
460    /// #   "((MemberId::<()>(2), 2), 125)",
461    /// #   "((MemberId::<()>(2), 3), 126)",
462    /// #   "((MemberId::<()>(3), 0), 123)",
463    /// #   "((MemberId::<()>(3), 1), 124)",
464    /// #   "((MemberId::<()>(3), 2), 125)",
465    /// #   "((MemberId::<()>(3), 3), 126)",
466    /// # ]);
467    /// # }));
468    /// # }
469    /// ```
470    pub fn send_bincode<L2>(
471        self,
472        other: &Process<'a, L2>,
473    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
474    where
475        K: Serialize + DeserializeOwned,
476        V: Serialize + DeserializeOwned,
477    {
478        self.send(other, TCP.bincode())
479    }
480
481    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
482    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
483    /// network, using the configuration in `via` to set up the message transport. The resulting
484    /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
485    /// the second element is the original key.
486    ///
487    /// # Example
488    /// ```rust
489    /// # #[cfg(feature = "deploy")] {
490    /// # use hydro_lang::prelude::*;
491    /// # use futures::StreamExt;
492    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
493    /// # type Source = ();
494    /// # type Destination = ();
495    /// let source: Cluster<Source> = flow.cluster::<Source>();
496    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
497    ///     .source_iter(q!(vec![0, 1, 2, 3]))
498    ///     .map(q!(|x| (x, x + 123)))
499    ///     .into_keyed();
500    /// let destination_process = flow.process::<Destination>();
501    /// let all_received = to_send.send(&destination_process, TCP.bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
502    /// # all_received.entries().send(&p2, TCP.bincode())
503    /// # }, |mut stream| async move {
504    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
505    /// // {
506    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
507    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
508    /// //     ...
509    /// // }
510    /// # let mut results = Vec::new();
511    /// # for w in 0..16 {
512    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
513    /// # }
514    /// # results.sort();
515    /// # assert_eq!(results, vec![
516    /// #   "((MemberId::<()>(0), 0), 123)",
517    /// #   "((MemberId::<()>(0), 1), 124)",
518    /// #   "((MemberId::<()>(0), 2), 125)",
519    /// #   "((MemberId::<()>(0), 3), 126)",
520    /// #   "((MemberId::<()>(1), 0), 123)",
521    /// #   "((MemberId::<()>(1), 1), 124)",
522    /// #   "((MemberId::<()>(1), 2), 125)",
523    /// #   "((MemberId::<()>(1), 3), 126)",
524    /// #   "((MemberId::<()>(2), 0), 123)",
525    /// #   "((MemberId::<()>(2), 1), 124)",
526    /// #   "((MemberId::<()>(2), 2), 125)",
527    /// #   "((MemberId::<()>(2), 3), 126)",
528    /// #   "((MemberId::<()>(3), 0), 123)",
529    /// #   "((MemberId::<()>(3), 1), 124)",
530    /// #   "((MemberId::<()>(3), 2), 125)",
531    /// #   "((MemberId::<()>(3), 3), 126)",
532    /// # ]);
533    /// # }));
534    /// # }
535    /// ```
536    pub fn send<L2, N: NetworkFor<(K, V)>>(
537        self,
538        to: &Process<'a, L2>,
539        via: N,
540    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
541    where
542        K: Serialize + DeserializeOwned,
543        V: Serialize + DeserializeOwned,
544    {
545        let serialize_pipeline = Some(N::serialize_thunk(false));
546
547        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
548
549        let name = via.name();
550        if to.multiversioned() && name.is_none() {
551            panic!(
552                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
553            );
554        }
555
556        let raw_stream: Stream<(MemberId<L>, (K, V)), Process<'a, L2>, Unbounded, O, R> =
557            Stream::new(
558                to.clone(),
559                HydroNode::Network {
560                    name: name.map(ToOwned::to_owned),
561                    serialize_fn: serialize_pipeline.map(|e| e.into()),
562                    instantiate_fn: DebugInstantiate::Building,
563                    deserialize_fn: deserialize_pipeline.map(|e| e.into()),
564                    input: Box::new(self.ir_node.into_inner()),
565                    metadata: to.new_node_metadata(Stream::<
566                        (MemberId<L>, (K, V)),
567                        Cluster<'a, L2>,
568                        Unbounded,
569                        O,
570                        R,
571                    >::collection_kind()),
572                },
573            );
574
575        raw_stream
576            .map(q!(|(sender, (k, v))| ((sender, k), v)))
577            .into_keyed()
578    }
579}