Skip to main content

hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2
3use core::{fmt, panic};
4use std::cell::RefCell;
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::graph::Dfir;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::Mutex;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31use crate::sim::runtime::SimHook;
32
33struct SimConnections {
34    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
35    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
36    external_registered: HashMap<ExternalPortId, SimExternalPort>,
37}
38
39tokio::task_local! {
40    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
41}
42
43/// A handle to a compiled Hydro simulation, which can be instantiated and run.
44pub struct CompiledSim {
45    pub(super) _path: TempPath,
46    pub(super) lib: Library,
47    pub(super) externals_port_registry: SimExternalPortRegistry,
48}
49
50#[sealed::sealed]
51/// A trait implemented by closures that can instantiate a compiled simulation.
52///
53/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
54pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
55#[sealed::sealed]
56impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
57
/// Print handler that discards its arguments; passed to the simulation when logging is off.
fn null_handler(_: fmt::Arguments<'_>) {}
59
/// Print handler that writes the formatted arguments to stdout, followed by a newline.
fn println_handler(args: fmt::Arguments<'_>) {
    println!("{args}");
}
63
/// Print handler that writes the formatted arguments to stderr, followed by a newline.
fn eprintln_handler(args: fmt::Arguments<'_>) {
    eprintln!("{args}");
}
67
68/// Creates a simulation instance, returning:
69/// - A list of async DFIRs to run (all process / cluster logic outside a tick)
70/// - A list of tick DFIRs to run (where the &'static str is for the tick location id)
71/// - A mapping of hooks for non-deterministic decisions at tick-input boundaries
72/// - A mapping of inline hooks for non-deterministic decisions inside ticks
73type SimLoaded<'a> = libloading::Symbol<
74    'a,
75    unsafe extern "Rust" fn(
76        should_color: bool,
77        // usize: SimExternalPort
78        external_out: HashMap<usize, UnboundedSender<Bytes>>,
79        // usize: SimExternalPort
80        external_in: HashMap<usize, UnboundedReceiverStream<Bytes>>,
81        println_handler: fn(fmt::Arguments<'_>),
82        eprintln_handler: fn(fmt::Arguments<'_>),
83    ) -> (
84        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
85        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
86        Hooks<&'static str>,
87        InlineHooks<&'static str>,
88    ),
89>;
90
91impl CompiledSim {
92    /// Executes the given closure with a single instance of the compiled simulation.
93    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
94        self.with_instantiator(|instantiator| thunk(instantiator()), true)
95    }
96
97    /// Executes the given closure with an [`Instantiator`], which can be called to create
98    /// independent instances of the simulation. This is useful for fuzzing, where we need to
99    /// re-execute the simulation several times with different decisions.
100    ///
101    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
102    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
103    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
104    pub fn with_instantiator<T>(
105        &self,
106        thunk: impl FnOnce(&dyn Instantiator) -> T,
107        always_log: bool,
108    ) -> T {
109        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
110        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
111        thunk(
112            &(|| CompiledSimInstance {
113                func: func.clone(),
114                externals_port_registry: self.externals_port_registry.clone(),
115                input_ports: HashMap::new(),
116                output_ports: HashMap::new(),
117                log,
118            }),
119        )
120    }
121
122    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
123    /// closure will be repeatedly executed with instances of the Hydro program where the
124    /// batching boundaries, order of messages, and retries are varied.
125    ///
126    /// During development, you should run the test that invokes this function with the `cargo sim`
127    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
128    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
129    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
130    /// be executed, and if no reproducer is found a small number of random executions will be
131    /// performed.
132    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
133        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
134            .elements()
135            .into_iter()
136            .find(|e| {
137                !e.fn_name.starts_with("hydro_lang::sim::compiled")
138                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
139                    && !e.fn_name.starts_with("fuzz<")
140                    && !e.fn_name.starts_with("<hydro_lang::sim")
141            })
142            .unwrap();
143
144        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
145        let repro_folder = caller_path.parent().unwrap().join("sim-failures");
146
147        let caller_fuzz_repro_path = repro_folder
148            .join(caller_fn.fn_name.replace("::", "__"))
149            .with_extension("bin");
150
151        if std::env::var("BOLERO_FUZZER").is_ok() {
152            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
153            std::fs::create_dir_all(&corpus_dir).unwrap();
154            let libfuzzer_args = format!(
155                "{} {} -artifact_prefix={}/ -handle_abrt=0",
156                corpus_dir.to_str().unwrap(),
157                corpus_dir.to_str().unwrap(),
158                corpus_dir.to_str().unwrap(),
159            );
160
161            std::fs::create_dir_all(&repro_folder).unwrap();
162
163            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
164                unsafe {
165                    std::env::set_var(
166                        "BOLERO_FAILURE_OUTPUT",
167                        caller_fuzz_repro_path.to_str().unwrap(),
168                    );
169                }
170            }
171
172            unsafe {
173                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
174            }
175
176            self.with_instantiator(
177                |instantiator| {
178                    bolero::test(bolero::TargetLocation {
179                        package_name: "",
180                        manifest_dir: "",
181                        module_path: "",
182                        file: "",
183                        line: 0,
184                        item_path: "<unknown>::__bolero_item_path__",
185                        test_name: None,
186                    })
187                    .run_with_replay(move |is_replay| {
188                        let mut instance = instantiator();
189
190                        if instance.log {
191                            eprintln!(
192                                "{}",
193                                "\n==== New Simulation Instance ===="
194                                    .color(colored::Color::Cyan)
195                                    .bold()
196                            );
197                        }
198
199                        if is_replay {
200                            instance.log = true;
201                        }
202
203                        tokio::runtime::Builder::new_current_thread()
204                            .build()
205                            .unwrap()
206                            .block_on(async { instance.run(&mut thunk).await })
207                    })
208                },
209                false,
210            );
211        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
212            self.fuzz_repro(existing_bytes, async |compiled| {
213                compiled.launch();
214                thunk().await
215            });
216        } else {
217            eprintln!(
218                "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
219                caller_fuzz_repro_path.display()
220            );
221            self.with_instantiator(
222                |instantiator| {
223                    bolero::test(bolero::TargetLocation {
224                        package_name: "",
225                        manifest_dir: "",
226                        module_path: "",
227                        file: ".",
228                        line: 0,
229                        item_path: "<unknown>::__bolero_item_path__",
230                        test_name: None,
231                    })
232                    .with_iterations(8192)
233                    .run(move || {
234                        let instance = instantiator();
235                        tokio::runtime::Builder::new_current_thread()
236                            .build()
237                            .unwrap()
238                            .block_on(async { instance.run(&mut thunk).await })
239                    })
240                },
241                false,
242            );
243        }
244    }
245
246    /// Executes the given closure with a single instance of the compiled simulation, using the
247    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
248    /// failure found during fuzzing.
249    pub fn fuzz_repro<'a>(
250        &'a self,
251        bytes: Vec<u8>,
252        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
253    ) {
254        self.with_instance(|instance| {
255            bolero::bolero_engine::any::scope::with(
256                Box::new(bolero::bolero_engine::driver::object::Object(
257                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
258                )),
259                || {
260                    tokio::runtime::Builder::new_current_thread()
261                        .build()
262                        .unwrap()
263                        .block_on(async { instance.run_without_launching(thunk).await })
264                },
265            )
266        });
267    }
268
269    /// Exhaustively searches all possible executions of the simulation. The provided
270    /// closure will be repeatedly executed with instances of the Hydro program where the
271    /// batching boundaries, order of messages, and retries are varied.
272    ///
273    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
274    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
275    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
276    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
277    ///
278    /// Returns the number of distinct executions explored.
279    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
280        if std::env::var("BOLERO_FUZZER").is_ok() {
281            eprintln!(
282                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
283            );
284            std::process::abort();
285        }
286
287        let mut count = 0;
288        let count_mut = &mut count;
289
290        self.with_instantiator(
291            |instantiator| {
292                bolero::test(bolero::TargetLocation {
293                    package_name: "",
294                    manifest_dir: "",
295                    module_path: "",
296                    file: "",
297                    line: 0,
298                    item_path: "<unknown>::__bolero_item_path__",
299                    test_name: None,
300                })
301                .exhaustive()
302                .run_with_replay(move |is_replay| {
303                    *count_mut += 1;
304
305                    let mut instance = instantiator();
306                    if instance.log {
307                        eprintln!(
308                            "{}",
309                            "\n==== New Simulation Instance ===="
310                                .color(colored::Color::Cyan)
311                                .bold()
312                        );
313                    }
314
315                    if is_replay {
316                        instance.log = true;
317                    }
318
319                    tokio::runtime::Builder::new_current_thread()
320                        .build()
321                        .unwrap()
322                        .block_on(async { instance.run(&mut thunk).await })
323                })
324            },
325            false,
326        );
327
328        count
329    }
330}
331
332/// A single instance of a compiled Hydro simulation, which provides methods to interactively
333/// execute the simulation, feed inputs, and receive outputs.
334pub struct CompiledSimInstance<'a> {
335    func: SimLoaded<'a>,
336    externals_port_registry: SimExternalPortRegistry,
337    output_ports: HashMap<SimExternalPort, UnboundedSender<Bytes>>,
338    input_ports: HashMap<SimExternalPort, UnboundedReceiverStream<Bytes>>,
339    log: bool,
340}
341
342impl<'a> CompiledSimInstance<'a> {
343    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
344        self.run_without_launching(async |instance| {
345            instance.launch();
346            thunk().await;
347        })
348        .await;
349    }
350
351    async fn run_without_launching(
352        mut self,
353        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
354    ) {
355        let mut input_senders = HashMap::new();
356        let mut output_receivers = HashMap::new();
357        for registered_port in self.externals_port_registry.port_counter.range_up_to() {
358            {
359                let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
360                self.output_ports.insert(registered_port, sender);
361                output_receivers.insert(
362                    registered_port,
363                    Rc::new(Mutex::new(UnboundedReceiverStream::new(
364                        receiver.into_inner(),
365                    ))),
366                );
367            }
368
369            {
370                let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
371                self.input_ports.insert(registered_port, receiver);
372                input_senders.insert(registered_port, Rc::new(sender));
373            }
374        }
375
376        let local_set = tokio::task::LocalSet::new();
377        local_set
378            .run_until(CURRENT_SIM_CONNECTIONS.scope(
379                RefCell::new(SimConnections {
380                    input_senders,
381                    output_receivers,
382                    external_registered: self.externals_port_registry.registered.clone(),
383                }),
384                async move {
385                    thunk(self).await;
386                },
387            ))
388            .await;
389    }
390
391    /// Launches the simulation, which will asynchronously simulate the Hydro program. This should
392    /// be invoked but before receiving any messages.
393    fn launch(self) {
394        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
395    }
396
397    /// Returns a future that schedules simulation with the given logger for reporting the
398    /// simulation trace.
399    pub fn schedule_with_logger<W: std::io::Write>(
400        self,
401        log_writer: W,
402    ) -> impl use<W> + Future<Output = ()> {
403        self.schedule_with_maybe_logger(Some(log_writer))
404    }
405
406    fn schedule_with_maybe_logger<W: std::io::Write>(
407        self,
408        log_override: Option<W>,
409    ) -> impl use<W> + Future<Output = ()> {
410        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = unsafe {
411            (self.func)(
412                colored::control::SHOULD_COLORIZE.should_colorize(),
413                self.output_ports
414                    .into_iter()
415                    .map(|(k, v)| (k.into_inner(), v))
416                    .collect(),
417                self.input_ports
418                    .into_iter()
419                    .map(|(k, v)| (k.into_inner(), v))
420                    .collect(),
421                if self.log {
422                    println_handler
423                } else {
424                    null_handler
425                },
426                if self.log {
427                    eprintln_handler
428                } else {
429                    null_handler
430                },
431            )
432        };
433
434        let not_ready_observation = async_dfirs
435            .iter()
436            .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
437            .collect();
438
439        let mut launched = LaunchedSim {
440            async_dfirs: async_dfirs
441                .into_iter()
442                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
443                .collect(),
444            possibly_ready_ticks: vec![],
445            not_ready_ticks: tick_dfirs
446                .into_iter()
447                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
448                .collect(),
449            possibly_ready_observation: vec![],
450            not_ready_observation,
451            hooks: hooks
452                .into_iter()
453                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
454                .collect(),
455            inline_hooks: inline_hooks
456                .into_iter()
457                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
458                .collect(),
459            log: if self.log {
460                if let Some(w) = log_override {
461                    LogKind::Custom(w)
462                } else {
463                    LogKind::Stderr
464                }
465            } else {
466                LogKind::Null
467            },
468        };
469
470        async move { launched.scheduler().await }
471    }
472}
473
474impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
475    fn clone(&self) -> Self {
476        *self
477    }
478}
479
480impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
481
482impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
483    async fn with_stream<Out>(
484        &self,
485        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
486    ) -> Out {
487        let receiver = CURRENT_SIM_CONNECTIONS.with(|connections| {
488            let connections = &mut *connections.borrow_mut();
489            connections
490                .output_receivers
491                .get(connections.external_registered.get(&self.0).unwrap())
492                .unwrap()
493                .clone()
494        });
495
496        let mut receiver_stream = receiver.lock().await;
497        thunk(&mut pin!(
498            &mut receiver_stream
499                .by_ref()
500                .map(|b| bincode::deserialize(&b).unwrap())
501        ))
502        .await
503    }
504
505    /// Asserts that the stream has ended and no more messages can possibly arrive.
506    pub fn assert_no_more(self) -> impl Future<Output = ()>
507    where
508        T: Debug,
509    {
510        FutureTrackingCaller {
511            future: async move {
512                self.with_stream(async |stream| {
513                    if let Some(next) = stream.next().await {
514                        return Err(format!(
515                            "Stream yielded unexpected message: {:?}, expected termination",
516                            next
517                        ));
518                    }
519                    Ok(())
520                })
521                .await
522            },
523        }
524    }
525}
526
527impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
528    /// Receives the next message from the external bincode stream. This will wait until a message
529    /// is available, or return `None` if no more messages can possibly arrive.
530    pub async fn next(&self) -> Option<T> {
531        self.with_stream(async |stream| stream.next().await).await
532    }
533
534    /// Collects all remaining messages from the external bincode stream into a collection. This
535    /// will wait until no more messages can possibly arrive.
536    pub async fn collect<C: Default + Extend<T>>(self) -> C {
537        self.with_stream(async |stream| stream.collect().await)
538            .await
539    }
540
541    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
542    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
543    pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
544        &self,
545        expected: I,
546    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
547    where
548        T: Debug + PartialEq<T2>,
549    {
550        FutureTrackingCaller {
551            future: async {
552                let mut expected: VecDeque<T2> = expected.into_iter().collect();
553
554                while !expected.is_empty() {
555                    if let Some(next) = self.next().await {
556                        let next_expected = expected.pop_front().unwrap();
557                        if next != next_expected {
558                            return Err(format!(
559                                "Stream yielded unexpected message: {:?}, expected: {:?}",
560                                next, next_expected
561                            ));
562                        }
563                    } else {
564                        return Err(format!(
565                            "Stream ended early, still expected: {:?}",
566                            expected
567                        ));
568                    }
569                }
570
571                Ok(())
572            },
573        }
574    }
575
576    /// Asserts that the stream yields only the expected sequence of messages, in order,
577    /// and then ends.
578    pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
579        &self,
580        expected: I,
581    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
582    where
583        T: Debug + PartialEq<T2>,
584    {
585        ChainedFuture {
586            first: self.assert_yields(expected),
587            second: self.assert_no_more(),
588            first_done: false,
589        }
590    }
591}
592
593pin_project_lite::pin_project! {
594    // A future that tracks the location of the `.await` call for better panic messages.
595    //
596    // `#[track_caller]` is important for us to create assertion methods because it makes
597    // the panic backtrace show up at that method (instead of inside the call tree within
598    // that method). This is e.g. what `Option::unwrap` uses. Unfortunately, `#[track_caller]`
599    // does not work correctly for async methods (or `dyn Future` either), so we have to
600    // create these concrete future types that (1) have `#[track_caller]` on their `poll()`
601    // method and (2) have the `panic!` triggered in their `poll()` method (or in a directly
602    // nested concrete future).
603    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
604        #[pin]
605        future: F,
606    }
607}
608
609impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
610    type Output = ();
611
612    #[track_caller]
613    fn poll(
614        mut self: Pin<&mut Self>,
615        cx: &mut std::task::Context<'_>,
616    ) -> std::task::Poll<Self::Output> {
617        match ready!(self.as_mut().project().future.poll(cx)) {
618            Ok(()) => std::task::Poll::Ready(()),
619            Err(e) => panic!("{}", e),
620        }
621    }
622}
623
624pin_project_lite::pin_project! {
625    // A future that first awaits the first future, then the second, propagating caller info.
626    //
627    // See [`FutureTrackingCaller`] for context.
628    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
629        #[pin]
630        first: F1,
631        #[pin]
632        second: F2,
633        first_done: bool,
634    }
635}
636
637impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
638    type Output = ();
639
640    #[track_caller]
641    fn poll(
642        mut self: Pin<&mut Self>,
643        cx: &mut std::task::Context<'_>,
644    ) -> std::task::Poll<Self::Output> {
645        if !self.first_done {
646            ready!(self.as_mut().project().first.poll(cx));
647            *self.as_mut().project().first_done = true;
648        }
649
650        self.as_mut().project().second.poll(cx)
651    }
652}
653
654impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
655    /// Collects all remaining messages from the external bincode stream into a collection,
656    /// sorting them. This will wait until no more messages can possibly arrive.
657    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
658    where
659        T: Ord,
660    {
661        self.with_stream(async |stream| {
662            let mut collected: C = stream.collect().await;
663            collected.as_mut().sort();
664            collected
665        })
666        .await
667    }
668
669    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
670    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
671    pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
672        &self,
673        expected: I,
674    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
675    where
676        T: Debug + PartialEq<T2>,
677    {
678        FutureTrackingCaller {
679            future: async {
680                self.with_stream(async |stream| {
681                    let mut expected: Vec<T2> = expected.into_iter().collect();
682
683                    while !expected.is_empty() {
684                        if let Some(next) = stream.next().await {
685                            let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
686                            if let Some((i, _)) = idx {
687                                expected.swap_remove(i);
688                            } else {
689                                return Err(format!(
690                                    "Stream yielded unexpected message: {:?}",
691                                    next
692                                ));
693                            }
694                        } else {
695                            return Err(format!(
696                                "Stream ended early, still expected: {:?}",
697                                expected
698                            ));
699                        }
700                    }
701
702                    Ok(())
703                })
704                .await
705            },
706        }
707    }
708
709    /// Asserts that the stream yields only the expected sequence of messages, in some order,
710    /// and then ends.
711    pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
712        &self,
713        expected: I,
714    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
715    where
716        T: Debug + PartialEq<T2>,
717    {
718        ChainedFuture {
719            first: self.assert_yields_unordered(expected),
720            second: self.assert_no_more(),
721            first_done: false,
722        }
723    }
724}
725
726impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
727    fn with_sink<Out>(
728        &self,
729        thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
730    ) -> Out {
731        let sender = CURRENT_SIM_CONNECTIONS.with(|connections| {
732            let connections = &mut *connections.borrow_mut();
733            connections
734                .input_senders
735                .get(connections.external_registered.get(&self.0).unwrap())
736                .unwrap()
737                .clone()
738        });
739
740        thunk(&move |t| sender.send(bincode::serialize(&t).unwrap().into()))
741    }
742}
743
744impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
745    /// Sends several messages to the external bincode sink. The messages will be asynchronously
746    /// processed as part of the simulation, in non-deterministic order.
747    pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
748        self.with_sink(|send| {
749            for t in iter {
750                send(t).unwrap();
751            }
752        })
753    }
754}
755
756impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
757    /// Sends a message to the external bincode sink. The message will be asynchronously processed
758    /// as part of the simulation.
759    pub fn send(&self, t: T) {
760        self.with_sink(|send| send(t)).unwrap();
761    }
762
763    /// Sends several messages to the external bincode sink. The messages will be asynchronously
764    /// processed as part of the simulation.
765    pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
766        self.with_sink(|send| {
767            for t in iter {
768                send(t).unwrap();
769            }
770        })
771    }
772}
773
/// Destination for the simulation trace.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Write to the process's stderr.
    Stderr,
    /// Write to a caller-supplied writer.
    Custom(W),
}

