1use core::{fmt, panic};
4use std::cell::RefCell;
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::graph::Dfir;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::Mutex;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31use crate::sim::runtime::SimHook;
32
/// Channel endpoints that test code uses to exchange messages with a running simulation.
///
/// An instance is installed in `CURRENT_SIM_CONNECTIONS` while a simulation runs and is looked
/// up by the `SimSender` / `SimReceiver` helpers in this file.
struct SimConnections {
    /// Sending halves used to push serialized messages into the simulation, keyed by port.
    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
    /// Receiving halves used to read serialized messages emitted by the simulation, keyed by
    /// port. Each stream sits behind an async mutex so that it can be shared through an `Rc`.
    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// Maps each external port id to the simulation port that backs it.
    external_registered: HashMap<ExternalPortId, SimExternalPort>,
}
38
// Task-local holding the connections of the simulation instance that drives the current task.
// It is scoped in `CompiledSimInstance::run_without_launching` and read by the
// `SimSender` / `SimReceiver` helpers.
tokio::task_local! {
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
42
/// A simulation that has been compiled and loaded into this process as a dynamic library.
pub struct CompiledSim {
    /// Temporary path owned for as long as this value lives.
    /// NOTE(review): presumably the on-disk location of the library behind `lib`, kept so the
    /// file is not deleted while the library is loaded — confirm.
    pub(super) _path: TempPath,
    /// The loaded library; the `__hydro_runtime` entry point is looked up in it.
    pub(super) lib: Library,
    /// Registry of the external ports of the simulated program; cloned into every instance.
    pub(super) externals_port_registry: SimExternalPortRegistry,
}
49
/// A reusable factory that produces a fresh [`CompiledSimInstance`] on every call.
///
/// The trait is marked with `#[sealed::sealed]` and is blanket-implemented below for every
/// `RefUnwindSafe` closure of the right shape, so it acts as a named alias for that bound.
#[sealed::sealed]
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
// Blanket implementation: any matching closure is an `Instantiator`.
#[sealed::sealed]
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
57
/// Log handler that discards its input; handed to the loaded runtime when logging is off.
fn null_handler(_args: fmt::Arguments<'_>) {}
59
/// Log handler that prints the formatted message, followed by a newline, to standard output.
fn println_handler(args: fmt::Arguments<'_>) {
    println!("{args}");
}
63
/// Log handler that prints the formatted message, followed by a newline, to standard error.
fn eprintln_handler(args: fmt::Arguments<'_>) {
    eprintln!("{args}");
}
67
/// Type of the `__hydro_runtime` symbol that is looked up in the compiled simulation library.
///
/// Calling it builds the dataflow graphs and hooks for one simulation instance. The returned
/// tuple is, in order: the graphs that the scheduler runs on every pass, the tick graphs, the
/// per-location hooks, and the per-location inline hooks. In both graph lists the `&'static str`
/// is a JSON-serialized `LocationId` and the `Option<u32>` is logged as the cluster member id
/// (it is `None` when there is none).
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        // Callers in this file pass the `colored` crate's global "should colorize" setting.
        should_color: bool,
        // Senders on which the simulation emits bytes for each external port (keyed by the
        // port's inner `usize`).
        external_out: HashMap<usize, UnboundedSender<Bytes>>,
        // Streams from which the simulation reads bytes for each external port.
        external_in: HashMap<usize, UnboundedReceiverStream<Bytes>>,
        // Handlers for the simulated program's log output; callers in this file pass either the
        // printing handlers or `null_handler`.
        println_handler: fn(fmt::Arguments<'_>),
        eprintln_handler: fn(fmt::Arguments<'_>),
    ) -> (
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        Hooks<&'static str>,
        InlineHooks<&'static str>,
    ),
>;
90
91impl CompiledSim {
    /// Creates a single simulation instance, passes it to `thunk`, and returns the thunk's
    /// result.
    ///
    /// The instance is created with logging always enabled (`always_log = true`).
    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
        self.with_instantiator(|instantiator| thunk(instantiator()), true)
    }
96
    /// Passes `thunk` a factory that creates a fresh, not-yet-connected simulation instance on
    /// every call, and returns the thunk's result.
    ///
    /// Instances log when `always_log` is `true` or when the `HYDRO_SIM_LOG` environment variable
    /// is set to `1`; the decision is made once, before `thunk` runs.
    ///
    /// # Panics
    /// Panics if the loaded library does not export `__hydro_runtime`.
    pub fn with_instantiator<T>(
        &self,
        thunk: impl FnOnce(&dyn Instantiator) -> T,
        always_log: bool,
    ) -> T {
        // SAFETY (NOTE(review)): sound only if the library exports `__hydro_runtime` with exactly
        // the `SimLoaded` signature — presumably guaranteed by whatever generates the library;
        // confirm.
        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
        thunk(
            // Every instance starts with empty port maps; they are filled in by
            // `CompiledSimInstance::run_without_launching`.
            &(|| CompiledSimInstance {
                func: func.clone(),
                externals_port_registry: self.externals_port_registry.clone(),
                input_ports: HashMap::new(),
                output_ports: HashMap::new(),
                log,
            }),
        )
    }
121
    /// Runs `thunk` against many simulation instances whose scheduling decisions are driven by
    /// bolero.
    ///
    /// The mode is picked as follows:
    /// 1. If the `BOLERO_FUZZER` environment variable is set, the libfuzzer arguments (and,
    ///    unless `HYDRO_NO_FAILURE_OUTPUT=1`, the failure output path) are exported through
    ///    environment variables and the test runs under the fuzzer.
    /// 2. Otherwise, if a reproducer file exists for the calling test, only that reproducer is
    ///    replayed via [`Self::fuzz_repro`].
    /// 3. Otherwise, 8192 iterations with random inputs are run.
    ///
    /// The reproducer path is `<dir of caller's source file>/sim-failures/<caller fn name with
    /// "::" replaced by "__">` with the extension set to `bin`.
    ///
    /// # Panics
    /// Panics if no calling frame outside the simulator is found in the backtrace, if that frame
    /// has no file name, if paths are not valid UTF-8, or if directories cannot be created.
    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
        // The first backtrace frame that does not belong to the simulator itself is treated as
        // the test function that called us.
        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
            .elements()
            .into_iter()
            .find(|e| {
                !e.fn_name.starts_with("hydro_lang::sim::compiled")
                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
                    && !e.fn_name.starts_with("fuzz<")
                    && !e.fn_name.starts_with("<hydro_lang::sim")
            })
            .unwrap();

        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
        let repro_folder = caller_path.parent().unwrap().join("sim-failures");

        let caller_fuzz_repro_path = repro_folder
            .join(caller_fn.fn_name.replace("::", "__"))
            .with_extension("bin");

        if std::env::var("BOLERO_FUZZER").is_ok() {
            // The same `.fuzz-corpus` directory (under the current working directory) is passed
            // twice as a positional argument and once as the artifact prefix.
            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
            std::fs::create_dir_all(&corpus_dir).unwrap();
            let libfuzzer_args = format!(
                "{} {} -artifact_prefix={}/ -handle_abrt=0",
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
            );

            std::fs::create_dir_all(&repro_folder).unwrap();

            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
                // SAFETY (NOTE(review)): `set_var` requires that no other thread accesses the
                // environment concurrently — confirm that this holds for the test harness.
                unsafe {
                    std::env::set_var(
                        "BOLERO_FAILURE_OUTPUT",
                        caller_fuzz_repro_path.to_str().unwrap(),
                    );
                }
            }

            // SAFETY (NOTE(review)): same single-threaded-environment assumption as above.
            unsafe {
                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
            }

            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: "",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .run_with_replay(move |is_replay| {
                        let mut instance = instantiator();

                        // The banner is printed only when logging was already enabled, i.e.
                        // before a replay forces logging on below.
                        if instance.log {
                            eprintln!(
                                "{}",
                                "\n==== New Simulation Instance ===="
                                    .color(colored::Color::Cyan)
                                    .bold()
                            );
                        }

                        // Replays always log, regardless of `HYDRO_SIM_LOG`.
                        if is_replay {
                            instance.log = true;
                        }

                        // Each instance gets its own single-threaded runtime.
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
            // A stored reproducer exists: replay only that input.
            self.fuzz_repro(existing_bytes, async |compiled| {
                compiled.launch();
                thunk().await
            });
        } else {
            eprintln!(
                "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
                caller_fuzz_repro_path.display()
            );
            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: ".",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .with_iterations(8192)
                    .run(move || {
                        let instance = instantiator();
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        }
    }
245
    /// Runs one simulation instance whose decisions are drawn from the given reproducer `bytes`.
    ///
    /// The instance is created with logging enabled and is *not* launched automatically: `thunk`
    /// receives it and decides how to schedule it. The thunk runs on a fresh single-threaded
    /// Tokio runtime, inside a bolero scope backed by a byte driver over `bytes`.
    ///
    /// # Panics
    /// Panics if the Tokio runtime cannot be built.
    pub fn fuzz_repro<'a>(
        &'a self,
        bytes: Vec<u8>,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        self.with_instance(|instance| {
            bolero::bolero_engine::any::scope::with(
                Box::new(bolero::bolero_engine::driver::object::Object(
                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
                )),
                || {
                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run_without_launching(thunk).await })
                },
            )
        });
    }
268
    /// Runs `thunk` against simulation instances using bolero's exhaustive mode and returns how
    /// many instances were executed.
    ///
    /// If the `BOLERO_FUZZER` environment variable is set, an error message is printed and the
    /// process is aborted, because exhaustive runs are not supported under a fuzzer.
    ///
    /// # Panics
    /// Panics if a Tokio runtime cannot be built.
    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
        if std::env::var("BOLERO_FUZZER").is_ok() {
            eprintln!(
                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
            );
            std::process::abort();
        }

        // The counter is moved into the test closure as a mutable reference so that the total
        // can be read once the run has finished.
        let mut count = 0;
        let count_mut = &mut count;

        self.with_instantiator(
            |instantiator| {
                bolero::test(bolero::TargetLocation {
                    package_name: "",
                    manifest_dir: "",
                    module_path: "",
                    file: "",
                    line: 0,
                    item_path: "<unknown>::__bolero_item_path__",
                    test_name: None,
                })
                .exhaustive()
                .run_with_replay(move |is_replay| {
                    *count_mut += 1;

                    let mut instance = instantiator();
                    // The banner is printed only when logging was already enabled, i.e. before a
                    // replay forces logging on below.
                    if instance.log {
                        eprintln!(
                            "{}",
                            "\n==== New Simulation Instance ===="
                                .color(colored::Color::Cyan)
                                .bold()
                        );
                    }

                    // Replays always log, regardless of `HYDRO_SIM_LOG`.
                    if is_replay {
                        instance.log = true;
                    }

                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run(&mut thunk).await })
                })
            },
            false,
        );

        count
    }
