pub trait Location<'a>: DynLocation {
type Root: Location<'a>;
Show 19 methods
// Required method
fn root(&self) -> Self::Root;
// Provided methods
fn try_tick(&self) -> Option<Tick<Self>> { ... }
fn id(&self) -> LocationId { ... }
fn tick(&self) -> Tick<Self>
where Self: NoTick { ... }
fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
where Self: Sized + NoTick { ... }
fn source_stream<T, E>(
&self,
e: impl QuotedWithContext<'a, E, Self>,
) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
where E: FuturesStream<Item = T> + Unpin,
Self: Sized + NoTick { ... }
fn source_iter<T, E>(
&self,
e: impl QuotedWithContext<'a, E, Self>,
) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
where E: IntoIterator<Item = T>,
Self: Sized + NoTick { ... }
fn source_cluster_members<C: 'a>(
&self,
cluster: &Cluster<'a, C>,
) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
where Self: Sized + NoTick { ... }
fn source_external_bytes<L>(
&self,
from: &External<'_, L>,
) -> (ExternalBytesPort, Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>)
where Self: Sized + NoTick { ... }
fn source_external_bincode<L, T, O: Ordering, R: Retries>(
&self,
from: &External<'_, L>,
) -> (ExternalBincodeSink<T, NotMany, O, R>, Stream<T, Self, Unbounded, O, R>)
where Self: Sized + NoTick,
T: Serialize + DeserializeOwned { ... }
fn sim_input<T, O: Ordering, R: Retries>(
&self,
) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
where Self: Sized + NoTick,
T: Serialize + DeserializeOwned { ... }
fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
&self,
from: &External<'_, L>,
port_hint: NetworkHint,
) -> (ExternalBytesPort<NotMany>, Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>)
where Self: Sized + NoTick { ... }
fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
&self,
from: &External<'_, L>,
) -> (ExternalBincodeBidi<InT, OutT, NotMany>, Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>)
where Self: Sized + NoTick { ... }
fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
&self,
from: &External<'_, L>,
port_hint: NetworkHint,
) -> (ExternalBytesPort<Many>, KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>, KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>)
where Self: Sized + NoTick { ... }
fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
&self,
from: &External<'_, L>,
) -> (ExternalBincodeBidi<InT, OutT, Many>, KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>, KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>)
where Self: Sized + NoTick { ... }
fn singleton<T>(
&self,
e: impl QuotedWithContext<'a, T, Self>,
) -> Singleton<T, Self, Bounded>
where T: Clone,
Self: Sized { ... }
fn source_interval(
&self,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
_nondet: NonDet,
) -> Stream<Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
where Self: Sized + NoTick { ... }
fn source_interval_delayed(
&self,
delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
_nondet: NonDet,
) -> Stream<Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
where Self: Sized + NoTick { ... }
fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
where S: CycleCollection<'a, ForwardRef, Location = Self> { ... }
}Expand description
A location where data can be materialized and computation can be executed.
Hydro is a global, distributed programming model. This means that the data and computation in a Hydro program can be spread across multiple machines, data centers, and even continents. To achieve this, Hydro uses the concept of locations to keep track of where data is located and computation is executed.
Each live collection type (in crate::live_collections) has a type parameter L
which will always be a type that implements the Location trait (e.g. Process
and Cluster). To create distributed programs, Hydro provides a variety of APIs
to allow live collections to be moved between locations via network send/receive.
See the Hydro docs for more information.
Required Associated Types§
Required Methods§
Provided Methods§
Sourcefn try_tick(&self) -> Option<Tick<Self>>
fn try_tick(&self) -> Option<Tick<Self>>
Attempts to create a new Tick clock domain at this location.
Returns Some(Tick) if this is a top-level location (like Process or Cluster),
or None if this location is already inside a tick (nested ticks are not supported).
Prefer using Location::tick when you know the location is top-level.
Sourcefn id(&self) -> LocationId
fn id(&self) -> LocationId
Returns the unique identifier for this location.
Sourcefn tick(&self) -> Tick<Self>where
Self: NoTick,
fn tick(&self) -> Tick<Self>where
Self: NoTick,
Creates a new Tick clock domain at this location.
A tick represents a logical clock that can be used to batch streaming data into discrete time steps. This is useful for implementing iterative algorithms or for synchronizing data across multiple streams.
§Example
let tick = process.tick();
let inside_tick = process
.source_iter(q!(vec![1, 2, 3, 4]))
.batch(&tick, nondet!(/** test */));
inside_tick.all_ticks()
// 1, 2, 3, 4Sourcefn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
Creates an unbounded stream that continuously emits unit values ().
This is useful for driving computations that need to run continuously, such as polling or heartbeat mechanisms.
§Example
let tick = process.tick();
process.spin()
.batch(&tick, nondet!(/** test */))
.map(q!(|_| 42))
.all_ticks()
// 42, 42, 42, ...Sourcefn source_stream<T, E>(
&self,
e: impl QuotedWithContext<'a, E, Self>,
) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
fn source_stream<T, E>( &self, e: impl QuotedWithContext<'a, E, Self>, ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
Creates a stream from an async [FuturesStream].
This is useful for integrating with external async data sources, such as network connections or file readers.
§Example
process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
// 1, 2, 3Sourcefn source_iter<T, E>(
&self,
e: impl QuotedWithContext<'a, E, Self>,
) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
fn source_iter<T, E>( &self, e: impl QuotedWithContext<'a, E, Self>, ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
Creates a bounded stream from an iterator.
The iterator is evaluated once at runtime, and all elements are emitted in order. This is useful for creating streams from static data or for testing.
§Example
process.source_iter(q!(vec![1, 2, 3, 4]))
// 1, 2, 3, 4Sourcefn source_cluster_members<C: 'a>(
&self,
cluster: &Cluster<'a, C>,
) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
fn source_cluster_members<C: 'a>( &self, cluster: &Cluster<'a, C>, ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
Creates a stream of membership events for a cluster.
This stream emits MembershipEvent::Joined when a cluster member joins
and MembershipEvent::Left when a cluster member leaves. The stream is
keyed by the MemberId of the cluster member.
This is useful for implementing protocols that need to track cluster membership, such as broadcasting to all members or detecting failures.
§Example
let p1 = flow.process::<()>();
let workers: Cluster<()> = flow.cluster::<()>();
let cluster_members = p1.source_cluster_members(&workers);
// if there are 4 members in the cluster, we would see a join event for each
// { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }Sourcefn source_external_bytes<L>(
&self,
from: &External<'_, L>,
) -> (ExternalBytesPort, Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>)
fn source_external_bytes<L>( &self, from: &External<'_, L>, ) -> (ExternalBytesPort, Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>)
Creates a one-way connection from an external process to receive raw bytes.
Returns a port handle for the external process to connect to, and a stream of received byte buffers.
For bidirectional communication or typed data, see Location::bind_single_client
or Location::source_external_bincode.
Sourcefn source_external_bincode<L, T, O: Ordering, R: Retries>(
&self,
from: &External<'_, L>,
) -> (ExternalBincodeSink<T, NotMany, O, R>, Stream<T, Self, Unbounded, O, R>)
fn source_external_bincode<L, T, O: Ordering, R: Retries>( &self, from: &External<'_, L>, ) -> (ExternalBincodeSink<T, NotMany, O, R>, Stream<T, Self, Unbounded, O, R>)
Creates a one-way connection from an external process to receive bincode-serialized data.
Returns a sink handle for the external process to send data to, and a stream of received values.
For bidirectional communication, see Location::bind_single_client_bincode.
Sourcefn sim_input<T, O: Ordering, R: Retries>(
&self,
) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
Available on crate feature sim only.
fn sim_input<T, O: Ordering, R: Retries>( &self, ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
sim only.Sets up a simulated input port on this location for testing.
Returns a handle to send messages to the location as well as a stream
of received messages. This is only available when the sim feature is enabled.
Sourcefn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
&self,
from: &External<'_, L>,
port_hint: NetworkHint,
) -> (ExternalBytesPort<NotMany>, Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>)
fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>( &self, from: &External<'_, L>, port_hint: NetworkHint, ) -> (ExternalBytesPort<NotMany>, Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>)
Establishes a server on this location to receive a bidirectional connection from a single
client, identified by the given External handle. Returns a port handle for the external
process to connect to, a stream of incoming messages, and a handle to send outgoing
messages.
§Example
let node = flow.process::<()>();
let external = flow.external::<()>();
let (port, incoming, outgoing) =
node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
let mut resp: Vec<u8> = data.into();
resp.push(42);
resp.into() // : Bytes
})));
let nodes = flow // ... with_process and with_external
deployment.deploy().await.unwrap();
deployment.start().await.unwrap();
let (mut external_out, mut external_in) = nodes.connect(port).await;
external_in.send(vec![1, 2, 3].into()).await.unwrap();
assert_eq!(
external_out.next().await.unwrap().unwrap(),
vec![1, 2, 3, 42]
);Sourcefn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
&self,
from: &External<'_, L>,
) -> (ExternalBincodeBidi<InT, OutT, NotMany>, Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>)
fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>( &self, from: &External<'_, L>, ) -> (ExternalBincodeBidi<InT, OutT, NotMany>, Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>)
Establishes a bidirectional connection from a single external client using bincode serialization.
Returns a port handle for the external process to connect to, a stream of incoming messages,
and a handle to send outgoing messages. This is a convenience wrapper around
Location::bind_single_client that uses bincode for serialization.
§Type Parameters
InT: The type of incoming messages (must implementDeserializeOwned)OutT: The type of outgoing messages (must implementSerialize)
Sourcefn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
&self,
from: &External<'_, L>,
port_hint: NetworkHint,
) -> (ExternalBytesPort<Many>, KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>, KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>)
fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>( &self, from: &External<'_, L>, port_hint: NetworkHint, ) -> (ExternalBytesPort<Many>, KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>, KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>)
Establishes a server on this location to receive bidirectional connections from multiple external clients using raw bytes.
Unlike Location::bind_single_client, this method supports multiple concurrent client
connections. Each client is assigned a unique u64 identifier.
Returns:
- A port handle for external processes to connect to
- A keyed stream of incoming messages, keyed by client ID
- A keyed stream of membership events (client joins/leaves), keyed by client ID
- A handle to send outgoing messages, keyed by client ID
Sourcefn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
&self,
from: &External<'_, L>,
) -> (ExternalBincodeBidi<InT, OutT, Many>, KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>, KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>)
fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>( &self, from: &External<'_, L>, ) -> (ExternalBincodeBidi<InT, OutT, Many>, KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>, KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>)
Establishes a server on this location to receive bidirectional connections from multiple external clients using bincode serialization.
Unlike Location::bind_single_client_bincode, this method supports multiple concurrent
client connections. Each client is assigned a unique u64 identifier.
Returns:
- A port handle for external processes to connect to
- A keyed stream of incoming messages, keyed by client ID
- A keyed stream of membership events (client joins/leaves), keyed by client ID
- A handle to send outgoing messages, keyed by client ID
§Type Parameters
InT: The type of incoming messages (must implementDeserializeOwned)OutT: The type of outgoing messages (must implementSerialize)
Sourcefn singleton<T>(
&self,
e: impl QuotedWithContext<'a, T, Self>,
) -> Singleton<T, Self, Bounded>
fn singleton<T>( &self, e: impl QuotedWithContext<'a, T, Self>, ) -> Singleton<T, Self, Bounded>
Sourcefn source_interval(
&self,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
_nondet: NonDet,
) -> Stream<Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
fn source_interval( &self, interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, _nondet: NonDet, ) -> Stream<Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
Generates a stream with values emitted at a fixed interval, with
each value being the current time (as an [tokio::time::Instant]).
The clock source used is monotonic, so elements will be emitted in increasing order.
§Non-Determinism
Because this stream is generated by an OS timer, it will be non-deterministic because each timestamp will be arbitrary.
Sourcefn source_interval_delayed(
&self,
delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
_nondet: NonDet,
) -> Stream<Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
fn source_interval_delayed( &self, delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, _nondet: NonDet, ) -> Stream<Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
Generates a stream with values emitted at a fixed interval (with an
initial delay), with each value being the current time
(as an [tokio::time::Instant]).
The clock source used is monotonic, so elements will be emitted in increasing order.
§Non-Determinism
Because this stream is generated by an OS timer, it will be non-deterministic because each timestamp will be arbitrary.
Sourcefn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)where
S: CycleCollection<'a, ForwardRef, Location = Self>,
fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)where
S: CycleCollection<'a, ForwardRef, Location = Self>,
Creates a forward reference for defining recursive or mutually-dependent dataflows.
Returns a handle that must be completed with the actual stream, and a placeholder stream that can be used in the dataflow graph before the actual stream is defined.
This is useful for implementing feedback loops or recursive computations where a stream depends on its own output.
§Example
// Create a forward reference for the feedback stream
let (complete, feedback) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
// Combine initial input with feedback, then increment
let input: Stream<_, _, Unbounded> = process.source_iter(q!([1])).into();
let output: Stream<_, _, _, NoOrder> = input.interleave(feedback).map(q!(|x| x + 1));
// Complete the forward reference with the output
complete.complete(output.clone());
output
// 2, 3, 4, 5, ...Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.