hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A *nullable* Rust value that can asynchronously change over time.
25///
26/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
27/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
28/// asynchronously change over time, including becoming present of uninhabited.
29///
30/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
31/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this optional (when it is not null)
35/// - `Loc`: the [`Location`] where the optional is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Optional<Type, Loc, Bound: Boundedness> {
38 pub(crate) location: Loc,
39 pub(crate) ir_node: RefCell<HydroNode>,
40
41 _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
45where
46 L: Location<'a>,
47{
48 fn defer_tick(self) -> Self {
49 Optional::defer_tick(self)
50 }
51}
52
53impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 type Location = Tick<L>;
58
59 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
60 Optional::new(
61 location.clone(),
62 HydroNode::CycleSource {
63 ident,
64 metadata: location.new_node_metadata(Self::collection_kind()),
65 },
66 )
67 }
68}
69
70impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
71where
72 L: Location<'a>,
73{
74 type Location = Tick<L>;
75
76 fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
77 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
78 location.clone(),
79 HydroNode::DeferTick {
80 input: Box::new(HydroNode::CycleSource {
81 ident,
82 metadata: location.new_node_metadata(Self::collection_kind()),
83 }),
84 metadata: location
85 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
86 },
87 );
88
89 from_previous_tick.or(initial.filter_if_some(location.optional_first_tick(q!(()))))
90 }
91}
92
93impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
94where
95 L: Location<'a>,
96{
97 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
98 assert_eq!(
99 Location::id(&self.location),
100 expected_location,
101 "locations do not match"
102 );
103 self.location
104 .flow_state()
105 .borrow_mut()
106 .push_root(HydroRoot::CycleSink {
107 ident,
108 input: Box::new(self.ir_node.into_inner()),
109 op_metadata: HydroIrOpMetadata::new(),
110 });
111 }
112}
113
114impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
115where
116 L: Location<'a>,
117{
118 type Location = Tick<L>;
119
120 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
121 Optional::new(
122 location.clone(),
123 HydroNode::CycleSource {
124 ident,
125 metadata: location.new_node_metadata(Self::collection_kind()),
126 },
127 )
128 }
129}
130
131impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
132where
133 L: Location<'a>,
134{
135 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
136 assert_eq!(
137 Location::id(&self.location),
138 expected_location,
139 "locations do not match"
140 );
141 self.location
142 .flow_state()
143 .borrow_mut()
144 .push_root(HydroRoot::CycleSink {
145 ident,
146 input: Box::new(self.ir_node.into_inner()),
147 op_metadata: HydroIrOpMetadata::new(),
148 });
149 }
150}
151
152impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
153where
154 L: Location<'a> + NoTick,
155{
156 type Location = L;
157
158 fn create_source(ident: syn::Ident, location: L) -> Self {
159 Optional::new(
160 location.clone(),
161 HydroNode::CycleSource {
162 ident,
163 metadata: location.new_node_metadata(Self::collection_kind()),
164 },
165 )
166 }
167}
168
169impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
170where
171 L: Location<'a> + NoTick,
172{
173 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
174 assert_eq!(
175 Location::id(&self.location),
176 expected_location,
177 "locations do not match"
178 );
179 self.location
180 .flow_state()
181 .borrow_mut()
182 .push_root(HydroRoot::CycleSink {
183 ident,
184 input: Box::new(self.ir_node.into_inner()),
185 op_metadata: HydroIrOpMetadata::new(),
186 });
187 }
188}
189
190impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
191where
192 L: Location<'a>,
193{
194 fn from(singleton: Optional<T, L, Bounded>) -> Self {
195 Optional::new(singleton.location, singleton.ir_node.into_inner())
196 }
197}
198
199impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
200where
201 L: Location<'a>,
202{
203 fn from(singleton: Singleton<T, L, B>) -> Self {
204 Optional::new(
205 singleton.location.clone(),
206 HydroNode::Cast {
207 inner: Box::new(singleton.ir_node.into_inner()),
208 metadata: singleton
209 .location
210 .new_node_metadata(Self::collection_kind()),
211 },
212 )
213 }
214}
215
216#[cfg(stageleft_runtime)]
217fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
218 me: Optional<T, L, B>,
219 other: Optional<O, L, B>,
220) -> Optional<(T, O), L, B> {
221 check_matching_location(&me.location, &other.location);
222
223 Optional::new(
224 me.location.clone(),
225 HydroNode::CrossSingleton {
226 left: Box::new(me.ir_node.into_inner()),
227 right: Box::new(other.ir_node.into_inner()),
228 metadata: me
229 .location
230 .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
231 },
232 )
233}
234
235#[cfg(stageleft_runtime)]
236fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
237 me: Optional<T, L, B>,
238 other: Optional<T, L, B>,
239) -> Optional<T, L, B> {
240 check_matching_location(&me.location, &other.location);
241
242 Optional::new(
243 me.location.clone(),
244 HydroNode::ChainFirst {
245 first: Box::new(me.ir_node.into_inner()),
246 second: Box::new(other.ir_node.into_inner()),
247 metadata: me
248 .location
249 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
250 },
251 )
252}
253
254impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
255where
256 T: Clone,
257 L: Location<'a>,
258{
259 fn clone(&self) -> Self {
260 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
261 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
262 *self.ir_node.borrow_mut() = HydroNode::Tee {
263 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
264 metadata: self.location.new_node_metadata(Self::collection_kind()),
265 };
266 }
267
268 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
269 Optional {
270 location: self.location.clone(),
271 ir_node: HydroNode::Tee {
272 inner: TeeNode(inner.0.clone()),
273 metadata: metadata.clone(),
274 }
275 .into(),
276 _phantom: PhantomData,
277 }
278 } else {
279 unreachable!()
280 }
281 }
282}
283
284impl<'a, T, L, B: Boundedness> Optional<T, L, B>
285where
286 L: Location<'a>,
287{
288 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
289 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
290 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
291 Optional {
292 location,
293 ir_node: RefCell::new(ir_node),
294 _phantom: PhantomData,
295 }
296 }
297
298 pub(crate) fn collection_kind() -> CollectionKind {
299 CollectionKind::Optional {
300 bound: B::BOUND_KIND,
301 element_type: stageleft::quote_type::<T>().into(),
302 }
303 }
304
305 /// Returns the [`Location`] where this optional is being materialized.
306 pub fn location(&self) -> &L {
307 &self.location
308 }
309
310 /// Transforms the optional value by applying a function `f` to it,
311 /// continuously as the input is updated.
312 ///
313 /// Whenever the optional is empty, the output optional is also empty.
314 ///
315 /// # Example
316 /// ```rust
317 /// # #[cfg(feature = "deploy")] {
318 /// # use hydro_lang::prelude::*;
319 /// # use futures::StreamExt;
320 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
321 /// let tick = process.tick();
322 /// let optional = tick.optional_first_tick(q!(1));
323 /// optional.map(q!(|v| v + 1)).all_ticks()
324 /// # }, |mut stream| async move {
325 /// // 2
326 /// # assert_eq!(stream.next().await.unwrap(), 2);
327 /// # }));
328 /// # }
329 /// ```
330 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
331 where
332 F: Fn(T) -> U + 'a,
333 {
334 let f = f.splice_fn1_ctx(&self.location).into();
335 Optional::new(
336 self.location.clone(),
337 HydroNode::Map {
338 f,
339 input: Box::new(self.ir_node.into_inner()),
340 metadata: self
341 .location
342 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
343 },
344 )
345 }
346
347 /// Transforms the optional value by applying a function `f` to it and then flattening
348 /// the result into a stream, preserving the order of elements.
349 ///
350 /// If the optional is empty, the output stream is also empty. If the optional contains
351 /// a value, `f` is applied to produce an iterator, and all items from that iterator
352 /// are emitted in the output stream in deterministic order.
353 ///
354 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
355 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
356 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
357 ///
358 /// # Example
359 /// ```rust
360 /// # #[cfg(feature = "deploy")] {
361 /// # use hydro_lang::prelude::*;
362 /// # use futures::StreamExt;
363 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
364 /// let tick = process.tick();
365 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
366 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
367 /// # }, |mut stream| async move {
368 /// // 1, 2, 3
369 /// # for w in vec![1, 2, 3] {
370 /// # assert_eq!(stream.next().await.unwrap(), w);
371 /// # }
372 /// # }));
373 /// # }
374 /// ```
375 pub fn flat_map_ordered<U, I, F>(
376 self,
377 f: impl IntoQuotedMut<'a, F, L>,
378 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
379 where
380 I: IntoIterator<Item = U>,
381 F: Fn(T) -> I + 'a,
382 {
383 let f = f.splice_fn1_ctx(&self.location).into();
384 Stream::new(
385 self.location.clone(),
386 HydroNode::FlatMap {
387 f,
388 input: Box::new(self.ir_node.into_inner()),
389 metadata: self.location.new_node_metadata(
390 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
391 ),
392 },
393 )
394 }
395
396 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
397 /// for the output type `I` to produce items in any order.
398 ///
399 /// If the optional is empty, the output stream is also empty. If the optional contains
400 /// a value, `f` is applied to produce an iterator, and all items from that iterator
401 /// are emitted in the output stream in non-deterministic order.
402 ///
403 /// # Example
404 /// ```rust
405 /// # #[cfg(feature = "deploy")] {
406 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
407 /// # use futures::StreamExt;
408 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
409 /// let tick = process.tick();
410 /// let optional = tick.optional_first_tick(q!(
411 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
412 /// ));
413 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
414 /// # }, |mut stream| async move {
415 /// // 1, 2, 3, but in no particular order
416 /// # let mut results = Vec::new();
417 /// # for _ in 0..3 {
418 /// # results.push(stream.next().await.unwrap());
419 /// # }
420 /// # results.sort();
421 /// # assert_eq!(results, vec![1, 2, 3]);
422 /// # }));
423 /// # }
424 /// ```
425 pub fn flat_map_unordered<U, I, F>(
426 self,
427 f: impl IntoQuotedMut<'a, F, L>,
428 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
429 where
430 I: IntoIterator<Item = U>,
431 F: Fn(T) -> I + 'a,
432 {
433 let f = f.splice_fn1_ctx(&self.location).into();
434 Stream::new(
435 self.location.clone(),
436 HydroNode::FlatMap {
437 f,
438 input: Box::new(self.ir_node.into_inner()),
439 metadata: self
440 .location
441 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
442 },
443 )
444 }
445
446 /// Flattens the optional value into a stream, preserving the order of elements.
447 ///
448 /// If the optional is empty, the output stream is also empty. If the optional contains
449 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
450 /// in the output stream in deterministic order.
451 ///
452 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
453 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
454 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
455 ///
456 /// # Example
457 /// ```rust
458 /// # #[cfg(feature = "deploy")] {
459 /// # use hydro_lang::prelude::*;
460 /// # use futures::StreamExt;
461 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
462 /// let tick = process.tick();
463 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
464 /// optional.flatten_ordered().all_ticks()
465 /// # }, |mut stream| async move {
466 /// // 1, 2, 3
467 /// # for w in vec![1, 2, 3] {
468 /// # assert_eq!(stream.next().await.unwrap(), w);
469 /// # }
470 /// # }));
471 /// # }
472 /// ```
473 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
474 where
475 T: IntoIterator<Item = U>,
476 {
477 self.flat_map_ordered(q!(|v| v))
478 }
479
480 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
481 /// for the element type `T` to produce items in any order.
482 ///
483 /// If the optional is empty, the output stream is also empty. If the optional contains
484 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
485 /// in the output stream in non-deterministic order.
486 ///
487 /// # Example
488 /// ```rust
489 /// # #[cfg(feature = "deploy")] {
490 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
491 /// # use futures::StreamExt;
492 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
493 /// let tick = process.tick();
494 /// let optional = tick.optional_first_tick(q!(
495 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
496 /// ));
497 /// optional.flatten_unordered().all_ticks()
498 /// # }, |mut stream| async move {
499 /// // 1, 2, 3, but in no particular order
500 /// # let mut results = Vec::new();
501 /// # for _ in 0..3 {
502 /// # results.push(stream.next().await.unwrap());
503 /// # }
504 /// # results.sort();
505 /// # assert_eq!(results, vec![1, 2, 3]);
506 /// # }));
507 /// # }
508 /// ```
509 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
510 where
511 T: IntoIterator<Item = U>,
512 {
513 self.flat_map_unordered(q!(|v| v))
514 }
515
516 /// Creates an optional containing only the value if it satisfies a predicate `f`.
517 ///
518 /// If the optional is empty, the output optional is also empty. If the optional contains
519 /// a value and the predicate returns `true`, the output optional contains the same value.
520 /// If the predicate returns `false`, the output optional is empty.
521 ///
522 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
523 /// not modify or take ownership of the value. If you need to modify the value while filtering
524 /// use [`Optional::filter_map`] instead.
525 ///
526 /// # Example
527 /// ```rust
528 /// # #[cfg(feature = "deploy")] {
529 /// # use hydro_lang::prelude::*;
530 /// # use futures::StreamExt;
531 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
532 /// let tick = process.tick();
533 /// let optional = tick.optional_first_tick(q!(5));
534 /// optional.filter(q!(|&x| x > 3)).all_ticks()
535 /// # }, |mut stream| async move {
536 /// // 5
537 /// # assert_eq!(stream.next().await.unwrap(), 5);
538 /// # }));
539 /// # }
540 /// ```
541 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
542 where
543 F: Fn(&T) -> bool + 'a,
544 {
545 let f = f.splice_fn1_borrow_ctx(&self.location).into();
546 Optional::new(
547 self.location.clone(),
548 HydroNode::Filter {
549 f,
550 input: Box::new(self.ir_node.into_inner()),
551 metadata: self.location.new_node_metadata(Self::collection_kind()),
552 },
553 )
554 }
555
556 /// An operator that both filters and maps. It yields only the value if the supplied
557 /// closure `f` returns `Some(value)`.
558 ///
559 /// If the optional is empty, the output optional is also empty. If the optional contains
560 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
561 /// If the closure returns `None`, the output optional is empty.
562 ///
563 /// # Example
564 /// ```rust
565 /// # #[cfg(feature = "deploy")] {
566 /// # use hydro_lang::prelude::*;
567 /// # use futures::StreamExt;
568 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
569 /// let tick = process.tick();
570 /// let optional = tick.optional_first_tick(q!("42"));
571 /// optional
572 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
573 /// .all_ticks()
574 /// # }, |mut stream| async move {
575 /// // 42
576 /// # assert_eq!(stream.next().await.unwrap(), 42);
577 /// # }));
578 /// # }
579 /// ```
580 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
581 where
582 F: Fn(T) -> Option<U> + 'a,
583 {
584 let f = f.splice_fn1_ctx(&self.location).into();
585 Optional::new(
586 self.location.clone(),
587 HydroNode::FilterMap {
588 f,
589 input: Box::new(self.ir_node.into_inner()),
590 metadata: self
591 .location
592 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
593 },
594 )
595 }
596
597 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
598 ///
599 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
600 /// non-null. This is useful for combining several pieces of state together.
601 ///
602 /// # Example
603 /// ```rust
604 /// # #[cfg(feature = "deploy")] {
605 /// # use hydro_lang::prelude::*;
606 /// # use futures::StreamExt;
607 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
608 /// let tick = process.tick();
609 /// let numbers = process
610 /// .source_iter(q!(vec![123, 456, 789]))
611 /// .batch(&tick, nondet!(/** test */));
612 /// let min = numbers.clone().min(); // Optional
613 /// let max = numbers.max(); // Optional
614 /// min.zip(max).all_ticks()
615 /// # }, |mut stream| async move {
616 /// // [(123, 789)]
617 /// # for w in vec![(123, 789)] {
618 /// # assert_eq!(stream.next().await.unwrap(), w);
619 /// # }
620 /// # }));
621 /// # }
622 /// ```
623 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
624 where
625 O: Clone,
626 {
627 let other: Optional<O, L, B> = other.into();
628 check_matching_location(&self.location, &other.location);
629
630 if L::is_top_level()
631 && let Some(tick) = self.location.try_tick()
632 {
633 let out = zip_inside_tick(
634 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
635 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
636 )
637 .latest();
638
639 Optional::new(out.location, out.ir_node.into_inner())
640 } else {
641 zip_inside_tick(self, other)
642 }
643 }
644
645 /// Passes through `self` when it has a value, otherwise passes through `other`.
646 ///
647 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
648 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
649 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
650 ///
651 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
652 /// of the inputs change (including to/from null states).
653 ///
654 /// # Example
655 /// ```rust
656 /// # #[cfg(feature = "deploy")] {
657 /// # use hydro_lang::prelude::*;
658 /// # use futures::StreamExt;
659 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
660 /// let tick = process.tick();
661 /// // ticks are lazy by default, forces the second tick to run
662 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
663 ///
664 /// let some_first_tick = tick.optional_first_tick(q!(123));
665 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
666 /// some_first_tick.or(some_second_tick).all_ticks()
667 /// # }, |mut stream| async move {
668 /// // [123 /* first tick */, 456 /* second tick */]
669 /// # for w in vec![123, 456] {
670 /// # assert_eq!(stream.next().await.unwrap(), w);
671 /// # }
672 /// # }));
673 /// # }
674 /// ```
675 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
676 check_matching_location(&self.location, &other.location);
677
678 if L::is_top_level()
679 && let Some(tick) = self.location.try_tick()
680 {
681 let out = or_inside_tick(
682 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
683 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
684 )
685 .latest();
686
687 Optional::new(out.location, out.ir_node.into_inner())
688 } else {
689 Optional::new(
690 self.location.clone(),
691 HydroNode::ChainFirst {
692 first: Box::new(self.ir_node.into_inner()),
693 second: Box::new(other.ir_node.into_inner()),
694 metadata: self.location.new_node_metadata(Self::collection_kind()),
695 },
696 )
697 }
698 }
699
700 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
701 ///
702 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
703 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
704 ///
705 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
706 /// of the inputs change (including to/from null states).
707 ///
708 /// # Example
709 /// ```rust
710 /// # #[cfg(feature = "deploy")] {
711 /// # use hydro_lang::prelude::*;
712 /// # use futures::StreamExt;
713 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
714 /// let tick = process.tick();
715 /// // ticks are lazy by default, forces the later ticks to run
716 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
717 ///
718 /// let some_first_tick = tick.optional_first_tick(q!(123));
719 /// some_first_tick
720 /// .unwrap_or(tick.singleton(q!(456)))
721 /// .all_ticks()
722 /// # }, |mut stream| async move {
723 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
724 /// # for w in vec![123, 456, 456, 456] {
725 /// # assert_eq!(stream.next().await.unwrap(), w);
726 /// # }
727 /// # }));
728 /// # }
729 /// ```
730 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
731 let res_option = self.or(other.into());
732 Singleton::new(
733 res_option.location.clone(),
734 HydroNode::Cast {
735 inner: Box::new(res_option.ir_node.into_inner()),
736 metadata: res_option
737 .location
738 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
739 },
740 )
741 }
742
743 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
744 ///
745 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
746 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
747 /// so that Hydro can skip any computation on null values.
748 ///
749 /// # Example
750 /// ```rust
751 /// # #[cfg(feature = "deploy")] {
752 /// # use hydro_lang::prelude::*;
753 /// # use futures::StreamExt;
754 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
755 /// let tick = process.tick();
756 /// // ticks are lazy by default, forces the later ticks to run
757 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
758 ///
759 /// let some_first_tick = tick.optional_first_tick(q!(123));
760 /// some_first_tick.into_singleton().all_ticks()
761 /// # }, |mut stream| async move {
762 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
763 /// # for w in vec![Some(123), None, None, None] {
764 /// # assert_eq!(stream.next().await.unwrap(), w);
765 /// # }
766 /// # }));
767 /// # }
768 /// ```
769 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
770 where
771 T: Clone,
772 {
773 let none: syn::Expr = parse_quote!(::std::option::Option::None);
774
775 let none_singleton = Singleton::new(
776 self.location.clone(),
777 HydroNode::SingletonSource {
778 value: none.into(),
779 metadata: self
780 .location
781 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
782 },
783 );
784
785 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
786 }
787
788 /// An operator which allows you to "name" a `HydroNode`.
789 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
790 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
791 {
792 let mut node = self.ir_node.borrow_mut();
793 let metadata = node.metadata_mut();
794 metadata.tag = Some(name.to_string());
795 }
796 self
797 }
798}
799
800impl<'a, T, L> Optional<T, L, Bounded>
801where
802 L: Location<'a>,
803{
804 /// Filters this optional, passing through the optional value if it is non-null **and** the
805 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
806 ///
807 /// Useful for conditionally processing, such as only emitting an optional's value outside
808 /// a tick if some other condition is satisfied.
809 ///
810 /// # Example
811 /// ```rust
812 /// # #[cfg(feature = "deploy")] {
813 /// # use hydro_lang::prelude::*;
814 /// # use futures::StreamExt;
815 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
816 /// let tick = process.tick();
817 /// // ticks are lazy by default, forces the second tick to run
818 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
819 ///
820 /// let batch_first_tick = process
821 /// .source_iter(q!(vec![]))
822 /// .batch(&tick, nondet!(/** test */));
823 /// let batch_second_tick = process
824 /// .source_iter(q!(vec![456]))
825 /// .batch(&tick, nondet!(/** test */))
826 /// .defer_tick(); // appears on the second tick
827 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
828 /// batch_first_tick.chain(batch_second_tick).first()
829 /// .filter_if_some(some_on_first_tick)
830 /// .unwrap_or(tick.singleton(q!(789)))
831 /// .all_ticks()
832 /// # }, |mut stream| async move {
833 /// // [789, 789]
834 /// # for w in vec![789, 789] {
835 /// # assert_eq!(stream.next().await.unwrap(), w);
836 /// # }
837 /// # }));
838 /// # }
839 /// ```
840 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
841 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
842 }
843
844 /// Filters this optional, passing through the optional value if it is non-null **and** the
845 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
846 ///
847 /// Useful for conditionally processing, such as only emitting an optional's value outside
848 /// a tick if some other condition is satisfied.
849 ///
850 /// # Example
851 /// ```rust
852 /// # #[cfg(feature = "deploy")] {
853 /// # use hydro_lang::prelude::*;
854 /// # use futures::StreamExt;
855 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
856 /// let tick = process.tick();
857 /// // ticks are lazy by default, forces the second tick to run
858 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
859 ///
860 /// let batch_first_tick = process
861 /// .source_iter(q!(vec![]))
862 /// .batch(&tick, nondet!(/** test */));
863 /// let batch_second_tick = process
864 /// .source_iter(q!(vec![456]))
865 /// .batch(&tick, nondet!(/** test */))
866 /// .defer_tick(); // appears on the second tick
867 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
868 /// batch_first_tick.chain(batch_second_tick).first()
869 /// .filter_if_none(some_on_first_tick)
870 /// .unwrap_or(tick.singleton(q!(789)))
871 /// .all_ticks()
872 /// # }, |mut stream| async move {
873 /// // [789, 789]
874 /// # for w in vec![789, 456] {
875 /// # assert_eq!(stream.next().await.unwrap(), w);
876 /// # }
877 /// # }));
878 /// # }
879 /// ```
880 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
881 self.filter_if_some(
882 other
883 .map(q!(|_| ()))
884 .into_singleton()
885 .filter(q!(|o| o.is_none())),
886 )
887 }
888
889 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
890 ///
891 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
892 /// having a value, such as only releasing a piece of state if the node is the leader.
893 ///
894 /// # Example
895 /// ```rust
896 /// # #[cfg(feature = "deploy")] {
897 /// # use hydro_lang::prelude::*;
898 /// # use futures::StreamExt;
899 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
900 /// let tick = process.tick();
901 /// // ticks are lazy by default, forces the second tick to run
902 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
903 ///
904 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
905 /// some_on_first_tick
906 /// .if_some_then(tick.singleton(q!(456)))
907 /// .unwrap_or(tick.singleton(q!(123)))
908 /// # .all_ticks()
909 /// # }, |mut stream| async move {
910 /// // 456 (first tick) ~> 123 (second tick onwards)
911 /// # for w in vec![456, 123, 123] {
912 /// # assert_eq!(stream.next().await.unwrap(), w);
913 /// # }
914 /// # }));
915 /// # }
916 /// ```
917 pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
918 value.filter_if_some(self)
919 }
920}
921
922impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
923where
924 L: Location<'a> + NoTick,
925{
926 /// Returns an optional value corresponding to the latest snapshot of the optional
927 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
928 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
929 /// all snapshots of this optional into the atomic-associated tick will observe the
930 /// same value each tick.
931 ///
932 /// # Non-Determinism
933 /// Because this picks a snapshot of a optional whose value is continuously changing,
934 /// the output optional has a non-deterministic value since the snapshot can be at an
935 /// arbitrary point in time.
936 pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
937 Optional::new(
938 self.location.clone().tick,
939 HydroNode::Batch {
940 inner: Box::new(self.ir_node.into_inner()),
941 metadata: self
942 .location
943 .tick
944 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
945 },
946 )
947 }
948
949 /// Returns this optional back into a top-level, asynchronous execution context where updates
950 /// to the value will be asynchronously propagated.
951 pub fn end_atomic(self) -> Optional<T, L, B> {
952 Optional::new(
953 self.location.tick.l.clone(),
954 HydroNode::EndAtomic {
955 inner: Box::new(self.ir_node.into_inner()),
956 metadata: self
957 .location
958 .tick
959 .l
960 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
961 },
962 )
963 }
964}
965
966impl<'a, T, L, B: Boundedness> Optional<T, L, B>
967where
968 L: Location<'a>,
969{
970 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
971 /// will observe the same version of the value and will be executed synchronously before any
972 /// outputs are yielded (in [`Optional::end_atomic`]).
973 ///
974 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
975 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
976 /// a different version).
977 ///
978 /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
979 /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
980 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
981 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
982 let out_location = Atomic { tick: tick.clone() };
983 Optional::new(
984 out_location.clone(),
985 HydroNode::BeginAtomic {
986 inner: Box::new(self.ir_node.into_inner()),
987 metadata: out_location
988 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
989 },
990 )
991 }
992
993 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
994 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
995 /// relevant data that contributed to the snapshot at tick `t`.
996 ///
997 /// # Non-Determinism
998 /// Because this picks a snapshot of a optional whose value is continuously changing,
999 /// the output optional has a non-deterministic value since the snapshot can be at an
1000 /// arbitrary point in time.
1001 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1002 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1003 Optional::new(
1004 tick.clone(),
1005 HydroNode::Batch {
1006 inner: Box::new(self.ir_node.into_inner()),
1007 metadata: tick
1008 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1009 },
1010 )
1011 }
1012
1013 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1014 /// with order corresponding to increasing prefixes of data contributing to the optional.
1015 ///
1016 /// # Non-Determinism
1017 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1018 /// to non-deterministic batching and arrival of inputs, the output stream is
1019 /// non-deterministic.
1020 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1021 where
1022 L: NoTick,
1023 {
1024 let tick = self.location.tick();
1025 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
1026 }
1027
1028 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1029 /// value taken at various points in time. Because the input optional may be
1030 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1031 /// represent the value of the optional given some prefix of the streams leading up to
1032 /// it.
1033 ///
1034 /// # Non-Determinism
1035 /// The output stream is non-deterministic in which elements are sampled, since this
1036 /// is controlled by a clock.
1037 pub fn sample_every(
1038 self,
1039 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1040 nondet: NonDet,
1041 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1042 where
1043 L: NoTick + NoAtomic,
1044 {
1045 let samples = self.location.source_interval(interval, nondet);
1046 let tick = self.location.tick();
1047
1048 self.snapshot(&tick, nondet)
1049 .filter_if_some(samples.batch(&tick, nondet).first())
1050 .all_ticks()
1051 .weakest_retries()
1052 }
1053}
1054
1055impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1056where
1057 L: Location<'a>,
1058{
1059 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1060 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1061 /// null values).
1062 ///
1063 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1064 /// producing one element in the output for each (non-null) tick. This is useful for batched
1065 /// computations, where the results from each tick must be combined together.
1066 ///
1067 /// # Example
1068 /// ```rust
1069 /// # #[cfg(feature = "deploy")] {
1070 /// # use hydro_lang::prelude::*;
1071 /// # use futures::StreamExt;
1072 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1073 /// # let tick = process.tick();
1074 /// # // ticks are lazy by default, forces the second tick to run
1075 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1076 /// # let batch_first_tick = process
1077 /// # .source_iter(q!(vec![]))
1078 /// # .batch(&tick, nondet!(/** test */));
1079 /// # let batch_second_tick = process
1080 /// # .source_iter(q!(vec![1, 2, 3]))
1081 /// # .batch(&tick, nondet!(/** test */))
1082 /// # .defer_tick(); // appears on the second tick
1083 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1084 /// input_batch // first tick: [], second tick: [1, 2, 3]
1085 /// .max()
1086 /// .all_ticks()
1087 /// # }, |mut stream| async move {
1088 /// // [3]
1089 /// # for w in vec![3] {
1090 /// # assert_eq!(stream.next().await.unwrap(), w);
1091 /// # }
1092 /// # }));
1093 /// # }
1094 /// ```
1095 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1096 self.into_stream().all_ticks()
1097 }
1098
1099 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1100 /// which will stream the value computed in _each_ tick as a separate stream element.
1101 ///
1102 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1103 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1104 /// optional's [`Tick`] context.
1105 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1106 self.into_stream().all_ticks_atomic()
1107 }
1108
1109 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1110 /// be asynchronously updated with the latest value of the optional inside the tick, including
1111 /// whether the optional is null or not.
1112 ///
1113 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1114 /// tick that tracks the inner value. This is useful for getting the value as of the
1115 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1116 ///
1117 /// # Example
1118 /// ```rust
1119 /// # #[cfg(feature = "deploy")] {
1120 /// # use hydro_lang::prelude::*;
1121 /// # use futures::StreamExt;
1122 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1123 /// # let tick = process.tick();
1124 /// # // ticks are lazy by default, forces the second tick to run
1125 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1126 /// # let batch_first_tick = process
1127 /// # .source_iter(q!(vec![]))
1128 /// # .batch(&tick, nondet!(/** test */));
1129 /// # let batch_second_tick = process
1130 /// # .source_iter(q!(vec![1, 2, 3]))
1131 /// # .batch(&tick, nondet!(/** test */))
1132 /// # .defer_tick(); // appears on the second tick
1133 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1134 /// input_batch // first tick: [], second tick: [1, 2, 3]
1135 /// .max()
1136 /// .latest()
1137 /// # .into_singleton()
1138 /// # .sample_eager(nondet!(/** test */))
1139 /// # }, |mut stream| async move {
1140 /// // asynchronously changes from None ~> 3
1141 /// # for w in vec![None, Some(3)] {
1142 /// # assert_eq!(stream.next().await.unwrap(), w);
1143 /// # }
1144 /// # }));
1145 /// # }
1146 /// ```
1147 pub fn latest(self) -> Optional<T, L, Unbounded> {
1148 Optional::new(
1149 self.location.outer().clone(),
1150 HydroNode::YieldConcat {
1151 inner: Box::new(self.ir_node.into_inner()),
1152 metadata: self
1153 .location
1154 .outer()
1155 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1156 },
1157 )
1158 }
1159
1160 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1161 /// be updated with the latest value of the optional inside the tick.
1162 ///
1163 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1164 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1165 /// optional's [`Tick`] context.
1166 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1167 let out_location = Atomic {
1168 tick: self.location.clone(),
1169 };
1170
1171 Optional::new(
1172 out_location.clone(),
1173 HydroNode::YieldConcat {
1174 inner: Box::new(self.ir_node.into_inner()),
1175 metadata: out_location
1176 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1177 },
1178 )
1179 }
1180
1181 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1182 /// always has the state of `self` at tick `T - 1`.
1183 ///
1184 /// At tick `0`, the output optional is null, since there is no previous tick.
1185 ///
1186 /// This operator enables stateful iterative processing with ticks, by sending data from one
1187 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1188 ///
1189 /// # Example
1190 /// ```rust
1191 /// # #[cfg(feature = "deploy")] {
1192 /// # use hydro_lang::prelude::*;
1193 /// # use futures::StreamExt;
1194 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1195 /// let tick = process.tick();
1196 /// // ticks are lazy by default, forces the second tick to run
1197 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1198 ///
1199 /// let batch_first_tick = process
1200 /// .source_iter(q!(vec![1, 2]))
1201 /// .batch(&tick, nondet!(/** test */));
1202 /// let batch_second_tick = process
1203 /// .source_iter(q!(vec![3, 4]))
1204 /// .batch(&tick, nondet!(/** test */))
1205 /// .defer_tick(); // appears on the second tick
1206 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1207 /// .reduce(q!(|state, v| *state += v));
1208 ///
1209 /// current_tick_sum.clone().into_singleton().zip(
1210 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1211 /// ).all_ticks()
1212 /// # }, |mut stream| async move {
1213 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1214 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1215 /// # assert_eq!(stream.next().await.unwrap(), w);
1216 /// # }
1217 /// # }));
1218 /// # }
1219 /// ```
1220 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1221 Optional::new(
1222 self.location.clone(),
1223 HydroNode::DeferTick {
1224 input: Box::new(self.ir_node.into_inner()),
1225 metadata: self.location.new_node_metadata(Self::collection_kind()),
1226 },
1227 )
1228 }
1229
1230 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1231 /// non-null. Otherwise, the stream is empty.
1232 ///
1233 /// # Example
1234 /// ```rust
1235 /// # #[cfg(feature = "deploy")] {
1236 /// # use hydro_lang::prelude::*;
1237 /// # use futures::StreamExt;
1238 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1239 /// # let tick = process.tick();
1240 /// # // ticks are lazy by default, forces the second tick to run
1241 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1242 /// # let batch_first_tick = process
1243 /// # .source_iter(q!(vec![]))
1244 /// # .batch(&tick, nondet!(/** test */));
1245 /// # let batch_second_tick = process
1246 /// # .source_iter(q!(vec![123, 456]))
1247 /// # .batch(&tick, nondet!(/** test */))
1248 /// # .defer_tick(); // appears on the second tick
1249 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1250 /// input_batch // first tick: [], second tick: [123, 456]
1251 /// .clone()
1252 /// .max()
1253 /// .into_stream()
1254 /// .chain(input_batch)
1255 /// .all_ticks()
1256 /// # }, |mut stream| async move {
1257 /// // [456, 123, 456]
1258 /// # for w in vec![456, 123, 456] {
1259 /// # assert_eq!(stream.next().await.unwrap(), w);
1260 /// # }
1261 /// # }));
1262 /// # }
1263 /// ```
1264 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1265 Stream::new(
1266 self.location.clone(),
1267 HydroNode::Cast {
1268 inner: Box::new(self.ir_node.into_inner()),
1269 metadata: self.location.new_node_metadata(Stream::<
1270 T,
1271 Tick<L>,
1272 Bounded,
1273 TotalOrder,
1274 ExactlyOnce,
1275 >::collection_kind()),
1276 },
1277 )
1278 }
1279}
1280
1281#[cfg(feature = "deploy")]
1282#[cfg(test)]
1283mod tests {
1284 use futures::StreamExt;
1285 use hydro_deploy::Deployment;
1286 use stageleft::q;
1287
1288 use super::Optional;
1289 use crate::compile::builder::FlowBuilder;
1290 use crate::location::Location;
1291 use crate::nondet::nondet;
1292
1293 #[tokio::test]
1294 async fn optional_or_cardinality() {
1295 let mut deployment = Deployment::new();
1296
1297 let flow = FlowBuilder::new();
1298 let node = flow.process::<()>();
1299 let external = flow.external::<()>();
1300
1301 let node_tick = node.tick();
1302 let tick_singleton = node_tick.singleton(q!(123));
1303 let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1304 let counts = tick_optional_inhabited
1305 .clone()
1306 .or(tick_optional_inhabited)
1307 .into_stream()
1308 .count()
1309 .all_ticks()
1310 .send_bincode_external(&external);
1311
1312 let nodes = flow
1313 .with_process(&node, deployment.Localhost())
1314 .with_external(&external, deployment.Localhost())
1315 .deploy(&mut deployment);
1316
1317 deployment.deploy().await.unwrap();
1318
1319 let mut external_out = nodes.connect(counts).await;
1320
1321 deployment.start().await.unwrap();
1322
1323 assert_eq!(external_out.next().await.unwrap(), 1);
1324 }
1325
1326 #[tokio::test]
1327 async fn into_singleton_top_level_none_cardinality() {
1328 let mut deployment = Deployment::new();
1329
1330 let flow = FlowBuilder::new();
1331 let node = flow.process::<()>();
1332 let external = flow.external::<()>();
1333
1334 let node_tick = node.tick();
1335 let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1336 let into_singleton = top_level_none.into_singleton();
1337
1338 let tick_driver = node.spin();
1339
1340 let counts = into_singleton
1341 .snapshot(&node_tick, nondet!(/** test */))
1342 .into_stream()
1343 .count()
1344 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1345 .map(q!(|(c, _)| c))
1346 .all_ticks()
1347 .send_bincode_external(&external);
1348
1349 let nodes = flow
1350 .with_process(&node, deployment.Localhost())
1351 .with_external(&external, deployment.Localhost())
1352 .deploy(&mut deployment);
1353
1354 deployment.deploy().await.unwrap();
1355
1356 let mut external_out = nodes.connect(counts).await;
1357
1358 deployment.start().await.unwrap();
1359
1360 assert_eq!(external_out.next().await.unwrap(), 1);
1361 assert_eq!(external_out.next().await.unwrap(), 1);
1362 assert_eq!(external_out.next().await.unwrap(), 1);
1363 }
1364}