// hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::manual_expr::ManualExpr;
31use crate::nondet::{NonDet, nondet};
32use crate::prelude::manual_proof;
33use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
34
35pub mod networking;
36
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// Sealed: only the two markers in this module implement it. The supertrait
/// bounds encode the `MinOrder` lattice (min with self is self, min with
/// [`TotalOrder`] is self, min with [`NoOrder`] is [`NoOrder`]) so generic
/// code can always compute the weaker of two orderings.
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
45
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// Uninhabited marker type: it exists only at the type level and is never
/// constructed at runtime.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
55
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// Uninhabited marker type: it exists only at the type level and is never
/// constructed at runtime.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
67
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than (or equal to) `O2` exactly when the minimum
// of the two orderings is `O` itself.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
75
/// Helper trait for determining the weakest of two orderings.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// `TotalOrder` is the strongest ordering, so the minimum with any `O` is `O`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is the weakest ordering, so the minimum with anything is `NoOrder`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
92
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// Sealed: only the two markers in this module implement it. The supertrait
/// bounds encode the `MinRetries` lattice (min with self is self, min with
/// [`ExactlyOnce`] is self, min with [`AtLeastOnce`] is [`AtLeastOnce`]) so
/// generic code can always compute the weaker of two retry guarantees.
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
103
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// Uninhabited marker type; exists only at the type level.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}

/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// Uninhabited marker type; exists only at the type level.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
121
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than (or equal to) `R2` exactly when the minimum
// of the two retry guarantees is `R` itself.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
129
/// Helper trait for determining the weakest of two retry guarantees.
///
/// NOTE(review): unlike [`MinOrder`], the associated `Min` here also carries
/// `WeakerRetryThan` bounds — presumably so callers can weaken either input to
/// the minimum without extra where-clauses; confirm whether `MinOrder` should match.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest guarantee, so the minimum with any `R` is `R`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest guarantee, so the minimum with anything is `AtLeastOnce`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
146
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
///
/// Used as a bound on APIs that only make sense for deterministic order; the
/// `on_unimplemented` attribute above produces a tailored compile error.
pub trait IsOrdered: Ordering {}

#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
159
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
///
/// Used as a bound on APIs that only make sense without duplicates; the
/// `on_unimplemented` attribute above produces a tailored compile error.
pub trait IsExactlyOnce: Retries {}

#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
172
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this stream is materialized.
    pub(crate) location: Loc,
    // Backing IR node; consumed by replacing it with `HydroNode::Placeholder`.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Shared handle to the flow graph under construction.
    pub(crate) flow_state: FlowState,

    // Carries the type parameters that are not stored as runtime data.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
205
206impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
207    fn drop(&mut self) {
208        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
209        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
210            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
211                input: Box::new(ir_node),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214        }
215    }
216}
217
218impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
219    for Stream<T, L, Unbounded, O, R>
220where
221    L: Location<'a>,
222{
223    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
224        let new_meta = stream
225            .location
226            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
227
228        Stream {
229            location: stream.location.clone(),
230            flow_state: stream.flow_state.clone(),
231            ir_node: RefCell::new(HydroNode::Cast {
232                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
233                metadata: new_meta,
234            }),
235            _phantom: PhantomData,
236        }
237    }
238}
239
240impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
241    for Stream<T, L, B, NoOrder, R>
242where
243    L: Location<'a>,
244{
245    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
246        stream.weaken_ordering()
247    }
248}
249
250impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
251    for Stream<T, L, B, O, AtLeastOnce>
252where
253    L: Location<'a>,
254{
255    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
256        stream.weaken_retries()
257    }
258}
259
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Delegates to the inherent `Stream::defer_tick` (UFCS call avoids
    // ambiguity with this trait method of the same name).
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
268
269impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
270    for Stream<T, Tick<L>, Bounded, O, R>
271where
272    L: Location<'a>,
273{
274    type Location = Tick<L>;
275
276    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
277        Stream::new(
278            location.clone(),
279            HydroNode::CycleSource {
280                cycle_id,
281                metadata: location.new_node_metadata(Self::collection_kind()),
282            },
283        )
284    }
285}
286
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    /// Creates the receiving end of a tick cycle that is seeded with `initial`
    /// on the very first tick, and thereafter carries the previous tick's
    /// completed values.
    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        // The cycle's contents only become visible one tick after they are
        // written, hence the `DeferTick` wrapper around the source.
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // `initial` is gated so it contributes only on the first tick; after
        // that, only the deferred cycle contents flow through.
        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
    }
}
309
310impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
311    for Stream<T, Tick<L>, Bounded, O, R>
312where
313    L: Location<'a>,
314{
315    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
316        assert_eq!(
317            Location::id(&self.location),
318            expected_location,
319            "locations do not match"
320        );
321        self.location
322            .flow_state()
323            .borrow_mut()
324            .push_root(HydroRoot::CycleSink {
325                cycle_id,
326                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
327                op_metadata: HydroIrOpMetadata::new(),
328            });
329    }
330}
331
332impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
333    for Stream<T, L, B, O, R>
334where
335    L: Location<'a> + NoTick,
336{
337    type Location = L;
338
339    fn create_source(cycle_id: CycleId, location: L) -> Self {
340        Stream::new(
341            location.clone(),
342            HydroNode::CycleSource {
343                cycle_id,
344                metadata: location.new_node_metadata(Self::collection_kind()),
345            },
346        )
347    }
348}
349
350impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
351    for Stream<T, L, B, O, R>
352where
353    L: Location<'a> + NoTick,
354{
355    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
356        assert_eq!(
357            Location::id(&self.location),
358            expected_location,
359            "locations do not match"
360        );
361        self.location
362            .flow_state()
363            .borrow_mut()
364            .push_root(HydroRoot::CycleSink {
365                cycle_id,
366                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
367                op_metadata: HydroIrOpMetadata::new(),
368            });
369    }
370}
371
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    /// Clones by tee-ing the underlying IR node: the first clone rewrites this
    /// stream's node into a shared `Tee`, and every clone (including this one)
    /// holds an `Rc` handle to the same inner node.
    fn clone(&self) -> Self {
        // First clone of this stream: move the current node behind a shared
        // `Tee` in place, so both the original and all clones read from it.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // At this point the node is guaranteed to be a `Tee`; hand out another
        // handle to the same shared inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: HydroNode::Tee {
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
402
403impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
404where
405    L: Location<'a>,
406{
407    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
408        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
409        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
410
411        let flow_state = location.flow_state().clone();
412        Stream {
413            location,
414            flow_state,
415            ir_node: RefCell::new(ir_node),
416            _phantom: PhantomData,
417        }
418    }
419
    /// Returns the [`Location`] where this stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
424
425    pub(crate) fn collection_kind() -> CollectionKind {
426        CollectionKind::Stream {
427            bound: B::BOUND_KIND,
428            order: O::ORDERING_KIND,
429            retry: R::RETRIES_KIND,
430            element_type: quote_type::<T>().into(),
431        }
432    }
433
434    /// Produces a stream based on invoking `f` on each element.
435    /// If you do not want to modify the stream and instead only want to view
436    /// each item use [`Stream::inspect`] instead.
437    ///
438    /// # Example
439    /// ```rust
440    /// # #[cfg(feature = "deploy")] {
441    /// # use hydro_lang::prelude::*;
442    /// # use futures::StreamExt;
443    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
444    /// let words = process.source_iter(q!(vec!["hello", "world"]));
445    /// words.map(q!(|x| x.to_uppercase()))
446    /// # }, |mut stream| async move {
447    /// # for w in vec!["HELLO", "WORLD"] {
448    /// #     assert_eq!(stream.next().await.unwrap(), w);
449    /// # }
450    /// # }));
451    /// # }
452    /// ```
453    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
454    where
455        F: Fn(T) -> U + 'a,
456    {
457        let f = f.splice_fn1_ctx(&self.location).into();
458        Stream::new(
459            self.location.clone(),
460            HydroNode::Map {
461                f,
462                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
463                metadata: self
464                    .location
465                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
466            },
467        )
468    }
469
470    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
471    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
472    /// for the output type `U` must produce items in a **deterministic** order.
473    ///
474    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
475    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
476    ///
477    /// # Example
478    /// ```rust
479    /// # #[cfg(feature = "deploy")] {
480    /// # use hydro_lang::prelude::*;
481    /// # use futures::StreamExt;
482    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
483    /// process
484    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
485    ///     .flat_map_ordered(q!(|x| x))
486    /// # }, |mut stream| async move {
487    /// // 1, 2, 3, 4
488    /// # for w in (1..5) {
489    /// #     assert_eq!(stream.next().await.unwrap(), w);
490    /// # }
491    /// # }));
492    /// # }
493    /// ```
494    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
495    where
496        I: IntoIterator<Item = U>,
497        F: Fn(T) -> I + 'a,
498    {
499        let f = f.splice_fn1_ctx(&self.location).into();
500        Stream::new(
501            self.location.clone(),
502            HydroNode::FlatMap {
503                f,
504                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
505                metadata: self
506                    .location
507                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
508            },
509        )
510    }
511
512    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
513    /// for the output type `U` to produce items in any order.
514    ///
515    /// # Example
516    /// ```rust
517    /// # #[cfg(feature = "deploy")] {
518    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
519    /// # use futures::StreamExt;
520    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
521    /// process
522    ///     .source_iter(q!(vec![
523    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
524    ///         std::collections::HashSet::from_iter(vec![3, 4]),
525    ///     ]))
526    ///     .flat_map_unordered(q!(|x| x))
527    /// # }, |mut stream| async move {
528    /// // 1, 2, 3, 4, but in no particular order
529    /// # let mut results = Vec::new();
530    /// # for w in (1..5) {
531    /// #     results.push(stream.next().await.unwrap());
532    /// # }
533    /// # results.sort();
534    /// # assert_eq!(results, vec![1, 2, 3, 4]);
535    /// # }));
536    /// # }
537    /// ```
538    pub fn flat_map_unordered<U, I, F>(
539        self,
540        f: impl IntoQuotedMut<'a, F, L>,
541    ) -> Stream<U, L, B, NoOrder, R>
542    where
543        I: IntoIterator<Item = U>,
544        F: Fn(T) -> I + 'a,
545    {
546        let f = f.splice_fn1_ctx(&self.location).into();
547        Stream::new(
548            self.location.clone(),
549            HydroNode::FlatMap {
550                f,
551                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
552                metadata: self
553                    .location
554                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
555            },
556        )
557    }
558
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Delegates with a quoted identity closure.
        self.flat_map_ordered(q!(|d| d))
    }