// Adapter so `write!` / `writeln!` from `std::fmt::Write` can target any of the variants,
// including an `std::io::Write` writer.
// via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    /// Writes `s` to the selected destination; an I/O failure of a custom writer is
    /// reported as `std::fmt::Error`.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        match self {
            Self::Null => {}
            Self::Stderr => eprint!("{}", s),
            Self::Custom(writer) => {
                if writer.write_all(s.as_bytes()).is_err() {
                    return Err(std::fmt::Error);
                }
            }
        }
        Ok(())
    }
}
793
794/// A running simulation, which manages the async DFIR and tick DFIRs, and makes decisions
795/// about scheduling ticks and choices for non-deterministic operators like batch.
796struct LaunchedSim<W: std::io::Write> {
797    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
798    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
799    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
800    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
801    not_ready_observation: Vec<(LocationId, Option<u32>)>,
802    hooks: Hooks<LocationId>,
803    inline_hooks: InlineHooks<LocationId>,
804    log: LogKind<W>,
805}
806
807impl<W: std::io::Write> LaunchedSim<W> {
808    async fn scheduler(&mut self) {
809        loop {
810            tokio::task::yield_now().await;
811            let mut any_made_progress = false;
812            for (loc, c_id, dfir) in &mut self.async_dfirs {
813                if dfir.run_tick().await {
814                    any_made_progress = true;
815                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
816                        .not_ready_ticks
817                        .drain(..)
818                        .partition(|(tick_loc, tick_c_id, _)| {
819                            let LocationId::Tick(_, outer) = tick_loc else {
820                                unreachable!()
821                            };
822                            outer.as_ref() == loc && tick_c_id == c_id
823                        });
824
825                    self.possibly_ready_ticks.extend(now_ready);
826                    self.not_ready_ticks.extend(still_not_ready);
827
828                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
829                        .not_ready_observation
830                        .drain(..)
831                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);
832
833                    self.possibly_ready_observation.extend(now_ready_obs);
834                    self.not_ready_observation.extend(still_not_ready_obs);
835                }
836            }
837
838            if any_made_progress {
839                continue;
840            } else {
841                use bolero::generator::*;
842
843                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
844                    .possibly_ready_ticks
845                    .drain(..)
846                    .partition(|(name, cid, _)| {
847                        self.hooks
848                            .get(&(name.clone(), *cid))
849                            .unwrap()
850                            .iter()
851                            .any(|hook| {
852                                hook.current_decision().unwrap_or(false)
853                                    || hook.can_make_nontrivial_decision()
854                            })
855                    });
856
857                self.possibly_ready_ticks = ready_tick;
858                self.not_ready_ticks.append(&mut not_ready_tick);
859
860                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
861                    .possibly_ready_observation
862                    .drain(..)
863                    .partition(|(name, cid)| {
864                        self.hooks
865                            .get(&(name.clone(), *cid))
866                            .into_iter()
867                            .flatten()
868                            .any(|hook| {
869                                hook.current_decision().unwrap_or(false)
870                                    || hook.can_make_nontrivial_decision()
871                            })
872                    });
873
874                self.possibly_ready_observation = ready_obs;
875                self.not_ready_observation.append(&mut not_ready_obs);
876
877                if self.possibly_ready_ticks.is_empty()
878                    && self.possibly_ready_observation.is_empty()
879                {
880                    break;
881                } else {
882                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
883                        + self.possibly_ready_observation.len()))
884                        .any();
885
886                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
887                        let next_tick = next_tick_or_obs;
888                        let mut removed = self.possibly_ready_ticks.remove(next_tick);
889
890                        match &mut self.log {
891                            LogKind::Null => {}
892                            LogKind::Stderr => {
893                                if let Some(cid) = &removed.1 {
894                                    eprintln!(
895                                        "\n{}",
896                                        format!("Running Tick (Cluster Member {})", cid)
897                                            .color(colored::Color::Magenta)
898                                            .bold()
899                                    )
900                                } else {
901                                    eprintln!(
902                                        "\n{}",
903                                        "Running Tick".color(colored::Color::Magenta).bold()
904                                    )
905                                }
906                            }
907                            LogKind::Custom(writer) => {
908                                writeln!(
909                                    writer,
910                                    "\n{}",
911                                    "Running Tick".color(colored::Color::Magenta).bold()
912                                )
913                                .unwrap();
914                            }
915                        }
916
917                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
918                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
919                            write.write_str(" ")
920                        };
921
922                        let mut tick_decision_writer = indenter::indented(&mut self.log)
923                            .with_format(indenter::Format::Custom {
924                                inserter: &mut asterisk_indenter,
925                            });
926
927                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
928                        run_hooks(&mut tick_decision_writer, hooks);
929
930                        let run_tick_future = removed.2.run_tick();
931                        if let Some(inline_hooks) =
932                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
933                        {
934                            let mut run_tick_future_pinned = pin!(run_tick_future);
935
936                            loop {
937                                tokio::select! {
938                                    biased;
939                                    r = &mut run_tick_future_pinned => {
940                                        assert!(r);
941                                        break;
942                                    }
943                                    _ = async {} => {
944                                        bolero_generator::any::scope::borrow_with(|driver| {
945                                            for hook in inline_hooks.iter_mut() {
946                                                if hook.pending_decision() {
947                                                    if !hook.has_decision() {
948                                                        hook.autonomous_decision(driver);
949                                                    }
950
951                                                    hook.release_decision(&mut tick_decision_writer);
952                                                }
953                                            }
954                                        });
955                                    }
956                                }
957                            }
958                        } else {
959                            assert!(run_tick_future.await);
960                        }
961
962                        self.possibly_ready_ticks.push(removed);
963                    } else {
964                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
965                        let mut default_hooks = vec![];
966                        let hooks = self
967                            .hooks
968                            .get_mut(&self.possibly_ready_observation[next_obs])
969                            .unwrap_or(&mut default_hooks);
970
971                        run_hooks(&mut self.log, hooks);
972                    }
973                }
974            }
975        }
976    }
977}
978
979fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
980    let mut remaining_decision_count = hooks.len();
981    let mut made_nontrivial_decision = false;
982
983    bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
984        // first, scan manual decisions
985        hooks.iter_mut().for_each(|hook| {
986            if let Some(is_nontrivial) = hook.current_decision() {
987                made_nontrivial_decision |= is_nontrivial;
988                remaining_decision_count -= 1;
989            } else if !hook.can_make_nontrivial_decision() {
990                // if no nontrivial decision is possible, make a trivial one
991                // (we need to do this in the first pass to force nontrivial decisions
992                // on the remaining hooks)
993                hook.autonomous_decision(driver, false);
994                remaining_decision_count -= 1;
995            }
996        });
997
998        hooks.iter_mut().for_each(|hook| {
999            if hook.current_decision().is_none() {
1000                made_nontrivial_decision |= hook.autonomous_decision(
1001                    driver,
1002                    !made_nontrivial_decision && remaining_decision_count == 1,
1003                );
1004                remaining_decision_count -= 1;
1005            }
1006
1007            hook.release_decision(tick_decision_writer);
1008        });
1009    });
1010}