hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A single Rust value that can asynchronously change over time.
25///
26/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
27/// [`Unbounded`], the value will asynchronously change over time.
28///
29/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
30/// a single number that will asynchronously change as events are processed. Singletons also appear
31/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
32/// such as getting the length of a batch of requests.
33///
34/// Type Parameters:
35/// - `Type`: the type of the value in this singleton
36/// - `Loc`: the [`Location`] where the singleton is materialized
37/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    /// The location this singleton is associated with; exposed via [`Singleton::location`].
    pub(crate) location: Loc,
    /// The IR node that produces this singleton's value. It is held in a [`RefCell`] so that
    /// `clone` can swap it for a `HydroNode::Tee` through a shared reference.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker tying the type parameters to the struct (`Type` and `Bound` are not
    /// stored in any other field).
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
44
45impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
46where
47    L: Location<'a>,
48{
49    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
50        Singleton::new(singleton.location, singleton.ir_node.into_inner())
51    }
52}
53
54impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
55where
56    L: Location<'a>,
57{
58    type Location = Tick<L>;
59
60    fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
61        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
62            location.clone(),
63            HydroNode::DeferTick {
64                input: Box::new(HydroNode::CycleSource {
65                    ident,
66                    metadata: location.new_node_metadata(Self::collection_kind()),
67                }),
68                metadata: location
69                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
70            },
71        );
72
73        from_previous_tick.unwrap_or(initial)
74    }
75}
76
77impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
78where
79    L: Location<'a>,
80{
81    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
82        assert_eq!(
83            Location::id(&self.location),
84            expected_location,
85            "locations do not match"
86        );
87        self.location
88            .flow_state()
89            .borrow_mut()
90            .push_root(HydroRoot::CycleSink {
91                ident,
92                input: Box::new(self.ir_node.into_inner()),
93                op_metadata: HydroIrOpMetadata::new(),
94            });
95    }
96}
97
98impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
99where
100    L: Location<'a>,
101{
102    type Location = Tick<L>;
103
104    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
105        Singleton::new(
106            location.clone(),
107            HydroNode::CycleSource {
108                ident,
109                metadata: location.new_node_metadata(Self::collection_kind()),
110            },
111        )
112    }
113}
114
115impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
116where
117    L: Location<'a>,
118{
119    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
120        assert_eq!(
121            Location::id(&self.location),
122            expected_location,
123            "locations do not match"
124        );
125        self.location
126            .flow_state()
127            .borrow_mut()
128            .push_root(HydroRoot::CycleSink {
129                ident,
130                input: Box::new(self.ir_node.into_inner()),
131                op_metadata: HydroIrOpMetadata::new(),
132            });
133    }
134}
135
136impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
137where
138    L: Location<'a> + NoTick,
139{
140    type Location = L;
141
142    fn create_source(ident: syn::Ident, location: L) -> Self {
143        Singleton::new(
144            location.clone(),
145            HydroNode::CycleSource {
146                ident,
147                metadata: location.new_node_metadata(Self::collection_kind()),
148            },
149        )
150    }
151}
152
153impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
154where
155    L: Location<'a> + NoTick,
156{
157    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
158        assert_eq!(
159            Location::id(&self.location),
160            expected_location,
161            "locations do not match"
162        );
163        self.location
164            .flow_state()
165            .borrow_mut()
166            .push_root(HydroRoot::CycleSink {
167                ident,
168                input: Box::new(self.ir_node.into_inner()),
169                op_metadata: HydroIrOpMetadata::new(),
170            });
171    }
172}
173
174impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
175where
176    T: Clone,
177    L: Location<'a>,
178{
179    fn clone(&self) -> Self {
180        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
181            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
182            *self.ir_node.borrow_mut() = HydroNode::Tee {
183                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
184                metadata: self.location.new_node_metadata(Self::collection_kind()),
185            };
186        }
187
188        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
189            Singleton {
190                location: self.location.clone(),
191                ir_node: HydroNode::Tee {
192                    inner: TeeNode(inner.0.clone()),
193                    metadata: metadata.clone(),
194                }
195                .into(),
196                _phantom: PhantomData,
197            }
198        } else {
199            unreachable!()
200        }
201    }
202}
203
/// Zips a singleton that lives inside a tick with `other`, producing a cross-singleton IR node
/// at the singleton's tick location.
///
/// Both operands are checked to be at the same location before the node is built.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: O,
) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
where
    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
{
    let other_location = Singleton::<T, Tick<L>, B>::other_location(&other);
    check_matching_location(&me.location, &other_location);

    let Singleton {
        location, ir_node, ..
    } = me;
    let left = Box::new(ir_node.into_inner());
    let right = Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other));
    let metadata = location.new_node_metadata(CollectionKind::Singleton {
        bound: B::BOUND_KIND,
        element_type: stageleft::quote_type::<
            <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
        >()
        .into(),
    });

    Singleton::<T, Tick<L>, B>::make(
        location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
232
233impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
234where
235    L: Location<'a>,
236{
237    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
238        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
239        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
240        Singleton {
241            location,
242            ir_node: RefCell::new(ir_node),
243            _phantom: PhantomData,
244        }
245    }
246
247    pub(crate) fn collection_kind() -> CollectionKind {
248        CollectionKind::Singleton {
249            bound: B::BOUND_KIND,
250            element_type: stageleft::quote_type::<T>().into(),
251        }
252    }
253
    /// Returns a reference to the [`Location`] where this singleton is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
258
259    /// Transforms the singleton value by applying a function `f` to it,
260    /// continuously as the input is updated.
261    ///
262    /// # Example
263    /// ```rust
264    /// # #[cfg(feature = "deploy")] {
265    /// # use hydro_lang::prelude::*;
266    /// # use futures::StreamExt;
267    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
268    /// let tick = process.tick();
269    /// let singleton = tick.singleton(q!(5));
270    /// singleton.map(q!(|v| v * 2)).all_ticks()
271    /// # }, |mut stream| async move {
272    /// // 10
273    /// # assert_eq!(stream.next().await.unwrap(), 10);
274    /// # }));
275    /// # }
276    /// ```
277    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
278    where
279        F: Fn(T) -> U + 'a,
280    {
281        let f = f.splice_fn1_ctx(&self.location).into();
282        Singleton::new(
283            self.location.clone(),
284            HydroNode::Map {
285                f,
286                input: Box::new(self.ir_node.into_inner()),
287                metadata: self
288                    .location
289                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
290            },
291        )
292    }
293
294    /// Transforms the singleton value by applying a function `f` to it and then flattening
295    /// the result into a stream, preserving the order of elements.
296    ///
297    /// The function `f` is applied to the singleton value to produce an iterator, and all items
298    /// from that iterator are emitted in the output stream in deterministic order.
299    ///
300    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
301    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
302    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
303    ///
304    /// # Example
305    /// ```rust
306    /// # #[cfg(feature = "deploy")] {
307    /// # use hydro_lang::prelude::*;
308    /// # use futures::StreamExt;
309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310    /// let tick = process.tick();
311    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
312    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
313    /// # }, |mut stream| async move {
314    /// // 1, 2, 3
315    /// # for w in vec![1, 2, 3] {
316    /// #     assert_eq!(stream.next().await.unwrap(), w);
317    /// # }
318    /// # }));
319    /// # }
320    /// ```
321    pub fn flat_map_ordered<U, I, F>(
322        self,
323        f: impl IntoQuotedMut<'a, F, L>,
324    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
325    where
326        I: IntoIterator<Item = U>,
327        F: Fn(T) -> I + 'a,
328    {
329        let f = f.splice_fn1_ctx(&self.location).into();
330        Stream::new(
331            self.location.clone(),
332            HydroNode::FlatMap {
333                f,
334                input: Box::new(self.ir_node.into_inner()),
335                metadata: self.location.new_node_metadata(
336                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
337                ),
338            },
339        )
340    }
341
342    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
343    /// for the output type `I` to produce items in any order.
344    ///
345    /// The function `f` is applied to the singleton value to produce an iterator, and all items
346    /// from that iterator are emitted in the output stream in non-deterministic order.
347    ///
348    /// # Example
349    /// ```rust
350    /// # #[cfg(feature = "deploy")] {
351    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
352    /// # use futures::StreamExt;
353    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
354    /// let tick = process.tick();
355    /// let singleton = tick.singleton(q!(
356    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
357    /// ));
358    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
359    /// # }, |mut stream| async move {
360    /// // 1, 2, 3, but in no particular order
361    /// # let mut results = Vec::new();
362    /// # for _ in 0..3 {
363    /// #     results.push(stream.next().await.unwrap());
364    /// # }
365    /// # results.sort();
366    /// # assert_eq!(results, vec![1, 2, 3]);
367    /// # }));
368    /// # }
369    /// ```
370    pub fn flat_map_unordered<U, I, F>(
371        self,
372        f: impl IntoQuotedMut<'a, F, L>,
373    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
374    where
375        I: IntoIterator<Item = U>,
376        F: Fn(T) -> I + 'a,
377    {
378        let f = f.splice_fn1_ctx(&self.location).into();
379        Stream::new(
380            self.location.clone(),
381            HydroNode::FlatMap {
382                f,
383                input: Box::new(self.ir_node.into_inner()),
384                metadata: self
385                    .location
386                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
387            },
388        )
389    }
390
391    /// Flattens the singleton value into a stream, preserving the order of elements.
392    ///
393    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
394    /// are emitted in the output stream in deterministic order.
395    ///
396    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
397    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
398    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
399    ///
400    /// # Example
401    /// ```rust
402    /// # #[cfg(feature = "deploy")] {
403    /// # use hydro_lang::prelude::*;
404    /// # use futures::StreamExt;
405    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
406    /// let tick = process.tick();
407    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
408    /// singleton.flatten_ordered().all_ticks()
409    /// # }, |mut stream| async move {
410    /// // 1, 2, 3
411    /// # for w in vec![1, 2, 3] {
412    /// #     assert_eq!(stream.next().await.unwrap(), w);
413    /// # }
414    /// # }));
415    /// # }
416    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the value itself is the iterable.
        self.flat_map_ordered(q!(|x| x))
    }
423
424    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
425    /// for the element type `T` to produce items in any order.
426    ///
427    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
428    /// are emitted in the output stream in non-deterministic order.
429    ///
430    /// # Example
431    /// ```rust
432    /// # #[cfg(feature = "deploy")] {
433    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
434    /// # use futures::StreamExt;
435    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
436    /// let tick = process.tick();
437    /// let singleton = tick.singleton(q!(
438    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
439    /// ));
440    /// singleton.flatten_unordered().all_ticks()
441    /// # }, |mut stream| async move {
442    /// // 1, 2, 3, but in no particular order
443    /// # let mut results = Vec::new();
444    /// # for _ in 0..3 {
445    /// #     results.push(stream.next().await.unwrap());
446    /// # }
447    /// # results.sort();
448    /// # assert_eq!(results, vec![1, 2, 3]);
449    /// # }));
450    /// # }
451    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the value itself is the iterable.
        self.flat_map_unordered(q!(|x| x))
    }
458
459    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
460    ///
461    /// If the predicate returns `true`, the output optional contains the same value.
462    /// If the predicate returns `false`, the output optional is empty.
463    ///
464    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
465    /// not modify or take ownership of the value. If you need to modify the value while filtering
466    /// use [`Singleton::filter_map`] instead.
467    ///
468    /// # Example
469    /// ```rust
470    /// # #[cfg(feature = "deploy")] {
471    /// # use hydro_lang::prelude::*;
472    /// # use futures::StreamExt;
473    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
474    /// let tick = process.tick();
475    /// let singleton = tick.singleton(q!(5));
476    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
477    /// # }, |mut stream| async move {
478    /// // 5
479    /// # assert_eq!(stream.next().await.unwrap(), 5);
480    /// # }));
481    /// # }
482    /// ```
483    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
484    where
485        F: Fn(&T) -> bool + 'a,
486    {
487        let f = f.splice_fn1_borrow_ctx(&self.location).into();
488        Optional::new(
489            self.location.clone(),
490            HydroNode::Filter {
491                f,
492                input: Box::new(self.ir_node.into_inner()),
493                metadata: self
494                    .location
495                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
496            },
497        )
498    }
499
500    /// An operator that both filters and maps. It yields the value only if the supplied
501    /// closure `f` returns `Some(value)`.
502    ///
503    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
504    /// If the closure returns `None`, the output optional is empty.
505    ///
506    /// # Example
507    /// ```rust
508    /// # #[cfg(feature = "deploy")] {
509    /// # use hydro_lang::prelude::*;
510    /// # use futures::StreamExt;
511    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
512    /// let tick = process.tick();
513    /// let singleton = tick.singleton(q!("42"));
514    /// singleton
515    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
516    ///     .all_ticks()
517    /// # }, |mut stream| async move {
518    /// // 42
519    /// # assert_eq!(stream.next().await.unwrap(), 42);
520    /// # }));
521    /// # }
522    /// ```
523    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
524    where
525        F: Fn(T) -> Option<U> + 'a,
526    {
527        let f = f.splice_fn1_ctx(&self.location).into();
528        Optional::new(
529            self.location.clone(),
530            HydroNode::FilterMap {
531                f,
532                input: Box::new(self.ir_node.into_inner()),
533                metadata: self
534                    .location
535                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
536            },
537        )
538    }
539
540    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
541    ///
542    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
543    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
544    /// non-null. This is useful for combining several pieces of state together.
545    ///
546    /// # Example
547    /// ```rust
548    /// # #[cfg(feature = "deploy")] {
549    /// # use hydro_lang::prelude::*;
550    /// # use futures::StreamExt;
551    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
552    /// let tick = process.tick();
553    /// let numbers = process
554    ///   .source_iter(q!(vec![123, 456]))
555    ///   .batch(&tick, nondet!(/** test */));
556    /// let count = numbers.clone().count(); // Singleton
557    /// let max = numbers.max(); // Optional
558    /// count.zip(max).all_ticks()
559    /// # }, |mut stream| async move {
560    /// // [(2, 456)]
561    /// # for w in vec![(2, 456)] {
562    /// #     assert_eq!(stream.next().await.unwrap(), w);
563    /// # }
564    /// # }));
565    /// # }
566    /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
    {
        // Both operands must be at the same location.
        check_matching_location(&self.location, &Self::other_location(&other));

        // At a top-level location that can provide a tick: snapshot both operands into that
        // tick, zip them there, and bring the result back out with `latest()`.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                // The other operand is re-wrapped as an `Optional` so it can be snapshotted
                // into the tick; its location is read before `other` is consumed.
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    Self::other_location(&other),
                    Self::other_ir_node(other),
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            Self::make(out.location, out.ir_node.into_inner())
        } else {
            // Otherwise build the cross-singleton node directly at the current location.
            // NOTE(review): this branch records `CollectionKind::Optional`, while
            // `zip_inside_tick` records `CollectionKind::Singleton` — confirm both agree
            // with what `ZipResult::make` expects for every `O`.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.into_inner()),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
604
605    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
607    ///
608    /// Useful for conditionally processing, such as only emitting a singleton's value outside
609    /// a tick if some other condition is satisfied.
610    ///
611    /// # Example
612    /// ```rust
613    /// # #[cfg(feature = "deploy")] {
614    /// # use hydro_lang::prelude::*;
615    /// # use futures::StreamExt;
616    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
617    /// let tick = process.tick();
618    /// // ticks are lazy by default, forces the second tick to run
619    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
620    ///
621    /// let batch_first_tick = process
622    ///   .source_iter(q!(vec![1]))
623    ///   .batch(&tick, nondet!(/** test */));
624    /// let batch_second_tick = process
625    ///   .source_iter(q!(vec![1, 2, 3]))
626    ///   .batch(&tick, nondet!(/** test */))
627    ///   .defer_tick(); // appears on the second tick
628    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
629    /// batch_first_tick.chain(batch_second_tick).count()
630    ///   .filter_if_some(some_on_first_tick)
631    ///   .all_ticks()
632    /// # }, |mut stream| async move {
633    /// // [1]
634    /// # for w in vec![1] {
635    /// #     assert_eq!(stream.next().await.unwrap(), w);
636    /// # }
637    /// # }));
638    /// # }
639    /// ```
    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
        // Reduce the signal to `()` so only its presence matters, zip it with the value
        // (the result is an `Optional`), then project the value back out of the pair.
        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
            .map(q!(|(d, _signal)| d))
    }
644
645    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
647    ///
648    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
649    /// the condition.
650    ///
651    /// # Example
652    /// ```rust
653    /// # #[cfg(feature = "deploy")] {
654    /// # use hydro_lang::prelude::*;
655    /// # use futures::StreamExt;
656    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
657    /// let tick = process.tick();
658    /// // ticks are lazy by default, forces the second tick to run
659    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
660    ///
661    /// let batch_first_tick = process
662    ///   .source_iter(q!(vec![1]))
663    ///   .batch(&tick, nondet!(/** test */));
664    /// let batch_second_tick = process
665    ///   .source_iter(q!(vec![1, 2, 3]))
666    ///   .batch(&tick, nondet!(/** test */))
667    ///   .defer_tick(); // appears on the second tick
668    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
669    /// batch_first_tick.chain(batch_second_tick).count()
670    ///   .filter_if_none(some_on_first_tick)
671    ///   .all_ticks()
672    /// # }, |mut stream| async move {
673    /// // [3]
674    /// # for w in vec![3] {
675    /// #     assert_eq!(stream.next().await.unwrap(), w);
676    /// # }
677    /// # }));
678    /// # }
679    /// ```
    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
        // Invert the signal and reuse `filter_if_some`: after `into_singleton()` the value is
        // tested with `is_none()`, so the filtered signal is non-null only when `other` is null.
        self.filter_if_some(
            other
                .map(q!(|_| ()))
                .into_singleton()
                .filter(q!(|o| o.is_none())),
        )
    }
