Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
34use crate::forward_handle::ForwardRef;
35#[cfg(stageleft_runtime)]
36use crate::forward_handle::{CycleCollection, ForwardHandle};
37use crate::live_collections::boundedness::{Bounded, Unbounded};
38use crate::live_collections::keyed_stream::KeyedStream;
39use crate::live_collections::singleton::Singleton;
40use crate::live_collections::stream::{
41    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
42};
43use crate::location::dynamic::LocationId;
44use crate::location::external_process::{
45    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
46};
47use crate::nondet::NonDet;
48#[cfg(feature = "sim")]
49use crate::sim::SimSender;
50use crate::staging_util::get_this_crate;
51
52pub mod dynamic;
53
54#[expect(missing_docs, reason = "TODO")]
55pub mod external_process;
56pub use external_process::External;
57
58#[expect(missing_docs, reason = "TODO")]
59pub mod process;
60pub use process::Process;
61
62#[expect(missing_docs, reason = "TODO")]
63pub mod cluster;
64pub use cluster::Cluster;
65
66#[expect(missing_docs, reason = "TODO")]
67pub mod member_id;
68pub use member_id::{MemberId, TaglessMemberId};
69
70#[expect(missing_docs, reason = "TODO")]
71pub mod tick;
72pub use tick::{Atomic, NoTick, Tick};
73
74#[expect(missing_docs, reason = "TODO")]
75#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
76pub enum MembershipEvent {
77    Joined,
78    Left,
79}
80
/// A hint about how the network endpoint for an external connection should be set up.
///
/// It is supplied as the `port_hint` argument of [`Location::bind_single_client`] and
/// [`Location::bidi_external_many_bytes`], which store it on the external-input node
/// they create.
// NOTE(review): the per-variant descriptions below are inferred from the variant names
// and payload types; confirm against the deployment backends that consume the hint.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No particular transport or port is requested.
    Auto,
    /// A TCP port is requested: `Some(port)` names a specific port number, while `None`
    /// leaves the port number unspecified.
    TcpPort(Option<u16>),
}
87
88pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
89    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
90}
91
92#[stageleft::export(LocationKey)]
93new_key_type! {
94    /// A unique identifier for a clock tick.
95    pub struct LocationKey;
96}
97
98impl std::fmt::Display for LocationKey {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
101    }
102}
103
104/// This is used for the ECS membership stream.
105/// TODO(mingwei): Make this more robust?
106impl std::str::FromStr for LocationKey {
107    type Err = Option<ParseIntError>;
108
109    fn from_str(s: &str) -> Result<Self, Self::Err> {
110        let nvn = s.strip_prefix("loc").ok_or(None)?;
111        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
112        let idx: u64 = idx.parse()?;
113        let ver: u64 = ver.parse()?;
114        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
115    }
116}
117
118impl LocationKey {
119    /// TODO(minwgei): Remove this and avoid magic key for simulator external.
120    /// The first location key, used by the simulator as the default external location.
121    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`
122
123    /// A key for testing with index 1.
124    #[cfg(test)]
125    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); // `1v255`
126
127    /// A key for testing with index 2.
128    #[cfg(test)]
129    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); // `2v255`
130}
131
132/// This is used within `q!` code in docker and ECS.
133impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
134    type O = LocationKey;
135
136    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
137    where
138        Self: Sized,
139    {
140        let root = get_this_crate();
141        let n = Key::data(&self).as_ffi();
142        (
143            QuoteTokens {
144                prelude: None,
145                expr: Some(quote! {
146                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
147                }),
148            },
149            (),
150        )
151    }
152}
153
154/// A simple enum for the type of a root location.
155#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
156pub enum LocationType {
157    /// A process (single node).
158    Process,
159    /// A cluster (multiple nodes).
160    Cluster,
161    /// An external client.
162    External,
163}
164
165/// A location where data can be materialized and computation can be executed.
166///
167/// Hydro is a **global**, **distributed** programming model. This means that the data
168/// and computation in a Hydro program can be spread across multiple machines, data
169/// centers, and even continents. To achieve this, Hydro uses the concept of
170/// **locations** to keep track of _where_ data is located and computation is executed.
171///
172/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
173/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
174/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
175/// to allow live collections to be _moved_ between locations via network send/receive.
176///
177/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
178#[expect(
179    private_bounds,
180    reason = "only internal Hydro code can define location types"
181)]
182pub trait Location<'a>: dynamic::DynLocation {
183    /// The root location type for this location.
184    ///
185    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
186    /// For nested locations like [`Tick`], this is the root location that contains it.
187    type Root: Location<'a>;
188
189    /// Returns the root location for this location.
190    ///
191    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
192    /// For nested locations like [`Tick`], this returns the root location that contains it.
193    fn root(&self) -> Self::Root;
194
195    /// Attempts to create a new [`Tick`] clock domain at this location.
196    ///
197    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
198    /// or `None` if this location is already inside a tick (nested ticks are not supported).
199    ///
200    /// Prefer using [`Location::tick`] when you know the location is top-level.
201    fn try_tick(&self) -> Option<Tick<Self>> {
202        if Self::is_top_level() {
203            let next_id = self.flow_state().borrow_mut().next_clock_id;
204            self.flow_state().borrow_mut().next_clock_id += 1;
205            Some(Tick {
206                id: next_id,
207                l: self.clone(),
208            })
209        } else {
210            None
211        }
212    }
213
214    /// Returns the unique identifier for this location.
215    fn id(&self) -> LocationId {
216        dynamic::DynLocation::id(self)
217    }
218
219    /// Creates a new [`Tick`] clock domain at this location.
220    ///
221    /// A tick represents a logical clock that can be used to batch streaming data
222    /// into discrete time steps. This is useful for implementing iterative algorithms
223    /// or for synchronizing data across multiple streams.
224    ///
225    /// # Example
226    /// ```rust
227    /// # #[cfg(feature = "deploy")] {
228    /// # use hydro_lang::prelude::*;
229    /// # use futures::StreamExt;
230    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
231    /// let tick = process.tick();
232    /// let inside_tick = process
233    ///     .source_iter(q!(vec![1, 2, 3, 4]))
234    ///     .batch(&tick, nondet!(/** test */));
235    /// inside_tick.all_ticks()
236    /// # }, |mut stream| async move {
237    /// // 1, 2, 3, 4
238    /// # for w in vec![1, 2, 3, 4] {
239    /// #     assert_eq!(stream.next().await.unwrap(), w);
240    /// # }
241    /// # }));
242    /// # }
243    /// ```
244    fn tick(&self) -> Tick<Self>
245    where
246        Self: NoTick,
247    {
248        let next_id = self.flow_state().borrow_mut().next_clock_id;
249        self.flow_state().borrow_mut().next_clock_id += 1;
250        Tick {
251            id: next_id,
252            l: self.clone(),
253        }
254    }
255
256    /// Creates an unbounded stream that continuously emits unit values `()`.
257    ///
258    /// This is useful for driving computations that need to run continuously,
259    /// such as polling or heartbeat mechanisms.
260    ///
261    /// # Example
262    /// ```rust
263    /// # #[cfg(feature = "deploy")] {
264    /// # use hydro_lang::prelude::*;
265    /// # use futures::StreamExt;
266    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
267    /// let tick = process.tick();
268    /// process.spin()
269    ///     .batch(&tick, nondet!(/** test */))
270    ///     .map(q!(|_| 42))
271    ///     .all_ticks()
272    /// # }, |mut stream| async move {
273    /// // 42, 42, 42, ...
274    /// # assert_eq!(stream.next().await.unwrap(), 42);
275    /// # assert_eq!(stream.next().await.unwrap(), 42);
276    /// # assert_eq!(stream.next().await.unwrap(), 42);
277    /// # }));
278    /// # }
279    /// ```
280    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
281    where
282        Self: Sized + NoTick,
283    {
284        Stream::new(
285            self.clone(),
286            HydroNode::Source {
287                source: HydroSource::Spin(),
288                metadata: self.new_node_metadata(Stream::<
289                    (),
290                    Self,
291                    Unbounded,
292                    TotalOrder,
293                    ExactlyOnce,
294                >::collection_kind()),
295            },
296        )
297    }
298
299    /// Creates a stream from an async [`FuturesStream`].
300    ///
301    /// This is useful for integrating with external async data sources,
302    /// such as network connections or file readers.
303    ///
304    /// # Example
305    /// ```rust
306    /// # #[cfg(feature = "deploy")] {
307    /// # use hydro_lang::prelude::*;
308    /// # use futures::StreamExt;
309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
311    /// # }, |mut stream| async move {
312    /// // 1, 2, 3
313    /// # for w in vec![1, 2, 3] {
314    /// #     assert_eq!(stream.next().await.unwrap(), w);
315    /// # }
316    /// # }));
317    /// # }
318    /// ```
319    fn source_stream<T, E>(
320        &self,
321        e: impl QuotedWithContext<'a, E, Self>,
322    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
323    where
324        E: FuturesStream<Item = T> + Unpin,
325        Self: Sized + NoTick,
326    {
327        let e = e.splice_untyped_ctx(self);
328
329        Stream::new(
330            self.clone(),
331            HydroNode::Source {
332                source: HydroSource::Stream(e.into()),
333                metadata: self.new_node_metadata(Stream::<
334                    T,
335                    Self,
336                    Unbounded,
337                    TotalOrder,
338                    ExactlyOnce,
339                >::collection_kind()),
340            },
341        )
342    }
343
344    /// Creates a bounded stream from an iterator.
345    ///
346    /// The iterator is evaluated once at runtime, and all elements are emitted
347    /// in order. This is useful for creating streams from static data or
348    /// for testing.
349    ///
350    /// # Example
351    /// ```rust
352    /// # #[cfg(feature = "deploy")] {
353    /// # use hydro_lang::prelude::*;
354    /// # use futures::StreamExt;
355    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
356    /// process.source_iter(q!(vec![1, 2, 3, 4]))
357    /// # }, |mut stream| async move {
358    /// // 1, 2, 3, 4
359    /// # for w in vec![1, 2, 3, 4] {
360    /// #     assert_eq!(stream.next().await.unwrap(), w);
361    /// # }
362    /// # }));
363    /// # }
364    /// ```
365    fn source_iter<T, E>(
366        &self,
367        e: impl QuotedWithContext<'a, E, Self>,
368    ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
369    where
370        E: IntoIterator<Item = T>,
371        Self: Sized + NoTick,
372    {
373        let e = e.splice_typed_ctx(self);
374
375        Stream::new(
376            self.clone(),
377            HydroNode::Source {
378                source: HydroSource::Iter(e.into()),
379                metadata: self.new_node_metadata(
380                    Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
381                ),
382            },
383        )
384    }
385
386    /// Creates a stream of membership events for a cluster.
387    ///
388    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
389    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
390    /// keyed by the [`MemberId`] of the cluster member.
391    ///
392    /// This is useful for implementing protocols that need to track cluster membership,
393    /// such as broadcasting to all members or detecting failures.
394    ///
395    /// # Example
396    /// ```rust
397    /// # #[cfg(feature = "deploy")] {
398    /// # use hydro_lang::prelude::*;
399    /// # use futures::StreamExt;
400    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
401    /// let p1 = flow.process::<()>();
402    /// let workers: Cluster<()> = flow.cluster::<()>();
403    /// # // do nothing on each worker
404    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
405    /// let cluster_members = p1.source_cluster_members(&workers);
406    /// # cluster_members.entries().send(&p2, TCP.bincode())
407    /// // if there are 4 members in the cluster, we would see a join event for each
408    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
409    /// # }, |mut stream| async move {
410    /// # let mut results = Vec::new();
411    /// # for w in 0..4 {
412    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
413    /// # }
414    /// # results.sort();
415    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
416    /// # }));
417    /// # }
418    /// ```
419    fn source_cluster_members<C: 'a>(
420        &self,
421        cluster: &Cluster<'a, C>,
422    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
423    where
424        Self: Sized + NoTick,
425    {
426        Stream::new(
427            self.clone(),
428            HydroNode::Source {
429                source: HydroSource::ClusterMembers(cluster.id()),
430                metadata: self.new_node_metadata(Stream::<
431                    (TaglessMemberId, MembershipEvent),
432                    Self,
433                    Unbounded,
434                    TotalOrder,
435                    ExactlyOnce,
436                >::collection_kind()),
437            },
438        )
439        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
440        .into_keyed()
441    }
442
443    /// Creates a one-way connection from an external process to receive raw bytes.
444    ///
445    /// Returns a port handle for the external process to connect to, and a stream
446    /// of received byte buffers.
447    ///
448    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
449    /// or [`Location::source_external_bincode`].
450    fn source_external_bytes<L>(
451        &self,
452        from: &External<L>,
453    ) -> (
454        ExternalBytesPort,
455        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
456    )
457    where
458        Self: Sized + NoTick,
459    {
460        let (port, stream, sink) =
461            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
462
463        sink.complete(self.source_iter(q!([])));
464
465        (port, stream)
466    }
467
468    /// Creates a one-way connection from an external process to receive bincode-serialized data.
469    ///
470    /// Returns a sink handle for the external process to send data to, and a stream
471    /// of received values.
472    ///
473    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
474    #[expect(clippy::type_complexity, reason = "stream markers")]
475    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
476        &self,
477        from: &External<L>,
478    ) -> (
479        ExternalBincodeSink<T, NotMany, O, R>,
480        Stream<T, Self, Unbounded, O, R>,
481    )
482    where
483        Self: Sized + NoTick,
484        T: Serialize + DeserializeOwned,
485    {
486        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
487        sink.complete(self.source_iter(q!([])));
488
489        (
490            ExternalBincodeSink {
491                process_key: from.key,
492                port_id: port.port_id,
493                _phantom: PhantomData,
494            },
495            stream.weaken_ordering().weaken_retries(),
496        )
497    }
498
499    /// Sets up a simulated input port on this location for testing.
500    ///
501    /// Returns a handle to send messages to the location as well as a stream
502    /// of received messages. This is only available when the `sim` feature is enabled.
503    #[cfg(feature = "sim")]
504    #[expect(clippy::type_complexity, reason = "stream markers")]
505    fn sim_input<T, O: Ordering, R: Retries>(
506        &self,
507    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
508    where
509        Self: Sized + NoTick,
510        T: Serialize + DeserializeOwned,
511    {
512        let external_location: External<'a, ()> = External {
513            key: LocationKey::FIRST,
514            flow_state: self.flow_state().clone(),
515            _phantom: PhantomData,
516        };
517
518        let (external, stream) = self.source_external_bincode(&external_location);
519
520        (SimSender(external.port_id, PhantomData), stream)
521    }
522
523    /// Establishes a server on this location to receive a bidirectional connection from a single
524    /// client, identified by the given `External` handle. Returns a port handle for the external
525    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
526    /// messages.
527    ///
528    /// # Example
529    /// ```rust
530    /// # #[cfg(feature = "deploy")] {
531    /// # use hydro_lang::prelude::*;
532    /// # use hydro_deploy::Deployment;
533    /// # use futures::{SinkExt, StreamExt};
534    /// # tokio_test::block_on(async {
535    /// # use bytes::Bytes;
536    /// # use hydro_lang::location::NetworkHint;
537    /// # use tokio_util::codec::LengthDelimitedCodec;
538    /// # let mut flow = FlowBuilder::new();
539    /// let node = flow.process::<()>();
540    /// let external = flow.external::<()>();
541    /// let (port, incoming, outgoing) =
542    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
543    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
544    ///     let mut resp: Vec<u8> = data.into();
545    ///     resp.push(42);
546    ///     resp.into() // : Bytes
547    /// })));
548    ///
549    /// # let mut deployment = Deployment::new();
550    /// let nodes = flow // ... with_process and with_external
551    /// #     .with_process(&node, deployment.Localhost())
552    /// #     .with_external(&external, deployment.Localhost())
553    /// #     .deploy(&mut deployment);
554    ///
555    /// deployment.deploy().await.unwrap();
556    /// deployment.start().await.unwrap();
557    ///
558    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
559    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
560    /// assert_eq!(
561    ///     external_out.next().await.unwrap().unwrap(),
562    ///     vec![1, 2, 3, 42]
563    /// );
564    /// # });
565    /// # }
566    /// ```
567    #[expect(clippy::type_complexity, reason = "stream markers")]
568    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
569        &self,
570        from: &External<L>,
571        port_hint: NetworkHint,
572    ) -> (
573        ExternalBytesPort<NotMany>,
574        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
575        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
576    )
577    where
578        Self: Sized + NoTick,
579    {
580        let next_external_port_id = from
581            .flow_state
582            .borrow_mut()
583            .next_external_port
584            .get_and_increment();
585
586        let (fwd_ref, to_sink) =
587            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
588        let mut flow_state_borrow = self.flow_state().borrow_mut();
589
590        flow_state_borrow.push_root(HydroRoot::SendExternal {
591            to_external_key: from.key,
592            to_port_id: next_external_port_id,
593            to_many: false,
594            unpaired: false,
595            serialize_fn: None,
596            instantiate_fn: DebugInstantiate::Building,
597            input: Box::new(to_sink.ir_node.into_inner()),
598            op_metadata: HydroIrOpMetadata::new(),
599        });
600
601        let raw_stream: Stream<
602            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
603            Self,
604            Unbounded,
605            TotalOrder,
606            ExactlyOnce,
607        > = Stream::new(
608            self.clone(),
609            HydroNode::ExternalInput {
610                from_external_key: from.key,
611                from_port_id: next_external_port_id,
612                from_many: false,
613                codec_type: quote_type::<Codec>().into(),
614                port_hint,
615                instantiate_fn: DebugInstantiate::Building,
616                deserialize_fn: None,
617                metadata: self.new_node_metadata(Stream::<
618                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
619                    Self,
620                    Unbounded,
621                    TotalOrder,
622                    ExactlyOnce,
623                >::collection_kind()),
624            },
625        );
626
627        (
628            ExternalBytesPort {
629                process_key: from.key,
630                port_id: next_external_port_id,
631                _phantom: PhantomData,
632            },
633            raw_stream.flatten_ordered(),
634            fwd_ref,
635        )
636    }
637
638    /// Establishes a bidirectional connection from a single external client using bincode serialization.
639    ///
640    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
641    /// and a handle to send outgoing messages. This is a convenience wrapper around
642    /// [`Location::bind_single_client`] that uses bincode for serialization.
643    ///
644    /// # Type Parameters
645    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
646    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
647    #[expect(clippy::type_complexity, reason = "stream markers")]
648    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
649        &self,
650        from: &External<L>,
651    ) -> (
652        ExternalBincodeBidi<InT, OutT, NotMany>,
653        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
654        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
655    )
656    where
657        Self: Sized + NoTick,
658    {
659        let next_external_port_id = from
660            .flow_state
661            .borrow_mut()
662            .next_external_port
663            .get_and_increment();
664
665        let (fwd_ref, to_sink) =
666            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
667        let mut flow_state_borrow = self.flow_state().borrow_mut();
668
669        let root = get_this_crate();
670
671        let out_t_type = quote_type::<OutT>();
672        let ser_fn: syn::Expr = syn::parse_quote! {
673            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
674                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
675            )
676        };
677
678        flow_state_borrow.push_root(HydroRoot::SendExternal {
679            to_external_key: from.key,
680            to_port_id: next_external_port_id,
681            to_many: false,
682            unpaired: false,
683            serialize_fn: Some(ser_fn.into()),
684            instantiate_fn: DebugInstantiate::Building,
685            input: Box::new(to_sink.ir_node.into_inner()),
686            op_metadata: HydroIrOpMetadata::new(),
687        });
688
689        let in_t_type = quote_type::<InT>();
690
691        let deser_fn: syn::Expr = syn::parse_quote! {
692            |res| {
693                let b = res.unwrap();
694                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
695            }
696        };
697
698        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
699            self.clone(),
700            HydroNode::ExternalInput {
701                from_external_key: from.key,
702                from_port_id: next_external_port_id,
703                from_many: false,
704                codec_type: quote_type::<LengthDelimitedCodec>().into(),
705                port_hint: NetworkHint::Auto,
706                instantiate_fn: DebugInstantiate::Building,
707                deserialize_fn: Some(deser_fn.into()),
708                metadata: self.new_node_metadata(Stream::<
709                    InT,
710                    Self,
711                    Unbounded,
712                    TotalOrder,
713                    ExactlyOnce,
714                >::collection_kind()),
715            },
716        );
717
718        (
719            ExternalBincodeBidi {
720                process_key: from.key,
721                port_id: next_external_port_id,
722                _phantom: PhantomData,
723            },
724            raw_stream,
725            fwd_ref,
726        )
727    }
728
729    /// Establishes a server on this location to receive bidirectional connections from multiple
730    /// external clients using raw bytes.
731    ///
732    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
733    /// connections. Each client is assigned a unique `u64` identifier.
734    ///
735    /// Returns:
736    /// - A port handle for external processes to connect to
737    /// - A keyed stream of incoming messages, keyed by client ID
738    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
739    /// - A handle to send outgoing messages, keyed by client ID
740    #[expect(clippy::type_complexity, reason = "stream markers")]
741    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
742        &self,
743        from: &External<L>,
744        port_hint: NetworkHint,
745    ) -> (
746        ExternalBytesPort<Many>,
747        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
748        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
749        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
750    )
751    where
752        Self: Sized + NoTick,
753    {
754        let next_external_port_id = from
755            .flow_state
756            .borrow_mut()
757            .next_external_port
758            .get_and_increment();
759
760        let (fwd_ref, to_sink) =
761            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
762        let mut flow_state_borrow = self.flow_state().borrow_mut();
763
764        flow_state_borrow.push_root(HydroRoot::SendExternal {
765            to_external_key: from.key,
766            to_port_id: next_external_port_id,
767            to_many: true,
768            unpaired: false,
769            serialize_fn: None,
770            instantiate_fn: DebugInstantiate::Building,
771            input: Box::new(to_sink.entries().ir_node.into_inner()),
772            op_metadata: HydroIrOpMetadata::new(),
773        });
774
775        let raw_stream: Stream<
776            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
777            Self,
778            Unbounded,
779            TotalOrder,
780            ExactlyOnce,
781        > = Stream::new(
782            self.clone(),
783            HydroNode::ExternalInput {
784                from_external_key: from.key,
785                from_port_id: next_external_port_id,
786                from_many: true,
787                codec_type: quote_type::<Codec>().into(),
788                port_hint,
789                instantiate_fn: DebugInstantiate::Building,
790                deserialize_fn: None,
791                metadata: self.new_node_metadata(Stream::<
792                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
793                    Self,
794                    Unbounded,
795                    TotalOrder,
796                    ExactlyOnce,
797                >::collection_kind()),
798            },
799        );
800
801        let membership_stream_ident = syn::Ident::new(
802            &format!(
803                "__hydro_deploy_many_{}_{}_membership",
804                from.key, next_external_port_id
805            ),
806            Span::call_site(),
807        );
808        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
809        let raw_membership_stream: KeyedStream<
810            u64,
811            bool,
812            Self,
813            Unbounded,
814            TotalOrder,
815            ExactlyOnce,
816        > = KeyedStream::new(
817            self.clone(),
818            HydroNode::Source {
819                source: HydroSource::Stream(membership_stream_expr.into()),
820                metadata: self.new_node_metadata(KeyedStream::<
821                    u64,
822                    bool,
823                    Self,
824                    Unbounded,
825                    TotalOrder,
826                    ExactlyOnce,
827                >::collection_kind()),
828            },
829        );
830
831        (
832            ExternalBytesPort {
833                process_key: from.key,
834                port_id: next_external_port_id,
835                _phantom: PhantomData,
836            },
837            raw_stream
838                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
839                .into_keyed(),
840            raw_membership_stream.map(q!(|join| {
841                if join {
842                    MembershipEvent::Joined
843                } else {
844                    MembershipEvent::Left
845                }
846            })),
847            fwd_ref,
848        )
849    }
850
851    /// Establishes a server on this location to receive bidirectional connections from multiple
852    /// external clients using bincode serialization.
853    ///
854    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
855    /// client connections. Each client is assigned a unique `u64` identifier.
856    ///
857    /// Returns:
858    /// - A port handle for external processes to connect to
859    /// - A keyed stream of incoming messages, keyed by client ID
860    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
861    /// - A handle to send outgoing messages, keyed by client ID
862    ///
863    /// # Type Parameters
864    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
865    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
866    #[expect(clippy::type_complexity, reason = "stream markers")]
867    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
868        &self,
869        from: &External<L>,
870    ) -> (
871        ExternalBincodeBidi<InT, OutT, Many>,
872        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
873        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
874        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
875    )
876    where
877        Self: Sized + NoTick,
878    {
879        let next_external_port_id = from
880            .flow_state
881            .borrow_mut()
882            .next_external_port
883            .get_and_increment();
884
885        let (fwd_ref, to_sink) =
886            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
887        let mut flow_state_borrow = self.flow_state().borrow_mut();
888
889        let root = get_this_crate();
890
891        let out_t_type = quote_type::<OutT>();
892        let ser_fn: syn::Expr = syn::parse_quote! {
893            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
894                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
895            )
896        };
897
898        flow_state_borrow.push_root(HydroRoot::SendExternal {
899            to_external_key: from.key,
900            to_port_id: next_external_port_id,
901            to_many: true,
902            unpaired: false,
903            serialize_fn: Some(ser_fn.into()),
904            instantiate_fn: DebugInstantiate::Building,
905            input: Box::new(to_sink.entries().ir_node.into_inner()),
906            op_metadata: HydroIrOpMetadata::new(),
907        });
908
909        let in_t_type = quote_type::<InT>();
910
911        let deser_fn: syn::Expr = syn::parse_quote! {
912            |res| {
913                let (id, b) = res.unwrap();
914                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
915            }
916        };
917
918        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
919            KeyedStream::new(
920                self.clone(),
921                HydroNode::ExternalInput {
922                    from_external_key: from.key,
923                    from_port_id: next_external_port_id,
924                    from_many: true,
925                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
926                    port_hint: NetworkHint::Auto,
927                    instantiate_fn: DebugInstantiate::Building,
928                    deserialize_fn: Some(deser_fn.into()),
929                    metadata: self.new_node_metadata(KeyedStream::<
930                        u64,
931                        InT,
932                        Self,
933                        Unbounded,
934                        TotalOrder,
935                        ExactlyOnce,
936                    >::collection_kind()),
937                },
938            );
939
940        let membership_stream_ident = syn::Ident::new(
941            &format!(
942                "__hydro_deploy_many_{}_{}_membership",
943                from.key, next_external_port_id
944            ),
945            Span::call_site(),
946        );
947        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
948        let raw_membership_stream: KeyedStream<
949            u64,
950            bool,
951            Self,
952            Unbounded,
953            TotalOrder,
954            ExactlyOnce,
955        > = KeyedStream::new(
956            self.clone(),
957            HydroNode::Source {
958                source: HydroSource::Stream(membership_stream_expr.into()),
959                metadata: self.new_node_metadata(KeyedStream::<
960                    u64,
961                    bool,
962                    Self,
963                    Unbounded,
964                    TotalOrder,
965                    ExactlyOnce,
966                >::collection_kind()),
967            },
968        );
969
970        (
971            ExternalBincodeBidi {
972                process_key: from.key,
973                port_id: next_external_port_id,
974                _phantom: PhantomData,
975            },
976            raw_stream,
977            raw_membership_stream.map(q!(|join| {
978                if join {
979                    MembershipEvent::Joined
980                } else {
981                    MembershipEvent::Left
982                }
983            })),
984            fwd_ref,
985        )
986    }
987
988    /// Constructs a [`Singleton`] materialized at this location with the given static value.
989    ///
990    /// # Example
991    /// ```rust
992    /// # #[cfg(feature = "deploy")] {
993    /// # use hydro_lang::prelude::*;
994    /// # use futures::StreamExt;
995    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
996    /// let tick = process.tick();
997    /// let singleton = tick.singleton(q!(5));
998    /// # singleton.all_ticks()
999    /// # }, |mut stream| async move {
1000    /// // 5
1001    /// # assert_eq!(stream.next().await.unwrap(), 5);
1002    /// # }));
1003    /// # }
1004    /// ```
1005    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1006    where
1007        T: Clone,
1008        Self: Sized,
1009    {
1010        let e = e.splice_untyped_ctx(self);
1011
1012        Singleton::new(
1013            self.clone(),
1014            HydroNode::SingletonSource {
1015                value: e.into(),
1016                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1017            },
1018        )
1019    }
1020
1021    /// Generates a stream with values emitted at a fixed interval, with
1022    /// each value being the current time (as an [`tokio::time::Instant`]).
1023    ///
1024    /// The clock source used is monotonic, so elements will be emitted in
1025    /// increasing order.
1026    ///
1027    /// # Non-Determinism
1028    /// Because this stream is generated by an OS timer, it will be
1029    /// non-deterministic because each timestamp will be arbitrary.
1030    fn source_interval(
1031        &self,
1032        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1033        _nondet: NonDet,
1034    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1035    where
1036        Self: Sized + NoTick,
1037    {
1038        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1039            tokio::time::interval(interval)
1040        )))
1041    }
1042
1043    /// Generates a stream with values emitted at a fixed interval (with an
1044    /// initial delay), with each value being the current time
1045    /// (as an [`tokio::time::Instant`]).
1046    ///
1047    /// The clock source used is monotonic, so elements will be emitted in
1048    /// increasing order.
1049    ///
1050    /// # Non-Determinism
1051    /// Because this stream is generated by an OS timer, it will be
1052    /// non-deterministic because each timestamp will be arbitrary.
1053    fn source_interval_delayed(
1054        &self,
1055        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1056        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1057        _nondet: NonDet,
1058    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1059    where
1060        Self: Sized + NoTick,
1061    {
1062        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1063            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1064        )))
1065    }
1066
1067    /// Creates a forward reference for defining recursive or mutually-dependent dataflows.
1068    ///
1069    /// Returns a handle that must be completed with the actual stream, and a placeholder
1070    /// stream that can be used in the dataflow graph before the actual stream is defined.
1071    ///
1072    /// This is useful for implementing feedback loops or recursive computations where
1073    /// a stream depends on its own output.
1074    ///
1075    /// # Example
1076    /// ```rust
1077    /// # #[cfg(feature = "deploy")] {
1078    /// # use hydro_lang::prelude::*;
1079    /// # use hydro_lang::live_collections::stream::NoOrder;
1080    /// # use futures::StreamExt;
1081    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1082    /// // Create a forward reference for the feedback stream
1083    /// let (complete, feedback) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1084    ///
1085    /// // Combine initial input with feedback, then increment
1086    /// let input: Stream<_, _, Unbounded> = process.source_iter(q!([1])).into();
1087    /// let output: Stream<_, _, _, NoOrder> = input.interleave(feedback).map(q!(|x| x + 1));
1088    ///
1089    /// // Complete the forward reference with the output
1090    /// complete.complete(output.clone());
1091    /// output
1092    /// # }, |mut stream| async move {
1093    /// // 2, 3, 4, 5, ...
1094    /// # assert_eq!(stream.next().await.unwrap(), 2);
1095    /// # assert_eq!(stream.next().await.unwrap(), 3);
1096    /// # assert_eq!(stream.next().await.unwrap(), 4);
1097    /// # }));
1098    /// # }
1099    /// ```
1100    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1101    where
1102        S: CycleCollection<'a, ForwardRef, Location = Self>,
1103    {
1104        let next_id = self.flow_state().borrow_mut().next_cycle_id();
1105        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
1106
1107        (
1108            ForwardHandle {
1109                completed: false,
1110                ident: ident.clone(),
1111                expected_location: Location::id(self),
1112                _phantom: PhantomData,
1113            },
1114            S::create_source(ident, self.clone()),
1115        )
1116    }
1117}
1118
/// End-to-end tests that deploy a single process plus an external client on localhost and
/// exchange messages through the external-port APIs of [`Location`].
#[cfg(all(test, feature = "deploy"))]
mod tests {
    use std::collections::HashSet;

    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;
    use tokio_util::codec::LengthDelimitedCodec;