330}
331
/// One instance of a compiled simulation that has not been scheduled yet.
pub struct CompiledSimInstance<'a> {
    /// Entry point of the loaded library that builds the graphs and hooks for this instance.
    func: SimLoaded<'a>,
    /// Registry of the external ports of the simulated program.
    externals_port_registry: SimExternalPortRegistry,
    /// Senders handed to the simulation as its `external_out` argument, keyed by port.
    output_ports: HashMap<SimExternalPort, UnboundedSender<Bytes>>,
    /// Streams handed to the simulation as its `external_in` argument, keyed by port.
    input_ports: HashMap<SimExternalPort, UnboundedReceiverStream<Bytes>>,
    /// Whether the simulated program's output and the scheduler's decisions are logged.
    log: bool,
}
341
342impl<'a> CompiledSimInstance<'a> {
    /// Connects the external ports, launches the scheduler as a local task, and then runs
    /// `thunk` to completion.
    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
        self.run_without_launching(async |instance| {
            instance.launch();
            thunk().await;
        })
        .await;
    }
350
351 async fn run_without_launching(
352 mut self,
353 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
354 ) {
355 let mut input_senders = HashMap::new();
356 let mut output_receivers = HashMap::new();
357 for registered_port in self.externals_port_registry.port_counter.range_up_to() {
358 {
359 let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
360 self.output_ports.insert(registered_port, sender);
361 output_receivers.insert(
362 registered_port,
363 Rc::new(Mutex::new(UnboundedReceiverStream::new(
364 receiver.into_inner(),
365 ))),
366 );
367 }
368
369 {
370 let (sender, receiver) = dfir_rs::util::unbounded_channel::<Bytes>();
371 self.input_ports.insert(registered_port, receiver);
372 input_senders.insert(registered_port, Rc::new(sender));
373 }
374 }
375
376 let local_set = tokio::task::LocalSet::new();
377 local_set
378 .run_until(CURRENT_SIM_CONNECTIONS.scope(
379 RefCell::new(SimConnections {
380 input_senders,
381 output_receivers,
382 external_registered: self.externals_port_registry.registered.clone(),
383 }),
384 async move {
385 thunk(self).await;
386 },
387 ))
388 .await;
389 }
390
    /// Spawns the scheduler for this instance as a task on the current `LocalSet`, without a
    /// custom log writer.
    fn launch(self) {
        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
    }
