hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A single Rust value that can asynchronously change over time.
26///
27/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
28/// [`Unbounded`], the value will asynchronously change over time.
29///
30/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
31/// a single number that will asynchronously change as events are processed. Singletons also appear
32/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
33/// such as getting the length of a batch of requests.
34///
35/// Type Parameters:
36/// - `Type`: the type of the value in this singleton
37/// - `Loc`: the [`Location`] where the singleton is materialized
38/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
39pub struct Singleton<Type, Loc, Bound: Boundedness> {
40 pub(crate) location: Loc,
41 pub(crate) ir_node: RefCell<HydroNode>,
42 pub(crate) flow_state: FlowState,
43
44 _phantom: PhantomData<(Type, Loc, Bound)>,
45}
46
47impl<T, L, B: Boundedness> Drop for Singleton<T, L, B> {
48 fn drop(&mut self) {
49 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
50 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
51 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
52 input: Box::new(ir_node),
53 op_metadata: HydroIrOpMetadata::new(),
54 });
55 }
56 }
57}
58
59impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
60where
61 T: Clone,
62 L: Location<'a> + NoTick,
63{
64 fn from(value: Singleton<T, L, Bounded>) -> Self {
65 let tick = value.location().tick();
66 value.clone_into_tick(&tick).latest()
67 }
68}
69
70impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
71where
72 L: Location<'a>,
73{
74 type Location = Tick<L>;
75
76 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
77 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
78 location.clone(),
79 HydroNode::DeferTick {
80 input: Box::new(HydroNode::CycleSource {
81 cycle_id,
82 metadata: location.new_node_metadata(Self::collection_kind()),
83 }),
84 metadata: location
85 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
86 },
87 );
88
89 from_previous_tick.unwrap_or(initial)
90 }
91}
92
93impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
94where
95 L: Location<'a>,
96{
97 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
98 assert_eq!(
99 Location::id(&self.location),
100 expected_location,
101 "locations do not match"
102 );
103 self.location
104 .flow_state()
105 .borrow_mut()
106 .push_root(HydroRoot::CycleSink {
107 cycle_id,
108 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
109 op_metadata: HydroIrOpMetadata::new(),
110 });
111 }
112}
113
114impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
115where
116 L: Location<'a>,
117{
118 type Location = Tick<L>;
119
120 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
121 Singleton::new(
122 location.clone(),
123 HydroNode::CycleSource {
124 cycle_id,
125 metadata: location.new_node_metadata(Self::collection_kind()),
126 },
127 )
128 }
129}
130
131impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
132where
133 L: Location<'a>,
134{
135 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
136 assert_eq!(
137 Location::id(&self.location),
138 expected_location,
139 "locations do not match"
140 );
141 self.location
142 .flow_state()
143 .borrow_mut()
144 .push_root(HydroRoot::CycleSink {
145 cycle_id,
146 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
147 op_metadata: HydroIrOpMetadata::new(),
148 });
149 }
150}
151
152impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
153where
154 L: Location<'a> + NoTick,
155{
156 type Location = L;
157
158 fn create_source(cycle_id: CycleId, location: L) -> Self {
159 Singleton::new(
160 location.clone(),
161 HydroNode::CycleSource {
162 cycle_id,
163 metadata: location.new_node_metadata(Self::collection_kind()),
164 },
165 )
166 }
167}
168
169impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
170where
171 L: Location<'a> + NoTick,
172{
173 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
174 assert_eq!(
175 Location::id(&self.location),
176 expected_location,
177 "locations do not match"
178 );
179 self.location
180 .flow_state()
181 .borrow_mut()
182 .push_root(HydroRoot::CycleSink {
183 cycle_id,
184 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
185 op_metadata: HydroIrOpMetadata::new(),
186 });
187 }
188}
189
190impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
191where
192 T: Clone,
193 L: Location<'a>,
194{
195 fn clone(&self) -> Self {
196 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
197 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
198 *self.ir_node.borrow_mut() = HydroNode::Tee {
199 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
200 metadata: self.location.new_node_metadata(Self::collection_kind()),
201 };
202 }
203
204 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
205 Singleton {
206 location: self.location.clone(),
207 flow_state: self.flow_state.clone(),
208 ir_node: HydroNode::Tee {
209 inner: SharedNode(inner.0.clone()),
210 metadata: metadata.clone(),
211 }
212 .into(),
213 _phantom: PhantomData,
214 }
215 } else {
216 unreachable!()
217 }
218 }
219}
220
221#[cfg(stageleft_runtime)]
222fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
223 me: Singleton<T, Tick<L>, B>,
224 other: Optional<O, Tick<L>, B>,
225) -> Optional<(T, O), Tick<L>, B> {
226 let me_as_optional: Optional<T, Tick<L>, B> = me.into();
227 super::optional::zip_inside_tick(me_as_optional, other)
228}
229
230impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
231where
232 L: Location<'a>,
233{
234 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
235 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
236 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
237 let flow_state = location.flow_state().clone();
238 Singleton {
239 location,
240 flow_state,
241 ir_node: RefCell::new(ir_node),
242 _phantom: PhantomData,
243 }
244 }
245
246 pub(crate) fn collection_kind() -> CollectionKind {
247 CollectionKind::Singleton {
248 bound: B::BOUND_KIND,
249 element_type: stageleft::quote_type::<T>().into(),
250 }
251 }
252
253 /// Returns the [`Location`] where this singleton is being materialized.
254 pub fn location(&self) -> &L {
255 &self.location
256 }
257
258 /// Transforms the singleton value by applying a function `f` to it,
259 /// continuously as the input is updated.
260 ///
261 /// # Example
262 /// ```rust
263 /// # #[cfg(feature = "deploy")] {
264 /// # use hydro_lang::prelude::*;
265 /// # use futures::StreamExt;
266 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
267 /// let tick = process.tick();
268 /// let singleton = tick.singleton(q!(5));
269 /// singleton.map(q!(|v| v * 2)).all_ticks()
270 /// # }, |mut stream| async move {
271 /// // 10
272 /// # assert_eq!(stream.next().await.unwrap(), 10);
273 /// # }));
274 /// # }
275 /// ```
276 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
277 where
278 F: Fn(T) -> U + 'a,
279 {
280 let f = f.splice_fn1_ctx(&self.location).into();
281 Singleton::new(
282 self.location.clone(),
283 HydroNode::Map {
284 f,
285 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
286 metadata: self
287 .location
288 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
289 },
290 )
291 }
292
293 /// Transforms the singleton value by applying a function `f` to it and then flattening
294 /// the result into a stream, preserving the order of elements.
295 ///
296 /// The function `f` is applied to the singleton value to produce an iterator, and all items
297 /// from that iterator are emitted in the output stream in deterministic order.
298 ///
299 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
300 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
301 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
302 ///
303 /// # Example
304 /// ```rust
305 /// # #[cfg(feature = "deploy")] {
306 /// # use hydro_lang::prelude::*;
307 /// # use futures::StreamExt;
308 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
309 /// let tick = process.tick();
310 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
311 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
312 /// # }, |mut stream| async move {
313 /// // 1, 2, 3
314 /// # for w in vec![1, 2, 3] {
315 /// # assert_eq!(stream.next().await.unwrap(), w);
316 /// # }
317 /// # }));
318 /// # }
319 /// ```
320 pub fn flat_map_ordered<U, I, F>(
321 self,
322 f: impl IntoQuotedMut<'a, F, L>,
323 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
324 where
325 I: IntoIterator<Item = U>,
326 F: Fn(T) -> I + 'a,
327 {
328 let f = f.splice_fn1_ctx(&self.location).into();
329 Stream::new(
330 self.location.clone(),
331 HydroNode::FlatMap {
332 f,
333 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
334 metadata: self.location.new_node_metadata(
335 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
336 ),
337 },
338 )
339 }
340
341 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
342 /// for the output type `I` to produce items in any order.
343 ///
344 /// The function `f` is applied to the singleton value to produce an iterator, and all items
345 /// from that iterator are emitted in the output stream in non-deterministic order.
346 ///
347 /// # Example
348 /// ```rust
349 /// # #[cfg(feature = "deploy")] {
350 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
351 /// # use futures::StreamExt;
352 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
353 /// let tick = process.tick();
354 /// let singleton = tick.singleton(q!(
355 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
356 /// ));
357 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
358 /// # }, |mut stream| async move {
359 /// // 1, 2, 3, but in no particular order
360 /// # let mut results = Vec::new();
361 /// # for _ in 0..3 {
362 /// # results.push(stream.next().await.unwrap());
363 /// # }
364 /// # results.sort();
365 /// # assert_eq!(results, vec![1, 2, 3]);
366 /// # }));
367 /// # }
368 /// ```
369 pub fn flat_map_unordered<U, I, F>(
370 self,
371 f: impl IntoQuotedMut<'a, F, L>,
372 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
373 where
374 I: IntoIterator<Item = U>,
375 F: Fn(T) -> I + 'a,
376 {
377 let f = f.splice_fn1_ctx(&self.location).into();
378 Stream::new(
379 self.location.clone(),
380 HydroNode::FlatMap {
381 f,
382 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
383 metadata: self
384 .location
385 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
386 },
387 )
388 }
389
390 /// Flattens the singleton value into a stream, preserving the order of elements.
391 ///
392 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
393 /// are emitted in the output stream in deterministic order.
394 ///
395 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
396 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
397 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
398 ///
399 /// # Example
400 /// ```rust
401 /// # #[cfg(feature = "deploy")] {
402 /// # use hydro_lang::prelude::*;
403 /// # use futures::StreamExt;
404 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
405 /// let tick = process.tick();
406 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
407 /// singleton.flatten_ordered().all_ticks()
408 /// # }, |mut stream| async move {
409 /// // 1, 2, 3
410 /// # for w in vec![1, 2, 3] {
411 /// # assert_eq!(stream.next().await.unwrap(), w);
412 /// # }
413 /// # }));
414 /// # }
415 /// ```
416 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
417 where
418 T: IntoIterator<Item = U>,
419 {
420 self.flat_map_ordered(q!(|x| x))
421 }
422
423 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
424 /// for the element type `T` to produce items in any order.
425 ///
426 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
427 /// are emitted in the output stream in non-deterministic order.
428 ///
429 /// # Example
430 /// ```rust
431 /// # #[cfg(feature = "deploy")] {
432 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
433 /// # use futures::StreamExt;
434 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
435 /// let tick = process.tick();
436 /// let singleton = tick.singleton(q!(
437 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
438 /// ));
439 /// singleton.flatten_unordered().all_ticks()
440 /// # }, |mut stream| async move {
441 /// // 1, 2, 3, but in no particular order
442 /// # let mut results = Vec::new();
443 /// # for _ in 0..3 {
444 /// # results.push(stream.next().await.unwrap());
445 /// # }
446 /// # results.sort();
447 /// # assert_eq!(results, vec![1, 2, 3]);
448 /// # }));
449 /// # }
450 /// ```
451 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
452 where
453 T: IntoIterator<Item = U>,
454 {
455 self.flat_map_unordered(q!(|x| x))
456 }
457
458 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
459 ///
460 /// If the predicate returns `true`, the output optional contains the same value.
461 /// If the predicate returns `false`, the output optional is empty.
462 ///
463 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
464 /// not modify or take ownership of the value. If you need to modify the value while filtering
465 /// use [`Singleton::filter_map`] instead.
466 ///
467 /// # Example
468 /// ```rust
469 /// # #[cfg(feature = "deploy")] {
470 /// # use hydro_lang::prelude::*;
471 /// # use futures::StreamExt;
472 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
473 /// let tick = process.tick();
474 /// let singleton = tick.singleton(q!(5));
475 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
476 /// # }, |mut stream| async move {
477 /// // 5
478 /// # assert_eq!(stream.next().await.unwrap(), 5);
479 /// # }));
480 /// # }
481 /// ```
482 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
483 where
484 F: Fn(&T) -> bool + 'a,
485 {
486 let f = f.splice_fn1_borrow_ctx(&self.location).into();
487 Optional::new(
488 self.location.clone(),
489 HydroNode::Filter {
490 f,
491 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
492 metadata: self
493 .location
494 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
495 },
496 )
497 }
498
499 /// An operator that both filters and maps. It yields the value only if the supplied
500 /// closure `f` returns `Some(value)`.
501 ///
502 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
503 /// If the closure returns `None`, the output optional is empty.
504 ///
505 /// # Example
506 /// ```rust
507 /// # #[cfg(feature = "deploy")] {
508 /// # use hydro_lang::prelude::*;
509 /// # use futures::StreamExt;
510 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
511 /// let tick = process.tick();
512 /// let singleton = tick.singleton(q!("42"));
513 /// singleton
514 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
515 /// .all_ticks()
516 /// # }, |mut stream| async move {
517 /// // 42
518 /// # assert_eq!(stream.next().await.unwrap(), 42);
519 /// # }));
520 /// # }
521 /// ```
522 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
523 where
524 F: Fn(T) -> Option<U> + 'a,
525 {
526 let f = f.splice_fn1_ctx(&self.location).into();
527 Optional::new(
528 self.location.clone(),
529 HydroNode::FilterMap {
530 f,
531 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
532 metadata: self
533 .location
534 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
535 },
536 )
537 }
538
539 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
540 ///
541 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
542 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
543 /// non-null. This is useful for combining several pieces of state together.
544 ///
545 /// # Example
546 /// ```rust
547 /// # #[cfg(feature = "deploy")] {
548 /// # use hydro_lang::prelude::*;
549 /// # use futures::StreamExt;
550 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
551 /// let tick = process.tick();
552 /// let numbers = process
553 /// .source_iter(q!(vec![123, 456]))
554 /// .batch(&tick, nondet!(/** test */));
555 /// let count = numbers.clone().count(); // Singleton
556 /// let max = numbers.max(); // Optional
557 /// count.zip(max).all_ticks()
558 /// # }, |mut stream| async move {
559 /// // [(2, 456)]
560 /// # for w in vec![(2, 456)] {
561 /// # assert_eq!(stream.next().await.unwrap(), w);
562 /// # }
563 /// # }));
564 /// # }
565 /// ```
566 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
567 where
568 Self: ZipResult<'a, O, Location = L>,
569 B: IsBounded,
570 {
571 check_matching_location(&self.location, &Self::other_location(&other));
572
573 if L::is_top_level()
574 && let Some(tick) = self.location.try_tick()
575 {
576 let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
577 let out = zip_inside_tick(
578 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
579 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
580 other_location.clone(),
581 HydroNode::Cast {
582 inner: Box::new(Self::other_ir_node(other)),
583 metadata: other_location.new_node_metadata(Optional::<
584 <Self as ZipResult<'a, O>>::OtherType,
585 Tick<L>,
586 Bounded,
587 >::collection_kind(
588 )),
589 },
590 )
591 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
592 )
593 .latest();
594
595 Self::make(
596 out.location.clone(),
597 out.ir_node.replace(HydroNode::Placeholder),
598 )
599 } else {
600 Self::make(
601 self.location.clone(),
602 HydroNode::CrossSingleton {
603 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
604 right: Box::new(Self::other_ir_node(other)),
605 metadata: self.location.new_node_metadata(CollectionKind::Optional {
606 bound: B::BOUND_KIND,
607 element_type: stageleft::quote_type::<
608 <Self as ZipResult<'a, O>>::ElementType,
609 >()
610 .into(),
611 }),
612 },
613 )
614 }
615 }
616
617 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
618 /// boolean signal is `true`, otherwise the output is null.
619 ///
620 /// # Example
621 /// ```rust
622 /// # #[cfg(feature = "deploy")] {
623 /// # use hydro_lang::prelude::*;
624 /// # use futures::StreamExt;
625 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
626 /// let tick = process.tick();
627 /// // ticks are lazy by default, forces the second tick to run
628 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
629 ///
630 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
631 /// let batch_first_tick = process
632 /// .source_iter(q!(vec![1]))
633 /// .batch(&tick, nondet!(/** test */));
634 /// let batch_second_tick = process
635 /// .source_iter(q!(vec![1, 2, 3]))
636 /// .batch(&tick, nondet!(/** test */))
637 /// .defer_tick();
638 /// batch_first_tick.chain(batch_second_tick).count()
639 /// .filter_if(signal)
640 /// .all_ticks()
641 /// # }, |mut stream| async move {
642 /// // [1]
643 /// # for w in vec![1] {
644 /// # assert_eq!(stream.next().await.unwrap(), w);
645 /// # }
646 /// # }));
647 /// # }
648 /// ```
649 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
650 where
651 B: IsBounded,
652 {
653 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
654 }
655
656 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
657 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
658 ///
659 /// Useful for conditionally processing, such as only emitting a singleton's value outside
660 /// a tick if some other condition is satisfied.
661 ///
662 /// # Example
663 /// ```rust
664 /// # #[cfg(feature = "deploy")] {
665 /// # use hydro_lang::prelude::*;
666 /// # use futures::StreamExt;
667 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
668 /// let tick = process.tick();
669 /// // ticks are lazy by default, forces the second tick to run
670 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
671 ///
672 /// let batch_first_tick = process
673 /// .source_iter(q!(vec![1]))
674 /// .batch(&tick, nondet!(/** test */));
675 /// let batch_second_tick = process
676 /// .source_iter(q!(vec![1, 2, 3]))
677 /// .batch(&tick, nondet!(/** test */))
678 /// .defer_tick(); // appears on the second tick
679 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
680 /// batch_first_tick.chain(batch_second_tick).count()
681 /// .filter_if_some(some_on_first_tick)
682 /// .all_ticks()
683 /// # }, |mut stream| async move {
684 /// // [1]
685 /// # for w in vec![1] {
686 /// # assert_eq!(stream.next().await.unwrap(), w);
687 /// # }
688 /// # }));
689 /// # }
690 /// ```
691 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
692 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
693 where
694 B: IsBounded,
695 {
696 self.filter_if(signal.is_some())
697 }
698
699 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
700 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
701 ///
702 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
703 /// the condition.
704 ///
705 /// # Example
706 /// ```rust
707 /// # #[cfg(feature = "deploy")] {
708 /// # use hydro_lang::prelude::*;
709 /// # use futures::StreamExt;
710 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
711 /// let tick = process.tick();
712 /// // ticks are lazy by default, forces the second tick to run
713 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
714 ///
715 /// let batch_first_tick = process
716 /// .source_iter(q!(vec![1]))
717 /// .batch(&tick, nondet!(/** test */));
718 /// let batch_second_tick = process
719 /// .source_iter(q!(vec![1, 2, 3]))
720 /// .batch(&tick, nondet!(/** test */))
721 /// .defer_tick(); // appears on the second tick
722 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
723 /// batch_first_tick.chain(batch_second_tick).count()
724 /// .filter_if_none(some_on_first_tick)
725 /// .all_ticks()
726 /// # }, |mut stream| async move {
727 /// // [3]
728 /// # for w in vec![3] {
729 /// # assert_eq!(stream.next().await.unwrap(), w);
730 /// # }
731 /// # }));
732 /// # }
733 /// ```
734 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
735 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
736 where
737 B: IsBounded,
738 {
739 self.filter_if(other.is_none())
740 }
741
742 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
743 ///
744 /// # Example
745 /// ```rust
746 /// # #[cfg(feature = "deploy")] {
747 /// # use hydro_lang::prelude::*;
748 /// # use futures::StreamExt;
749 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
750 /// let tick = process.tick();
751 /// let a = tick.singleton(q!(5));
752 /// let b = tick.singleton(q!(5));
753 /// a.equals(b).all_ticks()
754 /// # }, |mut stream| async move {
755 /// // [true]
756 /// # assert_eq!(stream.next().await.unwrap(), true);
757 /// # }));
758 /// # }
759 /// ```
760 pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
761 where
762 T: PartialEq,
763 B: IsBounded,
764 {
765 self.zip(other).map(q!(|(a, b)| a == b))
766 }
767
768 /// An operator which allows you to "name" a `HydroNode`.
769 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
770 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
771 {
772 let mut node = self.ir_node.borrow_mut();
773 let metadata = node.metadata_mut();
774 metadata.tag = Some(name.to_owned());
775 }
776 self
777 }
778}
779
780impl<'a, L: Location<'a>, B: Boundedness> Not for Singleton<bool, L, B> {
781 type Output = Singleton<bool, L, B>;
782
783 fn not(self) -> Self::Output {
784 self.map(q!(|b| !b))
785 }
786}
787
788impl<'a, T, L, B: Boundedness> Singleton<Option<T>, L, B>
789where
790 L: Location<'a>,
791{
792 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
793 /// the inner `Option`.
794 ///
795 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
796 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
797 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
798 ///
799 /// # Example
800 /// ```rust
801 /// # #[cfg(feature = "deploy")] {
802 /// # use hydro_lang::prelude::*;
803 /// # use futures::StreamExt;
804 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
805 /// let tick = process.tick();
806 /// let singleton = tick.singleton(q!(Some(42)));
807 /// singleton.into_optional().all_ticks()
808 /// # }, |mut stream| async move {
809 /// // 42
810 /// # assert_eq!(stream.next().await.unwrap(), 42);
811 /// # }));
812 /// # }
813 /// ```
814 pub fn into_optional(self) -> Optional<T, L, B> {
815 self.filter_map(q!(|v| v))
816 }
817}
818
819impl<'a, L, B: Boundedness> Singleton<bool, L, B>
820where
821 L: Location<'a>,
822{
823 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
824 ///
825 /// # Example
826 /// ```rust
827 /// # #[cfg(feature = "deploy")] {
828 /// # use hydro_lang::prelude::*;
829 /// # use futures::StreamExt;
830 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
831 /// let tick = process.tick();
832 /// // ticks are lazy by default, forces the second tick to run
833 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
834 ///
835 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
836 /// let b = tick.singleton(q!(true)); // true, true
837 /// a.and(b).all_ticks()
838 /// # }, |mut stream| async move {
839 /// // [true, false]
840 /// # for w in vec![true, false] {
841 /// # assert_eq!(stream.next().await.unwrap(), w);
842 /// # }
843 /// # }));
844 /// # }
845 /// ```
846 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, B>
847 where
848 B: IsBounded,
849 {
850 self.zip(other).map(q!(|(a, b)| a && b))
851 }
852
853 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
854 ///
855 /// # Example
856 /// ```rust
857 /// # #[cfg(feature = "deploy")] {
858 /// # use hydro_lang::prelude::*;
859 /// # use futures::StreamExt;
860 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
861 /// let tick = process.tick();
862 /// // ticks are lazy by default, forces the second tick to run
863 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
864 ///
865 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
866 /// let b = tick.singleton(q!(false)); // false, false
867 /// a.or(b).all_ticks()
868 /// # }, |mut stream| async move {
869 /// // [true, false]
870 /// # for w in vec![true, false] {
871 /// # assert_eq!(stream.next().await.unwrap(), w);
872 /// # }
873 /// # }));
874 /// # }
875 /// ```
876 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, B>
877 where
878 B: IsBounded,
879 {
880 self.zip(other).map(q!(|(a, b)| a || b))
881 }
882}
883
884impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
885where
886 L: Location<'a> + NoTick,
887{
888 /// Returns a singleton value corresponding to the latest snapshot of the singleton
889 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
890 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
891 /// all snapshots of this singleton into the atomic-associated tick will observe the
892 /// same value each tick.
893 ///
894 /// # Non-Determinism
895 /// Because this picks a snapshot of a singleton whose value is continuously changing,
896 /// the output singleton has a non-deterministic value since the snapshot can be at an
897 /// arbitrary point in time.
898 pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
899 Singleton::new(
900 self.location.clone().tick,
901 HydroNode::Batch {
902 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
903 metadata: self
904 .location
905 .tick
906 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
907 },
908 )
909 }
910
911 /// Returns this singleton back into a top-level, asynchronous execution context where updates
912 /// to the value will be asynchronously propagated.
913 pub fn end_atomic(self) -> Singleton<T, L, B> {
914 Singleton::new(
915 self.location.tick.l.clone(),
916 HydroNode::EndAtomic {
917 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
918 metadata: self
919 .location
920 .tick
921 .l
922 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
923 },
924 )
925 }
926}
927
928impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
929where
930 L: Location<'a>,
931{
932 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
933 /// will observe the same version of the value and will be executed synchronously before any
934 /// outputs are yielded (in [`Optional::end_atomic`]).
935 ///
936 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
937 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
938 /// a different version).
939 ///
940 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
941 /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
942 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
943 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
944 let out_location = Atomic { tick: tick.clone() };
945 Singleton::new(
946 out_location.clone(),
947 HydroNode::BeginAtomic {
948 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
949 metadata: out_location
950 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
951 },
952 )
953 }
954
955 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
956 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
957 /// relevant data that contributed to the snapshot at tick `t`.
958 ///
959 /// # Non-Determinism
960 /// Because this picks a snapshot of a singleton whose value is continuously changing,
961 /// the output singleton has a non-deterministic value since the snapshot can be at an
962 /// arbitrary point in time.
963 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
964 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
965 Singleton::new(
966 tick.clone(),
967 HydroNode::Batch {
968 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
969 metadata: tick
970 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
971 },
972 )
973 }
974
975 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
976 /// with order corresponding to increasing prefixes of data contributing to the singleton.
977 ///
978 /// # Non-Determinism
979 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
980 /// to non-deterministic batching and arrival of inputs, the output stream is
981 /// non-deterministic.
982 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
983 where
984 L: NoTick,
985 {
986 sliced! {
987 let snapshot = use(self, nondet);
988 snapshot.into_stream()
989 }
990 .weaken_retries()
991 }
992
993 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
994 /// value taken at various points in time. Because the input singleton may be
995 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
996 /// represent the value of the singleton given some prefix of the streams leading up to
997 /// it.
998 ///
999 /// # Non-Determinism
1000 /// The output stream is non-deterministic in which elements are sampled, since this
1001 /// is controlled by a clock.
1002 pub fn sample_every(
1003 self,
1004 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1005 nondet: NonDet,
1006 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1007 where
1008 L: NoTick + NoAtomic,
1009 {
1010 let samples = self.location.source_interval(interval, nondet);
1011 sliced! {
1012 let snapshot = use(self, nondet);
1013 let sample_batch = use(samples, nondet);
1014
1015 snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1016 }
1017 .weaken_retries()
1018 }
1019
1020 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1021 /// implies that `B == Bounded`.
1022 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1023 where
1024 B: IsBounded,
1025 {
1026 Singleton::new(
1027 self.location.clone(),
1028 self.ir_node.replace(HydroNode::Placeholder),
1029 )
1030 }
1031
1032 /// Clones this bounded singleton into a tick, returning a singleton that has the
1033 /// same value as the outer singleton. Because the outer singleton is bounded, this
1034 /// is deterministic because there is only a single immutable version.
1035 pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1036 where
1037 B: IsBounded,
1038 T: Clone,
1039 {
1040 // TODO(shadaj): avoid printing simulator logs for this snapshot
1041 self.snapshot(
1042 tick,
1043 nondet!(/** bounded top-level singleton so deterministic */),
1044 )
1045 }
1046
1047 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1048 ///
1049 /// # Example
1050 /// ```rust
1051 /// # #[cfg(feature = "deploy")] {
1052 /// # use hydro_lang::prelude::*;
1053 /// # use futures::StreamExt;
1054 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1055 /// let tick = process.tick();
1056 /// let batch_input = process
1057 /// .source_iter(q!(vec![123, 456]))
1058 /// .batch(&tick, nondet!(/** test */));
1059 /// batch_input.clone().chain(
1060 /// batch_input.count().into_stream()
1061 /// ).all_ticks()
1062 /// # }, |mut stream| async move {
1063 /// // [123, 456, 2]
1064 /// # for w in vec![123, 456, 2] {
1065 /// # assert_eq!(stream.next().await.unwrap(), w);
1066 /// # }
1067 /// # }));
1068 /// # }
1069 /// ```
1070 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1071 where
1072 B: IsBounded,
1073 {
1074 Stream::new(
1075 self.location.clone(),
1076 HydroNode::Cast {
1077 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1078 metadata: self.location.new_node_metadata(Stream::<
1079 T,
1080 Tick<L>,
1081 Bounded,
1082 TotalOrder,
1083 ExactlyOnce,
1084 >::collection_kind()),
1085 },
1086 )
1087 }
1088}
1089
1090impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1091where
1092 L: Location<'a>,
1093{
1094 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1095 /// which will stream the value computed in _each_ tick as a separate stream element.
1096 ///
1097 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1098 /// producing one element in the output for each tick. This is useful for batched computations,
1099 /// where the results from each tick must be combined together.
1100 ///
1101 /// # Example
1102 /// ```rust
1103 /// # #[cfg(feature = "deploy")] {
1104 /// # use hydro_lang::prelude::*;
1105 /// # use futures::StreamExt;
1106 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1107 /// let tick = process.tick();
1108 /// # // ticks are lazy by default, forces the second tick to run
1109 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1110 /// # let batch_first_tick = process
1111 /// # .source_iter(q!(vec![1]))
1112 /// # .batch(&tick, nondet!(/** test */));
1113 /// # let batch_second_tick = process
1114 /// # .source_iter(q!(vec![1, 2, 3]))
1115 /// # .batch(&tick, nondet!(/** test */))
1116 /// # .defer_tick(); // appears on the second tick
1117 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1118 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1119 /// .count()
1120 /// .all_ticks()
1121 /// # }, |mut stream| async move {
1122 /// // [1, 3]
1123 /// # for w in vec![1, 3] {
1124 /// # assert_eq!(stream.next().await.unwrap(), w);
1125 /// # }
1126 /// # }));
1127 /// # }
1128 /// ```
1129 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1130 self.into_stream().all_ticks()
1131 }
1132
1133 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1134 /// which will stream the value computed in _each_ tick as a separate stream element.
1135 ///
1136 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1137 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1138 /// singleton's [`Tick`] context.
1139 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1140 self.into_stream().all_ticks_atomic()
1141 }
1142
1143 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1144 /// be asynchronously updated with the latest value of the singleton inside the tick.
1145 ///
1146 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1147 /// tick that tracks the inner value. This is useful for getting the value as of the
1148 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1149 ///
1150 /// # Example
1151 /// ```rust
1152 /// # #[cfg(feature = "deploy")] {
1153 /// # use hydro_lang::prelude::*;
1154 /// # use futures::StreamExt;
1155 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1156 /// let tick = process.tick();
1157 /// # // ticks are lazy by default, forces the second tick to run
1158 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1159 /// # let batch_first_tick = process
1160 /// # .source_iter(q!(vec![1]))
1161 /// # .batch(&tick, nondet!(/** test */));
1162 /// # let batch_second_tick = process
1163 /// # .source_iter(q!(vec![1, 2, 3]))
1164 /// # .batch(&tick, nondet!(/** test */))
1165 /// # .defer_tick(); // appears on the second tick
1166 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1167 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1168 /// .count()
1169 /// .latest()
1170 /// # .sample_eager(nondet!(/** test */))
1171 /// # }, |mut stream| async move {
1172 /// // asynchronously changes from 1 ~> 3
1173 /// # for w in vec![1, 3] {
1174 /// # assert_eq!(stream.next().await.unwrap(), w);
1175 /// # }
1176 /// # }));
1177 /// # }
1178 /// ```
1179 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1180 Singleton::new(
1181 self.location.outer().clone(),
1182 HydroNode::YieldConcat {
1183 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1184 metadata: self
1185 .location
1186 .outer()
1187 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1188 },
1189 )
1190 }
1191
1192 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1193 /// be updated with the latest value of the singleton inside the tick.
1194 ///
1195 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1196 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1197 /// singleton's [`Tick`] context.
1198 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1199 let out_location = Atomic {
1200 tick: self.location.clone(),
1201 };
1202 Singleton::new(
1203 out_location.clone(),
1204 HydroNode::YieldConcat {
1205 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1206 metadata: out_location
1207 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1208 },
1209 )
1210 }
1211}
1212
1213#[doc(hidden)]
1214/// Helper trait that determines the output collection type for [`Singleton::zip`].
1215///
1216/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1217/// [`Singleton`].
1218#[sealed::sealed]
1219pub trait ZipResult<'a, Other> {
1220 /// The output collection type.
1221 type Out;
1222 /// The type of the tupled output value.
1223 type ElementType;
1224 /// The type of the other collection's value.
1225 type OtherType;
1226 /// The location where the tupled result will be materialized.
1227 type Location: Location<'a>;
1228
1229 /// The location of the second input to the `zip`.
1230 fn other_location(other: &Other) -> Self::Location;
1231 /// The IR node of the second input to the `zip`.
1232 fn other_ir_node(other: Other) -> HydroNode;
1233
1234 /// Constructs the output live collection given an IR node containing the zip result.
1235 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1236}
1237
1238#[sealed::sealed]
1239impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1240where
1241 L: Location<'a>,
1242{
1243 type Out = Singleton<(T, U), L, B>;
1244 type ElementType = (T, U);
1245 type OtherType = U;
1246 type Location = L;
1247
1248 fn other_location(other: &Singleton<U, L, B>) -> L {
1249 other.location.clone()
1250 }
1251
1252 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1253 other.ir_node.replace(HydroNode::Placeholder)
1254 }
1255
1256 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1257 Singleton::new(
1258 location.clone(),
1259 HydroNode::Cast {
1260 inner: Box::new(ir_node),
1261 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1262 },
1263 )
1264 }
1265}
1266
1267#[sealed::sealed]
1268impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1269where
1270 L: Location<'a>,
1271{
1272 type Out = Optional<(T, U), L, B>;
1273 type ElementType = (T, U);
1274 type OtherType = U;
1275 type Location = L;
1276
1277 fn other_location(other: &Optional<U, L, B>) -> L {
1278 other.location.clone()
1279 }
1280
1281 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1282 other.ir_node.replace(HydroNode::Placeholder)
1283 }
1284
1285 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1286 Optional::new(location, ir_node)
1287 }
1288}
1289
1290#[cfg(test)]
1291mod tests {
1292 #[cfg(feature = "deploy")]
1293 use futures::{SinkExt, StreamExt};
1294 #[cfg(feature = "deploy")]
1295 use hydro_deploy::Deployment;
1296 #[cfg(any(feature = "deploy", feature = "sim"))]
1297 use stageleft::q;
1298
1299 #[cfg(any(feature = "deploy", feature = "sim"))]
1300 use crate::compile::builder::FlowBuilder;
1301 #[cfg(feature = "deploy")]
1302 use crate::live_collections::stream::ExactlyOnce;
1303 #[cfg(any(feature = "deploy", feature = "sim"))]
1304 use crate::location::Location;
1305 #[cfg(any(feature = "deploy", feature = "sim"))]
1306 use crate::nondet::nondet;
1307
1308 #[cfg(feature = "deploy")]
1309 #[tokio::test]
1310 async fn tick_cycle_cardinality() {
1311 let mut deployment = Deployment::new();
1312
1313 let mut flow = FlowBuilder::new();
1314 let node = flow.process::<()>();
1315 let external = flow.external::<()>();
1316
1317 let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1318
1319 let node_tick = node.tick();
1320 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1321 let counts = singleton
1322 .clone()
1323 .into_stream()
1324 .count()
1325 .filter_if(
1326 input
1327 .batch(&node_tick, nondet!(/** testing */))
1328 .first()
1329 .is_some(),
1330 )
1331 .all_ticks()
1332 .send_bincode_external(&external);
1333 complete_cycle.complete_next_tick(singleton);
1334
1335 let nodes = flow
1336 .with_process(&node, deployment.Localhost())
1337 .with_external(&external, deployment.Localhost())
1338 .deploy(&mut deployment);
1339
1340 deployment.deploy().await.unwrap();
1341
1342 let mut tick_trigger = nodes.connect(input_send).await;
1343 let mut external_out = nodes.connect(counts).await;
1344
1345 deployment.start().await.unwrap();
1346
1347 tick_trigger.send(()).await.unwrap();
1348
1349 assert_eq!(external_out.next().await.unwrap(), 1);
1350
1351 tick_trigger.send(()).await.unwrap();
1352
1353 assert_eq!(external_out.next().await.unwrap(), 1);
1354 }
1355
1356 #[cfg(feature = "sim")]
1357 #[test]
1358 #[should_panic]
1359 fn sim_fold_intermediate_states() {
1360 let mut flow = FlowBuilder::new();
1361 let node = flow.process::<()>();
1362
1363 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1364 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1365
1366 let tick = node.tick();
1367 let batch = folded.snapshot(&tick, nondet!(/** test */));
1368 let out_recv = batch.all_ticks().sim_output();
1369
1370 flow.sim().exhaustive(async || {
1371 assert_eq!(out_recv.next().await.unwrap(), 10);
1372 });
1373 }
1374
1375 #[cfg(feature = "sim")]
1376 #[test]
1377 fn sim_fold_intermediate_state_count() {
1378 let mut flow = FlowBuilder::new();
1379 let node = flow.process::<()>();
1380
1381 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1382 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1383
1384 let tick = node.tick();
1385 let batch = folded.snapshot(&tick, nondet!(/** test */));
1386 let out_recv = batch.all_ticks().sim_output();
1387
1388 let instance_count = flow.sim().exhaustive(async || {
1389 let out = out_recv.collect::<Vec<_>>().await;
1390 assert_eq!(out.last(), Some(&10));
1391 });
1392
1393 assert_eq!(
1394 instance_count,
1395 16 // 2^4 possible subsets of intermediates (including initial state)
1396 )
1397 }
1398
1399 #[cfg(feature = "sim")]
1400 #[test]
1401 fn sim_fold_no_repeat_initial() {
1402 // check that we don't repeat the initial state of the fold in autonomous decisions
1403
1404 let mut flow = FlowBuilder::new();
1405 let node = flow.process::<()>();
1406
1407 let (in_port, input) = node.sim_input();
1408 let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1409
1410 let tick = node.tick();
1411 let batch = folded.snapshot(&tick, nondet!(/** test */));
1412 let out_recv = batch.all_ticks().sim_output();
1413
1414 flow.sim().exhaustive(async || {
1415 assert_eq!(out_recv.next().await.unwrap(), 0);
1416
1417 in_port.send(123);
1418
1419 assert_eq!(out_recv.next().await.unwrap(), 123);
1420 });
1421 }
1422
1423 #[cfg(feature = "sim")]
1424 #[test]
1425 #[should_panic]
1426 fn sim_fold_repeats_snapshots() {
1427 // when the tick is driven by a snapshot AND something else, the snapshot can
1428 // "stutter" and repeat the same state multiple times
1429
1430 let mut flow = FlowBuilder::new();
1431 let node = flow.process::<()>();
1432
1433 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1434 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1435
1436 let tick = node.tick();
1437 let batch = source
1438 .batch(&tick, nondet!(/** test */))
1439 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1440 let out_recv = batch.all_ticks().sim_output();
1441
1442 flow.sim().exhaustive(async || {
1443 if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1444 {
1445 panic!("repeated snapshot");
1446 }
1447 });
1448 }
1449
1450 #[cfg(feature = "sim")]
1451 #[test]
1452 fn sim_fold_repeats_snapshots_count() {
1453 // check the number of instances
1454 let mut flow = FlowBuilder::new();
1455 let node = flow.process::<()>();
1456
1457 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1458 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1459
1460 let tick = node.tick();
1461 let batch = source
1462 .batch(&tick, nondet!(/** test */))
1463 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1464 let out_recv = batch.all_ticks().sim_output();
1465
1466 let count = flow.sim().exhaustive(async || {
1467 let _ = out_recv.collect::<Vec<_>>().await;
1468 });
1469
1470 assert_eq!(count, 52);
1471 // don't have a combinatorial explanation for this number yet, but checked via logs
1472 }
1473
1474 #[cfg(feature = "sim")]
1475 #[test]
1476 fn sim_top_level_singleton_exhaustive() {
1477 // ensures that top-level singletons have only one snapshot
1478 let mut flow = FlowBuilder::new();
1479 let node = flow.process::<()>();
1480
1481 let singleton = node.singleton(q!(1));
1482 let tick = node.tick();
1483 let batch = singleton.snapshot(&tick, nondet!(/** test */));
1484 let out_recv = batch.all_ticks().sim_output();
1485
1486 let count = flow.sim().exhaustive(async || {
1487 let _ = out_recv.collect::<Vec<_>>().await;
1488 });
1489
1490 assert_eq!(count, 1);
1491 }
1492
1493 #[cfg(feature = "sim")]
1494 #[test]
1495 fn sim_top_level_singleton_join_count() {
1496 // if a tick consumes a static snapshot and a stream batch, only the batch require space
1497 // exploration
1498
1499 let mut flow = FlowBuilder::new();
1500 let node = flow.process::<()>();
1501
1502 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1503 let tick = node.tick();
1504 let batch = source_iter
1505 .batch(&tick, nondet!(/** test */))
1506 .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1507 let out_recv = batch.all_ticks().sim_output();
1508
1509 let instance_count = flow.sim().exhaustive(async || {
1510 let _ = out_recv.collect::<Vec<_>>().await;
1511 });
1512
1513 assert_eq!(
1514 instance_count,
1515 16 // 2^4 ways to split up (including a possibly empty first batch)
1516 )
1517 }
1518
1519 #[cfg(feature = "sim")]
1520 #[test]
1521 fn top_level_singleton_into_stream_no_replay() {
1522 let mut flow = FlowBuilder::new();
1523 let node = flow.process::<()>();
1524
1525 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1526 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1527
1528 let out_recv = folded.into_stream().sim_output();
1529
1530 flow.sim().exhaustive(async || {
1531 out_recv.assert_yields_only([10]).await;
1532 });
1533 }
1534
1535 #[cfg(feature = "sim")]
1536 #[test]
1537 fn inside_tick_singleton_zip() {
1538 use crate::live_collections::Stream;
1539 use crate::live_collections::sliced::sliced;
1540
1541 let mut flow = FlowBuilder::new();
1542 let node = flow.process::<()>();
1543
1544 let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1545 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1546
1547 let out_recv = sliced! {
1548 let v = use(folded, nondet!(/** test */));
1549 v.clone().zip(v).into_stream()
1550 }
1551 .sim_output();
1552
1553 let count = flow.sim().exhaustive(async || {
1554 let out = out_recv.collect::<Vec<_>>().await;
1555 assert_eq!(out.last(), Some(&(3, 3)));
1556 });
1557
1558 assert_eq!(count, 4);
1559 }
1560}