1use std::future::Future;
4use std::panic::{AssertUnwindSafe, catch_unwind};
5use std::pin::Pin;
6
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9
10use crate::compile::builder::FlowBuilder;
11use crate::live_collections::boundedness::{Boundedness, Unbounded};
12use crate::live_collections::stream::{Ordering, Retries, Stream};
13use crate::location::Process;
14
15pub async fn multi_location_test<'a, T, C, O: Ordering, R: Retries>(
20 thunk: impl FnOnce(
21 &mut FlowBuilder<'a>,
22 &Process<'a, ()>,
23 ) -> Stream<T, Process<'a>, Unbounded, O, R>,
24 check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
25) where
26 T: Serialize + DeserializeOwned + 'static,
27 C: Future<Output = ()>,
28{
29 let mut deployment = hydro_deploy::Deployment::new();
30 let mut flow = FlowBuilder::new();
31 let process = flow.process::<()>();
32 let external = flow.external::<()>();
33 let out = (thunk)(&mut flow, &process);
34 let out_port = out.send_bincode_external(&external);
35 let nodes = flow
36 .with_remaining_processes(|| deployment.Localhost())
37 .with_remaining_clusters(|| vec![deployment.Localhost(); 4])
38 .with_external(&external, deployment.Localhost())
39 .deploy(&mut deployment);
40
41 deployment.deploy().await.unwrap();
42
43 let external_out = nodes.connect(out_port).await;
44 deployment.start().await.unwrap();
45
46 check(external_out).await;
47}
48
49pub async fn stream_transform_test<'a, T, C, B: Boundedness, O: Ordering, R: Retries>(
52 thunk: impl FnOnce(&Process<'a>) -> Stream<T, Process<'a>, B, O, R>,
53 check: impl FnOnce(Pin<Box<dyn futures::Stream<Item = T>>>) -> C,
54) where
55 T: Serialize + DeserializeOwned + 'static,
56 C: Future<Output = ()>,
57{
58 let mut deployment = hydro_deploy::Deployment::new();
59 let mut flow = FlowBuilder::new();
60 let process = flow.process::<()>();
61 let external = flow.external::<()>();
62 let out = thunk(&process);
63 let out_port = out.send_bincode_external(&external);
64 let nodes = flow
65 .with_process(&process, deployment.Localhost())
66 .with_external(&external, deployment.Localhost())
67 .deploy(&mut deployment);
68
69 deployment.deploy().await.unwrap();
70
71 let external_out = nodes.connect(out_port).await;
72 deployment.start().await.unwrap();
73
74 check(external_out).await;
75}
76
/// Runs `func` and asserts that it panics with a message containing `msg`.
///
/// The panic payload is accepted when it is either a `String` or a
/// `&'static str`.
///
/// # Panics
///
/// - If `func` returns without panicking.
/// - If the panic payload is neither a `String` nor a `&'static str`.
/// - If the panic message does not contain `msg`.
pub fn assert_panics_with_message(func: impl FnOnce(), msg: &'static str) {
    /// Fails the assertion when `actual` does not contain `expected`.
    fn ensure_contains(actual: &str, expected: &str) {
        if actual.contains(expected) {
            return;
        }
        panic!(
            "Expected a panic message containing `{}`; got: `{}`.",
            expected, actual
        );
    }

    let payload = catch_unwind(AssertUnwindSafe(func)).expect_err("Didn't panic!");

    // Try the owned-string payload first, then fall back to a string literal.
    match payload.downcast::<String>() {
        Ok(owned) => ensure_contains(owned.as_str(), msg),
        Err(other) => {
            let literal = other
                .downcast::<&'static str>()
                .expect("Unexpected panic type!");
            ensure_contains(*literal, msg);
        }
    }
}