396
    /// Builds the simulation and returns a future that runs its scheduler, sending the
    /// scheduler's log to `log_writer`.
    ///
    /// The writer is only used when logging is enabled for this instance; otherwise nothing is
    /// logged and the writer is ignored.
    pub fn schedule_with_logger<W: std::io::Write>(
        self,
        log_writer: W,
    ) -> impl use<W> + Future<Output = ()> {
        self.schedule_with_maybe_logger(Some(log_writer))
    }
405
    /// Calls into the loaded library to build the graphs and hooks for this instance and returns
    /// a future that runs the scheduler over them.
    ///
    /// The library call happens eagerly, when this method is called; only the scheduling itself
    /// is deferred to the returned future. If logging is disabled, `log_override` is ignored.
    ///
    /// # Panics
    /// Panics if a location id returned by the library is not valid JSON for a `LocationId`.
    fn schedule_with_maybe_logger<W: std::io::Write>(
        self,
        log_override: Option<W>,
    ) -> impl use<W> + Future<Output = ()> {
        // SAFETY (NOTE(review)): relies on `self.func` really having the `SimLoaded` signature
        // and on the library staying loaded while the returned graphs are in use — confirm.
        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = unsafe {
            (self.func)(
                colored::control::SHOULD_COLORIZE.should_colorize(),
                // Ports cross the library boundary keyed by their inner `usize`.
                self.output_ports
                    .into_iter()
                    .map(|(k, v)| (k.into_inner(), v))
                    .collect(),
                self.input_ports
                    .into_iter()
                    .map(|(k, v)| (k.into_inner(), v))
                    .collect(),
                if self.log {
                    println_handler
                } else {
                    null_handler
                },
                if self.log {
                    eprintln_handler
                } else {
                    null_handler
                },
            )
        };

        // Every location with an always-running graph starts out with a "not ready" observation
        // entry.
        let not_ready_observation = async_dfirs
            .iter()
            .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
            .collect();

        // Location ids arrive as JSON strings and are parsed into `LocationId`s here.
        let mut launched = LaunchedSim {
            async_dfirs: async_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_ticks: vec![],
            not_ready_ticks: tick_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_observation: vec![],
            not_ready_observation,
            hooks: hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            inline_hooks: inline_hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            // A custom writer takes precedence over stderr, but only when logging is enabled.
            log: if self.log {
                if let Some(w) = log_override {
                    LogKind::Custom(w)
                } else {
                    LogKind::Stderr
                }
            } else {
                LogKind::Null
            },
        };

        async move { launched.scheduler().await }
    }
