hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A single Rust value that can asynchronously change over time.
25///
26/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
27/// [`Unbounded`], the value will asynchronously change over time.
28///
29/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
30/// a single number that will asynchronously change as events are processed. Singletons also appear
31/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
32/// such as getting the length of a batch of requests.
33///
34/// Type Parameters:
35/// - `Type`: the type of the value in this singleton
36/// - `Loc`: the [`Location`] where the singleton is materialized
37/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    /// The location where this singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this singleton. It is kept in a [`RefCell`] so that
    /// [`Clone::clone`], which only has `&self`, can replace it with a [`HydroNode::Tee`].
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Zero-sized marker that ties the type parameters to the struct without storing values.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
44
45impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
46where
47 L: Location<'a>,
48{
49 fn from(singleton: Singleton<T, L, Bounded>) -> Self {
50 Singleton::new(singleton.location, singleton.ir_node.into_inner())
51 }
52}
53
54impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
55where
56 L: Location<'a>,
57{
58 type Location = Tick<L>;
59
60 fn create_source_with_initial(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
61 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
62 location.clone(),
63 HydroNode::DeferTick {
64 input: Box::new(HydroNode::CycleSource {
65 ident,
66 metadata: location.new_node_metadata(Self::collection_kind()),
67 }),
68 metadata: location
69 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
70 },
71 );
72
73 from_previous_tick.unwrap_or(initial)
74 }
75}
76
77impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
78where
79 L: Location<'a>,
80{
81 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
82 assert_eq!(
83 Location::id(&self.location),
84 expected_location,
85 "locations do not match"
86 );
87 self.location
88 .flow_state()
89 .borrow_mut()
90 .push_root(HydroRoot::CycleSink {
91 ident,
92 input: Box::new(self.ir_node.into_inner()),
93 op_metadata: HydroIrOpMetadata::new(),
94 });
95 }
96}
97
98impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
99where
100 L: Location<'a>,
101{
102 type Location = Tick<L>;
103
104 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
105 Singleton::new(
106 location.clone(),
107 HydroNode::CycleSource {
108 ident,
109 metadata: location.new_node_metadata(Self::collection_kind()),
110 },
111 )
112 }
113}
114
115impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
116where
117 L: Location<'a>,
118{
119 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
120 assert_eq!(
121 Location::id(&self.location),
122 expected_location,
123 "locations do not match"
124 );
125 self.location
126 .flow_state()
127 .borrow_mut()
128 .push_root(HydroRoot::CycleSink {
129 ident,
130 input: Box::new(self.ir_node.into_inner()),
131 op_metadata: HydroIrOpMetadata::new(),
132 });
133 }
134}
135
136impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
137where
138 L: Location<'a> + NoTick,
139{
140 type Location = L;
141
142 fn create_source(ident: syn::Ident, location: L) -> Self {
143 Singleton::new(
144 location.clone(),
145 HydroNode::CycleSource {
146 ident,
147 metadata: location.new_node_metadata(Self::collection_kind()),
148 },
149 )
150 }
151}
152
153impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
154where
155 L: Location<'a> + NoTick,
156{
157 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
158 assert_eq!(
159 Location::id(&self.location),
160 expected_location,
161 "locations do not match"
162 );
163 self.location
164 .flow_state()
165 .borrow_mut()
166 .push_root(HydroRoot::CycleSink {
167 ident,
168 input: Box::new(self.ir_node.into_inner()),
169 op_metadata: HydroIrOpMetadata::new(),
170 });
171 }
172}
173
/// Cloning does not duplicate the upstream IR: the original handle and the clone both become
/// [`HydroNode::Tee`] nodes that share one inner node through an [`Rc`].
impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // First clone of this handle: move the current node behind a shared `Rc<RefCell<_>>`
        // and turn this handle itself into a `Tee` over it.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            // `Placeholder` only occupies the slot while the original node is moved out.
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // At this point the handle always holds a `Tee`. The clone is a second `Tee` that
        // shares the same inner node (only the `Rc` is cloned) and copies the metadata.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Singleton {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            // The branch above guarantees that `ir_node` is a `Tee`.
            unreachable!()
        }
    }
}
203
/// Pairs a tick-local singleton with `other` by building a [`HydroNode::CrossSingleton`] at
/// the singleton's location and handing it to [`ZipResult::make`].
///
/// `O` is whatever the [`ZipResult`] implementation for the singleton accepts; the shape of
/// the output is decided by that implementation.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: O,
) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
where
    Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
{
    // Both operands have to be at the same location; presumably this panics on a mismatch
    // (the helper is defined elsewhere, so confirm there).
    check_matching_location(
        &me.location,
        &Singleton::<T, Tick<L>, B>::other_location(&other),
    );

    Singleton::<T, Tick<L>, B>::make(
        me.location.clone(),
        HydroNode::CrossSingleton {
            left: Box::new(me.ir_node.into_inner()),
            right: Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other)),
            // NOTE(review): this node is tagged as `CollectionKind::Singleton`, while the
            // direct branch of `Singleton::zip` tags its `CrossSingleton` as
            // `CollectionKind::Optional`. Confirm which tag `ZipResult::make` expects.
            metadata: me.location.new_node_metadata(CollectionKind::Singleton {
                bound: B::BOUND_KIND,
                element_type: stageleft::quote_type::<
                    <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
                >()
                .into(),
            }),
        },
    )
}
232
233impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
234where
235 L: Location<'a>,
236{
    /// Wraps an IR node as a singleton materialized at `location`.
    ///
    /// In debug builds this asserts that the node's metadata was created for the same
    /// location and for exactly the collection kind of this singleton type.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        // The metadata lookups stay inside the debug assertions so that release builds do not
        // evaluate them.
        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
        Singleton {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
246
    /// Describes this collection type for IR metadata: a singleton with the boundedness `B`
    /// and the quoted element type `T`.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Singleton {
            bound: B::BOUND_KIND,
            element_type: stageleft::quote_type::<T>().into(),
        }
    }