587
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Delegates with a quoted identity closure.
        self.flat_map_unordered(q!(|d| d))
    }
620
621    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
622    /// `f`, preserving the order of the elements.
623    ///
624    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
625    /// not modify or take ownership of the values. If you need to modify the values while filtering
626    /// use [`Stream::filter_map`] instead.
627    ///
628    /// # Example
629    /// ```rust
630    /// # #[cfg(feature = "deploy")] {
631    /// # use hydro_lang::prelude::*;
632    /// # use futures::StreamExt;
633    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
634    /// process
635    ///     .source_iter(q!(vec![1, 2, 3, 4]))
636    ///     .filter(q!(|&x| x > 2))
637    /// # }, |mut stream| async move {
638    /// // 3, 4
639    /// # for w in (3..5) {
640    /// #     assert_eq!(stream.next().await.unwrap(), w);
641    /// # }
642    /// # }));
643    /// # }
644    /// ```
645    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
646    where
647        F: Fn(&T) -> bool + 'a,
648    {
649        let f = f.splice_fn1_borrow_ctx(&self.location).into();
650        Stream::new(
651            self.location.clone(),
652            HydroNode::Filter {
653                f,
654                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
655                metadata: self.location.new_node_metadata(Self::collection_kind()),
656            },
657        )
658    }
659
    /// Splits the stream into two streams based on a predicate, without cloning elements.
    ///
    /// Elements for which `f` returns `true` are sent to the first output stream,
    /// and elements for which `f` returns `false` are sent to the second output stream.
    ///
    /// Unlike using `filter` twice, this only evaluates the predicate once per element
    /// and does not require `T: Clone`.
    ///
    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
    /// the predicate is only used for routing; the element itself is moved to the
    /// appropriate output stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
    /// evens.map(q!(|x| (x, true)))
    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..6 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
    /// # }));
    /// # }
    /// ```
    #[expect(
        clippy::type_complexity,
        reason = "return type mirrors the input stream type"
    )]
    pub fn partition<F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
    where
        F: Fn(&T) -> bool + 'a,
    {
        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
        // Move the input node behind a shared handle so both `Partition` nodes
        // read from the same upstream without duplicating it.
        let shared = SharedNode(Rc::new(RefCell::new(
            self.ir_node.replace(HydroNode::Placeholder),
        )));

        // Both outputs carry the same predicate; `is_true` selects which side
        // of the split each node emits.
        let true_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0.clone()),
                f: f.clone(),
                is_true: true,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        let false_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0),
                f,
                is_true: false,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        (true_stream, false_stream)
    }
732
733    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
734    ///
735    /// # Example
736    /// ```rust
737    /// # #[cfg(feature = "deploy")] {
738    /// # use hydro_lang::prelude::*;
739    /// # use futures::StreamExt;
740    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
741    /// process
742    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
743    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
744    /// # }, |mut stream| async move {
745    /// // 1, 2
746    /// # for w in (1..3) {
747    /// #     assert_eq!(stream.next().await.unwrap(), w);
748    /// # }
749    /// # }));
750    /// # }
751    /// ```
752    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
753    where
754        F: Fn(T) -> Option<U> + 'a,
755    {
756        let f = f.splice_fn1_ctx(&self.location).into();
757        Stream::new(
758            self.location.clone(),
759            HydroNode::FilterMap {
760                f,
761                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
762                metadata: self
763                    .location
764                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
765            },
766        )
767    }
768
769    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
770    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
771    /// If `other` is an empty [`Optional`], no values will be produced.
772    ///
773    /// # Example
774    /// ```rust
775    /// # #[cfg(feature = "deploy")] {
776    /// # use hydro_lang::prelude::*;
777    /// # use futures::StreamExt;
778    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
779    /// let tick = process.tick();
780    /// let batch = process
781    ///   .source_iter(q!(vec![1, 2, 3, 4]))
782    ///   .batch(&tick, nondet!(/** test */));
783    /// let count = batch.clone().count(); // `count()` returns a singleton
784    /// batch.cross_singleton(count).all_ticks()
785    /// # }, |mut stream| async move {
786    /// // (1, 4), (2, 4), (3, 4), (4, 4)
787    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
788    /// #     assert_eq!(stream.next().await.unwrap(), w);
789    /// # }
790    /// # }));
791    /// # }
792    /// ```
793    pub fn cross_singleton<O2>(
794        self,
795        other: impl Into<Optional<O2, L, Bounded>>,
796    ) -> Stream<(T, O2), L, B, O, R>
797    where
798        O2: Clone,
799    {
800        let other: Optional<O2, L, Bounded> = other.into();
801        check_matching_location(&self.location, &other.location);
802
803        Stream::new(
804            self.location.clone(),
805            HydroNode::CrossSingleton {
806                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
807                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
808                metadata: self
809                    .location
810                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
811            },
812        )
813    }
814
815    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
816    ///
817    /// # Example
818    /// ```rust
819    /// # #[cfg(feature = "deploy")] {
820    /// # use hydro_lang::prelude::*;
821    /// # use futures::StreamExt;
822    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
823    /// let tick = process.tick();
824    /// // ticks are lazy by default, forces the second tick to run
825    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
826    ///
827    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
828    /// let batch_first_tick = process
829    ///   .source_iter(q!(vec![1, 2, 3, 4]))
830    ///   .batch(&tick, nondet!(/** test */));
831    /// let batch_second_tick = process
832    ///   .source_iter(q!(vec![5, 6, 7, 8]))
833    ///   .batch(&tick, nondet!(/** test */))
834    ///   .defer_tick();
835    /// batch_first_tick.chain(batch_second_tick)
836    ///   .filter_if(signal)
837    ///   .all_ticks()
838    /// # }, |mut stream| async move {
839    /// // [1, 2, 3, 4]
840    /// # for w in vec![1, 2, 3, 4] {
841    /// #     assert_eq!(stream.next().await.unwrap(), w);
842    /// # }
843    /// # }));
844    /// # }
845    /// ```
846    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
847        self.cross_singleton(signal.filter(q!(|b| *b)))
848            .map(q!(|(d, _)| d))
849    }
850
851    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
852    ///
853    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
854    /// leader of a cluster.
855    ///
856    /// # Example
857    /// ```rust
858    /// # #[cfg(feature = "deploy")] {
859    /// # use hydro_lang::prelude::*;
860    /// # use futures::StreamExt;
861    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862    /// let tick = process.tick();
863    /// // ticks are lazy by default, forces the second tick to run
864    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
865    ///
866    /// let batch_first_tick = process
867    ///   .source_iter(q!(vec![1, 2, 3, 4]))
868    ///   .batch(&tick, nondet!(/** test */));
869    /// let batch_second_tick = process
870    ///   .source_iter(q!(vec![5, 6, 7, 8]))
871    ///   .batch(&tick, nondet!(/** test */))
872    ///   .defer_tick(); // appears on the second tick
873    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
874    /// batch_first_tick.chain(batch_second_tick)
875    ///   .filter_if_some(some_on_first_tick)
876    ///   .all_ticks()
877    /// # }, |mut stream| async move {
878    /// // [1, 2, 3, 4]
879    /// # for w in vec![1, 2, 3, 4] {
880    /// #     assert_eq!(stream.next().await.unwrap(), w);
881    /// # }
882    /// # }));
883    /// # }
884    /// ```
885    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
886    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
887        self.filter_if(signal.is_some())
888    }
889
890    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
891    ///
892    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
893    /// some local state.
894    ///
895    /// # Example
896    /// ```rust
897    /// # #[cfg(feature = "deploy")] {
898    /// # use hydro_lang::prelude::*;
899    /// # use futures::StreamExt;
900    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
901    /// let tick = process.tick();
902    /// // ticks are lazy by default, forces the second tick to run
903    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
904    ///
905    /// let batch_first_tick = process
906    ///   .source_iter(q!(vec![1, 2, 3, 4]))
907    ///   .batch(&tick, nondet!(/** test */));
908    /// let batch_second_tick = process
909    ///   .source_iter(q!(vec![5, 6, 7, 8]))
910    ///   .batch(&tick, nondet!(/** test */))
911    ///   .defer_tick(); // appears on the second tick
912    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
913    /// batch_first_tick.chain(batch_second_tick)
914    ///   .filter_if_none(some_on_first_tick)
915    ///   .all_ticks()
916    /// # }, |mut stream| async move {
917    /// // [5, 6, 7, 8]
918    /// # for w in vec![5, 6, 7, 8] {
919    /// #     assert_eq!(stream.next().await.unwrap(), w);
920    /// # }
921    /// # }));
922    /// # }
923    /// ```
924    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
925    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
926        self.filter_if(other.is_none())
927    }
928
929    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
930    /// tupled pairs in a non-deterministic order.
931    ///
932    /// # Example
933    /// ```rust
934    /// # #[cfg(feature = "deploy")] {
935    /// # use hydro_lang::prelude::*;
936    /// # use std::collections::HashSet;
937    /// # use futures::StreamExt;
938    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
939    /// let tick = process.tick();
940    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
941    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
942    /// stream1.cross_product(stream2)
943    /// # }, |mut stream| async move {
944    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
945    /// # stream.map(|i| assert!(expected.contains(&i)));
946    /// # }));
947    /// # }
948    /// ```
949    pub fn cross_product<T2, O2: Ordering>(
950        self,
951        other: Stream<T2, L, B, O2, R>,
952    ) -> Stream<(T, T2), L, B, NoOrder, R>
953    where
954        T: Clone,
955        T2: Clone,
956    {
957        check_matching_location(&self.location, &other.location);
958
959        Stream::new(
960            self.location.clone(),
961            HydroNode::CrossProduct {
962                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
963                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
964                metadata: self
965                    .location
966                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
967            },
968        )
969    }
970
971    /// Takes one stream as input and filters out any duplicate occurrences. The output
972    /// contains all unique values from the input.
973    ///
974    /// # Example
975    /// ```rust
976    /// # #[cfg(feature = "deploy")] {
977    /// # use hydro_lang::prelude::*;
978    /// # use futures::StreamExt;
979    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
980    /// let tick = process.tick();
981    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
982    /// # }, |mut stream| async move {
983    /// # for w in vec![1, 2, 3, 4] {
984    /// #     assert_eq!(stream.next().await.unwrap(), w);
985    /// # }
986    /// # }));
987    /// # }
988    /// ```
989    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
990    where
991        T: Eq + Hash,
992    {
993        Stream::new(
994            self.location.clone(),
995            HydroNode::Unique {
996                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
997                metadata: self
998                    .location
999                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1000            },
1001        )
1002    }
1003
1004    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1005    ///
1006    /// The `other` stream must be [`Bounded`], since this function will wait until
1007    /// all its elements are available before producing any output.
1008    /// # Example
1009    /// ```rust
1010    /// # #[cfg(feature = "deploy")] {
1011    /// # use hydro_lang::prelude::*;
1012    /// # use futures::StreamExt;
1013    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014    /// let tick = process.tick();
1015    /// let stream = process
1016    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1017    ///   .batch(&tick, nondet!(/** test */));
1018    /// let batch = process
1019    ///   .source_iter(q!(vec![1, 2]))
1020    ///   .batch(&tick, nondet!(/** test */));
1021    /// stream.filter_not_in(batch).all_ticks()
1022    /// # }, |mut stream| async move {
1023    /// # for w in vec![3, 4] {
1024    /// #     assert_eq!(stream.next().await.unwrap(), w);
1025    /// # }
1026    /// # }));
1027    /// # }
1028    /// ```
1029    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1030    where
1031        T: Eq + Hash,
1032        B2: IsBounded,
1033    {
1034        check_matching_location(&self.location, &other.location);
1035
1036        Stream::new(
1037            self.location.clone(),
1038            HydroNode::Difference {
1039                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1040                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1041                metadata: self
1042                    .location
1043                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1044            },
1045        )
1046    }
1047
1048    /// An operator which allows you to "inspect" each element of a stream without
1049    /// modifying it. The closure `f` is called on a reference to each item. This is
1050    /// mainly useful for debugging, and should not be used to generate side-effects.
1051    ///
1052    /// # Example
1053    /// ```rust
1054    /// # #[cfg(feature = "deploy")] {
1055    /// # use hydro_lang::prelude::*;
1056    /// # use futures::StreamExt;
1057    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058    /// let nums = process.source_iter(q!(vec![1, 2]));
1059    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1060    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1061    /// # }, |mut stream| async move {
1062    /// # for w in vec![1, 2] {
1063    /// #     assert_eq!(stream.next().await.unwrap(), w);
1064    /// # }
1065    /// # }));
1066    /// # }
1067    /// ```
1068    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1069    where
1070        F: Fn(&T) + 'a,
1071    {
1072        let f = f.splice_fn1_borrow_ctx(&self.location).into();
1073
1074        Stream::new(
1075            self.location.clone(),
1076            HydroNode::Inspect {
1077                f,
1078                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1079                metadata: self.location.new_node_metadata(Self::collection_kind()),
1080            },
1081        )
1082    }
1083
1084    /// Executes the provided closure for every element in this stream.
1085    ///
1086    /// Because the closure may have side effects, the stream must have deterministic order
1087    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1088    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1089    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1090    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1091    where
1092        O: IsOrdered,
1093        R: IsExactlyOnce,
1094    {
1095        let f = f.splice_fn1_ctx(&self.location).into();
1096        self.location
1097            .flow_state()
1098            .borrow_mut()
1099            .push_root(HydroRoot::ForEach {
1100                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1101                f,
1102                op_metadata: HydroIrOpMetadata::new(),
1103            });
1104    }
1105
1106    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1107    /// TCP socket to some other server. You should _not_ use this API for interacting with
1108    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1109    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1110    /// interaction with asynchronous sinks.
1111    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1112    where
1113        O: IsOrdered,
1114        R: IsExactlyOnce,
1115        S: 'a + futures::Sink<T> + Unpin,
1116    {
1117        self.location
1118            .flow_state()
1119            .borrow_mut()
1120            .push_root(HydroRoot::DestSink {
1121                sink: sink.splice_typed_ctx(&self.location).into(),
1122                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1123                op_metadata: HydroIrOpMetadata::new(),
1124            });
1125    }
1126
1127    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1128    ///
1129    /// # Example
1130    /// ```rust
1131    /// # #[cfg(feature = "deploy")] {
1132    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1133    /// # use futures::StreamExt;
1134    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1135    /// let tick = process.tick();
1136    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1137    /// numbers.enumerate()
1138    /// # }, |mut stream| async move {
1139    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1140    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1141    /// #     assert_eq!(stream.next().await.unwrap(), w);
1142    /// # }
1143    /// # }));
1144    /// # }
1145    /// ```
1146    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1147    where
1148        O: IsOrdered,
1149        R: IsExactlyOnce,
1150    {
1151        Stream::new(
1152            self.location.clone(),
1153            HydroNode::Enumerate {
1154                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1155                metadata: self.location.new_node_metadata(Stream::<
1156                    (usize, T),
1157                    L,
1158                    B,
1159                    TotalOrder,
1160                    ExactlyOnce,
1161                >::collection_kind()),
1162            },
1163        )
1164    }
1165
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
    /// words
    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
    ///     .into_stream()
    /// # }, |mut stream| async move {
    /// // "HELLOWORLD"
    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
    /// # }));
    /// # }
    /// ```
    pub fn fold<A, I, F, C, Idemp>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Singleton<A, L, B>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        // Splice the combinator together with the algebraic-property proof carried by
        // `AggFuncAlgebra<C, Idemp>`, and register that proof for the spliced expression.
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // The `C: ValidCommutativityFor<O>` / `Idemp: ValidIdempotenceFor<R>` bounds
        // justify erasing the ordering and retry markers before lowering to the IR.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        // Lower to an IR `Fold` node whose output is a `Singleton<A>`.
        let core = HydroNode::Fold {
            init,
            acc: comb.into(),
            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
            metadata: ordered_etc
                .location
                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
        };

        Singleton::new(ordered_etc.location.clone(), core)
    }