472}
473
// `Clone` and `Copy` are implemented by hand with the same bounds as the other `SimReceiver`
// impls in this file — presumably so that they do not require `T`, `O`, or `R` to be `Clone`
// or `Copy` themselves, as a derive would.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
481
482impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
483 async fn with_stream<Out>(
484 &self,
485 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
486 ) -> Out {
487 let receiver = CURRENT_SIM_CONNECTIONS.with(|connections| {
488 let connections = &mut *connections.borrow_mut();
489 connections
490 .output_receivers
491 .get(connections.external_registered.get(&self.0).unwrap())
492 .unwrap()
493 .clone()
494 });
495
496 let mut receiver_stream = receiver.lock().await;
497 thunk(&mut pin!(
498 &mut receiver_stream
499 .by_ref()
500 .map(|b| bincode::deserialize(&b).unwrap())
501 ))
502 .await
503 }
504
505 pub fn assert_no_more(self) -> impl Future<Output = ()>
507 where
508 T: Debug,
509 {
510 FutureTrackingCaller {
511 future: async move {
512 self.with_stream(async |stream| {
513 if let Some(next) = stream.next().await {
514 return Err(format!(
515 "Stream yielded unexpected message: {:?}, expected termination",
516 next
517 ));
518 }
519 Ok(())
520 })
521 .await
522 },
523 }
524 }
525}
526
527impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
    /// Waits for the next message on this port and returns it, or `None` once the stream has
    /// ended.
    pub async fn next(&self) -> Option<T> {
        self.with_stream(async |stream| stream.next().await).await
    }
