hydro_lang/networking/mod.rs
1//! Types for configuring network channels with serialization formats, transports, etc.
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use crate::live_collections::stream::networking::{deserialize_bincode, serialize_bincode};
9use crate::live_collections::stream::{NoOrder, TotalOrder};
10use crate::nondet::NonDet;
11
12#[sealed::sealed]
13trait SerKind<T: ?Sized> {
14 fn serialize_thunk(is_demux: bool) -> syn::Expr;
15
16 fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr;
17}
18
/// Marker type that selects the [`bincode`] crate for serializing items.
///
/// The enum has no variants, so it can only be used as a type parameter.
pub enum Bincode {}
21
22#[sealed::sealed]
23impl<T: Serialize + DeserializeOwned> SerKind<T> for Bincode {
24 fn serialize_thunk(is_demux: bool) -> syn::Expr {
25 serialize_bincode::<T>(is_demux)
26 }
27
28 fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr {
29 deserialize_bincode::<T>(tagged)
30 }
31}
32
/// Placeholder for a serialization backend that has not been configured yet.
///
/// The enum has no variants, so it can only be used as a type parameter.
pub enum NoSer {}
35
36/// A transport backend for network channels.
37#[sealed::sealed]
38pub trait TransportKind {
39 /// The ordering guarantee provided by this transport.
40 type OrderingGuarantee: crate::live_collections::stream::Ordering;
41
42 /// Returns the [`NetworkingInfo`] describing this transport's configuration.
43 fn networking_info() -> NetworkingInfo;
44}
45
46#[sealed::sealed]
47#[diagnostic::on_unimplemented(
48 message = "TCP transport requires a failure policy. For example, `TCP.fail_stop()` stops sending messages after a failed connection."
49)]
50/// A failure policy for TCP connections, determining how the transport handles
51/// connection failures and what ordering guarantees the output stream provides.
52pub trait TcpFailPolicy {
53 /// The ordering guarantee provided by this failure policy.
54 type OrderingGuarantee: crate::live_collections::stream::Ordering;
55
56 /// Returns the [`TcpFault`] variant for this failure policy.
57 fn tcp_fault() -> TcpFault;
58}
59
60/// A TCP failure policy that stops sending messages after a failed connection.
61pub enum FailStop {}
62#[sealed::sealed]
63impl TcpFailPolicy for FailStop {
64 type OrderingGuarantee = TotalOrder;
65
66 fn tcp_fault() -> TcpFault {
67 TcpFault::FailStop
68 }
69}
70
71/// A TCP failure policy that allows messages to be lost.
72pub enum Lossy {}
73#[sealed::sealed]
74impl TcpFailPolicy for Lossy {
75 type OrderingGuarantee = TotalOrder;
76
77 fn tcp_fault() -> TcpFault {
78 TcpFault::Lossy
79 }
80}
81
82/// A TCP failure policy that treats dropped messages as indefinitely delayed.
83///
84/// Unlike [`Lossy`], this does not require a [`NonDet`] annotation because the output
85/// stream is always lower in the partial order than the ideal stream (dropped messages
86/// are modeled as infinite delays). The tradeoff is that the output has [`NoOrder`]
87/// guarantees, imposing stricter conditions on downstream consumers.
88///
89/// When using this mode in the Hydro simulator, you must call
90/// [`.test_safety_only()`](crate::sim::flow::SimFlow::test_safety_only) because the
91/// simulator models dropped messages as indefinitely delayed, which only tests safety
92/// properties (not liveness).
93pub enum LossyDelayedForever {}
94#[sealed::sealed]
95impl TcpFailPolicy for LossyDelayedForever {
96 type OrderingGuarantee = NoOrder;
97
98 fn tcp_fault() -> TcpFault {
99 TcpFault::LossyDelayedForever
100 }
101}
102
/// Transport that sends items across a length-delimited TCP channel.
///
/// `F` is carried only at the type level; no value of type `F` is stored.
pub struct Tcp<F> {
    /// Zero-sized marker that ties `F` to this type.
    _phantom: PhantomData<F>,
}
107
108#[sealed::sealed]
109impl<F: TcpFailPolicy> TransportKind for Tcp<F> {
110 type OrderingGuarantee = F::OrderingGuarantee;
111
112 fn networking_info() -> NetworkingInfo {
113 NetworkingInfo::Tcp {
114 fault: F::tcp_fault(),
115 }
116 }
117}
118
119/// A networking backend implementation that supports items of type `T`.
120#[sealed::sealed]
121pub trait NetworkFor<T: ?Sized> {
122 /// The ordering guarantee provided by this network configuration.
123 /// When combined with an input stream's ordering `O`, the output ordering
124 /// will be `<O as MinOrder<Self::OrderingGuarantee>>::Min`.
125 type OrderingGuarantee: crate::live_collections::stream::Ordering;
126
127 /// Generates serialization logic for sending `T`.
128 fn serialize_thunk(is_demux: bool) -> syn::Expr;
129
130 /// Generates deserialization logic for receiving `T`.
131 fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr;
132
133 /// Returns the optional name of the network channel.
134 fn name(&self) -> Option<&str>;
135
136 /// Returns the [`NetworkingInfo`] describing this network channel's transport and fault model.
137 fn networking_info() -> NetworkingInfo;
138}
139
/// Fault model of a TCP connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum TcpFault {
    /// No further messages are sent once a connection has failed.
    FailStop,
    /// Messages may be lost (e.g. due to network partitions).
    Lossy,
    /// Dropped messages are treated as indefinitely delayed, with no ordering guarantee.
    LossyDelayedForever,
}
150
151/// Describes the networking configuration for a network channel at the IR level.
152#[derive(Debug, Clone, PartialEq, Eq, Hash)]
153pub enum NetworkingInfo {
154 /// A TCP-based network channel with a specific fault model.
155 Tcp {
156 /// The fault model for this TCP connection.
157 fault: TcpFault,
158 },
159}
160
/// Configuration of a network channel, with `Tr` as the transport backend and `S` as the
/// serialization backend.
///
/// `Name` is the type of the optional channel name. It defaults to `()`; calling
/// `name` switches it to `String`.
pub struct NetworkingConfig<Tr: ?Sized, S: ?Sized, Name = ()> {
    /// The channel name, if one has been set.
    name: Option<Name>,
    /// Zero-sized markers that tie `Tr` and `S` to this type.
    _phantom: (PhantomData<Tr>, PhantomData<S>),
}
167
168impl<Tr: ?Sized, S: ?Sized> NetworkingConfig<Tr, S> {
169 /// Names the network channel and enables stable communication across multiple service versions.
170 pub fn name(self, name: impl Into<String>) -> NetworkingConfig<Tr, S, String> {
171 NetworkingConfig {
172 name: Some(name.into()),
173 _phantom: (PhantomData, PhantomData),
174 }
175 }
176}
177
178impl<Tr: ?Sized, N> NetworkingConfig<Tr, NoSer, N> {
179 /// Configures the network channel to use [`bincode`] to serialize items.
180 pub const fn bincode(mut self) -> NetworkingConfig<Tr, Bincode, N> {
181 let taken_name = self.name.take();
182 std::mem::forget(self); // nothing else is stored
183 NetworkingConfig {
184 name: taken_name,
185 _phantom: (PhantomData, PhantomData),
186 }
187 }
188}
189
190impl<S: ?Sized> NetworkingConfig<Tcp<()>, S> {
191 /// Configures the TCP transport to stop sending messages after a failed connection.
192 ///
193 /// Note that the Hydro simulator will not simulate connection failures that impact the
194 /// *liveness* of a program. If an output assertion depends on a `fail_stop` channel
195 /// making progress, that channel will not experience a failure that would cause the test to
196 /// block indefinitely. However, any *safety* issues caused by connection failures will still
197 /// be caught, such as a race condition between a failed connection and some other message.
198 pub const fn fail_stop(self) -> NetworkingConfig<Tcp<FailStop>, S> {
199 NetworkingConfig {
200 name: self.name,
201 _phantom: (PhantomData, PhantomData),
202 }
203 }
204
205 /// Configures the TCP transport to allow messages to be lost.
206 ///
207 /// This is appropriate for networks where messages may be dropped, such as when
208 /// running under a Maelstrom partition nemesis. Unlike `fail_stop`, which guarantees
209 /// a prefix of messages is delivered, `lossy` makes no such guarantee.
210 ///
211 /// # Non-Determinism
212 /// A lossy TCP channel will non-deterministically drop messages during execution.
213 pub const fn lossy(self, nondet: NonDet) -> NetworkingConfig<Tcp<Lossy>, S> {
214 let _ = nondet;
215 NetworkingConfig {
216 name: self.name,
217 _phantom: (PhantomData, PhantomData),
218 }
219 }
220
221 /// Configures the TCP transport to treat dropped messages as indefinitely delayed.
222 ///
223 /// This is appropriate for networks where messages may be dropped, such as when
224 /// running under a Maelstrom partition nemesis. Unlike [`Self::lossy`], this does
225 /// *not* require a [`NonDet`] annotation because the output is always lower in the
226 /// partial order than the ideal stream. However, the output stream will have
227 /// [`NoOrder`] guarantees, imposing stricter conditions on downstream consumers.
228 ///
229 /// Unlike [`Self::lossy`], this mode can easily be simulated in exhaustive mode
230 /// without running into fairness issues.
231 ///
232 /// When using this mode in the Hydro simulator, you must call
233 /// [`.test_safety_only()`](crate::sim::flow::SimFlow::test_safety_only) to opt in,
234 /// because the simulator models dropped messages as indefinitely delayed, which only
235 /// tests safety properties (not liveness).
236 pub const fn lossy_delayed_forever(self) -> NetworkingConfig<Tcp<LossyDelayedForever>, S> {
237 NetworkingConfig {
238 name: self.name,
239 _phantom: (PhantomData, PhantomData),
240 }
241 }
242}
243
244#[sealed::sealed]
245impl<Tr: ?Sized, S: ?Sized, T: ?Sized> NetworkFor<T> for NetworkingConfig<Tr, S>
246where
247 Tr: TransportKind,
248 S: SerKind<T>,
249{
250 type OrderingGuarantee = Tr::OrderingGuarantee;
251
252 fn serialize_thunk(is_demux: bool) -> syn::Expr {
253 S::serialize_thunk(is_demux)
254 }
255
256 fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr {
257 S::deserialize_thunk(tagged)
258 }
259
260 fn name(&self) -> Option<&str> {
261 None
262 }
263
264 fn networking_info() -> NetworkingInfo {
265 Tr::networking_info()
266 }
267}
268
269#[sealed::sealed]
270impl<Tr: ?Sized, S: ?Sized, T: ?Sized> NetworkFor<T> for NetworkingConfig<Tr, S, String>
271where
272 Tr: TransportKind,
273 S: SerKind<T>,
274{
275 type OrderingGuarantee = Tr::OrderingGuarantee;
276
277 fn serialize_thunk(is_demux: bool) -> syn::Expr {
278 S::serialize_thunk(is_demux)
279 }
280
281 fn deserialize_thunk(tagged: Option<&syn::Type>) -> syn::Expr {
282 S::deserialize_thunk(tagged)
283 }
284
285 fn name(&self) -> Option<&str> {
286 self.name.as_deref()
287 }
288
289 fn networking_info() -> NetworkingInfo {
290 Tr::networking_info()
291 }
292}
293
294/// A network channel that uses length-delimited TCP for transport.
295pub const TCP: NetworkingConfig<Tcp<()>, NoSer> = NetworkingConfig {
296 name: None,
297 _phantom: (PhantomData, PhantomData),
298};