Skip to main content

hydro_lang/live_collections/sliced/
mod.rs

1//! Utilities for transforming live collections via slicing.
2
3pub mod style;
4
5use super::boundedness::{Bounded, Unbounded};
6use super::stream::{Ordering, Retries};
7use crate::location::{Location, NoTick, Tick};
8
9#[doc(hidden)]
10#[macro_export]
11macro_rules! __sliced_parse_uses__ {
12    // Parse immutable use statements with style: let name = use::style(args...);
13    (
14        @uses [$($uses:tt)*]
15        @states [$($states:tt)*]
16        let $name:ident = use:: $invocation:expr; $($rest:tt)*
17    ) => {
18        $crate::__sliced_parse_uses__!(
19            @uses [$($uses)* { $name, $invocation, $invocation }]
20            @states [$($states)*]
21            $($rest)*
22        )
23    };
24
25    // Parse immutable use statements without style: let name = use(args...);
26    (
27        @uses [$($uses:tt)*]
28        @states [$($states:tt)*]
29        let $name:ident = use($($args:expr),* $(,)?); $($rest:tt)*
30    ) => {
31        $crate::__sliced_parse_uses__!(
32            @uses [$($uses)* { $name, $crate::macro_support::copy_span::copy_span!($($args,)* default)($($args),*), $($args),* }]
33            @states [$($states)*]
34            $($rest)*
35        )
36    };
37
38    // Parse mutable state statements: let mut name = use::style::<Type>(args);
39    (
40        @uses [$($uses:tt)*]
41        @states [$($states:tt)*]
42        let mut $name:ident = use:: $style:ident $(::<$ty:ty>)? ($($args:expr)?); $($rest:tt)*
43    ) => {
44        $crate::__sliced_parse_uses__!(
45            @uses [$($uses)*]
46            @states [$($states)* { $name, $style, (($($ty)?), ($($args)?)) }]
47            $($rest)*
48        )
49    };
50
51    // Terminal case: no uses, only states
52    (
53        @uses []
54        @states [$({ $state_name:ident, $state_style:ident, $state_arg:tt })+]
55        $($body:tt)*
56    ) => {
57        {
58            // We need at least one use to get a tick, so panic if there are none
59            compile_error!("sliced! requires at least one `let name = use(...)` statement to determine the tick")
60        }
61    };
62
63    // Terminal case: uses with optional states
64    (
65        @uses [$({ $use_name:ident, $invocation:expr, $($invocation_spans:expr),* })+]
66        @states [$({ $state_name:ident, $state_style:ident, (($($state_ty:ty)?), ($($state_arg:expr)?)) })*]
67        $($body:tt)*
68    ) => {
69        {
70            use $crate::live_collections::sliced::style::*;
71            let __styled = (
72                $($invocation,)+
73            );
74
75            let __tick = $crate::live_collections::sliced::Slicable::preferred_tick(&__styled).unwrap_or_else(|| $crate::live_collections::sliced::Slicable::create_tick(&__styled.0));
76            let __backtraces = {
77                use $crate::compile::ir::backtrace::__macro_get_backtrace;
78                (
79                    $($crate::macro_support::copy_span::copy_span!($($invocation_spans,)* {
80                        __macro_get_backtrace(1)
81                    }),)+
82                )
83            };
84            let __sliced = $crate::live_collections::sliced::Slicable::slice(__styled, &__tick, __backtraces);
85            let (
86                $($use_name,)+
87            ) = __sliced;
88
89            // Create all cycles and pack handles/values into tuples
90            let (__handles, __states) = $crate::live_collections::sliced::unzip_cycles((
91                $($crate::live_collections::sliced::style::$state_style$(::<$state_ty, _>)?(& __tick, $($state_arg)?),)*
92            ));
93
94            // Unpack mutable state values
95            let (
96                $(mut $state_name,)*
97            ) = __states;
98
99            // Execute the body
100            let __body_result = {
101                $($body)*
102            };
103
104            // Re-pack the final state values and complete cycles
105            let __final_states = (
106                $($state_name,)*
107            );
108            $crate::live_collections::sliced::complete_cycles(__handles, __final_states);
109
110            // Unslice the result
111            $crate::live_collections::sliced::Unslicable::unslice(__body_result)
112        }
113    };
114}
115
116#[macro_export]
117/// Transforms a live collection with a computation relying on a slice of another live collection.
118/// This is useful for reading a snapshot of an asynchronously updated collection while processing another
119/// collection, such as joining a stream with the latest values from a singleton.
120///
121/// # Syntax
122/// The `sliced!` macro takes in a closure-like syntax specifying the live collections to be sliced
123/// and the body of the transformation. Each `use` statement indicates a live collection to be sliced,
124/// along with a non-determinism explanation. Optionally, a style can be specified to control how the
125/// live collection is sliced (e.g., atomically). All `use` statements must appear before the body.
126///
127/// ```rust,ignore
128/// let stream = sliced! {
129///     let name1 = use(collection1, nondet!(/** explanation */));
130///     let name2 = use::atomic(collection2, nondet!(/** explanation */));
131///
132///     // arbitrary statements can follow
133///     let intermediate = name1.map(...);
134///     intermediate.cross_singleton(name2)
135/// };
136/// ```
137///
138/// # Stateful Computations
139/// The `sliced!` macro also supports stateful computations across iterations using `let mut` bindings
140/// with `use::state` or `use::state_null`. These create cycles that persist values between iterations.
141///
142/// - `use::state(|l| initial)`: Creates a cycle with an initial value. The closure receives
143///   the slice location and returns the initial state for the first iteration.
144/// - `use::state_null::<Type>()`: Creates a cycle that starts as null/empty on the first iteration.
145///
146/// The mutable binding can be reassigned in the body, and the final value will be passed to the
147/// next iteration.
148///
149/// ```rust,ignore
150/// let counter_stream = sliced! {
151///     let batch = use(input_stream, nondet!(/** explanation */));
152///     let mut counter = use::state(|l| l.singleton(q!(0)));
153///
154///     // Increment counter by the number of items in this batch
155///     let new_count = counter.clone().zip(batch.count())
156///         .map(q!(|(old, add)| old + add));
157///     counter = new_count.clone();
158///     new_count.into_stream()
159/// };
160/// ```
161macro_rules! __sliced__ {
162    ($($tt:tt)*) => {
163        $crate::__sliced_parse_uses__!(
164            @uses []
165            @states []
166            $($tt)*
167        )
168    };
169}
170
171pub use crate::__sliced__ as sliced;
172
173/// Marks this live collection as atomically-yielded, which means that the output outside
174/// `sliced` will be at an atomic location that is synchronous with respect to the body
175/// of the slice.
176pub fn yield_atomic<T>(t: T) -> style::Atomic<T> {
177    style::Atomic {
178        collection: t,
179        // yield_atomic doesn't need a nondet since it's for output, not input
180        nondet: crate::nondet::NonDet,
181    }
182}
183
184/// A trait for live collections which can be sliced into bounded versions at a tick.
185pub trait Slicable<'a, L: Location<'a>> {
186    /// The sliced version of this live collection.
187    type Slice;
188
189    /// The type of backtrace associated with this slice.
190    type Backtrace;
191
192    /// Gets the preferred tick to slice at. Used for atomic slicing.
193    fn preferred_tick(&self) -> Option<Tick<L>>;
194
195    /// Gets the location associated with this live collection.
196    fn get_location(&self) -> &L;
197
198    /// Creates a tick that is appropriate for the collection's location.
199    fn create_tick(&self) -> Tick<L>
200    where
201        L: NoTick,
202    {
203        self.get_location().tick()
204    }
205
206    /// Slices this live collection at the given tick.
207    ///
208    /// # Non-Determinism
209    /// Slicing a live collection may involve non-determinism, such as choosing which messages
210    /// to include in a batch.
211    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace) -> Self::Slice;
212}
213
214/// A trait for live collections which can be yielded out of a slice back into their original form.
215pub trait Unslicable {
216    /// The unsliced version of this live collection.
217    type Unsliced;
218
219    /// Unslices a sliced live collection back into its original form.
220    fn unslice(self) -> Self::Unsliced;
221}
222
223/// A trait for unzipping a tuple of (handle, state) pairs into separate tuples.
224#[doc(hidden)]
225pub trait UnzipCycles {
226    /// The tuple of cycle handles.
227    type Handles;
228    /// The tuple of state values.
229    type States;
230
231    /// Unzips the cycles into handles and states.
232    fn unzip(self) -> (Self::Handles, Self::States);
233}
234
235/// Unzips a tuple of cycles into handles and states.
236#[doc(hidden)]
237pub fn unzip_cycles<T: UnzipCycles>(cycles: T) -> (T::Handles, T::States) {
238    cycles.unzip()
239}
240
241/// A trait for completing a tuple of cycle handles with their final state values.
242#[doc(hidden)]
243pub trait CompleteCycles<States> {
244    /// Completes all cycles with the provided state values.
245    fn complete(self, states: States);
246}
247
248/// Completes a tuple of cycle handles with their final state values.
249#[doc(hidden)]
250pub fn complete_cycles<H: CompleteCycles<S>, S>(handles: H, states: S) {
251    handles.complete(states);
252}
253
254impl<'a, L: Location<'a>> Slicable<'a, L> for () {
255    type Slice = ();
256    type Backtrace = ();
257
258    fn get_location(&self) -> &L {
259        unreachable!()
260    }
261
262    fn preferred_tick(&self) -> Option<Tick<L>> {
263        None
264    }
265
266    fn slice(self, _tick: &Tick<L>, _backtrace: Self::Backtrace) -> Self::Slice {}
267}
268
269impl Unslicable for () {
270    type Unsliced = ();
271
272    fn unslice(self) -> Self::Unsliced {}
273}
274
275macro_rules! impl_slicable_for_tuple {
276    ($($T:ident, $T_bt:ident, $idx:tt),+) => {
277        impl<'a, L: Location<'a>, $($T: Slicable<'a, L>),+> Slicable<'a, L> for ($($T,)+) {
278            type Slice = ($($T::Slice,)+);
279            type Backtrace = ($($T::Backtrace,)+);
280
281            fn get_location(&self) -> &L {
282                self.0.get_location()
283            }
284
285            fn preferred_tick(&self) -> Option<Tick<L>> {
286                let mut preferred: Option<Tick<L>> = None;
287                $(
288                    if let Some(tick) = self.$idx.preferred_tick() {
289                        preferred = Some(match preferred {
290                            Some(current) => {
291                                if $crate::location::Location::id(&current) == $crate::location::Location::id(&tick) {
292                                    current
293                                } else {
294                                    panic!("Mismatched preferred ticks for sliced collections")
295                                }
296                            },
297                            None => tick,
298                        });
299                    }
300                )+
301                preferred
302            }
303
304            #[expect(non_snake_case, reason = "macro codegen")]
305            fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace) -> Self::Slice {
306                let ($($T,)+) = self;
307                let ($($T_bt,)+) = backtrace;
308                ($($T.slice(tick, $T_bt),)+)
309            }
310        }
311
312        impl<$($T: Unslicable),+> Unslicable for ($($T,)+) {
313            type Unsliced = ($($T::Unsliced,)+);
314
315            #[expect(non_snake_case, reason = "macro codegen")]
316            fn unslice(self) -> Self::Unsliced {
317                let ($($T,)+) = self;
318                ($($T.unslice(),)+)
319            }
320        }
321    };
322}
323
324#[cfg(stageleft_runtime)]
325impl_slicable_for_tuple!(S1, S1_bt, 0);
326#[cfg(stageleft_runtime)]
327impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1);
328#[cfg(stageleft_runtime)]
329impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2);
330#[cfg(stageleft_runtime)]
331impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3);
332#[cfg(stageleft_runtime)]
333impl_slicable_for_tuple!(
334    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4
335); // 5 slices ought to be enough for anyone
336
337macro_rules! impl_cycles_for_tuple {
338    ($($H:ident, $S:ident, $idx:tt),*) => {
339        impl<$($H, $S),*> UnzipCycles for ($(($H, $S),)*) {
340            type Handles = ($($H,)*);
341            type States = ($($S,)*);
342
343            #[expect(clippy::allow_attributes, reason = "macro codegen")]
344            #[allow(non_snake_case, reason = "macro codegen")]
345            fn unzip(self) -> (Self::Handles, Self::States) {
346                let ($($H,)*) = self;
347                (
348                    ($($H.0,)*),
349                    ($($H.1,)*),
350                )
351            }
352        }
353
354        impl<$($H: crate::forward_handle::CompleteCycle<$S>, $S),*> CompleteCycles<($($S,)*)> for ($($H,)*) {
355            #[expect(clippy::allow_attributes, reason = "macro codegen")]
356            #[allow(non_snake_case, reason = "macro codegen")]
357            fn complete(self, states: ($($S,)*)) {
358                let ($($H,)*) = self;
359                let ($($S,)*) = states;
360                $($H.complete_next_tick($S);)*
361            }
362        }
363    };
364}
365
366#[cfg(stageleft_runtime)]
367impl_cycles_for_tuple!();
368#[cfg(stageleft_runtime)]
369impl_cycles_for_tuple!(H1, S1, 0);
370#[cfg(stageleft_runtime)]
371impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1);
372#[cfg(stageleft_runtime)]
373impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2);
374#[cfg(stageleft_runtime)]
375impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3);
376#[cfg(stageleft_runtime)]
377impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4);
378
379// Unslicable implementations for plain collections (used when returning from sliced! body)
380impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Unslicable
381    for super::Stream<T, Tick<L>, Bounded, O, R>
382{
383    type Unsliced = super::Stream<T, L, Unbounded, O, R>;
384
385    fn unslice(self) -> Self::Unsliced {
386        self.all_ticks()
387    }
388}
389
390impl<'a, T, L: Location<'a>> Unslicable for super::Singleton<T, Tick<L>, Bounded> {
391    type Unsliced = super::Singleton<T, L, Unbounded>;
392
393    fn unslice(self) -> Self::Unsliced {
394        self.latest()
395    }
396}
397
398impl<'a, T, L: Location<'a>> Unslicable for super::Optional<T, Tick<L>, Bounded> {
399    type Unsliced = super::Optional<T, L, Unbounded>;
400
401    fn unslice(self) -> Self::Unsliced {
402        self.latest()
403    }
404}
405
406impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Unslicable
407    for super::KeyedStream<K, V, Tick<L>, Bounded, O, R>
408{
409    type Unsliced = super::KeyedStream<K, V, L, Unbounded, O, R>;
410
411    fn unslice(self) -> Self::Unsliced {
412        self.all_ticks()
413    }
414}
415
416// Unslicable implementations for Atomic-wrapped bounded collections
417impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Unslicable
418    for style::Atomic<super::Stream<T, Tick<L>, Bounded, O, R>>
419{
420    type Unsliced = super::Stream<T, crate::location::Atomic<L>, Unbounded, O, R>;
421
422    fn unslice(self) -> Self::Unsliced {
423        self.collection.all_ticks_atomic()
424    }
425}
426
427impl<'a, T, L: Location<'a> + NoTick> Unslicable
428    for style::Atomic<super::Singleton<T, Tick<L>, Bounded>>
429{
430    type Unsliced = super::Singleton<T, crate::location::Atomic<L>, Unbounded>;
431
432    fn unslice(self) -> Self::Unsliced {
433        self.collection.latest_atomic()
434    }
435}
436
437impl<'a, T, L: Location<'a> + NoTick> Unslicable
438    for style::Atomic<super::Optional<T, Tick<L>, Bounded>>
439{
440    type Unsliced = super::Optional<T, crate::location::Atomic<L>, Unbounded>;
441
442    fn unslice(self) -> Self::Unsliced {
443        self.collection.latest_atomic()
444    }
445}
446
447impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries> Unslicable
448    for style::Atomic<super::KeyedStream<K, V, Tick<L>, Bounded, O, R>>
449{
450    type Unsliced = super::KeyedStream<K, V, crate::location::Atomic<L>, Unbounded, O, R>;
451
452    fn unslice(self) -> Self::Unsliced {
453        self.collection.all_ticks_atomic()
454    }
455}
456
457#[cfg(feature = "sim")]
458#[cfg(test)]
459mod tests {
460    use stageleft::q;
461
462    use super::sliced;
463    use crate::location::Location;
464    use crate::nondet::nondet;
465    use crate::prelude::FlowBuilder;
466
467    /// Test a counter using `use::state` with an initial singleton value.
468    /// Each input increments the counter, and we verify the output after each tick.
469    #[test]
470    fn sim_state_counter() {
471        let mut flow = FlowBuilder::new();
472        let node = flow.process::<()>();
473
474        let (input_send, input) = node.sim_input::<i32, _, _>();
475
476        let out_recv = sliced! {
477            let batch = use(input, nondet!(/** test */));
478            let mut counter = use::state(|l| l.singleton(q!(0)));
479
480            let new_count = counter.clone().zip(batch.count())
481                .map(q!(|(old, add)| old + add));
482            counter = new_count.clone();
483            new_count.into_stream()
484        }
485        .sim_output();
486
487        flow.sim().exhaustive(async || {
488            input_send.send(1);
489            assert_eq!(out_recv.next().await.unwrap(), 1);
490
491            input_send.send(1);
492            assert_eq!(out_recv.next().await.unwrap(), 2);
493
494            input_send.send(1);
495            assert_eq!(out_recv.next().await.unwrap(), 3);
496        });
497    }
498
499    /// Test `use::state_null` with an Optional that starts as None.
500    #[cfg(feature = "sim")]
501    #[test]
502    fn sim_state_null_optional() {
503        use crate::live_collections::Optional;
504        use crate::live_collections::boundedness::Bounded;
505        use crate::location::{Location, Tick};
506
507        let mut flow = FlowBuilder::new();
508        let node = flow.process::<()>();
509
510        let (input_send, input) = node.sim_input::<i32, _, _>();
511
512        let out_recv = sliced! {
513            let batch = use(input, nondet!(/** test */));
514            let mut prev = use::state_null::<Optional<i32, Tick<_>, Bounded>>();
515
516            // Output the previous value (or -1 if none)
517            let output = prev.clone().unwrap_or(prev.location().singleton(q!(-1)));
518            // Store the current batch's first value for next tick
519            prev = batch.first();
520            output.into_stream()
521        }
522        .sim_output();
523
524        flow.sim().exhaustive(async || {
525            input_send.send(10);
526            // First tick: prev is None, so output is -1
527            assert_eq!(out_recv.next().await.unwrap(), -1);
528
529            input_send.send(20);
530            // Second tick: prev is Some(10), so output is 10
531            assert_eq!(out_recv.next().await.unwrap(), 10);
532
533            input_send.send(30);
534            // Third tick: prev is Some(20), so output is 20
535            assert_eq!(out_recv.next().await.unwrap(), 20);
536        });
537    }
538
539    /// Test atomic slicing with keyed streams.
540    #[test]
541    fn sim_sliced_atomic_keyed_stream() {
542        let mut flow = FlowBuilder::new();
543        let node = flow.process::<()>();
544
545        let (input_send, input) = node.sim_input::<(i32, i32), _, _>();
546        let tick = node.tick();
547        let atomic_keyed_input = input.into_keyed().atomic(&tick);
548        let accumulated_inputs = atomic_keyed_input
549            .clone()
550            .assume_ordering(nondet!(/** Test */))
551            .fold(
552                q!(|| 0),
553                q!(|curr, new| {
554                    *curr += new;
555                }),
556            );
557
558        let out_recv = sliced! {
559            let atomic_keyed_input = use::atomic(atomic_keyed_input, nondet!(/** test */));
560            let accumulated_inputs = use::atomic(accumulated_inputs, nondet!(/** test */));
561            accumulated_inputs.join_keyed_stream(atomic_keyed_input)
562                .map(q!(|(sum, _input)| sum))
563                .entries()
564        }
565        .assume_ordering_trusted(nondet!(/** test */))
566        .sim_output();
567
568        flow.sim().exhaustive(async || {
569            input_send.send((1, 1));
570            assert_eq!(out_recv.next().await.unwrap(), (1, 1));
571
572            input_send.send((1, 2));
573            assert_eq!(out_recv.next().await.unwrap(), (1, 3));
574
575            input_send.send((2, 1));
576            assert_eq!(out_recv.next().await.unwrap(), (2, 1));
577
578            input_send.send((1, 3));
579            assert_eq!(out_recv.next().await.unwrap(), (1, 6));
580        });
581    }
582}