Skip to main content

Location

Trait Location 

Source
pub trait Location<'a>: DynLocation {
    type Root: Location<'a>;

Show 19 methods // Required method fn root(&self) -> Self::Root; // Provided methods fn try_tick(&self) -> Option<Tick<Self>> { ... } fn id(&self) -> LocationId { ... } fn tick(&self) -> Tick<Self> where Self: NoTick { ... } fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce> where Self: Sized + NoTick { ... } fn source_stream<T, E>( &self, e: impl QuotedWithContext<'a, E, Self>, ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce> where E: FuturesStream<Item = T> + Unpin, Self: Sized + NoTick { ... } fn source_iter<T, E>( &self, e: impl QuotedWithContext<'a, E, Self>, ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce> where E: IntoIterator<Item = T>, Self: Sized + NoTick { ... } fn source_cluster_members<C: 'a>( &self, cluster: &Cluster<'a, C>, ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded> where Self: Sized + NoTick { ... } fn source_external_bytes<L>( &self, from: &External<'_, L>, ) -> (ExternalBytesPort, Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>) where Self: Sized + NoTick { ... } fn source_external_bincode<L, T, O: Ordering, R: Retries>( &self, from: &External<'_, L>, ) -> (ExternalBincodeSink<T, NotMany, O, R>, Stream<T, Self, Unbounded, O, R>) where Self: Sized + NoTick, T: Serialize + DeserializeOwned { ... } fn sim_input<T, O: Ordering, R: Retries>( &self, ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>) where Self: Sized + NoTick, T: Serialize + DeserializeOwned { ... } fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>( &self, from: &External<'_, L>, port_hint: NetworkHint, ) -> (ExternalBytesPort<NotMany>, Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>) where Self: Sized + NoTick { ... } fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>( &self, from: &External<'_, L>, ) -> (ExternalBincodeBidi<InT, OutT, NotMany>, Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>) where Self: Sized + NoTick { ... } fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>( &self, from: &External<'_, L>, port_hint: NetworkHint, ) -> (ExternalBytesPort<Many>, KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>, KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>) where Self: Sized + NoTick { ... } fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>( &self, from: &External<'_, L>, ) -> (ExternalBincodeBidi<InT, OutT, Many>, KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>, KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>) where Self: Sized + NoTick { ... } fn singleton<T>( &self, e: impl QuotedWithContext<'a, T, Self>, ) -> Singleton<T, Self, Bounded> where T: Clone, Self: Sized { ... } fn source_interval( &self, interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, _nondet: NonDet, ) -> Stream<Instant, Self, Unbounded, TotalOrder, ExactlyOnce> where Self: Sized + NoTick { ... } fn source_interval_delayed( &self, delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, _nondet: NonDet, ) -> Stream<Instant, Self, Unbounded, TotalOrder, ExactlyOnce> where Self: Sized + NoTick { ... } fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S) where S: CycleCollection<'a, ForwardRef, Location = Self> { ... }
}
Expand description

A location where data can be materialized and computation can be executed.

Hydro is a global, distributed programming model. This means that the data and computation in a Hydro program can be spread across multiple machines, data centers, and even continents. To achieve this, Hydro uses the concept of locations to keep track of where data is located and computation is executed.

Each live collection type (in crate::live_collections) has a type parameter L which will always be a type that implements the Location trait (e.g. Process and Cluster). To create distributed programs, Hydro provides a variety of APIs to allow live collections to be moved between locations via network send/receive.

See the Hydro docs for more information.

Required Associated Types§

Source

type Root: Location<'a>

The root location type for this location.

For top-level locations like Process and Cluster, this is Self. For nested locations like Tick, this is the root location that contains it.

Required Methods§

Source

fn root(&self) -> Self::Root

Returns the root location for this location.

For top-level locations like Process and Cluster, this returns self. For nested locations like Tick, this returns the root location that contains it.

Provided Methods§

Source

fn try_tick(&self) -> Option<Tick<Self>>

Attempts to create a new Tick clock domain at this location.

Returns Some(Tick) if this is a top-level location (like Process or Cluster), or None if this location is already inside a tick (nested ticks are not supported).

Prefer using Location::tick when you know the location is top-level.

Source

fn id(&self) -> LocationId

Returns the unique identifier for this location.

Source

fn tick(&self) -> Tick<Self>
where Self: NoTick,

Creates a new Tick clock domain at this location.

