Skip to main content

hydro_lang/live_collections/sliced/
mod.rs

1//! Utilities for transforming live collections via slicing.
2
3pub mod style;
4
5use super::boundedness::{Bounded, Unbounded};
6use super::stream::{Ordering, Retries};
7use crate::location::{Location, NoTick, Tick};
8
9#[doc(hidden)]
10#[macro_export]
11macro_rules! __sliced_parse_uses__ {
12    // Parse immutable use statements with style: let name = use::style(args...);
13    (
14        @uses [$($uses:tt)*]
15        @states [$($states:tt)*]
16        let $name:ident = use:: $invocation:expr; $($rest:tt)*
17    ) => {
18        $crate::__sliced_parse_uses__!(
19            @uses [$($uses)* { $name, $invocation, $invocation }]
20            @states [$($states)*]
21            $($rest)*
22        )
23    };
24
25    // Parse immutable use statements without style: let name = use(args...);
26    (
27        @uses [$($uses:tt)*]
28        @states [$($states:tt)*]
29        let $name:ident = use($($args:expr),* $(,)?); $($rest:tt)*
30    ) => {
31        $crate::__sliced_parse_uses__!(
32            @uses [$($uses)* { $name, $crate::macro_support::copy_span::copy_span!($($args,)* default)($($args),*), $($args),* }]
33            @states [$($states)*]
34            $($rest)*
35        )
36    };
37
38    // Parse mutable state statements: let mut name = use::style::<Type>(args);
39    (
40        @uses [$($uses:tt)*]
41        @states [$($states:tt)*]
42        let mut $name:ident = use:: $style:ident $(::<$ty:ty>)? ($($args:expr)?); $($rest:tt)*
43    ) => {
44        $crate::__sliced_parse_uses__!(
45            @uses [$($uses)*]
46            @states [$($states)* { $name, $style, (($($ty)?), ($($args)?)) }]
47            $($rest)*
48        )
49    };
50
51    // Terminal case: no uses, only states
52    (
53        @uses []
54        @states [$({ $state_name:ident, $state_style:ident, $state_arg:tt })+]
55        $($body:tt)*
56    ) => {
57        {
58            // We need at least one use to get a tick, so panic if there are none
59            compile_error!("sliced! requires at least one `let name = use(...)` statement to determine the tick")
60        }
61    };
62
63    // Terminal case: uses with optional states
64    (
65        @uses [$({ $use_name:ident, $invocation:expr, $($invocation_spans:expr),* })+]
66        @states [$({ $state_name:ident, $state_style:ident, (($($state_ty:ty)?), ($($state_arg:expr)?)) })*]
67        $($body:tt)*
68    ) => {
69        {
70            use $crate::live_collections::sliced::style::*;
71            let __styled = (
72                $($invocation,)+
73            );
74
75            let __tick = $crate::live_collections::sliced::Slicable::preferred_tick(&__styled).unwrap_or_else(|| $crate::live_collections::sliced::Slicable::create_tick(&__styled.0));
76            let __backtraces = {
77                use $crate::compile::ir::backtrace::__macro_get_backtrace;
78                (
79                    $($crate::macro_support::copy_span::copy_span!($($invocation_spans,)* {
80                        __macro_get_backtrace(1)
81                    }),)+
82                )
83            };
84            let __sliced = $crate::live_collections::sliced::Slicable::slice(__styled, &__tick, __backtraces);
85            let (
86                $($use_name,)+
87            ) = __sliced;
88
89            // Create all cycles and pack handles/values into tuples
90            let (__handles, __states) = $crate::live_collections::sliced::unzip_cycles((
91                $($crate::live_collections::sliced::style::$state_style$(::<$state_ty, _>)?(& __tick, $($state_arg)?),)*
92            ));
93
94            // Unpack mutable state values
95            let (
96                $(mut $state_name,)*
97            ) = __states;
98
99            // Execute the body
100            let __body_result = {
101                $($body)*
102            };
103
104            // Re-pack the final state values and complete cycles
105            let __final_states = (
106                $($state_name,)*
107            );
108            $crate::live_collections::sliced::complete_cycles(__handles, __final_states);
109
110            // Unslice the result
111            $crate::live_collections::sliced::Unslicable::unslice(__body_result)
112        }
113    };
114}
115
116#[macro_export]
117/// Transforms a live collection with a computation relying on a slice of another live collection.
118/// This is useful for reading a snapshot of an asynchronously updated collection while processing another
119/// collection, such as joining a stream with the latest values from a singleton.
120///
121/// # Syntax
122/// The `sliced!` macro takes in a closure-like syntax specifying the live collections to be sliced
123/// and the body of the transformation. Each `use` statement indicates a live collection to be sliced,
124/// along with a non-determinism explanation. Optionally, a style can be specified to control how the
125/// live collection is sliced (e.g., atomically). All `use` statements must appear before the body.
126///
127/// ```rust,ignore
128/// let stream = sliced! {
129///     let name1 = use(collection1, nondet!(/** explanation */));
130///     let name2 = use::atomic(collection2, nondet!(/** explanation */));
131///
132///     // arbitrary statements can follow
133///     let intermediate = name1.map(...);
134///     intermediate.cross_singleton(name2)
135/// };
136/// ```
137///
138/// # Stateful Computations
139/// The `sliced!` macro also supports stateful computations across iterations using `let mut` bindings
140/// with `use::state` or `use::state_null`. These create cycles that persist values between iterations.
141///
142/// - `use::state(|l| initial)`: Creates a cycle with an initial value. The closure receives
143///   the slice location and returns the initial state for the first iteration.
144/// - `use::state_null::<Type>()`: Creates a cycle that starts as null/empty on the first iteration.
145///
146/// The mutable binding can be reassigned in the body, and the final value will be passed to the
147/// next iteration.
148///
149/// ```rust,ignore
150/// let counter_stream = sliced! {
151///     let batch = use(input_stream, nondet!(/** explanation */));
152///     let mut counter = use::state(|l| l.singleton(q!(0)));
153///
154///     // Increment counter by the number of items in this batch
155///     let new_count = counter.clone().zip(batch.count())
156///         .map(q!(|(old, add)| old + add));
157///     counter = new_count.clone();
158///     new_count.into_stream()
159/// };
160/// ```
161macro_rules! __sliced__ {
162    ($($tt:tt)*) => {
163        $crate::__sliced_parse_uses__!(
164            @uses []
165            @states []
166            $($tt)*
167        )
168    };
169}
170
171pub use crate::__sliced__ as sliced;
172
173/// Marks this live collection as atomically-yielded, which means that the output outside
174/// `sliced` will be at an atomic location that is synchronous with respect to the body
175/// of the slice.
176pub fn yield_atomic<T>(t: T) -> style::Atomic<T> {
177    style::Atomic {
178        collection: t,
179        // yield_atomic doesn't need a nondet since it's for output, not input
180        nondet: crate::nondet::NonDet,
181    }
182}
183
184/// A trait for live collections which can be sliced into bounded versions at a tick.
185pub trait Slicable<'a, L: Location<'a>> {
186    /// The sliced version of this live collection.
187    type Slice;
188
189    /// The type of backtrace associated with this slice.
190    type Backtrace;
191
192    /// Gets the preferred tick to slice at. Used for atomic slicing.
193    fn preferred_tick(&self) -> Option<Tick<L>>;
194
195    /// Gets the location associated with this live collection.
196    fn get_location(&self) -> &L;
197
198    /// Creates a tick that is appropriate for the collection's location.
199    fn create_tick(&self) -> Tick<L>
200    where
201        L: NoTick,
202    {
203        self.get_location().tick()
204    }
205
206    /// Slices this live collection at the given tick.
207    ///
208    /// # Non-Determinism
209    /// Slicing a live collection may involve non-determinism, such as choosing which messages
210    /// to include in a batch.
211    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace) -> Self::Slice;
212}
213
214/// A trait for live collections which can be yielded out of a slice back into their original form.
215pub trait Unslicable {
216    /// The unsliced version of this live collection.
217    type Unsliced;
218
219    /// Unslices a sliced live collection back into its original form.
220    fn unslice(self) -> Self::Unsliced;
221}
222
223/// A trait for unzipping a tuple of (handle, state) pairs into separate tuples.
224#[doc(hidden)]
225pub trait UnzipCycles {
226    /// The tuple of cycle handles.
227    type Handles;
228    /// The tuple of state values.
229    type States;
230
231    /// Unzips the cycles into handles and states.
232    fn unzip(self) -> (Self::Handles, Self::States);
233}
234
235/// Unzips a tuple of cycles into handles and states.
236#[doc(hidden)]
237pub fn unzip_cycles<T: UnzipCycles>(cycles: T) -> (T::Handles, T::States) {
238    cycles.unzip()
239}
240
241/// A trait for completing a tuple of cycle handles with their final state values.
242#[doc(hidden)]
243pub trait CompleteCycles<States> {
244    /// Completes all cycles with the provided state values.
245    fn complete(self, states: States);
246}
247
248/// Completes a tuple of cycle handles with their final state values.
249#[doc(hidden)]
250pub fn complete_cycles<H: CompleteCycles<S>, S>(handles: H, states: S) {
251    handles.complete(states);
252}
253
254impl<'a, L: Location<'a>> Slicable<'a, L> for () {
255    type Slice = ();
256    type Backtrace = ();
257
258    fn get_location(&self) -> &L {
259        unreachable!()
260    }
261
262    fn preferred_tick(&self) -> Option<Tick<L>> {
263        None
264    }
265
266    fn slice(self, _tick: &Tick<L>, _backtrace: Self::Backtrace) -> Self::Slice {}
267}
268
269impl Unslicable for () {
270    type Unsliced = ();
271
272    fn unslice(self) -> Self::Unsliced {}
273}
274
275macro_rules! impl_slicable_for_tuple {
276    ($($T:ident, $T_bt:ident, $idx:tt),+) => {
277        impl<'a, L: Location<'a>, $($T: Slicable<'a, L>),+> Slicable<'a, L> for ($($T,)+) {
278            type Slice = ($($T::Slice,)+);
279            type Backtrace = ($($T::Backtrace,)+);
280
281            fn get_location(&self) -> &L {
282                self.0.get_location()
283            }
284
285            fn preferred_tick(&self) -> Option<Tick<L>> {
286                let mut preferred: Option<Tick<L>> = None;
287                $(
288                    if let Some(tick) = self.$idx.preferred_tick() {
289                        preferred = Some(match preferred {
290                            Some(current) => {
291                                if $crate::location::Location::id(&current) == $crate::location::Location::id(&tick) {
292                                    current
293                                } else {
294                                    panic!("Mismatched preferred ticks for sliced collections")
295                                }
296                            },
297                            None => tick,
298                        });
299                    }
300                )+
301                preferred
302            }
303
304            #[expect(non_snake_case, reason = "macro codegen")]
305            fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace) -> Self::Slice {
306                let ($($T,)+) = self;
307                let ($($T_bt,)+) = backtrace;
308                ($($T.slice(tick, $T_bt),)+)
309            }
310        }
311
312        impl<$($T: Unslicable),+> Unslicable for ($($T,)+) {
313            type Unsliced = ($($T::Unsliced,)+);
314
315            #[expect(non_snake_case, reason = "macro codegen")]
316            fn unslice(self) -> Self::Unsliced {
317                let ($($T,)+) = self;
318                ($($T.unslice(),)+)
319            }
320        }
321    };
322}
323
324#[cfg(stageleft_runtime)]
325impl_slicable_for_tuple!(S1, S1_bt, 0);
326#[cfg(stageleft_runtime)]
327impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1);
328#[cfg(stageleft_runtime)]
329impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2);
330#[cfg(stageleft_runtime)]
331impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3);
332#[cfg(stageleft_runtime)]
333impl_slicable_for_tuple!(
334    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4
335);
336#[cfg(stageleft_runtime)]
337impl_slicable_for_tuple!(
338    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5
339);
340#[cfg(stageleft_runtime)]
341impl_slicable_for_tuple!(
342    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
343    6
344);
345#[cfg(stageleft_runtime)]
346impl_slicable_for_tuple!(
347    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
348    6, S8, S8_bt, 7
349);
350#[cfg(stageleft_runtime)]
351impl_slicable_for_tuple!(
352    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
353    6, S8, S8_bt, 7, S9, S9_bt, 8
354);
355#[cfg(stageleft_runtime)]
356impl_slicable_for_tuple!(
357    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
358    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9
359);
360#[cfg(stageleft_runtime)]
361impl_slicable_for_tuple!(
362    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
363    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9, S11, S11_bt, 10
364);
365#[cfg(stageleft_runtime)]
366impl_slicable_for_tuple!(
367    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
368    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9, S11, S11_bt, 10, S12, S12_bt, 11
369);
370
371macro_rules! impl_cycles_for_tuple {
372    ($($H:ident, $S:ident, $idx:tt),*) => {
373        impl<$($H, $S),*> UnzipCycles for ($(($H, $S),)*) {
374            type Handles = ($($H,)*);
375            type States = ($($S,)*);
376
377            #[expect(clippy::allow_attributes, reason = "macro codegen")]
378            #[allow(non_snake_case, reason = "macro codegen")]
379            fn unzip(self) -> (Self::Handles, Self::States) {
380                let ($($H,)*) = self;
381                (
382                    ($($H.0,)*),
383                    ($($H.1,)*),
384                )
385            }
386        }
387
388        impl<$($H: crate::forward_handle::CompleteCycle<$S>, $S),*> CompleteCycles<($($S,)*)> for ($($H,)*) {
389            #[expect(clippy::allow_attributes, reason = "macro codegen")]
390            #[allow(non_snake_case, reason = "macro codegen")]
391            fn complete(self, states: ($($S,)*)) {
392                let ($($H,)*) = self;
393                let ($($S,)*) = states;
394                $($H.complete_next_tick($S);)*
395            }
396        }
397    };
398}
399
400#[cfg(stageleft_runtime)]
401impl_cycles_for_tuple!();
402#[cfg(stageleft_runtime)]
403impl_cycles_for_tuple!(H1, S1, 0);
404#[cfg(stageleft_runtime)]
405impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1);
406#[cfg(stageleft_runtime)]
407impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2);
408#[cfg(stageleft_runtime)]
409impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3);
410#[cfg(stageleft_runtime)]
411impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4);
412#[cfg(stageleft_runtime)]
413impl_cycles_for_tuple!(
414    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5
415);
416#[cfg(stageleft_runtime)]
417impl_cycles_for_tuple!(
418    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6
419);
420#[cfg(stageleft_runtime)]
421impl_cycles_for_tuple!(
422    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7
423);
424#[cfg(stageleft_runtime)]
425impl_cycles_for_tuple!(
426    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
427    8
428);
429#[cfg(stageleft_runtime)]
430impl_cycles_for_tuple!(
431    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
432    8, H10, S10, 9
433);
434#[cfg(stageleft_runtime)]
435impl_cycles_for_tuple!(
436    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
437    8, H10, S10, 9, H11, S11, 10
438);
439#[cfg(stageleft_runtime)]
440impl_cycles_for_tuple!(
441    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
442    8, H10, S10, 9, H11, S11, 10, H12, S12, 11
443);
444
445// Unslicable implementations for plain collections (used when returning from sliced! body)
446impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Unslicable
447    for super::Stream<T, Tick<L>, Bounded, O, R>
448{
449    type Unsliced = super::Stream<T, L, Unbounded, O, R>;
450
451    fn unslice(self) -> Self::Unsliced {
452        self.all_ticks()
453    }
454}
455
456impl<'a, T, L: Location<'a>> Unslicable for super::Singleton<T, Tick<L>, Bounded> {
457    type Unsliced = super::Singleton<T, L, Unbounded>;
458
459    fn unslice(self) -> Self::Unsliced {
460        self.latest()
461    }
462}
463
464impl<'a, T, L: Location<'a>> Unslicable for super::Optional<T, Tick<L>, Bounded> {
465    type Unsliced = super::Optional<T, L, Unbounded>;
466
467    fn unslice(self) -> Self::Unsliced {
468        self.latest()
469    }
470}
471
472impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Unslicable
473    for super::KeyedStream<K, V, Tick<L>, Bounded, O, R>
474{
475    type Unsliced = super::KeyedStream<K, V, L, Unbounded, O, R>;
476
477    fn unslice(self) -> Self::Unsliced {
478        self.all_ticks()
479    }
480}
481
482// Unslicable implementations for Atomic-wrapped bounded collections
483impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Unslicable
484    for style::Atomic<super::Stream<T, Tick<L>, Bounded, O, R>>
485{
486    type Unsliced = super::Stream<T, crate::location::Atomic<L>, Unbounded, O, R>;
487
488    fn unslice(self) -> Self::Unsliced {
489        self.collection.all_ticks_atomic()
490    }
491}
492
493impl<'a, T, L: Location<'a> + NoTick> Unslicable
494    for style::Atomic<super::Singleton<T, Tick<L>, Bounded>>
495{
496    type Unsliced = super::Singleton<T, crate::location::Atomic<L>, Unbounded>;
497
498    fn unslice(self) -> Self::Unsliced {
499        self.collection.latest_atomic()
500    }
501}
502
503impl<'a, T, L: Location<'a> + NoTick> Unslicable
504    for style::Atomic<super::Optional<T, Tick<L>, Bounded>>
505{
506    type Unsliced = super::Optional<T, crate::location::Atomic<L>, Unbounded>;
507
508    fn unslice(self) -> Self::Unsliced {
509        self.collection.latest_atomic()
510    }
511}
512
513impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries> Unslicable
514    for style::Atomic<super::KeyedStream<K, V, Tick<L>, Bounded, O, R>>
515{
516    type Unsliced = super::KeyedStream<K, V, crate::location::Atomic<L>, Unbounded, O, R>;
517
518    fn unslice(self) -> Self::Unsliced {
519        self.collection.all_ticks_atomic()
520    }
521}
522
523#[cfg(feature = "sim")]
524#[cfg(test)]
525mod tests {
526    use stageleft::q;
527
528    use super::sliced;
529    use crate::location::Location;
530    use crate::nondet::nondet;
531    use crate::prelude::FlowBuilder;
532
533    /// Test a counter using `use::state` with an initial singleton value.
534    /// Each input increments the counter, and we verify the output after each tick.
535    #[test]
536    fn sim_state_counter() {
537        let mut flow = FlowBuilder::new();
538        let node = flow.process::<()>();
539
540        let (input_send, input) = node.sim_input::<i32, _, _>();
541
542        let out_recv = sliced! {
543            let batch = use(input, nondet!(/** test */));
544            let mut counter = use::state(|l| l.singleton(q!(0)));
545
546            let new_count = counter.clone().zip(batch.count())
547                .map(q!(|(old, add)| old + add));
548            counter = new_count.clone();
549            new_count.into_stream()
550        }
551        .sim_output();
552
553        flow.sim().exhaustive(async || {
554            input_send.send(1);
555            assert_eq!(out_recv.next().await.unwrap(), 1);
556
557            input_send.send(1);
558            assert_eq!(out_recv.next().await.unwrap(), 2);
559
560            input_send.send(1);
561            assert_eq!(out_recv.next().await.unwrap(), 3);
562        });
563    }
564
565    /// Test `use::state_null` with an Optional that starts as None.
566    #[cfg(feature = "sim")]
567    #[test]
568    fn sim_state_null_optional() {
569        use crate::live_collections::Optional;
570        use crate::live_collections::boundedness::Bounded;
571        use crate::location::{Location, Tick};
572
573        let mut flow = FlowBuilder::new();
574        let node = flow.process::<()>();
575
576        let (input_send, input) = node.sim_input::<i32, _, _>();
577
578        let out_recv = sliced! {
579            let batch = use(input, nondet!(/** test */));
580            let mut prev = use::state_null::<Optional<i32, Tick<_>, Bounded>>();
581
582            // Output the previous value (or -1 if none)
583            let output = prev.clone().unwrap_or(prev.location().singleton(q!(-1)));
584            // Store the current batch's first value for next tick
585            prev = batch.first();
586            output.into_stream()
587        }
588        .sim_output();
589
590        flow.sim().exhaustive(async || {
591            input_send.send(10);
592            // First tick: prev is None, so output is -1
593            assert_eq!(out_recv.next().await.unwrap(), -1);
594
595            input_send.send(20);
596            // Second tick: prev is Some(10), so output is 10
597            assert_eq!(out_recv.next().await.unwrap(), 10);
598
599            input_send.send(30);
600            // Third tick: prev is Some(20), so output is 20
601            assert_eq!(out_recv.next().await.unwrap(), 20);
602        });
603    }
604
605    /// Test `use::state` with `source_iter` to initialize a stream state.
606    /// On the first tick, the state is the initial `[10, 20]` from `source_iter`.
607    /// On subsequent ticks, the state is the batch from the previous tick.
608    #[test]
609    fn sim_state_source_iter() {
610        let mut flow = FlowBuilder::new();
611        let node = flow.process::<()>();
612
613        let (input_send, input) = node.sim_input::<i32, _, _>();
614
615        let out_recv = sliced! {
616            let batch = use(input, nondet!(/** test */));
617            let mut items = use::state(|l| l.source_iter(q!([10, 20])));
618
619            // Output the current state, then replace it with the batch
620            let output = items.clone();
621            items = batch;
622            output
623        }
624        .sim_output();
625
626        flow.sim().exhaustive(async || {
627            input_send.send(3);
628            // First tick: items = initial [10, 20], output = [10, 20]
629            let mut results = vec![];
630            results.push(out_recv.next().await.unwrap());
631            results.push(out_recv.next().await.unwrap());
632            results.sort();
633            assert_eq!(results, vec![10, 20]);
634
635            input_send.send(4);
636            // Second tick: items = [3] (from previous batch), output = [3]
637            assert_eq!(out_recv.next().await.unwrap(), 3);
638
639            input_send.send(5);
640            // Third tick: items = [4] (from previous batch), output = [4]
641            assert_eq!(out_recv.next().await.unwrap(), 4);
642        });
643    }
644
645    /// Test atomic slicing with keyed streams.
646    #[test]
647    fn sim_sliced_atomic_keyed_stream() {
648        let mut flow = FlowBuilder::new();
649        let node = flow.process::<()>();
650
651        let (input_send, input) = node.sim_input::<(i32, i32), _, _>();
652        let tick = node.tick();
653        let atomic_keyed_input = input.into_keyed().atomic(&tick);
654        let accumulated_inputs = atomic_keyed_input
655            .clone()
656            .assume_ordering(nondet!(/** Test */))
657            .fold(
658                q!(|| 0),
659                q!(|curr, new| {
660                    *curr += new;
661                }),
662            );
663
664        let out_recv = sliced! {
665            let atomic_keyed_input = use::atomic(atomic_keyed_input, nondet!(/** test */));
666            let accumulated_inputs = use::atomic(accumulated_inputs, nondet!(/** test */));
667            accumulated_inputs.join_keyed_stream(atomic_keyed_input)
668                .map(q!(|(sum, _input)| sum))
669                .entries()
670        }
671        .assume_ordering_trusted(nondet!(/** test */))
672        .sim_output();
673
674        flow.sim().exhaustive(async || {
675            input_send.send((1, 1));
676            assert_eq!(out_recv.next().await.unwrap(), (1, 1));
677
678            input_send.send((1, 2));
679            assert_eq!(out_recv.next().await.unwrap(), (1, 3));
680
681            input_send.send((2, 1));
682            assert_eq!(out_recv.next().await.unwrap(), (2, 1));
683
684            input_send.send((1, 3));
685            assert_eq!(out_recv.next().await.unwrap(), (1, 6));
686        });
687    }
688}