Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A single Rust value that can asynchronously change over time.
26///
27/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
28/// [`Unbounded`], the value will asynchronously change over time.
29///
30/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
31/// a single number that will asynchronously change as events are processed. Singletons also appear
32/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
33/// such as getting the length of a batch of requests.
34///
35/// Type Parameters:
36/// - `Type`: the type of the value in this singleton
37/// - `Loc`: the [`Location`] where the singleton is materialized
38/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    /// The location handle handed back by [`Singleton::location`].
    pub(crate) location: Loc,
    /// The IR node that produces this singleton's value. It sits in a `RefCell` so that
    /// operators (and `Clone`, which only has `&self`) can swap it for
    /// `HydroNode::Placeholder` or rewrite it in place.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Handle to the flow state; the `Drop` impl uses it to register a still-held IR node
    /// as a `HydroRoot::Null` root.
    pub(crate) flow_state: FlowState,

    /// Zero-sized marker that ties the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
46
47impl<T, L, B: Boundedness> Drop for Singleton<T, L, B> {
48    fn drop(&mut self) {
49        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
50        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
51            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
52                input: Box::new(ir_node),
53                op_metadata: HydroIrOpMetadata::new(),
54            });
55        }
56    }
57}
58
59impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
60where
61    T: Clone,
62    L: Location<'a> + NoTick,
63{
64    fn from(value: Singleton<T, L, Bounded>) -> Self {
65        let tick = value.location().tick();
66        value.clone_into_tick(&tick).latest()
67    }
68}
69
70impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
71where
72    L: Location<'a>,
73{
74    type Location = Tick<L>;
75
76    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
77        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
78            location.clone(),
79            HydroNode::DeferTick {
80                input: Box::new(HydroNode::CycleSource {
81                    cycle_id,
82                    metadata: location.new_node_metadata(Self::collection_kind()),
83                }),
84                metadata: location
85                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
86            },
87        );
88
89        from_previous_tick.unwrap_or(initial)
90    }
91}
92
93impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
94where
95    L: Location<'a>,
96{
97    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
98        assert_eq!(
99            Location::id(&self.location),
100            expected_location,
101            "locations do not match"
102        );
103        self.location
104            .flow_state()
105            .borrow_mut()
106            .push_root(HydroRoot::CycleSink {
107                cycle_id,
108                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
109                op_metadata: HydroIrOpMetadata::new(),
110            });
111    }
112}
113
114impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
115where
116    L: Location<'a>,
117{
118    type Location = Tick<L>;
119
120    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
121        Singleton::new(
122            location.clone(),
123            HydroNode::CycleSource {
124                cycle_id,
125                metadata: location.new_node_metadata(Self::collection_kind()),
126            },
127        )
128    }
129}
130
131impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
132where
133    L: Location<'a>,
134{
135    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
136        assert_eq!(
137            Location::id(&self.location),
138            expected_location,
139            "locations do not match"
140        );
141        self.location
142            .flow_state()
143            .borrow_mut()
144            .push_root(HydroRoot::CycleSink {
145                cycle_id,
146                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
147                op_metadata: HydroIrOpMetadata::new(),
148            });
149    }
150}
151
152impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
153where
154    L: Location<'a> + NoTick,
155{
156    type Location = L;
157
158    fn create_source(cycle_id: CycleId, location: L) -> Self {
159        Singleton::new(
160            location.clone(),
161            HydroNode::CycleSource {
162                cycle_id,
163                metadata: location.new_node_metadata(Self::collection_kind()),
164            },
165        )
166    }
167}
168
169impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
170where
171    L: Location<'a> + NoTick,
172{
173    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
174        assert_eq!(
175            Location::id(&self.location),
176            expected_location,
177            "locations do not match"
178        );
179        self.location
180            .flow_state()
181            .borrow_mut()
182            .push_root(HydroRoot::CycleSink {
183                cycle_id,
184                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
185                op_metadata: HydroIrOpMetadata::new(),
186            });
187    }
188}
189
190impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
191where
192    T: Clone,
193    L: Location<'a>,
194{
195    fn clone(&self) -> Self {
196        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
197            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
198            *self.ir_node.borrow_mut() = HydroNode::Tee {
199                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
200                metadata: self.location.new_node_metadata(Self::collection_kind()),
201            };
202        }
203
204        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
205            Singleton {
206                location: self.location.clone(),
207                flow_state: self.flow_state.clone(),
208                ir_node: HydroNode::Tee {
209                    inner: SharedNode(inner.0.clone()),
210                    metadata: metadata.clone(),
211                }
212                .into(),
213                _phantom: PhantomData,
214            }
215        } else {
216            unreachable!()
217        }
218    }
219}
220
/// Zips a singleton with an optional inside a tick by first converting the singleton into
/// an [`Optional`] and then delegating to the optional-level `zip_inside_tick`.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B>,
) -> Optional<(T, O), Tick<L>, B> {
    super::optional::zip_inside_tick(Into::<Optional<T, Tick<L>, B>>::into(me), other)
}
229
230impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
231where
232    L: Location<'a>,
233{
234    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
235        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
236        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
237        let flow_state = location.flow_state().clone();
238        Singleton {
239            location,
240            flow_state,
241            ir_node: RefCell::new(ir_node),
242            _phantom: PhantomData,
243        }
244    }
245
246    pub(crate) fn collection_kind() -> CollectionKind {
247        CollectionKind::Singleton {
248            bound: B::BOUND_KIND,
249            element_type: stageleft::quote_type::<T>().into(),
250        }
251    }
252
    /// Returns the [`Location`] where this singleton is being materialized.
    ///
    /// The result is a borrow of the handle stored in the singleton; nothing is cloned.
    pub fn location(&self) -> &L {
        &self.location
    }
