Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{
34    ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
35};
36use crate::forward_handle::ForwardRef;
37#[cfg(stageleft_runtime)]
38use crate::forward_handle::{CycleCollection, ForwardHandle};
39use crate::live_collections::boundedness::{Bounded, Unbounded};
40use crate::live_collections::keyed_stream::KeyedStream;
41use crate::live_collections::singleton::Singleton;
42use crate::live_collections::stream::{
43    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
44};
45use crate::location::dynamic::LocationId;
46use crate::location::external_process::{
47    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
48};
49use crate::nondet::NonDet;
50#[cfg(feature = "sim")]
51use crate::sim::SimSender;
52use crate::staging_util::get_this_crate;
53
54pub mod dynamic;
55
56pub mod external_process;
57pub use external_process::External;
58
59pub mod process;
60pub use process::Process;
61
62pub mod cluster;
63pub use cluster::Cluster;
64
65pub mod member_id;
66pub use member_id::{MemberId, TaglessMemberId};
67
68pub mod tick;
69pub use tick::{Atomic, NoTick, Tick};
70
71/// An event indicating a change in membership status of a location in a group
72/// (e.g. a node in a [`Cluster`] or an external client connection).
73#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
74pub enum MembershipEvent {
75    /// The member has joined the group and is now active.
76    Joined,
77    /// The member has left the group and is no longer active.
78    Left,
79}
80
/// A hint describing how the network transport of an external connection
/// should be configured.
///
/// The hint is passed to methods that bind external client connections, such as
/// [`Location::bind_single_client`] and [`Location::bidi_external_many_bytes`],
/// and controls how the underlying TCP listener is set up.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum NetworkHint {
    /// Let the network configuration be selected automatically (e.g. an ephemeral port).
    Auto,
    /// Use a TCP port, optionally pinned to a fixed port number.
    ///
    /// - `None`: an available port is chosen automatically.
    /// - `Some(port)`: the given port number is used.
    TcpPort(Option<u16>),
}
96
97pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
98    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
99}
100
101#[stageleft::export(LocationKey)]
102new_key_type! {
103    /// A unique identifier for a clock tick.
104    pub struct LocationKey;
105}
106
107impl std::fmt::Display for LocationKey {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
110    }
111}
112
113/// This is used for the ECS membership stream.
114/// TODO(mingwei): Make this more robust?
115impl std::str::FromStr for LocationKey {
116    type Err = Option<ParseIntError>;
117
118    fn from_str(s: &str) -> Result<Self, Self::Err> {
119        let nvn = s.strip_prefix("loc").ok_or(None)?;
120        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
121        let idx: u64 = idx.parse()?;
122        let ver: u64 = ver.parse()?;
123        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
124    }
125}
126
127impl LocationKey {
128    /// TODO(minwgei): Remove this and avoid magic key for simulator external.
129    /// The first location key, used by the simulator as the default external location.
130    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`
131
132    /// A key for testing with index 1.
133    #[cfg(test)]
134    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); // `1v255`
135
136    /// A key for testing with index 2.
137    #[cfg(test)]
138    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); // `2v255`
139}
140
141/// This is used within `q!` code in docker and ECS.
142impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
143    type O = LocationKey;
144
145    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
146    where
147        Self: Sized,
148    {
149        let root = get_this_crate();
150        let n = Key::data(&self).as_ffi();
151        (
152            QuoteTokens {
153                prelude: None,
154                expr: Some(quote! {
155                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
156                }),
157            },
158            (),
159        )
160    }
161}
162
163/// A simple enum for the type of a root location.
164#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
165pub enum LocationType {
166    /// A process (single node).
167    Process,
168    /// A cluster (multiple nodes).
169    Cluster,
170    /// An external client.
171    External,
172}
173
174/// A location where data can be materialized and computation can be executed.
175///
176/// Hydro is a **global**, **distributed** programming model. This means that the data
177/// and computation in a Hydro program can be spread across multiple machines, data
178/// centers, and even continents. To achieve this, Hydro uses the concept of
179/// **locations** to keep track of _where_ data is located and computation is executed.
180///
181/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
182/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
183/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
184/// to allow live collections to be _moved_ between locations via network send/receive.
185///
186/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
187#[expect(
188    private_bounds,
189    reason = "only internal Hydro code can define location types"
190)]
191pub trait Location<'a>: dynamic::DynLocation {
192    /// The root location type for this location.
193    ///
194    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
195    /// For nested locations like [`Tick`], this is the root location that contains it.
196    type Root: Location<'a>;
197
198    /// Returns the root location for this location.
199    ///
200    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
201    /// For nested locations like [`Tick`], this returns the root location that contains it.
202    fn root(&self) -> Self::Root;
203
204    /// Attempts to create a new [`Tick`] clock domain at this location.
205    ///
206    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
207    /// or `None` if this location is already inside a tick (nested ticks are not supported).
208    ///
209    /// Prefer using [`Location::tick`] when you know the location is top-level.
210    fn try_tick(&self) -> Option<Tick<Self>> {
211        if Self::is_top_level() {
212            let id = self.flow_state().borrow_mut().next_clock_id();
213            Some(Tick {
214                id,
215                l: self.clone(),
216            })
217        } else {
218            None
219        }
220    }
221
222    /// Returns the unique identifier for this location.
223    fn id(&self) -> LocationId {
224        dynamic::DynLocation::id(self)
225    }
226
227    /// Creates a new [`Tick`] clock domain at this location.
228    ///
229    /// A tick represents a logical clock that can be used to batch streaming data
230    /// into discrete time steps. This is useful for implementing iterative algorithms
231    /// or for synchronizing data across multiple streams.
232    ///
233    /// # Example
234    /// ```rust
235    /// # #[cfg(feature = "deploy")] {
236    /// # use hydro_lang::prelude::*;
237    /// # use futures::StreamExt;
238    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
239    /// let tick = process.tick();
240    /// let inside_tick = process
241    ///     .source_iter(q!(vec![1, 2, 3, 4]))
242    ///     .batch(&tick, nondet!(/** test */));
243    /// inside_tick.all_ticks()
244    /// # }, |mut stream| async move {
245    /// // 1, 2, 3, 4
246    /// # for w in vec![1, 2, 3, 4] {
247    /// #     assert_eq!(stream.next().await.unwrap(), w);
248    /// # }
249    /// # }));
250    /// # }
251    /// ```
252    fn tick(&self) -> Tick<Self>
253    where
254        Self: NoTick,
255    {
256        let id = self.flow_state().borrow_mut().next_clock_id();
257        Tick {
258            id,
259            l: self.clone(),
260        }
261    }
262
263    /// Creates an unbounded stream that continuously emits unit values `()`.
264    ///
265    /// This is useful for driving computations that need to run continuously,
266    /// such as polling or heartbeat mechanisms.
267    ///
268    /// # Example
269    /// ```rust
270    /// # #[cfg(feature = "deploy")] {
271    /// # use hydro_lang::prelude::*;
272    /// # use futures::StreamExt;
273    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
274    /// let tick = process.tick();
275    /// process.spin()
276    ///     .batch(&tick, nondet!(/** test */))
277    ///     .map(q!(|_| 42))
278    ///     .all_ticks()
279    /// # }, |mut stream| async move {
280    /// // 42, 42, 42, ...
281    /// # assert_eq!(stream.next().await.unwrap(), 42);
282    /// # assert_eq!(stream.next().await.unwrap(), 42);
283    /// # assert_eq!(stream.next().await.unwrap(), 42);
284    /// # }));
285    /// # }
286    /// ```
287    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
288    where
289        Self: Sized + NoTick,
290    {
291        Stream::new(
292            self.clone(),
293            HydroNode::Source {
294                source: HydroSource::Spin(),
295                metadata: self.new_node_metadata(Stream::<
296                    (),
297                    Self,
298                    Unbounded,
299                    TotalOrder,
300                    ExactlyOnce,
301                >::collection_kind()),
302            },
303        )
304    }
305
306    /// Creates a stream from an async [`FuturesStream`].
307    ///
308    /// This is useful for integrating with external async data sources,
309    /// such as network connections or file readers.
310    ///
311    /// # Example
312    /// ```rust
313    /// # #[cfg(feature = "deploy")] {
314    /// # use hydro_lang::prelude::*;
315    /// # use futures::StreamExt;
316    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
317    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
318    /// # }, |mut stream| async move {
319    /// // 1, 2, 3
320    /// # for w in vec![1, 2, 3] {
321    /// #     assert_eq!(stream.next().await.unwrap(), w);
322    /// # }
323    /// # }));
324    /// # }
325    /// ```
326    fn source_stream<T, E>(
327        &self,
328        e: impl QuotedWithContext<'a, E, Self>,
329    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
330    where
331        E: FuturesStream<Item = T> + Unpin,
332        Self: Sized + NoTick,
333    {
334        let e = e.splice_untyped_ctx(self);
335
336        Stream::new(
337            self.clone(),
338            HydroNode::Source {
339                source: HydroSource::Stream(e.into()),
340                metadata: self.new_node_metadata(Stream::<
341                    T,
342                    Self,
343                    Unbounded,
344                    TotalOrder,
345                    ExactlyOnce,
346                >::collection_kind()),
347            },
348        )
349    }
350
351    /// Creates a bounded stream from an iterator.
352    ///
353    /// The iterator is evaluated once at runtime, and all elements are emitted
354    /// in order. This is useful for creating streams from static data or
355    /// for testing.
356    ///
357    /// # Example
358    /// ```rust
359    /// # #[cfg(feature = "deploy")] {
360    /// # use hydro_lang::prelude::*;
361    /// # use futures::StreamExt;
362    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
363    /// process.source_iter(q!(vec![1, 2, 3, 4]))
364    /// # }, |mut stream| async move {
365    /// // 1, 2, 3, 4
366    /// # for w in vec![1, 2, 3, 4] {
367    /// #     assert_eq!(stream.next().await.unwrap(), w);
368    /// # }
369    /// # }));
370    /// # }
371    /// ```
372    fn source_iter<T, E>(
373        &self,
374        e: impl QuotedWithContext<'a, E, Self>,
375    ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
376    where
377        E: IntoIterator<Item = T>,
378        Self: Sized,
379    {
380        let e = e.splice_typed_ctx(self);
381
382        Stream::new(
383            self.clone(),
384            HydroNode::Source {
385                source: HydroSource::Iter(e.into()),
386                metadata: self.new_node_metadata(
387                    Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
388                ),
389            },
390        )
391    }
392
393    /// Creates a stream of membership events for a cluster.
394    ///
395    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
396    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
397    /// keyed by the [`MemberId`] of the cluster member.
398    ///
399    /// This is useful for implementing protocols that need to track cluster membership,
400    /// such as broadcasting to all members or detecting failures.
401    ///
402    /// # Example
403    /// ```rust
404    /// # #[cfg(feature = "deploy")] {
405    /// # use hydro_lang::prelude::*;
406    /// # use futures::StreamExt;
407    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
408    /// let p1 = flow.process::<()>();
409    /// let workers: Cluster<()> = flow.cluster::<()>();
410    /// # // do nothing on each worker
411    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
412    /// let cluster_members = p1.source_cluster_members(&workers);
413    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
414    /// // if there are 4 members in the cluster, we would see a join event for each
415    /// // { MemberId::<Worker>(0): [MembershipEvent::Joined], MemberId::<Worker>(2): [MembershipEvent::Joined], ... }
416    /// # }, |mut stream| async move {
417    /// # let mut results = Vec::new();
418    /// # for w in 0..4 {
419    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
420    /// # }
421    /// # results.sort();
422    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
423    /// # }));
424    /// # }
425    /// ```
426    fn source_cluster_members<C: 'a>(
427        &self,
428        cluster: &Cluster<'a, C>,
429    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
430    where
431        Self: Sized + NoTick,
432    {
433        Stream::new(
434            self.clone(),
435            HydroNode::Source {
436                source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
437                metadata: self.new_node_metadata(Stream::<
438                    (TaglessMemberId, MembershipEvent),
439                    Self,
440                    Unbounded,
441                    TotalOrder,
442                    ExactlyOnce,
443                >::collection_kind()),
444            },
445        )
446        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
447        .into_keyed()
448    }
449
450    /// Creates a one-way connection from an external process to receive raw bytes.
451    ///
452    /// Returns a port handle for the external process to connect to, and a stream
453    /// of received byte buffers.
454    ///
455    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
456    /// or [`Location::source_external_bincode`].
457    fn source_external_bytes<L>(
458        &self,
459        from: &External<L>,
460    ) -> (
461        ExternalBytesPort,
462        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
463    )
464    where
465        Self: Sized + NoTick,
466    {
467        let (port, stream, sink) =
468            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
469
470        sink.complete(self.source_iter(q!([])));
471
472        (port, stream)
473    }
474
475    /// Creates a one-way connection from an external process to receive bincode-serialized data.
476    ///
477    /// Returns a sink handle for the external process to send data to, and a stream
478    /// of received values.
479    ///
480    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
481    #[expect(clippy::type_complexity, reason = "stream markers")]
482    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
483        &self,
484        from: &External<L>,
485    ) -> (
486        ExternalBincodeSink<T, NotMany, O, R>,
487        Stream<T, Self, Unbounded, O, R>,
488    )
489    where
490        Self: Sized + NoTick,
491        T: Serialize + DeserializeOwned,
492    {
493        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
494        sink.complete(self.source_iter(q!([])));
495
496        (
497            ExternalBincodeSink {
498                process_key: from.key,
499                port_id: port.port_id,
500                _phantom: PhantomData,
501            },
502            stream.weaken_ordering().weaken_retries(),
503        )
504    }
505
506    /// Sets up a simulated input port on this location for testing.
507    ///
508    /// Returns a handle to send messages to the location as well as a stream
509    /// of received messages. This is only available when the `sim` feature is enabled.
510    #[cfg(feature = "sim")]
511    #[expect(clippy::type_complexity, reason = "stream markers")]
512    fn sim_input<T, O: Ordering, R: Retries>(
513        &self,
514    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
515    where
516        Self: Sized + NoTick,
517        T: Serialize + DeserializeOwned,
518    {
519        let external_location: External<'a, ()> = External {
520            key: LocationKey::FIRST,
521            flow_state: self.flow_state().clone(),
522            _phantom: PhantomData,
523        };
524
525        let (external, stream) = self.source_external_bincode(&external_location);
526
527        (SimSender(external.port_id, PhantomData), stream)
528    }
529
530    /// Creates an external input stream for embedded deployment mode.
531    ///
532    /// The `name` parameter specifies the name of the generated function parameter
533    /// that will supply data to this stream at runtime. The generated function will
534    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
535    fn embedded_input<T>(
536        &self,
537        name: impl Into<String>,
538    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
539    where
540        Self: Sized + NoTick,
541    {
542        let ident = syn::Ident::new(&name.into(), Span::call_site());
543
544        Stream::new(
545            self.clone(),
546            HydroNode::Source {
547                source: HydroSource::Embedded(ident),
548                metadata: self.new_node_metadata(Stream::<
549                    T,
550                    Self,
551                    Unbounded,
552                    TotalOrder,
553                    ExactlyOnce,
554                >::collection_kind()),
555            },
556        )
557    }
558
559    /// Creates an embedded singleton input for embedded deployment mode.
560    ///
561    /// The `name` parameter specifies the name of the generated function parameter
562    /// that will supply data to this singleton at runtime. The generated function will
563    /// accept a plain `T` parameter with this name.
564    fn embedded_singleton_input<T>(&self, name: impl Into<String>) -> Singleton<T, Self, Bounded>
565    where
566        Self: Sized + NoTick,
567    {
568        let ident = syn::Ident::new(&name.into(), Span::call_site());
569
570        Singleton::new(
571            self.clone(),
572            HydroNode::Source {
573                source: HydroSource::EmbeddedSingleton(ident),
574                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
575            },
576        )
577    }
578
579    /// Establishes a server on this location to receive a bidirectional connection from a single
580    /// client, identified by the given `External` handle. Returns a port handle for the external
581    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
582    /// messages.
583    ///
584    /// # Example
585    /// ```rust
586    /// # #[cfg(feature = "deploy")] {
587    /// # use hydro_lang::prelude::*;
588    /// # use hydro_deploy::Deployment;
589    /// # use futures::{SinkExt, StreamExt};
590    /// # tokio_test::block_on(async {
591    /// # use bytes::Bytes;
592    /// # use hydro_lang::location::NetworkHint;
593    /// # use tokio_util::codec::LengthDelimitedCodec;
594    /// # let mut flow = FlowBuilder::new();
595    /// let node = flow.process::<()>();
596    /// let external = flow.external::<()>();
597    /// let (port, incoming, outgoing) =
598    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
599    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
600    ///     let mut resp: Vec<u8> = data.into();
601    ///     resp.push(42);
602    ///     resp.into() // : Bytes
603    /// })));
604    ///
605    /// # let mut deployment = Deployment::new();
606    /// let nodes = flow // ... with_process and with_external
607    /// #     .with_process(&node, deployment.Localhost())
608    /// #     .with_external(&external, deployment.Localhost())
609    /// #     .deploy(&mut deployment);
610    ///
611    /// deployment.deploy().await.unwrap();
612    /// deployment.start().await.unwrap();
613    ///
614    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
615    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
616    /// assert_eq!(
617    ///     external_out.next().await.unwrap().unwrap(),
618    ///     vec![1, 2, 3, 42]
619    /// );
620    /// # });
621    /// # }
622    /// ```
623    #[expect(clippy::type_complexity, reason = "stream markers")]
624    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
625        &self,
626        from: &External<L>,
627        port_hint: NetworkHint,
628    ) -> (
629        ExternalBytesPort<NotMany>,
630        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
631        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
632    )
633    where
634        Self: Sized + NoTick,
635    {
636        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
637
638        let (fwd_ref, to_sink) =
639            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
640        let mut flow_state_borrow = self.flow_state().borrow_mut();
641
642        flow_state_borrow.push_root(HydroRoot::SendExternal {
643            to_external_key: from.key,
644            to_port_id: next_external_port_id,
645            to_many: false,
646            unpaired: false,
647            serialize_fn: None,
648            instantiate_fn: DebugInstantiate::Building,
649            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
650            op_metadata: HydroIrOpMetadata::new(),
651        });
652
653        let raw_stream: Stream<
654            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
655            Self,
656            Unbounded,
657            TotalOrder,
658            ExactlyOnce,
659        > = Stream::new(
660            self.clone(),
661            HydroNode::ExternalInput {
662                from_external_key: from.key,
663                from_port_id: next_external_port_id,
664                from_many: false,
665                codec_type: quote_type::<Codec>().into(),
666                port_hint,
667                instantiate_fn: DebugInstantiate::Building,
668                deserialize_fn: None,
669                metadata: self.new_node_metadata(Stream::<
670                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
671                    Self,
672                    Unbounded,
673                    TotalOrder,
674                    ExactlyOnce,
675                >::collection_kind()),
676            },
677        );
678
679        (
680            ExternalBytesPort {
681                process_key: from.key,
682                port_id: next_external_port_id,
683                _phantom: PhantomData,
684            },
685            raw_stream.flatten_ordered(),
686            fwd_ref,
687        )
688    }
689
690    /// Establishes a bidirectional connection from a single external client using bincode serialization.
691    ///
692    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
693    /// and a handle to send outgoing messages. This is a convenience wrapper around
694    /// [`Location::bind_single_client`] that uses bincode for serialization.
695    ///
696    /// # Type Parameters
697    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
698    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
699    #[expect(clippy::type_complexity, reason = "stream markers")]
700    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
701        &self,
702        from: &External<L>,
703    ) -> (
704        ExternalBincodeBidi<InT, OutT, NotMany>,
705        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
706        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
707    )
708    where
709        Self: Sized + NoTick,
710    {
711        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
712
713        let (fwd_ref, to_sink) =
714            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
715        let mut flow_state_borrow = self.flow_state().borrow_mut();
716
717        let root = get_this_crate();
718
719        let out_t_type = quote_type::<OutT>();
720        let ser_fn: syn::Expr = syn::parse_quote! {
721            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
722                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
723            )
724        };
725
726        flow_state_borrow.push_root(HydroRoot::SendExternal {
727            to_external_key: from.key,
728            to_port_id: next_external_port_id,
729            to_many: false,
730            unpaired: false,
731            serialize_fn: Some(ser_fn.into()),
732            instantiate_fn: DebugInstantiate::Building,
733            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
734            op_metadata: HydroIrOpMetadata::new(),
735        });
736
737        let in_t_type = quote_type::<InT>();
738
739        let deser_fn: syn::Expr = syn::parse_quote! {
740            |res| {
741                let b = res.unwrap();
742                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
743            }
744        };
745
746        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
747            self.clone(),
748            HydroNode::ExternalInput {
749                from_external_key: from.key,
750                from_port_id: next_external_port_id,
751                from_many: false,
752                codec_type: quote_type::<LengthDelimitedCodec>().into(),
753                port_hint: NetworkHint::Auto,
754                instantiate_fn: DebugInstantiate::Building,
755                deserialize_fn: Some(deser_fn.into()),
756                metadata: self.new_node_metadata(Stream::<
757                    InT,
758                    Self,
759                    Unbounded,
760                    TotalOrder,
761                    ExactlyOnce,
762                >::collection_kind()),
763            },
764        );
765
766        (
767            ExternalBincodeBidi {
768                process_key: from.key,
769                port_id: next_external_port_id,
770                _phantom: PhantomData,
771            },
772            raw_stream,
773            fwd_ref,
774        )
775    }
776
777    /// Establishes a server on this location to receive bidirectional connections from multiple
778    /// external clients using raw bytes.
779    ///
780    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
781    /// connections. Each client is assigned a unique `u64` identifier.
782    ///
783    /// Returns:
784    /// - A port handle for external processes to connect to
785    /// - A keyed stream of incoming messages, keyed by client ID
786    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
787    /// - A handle to send outgoing messages, keyed by client ID
788    #[expect(clippy::type_complexity, reason = "stream markers")]
789    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
790        &self,
791        from: &External<L>,
792        port_hint: NetworkHint,
793    ) -> (
794        ExternalBytesPort<Many>,
795        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
796        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
797        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
798    )
799    where
800        Self: Sized + NoTick,
801    {
802        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
803
804        let (fwd_ref, to_sink) =
805            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
806        let mut flow_state_borrow = self.flow_state().borrow_mut();
807
808        flow_state_borrow.push_root(HydroRoot::SendExternal {
809            to_external_key: from.key,
810            to_port_id: next_external_port_id,
811            to_many: true,
812            unpaired: false,
813            serialize_fn: None,
814            instantiate_fn: DebugInstantiate::Building,
815            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
816            op_metadata: HydroIrOpMetadata::new(),
817        });
818
819        let raw_stream: Stream<
820            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
821            Self,
822            Unbounded,
823            TotalOrder,
824            ExactlyOnce,
825        > = Stream::new(
826            self.clone(),
827            HydroNode::ExternalInput {
828                from_external_key: from.key,
829                from_port_id: next_external_port_id,
830                from_many: true,
831                codec_type: quote_type::<Codec>().into(),
832                port_hint,
833                instantiate_fn: DebugInstantiate::Building,
834                deserialize_fn: None,
835                metadata: self.new_node_metadata(Stream::<
836                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
837                    Self,
838                    Unbounded,
839                    TotalOrder,
840                    ExactlyOnce,
841                >::collection_kind()),
842            },
843        );
844
845        let membership_stream_ident = syn::Ident::new(
846            &format!(
847                "__hydro_deploy_many_{}_{}_membership",
848                from.key, next_external_port_id
849            ),
850            Span::call_site(),
851        );
852        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
853        let raw_membership_stream: KeyedStream<
854            u64,
855            bool,
856            Self,
857            Unbounded,
858            TotalOrder,
859            ExactlyOnce,
860        > = KeyedStream::new(
861            self.clone(),
862            HydroNode::Source {
863                source: HydroSource::Stream(membership_stream_expr.into()),
864                metadata: self.new_node_metadata(KeyedStream::<
865                    u64,
866                    bool,
867                    Self,
868                    Unbounded,
869                    TotalOrder,
870                    ExactlyOnce,
871                >::collection_kind()),
872            },
873        );
874
875        (
876            ExternalBytesPort {
877                process_key: from.key,
878                port_id: next_external_port_id,
879                _phantom: PhantomData,
880            },
881            raw_stream
882                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
883                .into_keyed(),
884            raw_membership_stream.map(q!(|join| {
885                if join {
886                    MembershipEvent::Joined
887                } else {
888                    MembershipEvent::Left
889                }
890            })),
891            fwd_ref,
892        )
893    }
894
895    /// Establishes a server on this location to receive bidirectional connections from multiple
896    /// external clients using bincode serialization.
897    ///
898    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
899    /// client connections. Each client is assigned a unique `u64` identifier.
900    ///
901    /// Returns:
902    /// - A port handle for external processes to connect to
903    /// - A keyed stream of incoming messages, keyed by client ID
904    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
905    /// - A handle to send outgoing messages, keyed by client ID
906    ///
907    /// # Type Parameters
908    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
909    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
910    #[expect(clippy::type_complexity, reason = "stream markers")]
911    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
912        &self,
913        from: &External<L>,
914    ) -> (
915        ExternalBincodeBidi<InT, OutT, Many>,
916        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
917        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
918        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
919    )
920    where
921        Self: Sized + NoTick,
922    {
923        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
924
925        let (fwd_ref, to_sink) =
926            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
927        let mut flow_state_borrow = self.flow_state().borrow_mut();
928
929        let root = get_this_crate();
930
931        let out_t_type = quote_type::<OutT>();
932        let ser_fn: syn::Expr = syn::parse_quote! {
933            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
934                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
935            )
936        };
937
938        flow_state_borrow.push_root(HydroRoot::SendExternal {
939            to_external_key: from.key,
940            to_port_id: next_external_port_id,
941            to_many: true,
942            unpaired: false,
943            serialize_fn: Some(ser_fn.into()),
944            instantiate_fn: DebugInstantiate::Building,
945            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
946            op_metadata: HydroIrOpMetadata::new(),
947        });
948
949        let in_t_type = quote_type::<InT>();
950
951        let deser_fn: syn::Expr = syn::parse_quote! {
952            |res| {
953                let (id, b) = res.unwrap();
954                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
955            }
956        };
957
958        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
959            KeyedStream::new(
960                self.clone(),
961                HydroNode::ExternalInput {
962                    from_external_key: from.key,
963                    from_port_id: next_external_port_id,
964                    from_many: true,
965                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
966                    port_hint: NetworkHint::Auto,
967                    instantiate_fn: DebugInstantiate::Building,
968                    deserialize_fn: Some(deser_fn.into()),
969                    metadata: self.new_node_metadata(KeyedStream::<
970                        u64,
971                        InT,
972                        Self,
973                        Unbounded,
974                        TotalOrder,
975                        ExactlyOnce,
976                    >::collection_kind()),
977                },
978            );
979
980        let membership_stream_ident = syn::Ident::new(
981            &format!(
982                "__hydro_deploy_many_{}_{}_membership",
983                from.key, next_external_port_id
984            ),
985            Span::call_site(),
986        );
987        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
988        let raw_membership_stream: KeyedStream<
989            u64,
990            bool,
991            Self,
992            Unbounded,
993            TotalOrder,
994            ExactlyOnce,
995        > = KeyedStream::new(
996            self.clone(),
997            HydroNode::Source {
998                source: HydroSource::Stream(membership_stream_expr.into()),
999                metadata: self.new_node_metadata(KeyedStream::<
1000                    u64,
1001                    bool,
1002                    Self,
1003                    Unbounded,
1004                    TotalOrder,
1005                    ExactlyOnce,
1006                >::collection_kind()),
1007            },
1008        );
1009
1010        (
1011            ExternalBincodeBidi {
1012                process_key: from.key,
1013                port_id: next_external_port_id,
1014                _phantom: PhantomData,
1015            },
1016            raw_stream,
1017            raw_membership_stream.map(q!(|join| {
1018                if join {
1019                    MembershipEvent::Joined
1020                } else {
1021                    MembershipEvent::Left
1022                }
1023            })),
1024            fwd_ref,
1025        )
1026    }
1027
1028    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1029    ///
1030    /// # Example
1031    /// ```rust
1032    /// # #[cfg(feature = "deploy")] {
1033    /// # use hydro_lang::prelude::*;
1034    /// # use futures::StreamExt;
1035    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1036    /// let tick = process.tick();
1037    /// let singleton = tick.singleton(q!(5));
1038    /// # singleton.all_ticks()
1039    /// # }, |mut stream| async move {
1040    /// // 5
1041    /// # assert_eq!(stream.next().await.unwrap(), 5);
1042    /// # }));
1043    /// # }
1044    /// ```
1045    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1046    where
1047        T: Clone,
1048        Self: Sized,
1049    {
1050        let e = e.splice_untyped_ctx(self);
1051
1052        Singleton::new(
1053            self.clone(),
1054            HydroNode::SingletonSource {
1055                value: e.into(),
1056                first_tick_only: false,
1057                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1058            },
1059        )
1060    }
1061
1062    /// Generates a stream with values emitted at a fixed interval, with
1063    /// each value being the current time (as an [`tokio::time::Instant`]).
1064    ///
1065    /// The clock source used is monotonic, so elements will be emitted in
1066    /// increasing order.
1067    ///
1068    /// # Non-Determinism
1069    /// Because this stream is generated by an OS timer, it will be
1070    /// non-deterministic because each timestamp will be arbitrary.
1071    fn source_interval(
1072        &self,
1073        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1074        _nondet: NonDet,
1075    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1076    where
1077        Self: Sized + NoTick,
1078    {
1079        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1080            tokio::time::interval(interval)
1081        )))
1082    }
1083
1084    /// Generates a stream with values emitted at a fixed interval (with an
1085    /// initial delay), with each value being the current time
1086    /// (as an [`tokio::time::Instant`]).
1087    ///
1088    /// The clock source used is monotonic, so elements will be emitted in
1089    /// increasing order.
1090    ///
1091    /// # Non-Determinism
1092    /// Because this stream is generated by an OS timer, it will be
1093    /// non-deterministic because each timestamp will be arbitrary.
1094    fn source_interval_delayed(
1095        &self,
1096        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1097        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1098        _nondet: NonDet,
1099    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1100    where
1101        Self: Sized + NoTick,
1102    {
1103        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1104            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1105        )))
1106    }
1107
1108    /// Creates a forward reference for defining recursive or mutually-dependent dataflows.
1109    ///
1110    /// Returns a handle that must be completed with the actual stream, and a placeholder
1111    /// stream that can be used in the dataflow graph before the actual stream is defined.
1112    ///
1113    /// This is useful for implementing feedback loops or recursive computations where
1114    /// a stream depends on its own output.
1115    ///
1116    /// # Example
1117    /// ```rust
1118    /// # #[cfg(feature = "deploy")] {
1119    /// # use hydro_lang::prelude::*;
1120    /// # use hydro_lang::live_collections::stream::NoOrder;
1121    /// # use futures::StreamExt;
1122    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1123    /// // Create a forward reference for the feedback stream
1124    /// let (complete, feedback) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1125    ///
1126    /// // Combine initial input with feedback, then increment
1127    /// let input: Stream<_, _, Unbounded> = process.source_iter(q!([1])).into();
1128    /// let output: Stream<_, _, _, NoOrder> = input.merge_unordered(feedback).map(q!(|x| x + 1));
1129    ///
1130    /// // Complete the forward reference with the output
1131    /// complete.complete(output.clone());
1132    /// output
1133    /// # }, |mut stream| async move {
1134    /// // 2, 3, 4, 5, ...
1135    /// # assert_eq!(stream.next().await.unwrap(), 2);
1136    /// # assert_eq!(stream.next().await.unwrap(), 3);
1137    /// # assert_eq!(stream.next().await.unwrap(), 4);
1138    /// # }));
1139    /// # }
1140    /// ```
1141    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1142    where
1143        S: CycleCollection<'a, ForwardRef, Location = Self>,
1144    {
1145        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1146        (
1147            ForwardHandle::new(cycle_id, Location::id(self)),
1148            S::create_source(cycle_id, self.clone()),
1149        )
1150    }
1151}
1152
#[cfg(all(test, feature = "deploy"))]
mod tests {
    use std::collections::HashSet;

    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;
    use tokio_util::codec::LengthDelimitedCodec;