253
    /// Returns a reference to the [`Location`] where this singleton is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
258
259 /// Transforms the singleton value by applying a function `f` to it,
260 /// continuously as the input is updated.
261 ///
262 /// # Example
263 /// ```rust
264 /// # #[cfg(feature = "deploy")] {
265 /// # use hydro_lang::prelude::*;
266 /// # use futures::StreamExt;
267 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
268 /// let tick = process.tick();
269 /// let singleton = tick.singleton(q!(5));
270 /// singleton.map(q!(|v| v * 2)).all_ticks()
271 /// # }, |mut stream| async move {
272 /// // 10
273 /// # assert_eq!(stream.next().await.unwrap(), 10);
274 /// # }));
275 /// # }
276 /// ```
277 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
278 where
279 F: Fn(T) -> U + 'a,
280 {
281 let f = f.splice_fn1_ctx(&self.location).into();
282 Singleton::new(
283 self.location.clone(),
284 HydroNode::Map {
285 f,
286 input: Box::new(self.ir_node.into_inner()),
287 metadata: self
288 .location
289 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
290 },
291 )
292 }
293
294 /// Transforms the singleton value by applying a function `f` to it and then flattening
295 /// the result into a stream, preserving the order of elements.
296 ///
297 /// The function `f` is applied to the singleton value to produce an iterator, and all items
298 /// from that iterator are emitted in the output stream in deterministic order.
299 ///
300 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
301 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
302 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
303 ///
304 /// # Example
305 /// ```rust
306 /// # #[cfg(feature = "deploy")] {
307 /// # use hydro_lang::prelude::*;
308 /// # use futures::StreamExt;
309 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310 /// let tick = process.tick();
311 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
312 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
313 /// # }, |mut stream| async move {
314 /// // 1, 2, 3
315 /// # for w in vec![1, 2, 3] {
316 /// # assert_eq!(stream.next().await.unwrap(), w);
317 /// # }
318 /// # }));
319 /// # }
320 /// ```
321 pub fn flat_map_ordered<U, I, F>(
322 self,
323 f: impl IntoQuotedMut<'a, F, L>,
324 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
325 where
326 I: IntoIterator<Item = U>,
327 F: Fn(T) -> I + 'a,
328 {
329 let f = f.splice_fn1_ctx(&self.location).into();
330 Stream::new(
331 self.location.clone(),
332 HydroNode::FlatMap {
333 f,
334 input: Box::new(self.ir_node.into_inner()),
335 metadata: self.location.new_node_metadata(
336 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
337 ),
338 },
339 )
340 }
341
342 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
343 /// for the output type `I` to produce items in any order.
344 ///
345 /// The function `f` is applied to the singleton value to produce an iterator, and all items
346 /// from that iterator are emitted in the output stream in non-deterministic order.
347 ///
348 /// # Example
349 /// ```rust
350 /// # #[cfg(feature = "deploy")] {
351 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
352 /// # use futures::StreamExt;
353 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
354 /// let tick = process.tick();
355 /// let singleton = tick.singleton(q!(
356 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
357 /// ));
358 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
359 /// # }, |mut stream| async move {
360 /// // 1, 2, 3, but in no particular order
361 /// # let mut results = Vec::new();
362 /// # for _ in 0..3 {
363 /// # results.push(stream.next().await.unwrap());
364 /// # }
365 /// # results.sort();
366 /// # assert_eq!(results, vec![1, 2, 3]);
367 /// # }));
368 /// # }
369 /// ```
370 pub fn flat_map_unordered<U, I, F>(
371 self,
372 f: impl IntoQuotedMut<'a, F, L>,
373 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
374 where
375 I: IntoIterator<Item = U>,
376 F: Fn(T) -> I + 'a,
377 {
378 let f = f.splice_fn1_ctx(&self.location).into();
379 Stream::new(
380 self.location.clone(),
381 HydroNode::FlatMap {
382 f,
383 input: Box::new(self.ir_node.into_inner()),
384 metadata: self
385 .location
386 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
387 },
388 )
389 }
390
391 /// Flattens the singleton value into a stream, preserving the order of elements.
392 ///
393 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
394 /// are emitted in the output stream in deterministic order.
395 ///
396 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
397 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
398 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
399 ///
400 /// # Example
401 /// ```rust
402 /// # #[cfg(feature = "deploy")] {
403 /// # use hydro_lang::prelude::*;
404 /// # use futures::StreamExt;
405 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
406 /// let tick = process.tick();
407 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
408 /// singleton.flatten_ordered().all_ticks()
409 /// # }, |mut stream| async move {
410 /// // 1, 2, 3
411 /// # for w in vec![1, 2, 3] {
412 /// # assert_eq!(stream.next().await.unwrap(), w);
413 /// # }
414 /// # }));
415 /// # }
416 /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the value itself is iterated.
        self.flat_map_ordered(q!(|x| x))
    }
423
424 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
425 /// for the element type `T` to produce items in any order.
426 ///
427 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
428 /// are emitted in the output stream in non-deterministic order.
429 ///
430 /// # Example
431 /// ```rust
432 /// # #[cfg(feature = "deploy")] {
433 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
434 /// # use futures::StreamExt;
435 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
436 /// let tick = process.tick();
437 /// let singleton = tick.singleton(q!(
438 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
439 /// ));
440 /// singleton.flatten_unordered().all_ticks()
441 /// # }, |mut stream| async move {
442 /// // 1, 2, 3, but in no particular order
443 /// # let mut results = Vec::new();
444 /// # for _ in 0..3 {
445 /// # results.push(stream.next().await.unwrap());
446 /// # }
447 /// # results.sort();
448 /// # assert_eq!(results, vec![1, 2, 3]);
449 /// # }));
450 /// # }
451 /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the value itself is iterated.
        self.flat_map_unordered(q!(|x| x))
    }
