Skip to main content

hydro_lang/deploy/maelstrom/
deploy_maelstrom.rs

1//! Deployment backend for Hydro that targets Maelstrom for distributed systems testing.
2//!
3//! Maelstrom is a workbench for learning distributed systems by writing your own.
4//! This backend compiles Hydro programs to binaries that communicate via Maelstrom's
5//! stdin/stdout JSON protocol.
6
7use std::cell::RefCell;
8use std::future::Future;
9use std::io::{BufRead, BufReader, Error};
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::process::Stdio;
13use std::rc::Rc;
14
15use bytes::{Bytes, BytesMut};
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, Stream};
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use stageleft::{QuotedWithContext, RuntimeData};
21
22use super::deploy_runtime_maelstrom::*;
23use crate::compile::builder::ExternalPortId;
24use crate::compile::deploy_provider::{ClusterSpec, Deploy, Node, RegisterPort};
25use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
26use crate::location::dynamic::LocationId;
27use crate::location::member_id::TaglessMemberId;
28use crate::location::{LocationKey, MembershipEvent, NetworkHint};
29
30/// Deployment backend that targets Maelstrom for distributed systems testing.
31///
32/// This backend compiles Hydro programs to binaries that communicate via Maelstrom's
33/// stdin/stdout JSON protocol. It is restricted to programs with:
34/// - Exactly one cluster (no processes)
35/// - A single external input channel for client communication
36pub enum MaelstromDeploy {}
37
38impl<'a> Deploy<'a> for MaelstromDeploy {
39    type Meta = ();
40    type InstantiateEnv = MaelstromDeployment;
41
42    type Process = MaelstromProcess;
43    type Cluster = MaelstromCluster;
44    type External = MaelstromExternal;
45
46    fn o2o_sink_source(
47        _p1: &Self::Process,
48        _p1_port: &<Self::Process as Node>::Port,
49        _p2: &Self::Process,
50        _p2_port: &<Self::Process as Node>::Port,
51    ) -> (syn::Expr, syn::Expr) {
52        panic!("Maelstrom deployment does not support processes, only clusters")
53    }
54
55    fn o2o_connect(
56        _p1: &Self::Process,
57        _p1_port: &<Self::Process as Node>::Port,
58        _p2: &Self::Process,
59        _p2_port: &<Self::Process as Node>::Port,
60    ) -> Box<dyn FnOnce()> {
61        panic!("Maelstrom deployment does not support processes, only clusters")
62    }
63
64    fn o2m_sink_source(
65        _p1: &Self::Process,
66        _p1_port: &<Self::Process as Node>::Port,
67        _c2: &Self::Cluster,
68        _c2_port: &<Self::Cluster as Node>::Port,
69    ) -> (syn::Expr, syn::Expr) {
70        panic!("Maelstrom deployment does not support processes, only clusters")
71    }
72
73    fn o2m_connect(
74        _p1: &Self::Process,
75        _p1_port: &<Self::Process as Node>::Port,
76        _c2: &Self::Cluster,
77        _c2_port: &<Self::Cluster as Node>::Port,
78    ) -> Box<dyn FnOnce()> {
79        panic!("Maelstrom deployment does not support processes, only clusters")
80    }
81
82    fn m2o_sink_source(
83        _c1: &Self::Cluster,
84        _c1_port: &<Self::Cluster as Node>::Port,
85        _p2: &Self::Process,
86        _p2_port: &<Self::Process as Node>::Port,
87    ) -> (syn::Expr, syn::Expr) {
88        panic!("Maelstrom deployment does not support processes, only clusters")
89    }
90
91    fn m2o_connect(
92        _c1: &Self::Cluster,
93        _c1_port: &<Self::Cluster as Node>::Port,
94        _p2: &Self::Process,
95        _p2_port: &<Self::Process as Node>::Port,
96    ) -> Box<dyn FnOnce()> {
97        panic!("Maelstrom deployment does not support processes, only clusters")
98    }
99
100    fn m2m_sink_source(
101        _c1: &Self::Cluster,
102        _c1_port: &<Self::Cluster as Node>::Port,
103        _c2: &Self::Cluster,
104        _c2_port: &<Self::Cluster as Node>::Port,
105    ) -> (syn::Expr, syn::Expr) {
106        deploy_maelstrom_m2m(RuntimeData::new("__hydro_lang_maelstrom_meta"))
107    }
108
109    fn m2m_connect(
110        _c1: &Self::Cluster,
111        _c1_port: &<Self::Cluster as Node>::Port,
112        _c2: &Self::Cluster,
113        _c2_port: &<Self::Cluster as Node>::Port,
114    ) -> Box<dyn FnOnce()> {
115        // No runtime connection needed for Maelstrom - all routing is via stdin/stdout
116        Box::new(|| {})
117    }
118
119    fn e2o_many_source(
120        _extra_stmts: &mut Vec<syn::Stmt>,
121        _p2: &Self::Process,
122        _p2_port: &<Self::Process as Node>::Port,
123        _codec_type: &syn::Type,
124        _shared_handle: String,
125    ) -> syn::Expr {
126        panic!("Maelstrom deployment does not support processes, only clusters")
127    }
128
129    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
130        panic!("Maelstrom deployment does not support processes, only clusters")
131    }
132
133    fn e2o_source(
134        _extra_stmts: &mut Vec<syn::Stmt>,
135        _p1: &Self::External,
136        _p1_port: &<Self::External as Node>::Port,
137        _p2: &Self::Process,
138        _p2_port: &<Self::Process as Node>::Port,
139        _codec_type: &syn::Type,
140        _shared_handle: String,
141    ) -> syn::Expr {
142        panic!("Maelstrom deployment does not support processes, only clusters")
143    }
144
145    fn e2o_connect(
146        _p1: &Self::External,
147        _p1_port: &<Self::External as Node>::Port,
148        _p2: &Self::Process,
149        _p2_port: &<Self::Process as Node>::Port,
150        _many: bool,
151        _server_hint: NetworkHint,
152    ) -> Box<dyn FnOnce()> {
153        panic!("Maelstrom deployment does not support processes, only clusters")
154    }
155
156    fn o2e_sink(
157        _p1: &Self::Process,
158        _p1_port: &<Self::Process as Node>::Port,
159        _p2: &Self::External,
160        _p2_port: &<Self::External as Node>::Port,
161        _shared_handle: String,
162    ) -> syn::Expr {
163        panic!("Maelstrom deployment does not support processes, only clusters")
164    }
165
166    fn cluster_ids(
167        _of_cluster: LocationKey,
168    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
169        cluster_members(RuntimeData::new("__hydro_lang_maelstrom_meta"), _of_cluster)
170    }
171
172    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
173        cluster_self_id(RuntimeData::new("__hydro_lang_maelstrom_meta"))
174    }
175
176    fn cluster_membership_stream(
177        location_id: &LocationId,
178    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
179    {
180        cluster_membership_stream(location_id)
181    }
182}
183
184/// A dummy process type for Maelstrom (processes are not supported).
185#[derive(Clone)]
186pub struct MaelstromProcess {
187    _private: (),
188}
189
190impl Node for MaelstromProcess {
191    type Port = String;
192    type Meta = ();
193    type InstantiateEnv = MaelstromDeployment;
194
195    fn next_port(&self) -> Self::Port {
196        panic!("Maelstrom deployment does not support processes")
197    }
198
199    fn update_meta(&self, _meta: &Self::Meta) {}
200
201    fn instantiate(
202        &self,
203        _env: &mut Self::InstantiateEnv,
204        _meta: &mut Self::Meta,
205        _graph: DfirGraph,
206        _extra_stmts: &[syn::Stmt],
207        _sidecars: &[syn::Expr],
208    ) {
209        panic!("Maelstrom deployment does not support processes")
210    }
211}
212
213/// Represents a cluster in Maelstrom deployment.
214#[derive(Clone)]
215pub struct MaelstromCluster {
216    next_port: Rc<RefCell<usize>>,
217    name_hint: Option<String>,
218}
219
220impl Node for MaelstromCluster {
221    type Port = String;
222    type Meta = ();
223    type InstantiateEnv = MaelstromDeployment;
224
225    fn next_port(&self) -> Self::Port {
226        let next_port = *self.next_port.borrow();
227        *self.next_port.borrow_mut() += 1;
228        format!("port_{}", next_port)
229    }
230
231    fn update_meta(&self, _meta: &Self::Meta) {}
232
233    fn instantiate(
234        &self,
235        env: &mut Self::InstantiateEnv,
236        _meta: &mut Self::Meta,
237        graph: DfirGraph,
238        extra_stmts: &[syn::Stmt],
239        sidecars: &[syn::Expr],
240    ) {
241        let (bin_name, config) = create_graph_trybuild(
242            graph,
243            extra_stmts,
244            sidecars,
245            self.name_hint.as_deref(),
246            crate::compile::trybuild::generate::DeployMode::Maelstrom,
247            LinkingMode::Static,
248        );
249
250        env.bin_name = Some(bin_name);
251        env.project_dir = Some(config.project_dir);
252        env.target_dir = Some(config.target_dir);
253        env.features = config.features;
254    }
255}
256
257/// Represents an external client in Maelstrom deployment.
258#[derive(Clone)]
259pub enum MaelstromExternal {}
260
261impl Node for MaelstromExternal {
262    type Port = String;
263    type Meta = ();
264    type InstantiateEnv = MaelstromDeployment;
265
266    fn next_port(&self) -> Self::Port {
267        unreachable!()
268    }
269
270    fn update_meta(&self, _meta: &Self::Meta) {}
271
272    fn instantiate(
273        &self,
274        _env: &mut Self::InstantiateEnv,
275        _meta: &mut Self::Meta,
276        _graph: DfirGraph,
277        _extra_stmts: &[syn::Stmt],
278        _sidecars: &[syn::Expr],
279    ) {
280        unreachable!()
281    }
282}
283
284impl<'a> RegisterPort<'a, MaelstromDeploy> for MaelstromExternal {
285    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
286        unreachable!()
287    }
288
289    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
290    fn as_bytes_bidi(
291        &self,
292        _external_port_id: ExternalPortId,
293    ) -> impl Future<
294        Output = (
295            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
296            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
297        ),
298    > + 'a {
299        async move { unreachable!() }
300    }
301
302    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
303    fn as_bincode_bidi<InT, OutT>(
304        &self,
305        _external_port_id: ExternalPortId,
306    ) -> impl Future<
307        Output = (
308            Pin<Box<dyn Stream<Item = OutT>>>,
309            Pin<Box<dyn Sink<InT, Error = Error>>>,
310        ),
311    > + 'a
312    where
313        InT: Serialize + 'static,
314        OutT: DeserializeOwned + 'static,
315    {
316        async move { unreachable!() }
317    }
318
319    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
320    fn as_bincode_sink<T: Serialize + 'static>(
321        &self,
322        _external_port_id: ExternalPortId,
323    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
324        async move { unreachable!() }
325    }
326
327    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
328    fn as_bincode_source<T: DeserializeOwned + 'static>(
329        &self,
330        _external_port_id: ExternalPortId,
331    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
332        async move { unreachable!() }
333    }
334}
335
336/// Specification for building a Maelstrom cluster.
337#[derive(Clone)]
338pub struct MaelstromClusterSpec;
339
340impl<'a> ClusterSpec<'a, MaelstromDeploy> for MaelstromClusterSpec {
341    fn build(self, key: LocationKey, name_hint: &str) -> MaelstromCluster {
342        assert_eq!(
343            key,
344            LocationKey::FIRST,
345            "there should only be one location for a Maelstrom deployment"
346        );
347        MaelstromCluster {
348            next_port: Rc::new(RefCell::new(0)),
349            name_hint: Some(name_hint.to_owned()),
350        }
351    }
352}
353
/// The Maelstrom deployment environment.
///
/// This holds configuration for the Maelstrom run and accumulates
/// compilation artifacts during deployment.
pub struct MaelstromDeployment {
    /// Number of nodes in the cluster.
    pub node_count: usize,
    /// Path to the maelstrom binary.
    pub maelstrom_path: PathBuf,
    /// Workload to run (e.g., "echo", "broadcast", "g-counter").
    pub workload: String,
    /// Time limit in seconds.
    pub time_limit: Option<u64>,
    /// Rate of requests per second.
    pub rate: Option<u64>,
    /// The availability of nodes.
    pub availability: Option<String>,
    /// Nemesis to run during tests.
    pub nemesis: Option<String>,
    /// Additional maelstrom arguments.
    pub extra_args: Vec<String>,