688
689    /// An operator which allows you to "name" a `HydroNode`.
690    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
691    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
692        {
693            let mut node = self.ir_node.borrow_mut();
694            let metadata = node.metadata_mut();
695            metadata.tag = Some(name.to_string());
696        }
697        self
698    }
699}
700
701impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
702where
703    L: Location<'a> + NoTick,
704{
705    /// Returns a singleton value corresponding to the latest snapshot of the singleton
706    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
707    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
708    /// all snapshots of this singleton into the atomic-associated tick will observe the
709    /// same value each tick.
710    ///
711    /// # Non-Determinism
712    /// Because this picks a snapshot of a singleton whose value is continuously changing,
713    /// the output singleton has a non-deterministic value since the snapshot can be at an
714    /// arbitrary point in time.
715    pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
716        Singleton::new(
717            self.location.clone().tick,
718            HydroNode::Batch {
719                inner: Box::new(self.ir_node.into_inner()),
720                metadata: self
721                    .location
722                    .tick
723                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
724            },
725        )
726    }
727
728    /// Returns this singleton back into a top-level, asynchronous execution context where updates
729    /// to the value will be asynchronously propagated.
730    pub fn end_atomic(self) -> Singleton<T, L, B> {
731        Singleton::new(
732            self.location.tick.l.clone(),
733            HydroNode::EndAtomic {
734                inner: Box::new(self.ir_node.into_inner()),
735                metadata: self
736                    .location
737                    .tick
738                    .l
739                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
740            },
741        )
742    }
743}
744
745impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
746where
747    L: Location<'a>,
748{
749    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
750    /// will observe the same version of the value and will be executed synchronously before any
    /// outputs are yielded (in [`Singleton::end_atomic`]).
752    ///
753    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
754    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
755    /// a different version).
756    ///
757    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
    /// be atomically processed. Snapshotting a singleton into the _same_ [`Tick`] will preserve the
759    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
760    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
761        let out_location = Atomic { tick: tick.clone() };
762        Singleton::new(
763            out_location.clone(),
764            HydroNode::BeginAtomic {
765                inner: Box::new(self.ir_node.into_inner()),
766                metadata: out_location
767                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
768            },
769        )
770    }
771
772    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
773    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
774    /// relevant data that contributed to the snapshot at tick `t`.
775    ///
776    /// # Non-Determinism
777    /// Because this picks a snapshot of a singleton whose value is continuously changing,
778    /// the output singleton has a non-deterministic value since the snapshot can be at an
779    /// arbitrary point in time.
780    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
781        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
782        Singleton::new(
783            tick.clone(),
784            HydroNode::Batch {
785                inner: Box::new(self.ir_node.into_inner()),
786                metadata: tick
787                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
788            },
789        )
790    }
791
792    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
793    /// with order corresponding to increasing prefixes of data contributing to the singleton.
794    ///
795    /// # Non-Determinism
796    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
797    /// to non-deterministic batching and arrival of inputs, the output stream is
798    /// non-deterministic.
799    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
800    where
801        L: NoTick,
802    {
803        sliced! {
804            let snapshot = use(self, nondet);
805            snapshot.into_stream()
806        }
807        .weakest_retries()
808    }
809
810    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
811    /// value taken at various points in time. Because the input singleton may be
812    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
813    /// represent the value of the singleton given some prefix of the streams leading up to
814    /// it.
815    ///
816    /// # Non-Determinism
817    /// The output stream is non-deterministic in which elements are sampled, since this
818    /// is controlled by a clock.
819    pub fn sample_every(
820        self,
821        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
822        nondet: NonDet,
823    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
824    where
825        L: NoTick + NoAtomic,
826    {
827        let samples = self.location.source_interval(interval, nondet);
828        sliced! {
829            let snapshot = use(self, nondet);
830            let sample_batch = use(samples, nondet);
831
832            snapshot.filter_if_some(sample_batch.first()).into_stream()
833        }
834        .weakest_retries()
835    }
836}
837
838impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
839where
840    L: Location<'a>,
841{
842    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
843    /// which will stream the value computed in _each_ tick as a separate stream element.
844    ///
845    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
846    /// producing one element in the output for each tick. This is useful for batched computations,
847    /// where the results from each tick must be combined together.
848    ///
849    /// # Example
850    /// ```rust
851    /// # #[cfg(feature = "deploy")] {
852    /// # use hydro_lang::prelude::*;
853    /// # use futures::StreamExt;
854    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
855    /// let tick = process.tick();
856    /// # // ticks are lazy by default, forces the second tick to run
857    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
858    /// # let batch_first_tick = process
859    /// #   .source_iter(q!(vec![1]))
860    /// #   .batch(&tick, nondet!(/** test */));
861    /// # let batch_second_tick = process
862    /// #   .source_iter(q!(vec![1, 2, 3]))
863    /// #   .batch(&tick, nondet!(/** test */))
864    /// #   .defer_tick(); // appears on the second tick
865    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
866    /// input_batch // first tick: [1], second tick: [1, 2, 3]
867    ///     .count()
868    ///     .all_ticks()
869    /// # }, |mut stream| async move {
870    /// // [1, 3]
871    /// # for w in vec![1, 3] {
872    /// #     assert_eq!(stream.next().await.unwrap(), w);
873    /// # }
874    /// # }));
875    /// # }
876    /// ```
877    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
878        self.into_stream().all_ticks()
879    }
880
881    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
882    /// which will stream the value computed in _each_ tick as a separate stream element.
883    ///
884    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
885    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
886    /// singleton's [`Tick`] context.
887    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
888        self.into_stream().all_ticks_atomic()
889    }
890
891    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
892    /// be asynchronously updated with the latest value of the singleton inside the tick.
893    ///
894    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
895    /// tick that tracks the inner value. This is useful for getting the value as of the
896    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
897    ///
898    /// # Example
899    /// ```rust
900    /// # #[cfg(feature = "deploy")] {
901    /// # use hydro_lang::prelude::*;
902    /// # use futures::StreamExt;
903    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
904    /// let tick = process.tick();
905    /// # // ticks are lazy by default, forces the second tick to run
906    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
907    /// # let batch_first_tick = process
908    /// #   .source_iter(q!(vec![1]))
909    /// #   .batch(&tick, nondet!(/** test */));
910    /// # let batch_second_tick = process
911    /// #   .source_iter(q!(vec![1, 2, 3]))
912    /// #   .batch(&tick, nondet!(/** test */))
913    /// #   .defer_tick(); // appears on the second tick
914    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
915    /// input_batch // first tick: [1], second tick: [1, 2, 3]
916    ///     .count()
917    ///     .latest()
918    /// # .sample_eager(nondet!(/** test */))
919    /// # }, |mut stream| async move {
920    /// // asynchronously changes from 1 ~> 3
921    /// # for w in vec![1, 3] {
922    /// #     assert_eq!(stream.next().await.unwrap(), w);
923    /// # }
924    /// # }));
925    /// # }
926    /// ```
927    pub fn latest(self) -> Singleton<T, L, Unbounded> {
928        Singleton::new(
929            self.location.outer().clone(),
930            HydroNode::YieldConcat {
931                inner: Box::new(self.ir_node.into_inner()),
932                metadata: self
933                    .location
934                    .outer()
935                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
936            },
937        )
938    }
939
940    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
941    /// be updated with the latest value of the singleton inside the tick.
942    ///
943    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
944    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
945    /// singleton's [`Tick`] context.
946    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
947        let out_location = Atomic {
948            tick: self.location.clone(),
949        };
950        Singleton::new(
951            out_location.clone(),
952            HydroNode::YieldConcat {
953                inner: Box::new(self.ir_node.into_inner()),
954                metadata: out_location
955                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
956            },
957        )
958    }
959
960    /// Converts this singleton into a [`Stream`] containing a single element, the value.
961    ///
962    /// # Example
963    /// ```rust
964    /// # #[cfg(feature = "deploy")] {
965    /// # use hydro_lang::prelude::*;
966    /// # use futures::StreamExt;
967    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
968    /// let tick = process.tick();
969    /// let batch_input = process
970    ///   .source_iter(q!(vec![123, 456]))
971    ///   .batch(&tick, nondet!(/** test */));
972    /// batch_input.clone().chain(
973    ///   batch_input.count().into_stream()
974    /// ).all_ticks()
975    /// # }, |mut stream| async move {
976    /// // [123, 456, 2]
977    /// # for w in vec![123, 456, 2] {
978    /// #     assert_eq!(stream.next().await.unwrap(), w);
979    /// # }
980    /// # }));
981    /// # }
982    /// ```
983    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
984        Stream::new(
985            self.location.clone(),
986            HydroNode::Cast {
987                inner: Box::new(self.ir_node.into_inner()),
988                metadata: self.location.new_node_metadata(Stream::<
989                    T,
990                    Tick<L>,
991                    Bounded,
992                    TotalOrder,
993                    ExactlyOnce,
994                >::collection_kind()),
995            },
996        )
997    }
998}
999
1000#[doc(hidden)]
1001/// Helper trait that determines the output collection type for [`Singleton::zip`].
1002///
1003/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1004/// [`Singleton`].
1005#[sealed::sealed]
1006pub trait ZipResult<'a, Other> {
1007    /// The output collection type.
1008    type Out;
1009    /// The type of the tupled output value.
1010    type ElementType;
1011    /// The type of the other collection's value.
1012    type OtherType;
1013    /// The location where the tupled result will be materialized.
1014    type Location: Location<'a>;
1015
1016    /// The location of the second input to the `zip`.
1017    fn other_location(other: &Other) -> Self::Location;
1018    /// The IR node of the second input to the `zip`.
1019    fn other_ir_node(other: Other) -> HydroNode;
1020
1021    /// Constructs the output live collection given an IR node containing the zip result.
1022    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1023}
1024
1025#[sealed::sealed]
1026impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1027where
1028    L: Location<'a>,
1029{
1030    type Out = Singleton<(T, U), L, B>;
1031    type ElementType = (T, U);
1032    type OtherType = U;
1033    type Location = L;
1034
1035    fn other_location(other: &Singleton<U, L, B>) -> L {
1036        other.location.clone()
1037    }
1038
1039    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1040        other.ir_node.into_inner()
1041    }
1042
1043    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1044        Singleton::new(
1045            location.clone(),
1046            HydroNode::Cast {
1047                inner: Box::new(ir_node),
1048                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1049            },
1050        )
1051    }
1052}
1053
1054#[sealed::sealed]
1055impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1056where
1057    L: Location<'a>,
1058{
1059    type Out = Optional<(T, U), L, B>;
1060    type ElementType = (T, U);
1061    type OtherType = U;
1062    type Location = L;
1063
1064    fn other_location(other: &Optional<U, L, B>) -> L {
1065        other.location.clone()
1066    }
1067
1068    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1069        other.ir_node.into_inner()
1070    }
1071
1072    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1073        Optional::new(location, ir_node)
1074    }
1075}
1076
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    /// A singleton fed back through a tick cycle must still hold exactly one value in every
    /// tick: counting its elements reports 1 each time the tick is triggered.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (trigger_port, triggers) =
            process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let tick = process.tick();
        let (cycle_handle, cycled) = tick.cycle_with_initial(tick.singleton(q!(0)));

        // The count is only released in ticks where a trigger message is present.
        let cardinality = cycled.clone().into_stream().count();
        let trigger = triggers.batch(&tick, nondet!(/** testing */)).first();
        let counts_port = cardinality
            .filter_if_some(trigger)
            .all_ticks()
            .send_bincode_external(&external);
        cycle_handle.complete_next_tick(cycled);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut trigger_sink = nodes.connect(trigger_port).await;
        let mut counts = nodes.connect(counts_port).await;

        deployment.start().await.unwrap();

        // Two triggers in a row, each expected to observe a cardinality of one.
        for _ in 0..2 {
            trigger_sink.send(()).await.unwrap();
            assert_eq!(counts.next().await.unwrap(), 1);
        }
    }

    /// Expected to panic: the assertion demands that the first snapshot of the fold is
    /// already the final sum, which (per the test name) should fail in executions where an
    /// intermediate state is observed first.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_fold_intermediate_states() {
        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = sum.snapshot(&tick, nondet!(/** test */));
        let out_recv = snapshots.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            let first = out_recv.next().await.unwrap();
            assert_eq!(first, 10);
        });
    }

    /// Counts the executions explored when snapshotting a fold over four inputs; in every
    /// one of them the last emitted snapshot must be the full sum.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_intermediate_state_count() {
        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = sum.snapshot(&tick, nondet!(/** test */));
        let out_recv = snapshots.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            let emitted = out_recv.collect::<Vec<_>>().await;
            assert_eq!(emitted.last(), Some(&10));
        });

        // 2^4 possible subsets of intermediates (including initial state)
        assert_eq!(instance_count, 16);
    }

    /// Checks that the initial state of the fold is not repeated in autonomous decisions:
    /// the first snapshot is the initial value and the next one already reflects the input.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_no_repeat_initial() {
        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let (in_port, input) = process.sim_input();
        let sum = input.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = sum.snapshot(&tick, nondet!(/** test */));
        let out_recv = snapshots.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            let initial = out_recv.next().await.unwrap();
            assert_eq!(initial, 0);

            in_port.send(123);

            let updated = out_recv.next().await.unwrap();
            assert_eq!(updated, 123);
        });
    }

    /// Expected to panic: when the tick is driven by a snapshot AND something else, the
    /// snapshot can "stutter" and show the same state more than once, so the exhaustive
    /// search is expected to reach the explicit `panic!` below.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_fold_repeats_snapshots() {
        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let batched = numbers.batch(&tick, nondet!(/** test */));
        let sum_snapshot = sum.snapshot(&tick, nondet!(/** test */));
        let paired = batched.cross_singleton(sum_snapshot);
        let out_recv = paired.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            // The second element is only pulled once the first one matched.
            let first = out_recv.next().await.unwrap();
            if first == (1, 3) {
                let second = out_recv.next().await.unwrap();
                if second == (2, 3) {
                    panic!("repeated snapshot");
                }
            }
        });
    }

    /// Pins the number of executions explored when a batch is crossed with a fold snapshot.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_repeats_snapshots_count() {
        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2]));
        let sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let batched = numbers.batch(&tick, nondet!(/** test */));
        let sum_snapshot = sum.snapshot(&tick, nondet!(/** test */));
        let paired = batched.cross_singleton(sum_snapshot);
        let out_recv = paired.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        // There is no combinatorial explanation for this number yet; it was checked via logs.
        assert_eq!(instance_count, 52);
    }

    /// Ensures that a top-level singleton has only one snapshot, i.e. a single execution is
    /// explored.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_singleton_exhaustive() {
        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let constant = process.singleton(q!(1));
        let tick = process.tick();
        let snapshots = constant.snapshot(&tick, nondet!(/** test */));
        let out_recv = snapshots.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 1);
    }

    /// If a tick consumes a static snapshot and a stream batch, only the batch requires
    /// state-space exploration.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_singleton_join_count() {
        let flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let tick = process.tick();
        let batched = numbers.batch(&tick, nondet!(/** test */));
        let constant_snapshot = process
            .singleton(q!(123))
            .snapshot(&tick, nondet!(/** test */));
        let paired = batched.cross_singleton(constant_snapshot);
        let out_recv = paired.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        // 2^4 ways to split up (including a possibly empty first batch)
        assert_eq!(instance_count, 16);
    }
}