Skip to main content

hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{MinOrder, Ordering, Retries, Stream};
11#[cfg(stageleft_runtime)]
12use crate::location::dynamic::DynLocation;
13use crate::location::{Cluster, MemberId, Process};
14use crate::networking::{NetworkFor, TCP};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
20    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
21    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
22    ///
23    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
24    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
25    /// API allows precise targeting of specific cluster members rather than broadcasting to
26    /// all members.
27    ///
28    /// # Example
29    /// ```rust
30    /// # #[cfg(feature = "deploy")] {
31    /// # use hydro_lang::prelude::*;
32    /// # use futures::StreamExt;
33    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
34    /// let p1 = flow.process::<()>();
35    /// let workers: Cluster<()> = flow.cluster::<()>();
36    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
37    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
38    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
39    ///     .into_keyed()
40    ///     .demux_bincode(&workers);
41    /// # on_worker.send_bincode(&p2).entries()
42    /// // if there are 4 members in the cluster, each receives one element
43    /// // - MemberId::<()>(0): [0]
44    /// // - MemberId::<()>(1): [1]
45    /// // - MemberId::<()>(2): [2]
46    /// // - MemberId::<()>(3): [3]
47    /// # }, |mut stream| async move {
48    /// # let mut results = Vec::new();
49    /// # for w in 0..4 {
50    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
51    /// # }
52    /// # results.sort();
53    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
54    /// # }));
55    /// # }
56    /// ```
57    pub fn demux_bincode(
58        self,
59        other: &Cluster<'a, L2>,
60    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
61    where
62        T: Serialize + DeserializeOwned,
63    {
64        self.demux(other, TCP.fail_stop().bincode())
65    }
66
67    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
68    /// identifying the recipient for each group and using the configuration in `via` to set up the
69    /// message transport.
70    ///
71    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
72    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
73    /// API allows precise targeting of specific cluster members rather than broadcasting to
74    /// all members.
75    ///
76    /// # Example
77    /// ```rust
78    /// # #[cfg(feature = "deploy")] {
79    /// # use hydro_lang::prelude::*;
80    /// # use futures::StreamExt;
81    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
82    /// let p1 = flow.process::<()>();
83    /// let workers: Cluster<()> = flow.cluster::<()>();
84    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
85    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
86    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
87    ///     .into_keyed()
88    ///     .demux(&workers, TCP.fail_stop().bincode());
89    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
90    /// // if there are 4 members in the cluster, each receives one element
91    /// // - MemberId::<()>(0): [0]
92    /// // - MemberId::<()>(1): [1]
93    /// // - MemberId::<()>(2): [2]
94    /// // - MemberId::<()>(3): [3]
95    /// # }, |mut stream| async move {
96    /// # let mut results = Vec::new();
97    /// # for w in 0..4 {
98    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
99    /// # }
100    /// # results.sort();
101    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
102    /// # }));
103    /// # }
104    /// ```
105    pub fn demux<N: NetworkFor<T>>(
106        self,
107        to: &Cluster<'a, L2>,
108        via: N,
109    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
110    where
111        T: Serialize + DeserializeOwned,
112        O: MinOrder<N::OrderingGuarantee>,
113    {
114        let serialize_pipeline = Some(N::serialize_thunk(true));
115
116        let deserialize_pipeline = Some(N::deserialize_thunk(None));
117
118        let name = via.name();
119        if to.multiversioned() && name.is_none() {
120            panic!(
121                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
122            );
123        }
124
125        Stream::new(
126            to.clone(),
127            HydroNode::Network {
128                name: name.map(ToOwned::to_owned),
129                networking_info: N::networking_info(),
130                serialize_fn: serialize_pipeline.map(|e| e.into()),
131                instantiate_fn: DebugInstantiate::Building,
132                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
133                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
134                metadata: to.new_node_metadata(Stream::<
135                    T,
136                    Cluster<'a, L2>,
137                    Unbounded,
138                    <O as MinOrder<N::OrderingGuarantee>>::Min,
139                    R,
140                >::collection_kind()),
141            },
142        )
143    }
144}
145
146impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
147    KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
148{
149    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
150    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
151    /// compound key where the first element is the recipient's [`MemberId`] and the second element
152    /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
153    /// messages.
154    ///
155    /// # Example
156    /// ```rust
157    /// # #[cfg(feature = "deploy")] {
158    /// # use hydro_lang::prelude::*;
159    /// # use futures::StreamExt;
160    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
161    /// let p1 = flow.process::<()>();
162    /// let workers: Cluster<()> = flow.cluster::<()>();
163    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
164    ///     .source_iter(q!(vec![0, 1, 2, 3]))
165    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
166    ///     .into_keyed();
167    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
168    /// # on_worker.entries().send_bincode(&p2).entries()
169    /// // if there are 4 members in the cluster, each receives one element
170    /// // - MemberId::<()>(0): { 0: [123] }
171    /// // - MemberId::<()>(1): { 1: [124] }
172    /// // - ...
173    /// # }, |mut stream| async move {
174    /// # let mut results = Vec::new();
175    /// # for w in 0..4 {
176    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
177    /// # }
178    /// # results.sort();
179    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
180    /// # }));
181    /// # }
182    /// ```
183    pub fn demux_bincode(
184        self,
185        other: &Cluster<'a, L2>,
186    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
187    where
188        K: Serialize + DeserializeOwned,
189        T: Serialize + DeserializeOwned,
190    {
191        self.demux(other, TCP.fail_stop().bincode())
192    }
193
194    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
195    /// compound key where the first element is the recipient's [`MemberId`] and the second element
196    /// is a key that will be sent along with the value, using the configuration in `via` to set up
197    /// the message transport.
198    ///
199    /// # Example
200    /// ```rust
201    /// # #[cfg(feature = "deploy")] {
202    /// # use hydro_lang::prelude::*;
203    /// # use futures::StreamExt;
204    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
205    /// let p1 = flow.process::<()>();
206    /// let workers: Cluster<()> = flow.cluster::<()>();
207    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
208    ///     .source_iter(q!(vec![0, 1, 2, 3]))
209    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
210    ///     .into_keyed();
211    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
212    /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
213    /// // if there are 4 members in the cluster, each receives one element
214    /// // - MemberId::<()>(0): { 0: [123] }
215    /// // - MemberId::<()>(1): { 1: [124] }
216    /// // - ...
217    /// # }, |mut stream| async move {
218    /// # let mut results = Vec::new();
219    /// # for w in 0..4 {
220    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
221    /// # }
222    /// # results.sort();
223    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
224    /// # }));
225    /// # }
226    /// ```
227    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
228    pub fn demux<N: NetworkFor<(K, T)>>(
229        self,
230        to: &Cluster<'a, L2>,
231        via: N,
232    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
233    where
234        K: Serialize + DeserializeOwned,
235        T: Serialize + DeserializeOwned,
236        O: MinOrder<N::OrderingGuarantee>,
237    {
238        let serialize_pipeline = Some(N::serialize_thunk(true));
239
240        let deserialize_pipeline = Some(N::deserialize_thunk(None));
241
242        let name = via.name();
243        if to.multiversioned() && name.is_none() {
244            panic!(
245                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
246            );
247        }
248
249        KeyedStream::new(
250            to.clone(),
251            HydroNode::Network {
252                name: name.map(ToOwned::to_owned),
253                networking_info: N::networking_info(),
254                serialize_fn: serialize_pipeline.map(|e| e.into()),
255                instantiate_fn: DebugInstantiate::Building,
256                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
257                input: Box::new(
258                    self.entries()
259                        .map(q!(|((id, k), v)| (id, (k, v))))
260                        .ir_node
261                        .replace(HydroNode::Placeholder),
262                ),
263                metadata: to.new_node_metadata(KeyedStream::<
264                    K,
265                    T,
266                    Cluster<'a, L2>,
267                    Unbounded,
268                    <O as MinOrder<N::OrderingGuarantee>>::Min,
269                    R,
270                >::collection_kind()),
271            },
272        )
273    }
274}
275
276impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
277    KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
278{
279    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
280    /// Sends each group of this stream at each source member to a specific member of a destination
281    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
282    /// [`bincode`] to serialize/deserialize messages.
283    ///
284    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
285    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
286    /// API allows precise targeting of specific cluster members rather than broadcasting to all
287    /// members.
288    ///
289    /// Each cluster member sends its local stream elements, and they are collected at each
290    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
291    ///
292    /// # Example
293    /// ```rust
294    /// # #[cfg(feature = "deploy")] {
295    /// # use hydro_lang::prelude::*;
296    /// # use futures::StreamExt;
297    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
298    /// # type Source = ();
299    /// # type Destination = ();
300    /// let source: Cluster<Source> = flow.cluster::<Source>();
301    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
302    ///     .source_iter(q!(vec![0, 1, 2, 3]))
303    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
304    ///     .into_keyed();
305    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
306    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
307    /// # all_received.entries().send_bincode(&p2).entries()
308    /// # }, |mut stream| async move {
309    /// // if there are 4 members in the destination cluster, each receives one message from each source member
310    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
311    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
312    /// // - ...
313    /// # let mut results = Vec::new();
314    /// # for w in 0..16 {
315    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
316    /// # }
317    /// # results.sort();
318    /// # assert_eq!(results, vec![
319    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
320    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
321    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
322    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
323    /// # ]);
324    /// # }));
325    /// # }
326    /// ```
327    pub fn demux_bincode(
328        self,
329        other: &Cluster<'a, L2>,
330    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
331    where
332        T: Serialize + DeserializeOwned,
333    {
334        self.demux(other, TCP.fail_stop().bincode())
335    }
336
337    /// Sends each group of this stream at each source member to a specific member of a destination
338    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
339    /// configuration in `via` to set up the message transport.
340    ///
341    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
342    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
343    /// API allows precise targeting of specific cluster members rather than broadcasting to all
344    /// members.
345    ///
346    /// Each cluster member sends its local stream elements, and they are collected at each
347    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
348    ///
349    /// # Example
350    /// ```rust
351    /// # #[cfg(feature = "deploy")] {
352    /// # use hydro_lang::prelude::*;
353    /// # use futures::StreamExt;
354    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
355    /// # type Source = ();
356    /// # type Destination = ();
357    /// let source: Cluster<Source> = flow.cluster::<Source>();
358    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
359    ///     .source_iter(q!(vec![0, 1, 2, 3]))
360    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
361    ///     .into_keyed();
362    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
363    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
364    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
365    /// # }, |mut stream| async move {
366    /// // if there are 4 members in the destination cluster, each receives one message from each source member
367    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
368    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
369    /// // - ...
370    /// # let mut results = Vec::new();
371    /// # for w in 0..16 {
372    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
373    /// # }
374    /// # results.sort();
375    /// # assert_eq!(results, vec![
376    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
377    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
378    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
379    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
380    /// # ]);
381    /// # }));
382    /// # }
383    /// ```
384    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
385    pub fn demux<N: NetworkFor<T>>(
386        self,
387        to: &Cluster<'a, L2>,
388        via: N,
389    ) -> KeyedStream<
390        MemberId<L>,
391        T,
392        Cluster<'a, L2>,
393        Unbounded,
394        <O as MinOrder<N::OrderingGuarantee>>::Min,
395        R,
396    >
397    where
398        T: Serialize + DeserializeOwned,
399        O: MinOrder<N::OrderingGuarantee>,
400    {
401        let serialize_pipeline = Some(N::serialize_thunk(true));
402
403        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
404
405        let name = via.name();
406        if to.multiversioned() && name.is_none() {
407            panic!(
408                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
409            );
410        }
411
412        let raw_stream: Stream<
413            (MemberId<L>, T),
414            Cluster<'a, L2>,
415            Unbounded,
416            <O as MinOrder<N::OrderingGuarantee>>::Min,
417            R,
418        > = Stream::new(
419            to.clone(),
420            HydroNode::Network {
421                name: name.map(ToOwned::to_owned),
422                networking_info: N::networking_info(),
423                serialize_fn: serialize_pipeline.map(|e| e.into()),
424                instantiate_fn: DebugInstantiate::Building,
425                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
426                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
427                metadata: to.new_node_metadata(Stream::<
428                    (MemberId<L>, T),
429                    Cluster<'a, L2>,
430                    Unbounded,
431                    <O as MinOrder<N::OrderingGuarantee>>::Min,
432                    R,
433                >::collection_kind()),
434            },
435        );
436
437        raw_stream.into_keyed()
438    }
439}
440
441impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
442    KeyedStream<K, V, Cluster<'a, L>, B, O, R>
443{
444    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
445    #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
446    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
447    /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
448    /// has a compound key where the first element is the sender's [`MemberId`] and the second
449    /// element is the original key.
450    ///
451    /// # Example
452    /// ```rust
453    /// # #[cfg(feature = "deploy")] {
454    /// # use hydro_lang::prelude::*;
455    /// # use futures::StreamExt;
456    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
457    /// # type Source = ();
458    /// # type Destination = ();
459    /// let source: Cluster<Source> = flow.cluster::<Source>();
460    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
461    ///     .source_iter(q!(vec![0, 1, 2, 3]))
462    ///     .map(q!(|x| (x, x + 123)))
463    ///     .into_keyed();
464    /// let destination_process = flow.process::<Destination>();
465    /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
466    /// # all_received.entries().send_bincode(&p2)
467    /// # }, |mut stream| async move {
468    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
469    /// // {
470    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
471    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
472    /// //     ...
473    /// // }
474    /// # let mut results = Vec::new();
475    /// # for w in 0..16 {
476    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
477    /// # }
478    /// # results.sort();
479    /// # assert_eq!(results, vec![
480    /// #   "((MemberId::<()>(0), 0), 123)",
481    /// #   "((MemberId::<()>(0), 1), 124)",
482    /// #   "((MemberId::<()>(0), 2), 125)",
483    /// #   "((MemberId::<()>(0), 3), 126)",
484    /// #   "((MemberId::<()>(1), 0), 123)",
485    /// #   "((MemberId::<()>(1), 1), 124)",
486    /// #   "((MemberId::<()>(1), 2), 125)",
487    /// #   "((MemberId::<()>(1), 3), 126)",
488    /// #   "((MemberId::<()>(2), 0), 123)",
489    /// #   "((MemberId::<()>(2), 1), 124)",
490    /// #   "((MemberId::<()>(2), 2), 125)",
491    /// #   "((MemberId::<()>(2), 3), 126)",
492    /// #   "((MemberId::<()>(3), 0), 123)",
493    /// #   "((MemberId::<()>(3), 1), 124)",
494    /// #   "((MemberId::<()>(3), 2), 125)",
495    /// #   "((MemberId::<()>(3), 3), 126)",
496    /// # ]);
497    /// # }));
498    /// # }
499    /// ```
500    pub fn send_bincode<L2>(
501        self,
502        other: &Process<'a, L2>,
503    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
504    where
505        K: Serialize + DeserializeOwned,
506        V: Serialize + DeserializeOwned,
507    {
508        self.send(other, TCP.fail_stop().bincode())
509    }
510
511    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
512    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
513    /// network, using the configuration in `via` to set up the message transport. The resulting
514    /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
515    /// the second element is the original key.
516    ///
517    /// # Example
518    /// ```rust
519    /// # #[cfg(feature = "deploy")] {
520    /// # use hydro_lang::prelude::*;
521    /// # use futures::StreamExt;
522    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
523    /// # type Source = ();
524    /// # type Destination = ();
525    /// let source: Cluster<Source> = flow.cluster::<Source>();
526    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
527    ///     .source_iter(q!(vec![0, 1, 2, 3]))
528    ///     .map(q!(|x| (x, x + 123)))
529    ///     .into_keyed();
530    /// let destination_process = flow.process::<Destination>();
531    /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
532    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
533    /// # }, |mut stream| async move {
534    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
535    /// // {
536    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
537    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
538    /// //     ...
539    /// // }
540    /// # let mut results = Vec::new();
541    /// # for w in 0..16 {
542    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
543    /// # }
544    /// # results.sort();
545    /// # assert_eq!(results, vec![
546    /// #   "((MemberId::<()>(0), 0), 123)",
547    /// #   "((MemberId::<()>(0), 1), 124)",
548    /// #   "((MemberId::<()>(0), 2), 125)",
549    /// #   "((MemberId::<()>(0), 3), 126)",
550    /// #   "((MemberId::<()>(1), 0), 123)",
551    /// #   "((MemberId::<()>(1), 1), 124)",
552    /// #   "((MemberId::<()>(1), 2), 125)",
553    /// #   "((MemberId::<()>(1), 3), 126)",
554    /// #   "((MemberId::<()>(2), 0), 123)",
555    /// #   "((MemberId::<()>(2), 1), 124)",
556    /// #   "((MemberId::<()>(2), 2), 125)",
557    /// #   "((MemberId::<()>(2), 3), 126)",
558    /// #   "((MemberId::<()>(3), 0), 123)",
559    /// #   "((MemberId::<()>(3), 1), 124)",
560    /// #   "((MemberId::<()>(3), 2), 125)",
561    /// #   "((MemberId::<()>(3), 3), 126)",
562    /// # ]);
563    /// # }));
564    /// # }
565    /// ```
566    pub fn send<L2, N: NetworkFor<(K, V)>>(
567        self,
568        to: &Process<'a, L2>,
569        via: N,
570    ) -> KeyedStream<
571        (MemberId<L>, K),
572        V,
573        Process<'a, L2>,
574        Unbounded,
575        <O as MinOrder<N::OrderingGuarantee>>::Min,
576        R,
577    >
578    where
579        K: Serialize + DeserializeOwned,
580        V: Serialize + DeserializeOwned,
581        O: MinOrder<N::OrderingGuarantee>,
582    {
583        let serialize_pipeline = Some(N::serialize_thunk(false));
584
585        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
586
587        let name = via.name();
588        if to.multiversioned() && name.is_none() {
589            panic!(
590                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
591            );
592        }
593
594        let raw_stream: Stream<
595            (MemberId<L>, (K, V)),
596            Process<'a, L2>,
597            Unbounded,
598            <O as MinOrder<N::OrderingGuarantee>>::Min,
599            R,
600        > = Stream::new(
601            to.clone(),
602            HydroNode::Network {
603                name: name.map(ToOwned::to_owned),
604                networking_info: N::networking_info(),
605                serialize_fn: serialize_pipeline.map(|e| e.into()),
606                instantiate_fn: DebugInstantiate::Building,
607                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
608                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
609                metadata: to.new_node_metadata(Stream::<
610                    (MemberId<L>, (K, V)),
611                    Cluster<'a, L2>,
612                    Unbounded,
613                    <O as MinOrder<N::OrderingGuarantee>>::Min,
614                    R,
615                >::collection_kind()),
616            },
617        );
618
619        raw_stream
620            .map(q!(|(sender, (k, v))| ((sender, k), v)))
621            .into_keyed()
622    }
623}