1218
1219    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1220    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1221    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1222    /// reference, so that it can be modified in place.
1223    ///
1224    /// Depending on the input stream guarantees, the closure may need to be commutative
1225    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1226    ///
1227    /// # Example
1228    /// ```rust
1229    /// # #[cfg(feature = "deploy")] {
1230    /// # use hydro_lang::prelude::*;
1231    /// # use futures::StreamExt;
1232    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233    /// let bools = process.source_iter(q!(vec![false, true, false]));
1234    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1235    /// # }, |mut stream| async move {
1236    /// // true
1237    /// # assert_eq!(stream.next().await.unwrap(), true);
1238    /// # }));
1239    /// # }
1240    /// ```
1241    pub fn reduce<F, C, Idemp>(
1242        self,
1243        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1244    ) -> Optional<T, L, B>
1245    where
1246        F: Fn(&mut T, T) + 'a,
1247        C: ValidCommutativityFor<O>,
1248        Idemp: ValidIdempotenceFor<R>,
1249    {
1250        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1251        proof.register_proof(&f);
1252
1253        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1254        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1255
1256        let core = HydroNode::Reduce {
1257            f: f.into(),
1258            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1259            metadata: ordered_etc
1260                .location
1261                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1262        };
1263
1264        Optional::new(ordered_etc.location.clone(), core)
1265    }
1266
1267    /// Computes the maximum element in the stream as an [`Optional`], which
1268    /// will be empty until the first element in the input arrives.
1269    ///
1270    /// # Example
1271    /// ```rust
1272    /// # #[cfg(feature = "deploy")] {
1273    /// # use hydro_lang::prelude::*;
1274    /// # use futures::StreamExt;
1275    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1276    /// let tick = process.tick();
1277    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1278    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1279    /// batch.max().all_ticks()
1280    /// # }, |mut stream| async move {
1281    /// // 4
1282    /// # assert_eq!(stream.next().await.unwrap(), 4);
1283    /// # }));
1284    /// # }
1285    /// ```
1286    pub fn max(self) -> Optional<T, L, B>
1287    where
1288        T: Ord,
1289    {
1290        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1291            .assume_ordering_trusted_bounded::<TotalOrder>(
1292                nondet!(/** max is commutative, but order affects intermediates */),
1293            )
1294            .reduce(q!(|curr, new| {
1295                if new > *curr {
1296                    *curr = new;
1297                }
1298            }))
1299    }
1300
1301    /// Computes the minimum element in the stream as an [`Optional`], which
1302    /// will be empty until the first element in the input arrives.
1303    ///
1304    /// # Example
1305    /// ```rust
1306    /// # #[cfg(feature = "deploy")] {
1307    /// # use hydro_lang::prelude::*;
1308    /// # use futures::StreamExt;
1309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1310    /// let tick = process.tick();
1311    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1312    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1313    /// batch.min().all_ticks()
1314    /// # }, |mut stream| async move {
1315    /// // 1
1316    /// # assert_eq!(stream.next().await.unwrap(), 1);
1317    /// # }));
1318    /// # }
1319    /// ```
1320    pub fn min(self) -> Optional<T, L, B>
1321    where
1322        T: Ord,
1323    {
1324        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1325            .assume_ordering_trusted_bounded::<TotalOrder>(
1326                nondet!(/** max is commutative, but order affects intermediates */),
1327            )
1328            .reduce(q!(|curr, new| {
1329                if new < *curr {
1330                    *curr = new;
1331                }
1332            }))
1333    }
1334
1335    /// Computes the first element in the stream as an [`Optional`], which
1336    /// will be empty until the first element in the input arrives.
1337    ///
1338    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1339    /// re-ordering of elements may cause the first element to change.
1340    ///
1341    /// # Example
1342    /// ```rust
1343    /// # #[cfg(feature = "deploy")] {
1344    /// # use hydro_lang::prelude::*;
1345    /// # use futures::StreamExt;
1346    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1347    /// let tick = process.tick();
1348    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1349    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1350    /// batch.first().all_ticks()
1351    /// # }, |mut stream| async move {
1352    /// // 1
1353    /// # assert_eq!(stream.next().await.unwrap(), 1);
1354    /// # }));
1355    /// # }
1356    /// ```
1357    pub fn first(self) -> Optional<T, L, B>
1358    where
1359        O: IsOrdered,
1360    {
1361        self.make_totally_ordered()
1362            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1363            .reduce(q!(|_, _| {}))
1364    }
1365
1366    /// Computes the last element in the stream as an [`Optional`], which
1367    /// will be empty until an element in the input arrives.
1368    ///
1369    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1370    /// re-ordering of elements may cause the last element to change.
1371    ///
1372    /// # Example
1373    /// ```rust
1374    /// # #[cfg(feature = "deploy")] {
1375    /// # use hydro_lang::prelude::*;
1376    /// # use futures::StreamExt;
1377    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1378    /// let tick = process.tick();
1379    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1380    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1381    /// batch.last().all_ticks()
1382    /// # }, |mut stream| async move {
1383    /// // 4
1384    /// # assert_eq!(stream.next().await.unwrap(), 4);
1385    /// # }));
1386    /// # }
1387    /// ```
1388    pub fn last(self) -> Optional<T, L, B>
1389    where
1390        O: IsOrdered,
1391    {
1392        self.make_totally_ordered()
1393            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1394            .reduce(q!(|curr, new| *curr = new))
1395    }
1396
1397    /// Collects all the elements of this stream into a single [`Vec`] element.
1398    ///
1399    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1400    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1401    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1402    /// the vector at an arbitrary point in time.
1403    ///
1404    /// # Example
1405    /// ```rust
1406    /// # #[cfg(feature = "deploy")] {
1407    /// # use hydro_lang::prelude::*;
1408    /// # use futures::StreamExt;
1409    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1410    /// let tick = process.tick();
1411    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1412    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1413    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1414    /// # }, |mut stream| async move {
1415    /// // [ vec![1, 2, 3, 4] ]
1416    /// # for w in vec![vec![1, 2, 3, 4]] {
1417    /// #     assert_eq!(stream.next().await.unwrap(), w);
1418    /// # }
1419    /// # }));
1420    /// # }
1421    /// ```
1422    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1423    where
1424        O: IsOrdered,
1425        R: IsExactlyOnce,
1426    {
1427        self.make_totally_ordered().make_exactly_once().fold(
1428            q!(|| vec![]),
1429            q!(|acc, v| {
1430                acc.push(v);
1431            }),
1432        )
1433    }
1434
1435    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1436    /// and emitting each intermediate result.
1437    ///
1438    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1439    /// containing all intermediate accumulated values. The scan operation can also terminate early
1440    /// by returning `None`.
1441    ///
1442    /// The function takes a mutable reference to the accumulator and the current element, and returns
1443    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1444    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1445    ///
1446    /// # Examples
1447    ///
1448    /// Basic usage - running sum:
1449    /// ```rust
1450    /// # #[cfg(feature = "deploy")] {
1451    /// # use hydro_lang::prelude::*;
1452    /// # use futures::StreamExt;
1453    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1454    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1455    ///     q!(|| 0),
1456    ///     q!(|acc, x| {
1457    ///         *acc += x;
1458    ///         Some(*acc)
1459    ///     }),
1460    /// )
1461    /// # }, |mut stream| async move {
1462    /// // Output: 1, 3, 6, 10
1463    /// # for w in vec![1, 3, 6, 10] {
1464    /// #     assert_eq!(stream.next().await.unwrap(), w);
1465    /// # }
1466    /// # }));
1467    /// # }
1468    /// ```
1469    ///
1470    /// Early termination example:
1471    /// ```rust
1472    /// # #[cfg(feature = "deploy")] {
1473    /// # use hydro_lang::prelude::*;
1474    /// # use futures::StreamExt;
1475    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1476    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1477    ///     q!(|| 1),
1478    ///     q!(|state, x| {
1479    ///         *state = *state * x;
1480    ///         if *state > 6 {
1481    ///             None // Terminate the stream
1482    ///         } else {
1483    ///             Some(-*state)
1484    ///         }
1485    ///     }),
1486    /// )
1487    /// # }, |mut stream| async move {
1488    /// // Output: -1, -2, -6
1489    /// # for w in vec![-1, -2, -6] {
1490    /// #     assert_eq!(stream.next().await.unwrap(), w);
1491    /// # }
1492    /// # }));
1493    /// # }
1494    /// ```
1495    pub fn scan<A, U, I, F>(
1496        self,
1497        init: impl IntoQuotedMut<'a, I, L>,
1498        f: impl IntoQuotedMut<'a, F, L>,
1499    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1500    where
1501        O: IsOrdered,
1502        R: IsExactlyOnce,
1503        I: Fn() -> A + 'a,
1504        F: Fn(&mut A, T) -> Option<U> + 'a,
1505    {
1506        let init = init.splice_fn0_ctx(&self.location).into();
1507        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1508
1509        Stream::new(
1510            self.location.clone(),
1511            HydroNode::Scan {
1512                init,
1513                acc: f,
1514                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1515                metadata: self.location.new_node_metadata(
1516                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1517                ),
1518            },
1519        )
1520    }
1521
    /// Iteratively processes the elements of the stream using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
    /// state. The second argument defines the processing logic, taking in a mutable reference
    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
    /// variants define what is emitted and whether further inputs should be processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         if *acc > 100 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
    ///         } else if *acc % 2 == 0 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
    ///         } else {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: "even", "done!"
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Wrap the user closures in `ManualExpr` so splicing is deferred: `init()` and
        // `f()` are invoked *inside* the `q!` state machine below, in that closure's
        // context rather than here.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        // The generator is lowered onto a `Scan`, which requires canonical markers.
        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        //   None = not yet initialized
        //   Some(Some(a)) = active with state a
        //   Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // The scan step returns Option<Option<U>>:
        //   None = terminate the scan; Some(None) = emit nothing; Some(Some(u)) = emit u.
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            if state.is_none() {
                // Lazily run the user's initializer on the first element.
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        // Emit the final value and mark the machine as terminated.
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can terminate the scan directly on
                    // Break/Return because there is only one state (no other keys
                    // that still need processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the `Option<U>` layer with an identity flat-map: `None` (the
        // "emit nothing" case) yields no elements, `Some(u)` yields `u`.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1634
1635    /// Given a time interval, returns a stream corresponding to samples taken from the
1636    /// stream roughly at that interval. The output will have elements in the same order
1637    /// as the input, but with arbitrary elements skipped between samples. There is also
1638    /// no guarantee on the exact timing of the samples.
1639    ///
1640    /// # Non-Determinism
1641    /// The output stream is non-deterministic in which elements are sampled, since this
1642    /// is controlled by a clock.
1643    pub fn sample_every(
1644        self,
1645        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1646        nondet: NonDet,
1647    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1648    where
1649        L: NoTick + NoAtomic,
1650    {
1651        let samples = self.location.source_interval(interval, nondet);
1652
1653        let tick = self.location.tick();
1654        self.batch(&tick, nondet)
1655            .filter_if(samples.batch(&tick, nondet).first().is_some())
1656            .all_ticks()
1657            .weaken_retries()
1658    }
1659
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Non-Determinism
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Track the wall-clock time of the most recent arrival. The fold
        // overwrites the timestamp on every element, so a duplicate delivery
        // would only refresh it; retries are assumed away under `nondet`.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // Snapshot the latest-arrival time each tick; emit `Some(())` when
        // nothing has arrived yet, or when the last arrival is older than
        // `duration`. `latest()` turns the per-tick result into an Optional.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    Some(())
                }
            }))
            .latest()
    }