A tick represents a logical clock that can be used to batch streaming data into discrete time steps. This is useful for implementing iterative algorithms or for synchronizing data across multiple streams.

§Example
let tick = process.tick();
let inside_tick = process
    .source_iter(q!(vec![1, 2, 3, 4]))
    .batch(&tick, nondet!(/** test */));
inside_tick.all_ticks()
// 1, 2, 3, 4
Source

fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
where Self: Sized + NoTick,

Creates an unbounded stream that continuously emits unit values ().

This is useful for driving computations that need to run continuously, such as polling or heartbeat mechanisms.

§Example
let tick = process.tick();
process.spin()
    .batch(&tick, nondet!(/** test */))
    .map(q!(|_| 42))
    .all_ticks()
// 42, 42, 42, ...
Source

fn source_stream<T, E>( &self, e: impl QuotedWithContext<'a, E, Self>, ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
where E: FuturesStream<Item = T> + Unpin, Self: Sized + NoTick,

Creates a stream from an async [FuturesStream].

This is useful for integrating with external async data sources, such as network connections or file readers.

§Example
process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
// 1, 2, 3
Source

fn source_iter<T, E>( &self, e: impl QuotedWithContext<'a, E, Self>, ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
where E: IntoIterator<Item = T>, Self: Sized + NoTick,

Creates a bounded stream from an iterator.

The iterator is evaluated once at runtime, and all elements are emitted in order. This is useful for creating streams from static data or for testing.

§Example
process.source_iter(q!(vec![1, 2, 3, 4]))
// 1, 2, 3, 4
Source

fn source_cluster_members<C: 'a>( &self, cluster: &Cluster<'a, C>, ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
where Self: Sized + NoTick,

Creates a stream of membership events for a cluster.

This stream emits MembershipEvent::Joined when a cluster member joins and MembershipEvent::Left when a cluster member leaves. The stream is keyed by the MemberId of the cluster member.

This is useful for implementing protocols that need to track cluster membership, such as broadcasting to all members or detecting failures.

§Example
let p1 = flow.process::<()>();
let workers: Cluster<()> = flow.cluster::<()>();
let cluster_members = p1.source_cluster_members(&workers);
// if there are 4 members in the cluster, we would see a join event for each
// { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
Source

fn source_external_bytes<L>( &self, from: &External<'_, L>, ) -> (ExternalBytesPort, Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>)
where Self: Sized + NoTick,

Creates a one-way connection from an external process to receive raw bytes.

Returns a port handle for the external process to connect to, and a stream of received byte buffers.

For bidirectional communication or typed data, see Location::bind_single_client or Location::source_external_bincode.

Source

fn source_external_bincode<L, T, O: Ordering, R: Retries>( &self, from: &External<'_, L>, ) -> (ExternalBincodeSink<T, NotMany, O, R>, Stream<T, Self, Unbounded, O, R>)
where Self: Sized + NoTick, T: Serialize + DeserializeOwned,

Creates a one-way connection from an external process to receive bincode-serialized data.

Returns a sink handle for the external process to send data to, and a stream of received values.

For bidirectional communication, see Location::bind_single_client_bincode.

Source

fn sim_input<T, O: Ordering, R: Retries>( &self, ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
where Self: Sized + NoTick, T: Serialize + DeserializeOwned,

Available on crate feature sim only.

Sets up a simulated input port on this location for testing.

Returns a handle to send messages to the location as well as a stream of received messages. This is only available when the sim feature is enabled.

Source

fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>( &self, from: &External<'_, L>, port_hint: NetworkHint, ) -> (ExternalBytesPort<NotMany>, Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>)
where Self: Sized + NoTick,

Establishes a server on this location to receive a bidirectional connection from a single client, identified by the given External handle. Returns a port handle for the external process to connect to, a stream of incoming messages, and a handle to send outgoing messages.

§Example
let node = flow.process::<()>();
let external = flow.external::<()>();
let (port, incoming, outgoing) =
    node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
    let mut resp: Vec<u8> = data.into();
    resp.push(42);
    resp.into() // : Bytes
})));

let nodes = flow // ... with_process and with_external

deployment.deploy().await.unwrap();
deployment.start().await.unwrap();

let (mut external_out, mut external_in) = nodes.connect(port).await;
external_in.send(vec![1, 2, 3].into()).await.unwrap();
assert_eq!(
    external_out.next().await.unwrap().unwrap(),
    vec![1, 2, 3, 42]
);
Source

fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>( &self, from: &External<'_, L>, ) -> (ExternalBincodeBidi<InT, OutT, NotMany>, Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>)
where Self: Sized + NoTick,

Establishes a bidirectional connection from a single external client using bincode serialization.

Returns a port handle for the external process to connect to, a stream of incoming messages, and a handle to send outgoing messages. This is a convenience wrapper around Location::bind_single_client that uses bincode for serialization.

§Type Parameters
  • InT: The type of incoming messages (must implement DeserializeOwned)
  • OutT: The type of outgoing messages (must implement Serialize)
Source

fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>( &self, from: &External<'_, L>, port_hint: NetworkHint, ) -> (ExternalBytesPort<Many>, KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>, KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>)
where Self: Sized + NoTick,

Establishes a server on this location to receive bidirectional connections from multiple external clients using raw bytes.

Unlike Location::bind_single_client, this method supports multiple concurrent client connections. Each client is assigned a unique u64 identifier.

Returns:

  • A port handle for external processes to connect to
  • A keyed stream of incoming messages, keyed by client ID
  • A keyed stream of membership events (client joins/leaves), keyed by client ID
  • A handle to send outgoing messages, keyed by client ID
Source

fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>( &self, from: &External<'_, L>, ) -> (ExternalBincodeBidi<InT, OutT, Many>, KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>, KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>, ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>)
where Self: Sized + NoTick,

Establishes a server on this location to receive bidirectional connections from multiple external clients using bincode serialization.

Unlike Location::bind_single_client_bincode, this method supports multiple concurrent client connections. Each client is assigned a unique u64 identifier.

Returns:

  • A port handle for external processes to connect to
  • A keyed stream of incoming messages, keyed by client ID
  • A keyed stream of membership events (client joins/leaves), keyed by client ID
  • A handle to send outgoing messages, keyed by client ID
§Type Parameters
  • InT: The type of incoming messages (must implement DeserializeOwned)
  • OutT: The type of outgoing messages (must implement Serialize)
Source

fn singleton<T>( &self, e: impl QuotedWithContext<'a, T, Self>, ) -> Singleton<T, Self, Bounded>
where T: Clone, Self: Sized,

Constructs a Singleton materialized at this location with the given static value.

§Example
let tick = process.tick();
let singleton = tick.singleton(q!(5));
// 5
Source

fn source_interval( &self, interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, _nondet: NonDet, ) -> Stream<Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
where Self: Sized + NoTick,

Generates a stream with values emitted at a fixed interval, with each value being the current time (as an [tokio::time::Instant]).

The clock source used is monotonic, so elements will be emitted in increasing order.

§Non-Determinism

Because this stream is generated by an OS timer, it will be non-deterministic because each timestamp will be arbitrary.

Source

fn source_interval_delayed( &self, delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, _nondet: NonDet, ) -> Stream<Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
where Self: Sized + NoTick,

Generates a stream with values emitted at a fixed interval (with an initial delay), with each value being the current time (as an [tokio::time::Instant]).

The clock source used is monotonic, so elements will be emitted in increasing order.

§Non-Determinism

Because this stream is generated by an OS timer, it will be non-deterministic because each timestamp will be arbitrary.

Source

fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
where S: CycleCollection<'a, ForwardRef, Location = Self>,

Creates a forward reference for defining recursive or mutually-dependent dataflows.

Returns a handle that must be completed with the actual stream, and a placeholder stream that can be used in the dataflow graph before the actual stream is defined.

This is useful for implementing feedback loops or recursive computations where a stream depends on its own output.

§Example
// Create a forward reference for the feedback stream
let (complete, feedback) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();

// Combine initial input with feedback, then increment
let input: Stream<_, _, Unbounded> = process.source_iter(q!([1])).into();
let output: Stream<_, _, _, NoOrder> = input.interleave(feedback).map(q!(|x| x + 1));

// Complete the forward reference with the output
complete.complete(output.clone());
output
// 2, 3, 4, 5, ...

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<'a, C> Location<'a> for Cluster<'a, C>

Source§

type Root = Cluster<'a, C>

Source§

impl<'a, L> Location<'a> for Atomic<L>
where L: Location<'a>,

Source§

type Root = <L as Location<'a>>::Root

Source§

impl<'a, L> Location<'a> for Tick<L>
where L: Location<'a>,

Source§

type Root = <L as Location<'a>>::Root

Source§

impl<'a, P> Location<'a> for Process<'a, P>

Source§

type Root = Process<'a, P>