// hydro_lang/live_collections/stream/mod.rs

//! Definitions for the [`Stream`] live collection.

use std::cell::RefCell;
use std::future::Future;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::Deref;
use std::rc::Rc;

use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
use tokio::time::Instant;

use super::boundedness::{Bounded, Boundedness, Unbounded};
use super::keyed_singleton::KeyedSingleton;
use super::keyed_stream::KeyedStream;
use super::optional::Optional;
use super::singleton::Singleton;
use crate::compile::ir::{
    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
};
#[cfg(stageleft_runtime)]
use crate::forward_handle::{CycleCollection, ReceiverComplete};
use crate::forward_handle::{ForwardRef, TickCycle};
use crate::live_collections::batch_atomic::BatchAtomic;
#[cfg(stageleft_runtime)]
use crate::location::dynamic::{DynLocation, LocationId};
use crate::location::tick::{Atomic, DeferTick, NoAtomic};
use crate::location::{Location, NoTick, Tick, check_matching_location};
use crate::nondet::{NonDet, nondet};
use crate::prelude::ManualProof;
use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};

pub mod networking;

/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}

/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}

/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}

/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}

/// Helper trait for determining the weakest of two orderings.
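///
/// A minimal sketch of how the resolution works at the type level (illustrative only; the
/// aliases below are not part of this module):
///
/// ```rust,ignore
/// use hydro_lang::live_collections::stream::{MinOrder, NoOrder, TotalOrder};
///
/// // The weaker of the two ordering guarantees is always selected.
/// type A = <TotalOrder as MinOrder<TotalOrder>>::Min; // = TotalOrder
/// type B = <TotalOrder as MinOrder<NoOrder>>::Min;    // = NoOrder
/// type C = <NoOrder as MinOrder<TotalOrder>>::Min;    // = NoOrder
/// ```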
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}

/// A trait implemented by valid retry markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}

/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}

/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}

/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}

/// Helper trait for determining the weakest of two retry guarantees.
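///
/// Analogous to [`MinOrder`], a brief illustrative sketch (the aliases are not part of this
/// module):
///
/// ```rust,ignore
/// use hydro_lang::live_collections::stream::{AtLeastOnce, ExactlyOnce, MinRetries};
///
/// // The weaker of the two retry guarantees is always selected.
/// type A = <ExactlyOnce as MinRetries<AtLeastOnce>>::Min; // = AtLeastOnce
/// type B = <AtLeastOnce as MinRetries<ExactlyOnce>>::Min; // = AtLeastOnce
/// ```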
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}

/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
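///
/// For example, a stream of API requests arriving at a process over the network might be
/// typed as follows (a sketch; `Request` and `Server` are illustrative names, not part of
/// this crate):
///
/// ```rust,ignore
/// // Arrival order across independent clients is non-deterministic, so the stream is
/// // marked `NoOrder`; deliveries are assumed not to be retried, so `ExactlyOnce` is kept.
/// type IncomingRequests<'a> = Stream<Request, Process<'a, Server>, Unbounded, NoOrder, ExactlyOnce>;
/// ```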
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    pub(crate) location: Loc,
    pub(crate) ir_node: RefCell<HydroNode>,

    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}

impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
    for Stream<T, L, Unbounded, O, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
        let new_meta = stream
            .location
            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());

        Stream {
            location: stream.location,
            ir_node: RefCell::new(HydroNode::Cast {
                inner: Box::new(stream.ir_node.into_inner()),
                metadata: new_meta,
            }),
            _phantom: PhantomData,
        }
    }
}

impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}

impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}

impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}

impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                ident,
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
}

impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                ident,
                input: Box::new(self.ir_node.into_inner()),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}

impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    type Location = L;

    fn create_source(ident: syn::Ident, location: L) -> Self {
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                ident,
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
}

impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                ident,
                input: Box::new(self.ir_node.into_inner()),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}

impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}

impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
where
    L: Location<'a>,
{
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        Stream {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }

    /// Returns the [`Location`] where this stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }

    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }

    /// Produces a stream based on invoking `f` on each element.
    /// If you do not want to modify the stream and instead only want to view
    /// each item, use [`Stream::inspect`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["hello", "world"]));
    /// words.map(q!(|x| x.to_uppercase()))
    /// # }, |mut stream| async move {
    /// # for w in vec!["HELLO", "WORLD"] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        F: Fn(T) -> U + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::Map {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }

    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
    /// result as an [`Iterator`] to produce items one by one. The implementation of [`Iterator`]
    /// for the result type `I` must produce items in a **deterministic** order.
    ///
    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flat_map_ordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }

    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the result type `I` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flat_map_unordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|d| d))
    }

    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|d| d))
    }

    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
    /// `f`, preserving the order of the elements.
    ///
    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering,
    /// use [`Stream::filter_map`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .filter(q!(|&x| x > 2))
    /// # }, |mut stream| async move {
    /// // 3, 4
    /// # for w in (3..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
    where
        F: Fn(&T) -> bool + 'a,
    {
        let f = f.splice_fn1_borrow_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::Filter {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }

    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # for w in (1..3) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        F: Fn(T) -> Option<U> + 'a,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }

    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
    /// If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 4), (2, 4), (3, 4), (4, 4)
    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> Stream<(T, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
            },
        )
    }

    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-empty, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
    /// leader of a cluster.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![5, 6, 7, 8]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///   .filter_if_some(some_on_first_tick)
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.cross_singleton(signal.map(q!(|_u| ())))
            .map(q!(|(d, _signal)| d))
    }

    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is empty, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
    /// some local state.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![5, 6, 7, 8]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///   .filter_if_none(some_on_first_tick)
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [5, 6, 7, 8]
    /// # for w in vec![5, 6, 7, 8] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.filter_if_some(
            other
                .map(q!(|_| ()))
                .into_singleton()
                .filter(q!(|o| o.is_none())),
        )
    }

    /// Forms the cross-product (Cartesian product, cross-join) of the items in the two input streams, returning all
    /// tupled pairs in a non-deterministic order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
    /// stream1.cross_product(stream2)
    /// # }, |mut stream| async move {
    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn cross_product<T2, O2: Ordering>(
        self,
        other: Stream<T2, L, B, O2, R>,
    ) -> Stream<(T, T2), L, B, NoOrder, R>
    where
        T: Clone,
        T2: Clone,
    {
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Takes one stream as input and filters out any duplicate occurrences. The output
    /// contains all unique values from the input.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
    where
        T: Eq + Hash,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Unique {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
            },
        )
    }

    /// Outputs everything in this stream that is *not* contained in the `other` stream.
    ///
    /// The `other` stream must be [`Bounded`], since this function will wait until
    /// all its elements are available before producing any output.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2]))
    ///   .batch(&tick, nondet!(/** test */));
    /// stream.filter_not_in(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
    where
        T: Eq + Hash,
    {
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Difference {
                pos: Box::new(self.ir_node.into_inner()),
                neg: Box::new(other.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
            },
        )
    }

    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each item. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let nums = process.source_iter(q!(vec![1, 2]));
    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
    where
        F: Fn(&T) + 'a,
    {
        let f = f.splice_fn1_borrow_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }

    /// An operator which allows you to "name" a `HydroNode`.
    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
        {
            let mut node = self.ir_node.borrow_mut();
            let metadata = node.metadata_mut();
            metadata.tag = Some(name.to_owned());
        }
        self
    }

    /// Explicitly "casts" the stream to a type with a different ordering
    /// guarantee. Useful in unsafe code where the ordering cannot be proven
    /// by the type-system.
    ///
    /// # Non-Determinism
    /// This function is used as an escape hatch, and any mistakes in the
    /// provided ordering guarantee will propagate into the guarantees
    /// for the rest of the program.
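    ///
    /// # Example
    /// ```rust,ignore
    /// // A hypothetical sketch: `requests` is an unordered stream that, from deployment
    /// // knowledge invisible to the type system (a single sender over a FIFO channel),
    /// // is known to arrive in send order. The variable names are illustrative only.
    /// let ordered = requests.assume_ordering::<TotalOrder>(nondet!(
    ///     /** single sender over an order-preserving channel */
    /// ));
    /// ```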
    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
        if O::ORDERING_KIND == O2::ORDERING_KIND {
            Stream::new(self.location, self.ir_node.into_inner())
        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
            // We can always weaken the ordering guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
                },
            )
        } else {
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.into_inner()),
                    trusted: false,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
                },
            )
        }
    }

    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
    // intermediate states will not be revealed
    fn assume_ordering_trusted_bounded<O2: Ordering>(
        self,
        nondet: NonDet,
    ) -> Stream<T, L, B, O2, R> {
        if B::BOUNDED {
            self.assume_ordering_trusted(nondet)
        } else {
            self.assume_ordering(nondet)
        }
    }

    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
    // is not observable
    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
        self,
        _nondet: NonDet,
    ) -> Stream<T, L, B, O2, R> {
        if O::ORDERING_KIND == O2::ORDERING_KIND {
            Stream::new(self.location, self.ir_node.into_inner())
        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
            // We can always weaken the ordering guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
                },
            )
        } else {
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.into_inner()),
                    trusted: true,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
                },
            )
        }
    }

    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
    /// which is always safe because that is the weakest possible guarantee.
    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
        self.weaken_ordering::<NoOrder>()
    }

    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
    /// enforcing that `O2` is weaker than the input ordering guarantee.
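    ///
    /// # Example
    /// ```rust,ignore
    /// // A brief sketch with illustrative names: drop the total-order guarantee on
    /// // `ordered_stream` so it can be combined with streams that are already unordered.
    /// let unordered = ordered_stream.weaken_ordering::<NoOrder>();
    /// ```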
    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
        self.assume_ordering::<O2>(nondet)
    }

    /// Explicitly "casts" the stream to a type with a different retries
    /// guarantee. Useful in unsafe code where the lack of retries cannot
    /// be proven by the type-system.
    ///
    /// # Non-Determinism
    /// This function is used as an escape hatch, and any mistakes in the
    /// provided retries guarantee will propagate into the guarantees
    /// for the rest of the program.
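    ///
    /// # Example
    /// ```rust,ignore
    /// // A hypothetical sketch: `acks` may contain retried deliveries, but an upstream
    /// // deduplication step (not shown) removes duplicates by sequence number, so the
    /// // stream can be treated as exactly-once. The names are illustrative only.
    /// let deduped = acks.assume_retries::<ExactlyOnce>(nondet!(
    ///     /** duplicates are removed upstream by sequence number */
    /// ));
    /// ```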
    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
        if R::RETRIES_KIND == R2::RETRIES_KIND {
            Stream::new(self.location, self.ir_node.into_inner())
        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
            // We can always weaken the retries guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        } else {
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.into_inner()),
                    trusted: false,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        }
    }

    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
    // is not observable
    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
        if R::RETRIES_KIND == R2::RETRIES_KIND {
            Stream::new(self.location, self.ir_node.into_inner())
        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
            // We can always weaken the retries guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        } else {
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.into_inner()),
                    trusted: true,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        }
    }

    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
    /// which is always safe because that is the weakest possible guarantee.
    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
        self.weaken_retries::<AtLeastOnce>()
    }

    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
    /// enforcing that `R2` is weaker than the input retries guarantee.
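    ///
    /// # Example
    /// ```rust,ignore
    /// // A brief sketch with illustrative names: treat an exactly-once stream as
    /// // at-least-once before merging it with a stream that may contain retries.
    /// let at_least_once = exactly_once_stream.weaken_retries::<AtLeastOnce>();
    /// ```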
    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
        self.assume_retries::<R2>(nondet)
    }
}

impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        self.map(q!(|d| d.clone()))
    }
}

impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
    /// words
    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
    ///     .into_stream()
    /// # }, |mut stream| async move {
    /// // "HELLOWORLD"
    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
    /// # }));
    /// # }
    /// ```
    pub fn fold<A, I, F, C, Idemp>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Singleton<A, L, B>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Fold {
            init,
            acc: comb.into(),
            input: Box::new(ordered_etc.ir_node.into_inner()),
            metadata: ordered_etc
                .location
                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
        };

        Singleton::new(ordered_etc.location, core)
    }

    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
    /// reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let bools = process.source_iter(q!(vec![false, true, false]));
    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// # }
    /// ```
    pub fn reduce<F, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Optional<T, L, B>
    where
        F: Fn(&mut T, T) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Reduce {
            f: f.into(),
            input: Box::new(ordered_etc.ir_node.into_inner()),
            metadata: ordered_etc
                .location
                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
        };

        Optional::new(ordered_etc.location, core)
    }

    /// Computes the maximum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.max().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        self.assume_retries_trusted(nondet!(/** max is idempotent */))
            .assume_ordering_trusted_bounded(
                nondet!(/** max is commutative, but order affects intermediates */),
            )
            .reduce(q!(|curr, new| {
                if new > *curr {
                    *curr = new;
                }
            }))
    }

    /// Computes the minimum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.min().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// # }
    /// ```
    pub fn min(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        self.assume_retries_trusted(nondet!(/** min is idempotent */))
            .assume_ordering_trusted_bounded(
                nondet!(/** min is commutative, but order affects intermediates */),
            )
            .reduce(q!(|curr, new| {
                if new < *curr {
                    *curr = new;
                }
            }))
    }
}

impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
where
    L: Location<'a>,
{
    /// Computes the number of elements in the stream as a [`Singleton`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.count().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn count(self) -> Singleton<usize, L, B> {
        self.assume_ordering_trusted(nondet!(
            /// Order does not affect eventual count, and also does not affect intermediate states.
        ))
        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
    }
}

impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
where
    L: Location<'a>,
{
    /// Computes the first element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
    /// re-ordering of elements may cause the first element to change.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.first().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// # }
    /// ```
    pub fn first(self) -> Optional<T, L, B> {
        self.assume_retries_trusted(nondet!(/** first is idempotent */))
            .reduce(q!(|_, _| {}))
    }

    /// Computes the last element in the stream as an [`Optional`], which
    /// will be empty until an element in the input arrives.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
    /// re-ordering of elements may cause the last element to change.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.last().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn last(self) -> Optional<T, L, B> {
        self.assume_retries_trusted(nondet!(/** last is idempotent */))
            .reduce(q!(|curr, new| *curr = new))
    }
}

impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
where
    L: Location<'a>,
{
    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.enumerate()
    /// # }, |mut stream| async move {
    /// // (0, 1), (1, 2), (2, 3), (3, 4)
    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
        Stream::new(
            self.location.clone(),
            HydroNode::Enumerate {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Stream::<
                    (usize, T),
                    L,
                    B,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        )
    }

    /// Collects all the elements of this stream into a single [`Vec`] element.
    ///
    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
    /// the vector at an arbitrary point in time.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
    /// # }, |mut stream| async move {
    /// // [ vec![1, 2, 3, 4] ]
    /// # for w in vec![vec![1, 2, 3, 4]] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
        self.fold(
            q!(|| vec![]),
            q!(|acc, v| {
                acc.push(v);
            }),
        )
    }

    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
    /// and emitting each intermediate result.
    ///
    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
    /// containing all intermediate accumulated values. The scan operation can also terminate early
    /// by returning `None`.
    ///
    /// The function takes a mutable reference to the accumulator and the current element, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
    /// If the function returns `None`, the stream is terminated and no more elements are processed.
    ///
    /// # Examples
    ///
    /// Basic usage - running sum:
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         Some(*acc)
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: 1, 3, 6, 10
    /// # for w in vec![1, 3, 6, 10] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    ///
    /// Early termination example:
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
    ///     q!(|| 1),
    ///     q!(|state, x| {
    ///         *state = *state * x;
    ///         if *state > 6 {
    ///             None // Terminate the stream
    ///         } else {
    ///             Some(-*state)
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: -1, -2, -6
    /// # for w in vec![-1, -2, -6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1528    where
1529        I: Fn() -> A + 'a,
1530        F: Fn(&mut A, T) -> Option<U> + 'a,
1531    {
1532        let init = init.splice_fn0_ctx(&self.location).into();
1533        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1534
1535        Stream::new(
1536            self.location.clone(),
1537            HydroNode::Scan {
1538                init,
1539                acc: f,
1540                input: Box::new(self.ir_node.into_inner()),
1541                metadata: self.location.new_node_metadata(
1542                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1543                ),
1544            },
1545        )
1546    }
1547}
1548
1549impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1550    /// Produces a new stream that interleaves the elements of the two input streams.
1551    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1552    ///
1553    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1554    /// [`Bounded`], you can use [`Stream::chain`] instead.
1555    ///
1556    /// # Example
1557    /// ```rust
1558    /// # #[cfg(feature = "deploy")] {
1559    /// # use hydro_lang::prelude::*;
1560    /// # use futures::StreamExt;
1561    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1562    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
1563    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
1564    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1565    /// # }, |mut stream| async move {
1566    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1567    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1568    /// #     assert_eq!(stream.next().await.unwrap(), w);
1569    /// # }
1570    /// # }));
1571    /// # }
1572    /// ```
1573    pub fn interleave<O2: Ordering, R2: Retries>(
1574        self,
1575        other: Stream<T, L, Unbounded, O2, R2>,
1576    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1577    where
1578        R: MinRetries<R2>,
1579    {
1580        Stream::new(
1581            self.location.clone(),
1582            HydroNode::Chain {
1583                first: Box::new(self.ir_node.into_inner()),
1584                second: Box::new(other.ir_node.into_inner()),
1585                metadata: self.location.new_node_metadata(Stream::<
1586                    T,
1587                    L,
1588                    Unbounded,
1589                    NoOrder,
1590                    <R as MinRetries<R2>>::Min,
1591                >::collection_kind()),
1592            },
1593        )
1594    }
1595}
1596
1597impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1598    /// Produces a new stream that combines the elements of the two input streams,
1599    /// preserving the relative order of elements within each input.
1600    ///
1601    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1602    /// [`Bounded`], you can use [`Stream::chain`] instead.
1603    ///
1604    /// # Non-Determinism
1605    /// The order in which elements *across* the two streams are interleaved is
1606    /// non-deterministic, so the output order will vary across runs. If the output order
1607    /// is irrelevant, use [`Stream::interleave`] instead, which requires no non-determinism
1608    /// annotation because it emits an unordered stream.
1609    ///
1610    /// # Example
1611    /// ```rust
1612    /// # #[cfg(feature = "deploy")] {
1613    /// # use hydro_lang::prelude::*;
1614    /// # use futures::StreamExt;
1615    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1616    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
1617    /// # process.source_iter(q!(vec![1, 3])).into();
1618    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1619    /// # }, |mut stream| async move {
1620    /// // 1, 3 and 2, 4 in some order, preserving the original local order
1621    /// # for w in vec![1, 3, 2, 4] {
1622    /// #     assert_eq!(stream.next().await.unwrap(), w);
1623    /// # }
1624    /// # }));
1625    /// # }
1626    /// ```
1627    pub fn merge_ordered<R2: Retries>(
1628        self,
1629        other: Stream<T, L, Unbounded, TotalOrder, R2>,
1630        _nondet: NonDet,
1631    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1632    where
1633        R: MinRetries<R2>,
1634    {
1635        Stream::new(
1636            self.location.clone(),
1637            HydroNode::Chain {
1638                first: Box::new(self.ir_node.into_inner()),
1639                second: Box::new(other.ir_node.into_inner()),
1640                metadata: self.location.new_node_metadata(Stream::<
1641                    T,
1642                    L,
1643                    Unbounded,
1644                    TotalOrder,
1645                    <R as MinRetries<R2>>::Min,
1646                >::collection_kind()),
1647            },
1648        )
1649    }
1650}
1651
1652impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1653where
1654    L: Location<'a>,
1655{
1656    /// Produces a new stream that emits the input elements in sorted order.
1657    ///
1658    /// The input stream can have any ordering guarantee, but the output stream
1659    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1660    /// elements in the input stream are available, so it requires the input stream
1661    /// to be [`Bounded`].
1662    ///
1663    /// # Example
1664    /// ```rust
1665    /// # #[cfg(feature = "deploy")] {
1666    /// # use hydro_lang::prelude::*;
1667    /// # use futures::StreamExt;
1668    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1669    /// let tick = process.tick();
1670    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1671    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1672    /// batch.sort().all_ticks()
1673    /// # }, |mut stream| async move {
1674    /// // 1, 2, 3, 4
1675    /// # for w in (1..5) {
1676    /// #     assert_eq!(stream.next().await.unwrap(), w);
1677    /// # }
1678    /// # }));
1679    /// # }
1680    /// ```
1681    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1682    where
1683        T: Ord,
1684    {
1685        Stream::new(
1686            self.location.clone(),
1687            HydroNode::Sort {
1688                input: Box::new(self.ir_node.into_inner()),
1689                metadata: self
1690                    .location
1691                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1692            },
1693        )
1694    }
1695
1696    /// Produces a new stream that first emits the elements of the `self` stream,
1697    /// and then emits the elements of the `other` stream. The output stream has
1698    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1699    /// [`TotalOrder`] guarantee.
1700    ///
1701    /// Currently, the `self` stream must be [`Bounded`], since this operator blocks on it
1702    /// until all of its elements are available. The `other` stream may be [`Bounded`] or
1703    /// [`Unbounded`]; the output stream adopts the boundedness of `other`.
1704    ///
1705    /// # Example
1706    /// ```rust
1707    /// # #[cfg(feature = "deploy")] {
1708    /// # use hydro_lang::prelude::*;
1709    /// # use futures::StreamExt;
1710    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1711    /// let tick = process.tick();
1712    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1713    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1714    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1715    /// # }, |mut stream| async move {
1716    /// // 2, 3, 4, 5, 1, 2, 3, 4
1717    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1718    /// #     assert_eq!(stream.next().await.unwrap(), w);
1719    /// # }
1720    /// # }));
1721    /// # }
1722    /// ```
1723    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
1724        self,
1725        other: Stream<T, L, B2, O2, R2>,
1726    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1727    where
1728        O: MinOrder<O2>,
1729        R: MinRetries<R2>,
1730    {
1731        check_matching_location(&self.location, &other.location);
1732
1733        Stream::new(
1734            self.location.clone(),
1735            HydroNode::Chain {
1736                first: Box::new(self.ir_node.into_inner()),
1737                second: Box::new(other.ir_node.into_inner()),
1738                metadata: self.location.new_node_metadata(Stream::<
1739                    T,
1740                    L,
1741                    B2,
1742                    <O as MinOrder<O2>>::Min,
1743                    <R as MinRetries<R2>>::Min,
1744                >::collection_kind()),
1745            },
1746        )
1747    }
1748
1749    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1750    /// Unlike [`Stream::cross_product`], the output is totally ordered when both inputs are,
1751    /// because this operator is compiled into a nested loop.
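    ///
    /// # Example
    /// A minimal sketch (marked `ignore` since it is illustrative rather than a compiled doctest),
    /// assuming a `process` handle as in the other examples in this module:
    /// ```rust,ignore
    /// let tick = process.tick();
    /// let left = process.source_iter(q!(vec![1, 2])).batch(&tick, nondet!(/** sketch */));
    /// let right = process.source_iter(q!(vec!['a', 'b'])).batch(&tick, nondet!(/** sketch */));
    /// // Produces all four pairs (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b');
    /// // the pairing order follows the nested-loop evaluation.
    /// left.cross_product_nested_loop(right).all_ticks()
    /// ```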
1752    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1753        self,
1754        other: Stream<T2, L, Bounded, O2, R>,
1755    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1756    where
1757        T: Clone,
1758        T2: Clone,
1759    {
1760        check_matching_location(&self.location, &other.location);
1761
1762        Stream::new(
1763            self.location.clone(),
1764            HydroNode::CrossProduct {
1765                left: Box::new(self.ir_node.into_inner()),
1766                right: Box::new(other.ir_node.into_inner()),
1767                metadata: self.location.new_node_metadata(Stream::<
1768                    (T, T2),
1769                    L,
1770                    Bounded,
1771                    <O2 as MinOrder<O>>::Min,
1772                    R,
1773                >::collection_kind()),
1774            },
1775        )
1776    }
1777
1778    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1779    /// `self` used as the values for *each* key.
1780    ///
1781    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1782    /// values. For example, it can be used to send the same set of elements to several cluster
1783    /// members, if the membership information is available as a [`KeyedSingleton`].
1784    ///
1785    /// # Example
1786    /// ```rust
1787    /// # #[cfg(feature = "deploy")] {
1788    /// # use hydro_lang::prelude::*;
1789    /// # use futures::StreamExt;
1790    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1791    /// # let tick = process.tick();
1792    /// let keyed_singleton = // { 1: (), 2: () }
1793    /// # process
1794    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
1795    /// #     .into_keyed()
1796    /// #     .batch(&tick, nondet!(/** test */))
1797    /// #     .first();
1798    /// let stream = // [ "a", "b" ]
1799    /// # process
1800    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
1801    /// #     .batch(&tick, nondet!(/** test */));
1802    /// stream.repeat_with_keys(keyed_singleton)
1803    /// # .entries().all_ticks()
1804    /// # }, |mut stream| async move {
1805    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1806    /// # let mut results = Vec::new();
1807    /// # for _ in 0..4 {
1808    /// #     results.push(stream.next().await.unwrap());
1809    /// # }
1810    /// # results.sort();
1811    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
1812    /// # }));
1813    /// # }
1814    /// ```
1815    pub fn repeat_with_keys<K, V2>(
1816        self,
1817        keys: KeyedSingleton<K, V2, L, Bounded>,
1818    ) -> KeyedStream<K, T, L, Bounded, O, R>
1819    where
1820        K: Clone,
1821        T: Clone,
1822    {
1823        keys.keys()
1824            .weaken_retries()
1825            .assume_ordering_trusted::<TotalOrder>(
1826                nondet!(/** keyed stream does not depend on ordering of keys */),
1827            )
1828            .cross_product_nested_loop(self)
1829            .into_keyed()
1830    }
1831}
1832
1833impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1834where
1835    L: Location<'a>,
1836{
1837    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1838    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1839    /// by equi-joining the two streams on the key attribute `K`.
1840    ///
1841    /// # Example
1842    /// ```rust
1843    /// # #[cfg(feature = "deploy")] {
1844    /// # use hydro_lang::prelude::*;
1845    /// # use std::collections::HashSet;
1846    /// # use futures::StreamExt;
1847    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1848    /// let tick = process.tick();
1849    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1850    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1851    /// stream1.join(stream2)
1852    /// # }, |mut stream| async move {
1853    /// // (1, ('a', 'x')), (2, ('b', 'y'))
1854    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1855    /// # stream.map(|i| assert!(expected.contains(&i)));
1856    /// # }));
1857    /// # }
    /// ```
1858    pub fn join<V2, O2: Ordering, R2: Retries>(
1859        self,
1860        n: Stream<(K, V2), L, B, O2, R2>,
1861    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1862    where
1863        K: Eq + Hash,
1864        R: MinRetries<R2>,
1865    {
1866        check_matching_location(&self.location, &n.location);
1867
1868        Stream::new(
1869            self.location.clone(),
1870            HydroNode::Join {
1871                left: Box::new(self.ir_node.into_inner()),
1872                right: Box::new(n.ir_node.into_inner()),
1873                metadata: self.location.new_node_metadata(Stream::<
1874                    (K, (V1, V2)),
1875                    L,
1876                    B,
1877                    NoOrder,
1878                    <R as MinRetries<R2>>::Min,
1879                >::collection_kind()),
1880            },
1881        )
1882    }
1883
1884    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1885    /// computes the anti-join of the items in the input -- i.e. returns
1886    /// unique items in the first input that do not have a matching key
1887    /// in the second input.
1888    ///
1889    /// # Example
1890    /// ```rust
1891    /// # #[cfg(feature = "deploy")] {
1892    /// # use hydro_lang::prelude::*;
1893    /// # use futures::StreamExt;
1894    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1895    /// let tick = process.tick();
1896    /// let stream = process
1897    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1898    ///   .batch(&tick, nondet!(/** test */));
1899    /// let batch = process
1900    ///   .source_iter(q!(vec![1, 2]))
1901    ///   .batch(&tick, nondet!(/** test */));
1902    /// stream.anti_join(batch).all_ticks()
1903    /// # }, |mut stream| async move {
1904    /// # for w in vec![(3, 'c'), (4, 'd')] {
1905    /// #     assert_eq!(stream.next().await.unwrap(), w);
1906    /// # }
1907    /// # }));
1908    /// # }
    /// ```
1909    pub fn anti_join<O2: Ordering, R2: Retries>(
1910        self,
1911        n: Stream<K, L, Bounded, O2, R2>,
1912    ) -> Stream<(K, V1), L, B, O, R>
1913    where
1914        K: Eq + Hash,
1915    {
1916        check_matching_location(&self.location, &n.location);
1917
1918        Stream::new(
1919            self.location.clone(),
1920            HydroNode::AntiJoin {
1921                pos: Box::new(self.ir_node.into_inner()),
1922                neg: Box::new(n.ir_node.into_inner()),
1923                metadata: self
1924                    .location
1925                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
1926            },
1927        )
1928    }
1929}
1930
1931impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1932    Stream<(K, V), L, B, O, R>
1933{
1934    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1935    /// is used as the key and the second element is added to the entries associated with that key.
1936    ///
1937    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1938    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1939    /// performing grouped aggregations, but also for more precise ordering guarantees such as
1940    /// total ordering _within_ each group but no ordering _across_ groups.
1941    ///
1942    /// # Example
1943    /// ```rust
1944    /// # #[cfg(feature = "deploy")] {
1945    /// # use hydro_lang::prelude::*;
1946    /// # use futures::StreamExt;
1947    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1948    /// process
1949    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1950    ///     .into_keyed()
1951    /// #   .entries()
1952    /// # }, |mut stream| async move {
1953    /// // { 1: [2, 3], 2: [4] }
1954    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1955    /// #     assert_eq!(stream.next().await.unwrap(), w);
1956    /// # }
1957    /// # }));
1958    /// # }
1959    /// ```
1960    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1961        KeyedStream::new(
1962            self.location.clone(),
1963            HydroNode::Cast {
1964                inner: Box::new(self.ir_node.into_inner()),
1965                metadata: self
1966                    .location
1967                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1968            },
1969        )
1970    }
1971}
1972
1973impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
1974where
1975    K: Eq + Hash,
1976    L: Location<'a>,
1977{
1978    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
1979    /// # Example
1980    /// ```rust
1981    /// # #[cfg(feature = "deploy")] {
1982    /// # use hydro_lang::prelude::*;
1983    /// # use futures::StreamExt;
1984    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1985    /// let tick = process.tick();
1986    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1987    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1988    /// batch.keys().all_ticks()
1989    /// # }, |mut stream| async move {
1990    /// // 1, 2
1991    /// # assert_eq!(stream.next().await.unwrap(), 1);
1992    /// # assert_eq!(stream.next().await.unwrap(), 2);
1993    /// # }));
1994    /// # }
1995    /// ```
1996    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
1997        self.into_keyed()
1998            .fold(
1999                q!(|| ()),
2000                q!(
2001                    |_, _| {},
2002                    commutative = ManualProof(/* values are ignored */),
2003                    idempotent = ManualProof(/* values are ignored */)
2004                ),
2005            )
2006            .keys()
2007    }
2008}
2009
2010impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2011where
2012    L: Location<'a> + NoTick,
2013{
2014    /// Returns a stream corresponding to the latest batch of elements being atomically
2015    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2016    /// the order of the input.
2017    ///
2018    /// # Non-Determinism
2019    /// The batch boundaries are non-deterministic and may change across executions.
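    ///
    /// # Example
    /// A minimal sketch (marked `ignore`, illustrative only), assuming a `process` handle as in
    /// the other examples in this module:
    /// ```rust,ignore
    /// let tick = process.tick();
    /// let requests = process.source_iter(q!(vec![1, 2, 3]));
    /// // Each iteration of `tick` sees one non-deterministic batch of the atomic stream.
    /// let counts_per_batch = requests
    ///     .atomic(&tick)
    ///     .batch_atomic(nondet!(/** sketch */))
    ///     .count()
    ///     .all_ticks();
    /// ```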
2020    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2021        Stream::new(
2022            self.location.clone().tick,
2023            HydroNode::Batch {
2024                inner: Box::new(self.ir_node.into_inner()),
2025                metadata: self
2026                    .location
2027                    .tick
2028                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2029            },
2030        )
2031    }
2032
2033    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2034    /// See [`Stream::atomic`] for more details.
2035    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2036        Stream::new(
2037            self.location.tick.l.clone(),
2038            HydroNode::EndAtomic {
2039                inner: Box::new(self.ir_node.into_inner()),
2040                metadata: self
2041                    .location
2042                    .tick
2043                    .l
2044                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2045            },
2046        )
2047    }
2048}
2049
2050impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2051where
2052    L: Location<'a>,
2053{
2054    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2055    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2056    ///
2057    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2058    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2059    /// argument that declares where the stream will be atomically processed. Batching a stream into
2060    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2061    /// [`Tick`] will introduce asynchrony.
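    ///
    /// # Example
    /// A minimal sketch (marked `ignore`, illustrative only) of keeping a write and its
    /// acknowledgement in the same atomic section, so the ack cannot be observed before the
    /// write has been processed:
    /// ```rust,ignore
    /// let tick = process.tick();
    /// let writes = process.source_iter(q!(vec![1, 2, 3]));
    /// let acks = writes
    ///     .atomic(&tick) // enter the atomic section anchored at `tick`
    ///     .map(q!(|id| id)) // ... synchronous processing of the write ...
    ///     .end_atomic(); // leave the atomic section; acks are yielded afterwards
    /// ```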
2062    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2063        let out_location = Atomic { tick: tick.clone() };
2064        Stream::new(
2065            out_location.clone(),
2066            HydroNode::BeginAtomic {
2067                inner: Box::new(self.ir_node.into_inner()),
2068                metadata: out_location
2069                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2070            },
2071        )
2072    }
2073
2074    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2075    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2076    /// the order of the input. The output stream will execute in the [`Tick`] that is
2077    /// passed as an argument.
2078    ///
2079    /// # Non-Determinism
2080    /// The batch boundaries are non-deterministic and may change across executions.
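    ///
    /// # Example
    /// A minimal sketch (marked `ignore`, illustrative only); the concrete batch boundaries
    /// vary across executions:
    /// ```rust,ignore
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// // Count how many elements arrived in each batch, then release the counts asynchronously.
    /// let counts = numbers
    ///     .batch(&tick, nondet!(/** sketch */))
    ///     .count()
    ///     .all_ticks();
    /// ```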
2081    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2082        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2083        Stream::new(
2084            tick.clone(),
2085            HydroNode::Batch {
2086                inner: Box::new(self.ir_node.into_inner()),
2087                metadata: tick
2088                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2089            },
2090        )
2091    }
2092
2093    /// Given a time interval, returns a stream corresponding to samples taken from the
2094    /// stream roughly at that interval. The output will have elements in the same order
2095    /// as the input, but with arbitrary elements skipped between samples. There is also
2096    /// no guarantee on the exact timing of the samples.
2097    ///
2098    /// # Non-Determinism
2099    /// The output stream is non-deterministic in which elements are sampled, since this
2100    /// is controlled by a clock.
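    ///
    /// # Example
    /// A minimal sketch (marked `ignore`, illustrative only); which elements are sampled depends
    /// on the clock:
    /// ```rust,ignore
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// // Roughly one sample per second, preserving the input order of the sampled elements.
    /// let samples = numbers.sample_every(
    ///     q!(std::time::Duration::from_secs(1)),
    ///     nondet!(/** sketch */),
    /// );
    /// ```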
2101    pub fn sample_every(
2102        self,
2103        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2104        nondet: NonDet,
2105    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2106    where
2107        L: NoTick + NoAtomic,
2108    {
2109        let samples = self.location.source_interval(interval, nondet);
2110
2111        let tick = self.location.tick();
2112        self.batch(&tick, nondet)
2113            .filter_if_some(samples.batch(&tick, nondet).first())
2114            .all_ticks()
2115            .weaken_retries()
2116    }
2117
2118    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2119    /// stream has not emitted an element within that duration.
2120    ///
2121    /// # Non-Determinism
2122    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2123    /// samples take place, timeouts may be non-deterministically generated or missed,
2124    /// and the notification of the timeout may be delayed as well. There is also no
2125    /// guarantee on how long the [`Optional`] will retain its value after the timeout is
2126    /// detected, since that depends on when the next sample is taken.
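    ///
    /// # Example
    /// A minimal sketch (marked `ignore`, illustrative only) that detects a stalled heartbeat
    /// stream:
    /// ```rust,ignore
    /// let heartbeats = process.source_iter(q!(vec![(), (), ()]));
    /// // Holds a value once no heartbeat has been seen for (roughly) five seconds.
    /// let stalled = heartbeats.timeout(
    ///     q!(std::time::Duration::from_secs(5)),
    ///     nondet!(/** sketch */),
    /// );
    /// ```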
2127    pub fn timeout(
2128        self,
2129        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2130        nondet: NonDet,
2131    ) -> Optional<(), L, Unbounded>
2132    where
2133        L: NoTick + NoAtomic,
2134    {
2135        let tick = self.location.tick();
2136
2137        let latest_received = self.assume_retries(nondet).fold(
2138            q!(|| None),
2139            q!(
2140                |latest, _| {
2141                    *latest = Some(Instant::now());
2142                },
2143                commutative = ManualProof(/* TODO */)
2144            ),
2145        );
2146
2147        latest_received
2148            .snapshot(&tick, nondet)
2149            .filter_map(q!(move |latest_received| {
2150                if let Some(latest_received) = latest_received {
2151                    if Instant::now().duration_since(latest_received) > duration {
2152                        Some(())
2153                    } else {
2154                        None
2155                    }
2156                } else {
2157                    Some(())
2158                }
2159            }))
2160            .latest()
2161    }
2162}
2163
2164impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2165where
2166    L: Location<'a> + NoTick + NoAtomic,
2167    F: Future<Output = T>,
2168{
2169    /// Consumes a stream of `Future<T>` and produces a new stream of the resulting `T` outputs.
2170    /// Future outputs are produced as available, regardless of input arrival order.
2171    ///
2172    /// # Example
2173    /// ```rust
2174    /// # #[cfg(feature = "deploy")] {
2175    /// # use std::collections::HashSet;
2176    /// # use futures::StreamExt;
2177    /// # use hydro_lang::prelude::*;
2178    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2179    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2180    ///     .map(q!(|x| async move {
2181    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2182    ///         x
2183    ///     }))
2184    ///     .resolve_futures()
2185    /// #   },
2186    /// #   |mut stream| async move {
2187    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2188    /// #       let mut output = HashSet::new();
2189    /// #       for _ in 1..10 {
2190    /// #           output.insert(stream.next().await.unwrap());
2191    /// #       }
2192    /// #       assert_eq!(
2193    /// #           output,
2194    /// #           HashSet::<i32>::from_iter(1..10)
2195    /// #       );
2196    /// #   },
2197    /// # ));
2198    /// # }
    /// ```
2199    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2200        Stream::new(
2201            self.location.clone(),
2202            HydroNode::ResolveFutures {
2203                input: Box::new(self.ir_node.into_inner()),
2204                metadata: self
2205                    .location
2206                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2207            },
2208        )
2209    }
2210
2211    /// Consumes a stream of `Future<T>` and produces a new stream of the resulting `T` outputs.
2212    /// Future outputs are produced in the same order as the input stream.
2213    ///
2214    /// # Example
2215    /// ```rust
2216    /// # #[cfg(feature = "deploy")] {
2217    /// # use std::collections::HashSet;
2218    /// # use futures::StreamExt;
2219    /// # use hydro_lang::prelude::*;
2220    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2221    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2222    ///     .map(q!(|x| async move {
2223    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2224    ///         x
2225    ///     }))
2226    ///     .resolve_futures_ordered()
2227    /// #   },
2228    /// #   |mut stream| async move {
2229    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2230    /// #       let mut output = Vec::new();
2231    /// #       for _ in 1..10 {
2232    /// #           output.push(stream.next().await.unwrap());
2233    /// #       }
2234    /// #       assert_eq!(
2235    /// #           output,
2236    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2237    /// #       );
2238    /// #   },
2239    /// # ));
2240    /// # }
    /// ```
2241    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2242        Stream::new(
2243            self.location.clone(),
2244            HydroNode::ResolveFuturesOrdered {
2245                input: Box::new(self.ir_node.into_inner()),
2246                metadata: self
2247                    .location
2248                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2249            },
2250        )
2251    }
2252}
2253
2254impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2255where
2256    L: Location<'a> + NoTick,
2257{
2258    /// Executes the provided closure for every element in this stream.
2259    ///
2260    /// Because the closure may have side effects, the stream must have deterministic order
2261    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2262    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2263    /// [`Stream::assume_retries`] with an explanation for why this is the case.
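    ///
    /// # Example
    /// A minimal sketch (marked `ignore`, illustrative only), assuming a `process` handle as in
    /// the other examples in this module:
    /// ```rust,ignore
    /// process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .for_each(q!(|n| println!("saw {}", n)));
    /// ```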
2264    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2265        let f = f.splice_fn1_ctx(&self.location).into();
2266        self.location
2267            .flow_state()
2268            .borrow_mut()
2269            .push_root(HydroRoot::ForEach {
2270                input: Box::new(self.ir_node.into_inner()),
2271                f,
2272                op_metadata: HydroIrOpMetadata::new(),
2273            });
2274    }
2275
2276    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2277    /// TCP socket to some other server. You should _not_ use this API for interacting with
2278    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2279    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2280    /// interaction with asynchronous sinks.
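    ///
    /// # Example
    /// A minimal sketch (marked `ignore`, illustrative only) that discards every element by
    /// writing it to [`futures::sink::drain`]:
    /// ```rust,ignore
    /// process
    ///     .source_iter(q!(vec![1, 2, 3]))
    ///     .dest_sink(q!(futures::sink::drain()));
    /// ```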
2281    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2282    where
2283        S: 'a + futures::Sink<T> + Unpin,
2284    {
2285        self.location
2286            .flow_state()
2287            .borrow_mut()
2288            .push_root(HydroRoot::DestSink {
2289                sink: sink.splice_typed_ctx(&self.location).into(),
2290                input: Box::new(self.ir_node.into_inner()),
2291                op_metadata: HydroIrOpMetadata::new(),
2292            });
2293    }
2294}
2295
2296impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2297where
2298    L: Location<'a>,
2299{
2300    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2301    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
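    ///
    /// # Example
    /// A minimal sketch (marked `ignore`, illustrative only):
    /// ```rust,ignore
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// // Batch inside the tick, then concatenate every batch back into one unbounded stream.
    /// let rejoined = numbers
    ///     .batch(&tick, nondet!(/** sketch */))
    ///     .all_ticks();
    /// ```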
2302    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2303        Stream::new(
2304            self.location.outer().clone(),
2305            HydroNode::YieldConcat {
2306                inner: Box::new(self.ir_node.into_inner()),
2307                metadata: self
2308                    .location
2309                    .outer()
2310                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2311            },
2312        )
2313    }
2314
2315    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2316    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2317    ///
2318    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2319    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2320    /// stream's [`Tick`] context.
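    ///
    /// # Example
    /// A minimal sketch (marked `ignore`, illustrative only); the output remains in an [`Atomic`]
    /// context tied to the same tick:
    /// ```rust,ignore
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let atomic_out = numbers
    ///     .batch(&tick, nondet!(/** sketch */))
    ///     .all_ticks_atomic() // still in the Atomic<_> context of `tick`
    ///     .end_atomic(); // drop back to asynchronous execution once synchrony is no longer needed
    /// ```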
2321    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2322        let out_location = Atomic {
2323            tick: self.location.clone(),
2324        };
2325
2326        Stream::new(
2327            out_location.clone(),
2328            HydroNode::YieldConcat {
2329                inner: Box::new(self.ir_node.into_inner()),
2330                metadata: out_location
2331                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2332            },
2333        )
2334    }
2335
2336    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2337    /// such as `fold` retain their memory across ticks rather than resetting across batches of
2338    /// input.
2339    ///
2340    /// This API is particularly useful for stateful computation on batches of data, such as
2341    /// maintaining an accumulated state that is up to date with the current batch.
2342    ///
2343    /// # Example
2344    /// ```rust
2345    /// # #[cfg(feature = "deploy")] {
2346    /// # use hydro_lang::prelude::*;
2347    /// # use futures::StreamExt;
2348    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2349    /// let tick = process.tick();
2350    /// # // ticks are lazy by default, forces the second tick to run
2351    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2352    /// # let batch_first_tick = process
2353    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
2354    /// #  .batch(&tick, nondet!(/** test */));
2355    /// # let batch_second_tick = process
2356    /// #   .source_iter(q!(vec![5, 6, 7]))
2357    /// #   .batch(&tick, nondet!(/** test */))
2358    /// #   .defer_tick(); // appears on the second tick
2359    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2360    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2361    ///
2362    /// input.batch(&tick, nondet!(/** test */))
2363    ///     .across_ticks(|s| s.count()).all_ticks()
2364    /// # }, |mut stream| async move {
2365    /// // [4, 7]
2366    /// assert_eq!(stream.next().await.unwrap(), 4);
2367    /// assert_eq!(stream.next().await.unwrap(), 7);
2368    /// # }));
2369    /// # }
2370    /// ```
2371    pub fn across_ticks<Out: BatchAtomic>(
2372        self,
2373        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2374    ) -> Out::Batched {
2375        thunk(self.all_ticks_atomic()).batched_atomic()
2376    }
2377
2378    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2379    /// always has the elements of `self` at tick `T - 1`.
2380    ///
2381    /// At tick `0`, the output stream is empty, since there is no previous tick.
2382    ///
2383    /// This operator enables stateful iterative processing with ticks, by sending data from one
2384    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2385    ///
2386    /// # Example
2387    /// ```rust
2388    /// # #[cfg(feature = "deploy")] {
2389    /// # use hydro_lang::prelude::*;
2390    /// # use futures::StreamExt;
2391    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2392    /// let tick = process.tick();
2393    /// // ticks are lazy by default, forces the second tick to run
2394    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2395    ///
2396    /// let batch_first_tick = process
2397    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2398    ///   .batch(&tick, nondet!(/** test */));
2399    /// let batch_second_tick = process
2400    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2401    ///   .batch(&tick, nondet!(/** test */))
2402    ///   .defer_tick(); // appears on the second tick
2403    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2404    ///
2405    /// changes_across_ticks.clone().filter_not_in(
2406    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2407    /// ).all_ticks()
2408    /// # }, |mut stream| async move {
2409    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2410    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2411    /// #     assert_eq!(stream.next().await.unwrap(), w);
2412    /// # }
2413    /// # }));
2414    /// # }
2415    /// ```
2416    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2417        Stream::new(
2418            self.location.clone(),
2419            HydroNode::DeferTick {
2420                input: Box::new(self.ir_node.into_inner()),
2421                metadata: self
2422                    .location
2423                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2424            },
2425        )
2426    }
2427}
2428
2429#[cfg(test)]
2430mod tests {
2431    #[cfg(feature = "deploy")]
2432    use futures::{SinkExt, StreamExt};
2433    #[cfg(feature = "deploy")]
2434    use hydro_deploy::Deployment;
2435    #[cfg(feature = "deploy")]
2436    use serde::{Deserialize, Serialize};
2437    #[cfg(any(feature = "deploy", feature = "sim"))]
2438    use stageleft::q;
2439
2440    #[cfg(any(feature = "deploy", feature = "sim"))]
2441    use crate::compile::builder::FlowBuilder;
2442    #[cfg(feature = "deploy")]
2443    use crate::live_collections::sliced::sliced;
2444    #[cfg(feature = "deploy")]
2445    use crate::live_collections::stream::ExactlyOnce;
2446    #[cfg(feature = "sim")]
2447    use crate::live_collections::stream::NoOrder;
2448    #[cfg(any(feature = "deploy", feature = "sim"))]
2449    use crate::live_collections::stream::TotalOrder;
2450    #[cfg(any(feature = "deploy", feature = "sim"))]
2451    use crate::location::Location;
2452    #[cfg(any(feature = "deploy", feature = "sim"))]
2453    use crate::nondet::nondet;
2454
2455    mod backtrace_chained_ops;
2456
2457    #[cfg(feature = "deploy")]
2458    struct P1 {}
2459    #[cfg(feature = "deploy")]
2460    struct P2 {}
2461
2462    #[cfg(feature = "deploy")]
2463    #[derive(Serialize, Deserialize, Debug)]
2464    struct SendOverNetwork {
2465        n: u32,
2466    }
2467
2468    #[cfg(feature = "deploy")]
2469    #[tokio::test]
2470    async fn first_ten_distributed() {
2471        use crate::networking::TCP;
2472
2473        let mut deployment = Deployment::new();
2474
2475        let mut flow = FlowBuilder::new();
2476        let first_node = flow.process::<P1>();
2477        let second_node = flow.process::<P2>();
2478        let external = flow.external::<P2>();
2479
2480        let numbers = first_node.source_iter(q!(0..10));
2481        let out_port = numbers
2482            .map(q!(|n| SendOverNetwork { n }))
2483            .send(&second_node, TCP.bincode())
2484            .send_bincode_external(&external);
2485
2486        let nodes = flow
2487            .with_process(&first_node, deployment.Localhost())
2488            .with_process(&second_node, deployment.Localhost())
2489            .with_external(&external, deployment.Localhost())
2490            .deploy(&mut deployment);
2491
2492        deployment.deploy().await.unwrap();
2493
2494        let mut external_out = nodes.connect(out_port).await;
2495
2496        deployment.start().await.unwrap();
2497
2498        for i in 0..10 {
2499            assert_eq!(external_out.next().await.unwrap().n, i);
2500        }
2501    }
2502
2503    #[cfg(feature = "deploy")]
2504    #[tokio::test]
2505    async fn first_cardinality() {
2506        let mut deployment = Deployment::new();
2507
2508        let mut flow = FlowBuilder::new();
2509        let node = flow.process::<()>();
2510        let external = flow.external::<()>();
2511
2512        let node_tick = node.tick();
2513        let count = node_tick
2514            .singleton(q!([1, 2, 3]))
2515            .into_stream()
2516            .flatten_ordered()
2517            .first()
2518            .into_stream()
2519            .count()
2520            .all_ticks()
2521            .send_bincode_external(&external);
2522
2523        let nodes = flow
2524            .with_process(&node, deployment.Localhost())
2525            .with_external(&external, deployment.Localhost())
2526            .deploy(&mut deployment);
2527
2528        deployment.deploy().await.unwrap();
2529
2530        let mut external_out = nodes.connect(count).await;
2531
2532        deployment.start().await.unwrap();
2533
2534        assert_eq!(external_out.next().await.unwrap(), 1);
2535    }
2536
2537    #[cfg(feature = "deploy")]
2538    #[tokio::test]
2539    async fn unbounded_reduce_remembers_state() {
2540        let mut deployment = Deployment::new();
2541
2542        let mut flow = FlowBuilder::new();
2543        let node = flow.process::<()>();
2544        let external = flow.external::<()>();
2545
2546        let (input_port, input) = node.source_external_bincode(&external);
2547        let out = input
2548            .reduce(q!(|acc, v| *acc += v))
2549            .sample_eager(nondet!(/** test */))
2550            .send_bincode_external(&external);
2551
2552        let nodes = flow
2553            .with_process(&node, deployment.Localhost())
2554            .with_external(&external, deployment.Localhost())
2555            .deploy(&mut deployment);
2556
2557        deployment.deploy().await.unwrap();
2558
2559        let mut external_in = nodes.connect(input_port).await;
2560        let mut external_out = nodes.connect(out).await;
2561
2562        deployment.start().await.unwrap();
2563
2564        external_in.send(1).await.unwrap();
2565        assert_eq!(external_out.next().await.unwrap(), 1);
2566
2567        external_in.send(2).await.unwrap();
2568        assert_eq!(external_out.next().await.unwrap(), 3);
2569    }
2570
2571    #[cfg(feature = "deploy")]
2572    #[tokio::test]
2573    async fn top_level_bounded_cross_singleton() {
2574        let mut deployment = Deployment::new();
2575
2576        let mut flow = FlowBuilder::new();
2577        let node = flow.process::<()>();
2578        let external = flow.external::<()>();
2579
2580        let (input_port, input) =
2581            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2582
2583        let out = input
2584            .cross_singleton(
2585                node.source_iter(q!(vec![1, 2, 3]))
2586                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
2587            )
2588            .send_bincode_external(&external);
2589
2590        let nodes = flow
2591            .with_process(&node, deployment.Localhost())
2592            .with_external(&external, deployment.Localhost())
2593            .deploy(&mut deployment);
2594
2595        deployment.deploy().await.unwrap();
2596
2597        let mut external_in = nodes.connect(input_port).await;
2598        let mut external_out = nodes.connect(out).await;
2599
2600        deployment.start().await.unwrap();
2601
2602        external_in.send(1).await.unwrap();
2603        assert_eq!(external_out.next().await.unwrap(), (1, 6));
2604
2605        external_in.send(2).await.unwrap();
2606        assert_eq!(external_out.next().await.unwrap(), (2, 6));
2607    }
2608
2609    #[cfg(feature = "deploy")]
2610    #[tokio::test]
2611    async fn top_level_bounded_reduce_cardinality() {
2612        let mut deployment = Deployment::new();
2613
2614        let mut flow = FlowBuilder::new();
2615        let node = flow.process::<()>();
2616        let external = flow.external::<()>();
2617
2618        let (input_port, input) =
2619            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2620
2621        let out = sliced! {
2622            let input = use(input, nondet!(/** test */));
2623            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
2624            input.cross_singleton(v.into_stream().count())
2625        }
2626        .send_bincode_external(&external);
2627
2628        let nodes = flow
2629            .with_process(&node, deployment.Localhost())
2630            .with_external(&external, deployment.Localhost())
2631            .deploy(&mut deployment);
2632
2633        deployment.deploy().await.unwrap();
2634
2635        let mut external_in = nodes.connect(input_port).await;
2636        let mut external_out = nodes.connect(out).await;
2637
2638        deployment.start().await.unwrap();
2639
2640        external_in.send(1).await.unwrap();
2641        assert_eq!(external_out.next().await.unwrap(), (1, 1));
2642
2643        external_in.send(2).await.unwrap();
2644        assert_eq!(external_out.next().await.unwrap(), (2, 1));
2645    }
2646
2647    #[cfg(feature = "deploy")]
2648    #[tokio::test]
2649    async fn top_level_bounded_into_singleton_cardinality() {
2650        let mut deployment = Deployment::new();
2651
2652        let mut flow = FlowBuilder::new();
2653        let node = flow.process::<()>();
2654        let external = flow.external::<()>();
2655
2656        let (input_port, input) =
2657            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2658
2659        let out = sliced! {
2660            let input = use(input, nondet!(/** test */));
2661            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
2662            input.cross_singleton(v.into_stream().count())
2663        }
2664        .send_bincode_external(&external);
2665
2666        let nodes = flow
2667            .with_process(&node, deployment.Localhost())
2668            .with_external(&external, deployment.Localhost())
2669            .deploy(&mut deployment);
2670
2671        deployment.deploy().await.unwrap();
2672
2673        let mut external_in = nodes.connect(input_port).await;
2674        let mut external_out = nodes.connect(out).await;
2675
2676        deployment.start().await.unwrap();
2677
2678        external_in.send(1).await.unwrap();
2679        assert_eq!(external_out.next().await.unwrap(), (1, 1));
2680
2681        external_in.send(2).await.unwrap();
2682        assert_eq!(external_out.next().await.unwrap(), (2, 1));
2683    }
2684
2685    #[cfg(feature = "deploy")]
2686    #[tokio::test]
2687    async fn atomic_fold_replays_each_tick() {
2688        let mut deployment = Deployment::new();
2689
2690        let mut flow = FlowBuilder::new();
2691        let node = flow.process::<()>();
2692        let external = flow.external::<()>();
2693
2694        let (input_port, input) =
2695            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2696        let tick = node.tick();
2697
2698        let out = input
2699            .batch(&tick, nondet!(/** test */))
2700            .cross_singleton(
2701                node.source_iter(q!(vec![1, 2, 3]))
2702                    .atomic(&tick)
2703                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
2704                    .snapshot_atomic(nondet!(/** test */)),
2705            )
2706            .all_ticks()
2707            .send_bincode_external(&external);
2708
2709        let nodes = flow
2710            .with_process(&node, deployment.Localhost())
2711            .with_external(&external, deployment.Localhost())
2712            .deploy(&mut deployment);
2713
2714        deployment.deploy().await.unwrap();
2715
2716        let mut external_in = nodes.connect(input_port).await;
2717        let mut external_out = nodes.connect(out).await;
2718
2719        deployment.start().await.unwrap();
2720
2721        external_in.send(1).await.unwrap();
2722        assert_eq!(external_out.next().await.unwrap(), (1, 6));
2723
2724        external_in.send(2).await.unwrap();
2725        assert_eq!(external_out.next().await.unwrap(), (2, 6));
2726    }
2727
2728    #[cfg(feature = "deploy")]
2729    #[tokio::test]
2730    async fn unbounded_scan_remembers_state() {
2731        let mut deployment = Deployment::new();
2732
2733        let mut flow = FlowBuilder::new();
2734        let node = flow.process::<()>();
2735        let external = flow.external::<()>();
2736
2737        let (input_port, input) = node.source_external_bincode(&external);
2738        let out = input
2739            .scan(
2740                q!(|| 0),
2741                q!(|acc, v| {
2742                    *acc += v;
2743                    Some(*acc)
2744                }),
2745            )
2746            .send_bincode_external(&external);
2747
2748        let nodes = flow
2749            .with_process(&node, deployment.Localhost())
2750            .with_external(&external, deployment.Localhost())
2751            .deploy(&mut deployment);
2752
2753        deployment.deploy().await.unwrap();
2754
2755        let mut external_in = nodes.connect(input_port).await;
2756        let mut external_out = nodes.connect(out).await;
2757
2758        deployment.start().await.unwrap();
2759
2760        external_in.send(1).await.unwrap();
2761        assert_eq!(external_out.next().await.unwrap(), 1);
2762
2763        external_in.send(2).await.unwrap();
2764        assert_eq!(external_out.next().await.unwrap(), 3);
2765    }
2766
2767    #[cfg(feature = "deploy")]
2768    #[tokio::test]
2769    async fn unbounded_enumerate_remembers_state() {
2770        let mut deployment = Deployment::new();
2771
2772        let mut flow = FlowBuilder::new();
2773        let node = flow.process::<()>();
2774        let external = flow.external::<()>();
2775
2776        let (input_port, input) = node.source_external_bincode(&external);
2777        let out = input.enumerate().send_bincode_external(&external);
2778
2779        let nodes = flow
2780            .with_process(&node, deployment.Localhost())
2781            .with_external(&external, deployment.Localhost())
2782            .deploy(&mut deployment);
2783
2784        deployment.deploy().await.unwrap();
2785
2786        let mut external_in = nodes.connect(input_port).await;
2787        let mut external_out = nodes.connect(out).await;
2788
2789        deployment.start().await.unwrap();
2790
2791        external_in.send(1).await.unwrap();
2792        assert_eq!(external_out.next().await.unwrap(), (0, 1));
2793
2794        external_in.send(2).await.unwrap();
2795        assert_eq!(external_out.next().await.unwrap(), (1, 2));
2796    }
2797
2798    #[cfg(feature = "deploy")]
2799    #[tokio::test]
2800    async fn unbounded_unique_remembers_state() {
2801        let mut deployment = Deployment::new();
2802
2803        let mut flow = FlowBuilder::new();
2804        let node = flow.process::<()>();
2805        let external = flow.external::<()>();
2806
2807        let (input_port, input) =
2808            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2809        let out = input.unique().send_bincode_external(&external);
2810
2811        let nodes = flow
2812            .with_process(&node, deployment.Localhost())
2813            .with_external(&external, deployment.Localhost())
2814            .deploy(&mut deployment);
2815
2816        deployment.deploy().await.unwrap();
2817
2818        let mut external_in = nodes.connect(input_port).await;
2819        let mut external_out = nodes.connect(out).await;
2820
2821        deployment.start().await.unwrap();
2822
2823        external_in.send(1).await.unwrap();
2824        assert_eq!(external_out.next().await.unwrap(), 1);
2825
2826        external_in.send(2).await.unwrap();
2827        assert_eq!(external_out.next().await.unwrap(), 2);
2828
2829        external_in.send(1).await.unwrap();
2830        external_in.send(3).await.unwrap();
2831        assert_eq!(external_out.next().await.unwrap(), 3);
2832    }
2833
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }

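    /// Sim test: batching an in-order input never reorders its elements, no matter where the
    /// batch boundaries fall.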
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }

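    /// Sim test: a `NoOrder` input may be reordered across batches; the explicit panic fires
    /// once a schedule shows evidence of reordering (hence `should_panic`).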
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }

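    /// Sim test: exhaustively batching four unordered elements explores one instance per
    /// ordered set partition of the input, i.e. the ordered Bell number 75.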
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ordered Bell number: ∑ (k=1 to 4) S(4, k) × k! = 75 sequences of nonempty batches
        )
    }

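    /// Sim test: `assume_ordering` does not restore the original send order of a `NoOrder`
    /// input, so the in-order assertion fails in some schedule (hence `should_panic`).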
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
        });
    }

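    /// Sim test: with `assume_ordering` observing the order inside each batch, exhaustive
    /// exploration covers every element ordering combined with every batch-boundary choice.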
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! orderings × 2^{4 - 1} ways to place batch boundaries
        )
    }

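    /// Sim test: snapshotting a running `count` only varies which intermediate counts are
    /// observed; the final count of 4 is seen in every schedule.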
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unordered_count_instance_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            assert_eq!(out_recv.collect::<Vec<_>>().await.last().unwrap(), &4);
        });

        assert_eq!(
            instance_count,
            16 // 2^4: each intermediate count in { 0, 1, 2, 3 } may or may not be snapshotted, and 4 is always included
        )
    }

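    /// Sim test: `assume_ordering` on an unordered top-level input makes the simulator explore
    /// every delivery order, so three elements yield 3! = 6 instances.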
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);
            let mut out = out_recv.collect::<Vec<_>>().await;
            out.sort();
            assert_eq!(out, vec![1, 2, 3]);
        });

        assert_eq!(instance_count, 6)
    }

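    /// Sim test: values fed back through `interleave` into an `assume_ordering` stream can be
    /// scheduled between fresh inputs, so some instance observes 0, 1, 2 in order (the 1 comes
    /// from the feedback path, 0 + 1).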
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 6)
    }

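    /// Sim test: same feedback shape as above, but the cycled values pass through a tick batch
    /// before being interleaved, which increases the number of schedules explored (58 vs. 6).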
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 58)
    }

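    /// Sim test: two `assume_ordering` sites, the second fed back into the first through a
    /// cycle; some schedule delivers the cycled value 3 between the original inputs 0 and 1.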
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let input1_ordered = input
            .clone()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .interleave(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));

        let out_recv = input1_ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        assert_eq!(instance_count, 24)
    }

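    /// Sim test: the interleaved cycle is ordered inside an atomic section, and every schedule
    /// emits exactly four values: the two inputs plus the two odd values from the feedback map.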
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .interleave(cycle_back)
            .atomic(&node.tick())
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            assert_eq!(out.len(), 4);
        });

        assert_eq!(instance_count, 22)
    }
}