hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A *nullable* Rust value that can asynchronously change over time.
25///
26/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
27/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
29///
30/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
31/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this optional (when it is not null)
35/// - `Loc`: the [`Location`] where the optional is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Optional<Type, Loc, Bound: Boundedness> {
38    pub(crate) location: Loc,
39    pub(crate) ir_node: RefCell<HydroNode>,
40
41    _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
45where
46    L: Location<'a>,
47{
48    fn defer_tick(self) -> Self {
49        Optional::defer_tick(self)
50    }
51}
52
53impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    type Location = Tick<L>;
58
59    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
60        Optional::new(
61            location.clone(),
62            HydroNode::CycleSource {
63                ident,
64                metadata: location.new_node_metadata(Self::collection_kind()),
65            },
66        )
67    }
68}
69
70impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
71where
72    L: Location<'a>,
73{
74    type Location = Tick<L>;
75
76    fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
77        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
78            location.clone(),
79            HydroNode::DeferTick {
80                input: Box::new(HydroNode::CycleSource {
81                    ident,
82                    metadata: location.new_node_metadata(Self::collection_kind()),
83                }),
84                metadata: location
85                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
86            },
87        );
88
89        from_previous_tick.or(initial.filter_if_some(location.optional_first_tick(q!(()))))
90    }
91}
92
93impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
94where
95    L: Location<'a>,
96{
97    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
98        assert_eq!(
99            Location::id(&self.location),
100            expected_location,
101            "locations do not match"
102        );
103        self.location
104            .flow_state()
105            .borrow_mut()
106            .push_root(HydroRoot::CycleSink {
107                ident,
108                input: Box::new(self.ir_node.into_inner()),
109                op_metadata: HydroIrOpMetadata::new(),
110            });
111    }
112}
113
114impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
115where
116    L: Location<'a>,
117{
118    type Location = Tick<L>;
119
120    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
121        Optional::new(
122            location.clone(),
123            HydroNode::CycleSource {
124                ident,
125                metadata: location.new_node_metadata(Self::collection_kind()),
126            },
127        )
128    }
129}
130
131impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
132where
133    L: Location<'a>,
134{
135    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
136        assert_eq!(
137            Location::id(&self.location),
138            expected_location,
139            "locations do not match"
140        );
141        self.location
142            .flow_state()
143            .borrow_mut()
144            .push_root(HydroRoot::CycleSink {
145                ident,
146                input: Box::new(self.ir_node.into_inner()),
147                op_metadata: HydroIrOpMetadata::new(),
148            });
149    }
150}
151
152impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
153where
154    L: Location<'a> + NoTick,
155{
156    type Location = L;
157
158    fn create_source(ident: syn::Ident, location: L) -> Self {
159        Optional::new(
160            location.clone(),
161            HydroNode::CycleSource {
162                ident,
163                metadata: location.new_node_metadata(Self::collection_kind()),
164            },
165        )
166    }
167}
168
169impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
170where
171    L: Location<'a> + NoTick,
172{
173    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
174        assert_eq!(
175            Location::id(&self.location),
176            expected_location,
177            "locations do not match"
178        );
179        self.location
180            .flow_state()
181            .borrow_mut()
182            .push_root(HydroRoot::CycleSink {
183                ident,
184                input: Box::new(self.ir_node.into_inner()),
185                op_metadata: HydroIrOpMetadata::new(),
186            });
187    }
188}
189
190impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
191where
192    L: Location<'a>,
193{
194    fn from(singleton: Optional<T, L, Bounded>) -> Self {
195        Optional::new(singleton.location, singleton.ir_node.into_inner())
196    }
197}
198
199impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
200where
201    L: Location<'a>,
202{
203    fn from(singleton: Singleton<T, L, B>) -> Self {
204        Optional::new(
205            singleton.location.clone(),
206            HydroNode::Cast {
207                inner: Box::new(singleton.ir_node.into_inner()),
208                metadata: singleton
209                    .location
210                    .new_node_metadata(Self::collection_kind()),
211            },
212        )
213    }
214}
215
/// Pairs two optionals that live at the same location by emitting a
/// [`HydroNode::CrossSingleton`] over their IR nodes.
///
/// `check_matching_location` is invoked on the two locations before the node is built.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, O, L, B>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B>
where
    L: Location<'a>,
    B: Boundedness,
{
    check_matching_location(&me.location, &other.location);

    let owner = me.location.clone();
    let left = Box::new(me.ir_node.into_inner());
    let right = Box::new(other.ir_node.into_inner());
    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());

    Optional::new(
        owner,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
234
/// Combines two optionals at the same location with a [`HydroNode::ChainFirst`], placing
/// `me` in the `first` slot and `other` in the `second` slot.
///
/// `check_matching_location` is invoked on the two locations before the node is built.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L, B>(me: Optional<T, L, B>, other: Optional<T, L, B>) -> Optional<T, L, B>
where
    L: Location<'a>,
    B: Boundedness,
{
    check_matching_location(&me.location, &other.location);

    let owner = me.location.clone();
    let first = Box::new(me.ir_node.into_inner());
    let second = Box::new(other.ir_node.into_inner());
    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());

    Optional::new(
        owner,
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
253
254impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
255where
256    T: Clone,
257    L: Location<'a>,
258{
259    fn clone(&self) -> Self {
260        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
261            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
262            *self.ir_node.borrow_mut() = HydroNode::Tee {
263                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
264                metadata: self.location.new_node_metadata(Self::collection_kind()),
265            };
266        }
267
268        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
269            Optional {
270                location: self.location.clone(),
271                ir_node: HydroNode::Tee {
272                    inner: TeeNode(inner.0.clone()),
273                    metadata: metadata.clone(),
274                }
275                .into(),
276                _phantom: PhantomData,
277            }
278        } else {
279            unreachable!()
280        }
281    }
282}
283
284impl<'a, T, L, B: Boundedness> Optional<T, L, B>
285where
286    L: Location<'a>,
287{
288    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
289        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
290        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
291        Optional {
292            location,
293            ir_node: RefCell::new(ir_node),
294            _phantom: PhantomData,
295        }
296    }
297
298    pub(crate) fn collection_kind() -> CollectionKind {
299        CollectionKind::Optional {
300            bound: B::BOUND_KIND,
301            element_type: stageleft::quote_type::<T>().into(),
302        }
303    }
304
305    /// Returns the [`Location`] where this optional is being materialized.
306    pub fn location(&self) -> &L {
307        &self.location
308    }
309
310    /// Transforms the optional value by applying a function `f` to it,
311    /// continuously as the input is updated.
312    ///
313    /// Whenever the optional is empty, the output optional is also empty.
314    ///
315    /// # Example
316    /// ```rust
317    /// # #[cfg(feature = "deploy")] {
318    /// # use hydro_lang::prelude::*;
319    /// # use futures::StreamExt;
320    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
321    /// let tick = process.tick();
322    /// let optional = tick.optional_first_tick(q!(1));
323    /// optional.map(q!(|v| v + 1)).all_ticks()
324    /// # }, |mut stream| async move {
325    /// // 2
326    /// # assert_eq!(stream.next().await.unwrap(), 2);
327    /// # }));
328    /// # }
329    /// ```
330    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
331    where
332        F: Fn(T) -> U + 'a,
333    {
334        let f = f.splice_fn1_ctx(&self.location).into();
335        Optional::new(
336            self.location.clone(),
337            HydroNode::Map {
338                f,
339                input: Box::new(self.ir_node.into_inner()),
340                metadata: self
341                    .location
342                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
343            },
344        )
345    }
346
347    /// Transforms the optional value by applying a function `f` to it and then flattening
348    /// the result into a stream, preserving the order of elements.
349    ///
350    /// If the optional is empty, the output stream is also empty. If the optional contains
351    /// a value, `f` is applied to produce an iterator, and all items from that iterator
352    /// are emitted in the output stream in deterministic order.
353    ///
354    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
355    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
356    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
357    ///
358    /// # Example
359    /// ```rust
360    /// # #[cfg(feature = "deploy")] {
361    /// # use hydro_lang::prelude::*;
362    /// # use futures::StreamExt;
363    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
364    /// let tick = process.tick();
365    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
366    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
367    /// # }, |mut stream| async move {
368    /// // 1, 2, 3
369    /// # for w in vec![1, 2, 3] {
370    /// #     assert_eq!(stream.next().await.unwrap(), w);
371    /// # }
372    /// # }));
373    /// # }
374    /// ```
375    pub fn flat_map_ordered<U, I, F>(
376        self,
377        f: impl IntoQuotedMut<'a, F, L>,
378    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
379    where
380        I: IntoIterator<Item = U>,
381        F: Fn(T) -> I + 'a,
382    {
383        let f = f.splice_fn1_ctx(&self.location).into();
384        Stream::new(
385            self.location.clone(),
386            HydroNode::FlatMap {
387                f,
388                input: Box::new(self.ir_node.into_inner()),
389                metadata: self.location.new_node_metadata(
390                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
391                ),
392            },
393        )
394    }
395
396    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
397    /// for the output type `I` to produce items in any order.
398    ///
399    /// If the optional is empty, the output stream is also empty. If the optional contains
400    /// a value, `f` is applied to produce an iterator, and all items from that iterator
401    /// are emitted in the output stream in non-deterministic order.
402    ///
403    /// # Example
404    /// ```rust
405    /// # #[cfg(feature = "deploy")] {
406    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
407    /// # use futures::StreamExt;
408    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
409    /// let tick = process.tick();
410    /// let optional = tick.optional_first_tick(q!(
411    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
412    /// ));
413    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
414    /// # }, |mut stream| async move {
415    /// // 1, 2, 3, but in no particular order
416    /// # let mut results = Vec::new();
417    /// # for _ in 0..3 {
418    /// #     results.push(stream.next().await.unwrap());
419    /// # }
420    /// # results.sort();
421    /// # assert_eq!(results, vec![1, 2, 3]);
422    /// # }));
423    /// # }
424    /// ```
425    pub fn flat_map_unordered<U, I, F>(
426        self,
427        f: impl IntoQuotedMut<'a, F, L>,
428    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
429    where
430        I: IntoIterator<Item = U>,
431        F: Fn(T) -> I + 'a,
432    {
433        let f = f.splice_fn1_ctx(&self.location).into();
434        Stream::new(
435            self.location.clone(),
436            HydroNode::FlatMap {
437                f,
438                input: Box::new(self.ir_node.into_inner()),
439                metadata: self
440                    .location
441                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
442            },
443        )
444    }
445
446    /// Flattens the optional value into a stream, preserving the order of elements.
447    ///
448    /// If the optional is empty, the output stream is also empty. If the optional contains
449    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
450    /// in the output stream in deterministic order.
451    ///
452    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
453    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
454    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
455    ///
456    /// # Example
457    /// ```rust
458    /// # #[cfg(feature = "deploy")] {
459    /// # use hydro_lang::prelude::*;
460    /// # use futures::StreamExt;
461    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
462    /// let tick = process.tick();
463    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
464    /// optional.flatten_ordered().all_ticks()
465    /// # }, |mut stream| async move {
466    /// // 1, 2, 3
467    /// # for w in vec![1, 2, 3] {
468    /// #     assert_eq!(stream.next().await.unwrap(), w);
469    /// # }
470    /// # }));
471    /// # }
472    /// ```
473    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
474    where
475        T: IntoIterator<Item = U>,
476    {
477        self.flat_map_ordered(q!(|v| v))
478    }
479
480    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
481    /// for the element type `T` to produce items in any order.
482    ///
483    /// If the optional is empty, the output stream is also empty. If the optional contains
484    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
485    /// in the output stream in non-deterministic order.
486    ///
487    /// # Example
488    /// ```rust
489    /// # #[cfg(feature = "deploy")] {
490    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
491    /// # use futures::StreamExt;
492    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
493    /// let tick = process.tick();
494    /// let optional = tick.optional_first_tick(q!(
495    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
496    /// ));
497    /// optional.flatten_unordered().all_ticks()
498    /// # }, |mut stream| async move {
499    /// // 1, 2, 3, but in no particular order
500    /// # let mut results = Vec::new();
501    /// # for _ in 0..3 {
502    /// #     results.push(stream.next().await.unwrap());
503    /// # }
504    /// # results.sort();
505    /// # assert_eq!(results, vec![1, 2, 3]);
506    /// # }));
507    /// # }
508    /// ```
509    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
510    where
511        T: IntoIterator<Item = U>,
512    {
513        self.flat_map_unordered(q!(|v| v))
514    }
515
516    /// Creates an optional containing only the value if it satisfies a predicate `f`.
517    ///
518    /// If the optional is empty, the output optional is also empty. If the optional contains
519    /// a value and the predicate returns `true`, the output optional contains the same value.
520    /// If the predicate returns `false`, the output optional is empty.
521    ///
522    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
523    /// not modify or take ownership of the value. If you need to modify the value while filtering
524    /// use [`Optional::filter_map`] instead.
525    ///
526    /// # Example
527    /// ```rust
528    /// # #[cfg(feature = "deploy")] {
529    /// # use hydro_lang::prelude::*;
530    /// # use futures::StreamExt;
531    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
532    /// let tick = process.tick();
533    /// let optional = tick.optional_first_tick(q!(5));
534    /// optional.filter(q!(|&x| x > 3)).all_ticks()
535    /// # }, |mut stream| async move {
536    /// // 5
537    /// # assert_eq!(stream.next().await.unwrap(), 5);
538    /// # }));
539    /// # }
540    /// ```
541    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
542    where
543        F: Fn(&T) -> bool + 'a,
544    {
545        let f = f.splice_fn1_borrow_ctx(&self.location).into();
546        Optional::new(
547            self.location.clone(),
548            HydroNode::Filter {
549                f,
550                input: Box::new(self.ir_node.into_inner()),
551                metadata: self.location.new_node_metadata(Self::collection_kind()),
552            },
553        )
554    }
555
556    /// An operator that both filters and maps. It yields only the value if the supplied
557    /// closure `f` returns `Some(value)`.
558    ///
559    /// If the optional is empty, the output optional is also empty. If the optional contains
560    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
561    /// If the closure returns `None`, the output optional is empty.
562    ///
563    /// # Example
564    /// ```rust
565    /// # #[cfg(feature = "deploy")] {
566    /// # use hydro_lang::prelude::*;
567    /// # use futures::StreamExt;
568    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
569    /// let tick = process.tick();
570    /// let optional = tick.optional_first_tick(q!("42"));
571    /// optional
572    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
573    ///     .all_ticks()
574    /// # }, |mut stream| async move {
575    /// // 42
576    /// # assert_eq!(stream.next().await.unwrap(), 42);
577    /// # }));
578    /// # }
579    /// ```
580    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
581    where
582        F: Fn(T) -> Option<U> + 'a,
583    {
584        let f = f.splice_fn1_ctx(&self.location).into();
585        Optional::new(
586            self.location.clone(),
587            HydroNode::FilterMap {
588                f,
589                input: Box::new(self.ir_node.into_inner()),
590                metadata: self
591                    .location
592                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
593            },
594        )
595    }
596
    /// Combines this optional with another [`Singleton`] or [`Optional`] by tupling their values.
    ///
    /// If the other value is an [`Optional`], the output will be non-null only if the argument is
    /// non-null. This is useful for combining several pieces of state together.
601    ///
602    /// # Example
603    /// ```rust
604    /// # #[cfg(feature = "deploy")] {
605    /// # use hydro_lang::prelude::*;
606    /// # use futures::StreamExt;
607    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
608    /// let tick = process.tick();
609    /// let numbers = process
610    ///   .source_iter(q!(vec![123, 456, 789]))
611    ///   .batch(&tick, nondet!(/** test */));
612    /// let min = numbers.clone().min(); // Optional
613    /// let max = numbers.max(); // Optional
614    /// min.zip(max).all_ticks()
615    /// # }, |mut stream| async move {
616    /// // [(123, 789)]
617    /// # for w in vec![(123, 789)] {
618    /// #     assert_eq!(stream.next().await.unwrap(), w);
619    /// # }
620    /// # }));
621    /// # }
622    /// ```
623    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
624    where
625        O: Clone,
626    {
627        let other: Optional<O, L, B> = other.into();
628        check_matching_location(&self.location, &other.location);
629
630        if L::is_top_level()
631            && let Some(tick) = self.location.try_tick()
632        {
633            let out = zip_inside_tick(
634                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
635                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
636            )
637            .latest();
638
639            Optional::new(out.location, out.ir_node.into_inner())
640        } else {
641            zip_inside_tick(self, other)
642        }
643    }
644
645    /// Passes through `self` when it has a value, otherwise passes through `other`.
646    ///
647    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
648    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
649    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
650    ///
651    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
652    /// of the inputs change (including to/from null states).
653    ///
654    /// # Example
655    /// ```rust
656    /// # #[cfg(feature = "deploy")] {
657    /// # use hydro_lang::prelude::*;
658    /// # use futures::StreamExt;
659    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
660    /// let tick = process.tick();
661    /// // ticks are lazy by default, forces the second tick to run
662    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
663    ///
664    /// let some_first_tick = tick.optional_first_tick(q!(123));
665    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
666    /// some_first_tick.or(some_second_tick).all_ticks()
667    /// # }, |mut stream| async move {
668    /// // [123 /* first tick */, 456 /* second tick */]
669    /// # for w in vec![123, 456] {
670    /// #     assert_eq!(stream.next().await.unwrap(), w);
671    /// # }
672    /// # }));
673    /// # }
674    /// ```
675    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
676        check_matching_location(&self.location, &other.location);
677
678        if L::is_top_level()
679            && let Some(tick) = self.location.try_tick()
680        {
681            let out = or_inside_tick(
682                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
683                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
684            )
685            .latest();
686
687            Optional::new(out.location, out.ir_node.into_inner())
688        } else {
689            Optional::new(
690                self.location.clone(),
691                HydroNode::ChainFirst {
692                    first: Box::new(self.ir_node.into_inner()),
693                    second: Box::new(other.ir_node.into_inner()),
694                    metadata: self.location.new_node_metadata(Self::collection_kind()),
695                },
696            )
697        }
698    }
699
700    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
701    ///
702    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
703    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
704    ///
705    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
706    /// of the inputs change (including to/from null states).
707    ///
708    /// # Example
709    /// ```rust
710    /// # #[cfg(feature = "deploy")] {
711    /// # use hydro_lang::prelude::*;
712    /// # use futures::StreamExt;
713    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
714    /// let tick = process.tick();
715    /// // ticks are lazy by default, forces the later ticks to run
716    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
717    ///
718    /// let some_first_tick = tick.optional_first_tick(q!(123));
719    /// some_first_tick
720    ///     .unwrap_or(tick.singleton(q!(456)))
721    ///     .all_ticks()
722    /// # }, |mut stream| async move {
723    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
724    /// # for w in vec![123, 456, 456, 456] {
725    /// #     assert_eq!(stream.next().await.unwrap(), w);
726    /// # }
727    /// # }));
728    /// # }
729    /// ```
730    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
731        let res_option = self.or(other.into());
732        Singleton::new(
733            res_option.location.clone(),
734            HydroNode::Cast {
735                inner: Box::new(res_option.ir_node.into_inner()),
736                metadata: res_option
737                    .location
738                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
739            },
740        )
741    }
742
743    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
744    ///
745    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
746    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
747    /// so that Hydro can skip any computation on null values.
748    ///
749    /// # Example
750    /// ```rust
751    /// # #[cfg(feature = "deploy")] {
752    /// # use hydro_lang::prelude::*;
753    /// # use futures::StreamExt;
754    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
755    /// let tick = process.tick();
756    /// // ticks are lazy by default, forces the later ticks to run
757    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
758    ///
759    /// let some_first_tick = tick.optional_first_tick(q!(123));
760    /// some_first_tick.into_singleton().all_ticks()
761    /// # }, |mut stream| async move {
762    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
763    /// # for w in vec![Some(123), None, None, None] {
764    /// #     assert_eq!(stream.next().await.unwrap(), w);
765    /// # }
766    /// # }));
767    /// # }
768    /// ```
769    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
770    where
771        T: Clone,
772    {
773        let none: syn::Expr = parse_quote!(::std::option::Option::None);
774
775        let none_singleton = Singleton::new(
776            self.location.clone(),
777            HydroNode::SingletonSource {
778                value: none.into(),
779                metadata: self
780                    .location
781                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
782            },
783        );
784
785        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
786    }
787
788    /// An operator which allows you to "name" a `HydroNode`.
789    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
790    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
791        {
792            let mut node = self.ir_node.borrow_mut();
793            let metadata = node.metadata_mut();
794            metadata.tag = Some(name.to_string());
795        }
796        self
797    }
798}
799
800impl<'a, T, L> Optional<T, L, Bounded>
801where
802    L: Location<'a>,
803{
804    /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
806    ///
807    /// Useful for conditionally processing, such as only emitting an optional's value outside
808    /// a tick if some other condition is satisfied.
809    ///
810    /// # Example
811    /// ```rust
812    /// # #[cfg(feature = "deploy")] {
813    /// # use hydro_lang::prelude::*;
814    /// # use futures::StreamExt;
815    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
816    /// let tick = process.tick();
817    /// // ticks are lazy by default, forces the second tick to run
818    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
819    ///
820    /// let batch_first_tick = process
821    ///   .source_iter(q!(vec![]))
822    ///   .batch(&tick, nondet!(/** test */));
823    /// let batch_second_tick = process
824    ///   .source_iter(q!(vec![456]))
825    ///   .batch(&tick, nondet!(/** test */))
826    ///   .defer_tick(); // appears on the second tick
827    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
828    /// batch_first_tick.chain(batch_second_tick).first()
829    ///   .filter_if_some(some_on_first_tick)
830    ///   .unwrap_or(tick.singleton(q!(789)))
831    ///   .all_ticks()
832    /// # }, |mut stream| async move {
833    /// // [789, 789]
834    /// # for w in vec![789, 789] {
835    /// #     assert_eq!(stream.next().await.unwrap(), w);
836    /// # }
837    /// # }));
838    /// # }
839    /// ```
840    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
841        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
842    }
843
844    /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is _null_, otherwise the output is null.
846    ///
847    /// Useful for conditionally processing, such as only emitting an optional's value outside
848    /// a tick if some other condition is satisfied.
849    ///
850    /// # Example
851    /// ```rust
852    /// # #[cfg(feature = "deploy")] {
853    /// # use hydro_lang::prelude::*;
854    /// # use futures::StreamExt;
855    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
856    /// let tick = process.tick();
857    /// // ticks are lazy by default, forces the second tick to run
858    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
859    ///
860    /// let batch_first_tick = process
861    ///   .source_iter(q!(vec![]))
862    ///   .batch(&tick, nondet!(/** test */));
863    /// let batch_second_tick = process
864    ///   .source_iter(q!(vec![456]))
865    ///   .batch(&tick, nondet!(/** test */))
866    ///   .defer_tick(); // appears on the second tick
867    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
868    /// batch_first_tick.chain(batch_second_tick).first()
869    ///   .filter_if_none(some_on_first_tick)
870    ///   .unwrap_or(tick.singleton(q!(789)))
871    ///   .all_ticks()
872    /// # }, |mut stream| async move {
    /// // [789, 456]
