1use std::cell::RefCell;
8use std::future::Future;
9use std::io::{BufRead, BufReader, Error};
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::process::Stdio;
13use std::rc::Rc;
14
15use bytes::{Bytes, BytesMut};
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, Stream};
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use stageleft::{QuotedWithContext, RuntimeData};
21
22use super::deploy_runtime_maelstrom::*;
23use crate::compile::builder::ExternalPortId;
24use crate::compile::deploy_provider::{ClusterSpec, Deploy, Node, RegisterPort};
25use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
26use crate::location::dynamic::LocationId;
27use crate::location::member_id::TaglessMemberId;
28use crate::location::{LocationKey, MembershipEvent, NetworkHint};
29
30pub enum MaelstromDeploy {}
37
38impl<'a> Deploy<'a> for MaelstromDeploy {
39 type Meta = ();
40 type InstantiateEnv = MaelstromDeployment;
41
42 type Process = MaelstromProcess;
43 type Cluster = MaelstromCluster;
44 type External = MaelstromExternal;
45
46 fn o2o_sink_source(
47 _p1: &Self::Process,
48 _p1_port: &<Self::Process as Node>::Port,
49 _p2: &Self::Process,
50 _p2_port: &<Self::Process as Node>::Port,
51 ) -> (syn::Expr, syn::Expr) {
52 panic!("Maelstrom deployment does not support processes, only clusters")
53 }
54
55 fn o2o_connect(
56 _p1: &Self::Process,
57 _p1_port: &<Self::Process as Node>::Port,
58 _p2: &Self::Process,
59 _p2_port: &<Self::Process as Node>::Port,
60 ) -> Box<dyn FnOnce()> {
61 panic!("Maelstrom deployment does not support processes, only clusters")
62 }
63
64 fn o2m_sink_source(
65 _p1: &Self::Process,
66 _p1_port: &<Self::Process as Node>::Port,
67 _c2: &Self::Cluster,
68 _c2_port: &<Self::Cluster as Node>::Port,
69 ) -> (syn::Expr, syn::Expr) {
70 panic!("Maelstrom deployment does not support processes, only clusters")
71 }
72
73 fn o2m_connect(
74 _p1: &Self::Process,
75 _p1_port: &<Self::Process as Node>::Port,
76 _c2: &Self::Cluster,
77 _c2_port: &<Self::Cluster as Node>::Port,
78 ) -> Box<dyn FnOnce()> {
79 panic!("Maelstrom deployment does not support processes, only clusters")
80 }
81
82 fn m2o_sink_source(
83 _c1: &Self::Cluster,
84 _c1_port: &<Self::Cluster as Node>::Port,
85 _p2: &Self::Process,
86 _p2_port: &<Self::Process as Node>::Port,
87 ) -> (syn::Expr, syn::Expr) {
88 panic!("Maelstrom deployment does not support processes, only clusters")
89 }
90
91 fn m2o_connect(
92 _c1: &Self::Cluster,
93 _c1_port: &<Self::Cluster as Node>::Port,
94 _p2: &Self::Process,
95 _p2_port: &<Self::Process as Node>::Port,
96 ) -> Box<dyn FnOnce()> {
97 panic!("Maelstrom deployment does not support processes, only clusters")
98 }
99
100 fn m2m_sink_source(
101 _c1: &Self::Cluster,
102 _c1_port: &<Self::Cluster as Node>::Port,
103 _c2: &Self::Cluster,
104 _c2_port: &<Self::Cluster as Node>::Port,
105 ) -> (syn::Expr, syn::Expr) {
106 deploy_maelstrom_m2m(RuntimeData::new("__hydro_lang_maelstrom_meta"))
107 }
108
109 fn m2m_connect(
110 _c1: &Self::Cluster,
111 _c1_port: &<Self::Cluster as Node>::Port,
112 _c2: &Self::Cluster,
113 _c2_port: &<Self::Cluster as Node>::Port,
114 ) -> Box<dyn FnOnce()> {
115 Box::new(|| {})
117 }
118
119 fn e2o_many_source(
120 _extra_stmts: &mut Vec<syn::Stmt>,
121 _p2: &Self::Process,
122 _p2_port: &<Self::Process as Node>::Port,
123 _codec_type: &syn::Type,
124 _shared_handle: String,
125 ) -> syn::Expr {
126 panic!("Maelstrom deployment does not support processes, only clusters")
127 }
128
129 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
130 panic!("Maelstrom deployment does not support processes, only clusters")
131 }
132
133 fn e2o_source(
134 _extra_stmts: &mut Vec<syn::Stmt>,
135 _p1: &Self::External,
136 _p1_port: &<Self::External as Node>::Port,
137 _p2: &Self::Process,
138 _p2_port: &<Self::Process as Node>::Port,
139 _codec_type: &syn::Type,
140 _shared_handle: String,
141 ) -> syn::Expr {
142 panic!("Maelstrom deployment does not support processes, only clusters")
143 }
144
145 fn e2o_connect(
146 _p1: &Self::External,
147 _p1_port: &<Self::External as Node>::Port,
148 _p2: &Self::Process,
149 _p2_port: &<Self::Process as Node>::Port,
150 _many: bool,
151 _server_hint: NetworkHint,
152 ) -> Box<dyn FnOnce()> {
153 panic!("Maelstrom deployment does not support processes, only clusters")
154 }
155
156 fn o2e_sink(
157 _p1: &Self::Process,
158 _p1_port: &<Self::Process as Node>::Port,
159 _p2: &Self::External,
160 _p2_port: &<Self::External as Node>::Port,
161 _shared_handle: String,
162 ) -> syn::Expr {
163 panic!("Maelstrom deployment does not support processes, only clusters")
164 }
165
166 fn cluster_ids(
167 _of_cluster: LocationKey,
168 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
169 cluster_members(RuntimeData::new("__hydro_lang_maelstrom_meta"), _of_cluster)
170 }
171
172 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
173 cluster_self_id(RuntimeData::new("__hydro_lang_maelstrom_meta"))
174 }
175
176 fn cluster_membership_stream(
177 location_id: &LocationId,
178 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
179 {
180 cluster_membership_stream(location_id)
181 }
182}
183
184#[derive(Clone)]
186pub struct MaelstromProcess {
187 _private: (),
188}
189
190impl Node for MaelstromProcess {
191 type Port = String;
192 type Meta = ();
193 type InstantiateEnv = MaelstromDeployment;
194
195 fn next_port(&self) -> Self::Port {
196 panic!("Maelstrom deployment does not support processes")
197 }
198
199 fn update_meta(&self, _meta: &Self::Meta) {}
200
201 fn instantiate(
202 &self,
203 _env: &mut Self::InstantiateEnv,
204 _meta: &mut Self::Meta,
205 _graph: DfirGraph,
206 _extra_stmts: &[syn::Stmt],
207 _sidecars: &[syn::Expr],
208 ) {
209 panic!("Maelstrom deployment does not support processes")
210 }
211}
212
213#[derive(Clone)]
215pub struct MaelstromCluster {
216 next_port: Rc<RefCell<usize>>,
217 name_hint: Option<String>,
218}
219
220impl Node for MaelstromCluster {
221 type Port = String;
222 type Meta = ();
223 type InstantiateEnv = MaelstromDeployment;
224
225 fn next_port(&self) -> Self::Port {
226 let next_port = *self.next_port.borrow();
227 *self.next_port.borrow_mut() += 1;
228 format!("port_{}", next_port)
229 }
230
231 fn update_meta(&self, _meta: &Self::Meta) {}
232
233 fn instantiate(
234 &self,
235 env: &mut Self::InstantiateEnv,
236 _meta: &mut Self::Meta,
237 graph: DfirGraph,
238 extra_stmts: &[syn::Stmt],
239 sidecars: &[syn::Expr],
240 ) {
241 let (bin_name, config) = create_graph_trybuild(
242 graph,
243 extra_stmts,
244 sidecars,
245 self.name_hint.as_deref(),
246 crate::compile::trybuild::generate::DeployMode::Maelstrom,
247 LinkingMode::Static,
248 );
249
250 env.bin_name = Some(bin_name);
251 env.project_dir = Some(config.project_dir);
252 env.target_dir = Some(config.target_dir);
253 env.features = config.features;
254 }
255}
256
257#[derive(Clone)]
259pub enum MaelstromExternal {}
260
261impl Node for MaelstromExternal {
262 type Port = String;
263 type Meta = ();
264 type InstantiateEnv = MaelstromDeployment;
265
266 fn next_port(&self) -> Self::Port {
267 unreachable!()
268 }
269
270 fn update_meta(&self, _meta: &Self::Meta) {}
271
272 fn instantiate(
273 &self,
274 _env: &mut Self::InstantiateEnv,
275 _meta: &mut Self::Meta,
276 _graph: DfirGraph,
277 _extra_stmts: &[syn::Stmt],
278 _sidecars: &[syn::Expr],
279 ) {
280 unreachable!()
281 }
282}
283
284impl<'a> RegisterPort<'a, MaelstromDeploy> for MaelstromExternal {
285 fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
286 unreachable!()
287 }
288
289 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
290 fn as_bytes_bidi(
291 &self,
292 _external_port_id: ExternalPortId,
293 ) -> impl Future<
294 Output = (
295 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
296 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
297 ),
298 > + 'a {
299 async move { unreachable!() }
300 }
301
302 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
303 fn as_bincode_bidi<InT, OutT>(
304 &self,
305 _external_port_id: ExternalPortId,
306 ) -> impl Future<
307 Output = (
308 Pin<Box<dyn Stream<Item = OutT>>>,
309 Pin<Box<dyn Sink<InT, Error = Error>>>,
310 ),
311 > + 'a
312 where
313 InT: Serialize + 'static,
314 OutT: DeserializeOwned + 'static,
315 {
316 async move { unreachable!() }
317 }
318
319 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
320 fn as_bincode_sink<T: Serialize + 'static>(
321 &self,
322 _external_port_id: ExternalPortId,
323 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
324 async move { unreachable!() }
325 }
326
327 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
328 fn as_bincode_source<T: DeserializeOwned + 'static>(
329 &self,
330 _external_port_id: ExternalPortId,
331 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
332 async move { unreachable!() }
333 }
334}
335
336#[derive(Clone)]
338pub struct MaelstromClusterSpec;
339
340impl<'a> ClusterSpec<'a, MaelstromDeploy> for MaelstromClusterSpec {
341 fn build(self, key: LocationKey, name_hint: &str) -> MaelstromCluster {
342 assert_eq!(
343 key,
344 LocationKey::FIRST,
345 "there should only be one location for a Maelstrom deployment"
346 );
347 MaelstromCluster {
348 next_port: Rc::new(RefCell::new(0)),
349 name_hint: Some(name_hint.to_owned()),
350 }
351 }
352}
353
354pub struct MaelstromDeployment {
359 pub node_count: usize,
361 pub maelstrom_path: PathBuf,
363 pub workload: String,
365 pub time_limit: Option<u64>,
367 pub rate: Option<u64>,
369 pub availability: Option<String>,
371 pub nemesis: Option<String>,
373 pub extra_args: Vec<String>,
375
376 pub(crate) bin_name: Option<String>,
378 pub(crate) project_dir: Option<PathBuf>,
379 pub(crate) target_dir: Option<PathBuf>,
380 pub(crate) features: Option<Vec<String>>,
381}
382
383impl MaelstromDeployment {
384 pub fn new(workload: impl Into<String>) -> Self {
386 Self {
387 node_count: 1,
388 maelstrom_path: PathBuf::from("maelstrom"),
389 workload: workload.into(),
390 time_limit: None,
391 rate: None,
392 availability: None,
393 nemesis: None,
394 extra_args: vec![],
395 bin_name: None,
396 project_dir: None,
397 target_dir: None,
398 features: None,
399 }
400 }
401
402 pub fn node_count(mut self, count: usize) -> Self {
404 self.node_count = count;
405 self
406 }
407
408 pub fn maelstrom_path(mut self, path: impl Into<PathBuf>) -> Self {
410 self.maelstrom_path = path.into();
411 self
412 }
413
414 pub fn time_limit(mut self, seconds: u64) -> Self {
416 self.time_limit = Some(seconds);
417 self
418 }
419
420 pub fn rate(mut self, rate: u64) -> Self {
422 self.rate = Some(rate);
423 self
424 }
425
426 pub fn availability(mut self, availability: impl Into<String>) -> Self {
428 self.availability = Some(availability.into());
429 self
430 }
431
432 pub fn nemesis(mut self, nemesis: impl Into<String>) -> Self {
434 self.nemesis = Some(nemesis.into());
435 self
436 }
437
438 pub fn extra_args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
440 self.extra_args.extend(args.into_iter().map(Into::into));
441 self
442 }
443
444 pub fn build(&self) -> Result<PathBuf, Error> {
447 let bin_name = self
448 .bin_name
449 .as_ref()
450 .expect("No binary name set - did you call deploy?");
451 let project_dir = self.project_dir.as_ref().expect("No project dir set");
452 let target_dir = self.target_dir.as_ref().expect("No target dir set");
453
454 let mut cmd = std::process::Command::new("cargo");
455 cmd.arg("build")
456 .arg("--example")
457 .arg(bin_name)
458 .arg("--no-default-features")
459 .current_dir(project_dir)
460 .env("CARGO_TARGET_DIR", target_dir)
461 .env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
462
463 let mut all_features = vec!["hydro___feature_maelstrom_runtime".to_owned()];
465 if let Some(features) = &self.features {
466 all_features.extend(features.iter().cloned());
467 }
468 if !all_features.is_empty() {
469 cmd.arg("--features").arg(all_features.join(","));
470 }
471
472 let status = cmd.status()?;
473 if !status.success() {
474 return Err(Error::other(format!(
475 "cargo build failed with status: {}",
476 status
477 )));
478 }
479
480 Ok(target_dir.join("debug").join("examples").join(bin_name))
481 }
482
483 pub fn run(self) -> Result<(), Error> {
487 let binary_path = self.build()?;
488
489 let mut cmd = std::process::Command::new(&self.maelstrom_path);
490 cmd.arg("test")
491 .arg("-w")
492 .arg(&self.workload)
493 .arg("--bin")
494 .arg(&binary_path)
495 .arg("--node-count")
496 .arg(self.node_count.to_string())
497 .stdout(Stdio::piped());
498
499 if let Some(time_limit) = self.time_limit {
500 cmd.arg("--time-limit").arg(time_limit.to_string());
501 }
502
503 if let Some(rate) = self.rate {
504 cmd.arg("--rate").arg(rate.to_string());
505 }
506
507 if let Some(availability) = self.availability {
508 cmd.arg("--availability").arg(availability);
509 }
510
511 if let Some(nemesis) = self.nemesis {
512 cmd.arg("--nemesis").arg(nemesis);
513 }
514
515 for arg in &self.extra_args {
516 cmd.arg(arg);
517 }
518
519 let spawned = cmd.spawn()?;
520
521 for line in BufReader::new(spawned.stdout.unwrap()).lines() {
522 let line = line?;
523 eprintln!("{}", &line);
524
525 if line.starts_with("Analysis invalid!") {
526 return Err(Error::other("Analysis was invalid"));
527 } else if line.starts_with("Errors occurred during analysis, but no anomalies found.")
528 || line.starts_with("Everything looks good!")
529 {
530 return Ok(());
531 }
532 }
533
534 Err(Error::other("Maelstrom produced an unexpected result"))
535 }
536
537 pub fn binary_path(&self) -> Option<PathBuf> {
539 let bin_name = self.bin_name.as_ref()?;
540 let target_dir = self.target_dir.as_ref()?;
541 Some(target_dir.join("debug").join("examples").join(bin_name))
542 }
543}