hydro_lang/deploy/maelstrom/
mod.rs1use serde::Serialize;
4use serde::de::DeserializeOwned;
5
6use crate::forward_handle::ForwardHandle;
7use crate::live_collections::KeyedStream;
8use crate::location::{Cluster, NoTick};
9use crate::nondet::nondet;
10
11#[cfg(stageleft_runtime)]
12#[cfg(feature = "maelstrom")]
13#[cfg_attr(docsrs, doc(cfg(feature = "maelstrom")))]
14pub mod deploy_maelstrom;
15
16pub mod deploy_runtime_maelstrom;
17
18#[expect(clippy::type_complexity, reason = "stream markers")]
36pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
37 cluster: &Cluster<'a, C>,
38) -> (
39 KeyedStream<String, In, Cluster<'a, C>>,
40 ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
41)
42where
43 Cluster<'a, C>: NoTick,
44{
45 use stageleft::q;
46
47 use crate::location::Location;
48
49 let meta: stageleft::RuntimeData<&deploy_runtime_maelstrom::MaelstromMeta> =
50 stageleft::RuntimeData::new("__hydro_lang_maelstrom_meta");
51
52 let input: KeyedStream<String, In, Cluster<'a, C>> = cluster
54 .source_stream(q!(deploy_runtime_maelstrom::maelstrom_client_source(meta)))
55 .into_keyed()
56 .map(q!(|b| serde_json::from_value(b).unwrap()));
57
58 let (fwd_handle, output_stream) =
60 cluster.forward_ref::<KeyedStream<String, Out, Cluster<'a, C>>>();
61
62 output_stream
64 .entries()
65 .assume_ordering(nondet!())
66 .for_each(q!(|(client_id, body)| {
67 deploy_runtime_maelstrom::maelstrom_send_response(
68 &meta.node_id,
69 &client_id,
70 serde_json::to_value(body).unwrap(),
71 );
72 }));
73
74 (input, fwd_handle)
75}