1704
1705    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1706    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1707    ///
1708    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1709    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1710    /// argument that declares where the stream will be atomically processed. Batching a stream into
1711    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1712    /// [`Tick`] will introduce asynchrony.
1713    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
1714        let out_location = Atomic { tick: tick.clone() };
1715        Stream::new(
1716            out_location.clone(),
1717            HydroNode::BeginAtomic {
1718                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1719                metadata: out_location
1720                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1721            },
1722        )
1723    }
1724
1725    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1726    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1727    /// the order of the input. The output stream will execute in the [`Tick`] that was
1728    /// used to create the atomic section.
1729    ///
1730    /// # Non-Determinism
1731    /// The batch boundaries are non-deterministic and may change across executions.
1732    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1733        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1734        Stream::new(
1735            tick.clone(),
1736            HydroNode::Batch {
1737                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1738                metadata: tick
1739                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1740            },
1741        )
1742    }
1743
1744    /// An operator which allows you to "name" a `HydroNode`.
1745    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1746    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1747        {
1748            let mut node = self.ir_node.borrow_mut();
1749            let metadata = node.metadata_mut();
1750            metadata.tag = Some(name.to_owned());
1751        }
1752        self
1753    }
1754
1755    /// Explicitly "casts" the stream to a type with a different ordering
1756    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1757    /// by the type-system.
1758    ///
1759    /// # Non-Determinism
1760    /// This function is used as an escape hatch, and any mistakes in the
1761    /// provided ordering guarantee will propagate into the guarantees
1762    /// for the rest of the program.
1763    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1764        if O::ORDERING_KIND == O2::ORDERING_KIND {
1765            Stream::new(
1766                self.location.clone(),
1767                self.ir_node.replace(HydroNode::Placeholder),
1768            )
1769        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1770            // We can always weaken the ordering guarantee
1771            Stream::new(
1772                self.location.clone(),
1773                HydroNode::Cast {
1774                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1775                    metadata: self
1776                        .location
1777                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1778                },
1779            )
1780        } else {
1781            Stream::new(
1782                self.location.clone(),
1783                HydroNode::ObserveNonDet {
1784                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1785                    trusted: false,
1786                    metadata: self
1787                        .location
1788                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1789                },
1790            )
1791        }
1792    }
1793
1794    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1795    // intermediate states will not be revealed
1796    fn assume_ordering_trusted_bounded<O2: Ordering>(
1797        self,
1798        nondet: NonDet,
1799    ) -> Stream<T, L, B, O2, R> {
1800        if B::BOUNDED {
1801            self.assume_ordering_trusted(nondet)
1802        } else {
1803            self.assume_ordering(nondet)
1804        }
1805    }
1806
1807    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1808    // is not observable
1809    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1810        self,
1811        _nondet: NonDet,
1812    ) -> Stream<T, L, B, O2, R> {
1813        if O::ORDERING_KIND == O2::ORDERING_KIND {
1814            Stream::new(
1815                self.location.clone(),
1816                self.ir_node.replace(HydroNode::Placeholder),
1817            )
1818        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1819            // We can always weaken the ordering guarantee
1820            Stream::new(
1821                self.location.clone(),
1822                HydroNode::Cast {
1823                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1824                    metadata: self
1825                        .location
1826                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1827                },
1828            )
1829        } else {
1830            Stream::new(
1831                self.location.clone(),
1832                HydroNode::ObserveNonDet {
1833                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1834                    trusted: true,
1835                    metadata: self
1836                        .location
1837                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1838                },
1839            )
1840        }
1841    }
1842
1843    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1844    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1845    /// which is always safe because that is the weakest possible guarantee.
1846    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1847        self.weaken_ordering::<NoOrder>()
1848    }
1849
1850    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1851    /// enforcing that `O2` is weaker than the input ordering guarantee.
1852    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1853        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1854        self.assume_ordering::<O2>(nondet)
1855    }
1856
1857    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1858    /// implies that `O == TotalOrder`.
1859    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1860    where
1861        O: IsOrdered,
1862    {
1863        self.assume_ordering(nondet!(/** no-op */))
1864    }
1865
1866    /// Explicitly "casts" the stream to a type with a different retries
1867    /// guarantee. Useful in unsafe code where the lack of retries cannot
1868    /// be proven by the type-system.
1869    ///
1870    /// # Non-Determinism
1871    /// This function is used as an escape hatch, and any mistakes in the
1872    /// provided retries guarantee will propagate into the guarantees
1873    /// for the rest of the program.
1874    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1875        if R::RETRIES_KIND == R2::RETRIES_KIND {
1876            Stream::new(
1877                self.location.clone(),
1878                self.ir_node.replace(HydroNode::Placeholder),
1879            )
1880        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1881            // We can always weaken the retries guarantee
1882            Stream::new(
1883                self.location.clone(),
1884                HydroNode::Cast {
1885                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1886                    metadata: self
1887                        .location
1888                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1889                },
1890            )
1891        } else {
1892            Stream::new(
1893                self.location.clone(),
1894                HydroNode::ObserveNonDet {
1895                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1896                    trusted: false,
1897                    metadata: self
1898                        .location
1899                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1900                },
1901            )
1902        }
1903    }
1904
1905    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1906    // is not observable
1907    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1908        if R::RETRIES_KIND == R2::RETRIES_KIND {
1909            Stream::new(
1910                self.location.clone(),
1911                self.ir_node.replace(HydroNode::Placeholder),
1912            )
1913        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1914            // We can always weaken the retries guarantee
1915            Stream::new(
1916                self.location.clone(),
1917                HydroNode::Cast {
1918                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1919                    metadata: self
1920                        .location
1921                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1922                },
1923            )
1924        } else {
1925            Stream::new(
1926                self.location.clone(),
1927                HydroNode::ObserveNonDet {
1928                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1929                    trusted: true,
1930                    metadata: self
1931                        .location
1932                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1933                },
1934            )
1935        }
1936    }
1937
1938    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1939    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1940    /// which is always safe because that is the weakest possible guarantee.
1941    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1942        self.weaken_retries::<AtLeastOnce>()
1943    }
1944
1945    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1946    /// enforcing that `R2` is weaker than the input retries guarantee.
1947    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1948        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1949        self.assume_retries::<R2>(nondet)
1950    }
1951
1952    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1953    /// implies that `R == ExactlyOnce`.
1954    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1955    where
1956        R: IsExactlyOnce,
1957    {
1958        self.assume_retries(nondet!(/** no-op */))
1959    }
1960
1961    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1962    /// implies that `B == Bounded`.
1963    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
1964    where
1965        B: IsBounded,
1966    {
1967        Stream::new(
1968            self.location.clone(),
1969            self.ir_node.replace(HydroNode::Placeholder),
1970        )
1971    }
1972}
1973
1974impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1975where
1976    L: Location<'a>,
1977{
1978    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1979    ///
1980    /// # Example
1981    /// ```rust
1982    /// # #[cfg(feature = "deploy")] {
1983    /// # use hydro_lang::prelude::*;
1984    /// # use futures::StreamExt;
1985    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1986    /// process.source_iter(q!(&[1, 2, 3])).cloned()
1987    /// # }, |mut stream| async move {
1988    /// // 1, 2, 3
1989    /// # for w in vec![1, 2, 3] {
1990    /// #     assert_eq!(stream.next().await.unwrap(), w);
1991    /// # }
1992    /// # }));
1993    /// # }
1994    /// ```
1995    pub fn cloned(self) -> Stream<T, L, B, O, R>
1996    where
1997        T: Clone,
1998    {
1999        self.map(q!(|d| d.clone()))
2000    }
2001}
2002
2003impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2004where
2005    L: Location<'a>,
2006{
2007    /// Computes the number of elements in the stream as a [`Singleton`].
2008    ///
2009    /// # Example
2010    /// ```rust
2011    /// # #[cfg(feature = "deploy")] {
2012    /// # use hydro_lang::prelude::*;
2013    /// # use futures::StreamExt;
2014    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2015    /// let tick = process.tick();
2016    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2017    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2018    /// batch.count().all_ticks()
2019    /// # }, |mut stream| async move {
2020    /// // 4
2021    /// # assert_eq!(stream.next().await.unwrap(), 4);
2022    /// # }));
2023    /// # }
2024    /// ```
2025    pub fn count(self) -> Singleton<usize, L, B> {
2026        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2027            /// Order does not affect eventual count, and also does not affect intermediate states.
2028        ))
2029        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
2030    }
2031}
2032
2033impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2034    /// Produces a new stream that merges the elements of the two input streams.
2035    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2036    ///
2037    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2038    /// [`Bounded`], you can use [`Stream::chain`] instead.
2039    ///
2040    /// # Example
2041    /// ```rust
2042    /// # #[cfg(feature = "deploy")] {
2043    /// # use hydro_lang::prelude::*;
2044    /// # use futures::StreamExt;
2045    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2046    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2047    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2048    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2049    /// # }, |mut stream| async move {
2050    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2051    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2052    /// #     assert_eq!(stream.next().await.unwrap(), w);
2053    /// # }
2054    /// # }));
2055    /// # }
2056    /// ```
2057    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2058        self,
2059        other: Stream<T, L, Unbounded, O2, R2>,
2060    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2061    where
2062        R: MinRetries<R2>,
2063    {
2064        Stream::new(
2065            self.location.clone(),
2066            HydroNode::Chain {
2067                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2068                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2069                metadata: self.location.new_node_metadata(Stream::<
2070                    T,
2071                    L,
2072                    Unbounded,
2073                    NoOrder,
2074                    <R as MinRetries<R2>>::Min,
2075                >::collection_kind()),
2076            },
2077        )
2078    }
2079
2080    /// Deprecated: use [`Stream::merge_unordered`] instead.
2081    #[deprecated(note = "use `merge_unordered` instead")]
2082    pub fn interleave<O2: Ordering, R2: Retries>(
2083        self,
2084        other: Stream<T, L, Unbounded, O2, R2>,
2085    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2086    where
2087        R: MinRetries<R2>,
2088    {
2089        self.merge_unordered(other)
2090    }
2091}
2092
2093impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2094    /// Produces a new stream that combines the elements of the two input streams,
2095    /// preserving the relative order of elements within each input.
2096    ///
2097    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2098    /// [`Bounded`], you can use [`Stream::chain`] instead.
2099    ///
2100    /// # Non-Determinism
2101    /// The order in which elements *across* the two streams will be interleaved is
2102    /// non-deterministic, so the order of elements will vary across runs. If the output order
2103    /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
2104    /// unordered stream.
2105    ///
2106    /// # Example
2107    /// ```rust
2108    /// # #[cfg(feature = "deploy")] {
2109    /// # use hydro_lang::prelude::*;
2110    /// # use futures::StreamExt;
2111    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2112    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2113    /// # process.source_iter(q!(vec![1, 3])).into();
2114    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2115    /// # }, |mut stream| async move {
2116    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2117    /// # for w in vec![1, 3, 2, 4] {
2118    /// #     assert_eq!(stream.next().await.unwrap(), w);
2119    /// # }
2120    /// # }));
2121    /// # }
2122    /// ```
2123    pub fn merge_ordered<R2: Retries>(
2124        self,
2125        other: Stream<T, L, Unbounded, TotalOrder, R2>,
2126        _nondet: NonDet,
2127    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
2128    where
2129        R: MinRetries<R2>,
2130    {
2131        Stream::new(
2132            self.location.clone(),
2133            HydroNode::Chain {
2134                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2135                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2136                metadata: self.location.new_node_metadata(Stream::<
2137                    T,
2138                    L,
2139                    Unbounded,
2140                    TotalOrder,
2141                    <R as MinRetries<R2>>::Min,
2142                >::collection_kind()),
2143            },
2144        )
2145    }
2146}
2147
2148impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2149where
2150    L: Location<'a>,
2151{
2152    /// Produces a new stream that emits the input elements in sorted order.
2153    ///
2154    /// The input stream can have any ordering guarantee, but the output stream
2155    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2156    /// elements in the input stream are available, so it requires the input stream
2157    /// to be [`Bounded`].
2158    ///
2159    /// # Example
2160    /// ```rust
2161    /// # #[cfg(feature = "deploy")] {
2162    /// # use hydro_lang::prelude::*;
2163    /// # use futures::StreamExt;
2164    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2165    /// let tick = process.tick();
2166    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2167    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2168    /// batch.sort().all_ticks()
2169    /// # }, |mut stream| async move {
2170    /// // 1, 2, 3, 4
2171    /// # for w in (1..5) {
2172    /// #     assert_eq!(stream.next().await.unwrap(), w);
2173    /// # }
2174    /// # }));
2175    /// # }
2176    /// ```
2177    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2178    where
2179        B: IsBounded,
2180        T: Ord,
2181    {
2182        let this = self.make_bounded();
2183        Stream::new(
2184            this.location.clone(),
2185            HydroNode::Sort {
2186                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2187                metadata: this
2188                    .location
2189                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2190            },
2191        )
2192    }
2193
2194    /// Produces a new stream that first emits the elements of the `self` stream,
2195    /// and then emits the elements of the `other` stream. The output stream has
2196    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2197    /// [`TotalOrder`] guarantee.
2198    ///
2199    /// Currently, both input streams must be [`Bounded`]. This operator will block
2200    /// on the first stream until all its elements are available. In a future version,
2201    /// we will relax the requirement on the `other` stream.
2202    ///
2203    /// # Example
2204    /// ```rust
2205    /// # #[cfg(feature = "deploy")] {
2206    /// # use hydro_lang::prelude::*;
2207    /// # use futures::StreamExt;
2208    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2209    /// let tick = process.tick();
2210    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2211    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2212    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2213    /// # }, |mut stream| async move {
2214    /// // 2, 3, 4, 5, 1, 2, 3, 4
2215    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2216    /// #     assert_eq!(stream.next().await.unwrap(), w);
2217    /// # }
2218    /// # }));
2219    /// # }
2220    /// ```
2221    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2222        self,
2223        other: Stream<T, L, B2, O2, R2>,
2224    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2225    where
2226        B: IsBounded,
2227        O: MinOrder<O2>,
2228        R: MinRetries<R2>,
2229    {
2230        check_matching_location(&self.location, &other.location);
2231
2232        Stream::new(
2233            self.location.clone(),
2234            HydroNode::Chain {
2235                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2236                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2237                metadata: self.location.new_node_metadata(Stream::<
2238                    T,
2239                    L,
2240                    B2,
2241                    <O as MinOrder<O2>>::Min,
2242                    <R as MinRetries<R2>>::Min,
2243                >::collection_kind()),
2244            },
2245        )
2246    }
2247
2248    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2249    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2250    /// because this is compiled into a nested loop.
2251    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2252        self,
2253        other: Stream<T2, L, Bounded, O2, R>,
2254    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2255    where
2256        B: IsBounded,
2257        T: Clone,
2258        T2: Clone,
2259    {
2260        let this = self.make_bounded();
2261        check_matching_location(&this.location, &other.location);
2262
2263        Stream::new(
2264            this.location.clone(),
2265            HydroNode::CrossProduct {
2266                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2267                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2268                metadata: this.location.new_node_metadata(Stream::<
2269                    (T, T2),
2270                    L,
2271                    Bounded,
2272                    <O2 as MinOrder<O>>::Min,
2273                    R,
2274                >::collection_kind()),
2275            },
2276        )
2277    }
2278
2279    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2280    /// `self` used as the values for *each* key.
2281    ///
2282    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2283    /// values. For example, it can be used to send the same set of elements to several cluster
2284    /// members, if the membership information is available as a [`KeyedSingleton`].
2285    ///
2286    /// # Example
2287    /// ```rust
2288    /// # #[cfg(feature = "deploy")] {
2289    /// # use hydro_lang::prelude::*;
2290    /// # use futures::StreamExt;
2291    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2292    /// # let tick = process.tick();
2293    /// let keyed_singleton = // { 1: (), 2: () }
2294    /// # process
2295    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2296    /// #     .into_keyed()
2297    /// #     .batch(&tick, nondet!(/** test */))
2298    /// #     .first();
2299    /// let stream = // [ "a", "b" ]
2300    /// # process
2301    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2302    /// #     .batch(&tick, nondet!(/** test */));
2303    /// stream.repeat_with_keys(keyed_singleton)
2304    /// # .entries().all_ticks()
2305    /// # }, |mut stream| async move {
2306    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2307    /// # let mut results = Vec::new();
2308    /// # for _ in 0..4 {
2309    /// #     results.push(stream.next().await.unwrap());
2310    /// # }
2311    /// # results.sort();
2312    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2313    /// # }));
2314    /// # }
2315    /// ```
2316    pub fn repeat_with_keys<K, V2>(
2317        self,
2318        keys: KeyedSingleton<K, V2, L, Bounded>,
2319    ) -> KeyedStream<K, T, L, Bounded, O, R>
2320    where
2321        B: IsBounded,
2322        K: Clone,
2323        T: Clone,
2324    {
2325        keys.keys()
2326            .weaken_retries()
2327            .assume_ordering_trusted::<TotalOrder>(
2328                nondet!(/** keyed stream does not depend on ordering of keys */),
2329            )
2330            .cross_product_nested_loop(self.make_bounded())
2331            .into_keyed()
2332    }
2333}
2334
2335impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2336where
2337    L: Location<'a>,
2338{
2339    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2340    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2341    /// by equi-joining the two streams on the key attribute `K`.
2342    ///
2343    /// # Example
2344    /// ```rust
2345    /// # #[cfg(feature = "deploy")] {
2346    /// # use hydro_lang::prelude::*;
2347    /// # use std::collections::HashSet;
2348    /// # use futures::StreamExt;
2349    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2350    /// let tick = process.tick();
2351    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2352    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2353    /// stream1.join(stream2)
2354    /// # }, |mut stream| async move {
2355    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2356    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2357    /// # stream.map(|i| assert!(expected.contains(&i)));
2358    /// # }));
2359    /// # }
2360    pub fn join<V2, O2: Ordering, R2: Retries>(
2361        self,
2362        n: Stream<(K, V2), L, B, O2, R2>,
2363    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2364    where
2365        K: Eq + Hash,
2366        R: MinRetries<R2>,
2367    {
2368        check_matching_location(&self.location, &n.location);
2369
2370        Stream::new(
2371            self.location.clone(),
2372            HydroNode::Join {
2373                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2374                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2375                metadata: self.location.new_node_metadata(Stream::<
2376                    (K, (V1, V2)),
2377                    L,
2378                    B,
2379                    NoOrder,
2380                    <R as MinRetries<R2>>::Min,
2381                >::collection_kind()),
2382            },
2383        )
2384    }
2385
2386    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2387    /// computes the anti-join of the items in the input -- i.e. returns
2388    /// unique items in the first input that do not have a matching key
2389    /// in the second input.
2390    ///
2391    /// # Example
2392    /// ```rust
2393    /// # #[cfg(feature = "deploy")] {
2394    /// # use hydro_lang::prelude::*;
2395    /// # use futures::StreamExt;
2396    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2397    /// let tick = process.tick();
2398    /// let stream = process
2399    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2400    ///   .batch(&tick, nondet!(/** test */));
2401    /// let batch = process
2402    ///   .source_iter(q!(vec![1, 2]))
2403    ///   .batch(&tick, nondet!(/** test */));
2404    /// stream.anti_join(batch).all_ticks()
2405    /// # }, |mut stream| async move {
2406    /// # for w in vec![(3, 'c'), (4, 'd')] {
2407    /// #     assert_eq!(stream.next().await.unwrap(), w);
2408    /// # }
2409    /// # }));
2410    /// # }
2411    pub fn anti_join<O2: Ordering, R2: Retries>(
2412        self,
2413        n: Stream<K, L, Bounded, O2, R2>,
2414    ) -> Stream<(K, V1), L, B, O, R>
2415    where
2416        K: Eq + Hash,
2417    {
2418        check_matching_location(&self.location, &n.location);
2419
2420        Stream::new(
2421            self.location.clone(),
2422            HydroNode::AntiJoin {
2423                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2424                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2425                metadata: self
2426                    .location
2427                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2428            },
2429        )
2430    }
2431}
2432
2433impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2434    Stream<(K, V), L, B, O, R>
2435{
2436    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2437    /// is used as the key and the second element is added to the entries associated with that key.
2438    ///
2439    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2440    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2441    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2442    /// total ordering _within_ each group but no ordering _across_ groups.
2443    ///
2444    /// # Example
2445    /// ```rust
2446    /// # #[cfg(feature = "deploy")] {
2447    /// # use hydro_lang::prelude::*;
2448    /// # use futures::StreamExt;
2449    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2450    /// process
2451    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2452    ///     .into_keyed()
2453    /// #   .entries()
2454    /// # }, |mut stream| async move {
2455    /// // { 1: [2, 3], 2: [4] }
2456    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2457    /// #     assert_eq!(stream.next().await.unwrap(), w);
2458    /// # }
2459    /// # }));
2460    /// # }
2461    /// ```
2462    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2463        KeyedStream::new(
2464            self.location.clone(),
2465            HydroNode::Cast {
2466                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2467                metadata: self
2468                    .location
2469                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2470            },
2471        )
2472    }
2473}
2474
2475impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2476where
2477    K: Eq + Hash,
2478    L: Location<'a>,
2479{
2480    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2481    /// # Example
2482    /// ```rust
2483    /// # #[cfg(feature = "deploy")] {
2484    /// # use hydro_lang::prelude::*;
2485    /// # use futures::StreamExt;
2486    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2487    /// let tick = process.tick();
2488    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2489    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2490    /// batch.keys().all_ticks()
2491    /// # }, |mut stream| async move {
2492    /// // 1, 2
2493    /// # assert_eq!(stream.next().await.unwrap(), 1);
2494    /// # assert_eq!(stream.next().await.unwrap(), 2);
2495    /// # }));
2496    /// # }
2497    /// ```
2498    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2499        self.into_keyed()
2500            .fold(
2501                q!(|| ()),
2502                q!(
2503                    |_, _| {},
2504                    commutative = manual_proof!(/** values are ignored */),
2505                    idempotent = manual_proof!(/** values are ignored */)
2506                ),
2507            )
2508            .keys()
2509    }
2510}
2511
2512impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2513where
2514    L: Location<'a> + NoTick,
2515{
2516    /// Returns a stream corresponding to the latest batch of elements being atomically
2517    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2518    /// the order of the input.
2519    ///
2520    /// # Non-Determinism
2521    /// The batch boundaries are non-deterministic and may change across executions.
2522    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2523        Stream::new(
2524            self.location.clone().tick,
2525            HydroNode::Batch {
2526                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2527                metadata: self
2528                    .location
2529                    .tick
2530                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2531            },
2532        )
2533    }
2534
2535    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2536    /// See [`Stream::atomic`] for more details.
2537    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2538        Stream::new(
2539            self.location.tick.l.clone(),
2540            HydroNode::EndAtomic {
2541                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2542                metadata: self
2543                    .location
2544                    .tick
2545                    .l
2546                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2547            },
2548        )
2549    }
2550}
2551
2552impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2553where
2554    L: Location<'a> + NoTick + NoAtomic,
2555    F: Future<Output = T>,
2556{
2557    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2558    /// Future outputs are produced as available, regardless of input arrival order.
2559    ///
2560    /// # Example
2561    /// ```rust
2562    /// # #[cfg(feature = "deploy")] {
2563    /// # use std::collections::HashSet;
2564    /// # use futures::StreamExt;
2565    /// # use hydro_lang::prelude::*;
2566    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2567    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2568    ///     .map(q!(|x| async move {
2569    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2570    ///         x
2571    ///     }))
2572    ///     .resolve_futures()
2573    /// #   },
2574    /// #   |mut stream| async move {
2575    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2576    /// #       let mut output = HashSet::new();
2577    /// #       for _ in 1..10 {
2578    /// #           output.insert(stream.next().await.unwrap());
2579    /// #       }
2580    /// #       assert_eq!(
2581    /// #           output,
2582    /// #           HashSet::<i32>::from_iter(1..10)
2583    /// #       );
2584    /// #   },
2585    /// # ));
2586    /// # }
2587    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2588        Stream::new(
2589            self.location.clone(),
2590            HydroNode::ResolveFutures {
2591                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2592                metadata: self
2593                    .location
2594                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2595            },
2596        )
2597    }
2598
2599    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2600    /// Future outputs are produced in the same order as the input stream.
2601    ///
2602    /// # Example
2603    /// ```rust
2604    /// # #[cfg(feature = "deploy")] {
2605    /// # use std::collections::HashSet;
2606    /// # use futures::StreamExt;
2607    /// # use hydro_lang::prelude::*;
2608    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2609    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2610    ///     .map(q!(|x| async move {
2611    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2612    ///         x
2613    ///     }))
2614    ///     .resolve_futures_ordered()
2615    /// #   },
2616    /// #   |mut stream| async move {
2617    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2618    /// #       let mut output = Vec::new();
2619    /// #       for _ in 1..10 {
2620    /// #           output.push(stream.next().await.unwrap());
2621    /// #       }
2622    /// #       assert_eq!(
2623    /// #           output,
2624    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2625    /// #       );
2626    /// #   },
2627    /// # ));
2628    /// # }
2629    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2630        Stream::new(
2631            self.location.clone(),
2632            HydroNode::ResolveFuturesOrdered {
2633                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2634                metadata: self
2635                    .location
2636                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2637            },
2638        )
2639    }
2640}
2641
2642impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2643where
2644    L: Location<'a>,
2645{
2646    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2647    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2648    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2649        Stream::new(
2650            self.location.outer().clone(),
2651            HydroNode::YieldConcat {
2652                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2653                metadata: self
2654                    .location
2655                    .outer()
2656                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2657            },
2658        )
2659    }
2660
2661    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2662    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2663    ///
2664    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2665    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2666    /// stream's [`Tick`] context.
2667    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2668        let out_location = Atomic {
2669            tick: self.location.clone(),
2670        };
2671
2672        Stream::new(
2673            out_location.clone(),
2674            HydroNode::YieldConcat {
2675                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2676                metadata: out_location
2677                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2678            },
2679        )
2680    }
2681
2682    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2683    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2684    /// input.
2685    ///
2686    /// This API is particularly useful for stateful computation on batches of data, such as
2687    /// maintaining an accumulated state that is up to date with the current batch.
2688    ///
2689    /// # Example
2690    /// ```rust
2691    /// # #[cfg(feature = "deploy")] {
2692    /// # use hydro_lang::prelude::*;
2693    /// # use futures::StreamExt;
2694    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2695    /// let tick = process.tick();
2696    /// # // ticks are lazy by default, forces the second tick to run
2697    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2698    /// # let batch_first_tick = process
2699    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
2700    /// #  .batch(&tick, nondet!(/** test */));
2701    /// # let batch_second_tick = process
2702    /// #   .source_iter(q!(vec![5, 6, 7]))
2703    /// #   .batch(&tick, nondet!(/** test */))
2704    /// #   .defer_tick(); // appears on the second tick
2705    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2706    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2707    ///
2708    /// input.batch(&tick, nondet!(/** test */))
2709    ///     .across_ticks(|s| s.count()).all_ticks()
2710    /// # }, |mut stream| async move {
2711    /// // [4, 7]
2712    /// assert_eq!(stream.next().await.unwrap(), 4);
2713    /// assert_eq!(stream.next().await.unwrap(), 7);
2714    /// # }));
2715    /// # }
2716    /// ```
2717    pub fn across_ticks<Out: BatchAtomic>(
2718        self,
2719        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2720    ) -> Out::Batched {
2721        thunk(self.all_ticks_atomic()).batched_atomic()
2722    }
2723
2724    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2725    /// always has the elements of `self` at tick `T - 1`.
2726    ///
2727    /// At tick `0`, the output stream is empty, since there is no previous tick.
2728    ///
2729    /// This operator enables stateful iterative processing with ticks, by sending data from one
2730    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2731    ///
2732    /// # Example
2733    /// ```rust
2734    /// # #[cfg(feature = "deploy")] {
2735    /// # use hydro_lang::prelude::*;
2736    /// # use futures::StreamExt;
2737    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2738    /// let tick = process.tick();
2739    /// // ticks are lazy by default, forces the second tick to run
2740    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2741    ///
2742    /// let batch_first_tick = process
2743    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2744    ///   .batch(&tick, nondet!(/** test */));
2745    /// let batch_second_tick = process
2746    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2747    ///   .batch(&tick, nondet!(/** test */))
2748    ///   .defer_tick(); // appears on the second tick
2749    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2750    ///
2751    /// changes_across_ticks.clone().filter_not_in(
2752    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2753    /// ).all_ticks()
2754    /// # }, |mut stream| async move {
2755    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2756    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2757    /// #     assert_eq!(stream.next().await.unwrap(), w);
2758    /// # }
2759    /// # }));
2760    /// # }
2761    /// ```
2762    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2763        Stream::new(
2764            self.location.clone(),
2765            HydroNode::DeferTick {
2766                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2767                metadata: self
2768                    .location
2769                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2770            },
2771        )
2772    }
2773}
2774
2775#[cfg(test)]
2776mod tests {
2777    #[cfg(feature = "deploy")]
2778    use futures::{SinkExt, StreamExt};
2779    #[cfg(feature = "deploy")]
2780    use hydro_deploy::Deployment;
2781    #[cfg(feature = "deploy")]
2782    use serde::{Deserialize, Serialize};
2783    #[cfg(any(feature = "deploy", feature = "sim"))]
2784    use stageleft::q;
2785
2786    #[cfg(any(feature = "deploy", feature = "sim"))]
2787    use crate::compile::builder::FlowBuilder;
2788    #[cfg(feature = "deploy")]
2789    use crate::live_collections::sliced::sliced;
2790    #[cfg(feature = "deploy")]
2791    use crate::live_collections::stream::ExactlyOnce;
2792    #[cfg(feature = "sim")]
2793    use crate::live_collections::stream::NoOrder;
2794    #[cfg(any(feature = "deploy", feature = "sim"))]
2795    use crate::live_collections::stream::TotalOrder;
2796    #[cfg(any(feature = "deploy", feature = "sim"))]
2797    use crate::location::Location;
2798    #[cfg(any(feature = "deploy", feature = "sim"))]
2799    use crate::nondet::nondet;
2800
2801    mod backtrace_chained_ops;
2802
    // Marker types used as distinct process identities in the deploy tests below.
    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    // Payload type exercising bincode (de)serialization across the network.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
2813
    // End-to-end deploy test: streams 0..10 from one process to another over
    // TCP (bincode-encoded), then out to an external consumer, and checks the
    // values arrive in order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        // The fail-stop TCP link preserves order, so outputs arrive as 0..10.
        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
2848
    // `first()` on a flattened singleton batch should yield exactly one
    // element, so counting its stream form must produce 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
2882
    // `reduce` over an unbounded stream must keep its accumulator across
    // inputs rather than resetting: sending 1 then 2 yields 1 then 3.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
2916
    // `cross_singleton` against a bounded fold should pair every input element
    // with the final aggregate (1 + 2 + 3 = 6).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
2954
    // A bounded top-level `reduce` produces a single value, so converting it to
    // a stream and counting must yield cardinality 1 for every input pairing.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
2992
    // Same as `top_level_bounded_reduce_cardinality`, but going through
    // `into_singleton()`: the resulting singleton must still have cardinality 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3030
    // The snapshot of an atomic fold should be replayed in every tick, so each
    // batched input pairs with the full sum (1 + 2 + 3 = 6) on both ticks.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic(&tick)
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3073
    // `scan` over an unbounded stream must carry its accumulator across
    // inputs: sending 1 then 2 yields running sums 1 then 3.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3112
    // `enumerate` over an unbounded stream must keep its counter across
    // inputs: the second element gets index 1, not a reset index 0.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }
3143
3144    #[cfg(feature = "deploy")]
3145    #[tokio::test]
3146    async fn unbounded_unique_remembers_state() {
3147        let mut deployment = Deployment::new();
3148
3149        let mut flow = FlowBuilder::new();
3150        let node = flow.process::<()>();
3151        let external = flow.external::<()>();
3152
3153        let (input_port, input) =
3154            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3155        let out = input.unique().send_bincode_external(&external);
3156
3157        let nodes = flow
3158            .with_process(&node, deployment.Localhost())
3159            .with_external(&external, deployment.Localhost())
3160            .deploy(&mut deployment);
3161
3162        deployment.deploy().await.unwrap();
3163
3164        let mut external_in = nodes.connect(input_port).await;
3165        let mut external_out = nodes.connect(out).await;
3166
3167        deployment.start().await.unwrap();
3168
3169        external_in.send(1).await.unwrap();
3170        assert_eq!(external_out.next().await.unwrap(), 1);
3171
3172        external_in.send(2).await.unwrap();
3173        assert_eq!(external_out.next().await.unwrap(), 2);
3174
3175        external_in.send(1).await.unwrap();
3176        external_in.send(3).await.unwrap();
3177        assert_eq!(external_out.next().await.unwrap(), 3);
3178    }
3179
3180    #[cfg(feature = "sim")]
3181    #[test]
3182    #[should_panic]
3183    fn sim_batch_nondet_size() {
3184        let mut flow = FlowBuilder::new();
3185        let node = flow.process::<()>();
3186
3187        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3188
3189        let tick = node.tick();
3190        let out_recv = input
3191            .batch(&tick, nondet!(/** test */))
3192            .count()
3193            .all_ticks()
3194            .sim_output();
3195
3196        flow.sim().exhaustive(async || {
3197            in_send.send(());
3198            in_send.send(());
3199            in_send.send(());
3200
3201            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3202        });
3203    }
3204
3205    #[cfg(feature = "sim")]
3206    #[test]
3207    fn sim_batch_preserves_order() {
3208        let mut flow = FlowBuilder::new();
3209        let node = flow.process::<()>();
3210
3211        let (in_send, input) = node.sim_input();
3212
3213        let tick = node.tick();
3214        let out_recv = input
3215            .batch(&tick, nondet!(/** test */))
3216            .all_ticks()
3217            .sim_output();
3218
3219        flow.sim().exhaustive(async || {
3220            in_send.send(1);
3221            in_send.send(2);
3222            in_send.send(3);
3223
3224            out_recv.assert_yields_only([1, 2, 3]).await;
3225        });
3226    }
3227
3228    #[cfg(feature = "sim")]
3229    #[test]
3230    #[should_panic]
3231    fn sim_batch_unordered_shuffles() {
3232        let mut flow = FlowBuilder::new();
3233        let node = flow.process::<()>();
3234
3235        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3236
3237        let tick = node.tick();
3238        let batch = input.batch(&tick, nondet!(/** test */));
3239        let out_recv = batch
3240            .clone()
3241            .min()
3242            .zip(batch.max())
3243            .all_ticks()
3244            .sim_output();
3245
3246        flow.sim().exhaustive(async || {
3247            in_send.send_many_unordered([1, 2, 3]);
3248
3249            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3250                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3251            }
3252        });
3253    }
3254
3255    #[cfg(feature = "sim")]
3256    #[test]
3257    fn sim_batch_unordered_shuffles_count() {
3258        let mut flow = FlowBuilder::new();
3259        let node = flow.process::<()>();
3260
3261        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3262
3263        let tick = node.tick();
3264        let batch = input.batch(&tick, nondet!(/** test */));
3265        let out_recv = batch.all_ticks().sim_output();
3266
3267        let instance_count = flow.sim().exhaustive(async || {
3268            in_send.send_many_unordered([1, 2, 3, 4]);
3269            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3270        });
3271
3272        assert_eq!(
3273            instance_count,
3274            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3275        )
3276    }
3277
3278    #[cfg(feature = "sim")]
3279    #[test]
3280    #[should_panic]
3281    fn sim_observe_order_batched() {
3282        let mut flow = FlowBuilder::new();
3283        let node = flow.process::<()>();
3284
3285        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3286
3287        let tick = node.tick();
3288        let batch = input.batch(&tick, nondet!(/** test */));
3289        let out_recv = batch
3290            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3291            .all_ticks()
3292            .sim_output();
3293
3294        flow.sim().exhaustive(async || {
3295            in_send.send_many_unordered([1, 2, 3, 4]);
3296            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3297        });
3298    }
3299
3300    #[cfg(feature = "sim")]
3301    #[test]
3302    fn sim_observe_order_batched_count() {
3303        let mut flow = FlowBuilder::new();
3304        let node = flow.process::<()>();
3305
3306        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3307
3308        let tick = node.tick();
3309        let batch = input.batch(&tick, nondet!(/** test */));
3310        let out_recv = batch
3311            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3312            .all_ticks()
3313            .sim_output();
3314
3315        let instance_count = flow.sim().exhaustive(async || {
3316            in_send.send_many_unordered([1, 2, 3, 4]);
3317            let _ = out_recv.collect::<Vec<_>>().await;
3318        });
3319
3320        assert_eq!(
3321            instance_count,
3322            192 // 4! * 2^{4 - 1}
3323        )
3324    }
3325
3326    #[cfg(feature = "sim")]
3327    #[test]
3328    fn sim_unordered_count_instance_count() {
3329        let mut flow = FlowBuilder::new();
3330        let node = flow.process::<()>();
3331
3332        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3333
3334        let tick = node.tick();
3335        let out_recv = input
3336            .count()
3337            .snapshot(&tick, nondet!(/** test */))
3338            .all_ticks()
3339            .sim_output();
3340
3341        let instance_count = flow.sim().exhaustive(async || {
3342            in_send.send_many_unordered([1, 2, 3, 4]);
3343            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3344        });
3345
3346        assert_eq!(
3347            instance_count,
3348            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3349        )
3350    }
3351
3352    #[cfg(feature = "sim")]
3353    #[test]
3354    fn sim_top_level_assume_ordering() {
3355        let mut flow = FlowBuilder::new();
3356        let node = flow.process::<()>();
3357
3358        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3359
3360        let out_recv = input
3361            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3362            .sim_output();
3363
3364        let instance_count = flow.sim().exhaustive(async || {
3365            in_send.send_many_unordered([1, 2, 3]);
3366            let mut out = out_recv.collect::<Vec<_>>().await;
3367            out.sort();
3368            assert_eq!(out, vec![1, 2, 3]);
3369        });
3370
3371        assert_eq!(instance_count, 6)
3372    }
3373
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        // Checks that a top-level `assume_ordering` still explores
        // interleavings when part of its input is fed back through a cycle:
        // each output v feeds back v + 1, but only odd results re-enter, so
        // the external inputs 0 and 2 contribute exactly one fed-back element
        // each (1 and 3) and the cycle terminates.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Forward reference lets the stream consume its own (future) output.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Close the cycle: feed back v + 1, keeping only odd values so the
        // feedback is finite (1 + 1 = 2 and 3 + 1 = 4 are filtered out).
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            // At least one explored ordering must begin 0, 1, 2 — i.e. the
            // fed-back 1 interleaved between the two external inputs.
            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 6)
    }
