hydro_lang/deploy/maelstrom/
deploy_runtime_maelstrom.rs1#![allow(
7 unused,
8 reason = "unused in trybuild but the __staged version is needed"
9)]
10#![allow(missing_docs, reason = "used internally")]
11
12use std::io::{BufRead, Write};
13
14use futures::{Stream, StreamExt};
15use serde::{Deserialize, Serialize};
16use stageleft::{QuotedWithContext, RuntimeData, q};
17
18use crate::forward_handle::ForwardHandle;
19use crate::live_collections::boundedness::Unbounded;
20use crate::live_collections::keyed_stream::KeyedStream;
21use crate::live_collections::stream::{ExactlyOnce, NoOrder, TotalOrder};
22use crate::location::dynamic::LocationId;
23use crate::location::member_id::TaglessMemberId;
24use crate::location::{Cluster, LocationKey, MembershipEvent, NoTick};
25use crate::nondet::nondet;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct MaelstromMessage<T> {
30 pub src: String,
31 pub dest: String,
32 pub body: T,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct InitBody {
38 #[serde(rename = "type")]
39 pub msg_type: String,
40 pub msg_id: u64,
41 pub node_id: String,
42 pub node_ids: Vec<String>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct InitOkBody {
48 #[serde(rename = "type")]
49 pub msg_type: String,
50 pub in_reply_to: u64,
51}
52
53pub struct MaelstromMeta {
56 pub node_id: String,
57 pub node_ids: Vec<String>,
58 stdin_tx: tokio::sync::broadcast::Sender<String>,
59}
60
61impl MaelstromMeta {
62 pub fn subscribe_stdin(&self) -> tokio_stream::wrappers::BroadcastStream<String> {
65 tokio_stream::wrappers::BroadcastStream::new(self.stdin_tx.subscribe())
66 }
67
68 pub fn start_receiving(&self) {
70 let tx_clone = self.stdin_tx.clone();
71
72 std::thread::spawn(move || {
74 let stdin = std::io::stdin();
75 for line in stdin.lock().lines() {
76 match line {
77 Ok(l) => {
78 let _ = tx_clone.send(l);
80 }
81 Err(_) => break,
82 }
83 }
84 });
85 }
86}
87
88pub fn maelstrom_init() -> MaelstromMeta {
92 let stdin = std::io::stdin();
93 let mut stdout = std::io::stdout();
94
95 let mut line = String::new();
97 stdin
98 .lock()
99 .read_line(&mut line)
100 .expect("Failed to read init message");
101
102 let msg: MaelstromMessage<InitBody> =
103 serde_json::from_str(&line).expect("Failed to parse init message");
104
105 assert_eq!(msg.body.msg_type, "init", "First message must be init");
106
107 let (stdin_tx, _) = tokio::sync::broadcast::channel::<String>(1024);
109
110 let meta = MaelstromMeta {
111 node_id: msg.body.node_id.clone(),
112 node_ids: msg.body.node_ids.clone(),
113 stdin_tx,
114 };
115
116 let response = MaelstromMessage {
118 src: msg.body.node_id,
119 dest: msg.src,
120 body: InitOkBody {
121 msg_type: "init_ok".to_owned(),
122 in_reply_to: msg.body.msg_id,
123 },
124 };
125
126 let response_json = serde_json::to_string(&response).expect("Failed to serialize init_ok");
127 writeln!(stdout, "{}", response_json).expect("Failed to write init_ok");
128 stdout.flush().expect("Failed to flush stdout");
129
130 meta
131}
132
133pub(super) fn cluster_members<'a>(
137 meta: RuntimeData<&'a MaelstromMeta>,
138 _of_cluster: LocationKey,
139) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
140 q!({
141 let members: &'static [TaglessMemberId] = Box::leak(
142 meta.node_ids
143 .iter()
144 .map(|id| TaglessMemberId::from_maelstrom_node_id(id.clone()))
145 .collect::<Vec<TaglessMemberId>>()
146 .into_boxed_slice(),
147 );
148 members
149 })
150}
151
152pub(super) fn cluster_self_id<'a>(
154 meta: RuntimeData<&'a MaelstromMeta>,
155) -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
156 q!(TaglessMemberId::from_maelstrom_node_id(
157 meta.node_id.clone()
158 ))
159}
160
161pub(super) fn cluster_membership_stream<'a>(
164 _location_id: &LocationId,
165) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
166{
167 let meta: RuntimeData<&MaelstromMeta> = RuntimeData::new("__hydro_lang_maelstrom_meta");
168 q!(Box::new(futures::stream::iter(
169 meta.node_ids
170 .iter()
171 .map(|id| (
172 TaglessMemberId::from_maelstrom_node_id(id.clone()),
173 MembershipEvent::Joined
174 ))
175 .collect::<Vec<_>>()
176 ))
177 as Box<
178 dyn futures::Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin,
179 >)
180}
181
182pub(super) fn deploy_maelstrom_m2m(meta: RuntimeData<&MaelstromMeta>) -> (syn::Expr, syn::Expr) {
185 let sink_expr = q!({
187 let node_id = meta.node_id.clone();
188 sinktools::map(
189 move |(dest_id, payload): (TaglessMemberId, bytes::Bytes)| {
190 let msg = serde_json::json!({
191 "src": node_id,
192 "dest": dest_id.get_maelstrom_node_id(),
193 "body": {
194 "type": "hydro_data",
195 "data": payload.to_vec()
196 }
197 });
198 serde_json::to_string(&msg).unwrap() + "\n"
199 },
200 futures::sink::unfold((), |(), line: String| {
201 Box::pin(async move {
202 print!("{}", line);
203 std::io::stdout().flush().ok();
204 Ok::<_, std::io::Error>(())
205 })
206 }),
207 )
208 })
209 .splice_untyped_ctx(&());
210
211 let source_expr = q!({
213 let node_ids: std::collections::HashSet<String> = meta.node_ids.iter().cloned().collect();
214 let lines = meta.subscribe_stdin();
215 futures::StreamExt::filter_map(lines, move |line_result| {
216 let node_ids = node_ids.clone();
217 Box::pin(async move {
218 let line = line_result.ok()?;
219 let mut msg =
220 serde_json::from_str::<MaelstromMessage<serde_json::Value>>(&line).ok()?;
221 if msg
223 .body
224 .get("type")
225 .is_some_and(|t| t.as_str() == Some("hydro_data"))
226 {
227 let deser: Vec<u8> =
228 serde_json::from_value(msg.body.get_mut("data").unwrap().take()).unwrap();
229 Some(Ok::<_, std::io::Error>((
230 TaglessMemberId::from_maelstrom_node_id(msg.src),
231 bytes::BytesMut::from(&deser[..]),
232 )))
233 } else {
234 None
235 }
236 })
237 })
238 })
239 .splice_untyped_ctx(&());
240
241 (sink_expr, source_expr)
242}
243
244pub fn maelstrom_client_source(
250 meta: &MaelstromMeta,
251) -> impl Stream<Item = (String, serde_json::Value)> + Unpin {
252 use std::collections::HashSet;
253
254 let node_ids: HashSet<String> = meta.node_ids.iter().cloned().collect();
255 let lines = meta.subscribe_stdin();
256
257 Box::pin(lines.filter_map(move |line_result| {
258 let node_ids = node_ids.clone();
259 async move {
260 let line = line_result.ok()?;
261 let msg: MaelstromMessage<serde_json::Value> = serde_json::from_str(&line).ok()?;
262 if !node_ids.contains(&msg.src) {
264 Some((msg.src, msg.body))
265 } else {
266 None
267 }
268 }
269 }))
270}
271
272pub fn maelstrom_send_response(node_id: &str, client_id: &str, body: serde_json::Value) {
276 use std::io::Write;
277
278 let msg = MaelstromMessage {
279 src: node_id.to_owned(),
280 dest: client_id.to_owned(),
281 body,
282 };
283
284 let json = serde_json::to_string(&msg).expect("Failed to serialize response");
285 println!("{}", json);
286 std::io::stdout().flush().ok();
287}