Skip to main content

hydro_lang/deploy/maelstrom/
deploy_runtime_maelstrom.rs

1//! Runtime support for Maelstrom deployment backend.
2//!
3//! This module provides the runtime code that runs inside Maelstrom nodes,
4//! handling stdin/stdout JSON message passing according to the Maelstrom protocol.
5
6#![allow(
7    unused,
8    reason = "unused in trybuild but the __staged version is needed"
9)]
10#![allow(missing_docs, reason = "used internally")]
11
12use std::io::{BufRead, Write};
13
14use futures::{Stream, StreamExt};
15use serde::{Deserialize, Serialize};
16use stageleft::{QuotedWithContext, RuntimeData, q};
17
18use crate::forward_handle::ForwardHandle;
19use crate::live_collections::boundedness::Unbounded;
20use crate::live_collections::keyed_stream::KeyedStream;
21use crate::live_collections::stream::{ExactlyOnce, NoOrder, TotalOrder};
22use crate::location::dynamic::LocationId;
23use crate::location::member_id::TaglessMemberId;
24use crate::location::{Cluster, LocationKey, MembershipEvent, NoTick};
25use crate::nondet::nondet;
26
27/// Maelstrom message envelope structure.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct MaelstromMessage<T> {
30    pub src: String,
31    pub dest: String,
32    pub body: T,
33}
34
35/// Maelstrom init message body.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct InitBody {
38    #[serde(rename = "type")]
39    pub msg_type: String,
40    pub msg_id: u64,
41    pub node_id: String,
42    pub node_ids: Vec<String>,
43}
44
45/// Maelstrom init_ok response body.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct InitOkBody {
48    #[serde(rename = "type")]
49    pub msg_type: String,
50    pub in_reply_to: u64,
51}
52
53/// Metadata for a Maelstrom node, populated from the init message.
54/// Also manages a shared stdin reader that broadcasts lines to multiple subscribers.
55pub struct MaelstromMeta {
56    pub node_id: String,
57    pub node_ids: Vec<String>,
58    stdin_tx: tokio::sync::broadcast::Sender<String>,
59}
60
61impl MaelstromMeta {
62    /// Subscribe to stdin lines. Each subscriber receives all lines read from stdin.
63    /// Multiple subscribers can be created and each will receive a copy of every line.
64    pub fn subscribe_stdin(&self) -> tokio_stream::wrappers::BroadcastStream<String> {
65        tokio_stream::wrappers::BroadcastStream::new(self.stdin_tx.subscribe())
66    }
67
68    /// Start receiving incoming messages from clients and other nodes, after launching the DFIR.
69    pub fn start_receiving(&self) {
70        let tx_clone = self.stdin_tx.clone();
71
72        // Spawn thread to read stdin and broadcast lines
73        std::thread::spawn(move || {
74            let stdin = std::io::stdin();
75            for line in stdin.lock().lines() {
76                match line {
77                    Ok(l) => {
78                        // Ignore send errors (no receivers)
79                        let _ = tx_clone.send(l);
80                    }
81                    Err(_) => break,
82                }
83            }
84        });
85    }
86}
87
88/// Initialize a Maelstrom node by reading the init message from stdin.
89/// Returns the node metadata and sends init_ok response.
90/// Also spawns a background thread to read stdin and broadcast lines to subscribers.
91pub fn maelstrom_init() -> MaelstromMeta {
92    let stdin = std::io::stdin();
93    let mut stdout = std::io::stdout();
94
95    // Read the init message
96    let mut line = String::new();
97    stdin
98        .lock()
99        .read_line(&mut line)
100        .expect("Failed to read init message");
101
102    let msg: MaelstromMessage<InitBody> =
103        serde_json::from_str(&line).expect("Failed to parse init message");
104
105    assert_eq!(msg.body.msg_type, "init", "First message must be init");
106
107    // Set up broadcast channel for stdin lines
108    let (stdin_tx, _) = tokio::sync::broadcast::channel::<String>(1024);
109
110    let meta = MaelstromMeta {
111        node_id: msg.body.node_id.clone(),
112        node_ids: msg.body.node_ids.clone(),
113        stdin_tx,
114    };
115
116    // Send init_ok response
117    let response = MaelstromMessage {
118        src: msg.body.node_id,
119        dest: msg.src,
120        body: InitOkBody {
121            msg_type: "init_ok".to_owned(),
122            in_reply_to: msg.body.msg_id,
123        },
124    };
125
126    let response_json = serde_json::to_string(&response).expect("Failed to serialize init_ok");
127    writeln!(stdout, "{}", response_json).expect("Failed to write init_ok");
128    stdout.flush().expect("Failed to flush stdout");
129
130    meta
131}
132
133/// Get the cluster member IDs from the Maelstrom metadata.
134/// The `meta` parameter is a RuntimeData reference to the MaelstromMeta that will be
135/// available at runtime as `__hydro_lang_maelstrom_meta`.
136pub(super) fn cluster_members<'a>(
137    meta: RuntimeData<&'a MaelstromMeta>,
138    _of_cluster: LocationKey,
139) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
140    q!({
141        let members: &'static [TaglessMemberId] = Box::leak(
142            meta.node_ids
143                .iter()
144                .map(|id| TaglessMemberId::from_maelstrom_node_id(id.clone()))
145                .collect::<Vec<TaglessMemberId>>()
146                .into_boxed_slice(),
147        );
148        members
149    })
150}
151
152/// Get the self ID for this cluster member.
153pub(super) fn cluster_self_id<'a>(
154    meta: RuntimeData<&'a MaelstromMeta>,
155) -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
156    q!(TaglessMemberId::from_maelstrom_node_id(
157        meta.node_id.clone()
158    ))
159}
160
161/// Get the cluster membership stream (static for Maelstrom - all members join at start).
162/// This references `__hydro_lang_maelstrom_meta` which will be in scope at runtime.
163pub(super) fn cluster_membership_stream<'a>(
164    _location_id: &LocationId,
165) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
166{
167    let meta: RuntimeData<&MaelstromMeta> = RuntimeData::new("__hydro_lang_maelstrom_meta");
168    q!(Box::new(futures::stream::iter(
169        meta.node_ids
170            .iter()
171            .map(|id| (
172                TaglessMemberId::from_maelstrom_node_id(id.clone()),
173                MembershipEvent::Joined
174            ))
175            .collect::<Vec<_>>()
176    ))
177        as Box<
178            dyn futures::Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin,
179        >)
180}
181
182/// Create sink and source for m2m (cluster member to cluster member) communication.
183/// Messages are routed through Maelstrom's network via stdin/stdout.
184pub(super) fn deploy_maelstrom_m2m(meta: RuntimeData<&MaelstromMeta>) -> (syn::Expr, syn::Expr) {
185    // Sink: serialize and write to stdout with Maelstrom message envelope
186    let sink_expr = q!({
187        let node_id = meta.node_id.clone();
188        sinktools::map(
189            move |(dest_id, payload): (TaglessMemberId, bytes::Bytes)| {
190                let msg = serde_json::json!({
191                    "src": node_id,
192                    "dest": dest_id.get_maelstrom_node_id(),
193                    "body": {
194                        "type": "hydro_data",
195                        "data": payload.to_vec()
196                    }
197                });
198                serde_json::to_string(&msg).unwrap() + "\n"
199            },
200            futures::sink::unfold((), |(), line: String| {
201                Box::pin(async move {
202                    print!("{}", line);
203                    std::io::stdout().flush().ok();
204                    Ok::<_, std::io::Error>(())
205                })
206            }),
207        )
208    })
209    .splice_untyped_ctx(&());
210
211    // Source: subscribe to the shared stdin broadcast stream
212    let source_expr = q!({
213        let node_ids: std::collections::HashSet<String> = meta.node_ids.iter().cloned().collect();
214        let lines = meta.subscribe_stdin();
215        futures::StreamExt::filter_map(lines, move |line_result| {
216            let node_ids = node_ids.clone();
217            Box::pin(async move {
218                let line = line_result.ok()?;
219                let mut msg =
220                    serde_json::from_str::<MaelstromMessage<serde_json::Value>>(&line).ok()?;
221                // Only process messages from other nodes (not clients)
222                if msg
223                    .body
224                    .get("type")
225                    .is_some_and(|t| t.as_str() == Some("hydro_data"))
226                {
227                    let deser: Vec<u8> =
228                        serde_json::from_value(msg.body.get_mut("data").unwrap().take()).unwrap();
229                    Some(Ok::<_, std::io::Error>((
230                        TaglessMemberId::from_maelstrom_node_id(msg.src),
231                        bytes::BytesMut::from(&deser[..]),
232                    )))
233                } else {
234                    None
235                }
236            })
237        })
238    })
239    .splice_untyped_ctx(&());
240
241    (sink_expr, source_expr)
242}
243
244/// Creates a stream of client messages from Maelstrom stdin.
245/// Returns tuples of (client_id, message_body) where client_id is the source client
246/// and message_body is the JSON value of the message body.
247///
248/// This function is meant to be used with `source_stream` on a Cluster location.
249pub fn maelstrom_client_source(
250    meta: &MaelstromMeta,
251) -> impl Stream<Item = (String, serde_json::Value)> + Unpin {
252    use std::collections::HashSet;
253
254    let node_ids: HashSet<String> = meta.node_ids.iter().cloned().collect();
255    let lines = meta.subscribe_stdin();
256
257    Box::pin(lines.filter_map(move |line_result| {
258        let node_ids = node_ids.clone();
259        async move {
260            let line = line_result.ok()?;
261            let msg: MaelstromMessage<serde_json::Value> = serde_json::from_str(&line).ok()?;
262            // Only process messages from clients (not other nodes)
263            if !node_ids.contains(&msg.src) {
264                Some((msg.src, msg.body))
265            } else {
266                None
267            }
268        }
269    }))
270}
271
272/// Sends a response to a Maelstrom client via stdout.
273///
274/// This function is meant to be used with `for_each` on a stream of responses.
275pub fn maelstrom_send_response(node_id: &str, client_id: &str, body: serde_json::Value) {
276    use std::io::Write;
277
278    let msg = MaelstromMessage {
279        src: node_id.to_owned(),
280        dest: client_id.to_owned(),
281        body,
282    };
283
284    let json = serde_json::to_string(&msg).expect("Failed to serialize response");
285    println!("{}", json);
286    std::io::stdout().flush().ok();
287}