    // Populated during deployment
    pub(crate) bin_name: Option<String>,
    pub(crate) project_dir: Option<PathBuf>,
    pub(crate) target_dir: Option<PathBuf>,
    pub(crate) features: Option<Vec<String>>,
}

impl MaelstromDeployment {
    /// Create a new Maelstrom deployment for the given workload.
    ///
    /// The deployment starts with a single node, `maelstrom` as the binary
    /// path, and no time limit, rate, availability, nemesis, or extra arguments.
    pub fn new(workload: impl Into<String>) -> Self {
        Self {
            node_count: 1,
            maelstrom_path: PathBuf::from("maelstrom"),
            workload: workload.into(),
            time_limit: None,
            rate: None,
            availability: None,
            nemesis: None,
            extra_args: vec![],
            bin_name: None,
            project_dir: None,
            target_dir: None,
            features: None,
        }
    }

    /// Set the node count.
    pub fn node_count(mut self, count: usize) -> Self {
        self.node_count = count;
        self
    }

    /// Set the path to the maelstrom binary.
    pub fn maelstrom_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.maelstrom_path = path.into();
        self
    }

    /// Set the time limit in seconds.
    pub fn time_limit(mut self, seconds: u64) -> Self {
        self.time_limit = Some(seconds);
        self
    }

    /// Set the request rate per second.
    pub fn rate(mut self, rate: u64) -> Self {
        self.rate = Some(rate);
        self
    }

    /// Set the availability for the test.
    pub fn availability(mut self, availability: impl Into<String>) -> Self {
        self.availability = Some(availability.into());
        self
    }

    /// Set the nemesis for the test.
    pub fn nemesis(mut self, nemesis: impl Into<String>) -> Self {
        self.nemesis = Some(nemesis.into());
        self
    }

    /// Add extra arguments to pass to maelstrom.
    ///
    /// Arguments are appended to any that were added by earlier calls.
    pub fn extra_args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
        self.extra_args.extend(args.into_iter().map(Into::into));
        self
    }

    /// Location of the dev-profile example binary `bin_name` inside `target_dir`.
    fn example_binary_path(target_dir: &std::path::Path, bin_name: &str) -> PathBuf {
        target_dir.join("debug").join("examples").join(bin_name)
    }

    /// Interprets one line of Maelstrom output.
    ///
    /// Returns `Some(Ok(()))` for a passing verdict, `Some(Err(_))` for an
    /// invalid analysis, and `None` for any other line.
    fn verdict_for_line(line: &str) -> Option<Result<(), Error>> {
        if line.starts_with("Analysis invalid!") {
            Some(Err(Error::other("Analysis was invalid")))
        } else if line.starts_with("Errors occurred during analysis, but no anomalies found.")
            || line.starts_with("Everything looks good!")
        {
            Some(Ok(()))
        } else {
            None
        }
    }

    /// Build the compiled binary in dev mode.
    /// Returns the path to the compiled binary.
    ///
    /// # Errors
    ///
    /// Returns an error if `cargo` cannot be launched or exits unsuccessfully.
    ///
    /// # Panics
    ///
    /// Panics if the binary name, project directory, or target directory has
    /// not been populated by a prior deployment.
    pub fn build(&self) -> Result<PathBuf, Error> {
        let bin_name = self
            .bin_name
            .as_ref()
            .expect("No binary name set - did you call deploy?");
        let project_dir = self.project_dir.as_ref().expect("No project dir set");
        let target_dir = self.target_dir.as_ref().expect("No target dir set");

        // Always include maelstrom_runtime feature for runtime support, so the
        // feature list is never empty.
        let mut all_features = vec!["hydro___feature_maelstrom_runtime".to_owned()];
        if let Some(features) = &self.features {
            all_features.extend(features.iter().cloned());
        }

        let status = std::process::Command::new("cargo")
            .arg("build")
            .arg("--example")
            .arg(bin_name)
            .arg("--no-default-features")
            .arg("--features")
            .arg(all_features.join(","))
            .current_dir(project_dir)
            .env("CARGO_TARGET_DIR", target_dir)
            .env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1")
            .status()?;

        if !status.success() {
            return Err(Error::other(format!(
                "cargo build failed with status: {}",
                status
            )));
        }

        Ok(Self::example_binary_path(target_dir, bin_name))
    }

    /// Run Maelstrom with the compiled binary, return Ok(()) if all checks pass.
    ///
    /// This will block until Maelstrom completes: its output is echoed to
    /// stderr until the stream ends, and the child process is then reaped. The
    /// result is decided by the first verdict line found in the output; the
    /// child's exit status is not consulted.
    ///
    /// # Errors
    ///
    /// Returns an error if the build fails, if Maelstrom cannot be spawned, if
    /// its output cannot be read before a verdict was seen, if the analysis was
    /// reported invalid, or if no verdict line was printed at all.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`MaelstromDeployment::build`].
    pub fn run(self) -> Result<(), Error> {
        let binary_path = self.build()?;

        let mut cmd = std::process::Command::new(&self.maelstrom_path);
        cmd.arg("test")
            .arg("-w")
            .arg(&self.workload)
            .arg("--bin")
            .arg(&binary_path)
            .arg("--node-count")
            .arg(self.node_count.to_string())
            .stdout(Stdio::piped());

        if let Some(time_limit) = self.time_limit {
            cmd.arg("--time-limit").arg(time_limit.to_string());
        }

        if let Some(rate) = self.rate {
            cmd.arg("--rate").arg(rate.to_string());
        }

        if let Some(availability) = &self.availability {
            cmd.arg("--availability").arg(availability);
        }

        if let Some(nemesis) = &self.nemesis {
            cmd.arg("--nemesis").arg(nemesis);
        }

        cmd.args(&self.extra_args);

        let mut child = cmd.spawn()?;
        let stdout = child
            .stdout
            .take()
            .expect("stdout was configured as piped above");

        // Keep draining the pipe even after a verdict is known so the child can
        // finish writing and exit on its own; only the first verdict counts.
        let mut verdict = None;
        let mut read_error = None;
        for line in BufReader::new(stdout).lines() {
            match line {
                Ok(line) => {
                    eprintln!("{}", &line);
                    if verdict.is_none() {
                        verdict = Self::verdict_for_line(&line);
                    }
                }
                Err(err) => {
                    read_error = Some(err);
                    break;
                }
            }
        }

        if read_error.is_some() {
            // The output can no longer be consumed, so stop the child rather
            // than waiting on a process that may never finish. Best-effort: it
            // may already have exited.
            let _ = child.kill();
        }

        // Reap the child so it does not linger as a zombie. The outcome is
        // decided by the output above, so a failure to reap must not mask it.
        let _ = child.wait();

        match (verdict, read_error) {
            (Some(verdict), _) => verdict,
            (None, Some(err)) => Err(err),
            (None, None) => Err(Error::other("Maelstrom produced an unexpected result")),
        }
    }

    /// Get the path to the compiled binary (after building).
    ///
    /// Returns `None` until both the binary name and the target directory have
    /// been recorded. The path is computed, not checked for existence.
    pub fn binary_path(&self) -> Option<PathBuf> {
        let bin_name = self.bin_name.as_ref()?;
        let target_dir = self.target_dir.as_ref()?;
        Some(Self::example_binary_path(target_dir, bin_name))
    }
}