458
459 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
460 ///
461 /// If the predicate returns `true`, the output optional contains the same value.
462 /// If the predicate returns `false`, the output optional is empty.
463 ///
464 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
465 /// not modify or take ownership of the value. If you need to modify the value while filtering
466 /// use [`Singleton::filter_map`] instead.
467 ///
468 /// # Example
469 /// ```rust
470 /// # #[cfg(feature = "deploy")] {
471 /// # use hydro_lang::prelude::*;
472 /// # use futures::StreamExt;
473 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
474 /// let tick = process.tick();
475 /// let singleton = tick.singleton(q!(5));
476 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
477 /// # }, |mut stream| async move {
478 /// // 5
479 /// # assert_eq!(stream.next().await.unwrap(), 5);
480 /// # }));
481 /// # }
482 /// ```
483 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
484 where
485 F: Fn(&T) -> bool + 'a,
486 {
487 let f = f.splice_fn1_borrow_ctx(&self.location).into();
488 Optional::new(
489 self.location.clone(),
490 HydroNode::Filter {
491 f,
492 input: Box::new(self.ir_node.into_inner()),
493 metadata: self
494 .location
495 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
496 },
497 )
498 }
499
500 /// An operator that both filters and maps. It yields the value only if the supplied
501 /// closure `f` returns `Some(value)`.
502 ///
503 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
504 /// If the closure returns `None`, the output optional is empty.
505 ///
506 /// # Example
507 /// ```rust
508 /// # #[cfg(feature = "deploy")] {
509 /// # use hydro_lang::prelude::*;
510 /// # use futures::StreamExt;
511 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
512 /// let tick = process.tick();
513 /// let singleton = tick.singleton(q!("42"));
514 /// singleton
515 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
516 /// .all_ticks()
517 /// # }, |mut stream| async move {
518 /// // 42
519 /// # assert_eq!(stream.next().await.unwrap(), 42);
520 /// # }));
521 /// # }
522 /// ```
523 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
524 where
525 F: Fn(T) -> Option<U> + 'a,
526 {
527 let f = f.splice_fn1_ctx(&self.location).into();
528 Optional::new(
529 self.location.clone(),
530 HydroNode::FilterMap {
531 f,
532 input: Box::new(self.ir_node.into_inner()),
533 metadata: self
534 .location
535 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
536 },
537 )
538 }
539
540 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
541 ///
542 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
543 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
544 /// non-null. This is useful for combining several pieces of state together.
545 ///
546 /// # Example
547 /// ```rust
548 /// # #[cfg(feature = "deploy")] {
549 /// # use hydro_lang::prelude::*;
550 /// # use futures::StreamExt;
551 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
552 /// let tick = process.tick();
553 /// let numbers = process
554 /// .source_iter(q!(vec![123, 456]))
555 /// .batch(&tick, nondet!(/** test */));
556 /// let count = numbers.clone().count(); // Singleton
557 /// let max = numbers.max(); // Optional
558 /// count.zip(max).all_ticks()
559 /// # }, |mut stream| async move {
560 /// // [(2, 456)]
561 /// # for w in vec![(2, 456)] {
562 /// # assert_eq!(stream.next().await.unwrap(), w);
563 /// # }
564 /// # }));
565 /// # }
566 /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
    {
        check_matching_location(&self.location, &Self::other_location(&other));

        // When `L::is_top_level()` holds and `try_tick()` yields a tick, both sides are
        // snapshotted into that tick, zipped there, and brought back out with `latest()`.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                // `other` is re-wrapped as an `Optional` around its IR node so that it can be
                // snapshotted into the same tick.
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    Self::other_location(&other),
                    Self::other_ir_node(other),
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            Self::make(out.location, out.ir_node.into_inner())
        } else {
            // Otherwise the two IR nodes are crossed directly at this singleton's location.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.into_inner()),
                    right: Box::new(Self::other_ir_node(other)),
                    // NOTE(review): the node is tagged as `CollectionKind::Optional` here,
                    // while `zip_inside_tick` tags the equivalent node as
                    // `CollectionKind::Singleton`. Confirm which tag `ZipResult::make` expects.
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
604
605 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (an [`Optional`] with the same location and boundedness) is non-null, otherwise
    /// the output is null.
607 ///
608 /// Useful for conditionally processing, such as only emitting a singleton's value outside
609 /// a tick if some other condition is satisfied.
610 ///
611 /// # Example
612 /// ```rust
613 /// # #[cfg(feature = "deploy")] {
614 /// # use hydro_lang::prelude::*;
615 /// # use futures::StreamExt;
616 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
617 /// let tick = process.tick();
618 /// // ticks are lazy by default, forces the second tick to run
619 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
620 ///
621 /// let batch_first_tick = process
622 /// .source_iter(q!(vec![1]))
623 /// .batch(&tick, nondet!(/** test */));
624 /// let batch_second_tick = process
625 /// .source_iter(q!(vec![1, 2, 3]))
626 /// .batch(&tick, nondet!(/** test */))
627 /// .defer_tick(); // appears on the second tick
628 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
629 /// batch_first_tick.chain(batch_second_tick).count()
630 /// .filter_if_some(some_on_first_tick)
631 /// .all_ticks()
632 /// # }, |mut stream| async move {
633 /// // [1]
634 /// # for w in vec![1] {
635 /// # assert_eq!(stream.next().await.unwrap(), w);
636 /// # }
637 /// # }));
638 /// # }
639 /// ```
    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
        // The signal's payload is irrelevant, so it is mapped to `()` before zipping. The
        // zipped value is then projected back to this singleton's own value.
        self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
            .map(q!(|(d, _signal)| d))
    }