257
258    /// Transforms the singleton value by applying a function `f` to it,
259    /// continuously as the input is updated.
260    ///
261    /// # Example
262    /// ```rust
263    /// # #[cfg(feature = "deploy")] {
264    /// # use hydro_lang::prelude::*;
265    /// # use futures::StreamExt;
266    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
267    /// let tick = process.tick();
268    /// let singleton = tick.singleton(q!(5));
269    /// singleton.map(q!(|v| v * 2)).all_ticks()
270    /// # }, |mut stream| async move {
271    /// // 10
272    /// # assert_eq!(stream.next().await.unwrap(), 10);
273    /// # }));
274    /// # }
275    /// ```
276    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
277    where
278        F: Fn(T) -> U + 'a,
279    {
280        let f = f.splice_fn1_ctx(&self.location).into();
281        Singleton::new(
282            self.location.clone(),
283            HydroNode::Map {
284                f,
285                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
286                metadata: self
287                    .location
288                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
289            },
290        )
291    }
292
293    /// Transforms the singleton value by applying a function `f` to it and then flattening
294    /// the result into a stream, preserving the order of elements.
295    ///
296    /// The function `f` is applied to the singleton value to produce an iterator, and all items
297    /// from that iterator are emitted in the output stream in deterministic order.
298    ///
299    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
300    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
301    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
302    ///
303    /// # Example
304    /// ```rust
305    /// # #[cfg(feature = "deploy")] {
306    /// # use hydro_lang::prelude::*;
307    /// # use futures::StreamExt;
308    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
309    /// let tick = process.tick();
310    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
311    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
312    /// # }, |mut stream| async move {
313    /// // 1, 2, 3
314    /// # for w in vec![1, 2, 3] {
315    /// #     assert_eq!(stream.next().await.unwrap(), w);
316    /// # }
317    /// # }));
318    /// # }
319    /// ```
320    pub fn flat_map_ordered<U, I, F>(
321        self,
322        f: impl IntoQuotedMut<'a, F, L>,
323    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
324    where
325        I: IntoIterator<Item = U>,
326        F: Fn(T) -> I + 'a,
327    {
328        let f = f.splice_fn1_ctx(&self.location).into();
329        Stream::new(
330            self.location.clone(),
331            HydroNode::FlatMap {
332                f,
333                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
334                metadata: self.location.new_node_metadata(
335                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
336                ),
337            },
338        )
339    }
340
341    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
342    /// for the output type `I` to produce items in any order.
343    ///
344    /// The function `f` is applied to the singleton value to produce an iterator, and all items
345    /// from that iterator are emitted in the output stream in non-deterministic order.
346    ///
347    /// # Example
348    /// ```rust
349    /// # #[cfg(feature = "deploy")] {
350    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
351    /// # use futures::StreamExt;
352    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
353    /// let tick = process.tick();
354    /// let singleton = tick.singleton(q!(
355    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
356    /// ));
357    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
358    /// # }, |mut stream| async move {
359    /// // 1, 2, 3, but in no particular order
360    /// # let mut results = Vec::new();
361    /// # for _ in 0..3 {
362    /// #     results.push(stream.next().await.unwrap());
363    /// # }
364    /// # results.sort();
365    /// # assert_eq!(results, vec![1, 2, 3]);
366    /// # }));
367    /// # }
368    /// ```
369    pub fn flat_map_unordered<U, I, F>(
370        self,
371        f: impl IntoQuotedMut<'a, F, L>,
372    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
373    where
374        I: IntoIterator<Item = U>,
375        F: Fn(T) -> I + 'a,
376    {
377        let f = f.splice_fn1_ctx(&self.location).into();
378        Stream::new(
379            self.location.clone(),
380            HydroNode::FlatMap {
381                f,
382                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
383                metadata: self
384                    .location
385                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
386            },
387        )
388    }
389
390    /// Flattens the singleton value into a stream, preserving the order of elements.
391    ///
392    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
393    /// are emitted in the output stream in deterministic order.
394    ///
395    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
396    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
397    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
398    ///
399    /// # Example
400    /// ```rust
401    /// # #[cfg(feature = "deploy")] {
402    /// # use hydro_lang::prelude::*;
403    /// # use futures::StreamExt;
404    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
405    /// let tick = process.tick();
406    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
407    /// singleton.flatten_ordered().all_ticks()
408    /// # }, |mut stream| async move {
409    /// // 1, 2, 3
410    /// # for w in vec![1, 2, 3] {
411    /// #     assert_eq!(stream.next().await.unwrap(), w);
412    /// # }
413    /// # }));
414    /// # }
415    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the value itself is the
        // iterable whose items are emitted.
        self.flat_map_ordered(q!(|x| x))
    }
422
423    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
424    /// for the element type `T` to produce items in any order.
425    ///
426    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
427    /// are emitted in the output stream in non-deterministic order.
428    ///
429    /// # Example
430    /// ```rust
431    /// # #[cfg(feature = "deploy")] {
432    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
433    /// # use futures::StreamExt;
434    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
435    /// let tick = process.tick();
436    /// let singleton = tick.singleton(q!(
437    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
438    /// ));
439    /// singleton.flatten_unordered().all_ticks()
440    /// # }, |mut stream| async move {
441    /// // 1, 2, 3, but in no particular order
442    /// # let mut results = Vec::new();
443    /// # for _ in 0..3 {
444    /// #     results.push(stream.next().await.unwrap());
445    /// # }
446    /// # results.sort();
447    /// # assert_eq!(results, vec![1, 2, 3]);
448    /// # }));
449    /// # }
450    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the value itself is the
        // iterable whose items are emitted.
        self.flat_map_unordered(q!(|x| x))
    }