    use crate::compile::builder::FlowBuilder;
    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
    use crate::location::{Location, NetworkHint};
    use crate::nondet::nondet;

    /// A singleton created at the top level of a process and snapshotted into a tick is
    /// expected to hold exactly one value per tick: every input is paired with `123` and a
    /// count of `1`.
    #[tokio::test]
    async fn top_level_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, requests) =
            process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let constant = process.singleton(q!(123));
        let tick = process.tick();

        let batched = requests.batch(&tick, nondet!(/** test */));
        let with_value =
            batched.cross_singleton(constant.clone().snapshot(&tick, nondet!(/** test */)));
        let cardinality = constant
            .snapshot(&tick, nondet!(/** test */))
            .into_stream()
            .count();
        let out_port = with_value
            .cross_singleton(cardinality)
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_flow = nodes.connect(in_port).await;
        let mut from_flow = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        // Each request is answered before the next one is sent.
        for value in [1, 2] {
            to_flow.send(value).await.unwrap();
            assert_eq!(from_flow.next().await.unwrap(), ((value, 123), 1));
        }
    }

    /// Same expectation as above, but with the singleton created directly inside the tick.
    #[tokio::test]
    async fn tick_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, requests) =
            process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = process.tick();
        let constant = tick.singleton(q!(123));

        let batched = requests.batch(&tick, nondet!(/** test */));
        let with_value = batched.cross_singleton(constant.clone());
        let cardinality = constant.into_stream().count();
        let out_port = with_value
            .cross_singleton(cardinality)
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_flow = nodes.connect(in_port).await;
        let mut from_flow = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        // Each request is answered before the next one is sent.
        for value in [1, 2] {
            to_flow.send(value).await.unwrap();
            assert_eq!(from_flow.next().await.unwrap(), ((value, 123), 1));
        }
    }

    /// Raw bytes sent by an external client come back unchanged through a bincode output port.
    #[tokio::test]
    async fn external_bytes() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let server = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, raw_input) = server.source_external_bytes(&external);
        let out_port = raw_input.send_bincode_external(&external);

        let nodes = flow
            .with_process(&server, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_flow = nodes.connect(in_port).await.1;
        let mut from_flow = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        to_flow.send(vec![1, 2, 3].into()).await.unwrap();

        assert_eq!(from_flow.next().await.unwrap(), vec![1, 2, 3]);
    }

    /// Two bincode clients on the same many-client port: their messages are expected to be
    /// keyed by client IDs `0` and `1`, in connection order.
    #[tokio::test]
    async fn multi_external_source() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let server = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, incoming, _membership, reply_sink) =
            server.bidi_external_many_bincode(&external);
        let out_port = incoming.entries().send_bincode_external(&external);
        // Nothing is ever sent back to the clients in this test.
        reply_sink.complete(
            server
                .source_iter::<(u64, ()), _>(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let nodes = flow
            .with_process(&server, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (_, mut first_client) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut second_client) = nodes.connect_bincode(in_port).await;
        let from_flow = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        first_client.send(123).await.unwrap();
        second_client.send(456).await.unwrap();

        // Arrival order across clients is not fixed, so compare as sets.
        let received = from_flow.take(2).collect::<HashSet<_>>().await;
        assert_eq!(received, HashSet::from([(0, 123), (1, 456)]));
    }

    /// Only the second of two connected clients sends a message; it must still be delivered.
    #[tokio::test]
    async fn second_connection_only_multi_source() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let server = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, incoming, _membership, reply_sink) =
            server.bidi_external_many_bincode(&external);
        let out_port = incoming.entries().send_bincode_external(&external);
        // Nothing is ever sent back to the clients in this test.
        reply_sink.complete(
            server
                .source_iter::<(u64, ()), _>(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let nodes = flow
            .with_process(&server, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The first connection is intentionally never used, to test the stream waking logic.
        // The named binding keeps the connection alive until the end of the test.
        let (_, _idle_client) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut active_client) = nodes.connect_bincode(in_port).await;
        let mut from_flow = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        active_client.send(456).await.unwrap();

        assert_eq!(from_flow.next().await.unwrap(), (1, 456));
    }

    /// Two raw-bytes clients on the same many-client port: their frames are expected to be
    /// keyed by client IDs `0` and `1`, in connection order.
    #[tokio::test]
    async fn multi_external_bytes() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let server = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, incoming, _membership, reply_sink) = server
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
        let out_port = incoming.entries().send_bincode_external(&external);
        // Nothing is ever sent back to the clients in this test.
        reply_sink.complete(
            server
                .source_iter(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let nodes = flow
            .with_process(&server, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut first_client = nodes.connect(in_port.clone()).await.1;
        let mut second_client = nodes.connect(in_port).await.1;
        let from_flow = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        first_client.send(vec![1, 2, 3].into()).await.unwrap();
        second_client.send(vec![4, 5].into()).await.unwrap();

        // Arrival order across clients is not fixed, so compare as sets.
        let received = from_flow.take(2).collect::<HashSet<_>>().await;
        assert_eq!(
            received,
            HashSet::from([
                (0, (&[1u8, 2, 3] as &[u8]).into()),
                (1, (&[4u8, 5] as &[u8]).into()),
            ])
        );
    }

    /// A single-client port echoes every frame back with the byte `42` appended.
    #[tokio::test]
    async fn single_client_external_bytes() {
        let mut deployment = Deployment::new();
        let mut flow = FlowBuilder::new();
        let server = flow.process::<()>();
        let external = flow.external::<()>();
        let (port, incoming, reply_sink) =
            server.bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
        reply_sink.complete(incoming.map(q!(|data| {
            let mut resp: Vec<u8> = data.into();
            resp.push(42);
            resp.into() // : Bytes
        })));

        let nodes = flow
            .with_process(&server, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();
        deployment.start().await.unwrap();

        // Unlike the other tests, the client connects only after the deployment has started.
        let (mut from_flow, mut to_flow) = nodes.connect(port).await;

        to_flow.send(vec![1, 2, 3].into()).await.unwrap();
        assert_eq!(from_flow.next().await.unwrap().unwrap(), vec![1, 2, 3, 42]);
    }

    /// Each raw-bytes client receives its own frame back with every byte incremented by one.
    #[tokio::test]
    async fn echo_external_bytes() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let server = flow.process::<()>();
        let external = flow.external::<()>();

        let (port, incoming, _membership, reply_sink) = server
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
        reply_sink
            .complete(incoming.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));

        let nodes = flow
            .with_process(&server, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (mut from_flow_a, mut to_flow_a) = nodes.connect(port.clone()).await;
        let (mut from_flow_b, mut to_flow_b) = nodes.connect(port).await;

        deployment.start().await.unwrap();

        to_flow_a.send(vec![1, 2, 3].into()).await.unwrap();
        to_flow_b.send(vec![4, 5].into()).await.unwrap();

        assert_eq!(from_flow_a.next().await.unwrap().unwrap(), vec![2, 3, 4]);
        assert_eq!(from_flow_b.next().await.unwrap().unwrap(), vec![5, 6]);
    }

    /// Each bincode client receives its own string back in upper case.
    #[tokio::test]
    async fn echo_external_bincode() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let server = flow.process::<()>();
        let external = flow.external::<()>();

        let (port, incoming, _membership, reply_sink) =
            server.bidi_external_many_bincode(&external);
        reply_sink.complete(incoming.map(q!(|text: String| { text.to_uppercase() })));

        let nodes = flow
            .with_process(&server, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (mut from_flow_a, mut to_flow_a) = nodes.connect_bincode(port.clone()).await;
        let (mut from_flow_b, mut to_flow_b) = nodes.connect_bincode(port).await;

        deployment.start().await.unwrap();

        to_flow_a.send("hi".to_owned()).await.unwrap();
        to_flow_b.send("hello".to_owned()).await.unwrap();

        assert_eq!(from_flow_a.next().await.unwrap(), "HI");
        assert_eq!(from_flow_b.next().await.unwrap(), "HELLO");
    }
}