644
645 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (an [`Optional`] with the same location and boundedness) is null, otherwise the
    /// output is null.
647 ///
648 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
649 /// the condition.
650 ///
651 /// # Example
652 /// ```rust
653 /// # #[cfg(feature = "deploy")] {
654 /// # use hydro_lang::prelude::*;
655 /// # use futures::StreamExt;
656 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
657 /// let tick = process.tick();
658 /// // ticks are lazy by default, forces the second tick to run
659 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
660 ///
661 /// let batch_first_tick = process
662 /// .source_iter(q!(vec![1]))
663 /// .batch(&tick, nondet!(/** test */));
664 /// let batch_second_tick = process
665 /// .source_iter(q!(vec![1, 2, 3]))
666 /// .batch(&tick, nondet!(/** test */))
667 /// .defer_tick(); // appears on the second tick
668 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
669 /// batch_first_tick.chain(batch_second_tick).count()
670 /// .filter_if_none(some_on_first_tick)
671 /// .all_ticks()
672 /// # }, |mut stream| async move {
673 /// // [3]
674 /// # for w in vec![3] {
675 /// # assert_eq!(stream.next().await.unwrap(), w);
676 /// # }
677 /// # }));
678 /// # }
679 /// ```
    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
        // Invert the signal and reuse `filter_if_some`: the inverted signal is present only
        // when `is_none()` holds for the value produced by `into_singleton()`.
        // NOTE(review): presumably `into_singleton()` yields an `Option<()>` that is `None`
        // when `other` is empty; confirm against `Optional::into_singleton`.
        self.filter_if_some(
            other
                .map(q!(|_| ()))
                .into_singleton()
                .filter(q!(|o| o.is_none())),
        )
    }
