hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::batch_atomic::BatchAtomic;
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::{Atomic, DeferTick, NoAtomic};
28use crate::location::{Location, NoTick, Tick, check_matching_location};
29use crate::nondet::{NonDet, nondet};
30
31pub mod networking;
32
33/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
34#[sealed::sealed]
35pub trait Ordering:
36    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
37{
38    /// The [`StreamOrder`] corresponding to this type.
39    const ORDERING_KIND: StreamOrder;
40}
41
42/// Marks the stream as being totally ordered, which means that there are
43/// no sources of non-determinism (other than intentional ones) that will
44/// affect the order of elements.
45pub enum TotalOrder {}
46
47#[sealed::sealed]
48impl Ordering for TotalOrder {
49    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
50}
51
52/// Marks the stream as having no order, which means that the order of
53/// elements may be affected by non-determinism.
54///
55/// This restricts certain operators, such as `fold` and `reduce`, to only
56/// be used with commutative aggregation functions.
57pub enum NoOrder {}
58
59#[sealed::sealed]
60impl Ordering for NoOrder {
61    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
62}
63
64/// Helper trait for determining the weakest of two orderings.
65#[sealed::sealed]
66pub trait MinOrder<Other: ?Sized> {
67    /// The weaker of the two orderings.
68    type Min: Ordering;
69}
70
71#[sealed::sealed]
72impl MinOrder<NoOrder> for TotalOrder {
73    type Min = NoOrder;
74}
75
76#[sealed::sealed]
77impl MinOrder<TotalOrder> for TotalOrder {
78    type Min = TotalOrder;
79}
80
81#[sealed::sealed]
82impl MinOrder<TotalOrder> for NoOrder {
83    type Min = NoOrder;
84}
85
86#[sealed::sealed]
87impl MinOrder<NoOrder> for NoOrder {
88    type Min = NoOrder;
89}
90
91/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
92#[sealed::sealed]
93pub trait Retries:
94    MinRetries<Self, Min = Self>
95    + MinRetries<ExactlyOnce, Min = Self>
96    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
97{
98    /// The [`StreamRetry`] corresponding to this type.
99    const RETRIES_KIND: StreamRetry;
100}
101
102/// Marks the stream as having deterministic message cardinality, with no
103/// possibility of duplicates.
104pub enum ExactlyOnce {}
105
106#[sealed::sealed]
107impl Retries for ExactlyOnce {
108    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
109}
110
111/// Marks the stream as having non-deterministic message cardinality, which
112/// means that duplicates may occur, but messages will not be dropped.
113pub enum AtLeastOnce {}
114
115#[sealed::sealed]
116impl Retries for AtLeastOnce {
117    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
118}
119
120/// Helper trait for determining the weakest of two retry guarantees.
121#[sealed::sealed]
122pub trait MinRetries<Other: ?Sized> {
123    /// The weaker of the two retry guarantees.
124    type Min: Retries;
125}
126
127#[sealed::sealed]
128impl MinRetries<AtLeastOnce> for ExactlyOnce {
129    type Min = AtLeastOnce;
130}
131
132#[sealed::sealed]
133impl MinRetries<ExactlyOnce> for ExactlyOnce {
134    type Min = ExactlyOnce;
135}
136
137#[sealed::sealed]
138impl MinRetries<ExactlyOnce> for AtLeastOnce {
139    type Min = AtLeastOnce;
140}
141
142#[sealed::sealed]
143impl MinRetries<AtLeastOnce> for AtLeastOnce {
144    type Min = AtLeastOnce;
145}
146
147/// Streaming sequence of elements with type `Type`.
148///
149/// This live collection represents a growing sequence of elements, with new elements being
150/// asynchronously appended to the end of the sequence. This can be used to model the arrival
151/// of network input, such as API requests, or streaming ingestion.
152///
153/// By default, all streams have deterministic ordering and each element is materialized exactly
154/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
155/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
156/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
157///
158/// Type Parameters:
159/// - `Type`: the type of elements in the stream
160/// - `Loc`: the location where the stream is being materialized
161/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
162/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
163///   (default is [`TotalOrder`])
164/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
165///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
166pub struct Stream<
167    Type,
168    Loc,
169    Bound: Boundedness = Unbounded,
170    Order: Ordering = TotalOrder,
171    Retry: Retries = ExactlyOnce,
172> {
173    pub(crate) location: Loc,
174    pub(crate) ir_node: RefCell<HydroNode>,
175
176    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
177}
178
179impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
180    for Stream<T, L, Unbounded, O, R>
181where
182    L: Location<'a>,
183{
184    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
185        Stream {
186            location: stream.location,
187            ir_node: stream.ir_node,
188            _phantom: PhantomData,
189        }
190    }
191}
192
193impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
194    for Stream<T, L, B, NoOrder, R>
195where
196    L: Location<'a>,
197{
198    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
199        Stream {
200            location: stream.location,
201            ir_node: stream.ir_node,
202            _phantom: PhantomData,
203        }
204    }
205}
206
207impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
208    for Stream<T, L, B, O, AtLeastOnce>
209where
210    L: Location<'a>,
211{
212    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
213        Stream {
214            location: stream.location,
215            ir_node: stream.ir_node,
216            _phantom: PhantomData,
217        }
218    }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
222where
223    L: Location<'a>,
224{
225    fn defer_tick(self) -> Self {
226        Stream::defer_tick(self)
227    }
228}
229
230impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
231    for Stream<T, Tick<L>, Bounded, O, R>
232where
233    L: Location<'a>,
234{
235    type Location = Tick<L>;
236
237    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
238        Stream::new(
239            location.clone(),
240            HydroNode::CycleSource {
241                ident,
242                metadata: location.new_node_metadata(Self::collection_kind()),
243            },
244        )
245    }
246}
247
248impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
249    for Stream<T, Tick<L>, Bounded, O, R>
250where
251    L: Location<'a>,
252{
253    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
254        assert_eq!(
255            Location::id(&self.location),
256            expected_location,
257            "locations do not match"
258        );
259        self.location
260            .flow_state()
261            .borrow_mut()
262            .push_root(HydroRoot::CycleSink {
263                ident,
264                input: Box::new(self.ir_node.into_inner()),
265                op_metadata: HydroIrOpMetadata::new(),
266            });
267    }
268}
269
270impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
271    for Stream<T, L, B, O, R>
272where
273    L: Location<'a> + NoTick,
274{
275    type Location = L;
276
277    fn create_source(ident: syn::Ident, location: L) -> Self {
278        Stream::new(
279            location.clone(),
280            HydroNode::CycleSource {
281                ident,
282                metadata: location.new_node_metadata(Self::collection_kind()),
283            },
284        )
285    }
286}
287
288impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
289    for Stream<T, L, B, O, R>
290where
291    L: Location<'a> + NoTick,
292{
293    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
294        assert_eq!(
295            Location::id(&self.location),
296            expected_location,
297            "locations do not match"
298        );
299        self.location
300            .flow_state()
301            .borrow_mut()
302            .push_root(HydroRoot::CycleSink {
303                ident,
304                input: Box::new(self.ir_node.into_inner()),
305                op_metadata: HydroIrOpMetadata::new(),
306            });
307    }
308}
309
310impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
311where
312    T: Clone,
313    L: Location<'a>,
314{
315    fn clone(&self) -> Self {
316        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
317            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
318            *self.ir_node.borrow_mut() = HydroNode::Tee {
319                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
320                metadata: self.location.new_node_metadata(Self::collection_kind()),
321            };
322        }
323
324        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
325            Stream {
326                location: self.location.clone(),
327                ir_node: HydroNode::Tee {
328                    inner: TeeNode(inner.0.clone()),
329                    metadata: metadata.clone(),
330                }
331                .into(),
332                _phantom: PhantomData,
333            }
334        } else {
335            unreachable!()
336        }
337    }
338}
339
340impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
341where
342    L: Location<'a>,
343{
344    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
345        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
346        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
347
348        Stream {
349            location,
350            ir_node: RefCell::new(ir_node),
351            _phantom: PhantomData,
352        }
353    }
354
355    /// Returns the [`Location`] where this stream is being materialized.
356    pub fn location(&self) -> &L {
357        &self.location
358    }
359
360    pub(crate) fn collection_kind() -> CollectionKind {
361        CollectionKind::Stream {
362            bound: B::BOUND_KIND,
363            order: O::ORDERING_KIND,
364            retry: R::RETRIES_KIND,
365            element_type: quote_type::<T>().into(),
366        }
367    }
368
369    /// Produces a stream based on invoking `f` on each element.
370    /// If you do not want to modify the stream and instead only want to view
371    /// each item use [`Stream::inspect`] instead.
372    ///
373    /// # Example
374    /// ```rust
375    /// # #[cfg(feature = "deploy")] {
376    /// # use hydro_lang::prelude::*;
377    /// # use futures::StreamExt;
378    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
379    /// let words = process.source_iter(q!(vec!["hello", "world"]));
380    /// words.map(q!(|x| x.to_uppercase()))
381    /// # }, |mut stream| async move {
382    /// # for w in vec!["HELLO", "WORLD"] {
383    /// #     assert_eq!(stream.next().await.unwrap(), w);
384    /// # }
385    /// # }));
386    /// # }
387    /// ```
388    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
389    where
390        F: Fn(T) -> U + 'a,
391    {
392        let f = f.splice_fn1_ctx(&self.location).into();
393        Stream::new(
394            self.location.clone(),
395            HydroNode::Map {
396                f,
397                input: Box::new(self.ir_node.into_inner()),
398                metadata: self
399                    .location
400                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
401            },
402        )
403    }
404
405    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
406    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
407    /// for the output type `U` must produce items in a **deterministic** order.
408    ///
409    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
410    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
411    ///
412    /// # Example
413    /// ```rust
414    /// # #[cfg(feature = "deploy")] {
415    /// # use hydro_lang::prelude::*;
416    /// # use futures::StreamExt;
417    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
418    /// process
419    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
420    ///     .flat_map_ordered(q!(|x| x))
421    /// # }, |mut stream| async move {
422    /// // 1, 2, 3, 4
423    /// # for w in (1..5) {
424    /// #     assert_eq!(stream.next().await.unwrap(), w);
425    /// # }
426    /// # }));
427    /// # }
428    /// ```
429    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
430    where
431        I: IntoIterator<Item = U>,
432        F: Fn(T) -> I + 'a,
433    {
434        let f = f.splice_fn1_ctx(&self.location).into();
435        Stream::new(
436            self.location.clone(),
437            HydroNode::FlatMap {
438                f,
439                input: Box::new(self.ir_node.into_inner()),
440                metadata: self
441                    .location
442                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
443            },
444        )
445    }
446
447    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
448    /// for the output type `U` to produce items in any order.
449    ///
450    /// # Example
451    /// ```rust
452    /// # #[cfg(feature = "deploy")] {
453    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
454    /// # use futures::StreamExt;
455    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
456    /// process
457    ///     .source_iter(q!(vec![
458    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
459    ///         std::collections::HashSet::from_iter(vec![3, 4]),
460    ///     ]))
461    ///     .flat_map_unordered(q!(|x| x))
462    /// # }, |mut stream| async move {
463    /// // 1, 2, 3, 4, but in no particular order
464    /// # let mut results = Vec::new();
465    /// # for w in (1..5) {
466    /// #     results.push(stream.next().await.unwrap());
467    /// # }
468    /// # results.sort();
469    /// # assert_eq!(results, vec![1, 2, 3, 4]);
470    /// # }));
471    /// # }
472    /// ```
473    pub fn flat_map_unordered<U, I, F>(
474        self,
475        f: impl IntoQuotedMut<'a, F, L>,
476    ) -> Stream<U, L, B, NoOrder, R>
477    where
478        I: IntoIterator<Item = U>,
479        F: Fn(T) -> I + 'a,
480    {
481        let f = f.splice_fn1_ctx(&self.location).into();
482        Stream::new(
483            self.location.clone(),
484            HydroNode::FlatMap {
485                f,
486                input: Box::new(self.ir_node.into_inner()),
487                metadata: self
488                    .location
489                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
490            },
491        )
492    }
493
494    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
495    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
496    ///
497    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
498    /// not deterministic, use [`Stream::flatten_unordered`] instead.
499    ///
500    /// ```rust
501    /// # #[cfg(feature = "deploy")] {
502    /// # use hydro_lang::prelude::*;
503    /// # use futures::StreamExt;
504    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
505    /// process
506    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
507    ///     .flatten_ordered()
508    /// # }, |mut stream| async move {
509    /// // 1, 2, 3, 4
510    /// # for w in (1..5) {
511    /// #     assert_eq!(stream.next().await.unwrap(), w);
512    /// # }
513    /// # }));
514    /// # }
515    /// ```
516    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
517    where
518        T: IntoIterator<Item = U>,
519    {
520        self.flat_map_ordered(q!(|d| d))
521    }
522
523    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
524    /// for the element type `T` to produce items in any order.
525    ///
526    /// # Example
527    /// ```rust
528    /// # #[cfg(feature = "deploy")] {
529    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
530    /// # use futures::StreamExt;
531    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
532    /// process
533    ///     .source_iter(q!(vec![
534    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
535    ///         std::collections::HashSet::from_iter(vec![3, 4]),
536    ///     ]))
537    ///     .flatten_unordered()
538    /// # }, |mut stream| async move {
539    /// // 1, 2, 3, 4, but in no particular order
540    /// # let mut results = Vec::new();
541    /// # for w in (1..5) {
542    /// #     results.push(stream.next().await.unwrap());
543    /// # }
544    /// # results.sort();
545    /// # assert_eq!(results, vec![1, 2, 3, 4]);
546    /// # }));
547    /// # }
548    /// ```
549    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
550    where
551        T: IntoIterator<Item = U>,
552    {
553        self.flat_map_unordered(q!(|d| d))
554    }
555
556    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
557    /// `f`, preserving the order of the elements.
558    ///
559    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
560    /// not modify or take ownership of the values. If you need to modify the values while filtering
561    /// use [`Stream::filter_map`] instead.
562    ///
563    /// # Example
564    /// ```rust
565    /// # #[cfg(feature = "deploy")] {
566    /// # use hydro_lang::prelude::*;
567    /// # use futures::StreamExt;
568    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
569    /// process
570    ///     .source_iter(q!(vec![1, 2, 3, 4]))
571    ///     .filter(q!(|&x| x > 2))
572    /// # }, |mut stream| async move {
573    /// // 3, 4
574    /// # for w in (3..5) {
575    /// #     assert_eq!(stream.next().await.unwrap(), w);
576    /// # }
577    /// # }));
578    /// # }
579    /// ```
580    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
581    where
582        F: Fn(&T) -> bool + 'a,
583    {
584        let f = f.splice_fn1_borrow_ctx(&self.location).into();
585        Stream::new(
586            self.location.clone(),
587            HydroNode::Filter {
588                f,
589                input: Box::new(self.ir_node.into_inner()),
590                metadata: self.location.new_node_metadata(Self::collection_kind()),
591            },
592        )
593    }
594
595    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
596    ///
597    /// # Example
598    /// ```rust
599    /// # #[cfg(feature = "deploy")] {
600    /// # use hydro_lang::prelude::*;
601    /// # use futures::StreamExt;
602    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
603    /// process
604    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
605    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
606    /// # }, |mut stream| async move {
607    /// // 1, 2
608    /// # for w in (1..3) {
609    /// #     assert_eq!(stream.next().await.unwrap(), w);
610    /// # }
611    /// # }));
612    /// # }
613    /// ```
614    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
615    where
616        F: Fn(T) -> Option<U> + 'a,
617    {
618        let f = f.splice_fn1_ctx(&self.location).into();
619        Stream::new(
620            self.location.clone(),
621            HydroNode::FilterMap {
622                f,
623                input: Box::new(self.ir_node.into_inner()),
624                metadata: self
625                    .location
626                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
627            },
628        )
629    }
630
631    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
632    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
633    /// If `other` is an empty [`Optional`], no values will be produced.
634    ///
635    /// # Example
636    /// ```rust
637    /// # #[cfg(feature = "deploy")] {
638    /// # use hydro_lang::prelude::*;
639    /// # use futures::StreamExt;
640    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
641    /// let tick = process.tick();
642    /// let batch = process
643    ///   .source_iter(q!(vec![1, 2, 3, 4]))
644    ///   .batch(&tick, nondet!(/** test */));
645    /// let count = batch.clone().count(); // `count()` returns a singleton
646    /// batch.cross_singleton(count).all_ticks()
647    /// # }, |mut stream| async move {
648    /// // (1, 4), (2, 4), (3, 4), (4, 4)
649    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
650    /// #     assert_eq!(stream.next().await.unwrap(), w);
651    /// # }
652    /// # }));
653    /// # }
654    /// ```
655    pub fn cross_singleton<O2>(
656        self,
657        other: impl Into<Optional<O2, L, Bounded>>,
658    ) -> Stream<(T, O2), L, B, O, R>
659    where
660        O2: Clone,
661    {
662        let other: Optional<O2, L, Bounded> = other.into();
663        check_matching_location(&self.location, &other.location);
664
665        Stream::new(
666            self.location.clone(),
667            HydroNode::CrossSingleton {
668                left: Box::new(self.ir_node.into_inner()),
669                right: Box::new(other.ir_node.into_inner()),
670                metadata: self
671                    .location
672                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
673            },
674        )
675    }
676
677    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
678    ///
679    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
680    /// leader of a cluster.
681    ///
682    /// # Example
683    /// ```rust
684    /// # #[cfg(feature = "deploy")] {
685    /// # use hydro_lang::prelude::*;
686    /// # use futures::StreamExt;
687    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
688    /// let tick = process.tick();
689    /// // ticks are lazy by default, forces the second tick to run
690    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
691    ///
692    /// let batch_first_tick = process
693    ///   .source_iter(q!(vec![1, 2, 3, 4]))
694    ///   .batch(&tick, nondet!(/** test */));
695    /// let batch_second_tick = process
696    ///   .source_iter(q!(vec![5, 6, 7, 8]))
697    ///   .batch(&tick, nondet!(/** test */))
698    ///   .defer_tick(); // appears on the second tick
699    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
700    /// batch_first_tick.chain(batch_second_tick)
701    ///   .filter_if_some(some_on_first_tick)
702    ///   .all_ticks()
703    /// # }, |mut stream| async move {
704    /// // [1, 2, 3, 4]
705    /// # for w in vec![1, 2, 3, 4] {
706    /// #     assert_eq!(stream.next().await.unwrap(), w);
707    /// # }
708    /// # }));
709    /// # }
710    /// ```
711    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
712        self.cross_singleton(signal.map(q!(|_u| ())))
713            .map(q!(|(d, _signal)| d))
714    }
715
716    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
717    ///
718    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
719    /// some local state.
720    ///
721    /// # Example
722    /// ```rust
723    /// # #[cfg(feature = "deploy")] {
724    /// # use hydro_lang::prelude::*;
725    /// # use futures::StreamExt;
726    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
727    /// let tick = process.tick();
728    /// // ticks are lazy by default, forces the second tick to run
729    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
730    ///
731    /// let batch_first_tick = process
732    ///   .source_iter(q!(vec![1, 2, 3, 4]))
733    ///   .batch(&tick, nondet!(/** test */));
734    /// let batch_second_tick = process
735    ///   .source_iter(q!(vec![5, 6, 7, 8]))
736    ///   .batch(&tick, nondet!(/** test */))
737    ///   .defer_tick(); // appears on the second tick
738    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
739    /// batch_first_tick.chain(batch_second_tick)
740    ///   .filter_if_none(some_on_first_tick)
741    ///   .all_ticks()
742    /// # }, |mut stream| async move {
743    /// // [5, 6, 7, 8]
744    /// # for w in vec![5, 6, 7, 8] {
745    /// #     assert_eq!(stream.next().await.unwrap(), w);
746    /// # }
747    /// # }));
748    /// # }
749    /// ```
750    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
751        self.filter_if_some(
752            other
753                .map(q!(|_| ()))
754                .into_singleton()
755                .filter(q!(|o| o.is_none())),
756        )
757    }
758
759    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
760    /// tupled pairs in a non-deterministic order.
761    ///
762    /// # Example
763    /// ```rust
764    /// # #[cfg(feature = "deploy")] {
765    /// # use hydro_lang::prelude::*;
766    /// # use std::collections::HashSet;
767    /// # use futures::StreamExt;
768    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769    /// let tick = process.tick();
770    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
771    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
772    /// stream1.cross_product(stream2)
773    /// # }, |mut stream| async move {
774    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
775    /// # stream.map(|i| assert!(expected.contains(&i)));
776    /// # }));
777    /// # }
778    /// ```
779    pub fn cross_product<T2, O2: Ordering>(
780        self,
781        other: Stream<T2, L, B, O2, R>,
782    ) -> Stream<(T, T2), L, B, NoOrder, R>
783    where
784        T: Clone,
785        T2: Clone,
786    {
787        check_matching_location(&self.location, &other.location);
788
789        Stream::new(
790            self.location.clone(),
791            HydroNode::CrossProduct {
792                left: Box::new(self.ir_node.into_inner()),
793                right: Box::new(other.ir_node.into_inner()),
794                metadata: self
795                    .location
796                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
797            },
798        )
799    }
800
801    /// Takes one stream as input and filters out any duplicate occurrences. The output
802    /// contains all unique values from the input.
803    ///
804    /// # Example
805    /// ```rust
806    /// # #[cfg(feature = "deploy")] {
807    /// # use hydro_lang::prelude::*;
808    /// # use futures::StreamExt;
809    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
810    /// let tick = process.tick();
811    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
812    /// # }, |mut stream| async move {
813    /// # for w in vec![1, 2, 3, 4] {
814    /// #     assert_eq!(stream.next().await.unwrap(), w);
815    /// # }
816    /// # }));
817    /// # }
818    /// ```
819    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
820    where
821        T: Eq + Hash,
822    {
823        Stream::new(
824            self.location.clone(),
825            HydroNode::Unique {
826                input: Box::new(self.ir_node.into_inner()),
827                metadata: self
828                    .location
829                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
830            },
831        )
832    }
833
834    /// Outputs everything in this stream that is *not* contained in the `other` stream.
835    ///
836    /// The `other` stream must be [`Bounded`], since this function will wait until
837    /// all its elements are available before producing any output.
838    /// # Example
839    /// ```rust
840    /// # #[cfg(feature = "deploy")] {
841    /// # use hydro_lang::prelude::*;
842    /// # use futures::StreamExt;
843    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
844    /// let tick = process.tick();
845    /// let stream = process
846    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
847    ///   .batch(&tick, nondet!(/** test */));
848    /// let batch = process
849    ///   .source_iter(q!(vec![1, 2]))
850    ///   .batch(&tick, nondet!(/** test */));
851    /// stream.filter_not_in(batch).all_ticks()
852    /// # }, |mut stream| async move {
853    /// # for w in vec![3, 4] {
854    /// #     assert_eq!(stream.next().await.unwrap(), w);
855    /// # }
856    /// # }));
857    /// # }
858    /// ```
859    pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
860    where
861        T: Eq + Hash,
862    {
863        check_matching_location(&self.location, &other.location);
864
865        Stream::new(
866            self.location.clone(),
867            HydroNode::Difference {
868                pos: Box::new(self.ir_node.into_inner()),
869                neg: Box::new(other.ir_node.into_inner()),
870                metadata: self
871                    .location
872                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
873            },
874        )
875    }
876
877    /// An operator which allows you to "inspect" each element of a stream without
878    /// modifying it. The closure `f` is called on a reference to each item. This is
879    /// mainly useful for debugging, and should not be used to generate side-effects.
880    ///
881    /// # Example
882    /// ```rust
883    /// # #[cfg(feature = "deploy")] {
884    /// # use hydro_lang::prelude::*;
885    /// # use futures::StreamExt;
886    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
887    /// let nums = process.source_iter(q!(vec![1, 2]));
888    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
889    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
890    /// # }, |mut stream| async move {
891    /// # for w in vec![1, 2] {
892    /// #     assert_eq!(stream.next().await.unwrap(), w);
893    /// # }
894    /// # }));
895    /// # }
896    /// ```
897    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
898    where
899        F: Fn(&T) + 'a,
900    {
901        let f = f.splice_fn1_borrow_ctx(&self.location).into();
902
903        Stream::new(
904            self.location.clone(),
905            HydroNode::Inspect {
906                f,
907                input: Box::new(self.ir_node.into_inner()),
908                metadata: self.location.new_node_metadata(Self::collection_kind()),
909            },
910        )
911    }
912
913    /// An operator which allows you to "name" a `HydroNode`.
914    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
915    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
916        {
917            let mut node = self.ir_node.borrow_mut();
918            let metadata = node.metadata_mut();
919            metadata.tag = Some(name.to_string());
920        }
921        self
922    }
923
924    /// Explicitly "casts" the stream to a type with a different ordering
925    /// guarantee. Useful in unsafe code where the ordering cannot be proven
926    /// by the type-system.
927    ///
928    /// # Non-Determinism
929    /// This function is used as an escape hatch, and any mistakes in the
930    /// provided ordering guarantee will propagate into the guarantees
931    /// for the rest of the program.
932    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
933        if O::ORDERING_KIND == O2::ORDERING_KIND {
934            Stream::new(self.location, self.ir_node.into_inner())
935        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
936            // We can always weaken the ordering guarantee
937            Stream::new(
938                self.location.clone(),
939                HydroNode::Cast {
940                    inner: Box::new(self.ir_node.into_inner()),
941                    metadata: self
942                        .location
943                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
944                },
945            )
946        } else {
947            Stream::new(
948                self.location.clone(),
949                HydroNode::ObserveNonDet {
950                    inner: Box::new(self.ir_node.into_inner()),
951                    trusted: false,
952                    metadata: self
953                        .location
954                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
955                },
956            )
957        }
958    }
959
960    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
961    // is not observable
962    fn assume_ordering_trusted<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
963        if O::ORDERING_KIND == O2::ORDERING_KIND {
964            Stream::new(self.location, self.ir_node.into_inner())
965        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
966            // We can always weaken the ordering guarantee
967            Stream::new(
968                self.location.clone(),
969                HydroNode::Cast {
970                    inner: Box::new(self.ir_node.into_inner()),
971                    metadata: self
972                        .location
973                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
974                },
975            )
976        } else {
977            Stream::new(
978                self.location.clone(),
979                HydroNode::ObserveNonDet {
980                    inner: Box::new(self.ir_node.into_inner()),
981                    trusted: true,
982                    metadata: self
983                        .location
984                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
985                },
986            )
987        }
988    }
989
990    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
991    /// which is always safe because that is the weakest possible guarantee.
992    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
993        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
994        self.assume_ordering::<NoOrder>(nondet)
995    }
996
997    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
998    /// enforcing that `O2` is weaker than the input ordering guarantee.
999    pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
1000        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1001        self.assume_ordering::<O2>(nondet)
1002    }
1003
1004    /// Explicitly "casts" the stream to a type with a different retries
1005    /// guarantee. Useful in unsafe code where the lack of retries cannot
1006    /// be proven by the type-system.
1007    ///
1008    /// # Non-Determinism
1009    /// This function is used as an escape hatch, and any mistakes in the
1010    /// provided retries guarantee will propagate into the guarantees
1011    /// for the rest of the program.
1012    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1013        if R::RETRIES_KIND == R2::RETRIES_KIND {
1014            Stream::new(self.location, self.ir_node.into_inner())
1015        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1016            // We can always weaken the retries guarantee
1017            Stream::new(
1018                self.location.clone(),
1019                HydroNode::Cast {
1020                    inner: Box::new(self.ir_node.into_inner()),
1021                    metadata: self
1022                        .location
1023                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1024                },
1025            )
1026        } else {
1027            Stream::new(
1028                self.location.clone(),
1029                HydroNode::ObserveNonDet {
1030                    inner: Box::new(self.ir_node.into_inner()),
1031                    trusted: false,
1032                    metadata: self
1033                        .location
1034                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1035                },
1036            )
1037        }
1038    }
1039
1040    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1041    // is not observable
1042    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1043        if R::RETRIES_KIND == R2::RETRIES_KIND {
1044            Stream::new(self.location, self.ir_node.into_inner())
1045        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1046            // We can always weaken the retries guarantee
1047            Stream::new(
1048                self.location.clone(),
1049                HydroNode::Cast {
1050                    inner: Box::new(self.ir_node.into_inner()),
1051                    metadata: self
1052                        .location
1053                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1054                },
1055            )
1056        } else {
1057            Stream::new(
1058                self.location.clone(),
1059                HydroNode::ObserveNonDet {
1060                    inner: Box::new(self.ir_node.into_inner()),
1061                    trusted: true,
1062                    metadata: self
1063                        .location
1064                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1065                },
1066            )
1067        }
1068    }
1069
1070    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1071    /// which is always safe because that is the weakest possible guarantee.
1072    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1073        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1074        self.assume_retries::<AtLeastOnce>(nondet)
1075    }
1076
1077    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1078    /// enforcing that `R2` is weaker than the input retries guarantee.
1079    pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1080        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1081        self.assume_retries::<R2>(nondet)
1082    }
1083}
1084
1085impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1086where
1087    L: Location<'a>,
1088{
1089    /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1090    /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1091    pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1092        self.assume_retries(
1093            nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1094        )
1095    }
1096}
1097
1098impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1099where
1100    L: Location<'a>,
1101{
1102    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1103    ///
1104    /// # Example
1105    /// ```rust
1106    /// # #[cfg(feature = "deploy")] {
1107    /// # use hydro_lang::prelude::*;
1108    /// # use futures::StreamExt;
1109    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1110    /// process.source_iter(q!(&[1, 2, 3])).cloned()
1111    /// # }, |mut stream| async move {
1112    /// // 1, 2, 3
1113    /// # for w in vec![1, 2, 3] {
1114    /// #     assert_eq!(stream.next().await.unwrap(), w);
1115    /// # }
1116    /// # }));
1117    /// # }
1118    /// ```
1119    pub fn cloned(self) -> Stream<T, L, B, O, R>
1120    where
1121        T: Clone,
1122    {
1123        self.map(q!(|d| d.clone()))
1124    }
1125}
1126
1127impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1128where
1129    L: Location<'a>,
1130{
1131    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1132    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1133    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1134    ///
1135    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1136    /// and there may be duplicates.
1137    ///
1138    /// # Example
1139    /// ```rust
1140    /// # #[cfg(feature = "deploy")] {
1141    /// # use hydro_lang::prelude::*;
1142    /// # use futures::StreamExt;
1143    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1144    /// let tick = process.tick();
1145    /// let bools = process.source_iter(q!(vec![false, true, false]));
1146    /// let batch = bools.batch(&tick, nondet!(/** test */));
1147    /// batch
1148    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1149    ///     .all_ticks()
1150    /// # }, |mut stream| async move {
1151    /// // true
1152    /// # assert_eq!(stream.next().await.unwrap(), true);
1153    /// # }));
1154    /// # }
1155    /// ```
1156    pub fn fold_commutative_idempotent<A, I, F>(
1157        self,
1158        init: impl IntoQuotedMut<'a, I, L>,
1159        comb: impl IntoQuotedMut<'a, F, L>,
1160    ) -> Singleton<A, L, B>
1161    where
1162        I: Fn() -> A + 'a,
1163        F: Fn(&mut A, T),
1164    {
1165        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1166        self.assume_ordering(nondet)
1167            .assume_retries(nondet)
1168            .fold(init, comb)
1169    }
1170
1171    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1172    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1173    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1174    /// reference, so that it can be modified in place.
1175    ///
1176    /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1177    /// and there may be duplicates.
1178    ///
1179    /// # Example
1180    /// ```rust
1181    /// # #[cfg(feature = "deploy")] {
1182    /// # use hydro_lang::prelude::*;
1183    /// # use futures::StreamExt;
1184    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1185    /// let tick = process.tick();
1186    /// let bools = process.source_iter(q!(vec![false, true, false]));
1187    /// let batch = bools.batch(&tick, nondet!(/** test */));
1188    /// batch
1189    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1190    ///     .all_ticks()
1191    /// # }, |mut stream| async move {
1192    /// // true
1193    /// # assert_eq!(stream.next().await.unwrap(), true);
1194    /// # }));
1195    /// # }
1196    /// ```
1197    pub fn reduce_commutative_idempotent<F>(
1198        self,
1199        comb: impl IntoQuotedMut<'a, F, L>,
1200    ) -> Optional<T, L, B>
1201    where
1202        F: Fn(&mut T, T) + 'a,
1203    {
1204        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1205        self.assume_retries(nondet).reduce_commutative(comb)
1206    }
1207
1208    // only for internal APIs that have been carefully vetted, will eventually be removed once we
1209    // have algebraic verification of these properties
1210    fn reduce_commutative_idempotent_trusted<F>(
1211        self,
1212        comb: impl IntoQuotedMut<'a, F, L>,
1213    ) -> Optional<T, L, B>
1214    where
1215        F: Fn(&mut T, T) + 'a,
1216    {
1217        self.assume_retries_trusted(nondet!(/** because the closure is trusted idempotent, retries don't affect intermediate states */))
1218            .reduce_commutative_trusted(comb)
1219    }
1220
1221    /// Computes the maximum element in the stream as an [`Optional`], which
1222    /// will be empty until the first element in the input arrives.
1223    ///
1224    /// # Example
1225    /// ```rust
1226    /// # #[cfg(feature = "deploy")] {
1227    /// # use hydro_lang::prelude::*;
1228    /// # use futures::StreamExt;
1229    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1230    /// let tick = process.tick();
1231    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1232    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1233    /// batch.max().all_ticks()
1234    /// # }, |mut stream| async move {
1235    /// // 4
1236    /// # assert_eq!(stream.next().await.unwrap(), 4);
1237    /// # }));
1238    /// # }
1239    /// ```
1240    pub fn max(self) -> Optional<T, L, B>
1241    where
1242        T: Ord,
1243    {
1244        self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1245            if new > *curr {
1246                *curr = new;
1247            }
1248        }))
1249    }
1250
1251    /// Computes the minimum element in the stream as an [`Optional`], which
1252    /// will be empty until the first element in the input arrives.
1253    ///
1254    /// # Example
1255    /// ```rust
1256    /// # #[cfg(feature = "deploy")] {
1257    /// # use hydro_lang::prelude::*;
1258    /// # use futures::StreamExt;
1259    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1260    /// let tick = process.tick();
1261    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1262    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1263    /// batch.min().all_ticks()
1264    /// # }, |mut stream| async move {
1265    /// // 1
1266    /// # assert_eq!(stream.next().await.unwrap(), 1);
1267    /// # }));
1268    /// # }
1269    /// ```
1270    pub fn min(self) -> Optional<T, L, B>
1271    where
1272        T: Ord,
1273    {
1274        self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1275            if new < *curr {
1276                *curr = new;
1277            }
1278        }))
1279    }
1280}
1281
1282impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1283where
1284    L: Location<'a>,
1285{
1286    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1287    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1288    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1289    ///
1290    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1291    ///
1292    /// # Example
1293    /// ```rust
1294    /// # #[cfg(feature = "deploy")] {
1295    /// # use hydro_lang::prelude::*;
1296    /// # use futures::StreamExt;
1297    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1298    /// let tick = process.tick();
1299    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1300    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1301    /// batch
1302    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1303    ///     .all_ticks()
1304    /// # }, |mut stream| async move {
1305    /// // 10
1306    /// # assert_eq!(stream.next().await.unwrap(), 10);
1307    /// # }));
1308    /// # }
1309    /// ```
1310    pub fn fold_commutative<A, I, F>(
1311        self,
1312        init: impl IntoQuotedMut<'a, I, L>,
1313        comb: impl IntoQuotedMut<'a, F, L>,
1314    ) -> Singleton<A, L, B>
1315    where
1316        I: Fn() -> A + 'a,
1317        F: Fn(&mut A, T),
1318    {
1319        let nondet = nondet!(/** the combinator function is commutative */);
1320        self.assume_ordering(nondet).fold(init, comb)
1321    }
1322
1323    /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1324    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1325    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1326    /// reference, so that it can be modified in place.
1327    ///
1328    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1329    ///
1330    /// # Example
1331    /// ```rust
1332    /// # #[cfg(feature = "deploy")] {
1333    /// # use hydro_lang::prelude::*;
1334    /// # use futures::StreamExt;
1335    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1336    /// let tick = process.tick();
1337    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1338    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1339    /// batch
1340    ///     .reduce_commutative(q!(|curr, new| *curr += new))
1341    ///     .all_ticks()
1342    /// # }, |mut stream| async move {
1343    /// // 10
1344    /// # assert_eq!(stream.next().await.unwrap(), 10);
1345    /// # }));
1346    /// # }
1347    /// ```
1348    pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1349    where
1350        F: Fn(&mut T, T) + 'a,
1351    {
1352        let nondet = nondet!(/** the combinator function is commutative */);
1353        self.assume_ordering(nondet).reduce(comb)
1354    }
1355
1356    fn reduce_commutative_trusted<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1357    where
1358        F: Fn(&mut T, T) + 'a,
1359    {
1360        let ordered = if B::BOUNDED {
1361            self.assume_ordering_trusted(nondet!(/** if bounded, there are no intermediate states and output is deterministic because trusted commutative */))
1362        } else {
1363            self.assume_ordering(nondet!(/** if unbounded, ordering affects intermediate states */))
1364        };
1365
1366        ordered.reduce(comb)
1367    }
1368
1369    /// Computes the number of elements in the stream as a [`Singleton`].
1370    ///
1371    /// # Example
1372    /// ```rust
1373    /// # #[cfg(feature = "deploy")] {
1374    /// # use hydro_lang::prelude::*;
1375    /// # use futures::StreamExt;
1376    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1377    /// let tick = process.tick();
1378    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1379    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1380    /// batch.count().all_ticks()
1381    /// # }, |mut stream| async move {
1382    /// // 4
1383    /// # assert_eq!(stream.next().await.unwrap(), 4);
1384    /// # }));
1385    /// # }
1386    /// ```
1387    pub fn count(self) -> Singleton<usize, L, B> {
1388        self.assume_ordering_trusted(nondet!(
1389            /// Order does not affect eventual count, and also does not affect intermediate states.
1390        ))
1391        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1392    }
1393}
1394
1395impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1396where
1397    L: Location<'a>,
1398{
1399    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1400    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1401    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1402    ///
1403    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1404    ///
1405    /// # Example
1406    /// ```rust
1407    /// # #[cfg(feature = "deploy")] {
1408    /// # use hydro_lang::prelude::*;
1409    /// # use futures::StreamExt;
1410    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1411    /// let tick = process.tick();
1412    /// let bools = process.source_iter(q!(vec![false, true, false]));
1413    /// let batch = bools.batch(&tick, nondet!(/** test */));
1414    /// batch
1415    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1416    ///     .all_ticks()
1417    /// # }, |mut stream| async move {
1418    /// // true
1419    /// # assert_eq!(stream.next().await.unwrap(), true);
1420    /// # }));
1421    /// # }
1422    /// ```
1423    pub fn fold_idempotent<A, I, F>(
1424        self,
1425        init: impl IntoQuotedMut<'a, I, L>,
1426        comb: impl IntoQuotedMut<'a, F, L>,
1427    ) -> Singleton<A, L, B>
1428    where
1429        I: Fn() -> A + 'a,
1430        F: Fn(&mut A, T),
1431    {
1432        let nondet = nondet!(/** the combinator function is idempotent */);
1433        self.assume_retries(nondet).fold(init, comb)
1434    }
1435
1436    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1437    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1438    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1439    /// reference, so that it can be modified in place.
1440    ///
1441    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1442    ///
1443    /// # Example
1444    /// ```rust
1445    /// # #[cfg(feature = "deploy")] {
1446    /// # use hydro_lang::prelude::*;
1447    /// # use futures::StreamExt;
1448    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1449    /// let tick = process.tick();
1450    /// let bools = process.source_iter(q!(vec![false, true, false]));
1451    /// let batch = bools.batch(&tick, nondet!(/** test */));
1452    /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1453    /// # }, |mut stream| async move {
1454    /// // true
1455    /// # assert_eq!(stream.next().await.unwrap(), true);
1456    /// # }));
1457    /// # }
1458    /// ```
1459    pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1460    where
1461        F: Fn(&mut T, T) + 'a,
1462    {
1463        let nondet = nondet!(/** the combinator function is idempotent */);
1464        self.assume_retries(nondet).reduce(comb)
1465    }
1466
1467    /// Computes the first element in the stream as an [`Optional`], which
1468    /// will be empty until the first element in the input arrives.
1469    ///
1470    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1471    /// re-ordering of elements may cause the first element to change.
1472    ///
1473    /// # Example
1474    /// ```rust
1475    /// # #[cfg(feature = "deploy")] {
1476    /// # use hydro_lang::prelude::*;
1477    /// # use futures::StreamExt;
1478    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1479    /// let tick = process.tick();
1480    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1481    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1482    /// batch.first().all_ticks()
1483    /// # }, |mut stream| async move {
1484    /// // 1
1485    /// # assert_eq!(stream.next().await.unwrap(), 1);
1486    /// # }));
1487    /// # }
1488    /// ```
1489    pub fn first(self) -> Optional<T, L, B> {
1490        self.reduce_idempotent(q!(|_, _| {}))
1491    }
1492
1493    /// Computes the last element in the stream as an [`Optional`], which
1494    /// will be empty until an element in the input arrives.
1495    ///
1496    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1497    /// re-ordering of elements may cause the last element to change.
1498    ///
1499    /// # Example
1500    /// ```rust
1501    /// # #[cfg(feature = "deploy")] {
1502    /// # use hydro_lang::prelude::*;
1503    /// # use futures::StreamExt;
1504    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1505    /// let tick = process.tick();
1506    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1507    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1508    /// batch.last().all_ticks()
1509    /// # }, |mut stream| async move {
1510    /// // 4
1511    /// # assert_eq!(stream.next().await.unwrap(), 4);
1512    /// # }));
1513    /// # }
1514    /// ```
1515    pub fn last(self) -> Optional<T, L, B> {
1516        self.reduce_idempotent(q!(|curr, new| *curr = new))
1517    }
1518}
1519
1520impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1521where
1522    L: Location<'a>,
1523{
1524    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1525    ///
1526    /// # Example
1527    /// ```rust
1528    /// # #[cfg(feature = "deploy")] {
1529    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1530    /// # use futures::StreamExt;
1531    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1532    /// let tick = process.tick();
1533    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1534    /// numbers.enumerate()
1535    /// # }, |mut stream| async move {
1536    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1537    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1538    /// #     assert_eq!(stream.next().await.unwrap(), w);
1539    /// # }
1540    /// # }));
1541    /// # }
1542    /// ```
1543    pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1544        Stream::new(
1545            self.location.clone(),
1546            HydroNode::Enumerate {
1547                input: Box::new(self.ir_node.into_inner()),
1548                metadata: self.location.new_node_metadata(Stream::<
1549                    (usize, T),
1550                    L,
1551                    B,
1552                    TotalOrder,
1553                    ExactlyOnce,
1554                >::collection_kind()),
1555            },
1556        )
1557    }
1558
1559    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1560    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1561    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1562    ///
1563    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1564    /// to depend on the order of elements in the stream.
1565    ///
1566    /// # Example
1567    /// ```rust
1568    /// # #[cfg(feature = "deploy")] {
1569    /// # use hydro_lang::prelude::*;
1570    /// # use futures::StreamExt;
1571    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1572    /// let tick = process.tick();
1573    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1574    /// let batch = words.batch(&tick, nondet!(/** test */));
1575    /// batch
1576    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1577    ///     .all_ticks()
1578    /// # }, |mut stream| async move {
1579    /// // "HELLOWORLD"
1580    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1581    /// # }));
1582    /// # }
1583    /// ```
1584    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1585        self,
1586        init: impl IntoQuotedMut<'a, I, L>,
1587        comb: impl IntoQuotedMut<'a, F, L>,
1588    ) -> Singleton<A, L, B> {
1589        let init = init.splice_fn0_ctx(&self.location).into();
1590        let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1591
1592        let core = HydroNode::Fold {
1593            init,
1594            acc: comb,
1595            input: Box::new(self.ir_node.into_inner()),
1596            metadata: self
1597                .location
1598                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1599        };
1600
1601        Singleton::new(self.location, core)
1602    }
1603
1604    /// Collects all the elements of this stream into a single [`Vec`] element.
1605    ///
1606    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1607    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1608    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1609    /// the vector at an arbitrary point in time.
1610    ///
1611    /// # Example
1612    /// ```rust
1613    /// # #[cfg(feature = "deploy")] {
1614    /// # use hydro_lang::prelude::*;
1615    /// # use futures::StreamExt;
1616    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1617    /// let tick = process.tick();
1618    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1619    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1620    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1621    /// # }, |mut stream| async move {
1622    /// // [ vec![1, 2, 3, 4] ]
1623    /// # for w in vec![vec![1, 2, 3, 4]] {
1624    /// #     assert_eq!(stream.next().await.unwrap(), w);
1625    /// # }
1626    /// # }));
1627    /// # }
1628    /// ```
1629    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1630        self.fold(
1631            q!(|| vec![]),
1632            q!(|acc, v| {
1633                acc.push(v);
1634            }),
1635        )
1636    }
1637
1638    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1639    /// and emitting each intermediate result.
1640    ///
1641    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1642    /// containing all intermediate accumulated values. The scan operation can also terminate early
1643    /// by returning `None`.
1644    ///
1645    /// The function takes a mutable reference to the accumulator and the current element, and returns
1646    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1647    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1648    ///
1649    /// # Examples
1650    ///
1651    /// Basic usage - running sum:
1652    /// ```rust
1653    /// # #[cfg(feature = "deploy")] {
1654    /// # use hydro_lang::prelude::*;
1655    /// # use futures::StreamExt;
1656    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1657    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1658    ///     q!(|| 0),
1659    ///     q!(|acc, x| {
1660    ///         *acc += x;
1661    ///         Some(*acc)
1662    ///     }),
1663    /// )
1664    /// # }, |mut stream| async move {
1665    /// // Output: 1, 3, 6, 10
1666    /// # for w in vec![1, 3, 6, 10] {
1667    /// #     assert_eq!(stream.next().await.unwrap(), w);
1668    /// # }
1669    /// # }));
1670    /// # }
1671    /// ```
1672    ///
1673    /// Early termination example:
1674    /// ```rust
1675    /// # #[cfg(feature = "deploy")] {
1676    /// # use hydro_lang::prelude::*;
1677    /// # use futures::StreamExt;
1678    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1679    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1680    ///     q!(|| 1),
1681    ///     q!(|state, x| {
1682    ///         *state = *state * x;
1683    ///         if *state > 6 {
1684    ///             None // Terminate the stream
1685    ///         } else {
1686    ///             Some(-*state)
1687    ///         }
1688    ///     }),
1689    /// )
1690    /// # }, |mut stream| async move {
1691    /// // Output: -1, -2, -6
1692    /// # for w in vec![-1, -2, -6] {
1693    /// #     assert_eq!(stream.next().await.unwrap(), w);
1694    /// # }
1695    /// # }));
1696    /// # }
1697    /// ```
1698    pub fn scan<A, U, I, F>(
1699        self,
1700        init: impl IntoQuotedMut<'a, I, L>,
1701        f: impl IntoQuotedMut<'a, F, L>,
1702    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1703    where
1704        I: Fn() -> A + 'a,
1705        F: Fn(&mut A, T) -> Option<U> + 'a,
1706    {
1707        let init = init.splice_fn0_ctx(&self.location).into();
1708        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1709
1710        Stream::new(
1711            self.location.clone(),
1712            HydroNode::Scan {
1713                init,
1714                acc: f,
1715                input: Box::new(self.ir_node.into_inner()),
1716                metadata: self.location.new_node_metadata(
1717                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1718                ),
1719            },
1720        )
1721    }
1722
1723    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1724    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1725    /// until the first element in the input arrives.
1726    ///
1727    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1728    /// to depend on the order of elements in the stream.
1729    ///
1730    /// # Example
1731    /// ```rust
1732    /// # #[cfg(feature = "deploy")] {
1733    /// # use hydro_lang::prelude::*;
1734    /// # use futures::StreamExt;
1735    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1736    /// let tick = process.tick();
1737    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1738    /// let batch = words.batch(&tick, nondet!(/** test */));
1739    /// batch
1740    ///     .map(q!(|x| x.to_string()))
1741    ///     .reduce(q!(|curr, new| curr.push_str(&new)))
1742    ///     .all_ticks()
1743    /// # }, |mut stream| async move {
1744    /// // "HELLOWORLD"
1745    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1746    /// # }));
1747    /// # }
1748    /// ```
1749    pub fn reduce<F: Fn(&mut T, T) + 'a>(
1750        self,
1751        comb: impl IntoQuotedMut<'a, F, L>,
1752    ) -> Optional<T, L, B> {
1753        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1754        let core = HydroNode::Reduce {
1755            f,
1756            input: Box::new(self.ir_node.into_inner()),
1757            metadata: self
1758                .location
1759                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1760        };
1761
1762        Optional::new(self.location, core)
1763    }
1764}
1765
1766impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1767    /// Produces a new stream that interleaves the elements of the two input streams.
1768    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1769    ///
1770    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1771    /// [`Bounded`], you can use [`Stream::chain`] instead.
1772    ///
1773    /// # Example
1774    /// ```rust
1775    /// # #[cfg(feature = "deploy")] {
1776    /// # use hydro_lang::prelude::*;
1777    /// # use futures::StreamExt;
1778    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1779    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1780    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1781    /// # }, |mut stream| async move {
1782    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1783    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1784    /// #     assert_eq!(stream.next().await.unwrap(), w);
1785    /// # }
1786    /// # }));
1787    /// # }
1788    /// ```
1789    pub fn interleave<O2: Ordering, R2: Retries>(
1790        self,
1791        other: Stream<T, L, Unbounded, O2, R2>,
1792    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1793    where
1794        R: MinRetries<R2>,
1795    {
1796        Stream::new(
1797            self.location.clone(),
1798            HydroNode::Chain {
1799                first: Box::new(self.ir_node.into_inner()),
1800                second: Box::new(other.ir_node.into_inner()),
1801                metadata: self.location.new_node_metadata(Stream::<
1802                    T,
1803                    L,
1804                    Unbounded,
1805                    NoOrder,
1806                    <R as MinRetries<R2>>::Min,
1807                >::collection_kind()),
1808            },
1809        )
1810    }
1811}
1812
1813impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1814    /// Produces a new stream that combines the elements of the two input streams,
1815    /// preserving the relative order of elements within each input.
1816    ///
1817    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1818    /// [`Bounded`], you can use [`Stream::chain`] instead.
1819    ///
1820    /// # Non-Determinism
1821    /// The order in which elements *across* the two streams will be interleaved is
1822    /// non-deterministic, so the order of elements will vary across runs. If the output order
1823    /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1824    /// unordered stream.
1825    ///
1826    /// # Example
1827    /// ```rust
1828    /// # #[cfg(feature = "deploy")] {
1829    /// # use hydro_lang::prelude::*;
1830    /// # use futures::StreamExt;
1831    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1832    /// let numbers = process.source_iter(q!(vec![1, 3]));
1833    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1834    /// # }, |mut stream| async move {
1835    /// // 1, 3 and 2, 4 in some order, preserving the original local order
1836    /// # for w in vec![1, 3, 2, 4] {
1837    /// #     assert_eq!(stream.next().await.unwrap(), w);
1838    /// # }
1839    /// # }));
1840    /// # }
1841    /// ```
1842    pub fn merge_ordered<R2: Retries>(
1843        self,
1844        other: Stream<T, L, Unbounded, TotalOrder, R2>,
1845        _nondet: NonDet,
1846    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1847    where
1848        R: MinRetries<R2>,
1849    {
1850        Stream::new(
1851            self.location.clone(),
1852            HydroNode::Chain {
1853                first: Box::new(self.ir_node.into_inner()),
1854                second: Box::new(other.ir_node.into_inner()),
1855                metadata: self.location.new_node_metadata(Stream::<
1856                    T,
1857                    L,
1858                    Unbounded,
1859                    TotalOrder,
1860                    <R as MinRetries<R2>>::Min,
1861                >::collection_kind()),
1862            },
1863        )
1864    }
1865}
1866
1867impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1868where
1869    L: Location<'a>,
1870{
1871    /// Produces a new stream that emits the input elements in sorted order.
1872    ///
1873    /// The input stream can have any ordering guarantee, but the output stream
1874    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1875    /// elements in the input stream are available, so it requires the input stream
1876    /// to be [`Bounded`].
1877    ///
1878    /// # Example
1879    /// ```rust
1880    /// # #[cfg(feature = "deploy")] {
1881    /// # use hydro_lang::prelude::*;
1882    /// # use futures::StreamExt;
1883    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1884    /// let tick = process.tick();
1885    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1886    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1887    /// batch.sort().all_ticks()
1888    /// # }, |mut stream| async move {
1889    /// // 1, 2, 3, 4
1890    /// # for w in (1..5) {
1891    /// #     assert_eq!(stream.next().await.unwrap(), w);
1892    /// # }
1893    /// # }));
1894    /// # }
1895    /// ```
1896    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1897    where
1898        T: Ord,
1899    {
1900        Stream::new(
1901            self.location.clone(),
1902            HydroNode::Sort {
1903                input: Box::new(self.ir_node.into_inner()),
1904                metadata: self
1905                    .location
1906                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1907            },
1908        )
1909    }
1910
1911    /// Produces a new stream that first emits the elements of the `self` stream,
1912    /// and then emits the elements of the `other` stream. The output stream has
1913    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1914    /// [`TotalOrder`] guarantee.
1915    ///
1916    /// Currently, both input streams must be [`Bounded`]. This operator will block
1917    /// on the first stream until all its elements are available. In a future version,
1918    /// we will relax the requirement on the `other` stream.
1919    ///
1920    /// # Example
1921    /// ```rust
1922    /// # #[cfg(feature = "deploy")] {
1923    /// # use hydro_lang::prelude::*;
1924    /// # use futures::StreamExt;
1925    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1926    /// let tick = process.tick();
1927    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1928    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1929    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1930    /// # }, |mut stream| async move {
1931    /// // 2, 3, 4, 5, 1, 2, 3, 4
1932    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1933    /// #     assert_eq!(stream.next().await.unwrap(), w);
1934    /// # }
1935    /// # }));
1936    /// # }
1937    /// ```
1938    pub fn chain<O2: Ordering, R2: Retries>(
1939        self,
1940        other: Stream<T, L, Bounded, O2, R2>,
1941    ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1942    where
1943        O: MinOrder<O2>,
1944        R: MinRetries<R2>,
1945    {
1946        check_matching_location(&self.location, &other.location);
1947
1948        Stream::new(
1949            self.location.clone(),
1950            HydroNode::Chain {
1951                first: Box::new(self.ir_node.into_inner()),
1952                second: Box::new(other.ir_node.into_inner()),
1953                metadata: self.location.new_node_metadata(Stream::<
1954                    T,
1955                    L,
1956                    Bounded,
1957                    <O as MinOrder<O2>>::Min,
1958                    <R as MinRetries<R2>>::Min,
1959                >::collection_kind()),
1960            },
1961        )
1962    }
1963
1964    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1965    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1966    /// because this is compiled into a nested loop.
1967    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1968        self,
1969        other: Stream<T2, L, Bounded, O2, R>,
1970    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1971    where
1972        T: Clone,
1973        T2: Clone,
1974    {
1975        check_matching_location(&self.location, &other.location);
1976
1977        Stream::new(
1978            self.location.clone(),
1979            HydroNode::CrossProduct {
1980                left: Box::new(self.ir_node.into_inner()),
1981                right: Box::new(other.ir_node.into_inner()),
1982                metadata: self.location.new_node_metadata(Stream::<
1983                    (T, T2),
1984                    L,
1985                    Bounded,
1986                    <O2 as MinOrder<O>>::Min,
1987                    R,
1988                >::collection_kind()),
1989            },
1990        )
1991    }
1992
1993    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1994    /// `self` used as the values for *each* key.
1995    ///
1996    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1997    /// values. For example, it can be used to send the same set of elements to several cluster
1998    /// members, if the membership information is available as a [`KeyedSingleton`].
1999    ///
2000    /// # Example
2001    /// ```rust
2002    /// # #[cfg(feature = "deploy")] {
2003    /// # use hydro_lang::prelude::*;
2004    /// # use futures::StreamExt;
2005    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2006    /// # let tick = process.tick();
2007    /// let keyed_singleton = // { 1: (), 2: () }
2008    /// # process
2009    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2010    /// #     .into_keyed()
2011    /// #     .batch(&tick, nondet!(/** test */))
2012    /// #     .first();
2013    /// let stream = // [ "a", "b" ]
2014    /// # process
2015    /// #     .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
2016    /// #     .batch(&tick, nondet!(/** test */));
2017    /// stream.repeat_with_keys(keyed_singleton)
2018    /// # .entries().all_ticks()
2019    /// # }, |mut stream| async move {
2020    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2021    /// # let mut results = Vec::new();
2022    /// # for _ in 0..4 {
2023    /// #     results.push(stream.next().await.unwrap());
2024    /// # }
2025    /// # results.sort();
2026    /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
2027    /// # }));
2028    /// # }
2029    /// ```
2030    pub fn repeat_with_keys<K, V2>(
2031        self,
2032        keys: KeyedSingleton<K, V2, L, Bounded>,
2033    ) -> KeyedStream<K, T, L, Bounded, O, R>
2034    where
2035        K: Clone,
2036        T: Clone,
2037    {
2038        keys.keys()
2039            .weaken_retries()
2040            .assume_ordering_trusted::<TotalOrder>(
2041                nondet!(/** keyed stream does not depend on ordering of keys */),
2042            )
2043            .cross_product_nested_loop(self)
2044            .into_keyed()
2045    }
2046}
2047
2048impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2049where
2050    L: Location<'a>,
2051{
2052    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2053    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2054    /// by equi-joining the two streams on the key attribute `K`.
2055    ///
2056    /// # Example
2057    /// ```rust
2058    /// # #[cfg(feature = "deploy")] {
2059    /// # use hydro_lang::prelude::*;
2060    /// # use std::collections::HashSet;
2061    /// # use futures::StreamExt;
2062    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2063    /// let tick = process.tick();
2064    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2065    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2066    /// stream1.join(stream2)
2067    /// # }, |mut stream| async move {
2068    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2069    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2070    /// # stream.map(|i| assert!(expected.contains(&i)));
2071    /// # }));
2072    /// # }
2073    pub fn join<V2, O2: Ordering, R2: Retries>(
2074        self,
2075        n: Stream<(K, V2), L, B, O2, R2>,
2076    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2077    where
2078        K: Eq + Hash,
2079        R: MinRetries<R2>,
2080    {
2081        check_matching_location(&self.location, &n.location);
2082
2083        Stream::new(
2084            self.location.clone(),
2085            HydroNode::Join {
2086                left: Box::new(self.ir_node.into_inner()),
2087                right: Box::new(n.ir_node.into_inner()),
2088                metadata: self.location.new_node_metadata(Stream::<
2089                    (K, (V1, V2)),
2090                    L,
2091                    B,
2092                    NoOrder,
2093                    <R as MinRetries<R2>>::Min,
2094                >::collection_kind()),
2095            },
2096        )
2097    }
2098
2099    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2100    /// computes the anti-join of the items in the input -- i.e. returns
2101    /// unique items in the first input that do not have a matching key
2102    /// in the second input.
2103    ///
2104    /// # Example
2105    /// ```rust
2106    /// # #[cfg(feature = "deploy")] {
2107    /// # use hydro_lang::prelude::*;
2108    /// # use futures::StreamExt;
2109    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2110    /// let tick = process.tick();
2111    /// let stream = process
2112    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2113    ///   .batch(&tick, nondet!(/** test */));
2114    /// let batch = process
2115    ///   .source_iter(q!(vec![1, 2]))
2116    ///   .batch(&tick, nondet!(/** test */));
2117    /// stream.anti_join(batch).all_ticks()
2118    /// # }, |mut stream| async move {
2119    /// # for w in vec![(3, 'c'), (4, 'd')] {
2120    /// #     assert_eq!(stream.next().await.unwrap(), w);
2121    /// # }
2122    /// # }));
2123    /// # }
2124    pub fn anti_join<O2: Ordering, R2: Retries>(
2125        self,
2126        n: Stream<K, L, Bounded, O2, R2>,
2127    ) -> Stream<(K, V1), L, B, O, R>
2128    where
2129        K: Eq + Hash,
2130    {
2131        check_matching_location(&self.location, &n.location);
2132
2133        Stream::new(
2134            self.location.clone(),
2135            HydroNode::AntiJoin {
2136                pos: Box::new(self.ir_node.into_inner()),
2137                neg: Box::new(n.ir_node.into_inner()),
2138                metadata: self
2139                    .location
2140                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2141            },
2142        )
2143    }
2144}
2145
2146impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2147    Stream<(K, V), L, B, O, R>
2148{
2149    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2150    /// is used as the key and the second element is added to the entries associated with that key.
2151    ///
2152    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2153    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2154    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2155    /// total ordering _within_ each group but no ordering _across_ groups.
2156    ///
2157    /// # Example
2158    /// ```rust
2159    /// # #[cfg(feature = "deploy")] {
2160    /// # use hydro_lang::prelude::*;
2161    /// # use futures::StreamExt;
2162    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2163    /// process
2164    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2165    ///     .into_keyed()
2166    /// #   .entries()
2167    /// # }, |mut stream| async move {
2168    /// // { 1: [2, 3], 2: [4] }
2169    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2170    /// #     assert_eq!(stream.next().await.unwrap(), w);
2171    /// # }
2172    /// # }));
2173    /// # }
2174    /// ```
2175    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2176        KeyedStream::new(
2177            self.location.clone(),
2178            HydroNode::Cast {
2179                inner: Box::new(self.ir_node.into_inner()),
2180                metadata: self
2181                    .location
2182                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2183            },
2184        )
2185    }
2186}
2187
2188impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
2189where
2190    K: Eq + Hash,
2191    L: Location<'a>,
2192{
2193    #[deprecated = "use .into_keyed().fold(...) instead"]
2194    /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2195    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2196    /// in the second element are accumulated via the `comb` closure.
2197    ///
2198    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2199    /// to depend on the order of elements in the stream.
2200    ///
2201    /// If the input and output value types are the same and do not require initialization then use
2202    /// [`Stream::reduce_keyed`].
2203    ///
2204    /// # Example
2205    /// ```rust
2206    /// # #[cfg(feature = "deploy")] {
2207    /// # use hydro_lang::prelude::*;
2208    /// # use futures::StreamExt;
2209    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2210    /// let tick = process.tick();
2211    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2212    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2213    /// batch
2214    ///     .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
2215    ///     .all_ticks()
2216    /// # }, |mut stream| async move {
2217    /// // (1, 5), (2, 7)
2218    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2219    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2220    /// # }));
2221    /// # }
2222    /// ```
2223    pub fn fold_keyed<A, I, F>(
2224        self,
2225        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2226        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2227    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2228    where
2229        I: Fn() -> A + 'a,
2230        F: Fn(&mut A, V) + 'a,
2231    {
2232        self.into_keyed().fold(init, comb).entries()
2233    }
2234
2235    #[deprecated = "use .into_keyed().reduce(...) instead"]
2236    /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2237    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2238    /// in the second element are accumulated via the `comb` closure.
2239    ///
2240    /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2241    /// to depend on the order of elements in the stream.
2242    ///
2243    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2244    ///
2245    /// # Example
2246    /// ```rust
2247    /// # #[cfg(feature = "deploy")] {
2248    /// # use hydro_lang::prelude::*;
2249    /// # use futures::StreamExt;
2250    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2251    /// let tick = process.tick();
2252    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2253    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2254    /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2255    /// # }, |mut stream| async move {
2256    /// // (1, 5), (2, 7)
2257    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2258    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2259    /// # }));
2260    /// # }
2261    /// ```
2262    pub fn reduce_keyed<F>(
2263        self,
2264        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2265    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2266    where
2267        F: Fn(&mut V, V) + 'a,
2268    {
2269        let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2270
2271        Stream::new(
2272            self.location.clone(),
2273            HydroNode::ReduceKeyed {
2274                f,
2275                input: Box::new(self.ir_node.into_inner()),
2276                metadata: self.location.new_node_metadata(Stream::<
2277                    (K, V),
2278                    Tick<L>,
2279                    Bounded,
2280                    NoOrder,
2281                    ExactlyOnce,
2282                >::collection_kind()),
2283            },
2284        )
2285    }
2286}
2287
2288impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2289where
2290    K: Eq + Hash,
2291    L: Location<'a>,
2292{
2293    #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2294    /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2295    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2296    /// in the second element are accumulated via the `comb` closure.
2297    ///
2298    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2299    /// as there may be non-deterministic duplicates.
2300    ///
2301    /// If the input and output value types are the same and do not require initialization then use
2302    /// [`Stream::reduce_keyed_commutative_idempotent`].
2303    ///
2304    /// # Example
2305    /// ```rust
2306    /// # #[cfg(feature = "deploy")] {
2307    /// # use hydro_lang::prelude::*;
2308    /// # use futures::StreamExt;
2309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2310    /// let tick = process.tick();
2311    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2312    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2313    /// batch
2314    ///     .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2315    ///     .all_ticks()
2316    /// # }, |mut stream| async move {
2317    /// // (1, false), (2, true)
2318    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2319    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2320    /// # }));
2321    /// # }
2322    /// ```
2323    pub fn fold_keyed_commutative_idempotent<A, I, F>(
2324        self,
2325        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2326        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2327    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2328    where
2329        I: Fn() -> A + 'a,
2330        F: Fn(&mut A, V) + 'a,
2331    {
2332        self.into_keyed()
2333            .fold_commutative_idempotent(init, comb)
2334            .entries()
2335    }
2336
2337    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2338    /// # Example
2339    /// ```rust
2340    /// # #[cfg(feature = "deploy")] {
2341    /// # use hydro_lang::prelude::*;
2342    /// # use futures::StreamExt;
2343    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2344    /// let tick = process.tick();
2345    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2346    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2347    /// batch.keys().all_ticks()
2348    /// # }, |mut stream| async move {
2349    /// // 1, 2
2350    /// # assert_eq!(stream.next().await.unwrap(), 1);
2351    /// # assert_eq!(stream.next().await.unwrap(), 2);
2352    /// # }));
2353    /// # }
2354    /// ```
2355    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2356        self.into_keyed()
2357            .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2358            .keys()
2359    }
2360
2361    #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2362    /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2363    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2364    /// in the second element are accumulated via the `comb` closure.
2365    ///
2366    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2367    /// as there may be non-deterministic duplicates.
2368    ///
2369    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2370    ///
2371    /// # Example
2372    /// ```rust
2373    /// # #[cfg(feature = "deploy")] {
2374    /// # use hydro_lang::prelude::*;
2375    /// # use futures::StreamExt;
2376    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2377    /// let tick = process.tick();
2378    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2379    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2380    /// batch
2381    ///     .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2382    ///     .all_ticks()
2383    /// # }, |mut stream| async move {
2384    /// // (1, false), (2, true)
2385    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2386    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2387    /// # }));
2388    /// # }
2389    /// ```
2390    pub fn reduce_keyed_commutative_idempotent<F>(
2391        self,
2392        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2393    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2394    where
2395        F: Fn(&mut V, V) + 'a,
2396    {
2397        self.into_keyed()
2398            .reduce_commutative_idempotent(comb)
2399            .entries()
2400    }
2401}
2402
2403impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2404where
2405    K: Eq + Hash,
2406    L: Location<'a>,
2407{
2408    #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2409    /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2410    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2411    /// in the second element are accumulated via the `comb` closure.
2412    ///
2413    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2414    ///
2415    /// If the input and output value types are the same and do not require initialization then use
2416    /// [`Stream::reduce_keyed_commutative`].
2417    ///
2418    /// # Example
2419    /// ```rust
2420    /// # #[cfg(feature = "deploy")] {
2421    /// # use hydro_lang::prelude::*;
2422    /// # use futures::StreamExt;
2423    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2424    /// let tick = process.tick();
2425    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2426    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2427    /// batch
2428    ///     .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2429    ///     .all_ticks()
2430    /// # }, |mut stream| async move {
2431    /// // (1, 5), (2, 7)
2432    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2433    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2434    /// # }));
2435    /// # }
2436    /// ```
2437    pub fn fold_keyed_commutative<A, I, F>(
2438        self,
2439        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2440        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2441    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2442    where
2443        I: Fn() -> A + 'a,
2444        F: Fn(&mut A, V) + 'a,
2445    {
2446        self.into_keyed().fold_commutative(init, comb).entries()
2447    }
2448
2449    #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2450    /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2451    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2452    /// in the second element are accumulated via the `comb` closure.
2453    ///
2454    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2455    ///
2456    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2457    ///
2458    /// # Example
2459    /// ```rust
2460    /// # #[cfg(feature = "deploy")] {
2461    /// # use hydro_lang::prelude::*;
2462    /// # use futures::StreamExt;
2463    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2464    /// let tick = process.tick();
2465    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2466    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2467    /// batch
2468    ///     .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2469    ///     .all_ticks()
2470    /// # }, |mut stream| async move {
2471    /// // (1, 5), (2, 7)
2472    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2473    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2474    /// # }));
2475    /// # }
2476    /// ```
2477    pub fn reduce_keyed_commutative<F>(
2478        self,
2479        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2480    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2481    where
2482        F: Fn(&mut V, V) + 'a,
2483    {
2484        self.into_keyed().reduce_commutative(comb).entries()
2485    }
2486}
2487
2488impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2489where
2490    K: Eq + Hash,
2491    L: Location<'a>,
2492{
2493    #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2494    /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2495    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2496    /// in the second element are accumulated via the `comb` closure.
2497    ///
2498    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2499    ///
2500    /// If the input and output value types are the same and do not require initialization then use
2501    /// [`Stream::reduce_keyed_idempotent`].
2502    ///
2503    /// # Example
2504    /// ```rust
2505    /// # #[cfg(feature = "deploy")] {
2506    /// # use hydro_lang::prelude::*;
2507    /// # use futures::StreamExt;
2508    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2509    /// let tick = process.tick();
2510    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2511    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2512    /// batch
2513    ///     .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2514    ///     .all_ticks()
2515    /// # }, |mut stream| async move {
2516    /// // (1, false), (2, true)
2517    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2518    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2519    /// # }));
2520    /// # }
2521    /// ```
2522    pub fn fold_keyed_idempotent<A, I, F>(
2523        self,
2524        init: impl IntoQuotedMut<'a, I, Tick<L>>,
2525        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2526    ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2527    where
2528        I: Fn() -> A + 'a,
2529        F: Fn(&mut A, V) + 'a,
2530    {
2531        self.into_keyed().fold_idempotent(init, comb).entries()
2532    }
2533
2534    #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2535    /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2536    /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2537    /// in the second element are accumulated via the `comb` closure.
2538    ///
2539    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2540    ///
2541    /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2542    ///
2543    /// # Example
2544    /// ```rust
2545    /// # #[cfg(feature = "deploy")] {
2546    /// # use hydro_lang::prelude::*;
2547    /// # use futures::StreamExt;
2548    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2549    /// let tick = process.tick();
2550    /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2551    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2552    /// batch
2553    ///     .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2554    ///     .all_ticks()
2555    /// # }, |mut stream| async move {
2556    /// // (1, false), (2, true)
2557    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2558    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2559    /// # }));
2560    /// # }
2561    /// ```
2562    pub fn reduce_keyed_idempotent<F>(
2563        self,
2564        comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2565    ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2566    where
2567        F: Fn(&mut V, V) + 'a,
2568    {
2569        self.into_keyed().reduce_idempotent(comb).entries()
2570    }
2571}
2572
2573impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2574where
2575    L: Location<'a> + NoTick,
2576{
2577    /// Returns a stream corresponding to the latest batch of elements being atomically
2578    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2579    /// the order of the input.
2580    ///
2581    /// # Non-Determinism
2582    /// The batch boundaries are non-deterministic and may change across executions.
2583    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2584        Stream::new(
2585            self.location.clone().tick,
2586            HydroNode::Batch {
2587                inner: Box::new(self.ir_node.into_inner()),
2588                metadata: self
2589                    .location
2590                    .tick
2591                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2592            },
2593        )
2594    }
2595
2596    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2597    /// See [`Stream::atomic`] for more details.
2598    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2599        Stream::new(
2600            self.location.tick.l.clone(),
2601            HydroNode::EndAtomic {
2602                inner: Box::new(self.ir_node.into_inner()),
2603                metadata: self
2604                    .location
2605                    .tick
2606                    .l
2607                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2608            },
2609        )
2610    }
2611}
2612
2613impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2614where
2615    L: Location<'a>,
2616{
2617    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2618    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2619    ///
2620    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2621    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2622    /// argument that declares where the stream will be atomically processed. Batching a stream into
2623    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2624    /// [`Tick`] will introduce asynchrony.
2625    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2626        let out_location = Atomic { tick: tick.clone() };
2627        Stream::new(
2628            out_location.clone(),
2629            HydroNode::BeginAtomic {
2630                inner: Box::new(self.ir_node.into_inner()),
2631                metadata: out_location
2632                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2633            },
2634        )
2635    }
2636
2637    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2638    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2639    /// the order of the input. The output stream will execute in the [`Tick`] that was
2640    /// used to create the atomic section.
2641    ///
2642    /// # Non-Determinism
2643    /// The batch boundaries are non-deterministic and may change across executions.
2644    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2645        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2646        Stream::new(
2647            tick.clone(),
2648            HydroNode::Batch {
2649                inner: Box::new(self.ir_node.into_inner()),
2650                metadata: tick
2651                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2652            },
2653        )
2654    }
2655
2656    /// Given a time interval, returns a stream corresponding to samples taken from the
2657    /// stream roughly at that interval. The output will have elements in the same order
2658    /// as the input, but with arbitrary elements skipped between samples. There is also
2659    /// no guarantee on the exact timing of the samples.
2660    ///
2661    /// # Non-Determinism
2662    /// The output stream is non-deterministic in which elements are sampled, since this
2663    /// is controlled by a clock.
2664    pub fn sample_every(
2665        self,
2666        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2667        nondet: NonDet,
2668    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2669    where
2670        L: NoTick + NoAtomic,
2671    {
2672        let samples = self.location.source_interval(interval, nondet);
2673
2674        let tick = self.location.tick();
2675        self.batch(&tick, nondet)
2676            .filter_if_some(samples.batch(&tick, nondet).first())
2677            .all_ticks()
2678            .weakest_retries()
2679    }
2680
2681    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
2682    /// stream has not emitted a value since that duration.
2683    ///
2684    /// # Non-Determinism
2685    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2686    /// samples take place, timeouts may be non-deterministically generated or missed,
2687    /// and the notification of the timeout may be delayed as well. There is also no
2688    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2689    /// detected based on when the next sample is taken.
2690    pub fn timeout(
2691        self,
2692        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2693        nondet: NonDet,
2694    ) -> Optional<(), L, Unbounded>
2695    where
2696        L: NoTick + NoAtomic,
2697    {
2698        let tick = self.location.tick();
2699
2700        let latest_received = self.assume_retries(nondet).fold_commutative(
2701            q!(|| None),
2702            q!(|latest, _| {
2703                *latest = Some(Instant::now());
2704            }),
2705        );
2706
2707        latest_received
2708            .snapshot(&tick, nondet)
2709            .filter_map(q!(move |latest_received| {
2710                if let Some(latest_received) = latest_received {
2711                    if Instant::now().duration_since(latest_received) > duration {
2712                        Some(())
2713                    } else {
2714                        None
2715                    }
2716                } else {
2717                    Some(())
2718                }
2719            }))
2720            .latest()
2721    }
2722}
2723
2724impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2725where
2726    L: Location<'a> + NoTick + NoAtomic,
2727    F: Future<Output = T>,
2728{
2729    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2730    /// Future outputs are produced as available, regardless of input arrival order.
2731    ///
2732    /// # Example
2733    /// ```rust
2734    /// # #[cfg(feature = "deploy")] {
2735    /// # use std::collections::HashSet;
2736    /// # use futures::StreamExt;
2737    /// # use hydro_lang::prelude::*;
2738    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2739    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2740    ///     .map(q!(|x| async move {
2741    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2742    ///         x
2743    ///     }))
2744    ///     .resolve_futures()
2745    /// #   },
2746    /// #   |mut stream| async move {
2747    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2748    /// #       let mut output = HashSet::new();
2749    /// #       for _ in 1..10 {
2750    /// #           output.insert(stream.next().await.unwrap());
2751    /// #       }
2752    /// #       assert_eq!(
2753    /// #           output,
2754    /// #           HashSet::<i32>::from_iter(1..10)
2755    /// #       );
2756    /// #   },
2757    /// # ));
2758    /// # }
2759    pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2760        Stream::new(
2761            self.location.clone(),
2762            HydroNode::ResolveFutures {
2763                input: Box::new(self.ir_node.into_inner()),
2764                metadata: self
2765                    .location
2766                    .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2767            },
2768        )
2769    }
2770
2771    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2772    /// Future outputs are produced in the same order as the input stream.
2773    ///
2774    /// # Example
2775    /// ```rust
2776    /// # #[cfg(feature = "deploy")] {
2777    /// # use std::collections::HashSet;
2778    /// # use futures::StreamExt;
2779    /// # use hydro_lang::prelude::*;
2780    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2781    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2782    ///     .map(q!(|x| async move {
2783    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2784    ///         x
2785    ///     }))
2786    ///     .resolve_futures_ordered()
2787    /// #   },
2788    /// #   |mut stream| async move {
2789    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2790    /// #       let mut output = Vec::new();
2791    /// #       for _ in 1..10 {
2792    /// #           output.push(stream.next().await.unwrap());
2793    /// #       }
2794    /// #       assert_eq!(
2795    /// #           output,
2796    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2797    /// #       );
2798    /// #   },
2799    /// # ));
2800    /// # }
2801    pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2802        Stream::new(
2803            self.location.clone(),
2804            HydroNode::ResolveFuturesOrdered {
2805                input: Box::new(self.ir_node.into_inner()),
2806                metadata: self
2807                    .location
2808                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2809            },
2810        )
2811    }
2812}
2813
2814impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2815where
2816    L: Location<'a> + NoTick,
2817{
2818    /// Executes the provided closure for every element in this stream.
2819    ///
2820    /// Because the closure may have side effects, the stream must have deterministic order
2821    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2822    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2823    /// [`Stream::assume_retries`] with an explanation for why this is the case.
2824    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2825        let f = f.splice_fn1_ctx(&self.location).into();
2826        self.location
2827            .flow_state()
2828            .borrow_mut()
2829            .push_root(HydroRoot::ForEach {
2830                input: Box::new(self.ir_node.into_inner()),
2831                f,
2832                op_metadata: HydroIrOpMetadata::new(),
2833            });
2834    }
2835
2836    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2837    /// TCP socket to some other server. You should _not_ use this API for interacting with
2838    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2839    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2840    /// interaction with asynchronous sinks.
2841    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2842    where
2843        S: 'a + futures::Sink<T> + Unpin,
2844    {
2845        self.location
2846            .flow_state()
2847            .borrow_mut()
2848            .push_root(HydroRoot::DestSink {
2849                sink: sink.splice_typed_ctx(&self.location).into(),
2850                input: Box::new(self.ir_node.into_inner()),
2851                op_metadata: HydroIrOpMetadata::new(),
2852            });
2853    }
2854}
2855
2856impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2857where
2858    L: Location<'a>,
2859{
2860    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2861    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2862    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2863        Stream::new(
2864            self.location.outer().clone(),
2865            HydroNode::YieldConcat {
2866                inner: Box::new(self.ir_node.into_inner()),
2867                metadata: self
2868                    .location
2869                    .outer()
2870                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2871            },
2872        )
2873    }
2874
2875    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2876    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2877    ///
2878    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2879    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2880    /// stream's [`Tick`] context.
2881    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2882        let out_location = Atomic {
2883            tick: self.location.clone(),
2884        };
2885
2886        Stream::new(
2887            out_location.clone(),
2888            HydroNode::YieldConcat {
2889                inner: Box::new(self.ir_node.into_inner()),
2890                metadata: out_location
2891                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2892            },
2893        )
2894    }
2895
2896    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2897    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2898    /// input.
2899    ///
2900    /// This API is particularly useful for stateful computation on batches of data, such as
2901    /// maintaining an accumulated state that is up to date with the current batch.
2902    ///
2903    /// # Example
2904    /// ```rust
2905    /// # #[cfg(feature = "deploy")] {
2906    /// # use hydro_lang::prelude::*;
2907    /// # use futures::StreamExt;
2908    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2909    /// let tick = process.tick();
2910    /// # // ticks are lazy by default, forces the second tick to run
2911    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2912    /// # let batch_first_tick = process
2913    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
2914    /// #  .batch(&tick, nondet!(/** test */));
2915    /// # let batch_second_tick = process
2916    /// #   .source_iter(q!(vec![5, 6, 7]))
2917    /// #   .batch(&tick, nondet!(/** test */))
2918    /// #   .defer_tick(); // appears on the second tick
2919    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2920    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2921    ///
2922    /// input.batch(&tick, nondet!(/** test */))
2923    ///     .across_ticks(|s| s.count()).all_ticks()
2924    /// # }, |mut stream| async move {
2925    /// // [4, 7]
2926    /// assert_eq!(stream.next().await.unwrap(), 4);
2927    /// assert_eq!(stream.next().await.unwrap(), 7);
2928    /// # }));
2929    /// # }
2930    /// ```
2931    pub fn across_ticks<Out: BatchAtomic>(
2932        self,
2933        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2934    ) -> Out::Batched {
2935        thunk(self.all_ticks_atomic()).batched_atomic()
2936    }
2937
2938    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2939    /// always has the elements of `self` at tick `T - 1`.
2940    ///
2941    /// At tick `0`, the output stream is empty, since there is no previous tick.
2942    ///
2943    /// This operator enables stateful iterative processing with ticks, by sending data from one
2944    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2945    ///
2946    /// # Example
2947    /// ```rust
2948    /// # #[cfg(feature = "deploy")] {
2949    /// # use hydro_lang::prelude::*;
2950    /// # use futures::StreamExt;
2951    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2952    /// let tick = process.tick();
2953    /// // ticks are lazy by default, forces the second tick to run
2954    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2955    ///
2956    /// let batch_first_tick = process
2957    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2958    ///   .batch(&tick, nondet!(/** test */));
2959    /// let batch_second_tick = process
2960    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2961    ///   .batch(&tick, nondet!(/** test */))
2962    ///   .defer_tick(); // appears on the second tick
2963    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2964    ///
2965    /// changes_across_ticks.clone().filter_not_in(
2966    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2967    /// ).all_ticks()
2968    /// # }, |mut stream| async move {
2969    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2970    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2971    /// #     assert_eq!(stream.next().await.unwrap(), w);
2972    /// # }
2973    /// # }));
2974    /// # }
2975    /// ```
2976    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2977        Stream::new(
2978            self.location.clone(),
2979            HydroNode::DeferTick {
2980                input: Box::new(self.ir_node.into_inner()),
2981                metadata: self
2982                    .location
2983                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2984            },
2985        )
2986    }
2987}
2988
2989#[cfg(test)]
2990mod tests {
2991    #[cfg(feature = "deploy")]
2992    use futures::{SinkExt, StreamExt};
2993    #[cfg(feature = "deploy")]
2994    use hydro_deploy::Deployment;
2995    #[cfg(feature = "deploy")]
2996    use serde::{Deserialize, Serialize};
2997    #[cfg(feature = "deploy")]
2998    use stageleft::q;
2999
3000    #[cfg(any(feature = "deploy", feature = "sim"))]
3001    use crate::compile::builder::FlowBuilder;
3002    #[cfg(feature = "deploy")]
3003    use crate::live_collections::stream::ExactlyOnce;
3004    #[cfg(feature = "sim")]
3005    use crate::live_collections::stream::NoOrder;
3006    #[cfg(any(feature = "deploy", feature = "sim"))]
3007    use crate::live_collections::stream::TotalOrder;
3008    #[cfg(any(feature = "deploy", feature = "sim"))]
3009    use crate::location::Location;
3010    #[cfg(any(feature = "deploy", feature = "sim"))]
3011    use crate::nondet::nondet;
3012
3013    mod backtrace_chained_ops;
3014
3015    #[cfg(feature = "deploy")]
3016    struct P1 {}
3017    #[cfg(feature = "deploy")]
3018    struct P2 {}
3019
3020    #[cfg(feature = "deploy")]
3021    #[derive(Serialize, Deserialize, Debug)]
3022    struct SendOverNetwork {
3023        n: u32,
3024    }
3025
3026    #[cfg(feature = "deploy")]
3027    #[tokio::test]
3028    async fn first_ten_distributed() {
3029        let mut deployment = Deployment::new();
3030
3031        let flow = FlowBuilder::new();
3032        let first_node = flow.process::<P1>();
3033        let second_node = flow.process::<P2>();
3034        let external = flow.external::<P2>();
3035
3036        let numbers = first_node.source_iter(q!(0..10));
3037        let out_port = numbers
3038            .map(q!(|n| SendOverNetwork { n }))
3039            .send_bincode(&second_node)
3040            .send_bincode_external(&external);
3041
3042        let nodes = flow
3043            .with_process(&first_node, deployment.Localhost())
3044            .with_process(&second_node, deployment.Localhost())
3045            .with_external(&external, deployment.Localhost())
3046            .deploy(&mut deployment);
3047
3048        deployment.deploy().await.unwrap();
3049
3050        let mut external_out = nodes.connect(out_port).await;
3051
3052        deployment.start().await.unwrap();
3053
3054        for i in 0..10 {
3055            assert_eq!(external_out.next().await.unwrap().n, i);
3056        }
3057    }
3058
3059    #[cfg(feature = "deploy")]
3060    #[tokio::test]
3061    async fn first_cardinality() {
3062        let mut deployment = Deployment::new();
3063
3064        let flow = FlowBuilder::new();
3065        let node = flow.process::<()>();
3066        let external = flow.external::<()>();
3067
3068        let node_tick = node.tick();
3069        let count = node_tick
3070            .singleton(q!([1, 2, 3]))
3071            .into_stream()
3072            .flatten_ordered()
3073            .first()
3074            .into_stream()
3075            .count()
3076            .all_ticks()
3077            .send_bincode_external(&external);
3078
3079        let nodes = flow
3080            .with_process(&node, deployment.Localhost())
3081            .with_external(&external, deployment.Localhost())
3082            .deploy(&mut deployment);
3083
3084        deployment.deploy().await.unwrap();
3085
3086        let mut external_out = nodes.connect(count).await;
3087
3088        deployment.start().await.unwrap();
3089
3090        assert_eq!(external_out.next().await.unwrap(), 1);
3091    }
3092
3093    #[cfg(feature = "deploy")]
3094    #[tokio::test]
3095    async fn unbounded_reduce_remembers_state() {
3096        let mut deployment = Deployment::new();
3097
3098        let flow = FlowBuilder::new();
3099        let node = flow.process::<()>();
3100        let external = flow.external::<()>();
3101
3102        let (input_port, input) = node.source_external_bincode(&external);
3103        let out = input
3104            .reduce(q!(|acc, v| *acc += v))
3105            .sample_eager(nondet!(/** test */))
3106            .send_bincode_external(&external);
3107
3108        let nodes = flow
3109            .with_process(&node, deployment.Localhost())
3110            .with_external(&external, deployment.Localhost())
3111            .deploy(&mut deployment);
3112
3113        deployment.deploy().await.unwrap();
3114
3115        let mut external_in = nodes.connect(input_port).await;
3116        let mut external_out = nodes.connect(out).await;
3117
3118        deployment.start().await.unwrap();
3119
3120        external_in.send(1).await.unwrap();
3121        assert_eq!(external_out.next().await.unwrap(), 1);
3122
3123        external_in.send(2).await.unwrap();
3124        assert_eq!(external_out.next().await.unwrap(), 3);
3125    }
3126
3127    #[cfg(feature = "deploy")]
3128    #[tokio::test]
3129    async fn atomic_fold_replays_each_tick() {
3130        let mut deployment = Deployment::new();
3131
3132        let flow = FlowBuilder::new();
3133        let node = flow.process::<()>();
3134        let external = flow.external::<()>();
3135
3136        let (input_port, input) =
3137            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3138        let tick = node.tick();
3139
3140        let out = input
3141            .batch(&tick, nondet!(/** test */))
3142            .cross_singleton(
3143                node.source_iter(q!(vec![1, 2, 3]))
3144                    .atomic(&tick)
3145                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3146                    .snapshot_atomic(nondet!(/** test */)),
3147            )
3148            .all_ticks()
3149            .send_bincode_external(&external);
3150
3151        let nodes = flow
3152            .with_process(&node, deployment.Localhost())
3153            .with_external(&external, deployment.Localhost())
3154            .deploy(&mut deployment);
3155
3156        deployment.deploy().await.unwrap();
3157
3158        let mut external_in = nodes.connect(input_port).await;
3159        let mut external_out = nodes.connect(out).await;
3160
3161        deployment.start().await.unwrap();
3162
3163        external_in.send(1).await.unwrap();
3164        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3165
3166        external_in.send(2).await.unwrap();
3167        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3168    }
3169
3170    #[cfg(feature = "deploy")]
3171    #[tokio::test]
3172    async fn unbounded_scan_remembers_state() {
3173        let mut deployment = Deployment::new();
3174
3175        let flow = FlowBuilder::new();
3176        let node = flow.process::<()>();
3177        let external = flow.external::<()>();
3178
3179        let (input_port, input) = node.source_external_bincode(&external);
3180        let out = input
3181            .scan(
3182                q!(|| 0),
3183                q!(|acc, v| {
3184                    *acc += v;
3185                    Some(*acc)
3186                }),
3187            )
3188            .send_bincode_external(&external);
3189
3190        let nodes = flow
3191            .with_process(&node, deployment.Localhost())
3192            .with_external(&external, deployment.Localhost())
3193            .deploy(&mut deployment);
3194
3195        deployment.deploy().await.unwrap();
3196
3197        let mut external_in = nodes.connect(input_port).await;
3198        let mut external_out = nodes.connect(out).await;
3199
3200        deployment.start().await.unwrap();
3201
3202        external_in.send(1).await.unwrap();
3203        assert_eq!(external_out.next().await.unwrap(), 1);
3204
3205        external_in.send(2).await.unwrap();
3206        assert_eq!(external_out.next().await.unwrap(), 3);
3207    }
3208
3209    #[cfg(feature = "deploy")]
3210    #[tokio::test]
3211    async fn unbounded_enumerate_remembers_state() {
3212        let mut deployment = Deployment::new();
3213
3214        let flow = FlowBuilder::new();
3215        let node = flow.process::<()>();
3216        let external = flow.external::<()>();
3217
3218        let (input_port, input) = node.source_external_bincode(&external);
3219        let out = input.enumerate().send_bincode_external(&external);
3220
3221        let nodes = flow
3222            .with_process(&node, deployment.Localhost())
3223            .with_external(&external, deployment.Localhost())
3224            .deploy(&mut deployment);
3225
3226        deployment.deploy().await.unwrap();
3227
3228        let mut external_in = nodes.connect(input_port).await;
3229        let mut external_out = nodes.connect(out).await;
3230
3231        deployment.start().await.unwrap();
3232
3233        external_in.send(1).await.unwrap();
3234        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3235
3236        external_in.send(2).await.unwrap();
3237        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3238    }
3239
3240    #[cfg(feature = "deploy")]
3241    #[tokio::test]
3242    async fn unbounded_unique_remembers_state() {
3243        let mut deployment = Deployment::new();
3244
3245        let flow = FlowBuilder::new();
3246        let node = flow.process::<()>();
3247        let external = flow.external::<()>();
3248
3249        let (input_port, input) =
3250            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3251        let out = input.unique().send_bincode_external(&external);
3252
3253        let nodes = flow
3254            .with_process(&node, deployment.Localhost())
3255            .with_external(&external, deployment.Localhost())
3256            .deploy(&mut deployment);
3257
3258        deployment.deploy().await.unwrap();
3259
3260        let mut external_in = nodes.connect(input_port).await;
3261        let mut external_out = nodes.connect(out).await;
3262
3263        deployment.start().await.unwrap();
3264
3265        external_in.send(1).await.unwrap();
3266        assert_eq!(external_out.next().await.unwrap(), 1);
3267
3268        external_in.send(2).await.unwrap();
3269        assert_eq!(external_out.next().await.unwrap(), 2);
3270
3271        external_in.send(1).await.unwrap();
3272        external_in.send(3).await.unwrap();
3273        assert_eq!(external_out.next().await.unwrap(), 3);
3274    }
3275
3276    #[cfg(feature = "sim")]
3277    #[test]
3278    #[should_panic]
3279    fn sim_batch_nondet_size() {
3280        let flow = FlowBuilder::new();
3281        let node = flow.process::<()>();
3282
3283        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3284
3285        let tick = node.tick();
3286        let out_recv = input
3287            .batch(&tick, nondet!(/** test */))
3288            .count()
3289            .all_ticks()
3290            .sim_output();
3291
3292        flow.sim().exhaustive(async || {
3293            in_send.send(());
3294            in_send.send(());
3295            in_send.send(());
3296
3297            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3298        });
3299    }
3300
3301    #[cfg(feature = "sim")]
3302    #[test]
3303    fn sim_batch_preserves_order() {
3304        let flow = FlowBuilder::new();
3305        let node = flow.process::<()>();
3306
3307        let (in_send, input) = node.sim_input();
3308
3309        let tick = node.tick();
3310        let out_recv = input
3311            .batch(&tick, nondet!(/** test */))
3312            .all_ticks()
3313            .sim_output();
3314
3315        flow.sim().exhaustive(async || {
3316            in_send.send(1);
3317            in_send.send(2);
3318            in_send.send(3);
3319
3320            out_recv.assert_yields_only([1, 2, 3]).await;
3321        });
3322    }
3323
3324    #[cfg(feature = "sim")]
3325    #[test]
3326    #[should_panic]
3327    fn sim_batch_unordered_shuffles() {
3328        let flow = FlowBuilder::new();
3329        let node = flow.process::<()>();
3330
3331        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3332
3333        let tick = node.tick();
3334        let batch = input.batch(&tick, nondet!(/** test */));
3335        let out_recv = batch
3336            .clone()
3337            .min()
3338            .zip(batch.max())
3339            .all_ticks()
3340            .sim_output();
3341
3342        flow.sim().exhaustive(async || {
3343            in_send.send_many_unordered([1, 2, 3]);
3344
3345            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3346                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3347            }
3348        });
3349    }
3350
3351    #[cfg(feature = "sim")]
3352    #[test]
3353    fn sim_batch_unordered_shuffles_count() {
3354        let flow = FlowBuilder::new();
3355        let node = flow.process::<()>();
3356
3357        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3358
3359        let tick = node.tick();
3360        let batch = input.batch(&tick, nondet!(/** test */));
3361        let out_recv = batch.all_ticks().sim_output();
3362
3363        let instance_count = flow.sim().exhaustive(async || {
3364            in_send.send_many_unordered([1, 2, 3, 4]);
3365            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3366        });
3367
3368        assert_eq!(
3369            instance_count,
3370            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3371        )
3372    }
3373
3374    #[cfg(feature = "sim")]
3375    #[test]
3376    #[should_panic]
3377    fn sim_observe_order_batched() {
3378        let flow = FlowBuilder::new();
3379        let node = flow.process::<()>();
3380
3381        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3382
3383        let tick = node.tick();
3384        let batch = input.batch(&tick, nondet!(/** test */));
3385        let out_recv = batch
3386            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3387            .all_ticks()
3388            .sim_output();
3389
3390        flow.sim().exhaustive(async || {
3391            in_send.send_many_unordered([1, 2, 3, 4]);
3392            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3393        });
3394    }
3395
3396    #[cfg(feature = "sim")]
3397    #[test]
3398    fn sim_observe_order_batched_count() {
3399        let flow = FlowBuilder::new();
3400        let node = flow.process::<()>();
3401
3402        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3403
3404        let tick = node.tick();
3405        let batch = input.batch(&tick, nondet!(/** test */));
3406        let out_recv = batch
3407            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3408            .all_ticks()
3409            .sim_output();
3410
3411        let instance_count = flow.sim().exhaustive(async || {
3412            in_send.send_many_unordered([1, 2, 3, 4]);
3413            let _ = out_recv.collect::<Vec<_>>().await;
3414        });
3415
3416        assert_eq!(
3417            instance_count,
3418            192 // 4! * 2^{4 - 1}
3419        )
3420    }
3421
3422    #[cfg(feature = "sim")]
3423    #[test]
3424    fn sim_unordered_count_instance_count() {
3425        let flow = FlowBuilder::new();
3426        let node = flow.process::<()>();
3427
3428        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3429
3430        let tick = node.tick();
3431        let out_recv = input
3432            .count()
3433            .snapshot(&tick, nondet!(/** test */))
3434            .all_ticks()
3435            .sim_output();
3436
3437        let instance_count = flow.sim().exhaustive(async || {
3438            in_send.send_many_unordered([1, 2, 3, 4]);
3439            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3440        });
3441
3442        assert_eq!(
3443            instance_count,
3444            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3445        )
3446    }
3447}