Skip to main content

hydro_lang/
test_util.rs

1//! Various utilities for testing short Hydro programs, especially in doctests.
2
3use std::future::Future;
4use std::panic::{AssertUnwindSafe, catch_unwind};
5use std::pin::Pin;
6
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9
10use crate::compile::builder::FlowBuilder;
11use crate::live_collections::boundedness::{Boundedness, Unbounded};
12use crate::live_collections::stream::{Ordering, Retries, Stream};
13use crate::location::Process;
14
15/// Sets up a test with multiple processes / clusters declared in the test logic (`thunk`). The test logic must return
16/// a single streaming output, which can then be read in `check` (an async closure) to perform assertions.
17///
18/// Each declared process is deployed as a single local process, and each cluster is deployed as four local processes.
19pub async fn multi_location_test<'a, T, C, O: Ordering, R: Retries>(
20    thunk: impl FnOnce(
21        &mut FlowBuilder<'a>,
22        &Process<'a, ()>,
23    ) -> Stream<T, Process<'a>, Unbounded, O, R>,
24    check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
25) where
26    T: Serialize + DeserializeOwned + 'static,
27    C: Future<Output = ()>,
28{
29    let mut deployment = hydro_deploy::Deployment::new();
30    let mut flow = FlowBuilder::new();
31    let process = flow.process::<()>();
32    let external = flow.external::<()>();
33    let out = (thunk)(&mut flow, &process);
34    let out_port = out.send_bincode_external(&external);
35    let nodes = flow
36        .with_remaining_processes(|| deployment.Localhost())
37        .with_remaining_clusters(|| vec![deployment.Localhost(); 4])
38        .with_external(&external, deployment.Localhost())
39        .deploy(&mut deployment);
40
41    deployment.deploy().await.unwrap();
42
43    let external_out = nodes.connect(out_port).await;
44    deployment.start().await.unwrap();
45
46    check(external_out).await;
47}
48
49/// Sets up a test declared in `thunk` that executes on a single [`Process`], returning a streaming output
50/// that can be read in `check` (an async closure) to perform assertions.
51pub async fn stream_transform_test<'a, T, C, B: Boundedness, O: Ordering, R: Retries>(
52    thunk: impl FnOnce(&Process<'a>) -> Stream<T, Process<'a>, B, O, R>,
53    check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
54) where
55    T: Serialize + DeserializeOwned + 'static,
56    C: Future<Output = ()>,
57{
58    let mut deployment = hydro_deploy::Deployment::new();
59    let mut flow = FlowBuilder::new();
60    let process = flow.process::<()>();
61    let external = flow.external::<()>();
62    let out = thunk(&process);
63    let out_port = out.send_bincode_external(&external);
64    let nodes = flow
65        .with_process(&process, deployment.Localhost())
66        .with_external(&external, deployment.Localhost())
67        .deploy(&mut deployment);
68
69    deployment.deploy().await.unwrap();
70
71    let external_out = nodes.connect(out_port).await;
72    deployment.start().await.unwrap();
73
74    check(external_out).await;
75}
76
77// from https://users.rust-lang.org/t/how-to-write-doctest-that-panic-with-an-expected-message/58650
78/// Asserts that running the given closure results in a panic with a message containing `msg`.
79pub fn assert_panics_with_message(func: impl FnOnce(), msg: &'static str) {
80    let err = catch_unwind(AssertUnwindSafe(func)).expect_err("Didn't panic!");
81
82    let chk = |panic_msg: &'_ str| {
83        if !panic_msg.contains(msg) {
84            panic!(
85                "Expected a panic message containing `{}`; got: `{}`.",
86                msg, panic_msg
87            );
88        }
89    };
90
91    err.downcast::<String>()
92        .map(|s| chk(&s))
93        .or_else(|err| err.downcast::<&'static str>().map(|s| chk(*s)))
94        .expect("Unexpected panic type!");
95}