688
689 /// An operator which allows you to "name" a `HydroNode`.
690 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
691 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
692 {
693 let mut node = self.ir_node.borrow_mut();
694 let metadata = node.metadata_mut();
695 metadata.tag = Some(name.to_string());
696 }
697 self
698 }
699}
700
701impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
702where
703 L: Location<'a> + NoTick,
704{
705 /// Returns a singleton value corresponding to the latest snapshot of the singleton
706 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
707 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
708 /// all snapshots of this singleton into the atomic-associated tick will observe the
709 /// same value each tick.
710 ///
711 /// # Non-Determinism
712 /// Because this picks a snapshot of a singleton whose value is continuously changing,
713 /// the output singleton has a non-deterministic value since the snapshot can be at an
714 /// arbitrary point in time.
715 pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
716 Singleton::new(
717 self.location.clone().tick,
718 HydroNode::Batch {
719 inner: Box::new(self.ir_node.into_inner()),
720 metadata: self
721 .location
722 .tick
723 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
724 },
725 )
726 }
727
728 /// Returns this singleton back into a top-level, asynchronous execution context where updates
729 /// to the value will be asynchronously propagated.
730 pub fn end_atomic(self) -> Singleton<T, L, B> {
731 Singleton::new(
732 self.location.tick.l.clone(),
733 HydroNode::EndAtomic {
734 inner: Box::new(self.ir_node.into_inner()),
735 metadata: self
736 .location
737 .tick
738 .l
739 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
740 },
741 )
742 }
743}
744
745impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
746where
747 L: Location<'a>,
748{
749 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
750 /// will observe the same version of the value and will be executed synchronously before any
    /// outputs are yielded (in [`Singleton::end_atomic`]).
752 ///
753 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
754 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
755 /// a different version).
756 ///
757 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
    /// be atomically processed. Snapshotting a singleton into the _same_ [`Tick`] will preserve the
759 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
760 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
761 let out_location = Atomic { tick: tick.clone() };
762 Singleton::new(
763 out_location.clone(),
764 HydroNode::BeginAtomic {
765 inner: Box::new(self.ir_node.into_inner()),
766 metadata: out_location
767 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
768 },
769 )
770 }
771
772 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
773 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
774 /// relevant data that contributed to the snapshot at tick `t`.
775 ///
776 /// # Non-Determinism
777 /// Because this picks a snapshot of a singleton whose value is continuously changing,
778 /// the output singleton has a non-deterministic value since the snapshot can be at an
779 /// arbitrary point in time.
780 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
781 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
782 Singleton::new(
783 tick.clone(),
784 HydroNode::Batch {
785 inner: Box::new(self.ir_node.into_inner()),
786 metadata: tick
787 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
788 },
789 )
790 }
791
792 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
793 /// with order corresponding to increasing prefixes of data contributing to the singleton.
794 ///
795 /// # Non-Determinism
796 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
797 /// to non-deterministic batching and arrival of inputs, the output stream is
798 /// non-deterministic.
    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
    where
        L: NoTick,
    {
        // `sliced!` binds `self` as `snapshot` and emits it as a stream.
        // NOTE(review): the exact slicing semantics live in the `sliced` module.
        sliced! {
            let snapshot = use(self, nondet);
            snapshot.into_stream()
        }
        // Weaken the retry guarantee to match the `AtLeastOnce` in the return type.
        .weakest_retries()
    }