3409
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        // Same cycle-back shape as `sim_top_level_assume_ordering_cycle_back`,
        // but the feedback path additionally goes through a tick-scoped batch,
        // which multiplies the schedules the simulator must explore (58 here
        // versus 6 without the batch).
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Forward reference lets the stream consume its own (future) output.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Feedback is batched per tick before the map/filter; only odd values
        // re-enter the merge, so the cycle is finite.
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            // At least one explored ordering must begin 0, 1, 2 — i.e. the
            // fed-back 1 interleaved between the two external inputs.
            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 58)
    }
3447
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        // Two chained `assume_ordering` calls with a cycle between them: the
        // first orders input merged with the feedback; the second re-orders
        // the mapped result merged with a second input. The feedback filter
        // keeps only the value 3 (0 + 3), so the cycle fires exactly once.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        // Never sent to; exists only to exercise the merge feeding the second
        // assume_ordering.
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let input1_ordered = input
            .clone()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .merge_unordered(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Only 0 + 3 == 3 survives, so exactly one element is fed back.
        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));

        let out_recv = input1_ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            // The fed-back 3 must be able to interleave between the external
            // inputs 0 and 1 in at least one explored ordering.
            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        assert_eq!(instance_count, 24)
    }
3487
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        // Same cycle-back shape as `sim_top_level_assume_ordering_cycle_back`,
        // but the assume_ordering happens inside an atomic region tied to a
        // tick, which changes the set of explorable schedules (22 instances
        // versus 6 in the non-atomic variant).
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Forward reference lets the stream consume its own (future) output.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .atomic(&node.tick())
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        // Feed back v + 1, keeping only odd values so the cycle is finite.
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            // External inputs 0 and 2 plus fed-back 1 and 3 = 4 elements.
            assert_eq!(out.len(), 4);
        });

        assert_eq!(instance_count, 22)
    }