457
458    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
459    ///
460    /// If the predicate returns `true`, the output optional contains the same value.
461    /// If the predicate returns `false`, the output optional is empty.
462    ///
463    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
464    /// not modify or take ownership of the value. If you need to modify the value while filtering
465    /// use [`Singleton::filter_map`] instead.
466    ///
467    /// # Example
468    /// ```rust
469    /// # #[cfg(feature = "deploy")] {
470    /// # use hydro_lang::prelude::*;
471    /// # use futures::StreamExt;
472    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
473    /// let tick = process.tick();
474    /// let singleton = tick.singleton(q!(5));
475    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
476    /// # }, |mut stream| async move {
477    /// // 5
478    /// # assert_eq!(stream.next().await.unwrap(), 5);
479    /// # }));
480    /// # }
481    /// ```
482    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
483    where
484        F: Fn(&T) -> bool + 'a,
485    {
486        let f = f.splice_fn1_borrow_ctx(&self.location).into();
487        Optional::new(
488            self.location.clone(),
489            HydroNode::Filter {
490                f,
491                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
492                metadata: self
493                    .location
494                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
495            },
496        )
497    }
498
499    /// An operator that both filters and maps. It yields the value only if the supplied
500    /// closure `f` returns `Some(value)`.
501    ///
502    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
503    /// If the closure returns `None`, the output optional is empty.
504    ///
505    /// # Example
506    /// ```rust
507    /// # #[cfg(feature = "deploy")] {
508    /// # use hydro_lang::prelude::*;
509    /// # use futures::StreamExt;
510    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
511    /// let tick = process.tick();
512    /// let singleton = tick.singleton(q!("42"));
513    /// singleton
514    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
515    ///     .all_ticks()
516    /// # }, |mut stream| async move {
517    /// // 42
518    /// # assert_eq!(stream.next().await.unwrap(), 42);
519    /// # }));
520    /// # }
521    /// ```
522    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
523    where
524        F: Fn(T) -> Option<U> + 'a,
525    {
526        let f = f.splice_fn1_ctx(&self.location).into();
527        Optional::new(
528            self.location.clone(),
529            HydroNode::FilterMap {
530                f,
531                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
532                metadata: self
533                    .location
534                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
535            },
536        )
537    }
538
539    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
540    ///
541    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
542    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
543    /// non-null. This is useful for combining several pieces of state together.
544    ///
545    /// # Example
546    /// ```rust
547    /// # #[cfg(feature = "deploy")] {
548    /// # use hydro_lang::prelude::*;
549    /// # use futures::StreamExt;
550    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
551    /// let tick = process.tick();
552    /// let numbers = process
553    ///   .source_iter(q!(vec![123, 456]))
554    ///   .batch(&tick, nondet!(/** test */));
555    /// let count = numbers.clone().count(); // Singleton
556    /// let max = numbers.max(); // Optional
557    /// count.zip(max).all_ticks()
558    /// # }, |mut stream| async move {
559    /// // [(2, 456)]
560    /// # for w in vec![(2, 456)] {
561    /// #     assert_eq!(stream.next().await.unwrap(), w);
562    /// # }
563    /// # }));
564    /// # }
565    /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
        B: IsBounded,
    {
        // Both operands are expected to live at the same location.
        // NOTE(review): presumably `check_matching_location` panics on a mismatch — confirm.
        check_matching_location(&self.location, &Self::other_location(&other));

        // First strategy: a top-level location that can provide a tick performs the zip
        // inside that tick.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
            // Snapshot `self` into the tick; wrap `other`'s IR node in a `Cast` node that
            // is typed as an `Optional` and snapshot that too; zip both inside the tick;
            // then bring the result back out of the tick with `latest()`.
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    other_location.clone(),
                    HydroNode::Cast {
                        inner: Box::new(Self::other_ir_node(other)),
                        metadata: other_location.new_node_metadata(Optional::<
                            <Self as ZipResult<'a, O>>::OtherType,
                            Tick<L>,
                            Bounded,
                        >::collection_kind(
                        )),
                    },
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Move the resulting IR node out of the temporary optional and re-wrap it as
            // the output type selected by `ZipResult`.
            Self::make(
                out.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // Second strategy: emit a single `CrossSingleton` node over both inputs. Its
            // metadata is declared with the `Optional` collection kind regardless of which
            // output type `Self::make` wraps it in.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
616
617    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
618    /// boolean signal is `true`, otherwise the output is null.
619    ///
620    /// # Example
621    /// ```rust
622    /// # #[cfg(feature = "deploy")] {
623    /// # use hydro_lang::prelude::*;
624    /// # use futures::StreamExt;
625    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
626    /// let tick = process.tick();
627    /// // ticks are lazy by default, forces the second tick to run
628    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
629    ///
630    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
631    /// let batch_first_tick = process
632    ///   .source_iter(q!(vec![1]))
633    ///   .batch(&tick, nondet!(/** test */));
634    /// let batch_second_tick = process
635    ///   .source_iter(q!(vec![1, 2, 3]))
636    ///   .batch(&tick, nondet!(/** test */))
637    ///   .defer_tick();
638    /// batch_first_tick.chain(batch_second_tick).count()
639    ///   .filter_if(signal)
640    ///   .all_ticks()
641    /// # }, |mut stream| async move {
642    /// // [1]
643    /// # for w in vec![1] {
644    /// #     assert_eq!(stream.next().await.unwrap(), w);
645    /// # }
646    /// # }));
647    /// # }
648    /// ```
649    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
650    where
651        B: IsBounded,
652    {
653        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
654    }
655
656    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
658    ///
659    /// Useful for conditionally processing, such as only emitting a singleton's value outside
660    /// a tick if some other condition is satisfied.
661    ///
662    /// # Example
663    /// ```rust
664    /// # #[cfg(feature = "deploy")] {
665    /// # use hydro_lang::prelude::*;
666    /// # use futures::StreamExt;
667    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
668    /// let tick = process.tick();
669    /// // ticks are lazy by default, forces the second tick to run
670    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
671    ///
672    /// let batch_first_tick = process
673    ///   .source_iter(q!(vec![1]))
674    ///   .batch(&tick, nondet!(/** test */));
675    /// let batch_second_tick = process
676    ///   .source_iter(q!(vec![1, 2, 3]))
677    ///   .batch(&tick, nondet!(/** test */))
678    ///   .defer_tick(); // appears on the second tick
679    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
680    /// batch_first_tick.chain(batch_second_tick).count()
681    ///   .filter_if_some(some_on_first_tick)
682    ///   .all_ticks()
683    /// # }, |mut stream| async move {
684    /// // [1]
685    /// # for w in vec![1] {
686    /// #     assert_eq!(stream.next().await.unwrap(), w);
687    /// # }
688    /// # }));
689    /// # }
690    /// ```
691    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
692    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
693    where
694        B: IsBounded,
695    {
696        self.filter_if(signal.is_some())
697    }
698
699    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
701    ///
702    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
703    /// the condition.
704    ///
705    /// # Example
706    /// ```rust
707    /// # #[cfg(feature = "deploy")] {
708    /// # use hydro_lang::prelude::*;
709    /// # use futures::StreamExt;
710    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
711    /// let tick = process.tick();
712    /// // ticks are lazy by default, forces the second tick to run
713    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
714    ///
715    /// let batch_first_tick = process
716    ///   .source_iter(q!(vec![1]))
717    ///   .batch(&tick, nondet!(/** test */));
718    /// let batch_second_tick = process
719    ///   .source_iter(q!(vec![1, 2, 3]))
720    ///   .batch(&tick, nondet!(/** test */))
721    ///   .defer_tick(); // appears on the second tick
722    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
723    /// batch_first_tick.chain(batch_second_tick).count()
724    ///   .filter_if_none(some_on_first_tick)
725    ///   .all_ticks()
726    /// # }, |mut stream| async move {
727    /// // [3]
728    /// # for w in vec![3] {
729    /// #     assert_eq!(stream.next().await.unwrap(), w);
730    /// # }
731    /// # }));
732    /// # }
733    /// ```
734    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
735    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
736    where
737        B: IsBounded,
738    {
739        self.filter_if(other.is_none())
740    }
741
742    /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
743    ///
744    /// # Example
745    /// ```rust
746    /// # #[cfg(feature = "deploy")] {
747    /// # use hydro_lang::prelude::*;
748    /// # use futures::StreamExt;
749    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
750    /// let tick = process.tick();
751    /// let a = tick.singleton(q!(5));
752    /// let b = tick.singleton(q!(5));
753    /// a.equals(b).all_ticks()
754    /// # }, |mut stream| async move {
755    /// // [true]
756    /// # assert_eq!(stream.next().await.unwrap(), true);
757    /// # }));
758    /// # }
759    /// ```
760    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
761    where
762        T: PartialEq,
763        B: IsBounded,
764    {
765        self.zip(other).map(q!(|(a, b)| a == b))
766    }
767
768    /// An operator which allows you to "name" a `HydroNode`.
769    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
770    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
771        {
772            let mut node = self.ir_node.borrow_mut();
773            let metadata = node.metadata_mut();
774            metadata.tag = Some(name.to_owned());
775        }
776        self
777    }
778}
779
/// Enables the `!` operator on boolean singletons.
impl<'a, L: Location<'a>, B: Boundedness> Not for Singleton<bool, L, B> {
    /// Negation yields a boolean singleton with the same location and boundedness types.
    type Output = Singleton<bool, L, B>;