533
    /// Collects every remaining message on this port into a `C`, completing once the stream
    /// has ended.
    pub async fn collect<C: Default + Extend<T>>(self) -> C {
        self.with_stream(async |stream| stream.collect().await)
            .await
    }
540
541 pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
544 &self,
545 expected: I,
546 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
547 where
548 T: Debug + PartialEq<T2>,
549 {
550 FutureTrackingCaller {
551 future: async {
552 let mut expected: VecDeque<T2> = expected.into_iter().collect();
553
554 while !expected.is_empty() {
555 if let Some(next) = self.next().await {
556 let next_expected = expected.pop_front().unwrap();
557 if next != next_expected {
558 return Err(format!(
559 "Stream yielded unexpected message: {:?}, expected: {:?}",
560 next, next_expected
561 ));
562 }
563 } else {
564 return Err(format!(
565 "Stream ended early, still expected: {:?}",
566 expected
567 ));
568 }
569 }
570
571 Ok(())
572 },
573 }
574 }
575
    /// Returns a future that asserts that the port yields exactly `expected`, in order, and
    /// then terminates.
    ///
    /// This chains [`Self::assert_yields`] with [`Self::assert_no_more`].
    pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
        &self,
        expected: I,
    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
    where
        T: Debug + PartialEq<T2>,
    {
        ChainedFuture {
            first: self.assert_yields(expected),
            second: self.assert_no_more(),
            first_done: false,
        }
    }
591}
592
593pin_project_lite::pin_project! {
594 struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
604 #[pin]
605 future: F,
606 }
607}
608
609impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
610 type Output = ();
611
612 #[track_caller]
613 fn poll(
614 mut self: Pin<&mut Self>,
615 cx: &mut std::task::Context<'_>,
616 ) -> std::task::Poll<Self::Output> {
617 match ready!(self.as_mut().project().future.poll(cx)) {
618 Ok(()) => std::task::Poll::Ready(()),
619 Err(e) => panic!("{}", e),
620 }
621 }
622}
623
624pin_project_lite::pin_project! {
625 struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
629 #[pin]
630 first: F1,
631 #[pin]
632 second: F2,
633 first_done: bool,
634 }
635}
636
637impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
638 type Output = ();
639
640 #[track_caller]
641 fn poll(
642 mut self: Pin<&mut Self>,
643 cx: &mut std::task::Context<'_>,
644 ) -> std::task::Poll<Self::Output> {
645 if !self.first_done {
646 ready!(self.as_mut().project().first.poll(cx));
647 *self.as_mut().project().first_done = true;
648 }
649
650 self.as_mut().project().second.poll(cx)
651 }
652}
653
654impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
    /// Collects every remaining message on this port into a `C` and sorts the result, so that
    /// the outcome does not depend on the arrival order. Completes once the stream has ended.
    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
    where
        T: Ord,
    {
        self.with_stream(async |stream| {
            let mut collected: C = stream.collect().await;
            collected.as_mut().sort();
            collected
        })
        .await
    }
