Skip to main content

hydro_lang/networking/
mod.rs

1//! Types for configuring network channels with serialization formats, transports, etc.
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use crate::live_collections::stream::networking::{deserialize_bincode, serialize_bincode};
9use crate::live_collections::stream::{NoOrder, TotalOrder};
10use crate::nondet::NonDet;
11
12#[sealed::sealed]
13trait SerKind<T: ?Sized> {
14    fn serialize_thunk(is_demux: bool) -> syn::Expr;
15
16    fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr;
17}
18
19/// Serialize items using the [`bincode`] crate.
20pub enum Bincode {}
21
22#[sealed::sealed]
23impl<T: Serialize + DeserializeOwned> SerKind<T> for Bincode {
24    fn serialize_thunk(is_demux: bool) -> syn::Expr {
25        serialize_bincode::<T>(is_demux)
26    }
27
28    fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr {
29        deserialize_bincode::<T>(tagged)
30    }
31}
32
33/// An unconfigured serialization backend.
34pub enum NoSer {}
35
36/// A transport backend for network channels.
37#[sealed::sealed]
38pub trait TransportKind {
39    /// The ordering guarantee provided by this transport.
40    type OrderingGuarantee: crate::live_collections::stream::Ordering;
41
42    /// Returns the [`NetworkingInfo`] describing this transport's configuration.
43    fn networking_info() -> NetworkingInfo;
44}
45
46#[sealed::sealed]
47#[diagnostic::on_unimplemented(
48    message = "TCP transport requires a failure policy. For example, `TCP.fail_stop()` stops sending messages after a failed connection."
49)]
50/// A failure policy for TCP connections, determining how the transport handles
51/// connection failures and what ordering guarantees the output stream provides.
52pub trait TcpFailPolicy {
53    /// The ordering guarantee provided by this failure policy.
54    type OrderingGuarantee: crate::live_collections::stream::Ordering;
55
56    /// Returns the [`TcpFault`] variant for this failure policy.
57    fn tcp_fault() -> TcpFault;
58}
59
60/// A TCP failure policy that stops sending messages after a failed connection.
61pub enum FailStop {}
62#[sealed::sealed]
63impl TcpFailPolicy for FailStop {
64    type OrderingGuarantee = TotalOrder;
65
66    fn tcp_fault() -> TcpFault {
67        TcpFault::FailStop
68    }
69}
70
71/// A TCP failure policy that allows messages to be lost.
72pub enum Lossy {}
73#[sealed::sealed]
74impl TcpFailPolicy for Lossy {
75    type OrderingGuarantee = TotalOrder;
76
77    fn tcp_fault() -> TcpFault {
78        TcpFault::Lossy
79    }
80}
81
82/// A TCP failure policy that treats dropped messages as indefinitely delayed.
83///
84/// Unlike [`Lossy`], this does not require a [`NonDet`] annotation because the output
85/// stream is always lower in the partial order than the ideal stream (dropped messages
86/// are modeled as infinite delays). The tradeoff is that the output has [`NoOrder`]
87/// guarantees, imposing stricter conditions on downstream consumers.
88///
89/// When using this mode in the Hydro simulator, you must call
90/// [`.test_safety_only()`](crate::sim::flow::SimFlow::test_safety_only) because the
91/// simulator models dropped messages as indefinitely delayed, which only tests safety
92/// properties (not liveness).
93pub enum LossyDelayedForever {}
94#[sealed::sealed]
95impl TcpFailPolicy for LossyDelayedForever {
96    type OrderingGuarantee = NoOrder;
97
98    fn tcp_fault() -> TcpFault {
99        TcpFault::LossyDelayedForever
100    }
101}
102
103/// Send items across a length-delimited TCP channel.
104pub struct Tcp<F> {
105    _phantom: PhantomData<F>,
106}
107
108#[sealed::sealed]
109impl<F: TcpFailPolicy> TransportKind for Tcp<F> {
110    type OrderingGuarantee = F::OrderingGuarantee;
111
112    fn networking_info() -> NetworkingInfo {
113        NetworkingInfo::Tcp {
114            fault: F::tcp_fault(),
115        }
116    }
117}
118
119/// A networking backend implementation that supports items of type `T`.
120#[sealed::sealed]
121pub trait NetworkFor<T: ?Sized> {
122    /// The ordering guarantee provided by this network configuration.
123    /// When combined with an input stream's ordering `O`, the output ordering
124    /// will be `<O as MinOrder<Self::OrderingGuarantee>>::Min`.
125    type OrderingGuarantee: crate::live_collections::stream::Ordering;
126
127    /// Generates serialization logic for sending `T`.
128    fn serialize_thunk(is_demux: bool) -> syn::Expr;
129
130    /// Generates deserialization logic for receiving `T`.
131    fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr;
132
133    /// Returns the optional name of the network channel.
134    fn name(&self) -> Option<&str>;
135
136    /// Returns the [`NetworkingInfo`] describing this network channel's transport and fault model.
137    fn networking_info() -> NetworkingInfo;
138}
139
140/// The fault model for a TCP connection.
141#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
142pub enum TcpFault {
143    /// Stops sending messages after a failed connection.
144    FailStop,
145    /// Messages may be lost (e.g. due to network partitions).
146    Lossy,
147    /// Dropped messages are treated as indefinitely delayed with no ordering guarantee.
148    LossyDelayedForever,
149}
150
151/// Describes the networking configuration for a network channel at the IR level.
152#[derive(Debug, Clone, PartialEq, Eq, Hash)]
153pub enum NetworkingInfo {
154    /// A TCP-based network channel with a specific fault model.
155    Tcp {
156        /// The fault model for this TCP connection.
157        fault: TcpFault,
158    },
159}
160
161/// A network channel configuration with `T` as transport backend and `S` as the serialization
162/// backend.
163pub struct NetworkingConfig<Tr: ?Sized, S: ?Sized, Name = ()> {
164    name: Option<Name>,
165    _phantom: (PhantomData<Tr>, PhantomData<S>),
166}
167
168impl<Tr: ?Sized, S: ?Sized> NetworkingConfig<Tr, S> {
169    /// Names the network channel and enables stable communication across multiple service versions.
170    pub fn name(self, name: impl Into<String>) -> NetworkingConfig<Tr, S, String> {
171        NetworkingConfig {
172            name: Some(name.into()),
173            _phantom: (PhantomData, PhantomData),
174        }
175    }
176}
177
178impl<Tr: ?Sized, N> NetworkingConfig<Tr, NoSer, N> {
179    /// Configures the network channel to use [`bincode`] to serialize items.
180    pub const fn bincode(mut self) -> NetworkingConfig<Tr, Bincode, N> {
181        let taken_name = self.name.take();
182        std::mem::forget(self); // nothing else is stored
183        NetworkingConfig {
184            name: taken_name,
185            _phantom: (PhantomData, PhantomData),
186        }
187    }
188}
189
190impl<S: ?Sized> NetworkingConfig<Tcp<()>, S> {
191    /// Configures the TCP transport to stop sending messages after a failed connection.
192    ///
193    /// Note that the Hydro simulator will not simulate connection failures that impact the
194    /// *liveness* of a program. If an output assertion depends on a `fail_stop` channel
195    /// making progress, that channel will not experience a failure that would cause the test to
196    /// block indefinitely. However, any *safety* issues caused by connection failures will still
197    /// be caught, such as a race condition between a failed connection and some other message.
198    pub const fn fail_stop(self) -> NetworkingConfig<Tcp<FailStop>, S> {
199        NetworkingConfig {
200            name: self.name,
201            _phantom: (PhantomData, PhantomData),
202        }
203    }
204
205    /// Configures the TCP transport to allow messages to be lost.
206    ///
207    /// This is appropriate for networks where messages may be dropped, such as when
208    /// running under a Maelstrom partition nemesis. Unlike `fail_stop`, which guarantees
209    /// a prefix of messages is delivered, `lossy` makes no such guarantee.
210    ///
211    /// # Non-Determinism
212    /// A lossy TCP channel will non-deterministically drop messages during execution.
213    pub const fn lossy(self, nondet: NonDet) -> NetworkingConfig<Tcp<Lossy>, S> {
214        let _ = nondet;
215        NetworkingConfig {
216            name: self.name,
217            _phantom: (PhantomData, PhantomData),
218        }
219    }
220
221    /// Configures the TCP transport to treat dropped messages as indefinitely delayed.
222    ///
223    /// This is appropriate for networks where messages may be dropped, such as when
224    /// running under a Maelstrom partition nemesis. Unlike [`Self::lossy`], this does
225    /// *not* require a [`NonDet`] annotation because the output is always lower in the
226    /// partial order than the ideal stream. However, the output stream will have
227    /// [`NoOrder`] guarantees, imposing stricter conditions on downstream consumers.
228    ///
229    /// Unlike [`Self::lossy`], this mode can easily be simulated in exhaustive mode
230    /// without running into fairness issues.
231    ///
232    /// When using this mode in the Hydro simulator, you must call
233    /// [`.test_safety_only()`](crate::sim::flow::SimFlow::test_safety_only) to opt in,
234    /// because the simulator models dropped messages as indefinitely delayed, which only
235    /// tests safety properties (not liveness).
236    pub const fn lossy_delayed_forever(self) -> NetworkingConfig<Tcp<LossyDelayedForever>, S> {
237        NetworkingConfig {
238            name: self.name,
239            _phantom: (PhantomData, PhantomData),
240        }
241    }
242}
243
244#[sealed::sealed]
245impl<Tr: ?Sized, S: ?Sized, T: ?Sized> NetworkFor<T> for NetworkingConfig<Tr, S>
246where
247    Tr: TransportKind,
248    S: SerKind<T>,
249{
250    type OrderingGuarantee = Tr::OrderingGuarantee;
251
252    fn serialize_thunk(is_demux: bool) -> syn::Expr {
253        S::serialize_thunk(is_demux)
254    }
255
256    fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr {
257        S::deserialize_thunk(tagged)
258    }
259
260    fn name(&self) -> Option<&str> {
261        None
262    }
263
264    fn networking_info() -> NetworkingInfo {
265        Tr::networking_info()
266    }
267}
268
269#[sealed::sealed]
270impl<Tr: ?Sized, S: ?Sized, T: ?Sized> NetworkFor<T> for NetworkingConfig<Tr, S, String>
271where
272    Tr: TransportKind,
273    S: SerKind<T>,
274{
275    type OrderingGuarantee = Tr::OrderingGuarantee;
276
277    fn serialize_thunk(is_demux: bool) -> syn::Expr {
278        S::serialize_thunk(is_demux)
279    }
280
281    fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr {
282        S::deserialize_thunk(tagged)
283    }
284
285    fn name(&self) -> Option<&str> {
286        self.name.as_deref()
287    }
288
289    fn networking_info() -> NetworkingInfo {
290        Tr::networking_info()
291    }
292}
293
294/// A network channel that uses length-delimited TCP for transport.
295pub const TCP: NetworkingConfig<Tcp<()>, NoSer> = NetworkingConfig {
296    name: None,
297    _phantom: (PhantomData, PhantomData),
298};