3520
3521    #[cfg(feature = "deploy")]
3522    #[tokio::test]
3523    async fn partition_evens_odds() {
3524        let mut deployment = Deployment::new();
3525
3526        let mut flow = FlowBuilder::new();
3527        let node = flow.process::<()>();
3528        let external = flow.external::<()>();
3529
3530        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3531        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3532        let evens_port = evens.send_bincode_external(&external);
3533        let odds_port = odds.send_bincode_external(&external);
3534
3535        let nodes = flow
3536            .with_process(&node, deployment.Localhost())
3537            .with_external(&external, deployment.Localhost())
3538            .deploy(&mut deployment);
3539
3540        deployment.deploy().await.unwrap();
3541
3542        let mut evens_out = nodes.connect(evens_port).await;
3543        let mut odds_out = nodes.connect(odds_port).await;
3544
3545        deployment.start().await.unwrap();
3546
3547        let mut even_results = Vec::new();
3548        for _ in 0..3 {
3549            even_results.push(evens_out.next().await.unwrap());
3550        }
3551        even_results.sort();
3552        assert_eq!(even_results, vec![2, 4, 6]);
3553
3554        let mut odd_results = Vec::new();
3555        for _ in 0..3 {
3556            odd_results.push(odds_out.next().await.unwrap());
3557        }
3558        odd_results.sort();
3559        assert_eq!(odd_results, vec![1, 3, 5]);
3560    }
3561
3562    #[cfg(feature = "deploy")]
3563    #[tokio::test]
3564    async fn unconsumed_inspect_still_runs() {
3565        use crate::deploy::DeployCrateWrapper;
3566
3567        let mut deployment = Deployment::new();
3568
3569        let mut flow = FlowBuilder::new();
3570        let node = flow.process::<()>();
3571
3572        // The return value of .inspect() is intentionally dropped.
3573        // Before the Null-root fix, this would silently do nothing.
3574        node.source_iter(q!(0..5))
3575            .inspect(q!(|x| println!("inspect: {}", x)));
3576
3577        let nodes = flow
3578            .with_process(&node, deployment.Localhost())
3579            .deploy(&mut deployment);
3580
3581        deployment.deploy().await.unwrap();
3582
3583        let mut stdout = nodes.get_process(&node).stdout();
3584
3585        deployment.start().await.unwrap();
3586
3587        let mut lines = Vec::new();
3588        for _ in 0..5 {
3589            lines.push(stdout.recv().await.unwrap());
3590        }
3591        lines.sort();
3592        assert_eq!(
3593            lines,
3594            vec![
3595                "inspect: 0",
3596                "inspect: 1",
3597                "inspect: 2",
3598                "inspect: 3",
3599                "inspect: 4",
3600            ]
3601        );
3602    }
3603}