809
810 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
811 /// value taken at various points in time. Because the input singleton may be
812 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
813 /// represent the value of the singleton given some prefix of the streams leading up to
814 /// it.
815 ///
816 /// # Non-Determinism
817 /// The output stream is non-deterministic in which elements are sampled, since this
818 /// is controlled by a clock.
    pub fn sample_every(
        self,
        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
        nondet: NonDet,
    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
    where
        L: NoTick + NoAtomic,
    {
        // Clock source at this singleton's location, driven by `interval`.
        let samples = self.location.source_interval(interval, nondet);
        sliced! {
            let snapshot = use(self, nondet);
            let sample_batch = use(samples, nondet);

            // Emit the snapshot only when the sample batch is non-empty.
            snapshot.filter_if_some(sample_batch.first()).into_stream()
        }
        // Weaken the retry guarantee to match the `AtLeastOnce` in the return type.
        .weakest_retries()
    }
836}
837
838impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
839where
840 L: Location<'a>,
841{
842 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
843 /// which will stream the value computed in _each_ tick as a separate stream element.
844 ///
845 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
846 /// producing one element in the output for each tick. This is useful for batched computations,
847 /// where the results from each tick must be combined together.
848 ///
849 /// # Example
850 /// ```rust
851 /// # #[cfg(feature = "deploy")] {
852 /// # use hydro_lang::prelude::*;
853 /// # use futures::StreamExt;
854 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
855 /// let tick = process.tick();
856 /// # // ticks are lazy by default, forces the second tick to run
857 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
858 /// # let batch_first_tick = process
859 /// # .source_iter(q!(vec![1]))
860 /// # .batch(&tick, nondet!(/** test */));
861 /// # let batch_second_tick = process
862 /// # .source_iter(q!(vec![1, 2, 3]))
863 /// # .batch(&tick, nondet!(/** test */))
864 /// # .defer_tick(); // appears on the second tick
865 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
866 /// input_batch // first tick: [1], second tick: [1, 2, 3]
867 /// .count()
868 /// .all_ticks()
869 /// # }, |mut stream| async move {
870 /// // [1, 3]
871 /// # for w in vec![1, 3] {
872 /// # assert_eq!(stream.next().await.unwrap(), w);
873 /// # }
874 /// # }));
875 /// # }
876 /// ```
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
        // Delegates to the stream implementation after converting this singleton to a stream.
        self.into_stream().all_ticks()
    }
880
881 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
882 /// which will stream the value computed in _each_ tick as a separate stream element.
883 ///
884 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
885 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
886 /// singleton's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
        // Delegates to the stream implementation after converting this singleton to a stream.
        self.into_stream().all_ticks_atomic()
    }