668
669 pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
672 &self,
673 expected: I,
674 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
675 where
676 T: Debug + PartialEq<T2>,
677 {
678 FutureTrackingCaller {
679 future: async {
680 self.with_stream(async |stream| {
681 let mut expected: Vec<T2> = expected.into_iter().collect();
682
683 while !expected.is_empty() {
684 if let Some(next) = stream.next().await {
685 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
686 if let Some((i, _)) = idx {
687 expected.swap_remove(i);
688 } else {
689 return Err(format!(
690 "Stream yielded unexpected message: {:?}",
691 next
692 ));
693 }
694 } else {
695 return Err(format!(
696 "Stream ended early, still expected: {:?}",
697 expected
698 ));
699 }
700 }
701
702 Ok(())
703 })
704 .await
705 },
706 }
707 }
708
    /// Returns a future that asserts that the port yields exactly the values in `expected`, in
    /// any order, and then terminates.
    ///
    /// This chains [`Self::assert_yields_unordered`] with `assert_no_more`.
    pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
        &self,
        expected: I,
    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
    where
        T: Debug + PartialEq<T2>,
    {
        ChainedFuture {
            first: self.assert_yields_unordered(expected),
            second: self.assert_no_more(),
            first_done: false,
        }
    }
724}
725
726impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
727 fn with_sink<Out>(
728 &self,
729 thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
730 ) -> Out {
731 let sender = CURRENT_SIM_CONNECTIONS.with(|connections| {
732 let connections = &mut *connections.borrow_mut();
733 connections
734 .input_senders
735 .get(connections.external_registered.get(&self.0).unwrap())
736 .unwrap()
737 .clone()
738 });
739
740 thunk(&move |t| sender.send(bincode::serialize(&t).unwrap().into()))
741 }
742}
743
744impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
745 pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
748 self.with_sink(|send| {
749 for t in iter {
750 send(t).unwrap();
751 }
752 })
753 }
754}
755
756impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
757 pub fn send(&self, t: T) {
760 self.with_sink(|send| send(t)).unwrap();
761 }
762
763 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
766 self.with_sink(|send| {
767 for t in iter {
768 send(t).unwrap();
769 }
770 })
771 }
772}
773
/// Destination of the scheduler's human-readable log output.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Print to the process's standard error.
    Stderr,
    /// Forward the text, as UTF-8 bytes, to a caller-supplied writer.
    Custom(W),
}