    /// Negates the wrapped boolean by mapping a quoted `!` closure over the singleton.
    fn not(self) -> Self::Output {
        self.map(q!(|b| !b))
    }
}
787
788impl<'a, T, L, B: Boundedness> Singleton<Option<T>, L, B>
789where
790    L: Location<'a>,
791{
    /// Converts a `Singleton<Option<T>, L, B>` into an `Optional<T, L, B>` by unwrapping
    /// the inner `Option`.
    ///
    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
    /// `Option<T>` directly. If the singleton's value is `Some(v)`, the resulting
    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
798    ///
799    /// # Example
800    /// ```rust
801    /// # #[cfg(feature = "deploy")] {
802    /// # use hydro_lang::prelude::*;
803    /// # use futures::StreamExt;
804    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
805    /// let tick = process.tick();
806    /// let singleton = tick.singleton(q!(Some(42)));
807    /// singleton.into_optional().all_ticks()
808    /// # }, |mut stream| async move {
809    /// // 42
810    /// # assert_eq!(stream.next().await.unwrap(), 42);
811    /// # }));
812    /// # }
813    /// ```
    pub fn into_optional(self) -> Optional<T, L, B> {
        // The value is already an `Option<T>`, so the identity closure is a valid
        // `filter_map` function.
        self.filter_map(q!(|v| v))
    }
817}
818
819impl<'a, L, B: Boundedness> Singleton<bool, L, B>
820where
821    L: Location<'a>,
822{
823    /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
824    ///
825    /// # Example
826    /// ```rust
827    /// # #[cfg(feature = "deploy")] {
828    /// # use hydro_lang::prelude::*;
829    /// # use futures::StreamExt;
830    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
831    /// let tick = process.tick();
832    /// // ticks are lazy by default, forces the second tick to run
833    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
834    ///
835    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
836    /// let b = tick.singleton(q!(true)); // true, true
837    /// a.and(b).all_ticks()
838    /// # }, |mut stream| async move {
839    /// // [true, false]
840    /// # for w in vec![true, false] {
841    /// #     assert_eq!(stream.next().await.unwrap(), w);
842    /// # }
843    /// # }));
844    /// # }
845    /// ```
846    pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, B>
847    where
848        B: IsBounded,
849    {
850        self.zip(other).map(q!(|(a, b)| a && b))
851    }
852
853    /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
854    ///
855    /// # Example
856    /// ```rust
857    /// # #[cfg(feature = "deploy")] {
858    /// # use hydro_lang::prelude::*;
859    /// # use futures::StreamExt;
860    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
861    /// let tick = process.tick();
862    /// // ticks are lazy by default, forces the second tick to run
863    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
864    ///
865    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
866    /// let b = tick.singleton(q!(false)); // false, false
867    /// a.or(b).all_ticks()
868    /// # }, |mut stream| async move {
869    /// // [true, false]
870    /// # for w in vec![true, false] {
871    /// #     assert_eq!(stream.next().await.unwrap(), w);
872    /// # }
873    /// # }));
874    /// # }
875    /// ```
876    pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, B>
877    where
878        B: IsBounded,
879    {
880        self.zip(other).map(q!(|(a, b)| a || b))
881    }
882}
883
884impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
885where
886    L: Location<'a> + NoTick,
887{
888    /// Returns a singleton value corresponding to the latest snapshot of the singleton
889    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
890    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
891    /// all snapshots of this singleton into the atomic-associated tick will observe the
892    /// same value each tick.
893    ///
894    /// # Non-Determinism
895    /// Because this picks a snapshot of a singleton whose value is continuously changing,
896    /// the output singleton has a non-deterministic value since the snapshot can be at an
897    /// arbitrary point in time.
898    pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
899        Singleton::new(
900            self.location.clone().tick,
901            HydroNode::Batch {
902                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
903                metadata: self
904                    .location
905                    .tick
906                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
907            },
908        )
909    }
910
911    /// Returns this singleton back into a top-level, asynchronous execution context where updates
912    /// to the value will be asynchronously propagated.
913    pub fn end_atomic(self) -> Singleton<T, L, B> {
914        Singleton::new(
915            self.location.tick.l.clone(),
916            HydroNode::EndAtomic {
917                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
918                metadata: self
919                    .location
920                    .tick
921                    .l
922                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
923            },
924        )
925    }
926}
927
928impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
929where
930    L: Location<'a>,
931{
932    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
933    /// will observe the same version of the value and will be executed synchronously before any
934    /// outputs are yielded (in [`Optional::end_atomic`]).
935    ///
936    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
937    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
938    /// a different version).
939    ///
940    /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
941    /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
942    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
943    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
944        let out_location = Atomic { tick: tick.clone() };
945        Singleton::new(
946            out_location.clone(),
947            HydroNode::BeginAtomic {
948                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
949                metadata: out_location
950                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
951            },
952        )
953    }
954
955    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
956    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
957    /// relevant data that contributed to the snapshot at tick `t`.
958    ///
959    /// # Non-Determinism
960    /// Because this picks a snapshot of a singleton whose value is continuously changing,
961    /// the output singleton has a non-deterministic value since the snapshot can be at an
962    /// arbitrary point in time.
963    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
964        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
965        Singleton::new(
966            tick.clone(),
967            HydroNode::Batch {
968                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
969                metadata: tick
970                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
971            },
972        )
973    }
974
975    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
976    /// with order corresponding to increasing prefixes of data contributing to the singleton.
977    ///
978    /// # Non-Determinism
979    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
980    /// to non-deterministic batching and arrival of inputs, the output stream is
981    /// non-deterministic.
982    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
983    where
984        L: NoTick,
985    {
986        sliced! {
987            let snapshot = use(self, nondet);
988            snapshot.into_stream()
989        }
990        .weaken_retries()
991    }
992
993    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
994    /// value taken at various points in time. Because the input singleton may be
995    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
996    /// represent the value of the singleton given some prefix of the streams leading up to
997    /// it.
998    ///
999    /// # Non-Determinism
1000    /// The output stream is non-deterministic in which elements are sampled, since this
1001    /// is controlled by a clock.
1002    pub fn sample_every(
1003        self,
1004        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1005        nondet: NonDet,
1006    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1007    where
1008        L: NoTick + NoAtomic,
1009    {
1010        let samples = self.location.source_interval(interval, nondet);
1011        sliced! {
1012            let snapshot = use(self, nondet);
1013            let sample_batch = use(samples, nondet);
1014
1015            snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1016        }
1017        .weaken_retries()
1018    }
1019
1020    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1021    /// implies that `B == Bounded`.
1022    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1023    where
1024        B: IsBounded,
1025    {
1026        Singleton::new(
1027            self.location.clone(),
1028            self.ir_node.replace(HydroNode::Placeholder),
1029        )
1030    }
1031
1032    /// Clones this bounded singleton into a tick, returning a singleton that has the
1033    /// same value as the outer singleton. Because the outer singleton is bounded, this
1034    /// is deterministic because there is only a single immutable version.
1035    pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1036    where
1037        B: IsBounded,
1038        T: Clone,
1039    {
1040        // TODO(shadaj): avoid printing simulator logs for this snapshot
1041        self.snapshot(
1042            tick,
1043            nondet!(/** bounded top-level singleton so deterministic */),
1044        )
1045    }
1046
1047    /// Converts this singleton into a [`Stream`] containing a single element, the value.
1048    ///
1049    /// # Example
1050    /// ```rust
1051    /// # #[cfg(feature = "deploy")] {
1052    /// # use hydro_lang::prelude::*;
1053    /// # use futures::StreamExt;
1054    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1055    /// let tick = process.tick();
1056    /// let batch_input = process
1057    ///   .source_iter(q!(vec![123, 456]))
1058    ///   .batch(&tick, nondet!(/** test */));
1059    /// batch_input.clone().chain(
1060    ///   batch_input.count().into_stream()
1061    /// ).all_ticks()
1062    /// # }, |mut stream| async move {
1063    /// // [123, 456, 2]
1064    /// # for w in vec![123, 456, 2] {
1065    /// #     assert_eq!(stream.next().await.unwrap(), w);
1066    /// # }
1067    /// # }));
1068    /// # }
1069    /// ```
1070    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1071    where
1072        B: IsBounded,
1073    {
1074        Stream::new(
1075            self.location.clone(),
1076            HydroNode::Cast {
1077                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1078                metadata: self.location.new_node_metadata(Stream::<
1079                    T,
1080                    Tick<L>,
1081                    Bounded,
1082                    TotalOrder,
1083                    ExactlyOnce,
1084                >::collection_kind()),
1085            },
1086        )
1087    }
1088}
1089
1090impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1091where
1092    L: Location<'a>,
1093{
1094    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1095    /// which will stream the value computed in _each_ tick as a separate stream element.
1096    ///
1097    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1098    /// producing one element in the output for each tick. This is useful for batched computations,
1099    /// where the results from each tick must be combined together.
1100    ///
1101    /// # Example
1102    /// ```rust
1103    /// # #[cfg(feature = "deploy")] {
1104    /// # use hydro_lang::prelude::*;
1105    /// # use futures::StreamExt;
1106    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1107    /// let tick = process.tick();
1108    /// # // ticks are lazy by default, forces the second tick to run
1109    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1110    /// # let batch_first_tick = process
1111    /// #   .source_iter(q!(vec![1]))
1112    /// #   .batch(&tick, nondet!(/** test */));
1113    /// # let batch_second_tick = process
1114    /// #   .source_iter(q!(vec![1, 2, 3]))
1115    /// #   .batch(&tick, nondet!(/** test */))
1116    /// #   .defer_tick(); // appears on the second tick
1117    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1118    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1119    ///     .count()
1120    ///     .all_ticks()
1121    /// # }, |mut stream| async move {
1122    /// // [1, 3]
1123    /// # for w in vec![1, 3] {
1124    /// #     assert_eq!(stream.next().await.unwrap(), w);
1125    /// # }
1126    /// # }));
1127    /// # }
1128    /// ```
1129    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1130        self.into_stream().all_ticks()
1131    }
1132
1133    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1134    /// which will stream the value computed in _each_ tick as a separate stream element.
1135    ///
1136    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1137    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1138    /// singleton's [`Tick`] context.
1139    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1140        self.into_stream().all_ticks_atomic()
1141    }
1142
1143    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1144    /// be asynchronously updated with the latest value of the singleton inside the tick.
1145    ///
1146    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1147    /// tick that tracks the inner value. This is useful for getting the value as of the
1148    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1149    ///
1150    /// # Example
1151    /// ```rust
1152    /// # #[cfg(feature = "deploy")] {
1153    /// # use hydro_lang::prelude::*;
1154    /// # use futures::StreamExt;
1155    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1156    /// let tick = process.tick();
1157    /// # // ticks are lazy by default, forces the second tick to run
1158    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1159    /// # let batch_first_tick = process
1160    /// #   .source_iter(q!(vec![1]))
1161    /// #   .batch(&tick, nondet!(/** test */));
1162    /// # let batch_second_tick = process
1163    /// #   .source_iter(q!(vec![1, 2, 3]))
1164    /// #   .batch(&tick, nondet!(/** test */))
1165    /// #   .defer_tick(); // appears on the second tick
1166    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1167    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1168    ///     .count()
1169    ///     .latest()
1170    /// # .sample_eager(nondet!(/** test */))
1171    /// # }, |mut stream| async move {
1172    /// // asynchronously changes from 1 ~> 3
1173    /// # for w in vec![1, 3] {
1174    /// #     assert_eq!(stream.next().await.unwrap(), w);
1175    /// # }
1176    /// # }));
1177    /// # }
1178    /// ```
1179    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1180        Singleton::new(
1181            self.location.outer().clone(),
1182            HydroNode::YieldConcat {
1183                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1184                metadata: self
1185                    .location
1186                    .outer()
1187                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1188            },
1189        )
1190    }
1191
1192    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1193    /// be updated with the latest value of the singleton inside the tick.
1194    ///
1195    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1196    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1197    /// singleton's [`Tick`] context.
1198    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1199        let out_location = Atomic {
1200            tick: self.location.clone(),
1201        };
1202        Singleton::new(
1203            out_location.clone(),
1204            HydroNode::YieldConcat {
1205                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1206                metadata: out_location
1207                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1208            },
1209        )
1210    }
1211}
1212
1213#[doc(hidden)]
1214/// Helper trait that determines the output collection type for [`Singleton::zip`].
1215///
1216/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1217/// [`Singleton`].
1218#[sealed::sealed]
1219pub trait ZipResult<'a, Other> {
1220    /// The output collection type.
1221    type Out;
1222    /// The type of the tupled output value.
1223    type ElementType;
1224    /// The type of the other collection's value.
1225    type OtherType;
1226    /// The location where the tupled result will be materialized.
1227    type Location: Location<'a>;
1228
1229    /// The location of the second input to the `zip`.
1230    fn other_location(other: &Other) -> Self::Location;
1231    /// The IR node of the second input to the `zip`.
1232    fn other_ir_node(other: Other) -> HydroNode;
1233
1234    /// Constructs the output live collection given an IR node containing the zip result.
1235    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1236}
1237
1238#[sealed::sealed]
1239impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1240where
1241    L: Location<'a>,
1242{
1243    type Out = Singleton<(T, U), L, B>;
1244    type ElementType = (T, U);
1245    type OtherType = U;
1246    type Location = L;
1247
1248    fn other_location(other: &Singleton<U, L, B>) -> L {
1249        other.location.clone()
1250    }
1251
1252    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1253        other.ir_node.replace(HydroNode::Placeholder)
1254    }
1255
1256    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1257        Singleton::new(
1258            location.clone(),
1259            HydroNode::Cast {
1260                inner: Box::new(ir_node),
1261                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1262            },
1263        )
1264    }
1265}
1266
1267#[sealed::sealed]
1268impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1269where
1270    L: Location<'a>,
1271{
1272    type Out = Optional<(T, U), L, B>;
1273    type ElementType = (T, U);
1274    type OtherType = U;
1275    type Location = L;
1276
1277    fn other_location(other: &Optional<U, L, B>) -> L {
1278        other.location.clone()
1279    }
1280
1281    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1282        other.ir_node.replace(HydroNode::Placeholder)
1283    }
1284
1285    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1286        Optional::new(location, ir_node)
1287    }
1288}
1289
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    // A singleton routed through a tick cycle is counted (as a stream) on every triggered
    // tick; the count is expected to be 1 each time.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (trigger_port, triggers) =
            process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let tick = process.tick();
        let (cycle_handle, cycled) = tick.cycle_with_initial(tick.singleton(q!(0)));
        let per_tick_count = cycled.clone().into_stream().count();
        // Only emit a count in ticks that actually received a trigger message.
        let has_trigger = triggers
            .batch(&tick, nondet!(/** testing */))
            .first()
            .is_some();
        let counts_port = per_tick_count
            .filter_if(has_trigger)
            .all_ticks()
            .send_bincode_external(&external);
        cycle_handle.complete_next_tick(cycled);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_trigger = nodes.connect(trigger_port).await;
        let mut external_out = nodes.connect(counts_port).await;

        deployment.start().await.unwrap();

        // Two triggers, two ticks: the cardinality must be 1 both times.
        for _ in 0..2 {
            tick_trigger.send(()).await.unwrap();

            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }

    // Marked `#[should_panic]`: asserting that the first observed snapshot of the fold is
    // already the final sum is expected to fail under exhaustive simulation.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_fold_intermediate_states() {
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
        let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
        let out = snapshots.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            assert_eq!(out.next().await.unwrap(), 10);
        });
    }

    // Counts the executions explored when snapshotting a fold over four inputs; the last
    // snapshot must always be the full sum.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_intermediate_state_count() {
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
        let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
        let out = snapshots.all_ticks().sim_output();

        let explored = flow.sim().exhaustive(async || {
            let observed = out.collect::<Vec<_>>().await;
            assert_eq!(observed.last(), Some(&10));
        });

        assert_eq!(
            explored,
            16 // 2^4 possible subsets of intermediates (including initial state)
        )
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_no_repeat_initial() {
        // check that we don't repeat the initial state of the fold in autonomous decisions

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let (in_port, input) = process.sim_input();
        let running_sum = input.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
        let out = snapshots.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            // The initial state is observed first...
            assert_eq!(out.next().await.unwrap(), 0);

            in_port.send(123);

            // ...and the very next snapshot must already reflect the new input.
            assert_eq!(out.next().await.unwrap(), 123);
        });
    }

    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_fold_repeats_snapshots() {
        // when the tick is driven by a snapshot AND something else, the snapshot can
        // "stutter" and repeat the same state multiple times

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
        let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let paired = numbers
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(running_sum.snapshot(&tick, nondet!(/** test */)));
        let out = paired.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            // The second element is only pulled when the first one matched.
            let first = out.next().await.unwrap();
            if first == (1, 3) && out.next().await.unwrap() == (2, 3) {
                panic!("repeated snapshot");
            }
        });
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_repeats_snapshots_count() {
        // check the number of instances
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2])));
        let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = process.tick();
        let paired = numbers
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(running_sum.snapshot(&tick, nondet!(/** test */)));
        let out = paired.all_ticks().sim_output();

        let explored = flow.sim().exhaustive(async || {
            let _ = out.collect::<Vec<_>>().await;
        });

        assert_eq!(explored, 52);
        // don't have a combinatorial explanation for this number yet, but checked via logs
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_singleton_exhaustive() {
        // ensures that top-level singletons have only one snapshot
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let constant = process.singleton(q!(1));
        let tick = process.tick();
        let snapshots = constant.snapshot(&tick, nondet!(/** test */));
        let out = snapshots.all_ticks().sim_output();

        let explored = flow.sim().exhaustive(async || {
            let _ = out.collect::<Vec<_>>().await;
        });

        assert_eq!(explored, 1);
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_singleton_join_count() {
        // if a tick consumes a static snapshot and a stream batch, only the batch requires
        // space exploration

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let tick = process.tick();
        let paired = numbers
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(process.singleton(q!(123)).clone_into_tick(&tick));
        let out = paired.all_ticks().sim_output();

        let explored = flow.sim().exhaustive(async || {
            let _ = out.collect::<Vec<_>>().await;
        });

        assert_eq!(
            explored,
            16 // 2^4 ways to split up (including a possibly empty first batch)
        )
    }

    // Converting a top-level fold result into a stream must yield only the final value.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_singleton_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
        let total = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let out = total.into_stream().sim_output();

        flow.sim().exhaustive(async || {
            out.assert_yields_only([10]).await;
        });
    }

    // Zips a sliced singleton with a clone of itself and checks both the final pair and the
    // number of explored executions.
    #[cfg(feature = "sim")]
    #[test]
    fn inside_tick_singleton_zip() {
        use crate::live_collections::Stream;
        use crate::live_collections::sliced::sliced;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let numbers: Stream<_, _> = process.source_iter(q!(vec![1, 2])).into();
        let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

        let out = sliced! {
            let sum = use(running_sum, nondet!(/** test */));
            sum.clone().zip(sum).into_stream()
        }
        .sim_output();

        let explored = flow.sim().exhaustive(async || {
            let observed = out.collect::<Vec<_>>().await;
            assert_eq!(observed.last(), Some(&(3, 3)));
        });

        assert_eq!(explored, 4);
    }
}