    use crate::compile::builder::FlowBuilder;
    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
    use crate::location::{Location, NetworkHint};
    use crate::nondet::nondet;

    /// A top-level singleton snapshotted into a tick pairs every input with its value (123)
    /// and contributes exactly one element (count of 1) per tick.
    #[tokio::test]
    async fn top_level_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let outside = builder.external::<()>();

        let (ingress, requests) =
            process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&outside);
        let constant = process.singleton(q!(123));
        let tick = process.tick();

        let batched = requests.batch(&tick, nondet!(/** test */));
        let with_value =
            batched.cross_singleton(constant.clone().snapshot(&tick, nondet!(/** test */)));
        let with_count = with_value.cross_singleton(
            constant
                .snapshot(&tick, nondet!(/** test */))
                .into_stream()
                .count(),
        );
        let egress = with_count.all_ticks().send_bincode_external(&outside);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&outside, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tx = deployed.connect(ingress).await;
        let mut rx = deployed.connect(egress).await;

        deployment.start().await.unwrap();

        tx.send(1).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), ((1, 123), 1));

        tx.send(2).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), ((2, 123), 1));
    }

    /// Same check as above, but for a singleton that is created directly inside the tick.
    #[tokio::test]
    async fn tick_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let outside = builder.external::<()>();

        let (ingress, requests) =
            process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&outside);
        let tick = process.tick();
        let constant = tick.singleton(q!(123));

        let batched = requests.batch(&tick, nondet!(/** test */));
        let with_value = batched.cross_singleton(constant.clone());
        let with_count = with_value.cross_singleton(constant.into_stream().count());
        let egress = with_count.all_ticks().send_bincode_external(&outside);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&outside, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tx = deployed.connect(ingress).await;
        let mut rx = deployed.connect(egress).await;

        deployment.start().await.unwrap();

        tx.send(1).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), ((1, 123), 1));

        tx.send(2).await.unwrap();
        assert_eq!(rx.next().await.unwrap(), ((2, 123), 1));
    }

    /// Raw bytes sent by the external client come back unchanged.
    #[tokio::test]
    async fn external_bytes() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let outside = builder.external::<()>();

        let (ingress, payloads) = process.source_external_bytes(&outside);
        let egress = payloads.send_bincode_external(&outside);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&outside, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Only the sending half of the connection is needed here.
        let (_, mut tx) = deployed.connect(ingress).await;
        let mut rx = deployed.connect(egress).await;

        deployment.start().await.unwrap();

        tx.send(vec![1, 2, 3].into()).await.unwrap();

        assert_eq!(rx.next().await.unwrap(), vec![1, 2, 3]);
    }

    /// Two bincode clients on the same port are distinguished by client IDs 0 and 1.
    #[tokio::test]
    async fn multi_external_source() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let outside = builder.external::<()>();

        let (ingress, requests, _membership, reply_sink) =
            process.bidi_external_many_bincode(&outside);
        let egress = requests.entries().send_bincode_external(&outside);

        // Nothing is ever sent back to the clients in this test.
        let no_replies = process
            .source_iter::<(u64, ()), _>(q!([]))
            .into_keyed()
            .weaken_ordering();
        reply_sink.complete(no_replies);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&outside, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (_, mut first_tx) = deployed.connect_bincode(ingress.clone()).await;
        let (_, mut second_tx) = deployed.connect_bincode(ingress).await;
        let rx = deployed.connect(egress).await;

        deployment.start().await.unwrap();

        first_tx.send(123).await.unwrap();
        second_tx.send(456).await.unwrap();

        // Arrival order across clients is not fixed, so compare as sets.
        let received = rx.take(2).collect::<HashSet<_>>().await;
        let expected: HashSet<_> = [(0, 123), (1, 456)].into_iter().collect();
        assert_eq!(received, expected);
    }

    /// Only the second of two connected clients sends; its message must still be delivered.
    #[tokio::test]
    async fn second_connection_only_multi_source() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let outside = builder.external::<()>();

        let (ingress, requests, _membership, reply_sink) =
            process.bidi_external_many_bincode(&outside);
        let egress = requests.entries().send_bincode_external(&outside);

        let no_replies = process
            .source_iter::<(u64, ()), _>(q!([]))
            .into_keyed()
            .weaken_ordering();
        reply_sink.complete(no_replies);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&outside, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // intentionally skipped to test stream waking logic
        let (_, mut _idle_tx) = deployed.connect_bincode(ingress.clone()).await;
        let (_, mut active_tx) = deployed.connect_bincode(ingress).await;
        let mut rx = deployed.connect(egress).await;

        deployment.start().await.unwrap();

        active_tx.send(456).await.unwrap();

        assert_eq!(rx.next().await.unwrap(), (1, 456));
    }

    /// Two raw-bytes clients on the same port are distinguished by client IDs 0 and 1.
    #[tokio::test]
    async fn multi_external_bytes() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let outside = builder.external::<()>();

        let (ingress, payloads, _membership, reply_sink) = process
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&outside, NetworkHint::Auto);
        let egress = payloads.entries().send_bincode_external(&outside);

        let no_replies = process
            .source_iter(q!([]))
            .into_keyed()
            .weaken_ordering();
        reply_sink.complete(no_replies);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&outside, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (_, mut first_tx) = deployed.connect(ingress.clone()).await;
        let (_, mut second_tx) = deployed.connect(ingress).await;
        let rx = deployed.connect(egress).await;

        deployment.start().await.unwrap();

        first_tx.send(vec![1, 2, 3].into()).await.unwrap();
        second_tx.send(vec![4, 5].into()).await.unwrap();

        // Arrival order across clients is not fixed, so compare as sets.
        let received = rx.take(2).collect::<HashSet<_>>().await;
        let expected: HashSet<_> = vec![
            (0, (&[1u8, 2, 3] as &[u8]).into()),
            (1, (&[4u8, 5] as &[u8]).into()),
        ]
        .into_iter()
        .collect();
        assert_eq!(received, expected);
    }

    /// A single-client port answers each frame with the same bytes plus a trailing 42.
    #[tokio::test]
    async fn single_client_external_bytes() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let outside = builder.external::<()>();

        let (port, payloads, reply_sink) = process
            .bind_single_client::<_, _, LengthDelimitedCodec>(&outside, NetworkHint::Auto);
        let replies = payloads.map(q!(|data| {
            let mut resp: Vec<u8> = data.into();
            resp.push(42);
            resp.into() // : Bytes
        }));
        reply_sink.complete(replies);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&outside, deployment.Localhost())
            .deploy(&mut deployment);

        // Unlike the other tests, the client connects only after the deployment has started.
        deployment.deploy().await.unwrap();
        deployment.start().await.unwrap();

        let (mut rx, mut tx) = deployed.connect(port).await;

        tx.send(vec![1, 2, 3].into()).await.unwrap();
        assert_eq!(rx.next().await.unwrap().unwrap(), vec![1, 2, 3, 42]);
    }

    /// Each raw-bytes client receives its own frame back with every byte incremented by one.
    #[tokio::test]
    async fn echo_external_bytes() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let outside = builder.external::<()>();

        let (port, payloads, _membership, reply_sink) = process
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&outside, NetworkHint::Auto);
        let replies = payloads.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() }));
        reply_sink.complete(replies);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&outside, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (mut first_rx, mut first_tx) = deployed.connect(port.clone()).await;
        let (mut second_rx, mut second_tx) = deployed.connect(port).await;

        deployment.start().await.unwrap();

        first_tx.send(vec![1, 2, 3].into()).await.unwrap();
        second_tx.send(vec![4, 5].into()).await.unwrap();

        assert_eq!(first_rx.next().await.unwrap().unwrap(), vec![2, 3, 4]);
        assert_eq!(second_rx.next().await.unwrap().unwrap(), vec![5, 6]);
    }

    /// Each bincode client receives its own string back in upper case.
    #[tokio::test]
    async fn echo_external_bincode() {
        let mut deployment = Deployment::new();

        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let outside = builder.external::<()>();

        let (port, messages, _membership, reply_sink) =
            process.bidi_external_many_bincode(&outside);
        let replies = messages.map(q!(|text: String| { text.to_uppercase() }));
        reply_sink.complete(replies);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&outside, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (mut first_rx, mut first_tx) = deployed.connect_bincode(port.clone()).await;
        let (mut second_rx, mut second_tx) = deployed.connect_bincode(port).await;

        deployment.start().await.unwrap();

        first_tx.send("hi".to_owned()).await.unwrap();
        second_tx.send("hello".to_owned()).await.unwrap();

        assert_eq!(first_rx.next().await.unwrap(), "HI");
        assert_eq!(second_rx.next().await.unwrap(), "HELLO");
    }
}