Skip to main content

hydro_lang/deploy/maelstrom/
mod.rs

1//! Deployment backend for running correctness tests against Jepsen Maelstrom (<https://github.com/jepsen-io/maelstrom>)
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5
6use crate::forward_handle::ForwardHandle;
7use crate::live_collections::KeyedStream;
8use crate::location::{Cluster, NoTick};
9use crate::nondet::nondet;
10
11#[cfg(stageleft_runtime)]
12#[cfg(feature = "maelstrom")]
13#[cfg_attr(docsrs, doc(cfg(feature = "maelstrom")))]
14pub mod deploy_maelstrom;
15
16pub mod deploy_runtime_maelstrom;
17
18/// Sets up bidirectional communication with Maelstrom clients on a cluster.
19///
20/// This function provides a similar API to `bidi_external_many_bytes` but for Maelstrom
21/// client communication. It returns a keyed input stream of client messages and accepts
22/// a keyed output stream of responses.
23///
24/// The key type is `String` (the client ID like "c1", "c2").
25/// The value type is `serde_json::Value` (the message body).
26///
27/// # Example
28/// ```ignore
29/// let (input, output_handle) = maelstrom_bidi_clients(&cluster);
30/// output_handle.complete(input.map(q!(|(client_id, body)| {
31///     // Process and return response
32///     (client_id, response_body)
33/// })));
34/// ```
35#[expect(clippy::type_complexity, reason = "stream markers")]
36pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
37    cluster: &Cluster<'a, C>,
38) -> (
39    KeyedStream<String, In, Cluster<'a, C>>,
40    ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
41)
42where
43    Cluster<'a, C>: NoTick,
44{
45    use stageleft::q;
46
47    use crate::location::Location;
48
49    let meta: stageleft::RuntimeData<&deploy_runtime_maelstrom::MaelstromMeta> =
50        stageleft::RuntimeData::new("__hydro_lang_maelstrom_meta");
51
52    // Create the input stream from Maelstrom clients
53    let input: KeyedStream<String, In, Cluster<'a, C>> = cluster
54        .source_stream(q!(deploy_runtime_maelstrom::maelstrom_client_source(meta)))
55        .into_keyed()
56        .map(q!(|b| serde_json::from_value(b).unwrap()));
57
58    // Create a forward reference for the output stream
59    let (fwd_handle, output_stream) =
60        cluster.forward_ref::<KeyedStream<String, Out, Cluster<'a, C>>>();
61
62    // Set up the output sink to send responses back to clients
63    output_stream
64        .entries()
65        .assume_ordering(nondet!(/** maelstrom responses can be sent in any order */))
66        .for_each(q!(|(client_id, body)| {
67            deploy_runtime_maelstrom::maelstrom_send_response(
68                &meta.node_id,
69                &client_id,
70                serde_json::to_value(body).unwrap(),
71            );
72        }));
73
74    (input, fwd_handle)
75}