874    /// # for w in vec![789, 456] {
875    /// #     assert_eq!(stream.next().await.unwrap(), w);
876    /// # }
877    /// # }));
878    /// # }
879    /// ```
880    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
881        self.filter_if_some(
882            other
883                .map(q!(|_| ()))
884                .into_singleton()
885                .filter(q!(|o| o.is_none())),
886        )
887    }
888
    /// If `self` is null, emits a null optional, but if it is non-null, emits `value`.
890    ///
891    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
892    /// having a value, such as only releasing a piece of state if the node is the leader.
893    ///
894    /// # Example
895    /// ```rust
896    /// # #[cfg(feature = "deploy")] {
897    /// # use hydro_lang::prelude::*;
898    /// # use futures::StreamExt;
899    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
900    /// let tick = process.tick();
901    /// // ticks are lazy by default, forces the second tick to run
902    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
903    ///
904    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
905    /// some_on_first_tick
906    ///     .if_some_then(tick.singleton(q!(456)))
907    ///     .unwrap_or(tick.singleton(q!(123)))
908    /// # .all_ticks()
909    /// # }, |mut stream| async move {
910    /// // 456 (first tick) ~> 123 (second tick onwards)
911    /// # for w in vec![456, 123, 123] {
912    /// #     assert_eq!(stream.next().await.unwrap(), w);
913    /// # }
914    /// # }));
915    /// # }
916    /// ```
917    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
918        value.filter_if_some(self)
919    }
920}
921
922impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
923where
924    L: Location<'a> + NoTick,
925{
926    /// Returns an optional value corresponding to the latest snapshot of the optional
927    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
928    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
929    /// all snapshots of this optional into the atomic-associated tick will observe the
930    /// same value each tick.
931    ///
932    /// # Non-Determinism
    /// Because this picks a snapshot of an optional whose value is continuously changing,
934    /// the output optional has a non-deterministic value since the snapshot can be at an
935    /// arbitrary point in time.
936    pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
937        Optional::new(
938            self.location.clone().tick,
939            HydroNode::Batch {
940                inner: Box::new(self.ir_node.into_inner()),
941                metadata: self
942                    .location
943                    .tick
944                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
945            },
946        )
947    }
948
949    /// Returns this optional back into a top-level, asynchronous execution context where updates
950    /// to the value will be asynchronously propagated.
951    pub fn end_atomic(self) -> Optional<T, L, B> {
952        Optional::new(
953            self.location.tick.l.clone(),
954            HydroNode::EndAtomic {
955                inner: Box::new(self.ir_node.into_inner()),
956                metadata: self
957                    .location
958                    .tick
959                    .l
960                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
961            },
962        )
963    }
964}
965
966impl<'a, T, L, B: Boundedness> Optional<T, L, B>
967where
968    L: Location<'a>,
969{
970    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
971    /// will observe the same version of the value and will be executed synchronously before any
972    /// outputs are yielded (in [`Optional::end_atomic`]).
973    ///
974    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
975    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
976    /// a different version).
977    ///
978    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
979    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
980    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
981    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
982        let out_location = Atomic { tick: tick.clone() };
983        Optional::new(
984            out_location.clone(),
985            HydroNode::BeginAtomic {
986                inner: Box::new(self.ir_node.into_inner()),
987                metadata: out_location
988                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
989            },
990        )
991    }
992
    /// Given a tick, returns an optional value corresponding to a snapshot of the optional