// Lets a `LogKind` be used as the target of `write!` and of formatting adapters.
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    /// Writes `s` to the selected destination.
    ///
    /// Only the `Custom` variant can fail: an I/O error from the writer is reported as
    /// `fmt::Error`.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        let sink = match self {
            Self::Null => return Ok(()),
            Self::Stderr => {
                eprint!("{s}");
                return Ok(());
            }
            Self::Custom(sink) => sink,
        };

        // `fmt::Error` carries no payload, so the details of the I/O error are dropped here.
        sink.write_all(s.as_bytes()).or(Err(std::fmt::Error))
    }
}
793
/// Scheduler state for one running simulation instance.
///
/// Graphs and observations are keyed by a location id together with an optional `u32` that is
/// logged as the cluster member id.
struct LaunchedSim<W: std::io::Write> {
    /// Graphs that are given a chance to run on every scheduler pass.
    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Tick graphs that are candidates for being run next.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Tick graphs that are currently not candidates; they become candidates again when the
    /// always-running graph of their enclosing location makes progress.
    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Observation points (locations whose hooks may be run without running a tick) that are
    /// candidates for being run next.
    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Observation points that are currently not candidates.
    not_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Hooks that are run before a tick or at an observation point, per location.
    hooks: Hooks<LocationId>,
    /// Hooks that are serviced while a tick is running, per location.
    inline_hooks: InlineHooks<LocationId>,
    /// Destination of the scheduler's log output.
    log: LogKind<W>,
}
806
impl<W: std::io::Write> LaunchedSim<W> {
    /// Drives the simulation until no tick and no observation point can make a decision.
    ///
    /// Each pass of the outer loop:
    /// 1. Yields to other local tasks, then runs every always-running graph once. Whenever one
    ///    of them reports progress, the ticks nested in its location and the observation point
    ///    for its location become candidates again.
    /// 2. If anything made progress, starts over. Otherwise candidates whose hooks can neither
    ///    release a nontrivial decision nor make one are demoted to "not ready".
    /// 3. If no candidate is left, the scheduler finishes. Otherwise one candidate is picked
    ///    through bolero's generator API and its hooks (and, for ticks, its graph) are run.
    async fn scheduler(&mut self) {
        loop {
            tokio::task::yield_now().await;
            let mut any_made_progress = false;
            for (loc, c_id, dfir) in &mut self.async_dfirs {
                if dfir.run_tick().await {
                    any_made_progress = true;
                    // Ticks whose enclosing location (and cluster member) is the graph that just
                    // made progress may have new input, so they become candidates again.
                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
                        .not_ready_ticks
                        .drain(..)
                        .partition(|(tick_loc, tick_c_id, _)| {
                            let LocationId::Tick(_, outer) = tick_loc else {
                                unreachable!()
                            };
                            outer.as_ref() == loc && tick_c_id == c_id
                        });

                    self.possibly_ready_ticks.extend(now_ready);
                    self.not_ready_ticks.extend(still_not_ready);

                    // Same for the observation point of this location.
                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
                        .not_ready_observation
                        .drain(..)
                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);

                    self.possibly_ready_observation.extend(now_ready_obs);
                    self.not_ready_observation.extend(still_not_ready_obs);
                }
            }

            if any_made_progress {
                continue;
            } else {
                use bolero::generator::*;

                // Keep only the ticks for which at least one hook either already holds a
                // nontrivial decision or is able to make one. Every tick is expected to have a
                // hooks entry (the lookup panics otherwise).
                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
                    .possibly_ready_ticks
                    .drain(..)
                    .partition(|(name, cid, _)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .unwrap()
                            .iter()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_ticks = ready_tick;
                self.not_ready_ticks.append(&mut not_ready_tick);

                // Same filter for observation points; a missing hooks entry counts as "not
                // ready" here instead of panicking.
                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
                    .possibly_ready_observation
                    .drain(..)
                    .partition(|(name, cid)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .into_iter()
                            .flatten()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_observation = ready_obs;
                self.not_ready_observation.append(&mut not_ready_obs);

                if self.possibly_ready_ticks.is_empty()
                    && self.possibly_ready_observation.is_empty()
                {
                    // Nothing can make a decision any more: the simulation is done.
                    break;
                } else {
                    // Pick one index over "ticks followed by observations" through bolero's
                    // generator API (this is not `Iterator::any`).
                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
                        + self.possibly_ready_observation.len()))
                        .any();

                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
                        let next_tick = next_tick_or_obs;
                        let mut removed = self.possibly_ready_ticks.remove(next_tick);

                        // NOTE(review): the `Custom` branch does not include the cluster member
                        // id that the `Stderr` branch prints — confirm whether that is intended.
                        match &mut self.log {
                            LogKind::Null => {}
                            LogKind::Stderr => {
                                if let Some(cid) = &removed.1 {
                                    eprintln!(
                                        "\n{}",
                                        format!("Running Tick (Cluster Member {})", cid)
                                            .color(colored::Color::Magenta)
                                            .bold()
                                    )
                                } else {
                                    eprintln!(
                                        "\n{}",
                                        "Running Tick".color(colored::Color::Magenta).bold()
                                    )
                                }
                            }
                            LogKind::Custom(writer) => {
                                writeln!(
                                    writer,
                                    "\n{}",
                                    "Running Tick".color(colored::Color::Magenta).bold()
                                )
                                .unwrap();
                            }
                        }

                        // Every line of decision output for this tick is prefixed with "* ".
                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
                            write.write_str(" ")
                        };

                        let mut tick_decision_writer = indenter::indented(&mut self.log)
                            .with_format(indenter::Format::Custom {
                                inserter: &mut asterisk_indenter,
                            });

                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
                        run_hooks(&mut tick_decision_writer, hooks);

                        let run_tick_future = removed.2.run_tick();
                        if let Some(inline_hooks) =
                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
                        {
                            let mut run_tick_future_pinned = pin!(run_tick_future);

                            // Alternate between polling the tick and servicing inline hooks
                            // until the tick completes.
                            loop {
                                tokio::select! {
                                    // `biased` makes the tick future get polled first.
                                    biased;
                                    r = &mut run_tick_future_pinned => {
                                        assert!(r);
                                        break;
                                    }
                                    // Always-ready branch: taken whenever the tick is pending.
                                    _ = async {} => {
                                        bolero_generator::any::scope::borrow_with(|driver| {
                                            for hook in inline_hooks.iter_mut() {
                                                if hook.pending_decision() {
                                                    if !hook.has_decision() {
                                                        hook.autonomous_decision(driver);
                                                    }

                                                    hook.release_decision(&mut tick_decision_writer);
                                                }
                                            }
                                        });
                                    }
                                }
                            }
                        } else {
                            assert!(run_tick_future.await);
                        }

                        // The tick stays a candidate; it is filtered again on the next pass.
                        self.possibly_ready_ticks.push(removed);
                    } else {
                        // An observation point was picked: only its hooks are run, and the entry
                        // is left in the candidate list.
                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
                        let mut default_hooks = vec![];
                        let hooks = self
                            .hooks
                            .get_mut(&self.possibly_ready_observation[next_obs])
                            .unwrap_or(&mut default_hooks);

                        run_hooks(&mut self.log, hooks);
                    }
                }
            }
        }
    }
}
978
/// Makes sure that every hook in `hooks` has a decision and then releases all decisions,
/// writing to `tick_decision_writer`.
///
/// Decisions are made using the bolero driver of the current scope, in two passes:
/// 1. Hooks that already hold a decision are accounted for, and hooks that cannot make a
///    nontrivial decision decide immediately.
/// 2. Every hook that is still undecided decides, and then each hook's decision is released.
fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
    // Number of hooks that have not been accounted for yet.
    let mut remaining_decision_count = hooks.len();
    let mut made_nontrivial_decision = false;

    bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
        hooks.iter_mut().for_each(|hook| {
            if let Some(is_nontrivial) = hook.current_decision() {
                made_nontrivial_decision |= is_nontrivial;
                remaining_decision_count -= 1;
            } else if !hook.can_make_nontrivial_decision() {
                // Presumably this records a decision on the hook, so that the second pass skips
                // it — verify against `SimHook`.
                hook.autonomous_decision(driver, false);
                remaining_decision_count -= 1;
            }
        });

        hooks.iter_mut().for_each(|hook| {
            if hook.current_decision().is_none() {
                // NOTE(review): the flag is `true` only for the last undecided hook when nothing
                // nontrivial has been decided so far — presumably it forces a nontrivial
                // decision so that the step is not a no-op; confirm against `SimHook`.
                made_nontrivial_decision |= hook.autonomous_decision(
                    driver,
                    !made_nontrivial_decision && remaining_decision_count == 1,
                );
                remaining_decision_count -= 1;
            }

            hook.release_decision(tick_decision_writer);
        });
    });
}