890
891 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
892 /// be asynchronously updated with the latest value of the singleton inside the tick.
893 ///
894 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
895 /// tick that tracks the inner value. This is useful for getting the value as of the
896 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
897 ///
898 /// # Example
899 /// ```rust
900 /// # #[cfg(feature = "deploy")] {
901 /// # use hydro_lang::prelude::*;
902 /// # use futures::StreamExt;
903 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
904 /// let tick = process.tick();
905 /// # // ticks are lazy by default, forces the second tick to run
906 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
907 /// # let batch_first_tick = process
908 /// # .source_iter(q!(vec![1]))
909 /// # .batch(&tick, nondet!(/** test */));
910 /// # let batch_second_tick = process
911 /// # .source_iter(q!(vec![1, 2, 3]))
912 /// # .batch(&tick, nondet!(/** test */))
913 /// # .defer_tick(); // appears on the second tick
914 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
915 /// input_batch // first tick: [1], second tick: [1, 2, 3]
916 /// .count()
917 /// .latest()
918 /// # .sample_eager(nondet!(/** test */))
919 /// # }, |mut stream| async move {
920 /// // asynchronously changes from 1 ~> 3
921 /// # for w in vec![1, 3] {
922 /// # assert_eq!(stream.next().await.unwrap(), w);
923 /// # }
924 /// # }));
925 /// # }
926 /// ```
927 pub fn latest(self) -> Singleton<T, L, Unbounded> {
928 Singleton::new(
929 self.location.outer().clone(),
930 HydroNode::YieldConcat {
931 inner: Box::new(self.ir_node.into_inner()),
932 metadata: self
933 .location
934 .outer()
935 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
936 },
937 )
938 }
939
940 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
941 /// be updated with the latest value of the singleton inside the tick.
942 ///
943 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
944 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
945 /// singleton's [`Tick`] context.
946 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
947 let out_location = Atomic {
948 tick: self.location.clone(),
949 };
950 Singleton::new(
951 out_location.clone(),
952 HydroNode::YieldConcat {
953 inner: Box::new(self.ir_node.into_inner()),
954 metadata: out_location
955 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
956 },
957 )
958 }
959
960 /// Converts this singleton into a [`Stream`] containing a single element, the value.
961 ///
962 /// # Example
963 /// ```rust
964 /// # #[cfg(feature = "deploy")] {
965 /// # use hydro_lang::prelude::*;
966 /// # use futures::StreamExt;
967 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
968 /// let tick = process.tick();
969 /// let batch_input = process
970 /// .source_iter(q!(vec![123, 456]))
971 /// .batch(&tick, nondet!(/** test */));
972 /// batch_input.clone().chain(
973 /// batch_input.count().into_stream()
974 /// ).all_ticks()
975 /// # }, |mut stream| async move {
976 /// // [123, 456, 2]
977 /// # for w in vec![123, 456, 2] {
978 /// # assert_eq!(stream.next().await.unwrap(), w);
979 /// # }
980 /// # }));
981 /// # }
982 /// ```
983 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
984 Stream::new(
985 self.location.clone(),
986 HydroNode::Cast {
987 inner: Box::new(self.ir_node.into_inner()),
988 metadata: self.location.new_node_metadata(Stream::<
989 T,
990 Tick<L>,
991 Bounded,
992 TotalOrder,
993 ExactlyOnce,
994 >::collection_kind()),
995 },
996 )
997 }
998}
999
1000#[doc(hidden)]
1001/// Helper trait that determines the output collection type for [`Singleton::zip`].
1002///
1003/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1004/// [`Singleton`].
1005#[sealed::sealed]
1006pub trait ZipResult<'a, Other> {
1007 /// The output collection type.
1008 type Out;
1009 /// The type of the tupled output value.
1010 type ElementType;
1011 /// The type of the other collection's value.
1012 type OtherType;
1013 /// The location where the tupled result will be materialized.
1014 type Location: Location<'a>;
1015
1016 /// The location of the second input to the `zip`.
1017 fn other_location(other: &Other) -> Self::Location;
1018 /// The IR node of the second input to the `zip`.
1019 fn other_ir_node(other: Other) -> HydroNode;
1020
1021 /// Constructs the output live collection given an IR node containing the zip result.
1022 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1023}
1024
1025#[sealed::sealed]
1026impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1027where
1028 L: Location<'a>,
1029{
1030 type Out = Singleton<(T, U), L, B>;
1031 type ElementType = (T, U);
1032 type OtherType = U;
1033 type Location = L;
1034
1035 fn other_location(other: &Singleton<U, L, B>) -> L {
1036 other.location.clone()
1037 }
1038
1039 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1040 other.ir_node.into_inner()
1041 }
1042
1043 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1044 Singleton::new(
1045 location.clone(),
1046 HydroNode::Cast {
1047 inner: Box::new(ir_node),
1048 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1049 },
1050 )
1051 }
1052}
1053
1054#[sealed::sealed]
1055impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1056where
1057 L: Location<'a>,
1058{
1059 type Out = Optional<(T, U), L, B>;
1060 type ElementType = (T, U);
1061 type OtherType = U;
1062 type Location = L;
1063
1064 fn other_location(other: &Optional<U, L, B>) -> L {
1065 other.location.clone()
1066 }
1067
1068 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1069 other.ir_node.into_inner()
1070 }
1071
1072 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1073 Optional::new(location, ir_node)
1074 }
1075}
1076
1077#[cfg(test)]
1078mod tests {
1079 #[cfg(feature = "deploy")]
1080 use futures::{SinkExt, StreamExt};
1081 #[cfg(feature = "deploy")]
1082 use hydro_deploy::Deployment;
1083 #[cfg(any(feature = "deploy", feature = "sim"))]
1084 use stageleft::q;
1085
1086 #[cfg(any(feature = "deploy", feature = "sim"))]
1087 use crate::compile::builder::FlowBuilder;
1088 #[cfg(feature = "deploy")]
1089 use crate::live_collections::stream::ExactlyOnce;
1090 #[cfg(any(feature = "deploy", feature = "sim"))]
1091 use crate::location::Location;
1092 #[cfg(any(feature = "deploy", feature = "sim"))]
1093 use crate::nondet::nondet;
1094
// A singleton fed back through a tick cycle must still contain exactly one value per tick:
// counting its stream form yields 1 on each of the two externally triggered ticks.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn tick_cycle_cardinality() {
    let mut deployment = Deployment::new();

    let flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

    let node_tick = node.tick();
    // The cycle starts from 0 and is closed further down by `complete_next_tick`.
    let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));

    let cardinality = singleton.clone().into_stream().count();
    // The first element of this tick's input batch, if any, gates the output below.
    let trigger = input.batch(&node_tick, nondet!(/** testing */)).first();
    let counts = cardinality
        .filter_if_some(trigger)
        .all_ticks()
        .send_bincode_external(&external);
    complete_cycle.complete_next_tick(singleton);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut tick_trigger = nodes.connect(input_send).await;
    let mut external_out = nodes.connect(counts).await;

    deployment.start().await.unwrap();

    // First trigger.
    tick_trigger.send(()).await.unwrap();
    assert_eq!(external_out.next().await.unwrap(), 1);

    // Second trigger: the value has gone around the cycle once and the count is still 1.
    tick_trigger.send(()).await.unwrap();
    assert_eq!(external_out.next().await.unwrap(), 1);
}
1137
// Expected to panic: the assertion demands that the very first snapshot is the final sum (10).
// Presumably the exhaustive simulation also explores executions in which a snapshot observes
// an intermediate fold state, which makes the assertion fail.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_intermediate_states() {
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let numbers = node.source_iter(q!(vec![1, 2, 3, 4]));
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        let first_seen = out_recv.next().await.unwrap();
        assert_eq!(first_seen, 10);
    });
}
1156
// Every explored execution must end on the final sum (10), and the number of explored
// executions must match the number of ways intermediate states can be observed.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_intermediate_state_count() {
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let numbers = node.source_iter(q!(vec![1, 2, 3, 4]));
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        let observed = out_recv.collect::<Vec<_>>().await;
        assert_eq!(observed.last(), Some(&10));
    });

    // 2^4 possible subsets of intermediates (including initial state)
    assert_eq!(instance_count, 16);
}
1180
// Checks that the initial state of the fold is not repeated in autonomous decisions: after
// the initial 0 has been observed, the next snapshot must already reflect the sent input.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_no_repeat_initial() {
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_port, input) = node.sim_input();
    let running_sum = input.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        let initial = out_recv.next().await.unwrap();
        assert_eq!(initial, 0);

        in_port.send(123);

        let after_send = out_recv.next().await.unwrap();
        assert_eq!(after_send, 123);
    });
}
1204
// When the tick is driven by a snapshot AND something else, the snapshot can "stutter" and
// repeat the same state multiple times. The test is expected to panic, i.e. some explored
// execution must pair the same snapshot value (3) with two consecutive batch elements.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_repeats_snapshots() {
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let numbers = node.source_iter(q!(vec![1, 2, 3, 4]));
    let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let batched = numbers.batch(&tick, nondet!(/** test */));
    let snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let paired = batched.cross_singleton(snapshot);
    let out_recv = paired.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        let first = out_recv.next().await.unwrap();
        // The second element is only pulled when the first one already matched.
        if first == (1, 3) && out_recv.next().await.unwrap() == (2, 3) {
            panic!("repeated snapshot");
        }
    });
}
1231
// Pins the number of simulation instances explored when a tick consumes both a batch of the
// source and a snapshot of a fold over the same source.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_repeats_snapshots_count() {
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let numbers = node.source_iter(q!(vec![1, 2]));
    let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let batched = numbers.batch(&tick, nondet!(/** test */));
    let snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let paired = batched.cross_singleton(snapshot);
    let out_recv = paired.all_ticks().sim_output();

    let count = flow.sim().exhaustive(async || {
        // Only the number of explored executions matters; drain and discard the output.
        let _drained = out_recv.collect::<Vec<_>>().await;
    });

    // don't have a combinatorial explanation for this number yet, but checked via logs
    assert_eq!(count, 52);
}
1255
// Ensures that top-level singletons have only one snapshot: the exhaustive simulation must
// explore exactly one instance.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_exhaustive() {
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let constant = node.singleton(q!(1));
    let tick = node.tick();
    let snapshots = constant.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    let count = flow.sim().exhaustive(async || {
        // Only the number of explored executions matters; drain and discard the output.
        let _drained = out_recv.collect::<Vec<_>>().await;
    });

    assert_eq!(count, 1);
}
1274
// If a tick consumes a static snapshot and a stream batch, only the batch requires space
// exploration.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_join_count() {
    let flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let numbers = node.source_iter(q!(vec![1, 2, 3, 4]));
    let tick = node.tick();
    let batched = numbers.batch(&tick, nondet!(/** test */));
    let constant_snapshot = node
        .singleton(q!(123))
        .snapshot(&tick, nondet!(/** test */));
    let paired = batched.cross_singleton(constant_snapshot);
    let out_recv = paired.all_ticks().sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        // Only the number of explored executions matters; drain and discard the output.
        let _drained = out_recv.collect::<Vec<_>>().await;
    });

    // 2^4 ways to split up (including a possibly empty first batch)
    assert_eq!(instance_count, 16);
}
1303}