994    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
995    /// relevant data that contributed to the snapshot at tick `t`.
996    ///
997    /// # Non-Determinism
    /// Because this picks a snapshot of an optional whose value is continuously changing,
999    /// the output optional has a non-deterministic value since the snapshot can be at an
1000    /// arbitrary point in time.
1001    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1002        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1003        Optional::new(
1004            tick.clone(),
1005            HydroNode::Batch {
1006                inner: Box::new(self.ir_node.into_inner()),
1007                metadata: tick
1008                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1009            },
1010        )
1011    }
1012
1013    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1014    /// with order corresponding to increasing prefixes of data contributing to the optional.
1015    ///
1016    /// # Non-Determinism
1017    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1018    /// to non-deterministic batching and arrival of inputs, the output stream is
1019    /// non-deterministic.
1020    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1021    where
1022        L: NoTick,
1023    {
1024        let tick = self.location.tick();
1025        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
1026    }
1027
1028    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1029    /// value taken at various points in time. Because the input optional may be
1030    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1031    /// represent the value of the optional given some prefix of the streams leading up to
1032    /// it.
1033    ///
1034    /// # Non-Determinism
1035    /// The output stream is non-deterministic in which elements are sampled, since this
1036    /// is controlled by a clock.
1037    pub fn sample_every(
1038        self,
1039        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1040        nondet: NonDet,
1041    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1042    where
1043        L: NoTick + NoAtomic,
1044    {
1045        let samples = self.location.source_interval(interval, nondet);
1046        let tick = self.location.tick();
1047
1048        self.snapshot(&tick, nondet)
1049            .filter_if_some(samples.batch(&tick, nondet).first())
1050            .all_ticks()
1051            .weakest_retries()
1052    }
1053}
1054
1055impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1056where
1057    L: Location<'a>,
1058{
    /// Asynchronously yields the value of this optional outside the tick as an unbounded stream,
1060    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1061    /// null values).
1062    ///
1063    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1064    /// producing one element in the output for each (non-null) tick. This is useful for batched
1065    /// computations, where the results from each tick must be combined together.
1066    ///
1067    /// # Example
1068    /// ```rust
1069    /// # #[cfg(feature = "deploy")] {
1070    /// # use hydro_lang::prelude::*;
1071    /// # use futures::StreamExt;
1072    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1073    /// # let tick = process.tick();
1074    /// # // ticks are lazy by default, forces the second tick to run
1075    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1076    /// # let batch_first_tick = process
1077    /// #   .source_iter(q!(vec![]))
1078    /// #   .batch(&tick, nondet!(/** test */));
1079    /// # let batch_second_tick = process
1080    /// #   .source_iter(q!(vec![1, 2, 3]))
1081    /// #   .batch(&tick, nondet!(/** test */))
1082    /// #   .defer_tick(); // appears on the second tick
1083    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1084    /// input_batch // first tick: [], second tick: [1, 2, 3]
1085    ///     .max()
1086    ///     .all_ticks()
1087    /// # }, |mut stream| async move {
1088    /// // [3]
1089    /// # for w in vec![3] {
1090    /// #     assert_eq!(stream.next().await.unwrap(), w);
1091    /// # }
1092    /// # }));
1093    /// # }
1094    /// ```
1095    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1096        self.into_stream().all_ticks()
1097    }
1098
1099    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1100    /// which will stream the value computed in _each_ tick as a separate stream element.
1101    ///
1102    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1103    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1104    /// optional's [`Tick`] context.
1105    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1106        self.into_stream().all_ticks_atomic()
1107    }
1108
1109    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1110    /// be asynchronously updated with the latest value of the optional inside the tick, including
1111    /// whether the optional is null or not.
1112    ///
1113    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1114    /// tick that tracks the inner value. This is useful for getting the value as of the
1115    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1116    ///
1117    /// # Example
1118    /// ```rust
1119    /// # #[cfg(feature = "deploy")] {
1120    /// # use hydro_lang::prelude::*;
1121    /// # use futures::StreamExt;
1122    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1123    /// # let tick = process.tick();
1124    /// # // ticks are lazy by default, forces the second tick to run
1125    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1126    /// # let batch_first_tick = process
1127    /// #   .source_iter(q!(vec![]))
1128    /// #   .batch(&tick, nondet!(/** test */));
1129    /// # let batch_second_tick = process
1130    /// #   .source_iter(q!(vec![1, 2, 3]))
1131    /// #   .batch(&tick, nondet!(/** test */))
1132    /// #   .defer_tick(); // appears on the second tick
1133    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1134    /// input_batch // first tick: [], second tick: [1, 2, 3]
1135    ///     .max()
1136    ///     .latest()
1137    /// # .into_singleton()
1138    /// # .sample_eager(nondet!(/** test */))
1139    /// # }, |mut stream| async move {
1140    /// // asynchronously changes from None ~> 3
1141    /// # for w in vec![None, Some(3)] {
1142    /// #     assert_eq!(stream.next().await.unwrap(), w);
1143    /// # }
1144    /// # }));
1145    /// # }
1146    /// ```
1147    pub fn latest(self) -> Optional<T, L, Unbounded> {
1148        Optional::new(
1149            self.location.outer().clone(),
1150            HydroNode::YieldConcat {
1151                inner: Box::new(self.ir_node.into_inner()),
1152                metadata: self
1153                    .location
1154                    .outer()
1155                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1156            },
1157        )
1158    }
1159
1160    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1161    /// be updated with the latest value of the optional inside the tick.
1162    ///
1163    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1164    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1165    /// optional's [`Tick`] context.
1166    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1167        let out_location = Atomic {
1168            tick: self.location.clone(),
1169        };
1170
1171        Optional::new(
1172            out_location.clone(),
1173            HydroNode::YieldConcat {
1174                inner: Box::new(self.ir_node.into_inner()),
1175                metadata: out_location
1176                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1177            },
1178        )
1179    }
1180
1181    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1182    /// always has the state of `self` at tick `T - 1`.
1183    ///
1184    /// At tick `0`, the output optional is null, since there is no previous tick.
1185    ///
1186    /// This operator enables stateful iterative processing with ticks, by sending data from one
1187    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1188    ///
1189    /// # Example
1190    /// ```rust
1191    /// # #[cfg(feature = "deploy")] {
1192    /// # use hydro_lang::prelude::*;
1193    /// # use futures::StreamExt;
1194    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1195    /// let tick = process.tick();
1196    /// // ticks are lazy by default, forces the second tick to run
1197    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1198    ///
1199    /// let batch_first_tick = process
1200    ///   .source_iter(q!(vec![1, 2]))
1201    ///   .batch(&tick, nondet!(/** test */));
1202    /// let batch_second_tick = process
1203    ///   .source_iter(q!(vec![3, 4]))
1204    ///   .batch(&tick, nondet!(/** test */))
1205    ///   .defer_tick(); // appears on the second tick
1206    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1207    ///   .reduce(q!(|state, v| *state += v));
1208    ///
1209    /// current_tick_sum.clone().into_singleton().zip(
1210    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1211    /// ).all_ticks()
1212    /// # }, |mut stream| async move {
1213    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1214    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1215    /// #     assert_eq!(stream.next().await.unwrap(), w);
1216    /// # }
1217    /// # }));
1218    /// # }
1219    /// ```
1220    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1221        Optional::new(
1222            self.location.clone(),
1223            HydroNode::DeferTick {
1224                input: Box::new(self.ir_node.into_inner()),
1225                metadata: self.location.new_node_metadata(Self::collection_kind()),
1226            },
1227        )
1228    }
1229
1230    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1231    /// non-null. Otherwise, the stream is empty.
1232    ///
1233    /// # Example
1234    /// ```rust
1235    /// # #[cfg(feature = "deploy")] {
1236    /// # use hydro_lang::prelude::*;
1237    /// # use futures::StreamExt;
1238    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1239    /// # let tick = process.tick();
1240    /// # // ticks are lazy by default, forces the second tick to run
1241    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1242    /// # let batch_first_tick = process
1243    /// #   .source_iter(q!(vec![]))
1244    /// #   .batch(&tick, nondet!(/** test */));
1245    /// # let batch_second_tick = process
1246    /// #   .source_iter(q!(vec![123, 456]))
1247    /// #   .batch(&tick, nondet!(/** test */))
1248    /// #   .defer_tick(); // appears on the second tick
1249    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1250    /// input_batch // first tick: [], second tick: [123, 456]
1251    ///     .clone()
1252    ///     .max()
1253    ///     .into_stream()
1254    ///     .chain(input_batch)
1255    ///     .all_ticks()
1256    /// # }, |mut stream| async move {
1257    /// // [456, 123, 456]
1258    /// # for w in vec![456, 123, 456] {
1259    /// #     assert_eq!(stream.next().await.unwrap(), w);
1260    /// # }
1261    /// # }));
1262    /// # }
1263    /// ```
1264    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1265        Stream::new(
1266            self.location.clone(),
1267            HydroNode::Cast {
1268                inner: Box::new(self.ir_node.into_inner()),
1269                metadata: self.location.new_node_metadata(Stream::<
1270                    T,
1271                    Tick<L>,
1272                    Bounded,
1273                    TotalOrder,
1274                    ExactlyOnce,
1275                >::collection_kind()),
1276            },
1277        )
1278    }
1279}
1280
1281#[cfg(feature = "deploy")]
1282#[cfg(test)]
1283mod tests {
1284    use futures::StreamExt;
1285    use hydro_deploy::Deployment;
1286    use stageleft::q;
1287
1288    use super::Optional;
1289    use crate::compile::builder::FlowBuilder;
1290    use crate::location::Location;
1291    use crate::nondet::nondet;
1292
1293    #[tokio::test]
1294    async fn optional_or_cardinality() {
1295        let mut deployment = Deployment::new();
1296
1297        let flow = FlowBuilder::new();
1298        let node = flow.process::<()>();
1299        let external = flow.external::<()>();
1300
1301        let node_tick = node.tick();
1302        let tick_singleton = node_tick.singleton(q!(123));
1303        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1304        let counts = tick_optional_inhabited
1305            .clone()
1306            .or(tick_optional_inhabited)
1307            .into_stream()
1308            .count()
1309            .all_ticks()
1310            .send_bincode_external(&external);
1311
1312        let nodes = flow
1313            .with_process(&node, deployment.Localhost())
1314            .with_external(&external, deployment.Localhost())
1315            .deploy(&mut deployment);
1316
1317        deployment.deploy().await.unwrap();
1318
1319        let mut external_out = nodes.connect(counts).await;
1320
1321        deployment.start().await.unwrap();
1322
1323        assert_eq!(external_out.next().await.unwrap(), 1);
1324    }
1325
1326    #[tokio::test]
1327    async fn into_singleton_top_level_none_cardinality() {
1328        let mut deployment = Deployment::new();
1329
1330        let flow = FlowBuilder::new();
1331        let node = flow.process::<()>();
1332        let external = flow.external::<()>();
1333
1334        let node_tick = node.tick();
1335        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1336        let into_singleton = top_level_none.into_singleton();
1337
1338        let tick_driver = node.spin();
1339
1340        let counts = into_singleton
1341            .snapshot(&node_tick, nondet!(/** test */))
1342            .into_stream()
1343            .count()
1344            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1345            .map(q!(|(c, _)| c))
1346            .all_ticks()
1347            .send_bincode_external(&external);
1348
1349        let nodes = flow
1350            .with_process(&node, deployment.Localhost())
1351            .with_external(&external, deployment.Localhost())
1352            .deploy(&mut deployment);
1353
1354        deployment.deploy().await.unwrap();
1355
1356        let mut external_out = nodes.connect(counts).await;
1357
1358        deployment.start().await.unwrap();
1359
1360        assert_eq!(external_out.next().await.unwrap(), 1);
1361        assert_eq!(external_out.next().await.unwrap(), 1);
1362        assert_eq!(external_out.next().await.unwrap(), 1);
1363    }
1364}