Skip to main content

hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
19};
20#[cfg(stageleft_runtime)]
21use crate::forward_handle::{CycleCollection, ReceiverComplete};
22use crate::forward_handle::{ForwardRef, TickCycle};
23use crate::live_collections::batch_atomic::BatchAtomic;
24use crate::live_collections::stream::{
25    AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
26};
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::DeferTick;
30use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
34
35pub mod networking;
36
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location this keyed stream is materialized at.
    pub(crate) location: Loc,
    /// The IR node that produces this keyed stream. It sits in a `RefCell` because the
    /// `Clone` impl rewrites it into a shared `Tee` node through `&self`.
    pub(crate) ir_node: RefCell<HydroNode>,

    /// Ties the type-level parameters to the struct without storing values of those types.
    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
68
69impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
70    for KeyedStream<K, V, L, Unbounded, O, R>
71where
72    L: Location<'a>,
73{
74    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
75        let new_meta = stream
76            .location
77            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
78
79        KeyedStream {
80            location: stream.location,
81            ir_node: RefCell::new(HydroNode::Cast {
82                inner: Box::new(stream.ir_node.into_inner()),
83                metadata: new_meta,
84            }),
85            _phantom: PhantomData,
86        }
87    }
88}
89
90impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
91    for KeyedStream<K, V, L, B, NoOrder, R>
92where
93    L: Location<'a>,
94{
95    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
96        stream.weaken_ordering()
97    }
98}
99
100impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
101where
102    L: Location<'a>,
103{
104    fn defer_tick(self) -> Self {
105        KeyedStream::defer_tick(self)
106    }
107}
108
109impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
110    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
111where
112    L: Location<'a>,
113{
114    type Location = Tick<L>;
115
116    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
117        KeyedStream {
118            location: location.clone(),
119            ir_node: RefCell::new(HydroNode::CycleSource {
120                ident,
121                metadata: location.new_node_metadata(
122                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
123                ),
124            }),
125            _phantom: PhantomData,
126        }
127    }
128}
129
130impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
131    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
132where
133    L: Location<'a>,
134{
135    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
136        assert_eq!(
137            Location::id(&self.location),
138            expected_location,
139            "locations do not match"
140        );
141
142        self.location
143            .flow_state()
144            .borrow_mut()
145            .push_root(HydroRoot::CycleSink {
146                ident,
147                input: Box::new(self.ir_node.into_inner()),
148                op_metadata: HydroIrOpMetadata::new(),
149            });
150    }
151}
152
153impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
154    for KeyedStream<K, V, L, B, O, R>
155where
156    L: Location<'a> + NoTick,
157{
158    type Location = L;
159
160    fn create_source(ident: syn::Ident, location: L) -> Self {
161        KeyedStream {
162            location: location.clone(),
163            ir_node: RefCell::new(HydroNode::CycleSource {
164                ident,
165                metadata: location
166                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
167            }),
168            _phantom: PhantomData,
169        }
170    }
171}
172
173impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
174    for KeyedStream<K, V, L, B, O, R>
175where
176    L: Location<'a> + NoTick,
177{
178    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
179        assert_eq!(
180            Location::id(&self.location),
181            expected_location,
182            "locations do not match"
183        );
184        self.location
185            .flow_state()
186            .borrow_mut()
187            .push_root(HydroRoot::CycleSink {
188                ident,
189                input: Box::new(self.ir_node.into_inner()),
190                op_metadata: HydroIrOpMetadata::new(),
191            });
192    }
193}
194
195impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
196    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
197{
198    fn clone(&self) -> Self {
199        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
200            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
201            *self.ir_node.borrow_mut() = HydroNode::Tee {
202                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
203                metadata: self.location.new_node_metadata(Self::collection_kind()),
204            };
205        }
206
207        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
208            KeyedStream {
209                location: self.location.clone(),
210                ir_node: HydroNode::Tee {
211                    inner: TeeNode(inner.0.clone()),
212                    metadata: metadata.clone(),
213                }
214                .into(),
215                _phantom: PhantomData,
216            }
217        } else {
218            unreachable!()
219        }
220    }
221}
222
223impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
224    KeyedStream<K, V, L, B, O, R>
225{
226    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
227        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
228        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
229
230        KeyedStream {
231            location,
232            ir_node: RefCell::new(ir_node),
233            _phantom: PhantomData,
234        }
235    }
236
237    /// Returns the [`CollectionKind`] corresponding to this type.
238    pub fn collection_kind() -> CollectionKind {
239        CollectionKind::KeyedStream {
240            bound: B::BOUND_KIND,
241            value_order: O::ORDERING_KIND,
242            value_retry: R::RETRIES_KIND,
243            key_type: stageleft::quote_type::<K>().into(),
244            value_type: stageleft::quote_type::<V>().into(),
245        }
246    }
247
    /// Returns the [`Location`] where this keyed stream is being materialized.
    ///
    /// The location is borrowed; nothing is cloned.
    pub fn location(&self) -> &L {
        &self.location
    }
252
253    /// Explicitly "casts" the keyed stream to a type with a different ordering
254    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
255    /// by the type-system.
256    ///
257    /// # Non-Determinism
258    /// This function is used as an escape hatch, and any mistakes in the
259    /// provided ordering guarantee will propagate into the guarantees
260    /// for the rest of the program.
261    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
262        if O::ORDERING_KIND == O2::ORDERING_KIND {
263            KeyedStream::new(self.location, self.ir_node.into_inner())
264        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
265            // We can always weaken the ordering guarantee
266            KeyedStream::new(
267                self.location.clone(),
268                HydroNode::Cast {
269                    inner: Box::new(self.ir_node.into_inner()),
270                    metadata: self
271                        .location
272                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
273                },
274            )
275        } else {
276            KeyedStream::new(
277                self.location.clone(),
278                HydroNode::ObserveNonDet {
279                    inner: Box::new(self.ir_node.into_inner()),
280                    trusted: false,
281                    metadata: self
282                        .location
283                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
284                },
285            )
286        }
287    }
288
289    fn assume_ordering_trusted<O2: Ordering>(
290        self,
291        _nondet: NonDet,
292    ) -> KeyedStream<K, V, L, B, O2, R> {
293        if O::ORDERING_KIND == O2::ORDERING_KIND {
294            KeyedStream::new(self.location, self.ir_node.into_inner())
295        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
296            // We can always weaken the ordering guarantee
297            KeyedStream::new(
298                self.location.clone(),
299                HydroNode::Cast {
300                    inner: Box::new(self.ir_node.into_inner()),
301                    metadata: self
302                        .location
303                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
304                },
305            )
306        } else {
307            KeyedStream::new(
308                self.location.clone(),
309                HydroNode::ObserveNonDet {
310                    inner: Box::new(self.ir_node.into_inner()),
311                    trusted: true,
312                    metadata: self
313                        .location
314                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
315                },
316            )
317        }
318    }
319
320    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
321    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
322    /// which is always safe because that is the weakest possible guarantee.
323    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
324        self.weaken_ordering::<NoOrder>()
325    }
326
327    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
328    /// enforcing that `O2` is weaker than the input ordering guarantee.
329    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
330        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
331        self.assume_ordering::<O2>(nondet)
332    }
333
334    /// Explicitly "casts" the keyed stream to a type with a different retries
335    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
336    /// be proven by the type-system.
337    ///
338    /// # Non-Determinism
339    /// This function is used as an escape hatch, and any mistakes in the
340    /// provided retries guarantee will propagate into the guarantees
341    /// for the rest of the program.
342    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
343        if R::RETRIES_KIND == R2::RETRIES_KIND {
344            KeyedStream::new(self.location, self.ir_node.into_inner())
345        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
346            // We can always weaken the retries guarantee
347            KeyedStream::new(
348                self.location.clone(),
349                HydroNode::Cast {
350                    inner: Box::new(self.ir_node.into_inner()),
351                    metadata: self
352                        .location
353                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
354                },
355            )
356        } else {
357            KeyedStream::new(
358                self.location.clone(),
359                HydroNode::ObserveNonDet {
360                    inner: Box::new(self.ir_node.into_inner()),
361                    trusted: false,
362                    metadata: self
363                        .location
364                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
365                },
366            )
367        }
368    }
369
370    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
371    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
372    /// which is always safe because that is the weakest possible guarantee.
373    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
374        self.weaken_retries::<AtLeastOnce>()
375    }
376
377    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
378    /// enforcing that `R2` is weaker than the input retries guarantee.
379    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
380        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
381        self.assume_retries::<R2>(nondet)
382    }
383
384    /// Flattens the keyed stream into an unordered stream of key-value pairs.
385    ///
386    /// # Example
387    /// ```rust
388    /// # #[cfg(feature = "deploy")] {
389    /// # use hydro_lang::prelude::*;
390    /// # use futures::StreamExt;
391    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
392    /// process
393    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
394    ///     .into_keyed()
395    ///     .entries()
396    /// # }, |mut stream| async move {
397    /// // (1, 2), (1, 3), (2, 4) in any order
398    /// # let mut results = Vec::new();
399    /// # for _ in 0..3 {
400    /// #     results.push(stream.next().await.unwrap());
401    /// # }
402    /// # results.sort();
403    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
404    /// # }));
405    /// # }
406    /// ```
407    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
408        Stream::new(
409            self.location.clone(),
410            HydroNode::Cast {
411                inner: Box::new(self.ir_node.into_inner()),
412                metadata: self
413                    .location
414                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
415            },
416        )
417    }
418
419    /// Flattens the keyed stream into an unordered stream of only the values.
420    ///
421    /// # Example
422    /// ```rust
423    /// # #[cfg(feature = "deploy")] {
424    /// # use hydro_lang::prelude::*;
425    /// # use futures::StreamExt;
426    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
427    /// process
428    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
429    ///     .into_keyed()
430    ///     .values()
431    /// # }, |mut stream| async move {
432    /// // 2, 3, 4 in any order
433    /// # let mut results = Vec::new();
434    /// # for _ in 0..3 {
435    /// #     results.push(stream.next().await.unwrap());
436    /// # }
437    /// # results.sort();
438    /// # assert_eq!(results, vec![2, 3, 4]);
439    /// # }));
440    /// # }
441    /// ```
442    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
443        self.entries().map(q!(|(_, v)| v))
444    }
445
446    /// Flattens the keyed stream into an unordered stream of just the keys.
447    ///
448    /// # Example
449    /// ```rust
450    /// # #[cfg(feature = "deploy")] {
451    /// # use hydro_lang::prelude::*;
452    /// # use futures::StreamExt;
453    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
454    /// # process
455    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
456    /// #     .into_keyed()
457    /// #     .keys()
458    /// # }, |mut stream| async move {
459    /// // 1, 2 in any order
460    /// # let mut results = Vec::new();
461    /// # for _ in 0..2 {
462    /// #     results.push(stream.next().await.unwrap());
463    /// # }
464    /// # results.sort();
465    /// # assert_eq!(results, vec![1, 2]);
466    /// # }));
467    /// # }
468    /// ```
469    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
470    where
471        K: Eq + Hash,
472    {
473        self.entries().map(q!(|(k, _)| k)).unique()
474    }
475
476    /// Transforms each value by invoking `f` on each element, with keys staying the same
477    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
478    ///
479    /// If you do not want to modify the stream and instead only want to view
480    /// each item use [`KeyedStream::inspect`] instead.
481    ///
482    /// # Example
483    /// ```rust
484    /// # #[cfg(feature = "deploy")] {
485    /// # use hydro_lang::prelude::*;
486    /// # use futures::StreamExt;
487    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
488    /// process
489    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
490    ///     .into_keyed()
491    ///     .map(q!(|v| v + 1))
492    /// #   .entries()
493    /// # }, |mut stream| async move {
494    /// // { 1: [3, 4], 2: [5] }
495    /// # let mut results = Vec::new();
496    /// # for _ in 0..3 {
497    /// #     results.push(stream.next().await.unwrap());
498    /// # }
499    /// # results.sort();
500    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
501    /// # }));
502    /// # }
503    /// ```
504    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
505    where
506        F: Fn(V) -> U + 'a,
507    {
508        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
509        let map_f = q!({
510            let orig = f;
511            move |(k, v)| (k, orig(v))
512        })
513        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
514        .into();
515
516        KeyedStream::new(
517            self.location.clone(),
518            HydroNode::Map {
519                f: map_f,
520                input: Box::new(self.ir_node.into_inner()),
521                metadata: self
522                    .location
523                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
524            },
525        )
526    }
527
528    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
529    /// re-grouped even they are tuples; instead they will be grouped under the original key.
530    ///
531    /// If you do not want to modify the stream and instead only want to view
532    /// each item use [`KeyedStream::inspect_with_key`] instead.
533    ///
534    /// # Example
535    /// ```rust
536    /// # #[cfg(feature = "deploy")] {
537    /// # use hydro_lang::prelude::*;
538    /// # use futures::StreamExt;
539    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
540    /// process
541    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
542    ///     .into_keyed()
543    ///     .map_with_key(q!(|(k, v)| k + v))
544    /// #   .entries()
545    /// # }, |mut stream| async move {
546    /// // { 1: [3, 4], 2: [6] }
547    /// # let mut results = Vec::new();
548    /// # for _ in 0..3 {
549    /// #     results.push(stream.next().await.unwrap());
550    /// # }
551    /// # results.sort();
552    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
553    /// # }));
554    /// # }
555    /// ```
556    pub fn map_with_key<U, F>(
557        self,
558        f: impl IntoQuotedMut<'a, F, L> + Copy,
559    ) -> KeyedStream<K, U, L, B, O, R>
560    where
561        F: Fn((K, V)) -> U + 'a,
562        K: Clone,
563    {
564        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
565        let map_f = q!({
566            let orig = f;
567            move |(k, v)| {
568                let out = orig((Clone::clone(&k), v));
569                (k, out)
570            }
571        })
572        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
573        .into();
574
575        KeyedStream::new(
576            self.location.clone(),
577            HydroNode::Map {
578                f: map_f,
579                input: Box::new(self.ir_node.into_inner()),
580                metadata: self
581                    .location
582                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
583            },
584        )
585    }
586
587    /// Prepends a new value to the key of each element in the stream, producing a new
588    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
589    /// occurs and the elements in each group preserve their original order.
590    ///
591    /// # Example
592    /// ```rust
593    /// # #[cfg(feature = "deploy")] {
594    /// # use hydro_lang::prelude::*;
595    /// # use futures::StreamExt;
596    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
597    /// process
598    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
599    ///     .into_keyed()
600    ///     .prefix_key(q!(|&(k, _)| k % 2))
601    /// #   .entries()
602    /// # }, |mut stream| async move {
603    /// // { (1, 1): [2, 3], (0, 2): [4] }
604    /// # let mut results = Vec::new();
605    /// # for _ in 0..3 {
606    /// #     results.push(stream.next().await.unwrap());
607    /// # }
608    /// # results.sort();
609    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
610    /// # }));
611    /// # }
612    /// ```
613    pub fn prefix_key<K2, F>(
614        self,
615        f: impl IntoQuotedMut<'a, F, L> + Copy,
616    ) -> KeyedStream<(K2, K), V, L, B, O, R>
617    where
618        F: Fn(&(K, V)) -> K2 + 'a,
619    {
620        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
621        let map_f = q!({
622            let orig = f;
623            move |kv| {
624                let out = orig(&kv);
625                ((out, kv.0), kv.1)
626            }
627        })
628        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
629        .into();
630
631        KeyedStream::new(
632            self.location.clone(),
633            HydroNode::Map {
634                f: map_f,
635                input: Box::new(self.ir_node.into_inner()),
636                metadata: self
637                    .location
638                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
639            },
640        )
641    }
642
643    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
644    /// `f`, preserving the order of the elements within the group.
645    ///
646    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
647    /// not modify or take ownership of the values. If you need to modify the values while filtering
648    /// use [`KeyedStream::filter_map`] instead.
649    ///
650    /// # Example
651    /// ```rust
652    /// # #[cfg(feature = "deploy")] {
653    /// # use hydro_lang::prelude::*;
654    /// # use futures::StreamExt;
655    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
656    /// process
657    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
658    ///     .into_keyed()
659    ///     .filter(q!(|&x| x > 2))
660    /// #   .entries()
661    /// # }, |mut stream| async move {
662    /// // { 1: [3], 2: [4] }
663    /// # let mut results = Vec::new();
664    /// # for _ in 0..2 {
665    /// #     results.push(stream.next().await.unwrap());
666    /// # }
667    /// # results.sort();
668    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
669    /// # }));
670    /// # }
671    /// ```
672    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
673    where
674        F: Fn(&V) -> bool + 'a,
675    {
676        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
677        let filter_f = q!({
678            let orig = f;
679            move |t: &(_, _)| orig(&t.1)
680        })
681        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
682        .into();
683
684        KeyedStream::new(
685            self.location.clone(),
686            HydroNode::Filter {
687                f: filter_f,
688                input: Box::new(self.ir_node.into_inner()),
689                metadata: self.location.new_node_metadata(Self::collection_kind()),
690            },
691        )
692    }
693
694    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
695    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
696    ///
697    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
698    /// not modify or take ownership of the values. If you need to modify the values while filtering
699    /// use [`KeyedStream::filter_map_with_key`] instead.
700    ///
701    /// # Example
702    /// ```rust
703    /// # #[cfg(feature = "deploy")] {
704    /// # use hydro_lang::prelude::*;
705    /// # use futures::StreamExt;
706    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
707    /// process
708    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
709    ///     .into_keyed()
710    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
711    /// #   .entries()
712    /// # }, |mut stream| async move {
713    /// // { 1: [3], 2: [4] }
714    /// # let mut results = Vec::new();
715    /// # for _ in 0..2 {
716    /// #     results.push(stream.next().await.unwrap());
717    /// # }
718    /// # results.sort();
719    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
720    /// # }));
721    /// # }
722    /// ```
723    pub fn filter_with_key<F>(
724        self,
725        f: impl IntoQuotedMut<'a, F, L> + Copy,
726    ) -> KeyedStream<K, V, L, B, O, R>
727    where
728        F: Fn(&(K, V)) -> bool + 'a,
729    {
730        let filter_f = f
731            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
732            .into();
733
734        KeyedStream::new(
735            self.location.clone(),
736            HydroNode::Filter {
737                f: filter_f,
738                input: Box::new(self.ir_node.into_inner()),
739                metadata: self.location.new_node_metadata(Self::collection_kind()),
740            },
741        )
742    }
743
744    /// An operator that both filters and maps each value, with keys staying the same.
745    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
746    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
747    ///
748    /// # Example
749    /// ```rust
750    /// # #[cfg(feature = "deploy")] {
751    /// # use hydro_lang::prelude::*;
752    /// # use futures::StreamExt;
753    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
754    /// process
755    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
756    ///     .into_keyed()
757    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
758    /// #   .entries()
759    /// # }, |mut stream| async move {
760    /// // { 1: [2], 2: [4] }
761    /// # let mut results = Vec::new();
762    /// # for _ in 0..2 {
763    /// #     results.push(stream.next().await.unwrap());
764    /// # }
765    /// # results.sort();
766    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
767    /// # }));
768    /// # }
769    /// ```
770    pub fn filter_map<U, F>(
771        self,
772        f: impl IntoQuotedMut<'a, F, L> + Copy,
773    ) -> KeyedStream<K, U, L, B, O, R>
774    where
775        F: Fn(V) -> Option<U> + 'a,
776    {
777        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
778        let filter_map_f = q!({
779            let orig = f;
780            move |(k, v)| orig(v).map(|o| (k, o))
781        })
782        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
783        .into();
784
785        KeyedStream::new(
786            self.location.clone(),
787            HydroNode::FilterMap {
788                f: filter_map_f,
789                input: Box::new(self.ir_node.into_inner()),
790                metadata: self
791                    .location
792                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
793            },
794        )
795    }
796
797    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
798    /// re-grouped even they are tuples; instead they will be grouped under the original key.
799    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
800    ///
801    /// # Example
802    /// ```rust
803    /// # #[cfg(feature = "deploy")] {
804    /// # use hydro_lang::prelude::*;
805    /// # use futures::StreamExt;
806    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
807    /// process
808    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
809    ///     .into_keyed()
810    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
811    /// #   .entries()
812    /// # }, |mut stream| async move {
813    /// // { 2: [2] }
814    /// # let mut results = Vec::new();
815    /// # for _ in 0..1 {
816    /// #     results.push(stream.next().await.unwrap());
817    /// # }
818    /// # results.sort();
819    /// # assert_eq!(results, vec![(2, 2)]);
820    /// # }));
821    /// # }
822    /// ```
823    pub fn filter_map_with_key<U, F>(
824        self,
825        f: impl IntoQuotedMut<'a, F, L> + Copy,
826    ) -> KeyedStream<K, U, L, B, O, R>
827    where
828        F: Fn((K, V)) -> Option<U> + 'a,
829        K: Clone,
830    {
831        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
832        let filter_map_f = q!({
833            let orig = f;
834            move |(k, v)| {
835                let out = orig((Clone::clone(&k), v));
836                out.map(|o| (k, o))
837            }
838        })
839        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
840        .into();
841
842        KeyedStream::new(
843            self.location.clone(),
844            HydroNode::FilterMap {
845                f: filter_map_f,
846                input: Box::new(self.ir_node.into_inner()),
847                metadata: self
848                    .location
849                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
850            },
851        )
852    }
853
854    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
855    /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
856    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
857    ///
858    /// # Example
859    /// ```rust
860    /// # #[cfg(feature = "deploy")] {
861    /// # use hydro_lang::prelude::*;
862    /// # use futures::StreamExt;
863    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
864    /// let tick = process.tick();
865    /// let batch = process
866    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
867    ///   .into_keyed()
868    ///   .batch(&tick, nondet!(/** test */));
869    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
870    /// batch.cross_singleton(count).all_ticks().entries()
871    /// # }, |mut stream| async move {
872    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
873    /// # let mut results = Vec::new();
874    /// # for _ in 0..3 {
875    /// #     results.push(stream.next().await.unwrap());
876    /// # }
877    /// # results.sort();
878    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
879    /// # }));
880    /// # }
881    /// ```
882    pub fn cross_singleton<O2>(
883        self,
884        other: impl Into<Optional<O2, L, Bounded>>,
885    ) -> KeyedStream<K, (V, O2), L, B, O, R>
886    where
887        O2: Clone,
888    {
889        let other: Optional<O2, L, Bounded> = other.into();
890        check_matching_location(&self.location, &other.location);
891
892        Stream::new(
893            self.location.clone(),
894            HydroNode::CrossSingleton {
895                left: Box::new(self.ir_node.into_inner()),
896                right: Box::new(other.ir_node.into_inner()),
897                metadata: self
898                    .location
899                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
900            },
901        )
902        .map(q!(|((k, v), o2)| (k, (v, o2))))
903        .into_keyed()
904    }
905
906    /// For each value `v` in each group, transform `v` using `f` and then treat the
907    /// result as an [`Iterator`] to produce values one by one within the same group.
908    /// The implementation for [`Iterator`] for the output type `I` must produce items
909    /// in a **deterministic** order.
910    ///
911    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
912    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
913    ///
914    /// # Example
915    /// ```rust
916    /// # #[cfg(feature = "deploy")] {
917    /// # use hydro_lang::prelude::*;
918    /// # use futures::StreamExt;
919    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
920    /// process
921    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
922    ///     .into_keyed()
923    ///     .flat_map_ordered(q!(|x| x))
924    /// #   .entries()
925    /// # }, |mut stream| async move {
926    /// // { 1: [2, 3, 4], 2: [5, 6] }
927    /// # let mut results = Vec::new();
928    /// # for _ in 0..5 {
929    /// #     results.push(stream.next().await.unwrap());
930    /// # }
931    /// # results.sort();
932    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
933    /// # }));
934    /// # }
935    /// ```
936    pub fn flat_map_ordered<U, I, F>(
937        self,
938        f: impl IntoQuotedMut<'a, F, L> + Copy,
939    ) -> KeyedStream<K, U, L, B, O, R>
940    where
941        I: IntoIterator<Item = U>,
942        F: Fn(V) -> I + 'a,
943        K: Clone,
944    {
945        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
946        let flat_map_f = q!({
947            let orig = f;
948            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
949        })
950        .splice_fn1_ctx::<(K, V), _>(&self.location)
951        .into();
952
953        KeyedStream::new(
954            self.location.clone(),
955            HydroNode::FlatMap {
956                f: flat_map_f,
957                input: Box::new(self.ir_node.into_inner()),
958                metadata: self
959                    .location
960                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
961            },
962        )
963    }
964
965    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
966    /// for the output type `I` to produce items in any order.
967    ///
968    /// # Example
969    /// ```rust
970    /// # #[cfg(feature = "deploy")] {
971    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
972    /// # use futures::StreamExt;
973    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
974    /// process
975    ///     .source_iter(q!(vec![
976    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
977    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
978    ///     ]))
979    ///     .into_keyed()
980    ///     .flat_map_unordered(q!(|x| x))
981    /// #   .entries()
982    /// # }, |mut stream| async move {
983    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
984    /// # let mut results = Vec::new();
985    /// # for _ in 0..4 {
986    /// #     results.push(stream.next().await.unwrap());
987    /// # }
988    /// # results.sort();
989    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
990    /// # }));
991    /// # }
992    /// ```
993    pub fn flat_map_unordered<U, I, F>(
994        self,
995        f: impl IntoQuotedMut<'a, F, L> + Copy,
996    ) -> KeyedStream<K, U, L, B, NoOrder, R>
997    where
998        I: IntoIterator<Item = U>,
999        F: Fn(V) -> I + 'a,
1000        K: Clone,
1001    {
1002        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1003        let flat_map_f = q!({
1004            let orig = f;
1005            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1006        })
1007        .splice_fn1_ctx::<(K, V), _>(&self.location)
1008        .into();
1009
1010        KeyedStream::new(
1011            self.location.clone(),
1012            HydroNode::FlatMap {
1013                f: flat_map_f,
1014                input: Box::new(self.ir_node.into_inner()),
1015                metadata: self
1016                    .location
1017                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1018            },
1019        )
1020    }
1021
1022    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1023    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1024    /// items in a **deterministic** order.
1025    ///
1026    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1027    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1028    ///
1029    /// # Example
1030    /// ```rust
1031    /// # #[cfg(feature = "deploy")] {
1032    /// # use hydro_lang::prelude::*;
1033    /// # use futures::StreamExt;
1034    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1035    /// process
1036    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1037    ///     .into_keyed()
1038    ///     .flatten_ordered()
1039    /// #   .entries()
1040    /// # }, |mut stream| async move {
1041    /// // { 1: [2, 3, 4], 2: [5, 6] }
1042    /// # let mut results = Vec::new();
1043    /// # for _ in 0..5 {
1044    /// #     results.push(stream.next().await.unwrap());
1045    /// # }
1046    /// # results.sort();
1047    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1048    /// # }));
1049    /// # }
1050    /// ```
1051    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1052    where
1053        V: IntoIterator<Item = U>,
1054        K: Clone,
1055    {
1056        self.flat_map_ordered(q!(|d| d))
1057    }
1058
1059    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1060    /// for the value type `V` to produce items in any order.
1061    ///
1062    /// # Example
1063    /// ```rust
1064    /// # #[cfg(feature = "deploy")] {
1065    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1066    /// # use futures::StreamExt;
1067    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1068    /// process
1069    ///     .source_iter(q!(vec![
1070    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1071    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1072    ///     ]))
1073    ///     .into_keyed()
1074    ///     .flatten_unordered()
1075    /// #   .entries()
1076    /// # }, |mut stream| async move {
1077    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1078    /// # let mut results = Vec::new();
1079    /// # for _ in 0..4 {
1080    /// #     results.push(stream.next().await.unwrap());
1081    /// # }
1082    /// # results.sort();
1083    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1084    /// # }));
1085    /// # }
1086    /// ```
1087    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1088    where
1089        V: IntoIterator<Item = U>,
1090        K: Clone,
1091    {
1092        self.flat_map_unordered(q!(|d| d))
1093    }
1094
1095    /// An operator which allows you to "inspect" each element of a stream without
1096    /// modifying it. The closure `f` is called on a reference to each value. This is
1097    /// mainly useful for debugging, and should not be used to generate side-effects.
1098    ///
1099    /// # Example
1100    /// ```rust
1101    /// # #[cfg(feature = "deploy")] {
1102    /// # use hydro_lang::prelude::*;
1103    /// # use futures::StreamExt;
1104    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1105    /// process
1106    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1107    ///     .into_keyed()
1108    ///     .inspect(q!(|v| println!("{}", v)))
1109    /// #   .entries()
1110    /// # }, |mut stream| async move {
1111    /// # let mut results = Vec::new();
1112    /// # for _ in 0..3 {
1113    /// #     results.push(stream.next().await.unwrap());
1114    /// # }
1115    /// # results.sort();
1116    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1117    /// # }));
1118    /// # }
1119    /// ```
1120    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1121    where
1122        F: Fn(&V) + 'a,
1123    {
1124        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1125        let inspect_f = q!({
1126            let orig = f;
1127            move |t: &(_, _)| orig(&t.1)
1128        })
1129        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1130        .into();
1131
1132        KeyedStream::new(
1133            self.location.clone(),
1134            HydroNode::Inspect {
1135                f: inspect_f,
1136                input: Box::new(self.ir_node.into_inner()),
1137                metadata: self.location.new_node_metadata(Self::collection_kind()),
1138            },
1139        )
1140    }
1141
1142    /// An operator which allows you to "inspect" each element of a stream without
1143    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1144    /// mainly useful for debugging, and should not be used to generate side-effects.
1145    ///
1146    /// # Example
1147    /// ```rust
1148    /// # #[cfg(feature = "deploy")] {
1149    /// # use hydro_lang::prelude::*;
1150    /// # use futures::StreamExt;
1151    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1152    /// process
1153    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1154    ///     .into_keyed()
1155    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1156    /// #   .entries()
1157    /// # }, |mut stream| async move {
1158    /// # let mut results = Vec::new();
1159    /// # for _ in 0..3 {
1160    /// #     results.push(stream.next().await.unwrap());
1161    /// # }
1162    /// # results.sort();
1163    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1164    /// # }));
1165    /// # }
1166    /// ```
1167    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1168    where
1169        F: Fn(&(K, V)) + 'a,
1170    {
1171        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1172
1173        KeyedStream::new(
1174            self.location.clone(),
1175            HydroNode::Inspect {
1176                f: inspect_f,
1177                input: Box::new(self.ir_node.into_inner()),
1178                metadata: self.location.new_node_metadata(Self::collection_kind()),
1179            },
1180        )
1181    }
1182
1183    /// An operator which allows you to "name" a `HydroNode`.
1184    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1185    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1186        {
1187            let mut node = self.ir_node.borrow_mut();
1188            let metadata = node.metadata_mut();
1189            metadata.tag = Some(name.to_owned());
1190        }
1191        self
1192    }
1193}
1194
1195impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1196    KeyedStream<(K1, K2), V, L, B, O, R>
1197{
1198    /// Produces a new keyed stream by dropping the first element of the compound key.
1199    ///
1200    /// Because multiple keys may share the same suffix, this operation results in re-grouping
1201    /// of the values under the new keys. The values across groups with the same new key
1202    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
1203    ///
1204    /// # Example
1205    /// ```rust
1206    /// # #[cfg(feature = "deploy")] {
1207    /// # use hydro_lang::prelude::*;
1208    /// # use futures::StreamExt;
1209    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1210    /// process
1211    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
1212    ///     .into_keyed()
1213    ///     .drop_key_prefix()
1214    /// #   .entries()
1215    /// # }, |mut stream| async move {
1216    /// // { 10: [2, 3], 20: [4] }
1217    /// # let mut results = Vec::new();
1218    /// # for _ in 0..3 {
1219    /// #     results.push(stream.next().await.unwrap());
1220    /// # }
1221    /// # results.sort();
1222    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
1223    /// # }));
1224    /// # }
1225    /// ```
1226    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
1227        self.entries()
1228            .map(q!(|((_k1, k2), v)| (k2, v)))
1229            .into_keyed()
1230    }
1231}
1232
1233impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
1234    KeyedStream<K, V, L, Unbounded, O, R>
1235{
1236    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
1237    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
1238    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
1239    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
1240    ///
1241    /// Currently, both input streams must be [`Unbounded`].
1242    ///
1243    /// # Example
1244    /// ```rust
1245    /// # #[cfg(feature = "deploy")] {
1246    /// # use hydro_lang::prelude::*;
1247    /// # use futures::StreamExt;
1248    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1249    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
1250    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
1251    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
1252    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
1253    /// numbers1.interleave(numbers2)
1254    /// #   .entries()
1255    /// # }, |mut stream| async move {
1256    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
1257    /// # let mut results = Vec::new();
1258    /// # for _ in 0..4 {
1259    /// #     results.push(stream.next().await.unwrap());
1260    /// # }
1261    /// # results.sort();
1262    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
1263    /// # }));
1264    /// # }
1265    /// ```
1266    pub fn interleave<O2: Ordering, R2: Retries>(
1267        self,
1268        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
1269    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1270    where
1271        R: MinRetries<R2>,
1272    {
1273        KeyedStream::new(
1274            self.location.clone(),
1275            HydroNode::Chain {
1276                first: Box::new(self.ir_node.into_inner()),
1277                second: Box::new(other.ir_node.into_inner()),
1278                metadata: self.location.new_node_metadata(KeyedStream::<
1279                    K,
1280                    V,
1281                    L,
1282                    Unbounded,
1283                    NoOrder,
1284                    <R as MinRetries<R2>>::Min,
1285                >::collection_kind()),
1286            },
1287        )
1288    }
1289}
1290
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant combines two independent decisions: whether an element is emitted for the
/// current input, and whether later inputs will still be processed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
1303
1304impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1305where
1306    L: Location<'a>,
1307{
1308    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1309    ///
1310    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1311    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1312    /// early by returning `None`.
1313    ///
1314    /// The function takes a mutable reference to the accumulator and the current element, and returns
1315    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1316    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1317    ///
1318    /// # Example
1319    /// ```rust
1320    /// # #[cfg(feature = "deploy")] {
1321    /// # use hydro_lang::prelude::*;
1322    /// # use futures::StreamExt;
1323    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1324    /// process
1325    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1326    ///     .into_keyed()
1327    ///     .scan(
1328    ///         q!(|| 0),
1329    ///         q!(|acc, x| {
1330    ///             *acc += x;
1331    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1332    ///         }),
1333    ///     )
1334    /// #   .entries()
1335    /// # }, |mut stream| async move {
1336    /// // Output: { 0: [1], 1: [3, 7] }
1337    /// # let mut results = Vec::new();
1338    /// # for _ in 0..3 {
1339    /// #     results.push(stream.next().await.unwrap());
1340    /// # }
1341    /// # results.sort();
1342    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1343    /// # }));
1344    /// # }
1345    /// ```
1346    pub fn scan<A, U, I, F>(
1347        self,
1348        init: impl IntoQuotedMut<'a, I, L> + Copy,
1349        f: impl IntoQuotedMut<'a, F, L> + Copy,
1350    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1351    where
1352        K: Clone + Eq + Hash,
1353        I: Fn() -> A + 'a,
1354        F: Fn(&mut A, V) -> Option<U> + 'a,
1355    {
1356        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1357        self.generator(
1358            init,
1359            q!({
1360                let orig = f;
1361                move |state, v| {
1362                    if let Some(out) = orig(state, v) {
1363                        Generate::Yield(out)
1364                    } else {
1365                        Generate::Break
1366                    }
1367                }
1368            }),
1369        )
1370    }
1371
1372    /// Iteratively processes the elements in each group using a state machine that can yield
1373    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1374    /// syntax in Rust, without requiring special syntax.
1375    ///
1376    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1377    /// state for each group. The second argument defines the processing logic, taking in a
1378    /// mutable reference to the group's state and the value to be processed. It emits a
1379    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1380    /// should be processed.
1381    ///
1382    /// # Example
1383    /// ```rust
1384    /// # #[cfg(feature = "deploy")] {
1385    /// # use hydro_lang::prelude::*;
1386    /// # use futures::StreamExt;
1387    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1388    /// process
1389    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1390    ///     .into_keyed()
1391    ///     .generator(
1392    ///         q!(|| 0),
1393    ///         q!(|acc, x| {
1394    ///             *acc += x;
1395    ///             if *acc > 100 {
1396    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
1397    ///                     "done!".to_owned()
1398    ///                 )
1399    ///             } else if *acc % 2 == 0 {
1400    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
1401    ///                     "even".to_owned()
1402    ///                 )
1403    ///             } else {
1404    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
1405    ///             }
1406    ///         }),
1407    ///     )
1408    /// #   .entries()
1409    /// # }, |mut stream| async move {
1410    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1411    /// # let mut results = Vec::new();
1412    /// # for _ in 0..3 {
1413    /// #     results.push(stream.next().await.unwrap());
1414    /// # }
1415    /// # results.sort();
1416    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1417    /// # }));
1418    /// # }
1419    /// ```
1420    pub fn generator<A, U, I, F>(
1421        self,
1422        init: impl IntoQuotedMut<'a, I, L> + Copy,
1423        f: impl IntoQuotedMut<'a, F, L> + Copy,
1424    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1425    where
1426        K: Clone + Eq + Hash,
1427        I: Fn() -> A + 'a,
1428        F: Fn(&mut A, V) -> Generate<U> + 'a,
1429    {
1430        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1431        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1432
1433        let scan_init = q!(|| HashMap::new())
1434            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
1435            .into();
1436        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1437            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1438            if let Some(existing_state_value) = existing_state {
1439                match f(existing_state_value, v) {
1440                    Generate::Yield(out) => Some(Some((k, out))),
1441                    Generate::Return(out) => {
1442                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1443                        Some(Some((k, out)))
1444                    }
1445                    Generate::Break => {
1446                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1447                        Some(None)
1448                    }
1449                    Generate::Continue => Some(None),
1450                }
1451            } else {
1452                Some(None)
1453            }
1454        })
1455        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
1456        .into();
1457
1458        let scan_node = HydroNode::Scan {
1459            init: scan_init,
1460            acc: scan_f,
1461            input: Box::new(self.ir_node.into_inner()),
1462            metadata: self.location.new_node_metadata(Stream::<
1463                Option<(K, U)>,
1464                L,
1465                B,
1466                TotalOrder,
1467                ExactlyOnce,
1468            >::collection_kind()),
1469        };
1470
1471        let flatten_f = q!(|d| d)
1472            .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
1473            .into();
1474        let flatten_node = HydroNode::FlatMap {
1475            f: flatten_f,
1476            input: Box::new(scan_node),
1477            metadata: self.location.new_node_metadata(KeyedStream::<
1478                K,
1479                U,
1480                L,
1481                B,
1482                TotalOrder,
1483                ExactlyOnce,
1484            >::collection_kind()),
1485        };
1486
1487        KeyedStream::new(self.location, flatten_node)
1488    }
1489
1490    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1491    /// in-order across the values in each group. But the aggregation function returns a boolean,
1492    /// which when true indicates that the aggregated result is complete and can be released to
1493    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1494    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1495    /// normal stream elements.
1496    ///
1497    /// # Example
1498    /// ```rust
1499    /// # #[cfg(feature = "deploy")] {
1500    /// # use hydro_lang::prelude::*;
1501    /// # use futures::StreamExt;
1502    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1503    /// process
1504    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1505    ///     .into_keyed()
1506    ///     .fold_early_stop(
1507    ///         q!(|| 0),
1508    ///         q!(|acc, x| {
1509    ///             *acc += x;
1510    ///             x % 2 == 0
1511    ///         }),
1512    ///     )
1513    /// #   .entries()
1514    /// # }, |mut stream| async move {
1515    /// // Output: { 0: 2, 1: 9 }
1516    /// # let mut results = Vec::new();
1517    /// # for _ in 0..2 {
1518    /// #     results.push(stream.next().await.unwrap());
1519    /// # }
1520    /// # results.sort();
1521    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1522    /// # }));
1523    /// # }
1524    /// ```
1525    pub fn fold_early_stop<A, I, F>(
1526        self,
1527        init: impl IntoQuotedMut<'a, I, L> + Copy,
1528        f: impl IntoQuotedMut<'a, F, L> + Copy,
1529    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1530    where
1531        K: Clone + Eq + Hash,
1532        I: Fn() -> A + 'a,
1533        F: Fn(&mut A, V) -> bool + 'a,
1534    {
1535        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1536        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1537        let out_without_bound_cast = self.generator(
1538            q!(move || Some(init())),
1539            q!(move |key_state, v| {
1540                if let Some(key_state_value) = key_state.as_mut() {
1541                    if f(key_state_value, v) {
1542                        Generate::Return(key_state.take().unwrap())
1543                    } else {
1544                        Generate::Continue
1545                    }
1546                } else {
1547                    unreachable!()
1548                }
1549            }),
1550        );
1551
1552        KeyedSingleton::new(
1553            out_without_bound_cast.location.clone(),
1554            HydroNode::Cast {
1555                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1556                metadata: out_without_bound_cast
1557                    .location
1558                    .new_node_metadata(
1559                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1560                    ),
1561            },
1562        )
1563    }
1564
1565    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1566    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1567    /// otherwise the first element would be non-deterministic.
1568    ///
1569    /// # Example
1570    /// ```rust
1571    /// # #[cfg(feature = "deploy")] {
1572    /// # use hydro_lang::prelude::*;
1573    /// # use futures::StreamExt;
1574    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1575    /// process
1576    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1577    ///     .into_keyed()
1578    ///     .first()
1579    /// #   .entries()
1580    /// # }, |mut stream| async move {
1581    /// // Output: { 0: 2, 1: 3 }
1582    /// # let mut results = Vec::new();
1583    /// # for _ in 0..2 {
1584    /// #     results.push(stream.next().await.unwrap());
1585    /// # }
1586    /// # results.sort();
1587    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1588    /// # }));
1589    /// # }
1590    /// ```
1591    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1592    where
1593        K: Clone + Eq + Hash,
1594    {
1595        self.fold_early_stop(
1596            q!(|| None),
1597            q!(|acc, v| {
1598                *acc = Some(v);
1599                true
1600            }),
1601        )
1602        .map(q!(|v| v.unwrap()))
1603    }
1604}
1605
1606impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1607where
1608    L: Location<'a>,
1609{
1610    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1611    ///
1612    /// # Example
1613    /// ```rust
1614    /// # #[cfg(feature = "deploy")] {
1615    /// # use hydro_lang::prelude::*;
1616    /// # use futures::StreamExt;
1617    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1618    /// let tick = process.tick();
1619    /// let numbers = process
1620    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1621    ///     .into_keyed();
1622    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1623    /// batch
1624    ///     .value_counts()
1625    ///     .entries()
1626    ///     .all_ticks()
1627    /// # }, |mut stream| async move {
1628    /// // (1, 3), (2, 2)
1629    /// # let mut results = Vec::new();
1630    /// # for _ in 0..2 {
1631    /// #     results.push(stream.next().await.unwrap());
1632    /// # }
1633    /// # results.sort();
1634    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1635    /// # }));
1636    /// # }
1637    /// ```
1638    pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1639    where
1640        K: Eq + Hash,
1641    {
1642        self.assume_ordering_trusted(
1643            nondet!(/** ordering within each group affects neither result nor intermediates */),
1644        )
1645        .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1646    }
1647}
1648
1649impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1650where
1651    L: Location<'a>,
1652{
1653    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1654    /// group via the `comb` closure.
1655    ///
1656    /// Depending on the input stream guarantees, the closure may need to be commutative
1657    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1658    ///
1659    /// If the input and output value types are the same and do not require initialization then use
1660    /// [`KeyedStream::reduce`].
1661    ///
1662    /// # Example
1663    /// ```rust
1664    /// # #[cfg(feature = "deploy")] {
1665    /// # use hydro_lang::prelude::*;
1666    /// # use futures::StreamExt;
1667    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1668    /// let tick = process.tick();
1669    /// let numbers = process
1670    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1671    ///     .into_keyed();
1672    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1673    /// batch
1674    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1675    ///     .entries()
1676    ///     .all_ticks()
1677    /// # }, |mut stream| async move {
1678    /// // (1, false), (2, true)
1679    /// # let mut results = Vec::new();
1680    /// # for _ in 0..2 {
1681    /// #     results.push(stream.next().await.unwrap());
1682    /// # }
1683    /// # results.sort();
1684    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1685    /// # }));
1686    /// # }
1687    /// ```
1688    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
1689        self,
1690        init: impl IntoQuotedMut<'a, I, L>,
1691        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1692    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1693    where
1694        K: Eq + Hash,
1695        C: ValidCommutativityFor<O>,
1696        Idemp: ValidIdempotenceFor<R>,
1697    {
1698        let init = init.splice_fn0_ctx(&self.location).into();
1699        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1700        proof.register_proof(&comb);
1701
1702        let ordered = self
1703            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1704            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1705
1706        KeyedSingleton::new(
1707            ordered.location.clone(),
1708            HydroNode::FoldKeyed {
1709                init,
1710                acc: comb.into(),
1711                input: Box::new(ordered.ir_node.into_inner()),
1712                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1713                    K,
1714                    A,
1715                    L,
1716                    B::WhenValueUnbounded,
1717                >::collection_kind()),
1718            },
1719        )
1720    }
1721
1722    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1723    /// group via the `comb` closure.
1724    ///
1725    /// Depending on the input stream guarantees, the closure may need to be commutative
1726    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1727    ///
1728    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1729    ///
1730    /// # Example
1731    /// ```rust
1732    /// # #[cfg(feature = "deploy")] {
1733    /// # use hydro_lang::prelude::*;
1734    /// # use futures::StreamExt;
1735    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1736    /// let tick = process.tick();
1737    /// let numbers = process
1738    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1739    ///     .into_keyed();
1740    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1741    /// batch
1742    ///     .reduce(q!(|acc, x| *acc |= x))
1743    ///     .entries()
1744    ///     .all_ticks()
1745    /// # }, |mut stream| async move {
1746    /// // (1, false), (2, true)
1747    /// # let mut results = Vec::new();
1748    /// # for _ in 0..2 {
1749    /// #     results.push(stream.next().await.unwrap());
1750    /// # }
1751    /// # results.sort();
1752    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1753    /// # }));
1754    /// # }
1755    /// ```
1756    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1757        self,
1758        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1759    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1760    where
1761        K: Eq + Hash,
1762        C: ValidCommutativityFor<O>,
1763        Idemp: ValidIdempotenceFor<R>,
1764    {
1765        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1766        proof.register_proof(&f);
1767
1768        let ordered = self
1769            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1770            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1771
1772        KeyedSingleton::new(
1773            ordered.location.clone(),
1774            HydroNode::ReduceKeyed {
1775                f: f.into(),
1776                input: Box::new(ordered.ir_node.into_inner()),
1777                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1778                    K,
1779                    V,
1780                    L,
1781                    B::WhenValueUnbounded,
1782                >::collection_kind()),
1783            },
1784        )
1785    }
1786
1787    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1788    /// are automatically deleted.
1789    ///
1790    /// Depending on the input stream guarantees, the closure may need to be commutative
1791    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1792    ///
1793    /// # Example
1794    /// ```rust
1795    /// # #[cfg(feature = "deploy")] {
1796    /// # use hydro_lang::prelude::*;
1797    /// # use futures::StreamExt;
1798    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1799    /// let tick = process.tick();
1800    /// let watermark = tick.singleton(q!(1));
1801    /// let numbers = process
1802    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1803    ///     .into_keyed();
1804    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1805    /// batch
1806    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1807    ///     .entries()
1808    ///     .all_ticks()
1809    /// # }, |mut stream| async move {
1810    /// // (2, true)
1811    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1812    /// # }));
1813    /// # }
1814    /// ```
1815    pub fn reduce_watermark<O2, F, C, Idemp>(
1816        self,
1817        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1818        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1819    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1820    where
1821        K: Eq + Hash,
1822        O2: Clone,
1823        F: Fn(&mut V, V) + 'a,
1824        C: ValidCommutativityFor<O>,
1825        Idemp: ValidIdempotenceFor<R>,
1826    {
1827        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1828        check_matching_location(&self.location.root(), other.location.outer());
1829        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1830        proof.register_proof(&f);
1831
1832        let ordered = self
1833            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1834            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1835
1836        KeyedSingleton::new(
1837            ordered.location.clone(),
1838            HydroNode::ReduceKeyedWatermark {
1839                f: f.into(),
1840                input: Box::new(ordered.ir_node.into_inner()),
1841                watermark: Box::new(other.ir_node.into_inner()),
1842                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1843                    K,
1844                    V,
1845                    L,
1846                    B::WhenValueUnbounded,
1847                >::collection_kind()),
1848            },
1849        )
1850    }
1851
1852    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1853    /// whose keys are not in the bounded stream.
1854    ///
1855    /// # Example
1856    /// ```rust
1857    /// # #[cfg(feature = "deploy")] {
1858    /// # use hydro_lang::prelude::*;
1859    /// # use futures::StreamExt;
1860    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1861    /// let tick = process.tick();
1862    /// let keyed_stream = process
1863    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1864    ///     .batch(&tick, nondet!(/** test */))
1865    ///     .into_keyed();
1866    /// let keys_to_remove = process
1867    ///     .source_iter(q!(vec![1, 2]))
1868    ///     .batch(&tick, nondet!(/** test */));
1869    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1870    /// #   .entries()
1871    /// # }, |mut stream| async move {
1872    /// // { 3: ['c'], 4: ['d'] }
1873    /// # let mut results = Vec::new();
1874    /// # for _ in 0..2 {
1875    /// #     results.push(stream.next().await.unwrap());
1876    /// # }
1877    /// # results.sort();
1878    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1879    /// # }));
1880    /// # }
1881    /// ```
1882    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1883        self,
1884        other: Stream<K, L, Bounded, O2, R2>,
1885    ) -> Self
1886    where
1887        K: Eq + Hash,
1888    {
1889        check_matching_location(&self.location, &other.location);
1890
1891        KeyedStream::new(
1892            self.location.clone(),
1893            HydroNode::AntiJoin {
1894                pos: Box::new(self.ir_node.into_inner()),
1895                neg: Box::new(other.ir_node.into_inner()),
1896                metadata: self.location.new_node_metadata(Self::collection_kind()),
1897            },
1898        )
1899    }
1900
1901    /// Emit a keyed stream containing keys shared between two keyed streams,
1902    /// where each value in the output keyed stream is a tuple of
1903    /// (self's value, other's value).
1904    /// If there are multiple values for the same key, this performs a cross product
1905    /// for each matching key.
1906    ///
1907    /// # Example
1908    /// ```rust
1909    /// # #[cfg(feature = "deploy")] {
1910    /// # use hydro_lang::prelude::*;
1911    /// # use futures::StreamExt;
1912    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1913    /// let tick = process.tick();
1914    /// let keyed_data = process
1915    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1916    ///     .into_keyed()
1917    ///     .batch(&tick, nondet!(/** test */));
1918    /// let other_data = process
1919    ///     .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1920    ///     .into_keyed()
1921    ///     .batch(&tick, nondet!(/** test */));
1922    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1923    /// # }, |mut stream| async move {
1924    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1925    /// # let mut results = vec![];
1926    /// # for _ in 0..4 {
1927    /// #     results.push(stream.next().await.unwrap());
1928    /// # }
1929    /// # results.sort();
1930    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
1931    /// # }));
1932    /// # }
1933    /// ```
1934    pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
1935        self,
1936        other: KeyedStream<K, V2, L, B, O2, R2>,
1937    ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1938    where
1939        K: Eq + Hash,
1940        R: MinRetries<R2>,
1941    {
1942        self.entries().join(other.entries()).into_keyed()
1943    }
1944}
1945
1946impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1947where
1948    L: Location<'a>,
1949{
1950    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1951    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1952    ///
1953    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1954    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1955    /// argument that declares where the stream will be atomically processed. Batching a stream into
1956    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1957    /// [`Tick`] will introduce asynchrony.
1958    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1959        let out_location = Atomic { tick: tick.clone() };
1960        KeyedStream::new(
1961            out_location.clone(),
1962            HydroNode::BeginAtomic {
1963                inner: Box::new(self.ir_node.into_inner()),
1964                metadata: out_location
1965                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1966            },
1967        )
1968    }
1969
1970    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1971    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1972    /// the order of the input.
1973    ///
1974    /// # Non-Determinism
1975    /// The batch boundaries are non-deterministic and may change across executions.
1976    pub fn batch(
1977        self,
1978        tick: &Tick<L>,
1979        nondet: NonDet,
1980    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1981        let _ = nondet;
1982        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1983        KeyedStream::new(
1984            tick.clone(),
1985            HydroNode::Batch {
1986                inner: Box::new(self.ir_node.into_inner()),
1987                metadata: tick.new_node_metadata(
1988                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1989                ),
1990            },
1991        )
1992    }
1993}
1994
1995impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1996where
1997    L: Location<'a> + NoTick,
1998{
1999    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2000    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2001    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2002    /// used to create the atomic section.
2003    ///
2004    /// # Non-Determinism
2005    /// The batch boundaries are non-deterministic and may change across executions.
2006    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2007        let _ = nondet;
2008        KeyedStream::new(
2009            self.location.clone().tick,
2010            HydroNode::Batch {
2011                inner: Box::new(self.ir_node.into_inner()),
2012                metadata: self.location.tick.new_node_metadata(KeyedStream::<
2013                    K,
2014                    V,
2015                    Tick<L>,
2016                    Bounded,
2017                    O,
2018                    R,
2019                >::collection_kind(
2020                )),
2021            },
2022        )
2023    }
2024
2025    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2026    /// See [`KeyedStream::atomic`] for more details.
2027    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2028        KeyedStream::new(
2029            self.location.tick.l.clone(),
2030            HydroNode::EndAtomic {
2031                inner: Box::new(self.ir_node.into_inner()),
2032                metadata: self
2033                    .location
2034                    .tick
2035                    .l
2036                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2037            },
2038        )
2039    }
2040}
2041
2042impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
2043where
2044    L: Location<'a>,
2045{
2046    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2047    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2048    /// is only present in one of the inputs, its values are passed through as-is). The output has
2049    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2050    ///
2051    /// Currently, both input streams must be [`Bounded`]. This operator will block
2052    /// on the first stream until all its elements are available. In a future version,
2053    /// we will relax the requirement on the `other` stream.
2054    ///
2055    /// # Example
2056    /// ```rust
2057    /// # #[cfg(feature = "deploy")] {
2058    /// # use hydro_lang::prelude::*;
2059    /// # use futures::StreamExt;
2060    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2061    /// let tick = process.tick();
2062    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2063    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2064    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2065    /// # .entries()
2066    /// # }, |mut stream| async move {
2067    /// // { 0: [2, 1], 1: [4, 3] }
2068    /// # let mut results = Vec::new();
2069    /// # for _ in 0..4 {
2070    /// #     results.push(stream.next().await.unwrap());
2071    /// # }
2072    /// # results.sort();
2073    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2074    /// # }));
2075    /// # }
2076    /// ```
2077    pub fn chain<O2: Ordering, R2: Retries>(
2078        self,
2079        other: KeyedStream<K, V, L, Bounded, O2, R2>,
2080    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2081    where
2082        O: MinOrder<O2>,
2083        R: MinRetries<R2>,
2084    {
2085        check_matching_location(&self.location, &other.location);
2086
2087        KeyedStream::new(
2088            self.location.clone(),
2089            HydroNode::Chain {
2090                first: Box::new(self.ir_node.into_inner()),
2091                second: Box::new(other.ir_node.into_inner()),
2092                metadata: self.location.new_node_metadata(KeyedStream::<
2093                    K,
2094                    V,
2095                    L,
2096                    Bounded,
2097                    <O as MinOrder<O2>>::Min,
2098                    <R as MinRetries<R2>>::Min,
2099                >::collection_kind()),
2100            },
2101        )
2102    }
2103
2104    /// Emit a keyed stream containing keys shared between the keyed stream and the
2105    /// keyed singleton, where each value in the output keyed stream is a tuple of
2106    /// (the keyed stream's value, the keyed singleton's value).
2107    ///
2108    /// # Example
2109    /// ```rust
2110    /// # #[cfg(feature = "deploy")] {
2111    /// # use hydro_lang::prelude::*;
2112    /// # use futures::StreamExt;
2113    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2114    /// let tick = process.tick();
2115    /// let keyed_data = process
2116    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2117    ///     .into_keyed()
2118    ///     .batch(&tick, nondet!(/** test */));
2119    /// let singleton_data = process
2120    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
2121    ///     .into_keyed()
2122    ///     .batch(&tick, nondet!(/** test */))
2123    ///     .first();
2124    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2125    /// # }, |mut stream| async move {
2126    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2127    /// # let mut results = vec![];
2128    /// # for _ in 0..3 {
2129    /// #     results.push(stream.next().await.unwrap());
2130    /// # }
2131    /// # results.sort();
2132    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2133    /// # }));
2134    /// # }
2135    /// ```
2136    pub fn join_keyed_singleton<V2: Clone>(
2137        self,
2138        keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
2139    ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
2140    where
2141        K: Eq + Hash,
2142    {
2143        keyed_singleton
2144            .join_keyed_stream(self)
2145            .map(q!(|(v2, v)| (v, v2)))
2146    }
2147
2148    /// Gets the values associated with a specific key from the keyed stream.
2149    ///
2150    /// # Example
2151    /// ```rust
2152    /// # #[cfg(feature = "deploy")] {
2153    /// # use hydro_lang::prelude::*;
2154    /// # use futures::StreamExt;
2155    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2156    /// let tick = process.tick();
2157    /// let keyed_data = process
2158    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2159    ///     .into_keyed()
2160    ///     .batch(&tick, nondet!(/** test */));
2161    /// let key = tick.singleton(q!(1));
2162    /// keyed_data.get(key).all_ticks()
2163    /// # }, |mut stream| async move {
2164    /// // 10, 11 in any order
2165    /// # let mut results = vec![];
2166    /// # for _ in 0..2 {
2167    /// #     results.push(stream.next().await.unwrap());
2168    /// # }
2169    /// # results.sort();
2170    /// # assert_eq!(results, vec![10, 11]);
2171    /// # }));
2172    /// # }
2173    /// ```
2174    pub fn get(self, key: Singleton<K, L, Bounded>) -> Stream<V, L, Bounded, NoOrder, R>
2175    where
2176        K: Eq + Hash,
2177    {
2178        self.entries()
2179            .join(key.into_stream().map(q!(|k| (k, ()))))
2180            .map(q!(|(_, (v, _))| v))
2181    }
2182
2183    /// For each value in `self`, find the matching key in `lookup`.
2184    /// The output is a keyed stream with the key from `self`, and a value
2185    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2186    /// If the key is not present in `lookup`, the option will be [`None`].
2187    ///
2188    /// # Example
2189    /// ```rust
2190    /// # #[cfg(feature = "deploy")] {
2191    /// # use hydro_lang::prelude::*;
2192    /// # use futures::StreamExt;
2193    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2194    /// # let tick = process.tick();
2195    /// let requests = // { 1: [10, 11], 2: 20 }
2196    /// # process
2197    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2198    /// #     .into_keyed()
2199    /// #     .batch(&tick, nondet!(/** test */));
2200    /// let other_data = // { 10: 100, 11: 110 }
2201    /// # process
2202    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
2203    /// #     .into_keyed()
2204    /// #     .batch(&tick, nondet!(/** test */))
2205    /// #     .first();
2206    /// requests.lookup_keyed_singleton(other_data)
2207    /// # .entries().all_ticks()
2208    /// # }, |mut stream| async move {
2209    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2210    /// # let mut results = vec![];
2211    /// # for _ in 0..3 {
2212    /// #     results.push(stream.next().await.unwrap());
2213    /// # }
2214    /// # results.sort();
2215    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2216    /// # }));
2217    /// # }
2218    /// ```
2219    pub fn lookup_keyed_singleton<V2>(
2220        self,
2221        lookup: KeyedSingleton<V, V2, L, Bounded>,
2222    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2223    where
2224        K: Eq + Hash + Clone,
2225        V: Eq + Hash + Clone,
2226        V2: Clone,
2227    {
2228        self.lookup_keyed_stream(
2229            lookup
2230                .into_keyed_stream()
2231                .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2232        )
2233    }
2234
2235    /// For each value in `self`, find the matching key in `lookup`.
2236    /// The output is a keyed stream with the key from `self`, and a value
2237    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2238    /// If the key is not present in `lookup`, the option will be [`None`].
2239    ///
2240    /// # Example
2241    /// ```rust
2242    /// # #[cfg(feature = "deploy")] {
2243    /// # use hydro_lang::prelude::*;
2244    /// # use futures::StreamExt;
2245    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2246    /// # let tick = process.tick();
2247    /// let requests = // { 1: [10, 11], 2: 20 }
2248    /// # process
2249    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2250    /// #     .into_keyed()
2251    /// #     .batch(&tick, nondet!(/** test */));
2252    /// let other_data = // { 10: [100, 101], 11: 110 }
2253    /// # process
2254    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2255    /// #     .into_keyed()
2256    /// #     .batch(&tick, nondet!(/** test */));
2257    /// requests.lookup_keyed_stream(other_data)
2258    /// # .entries().all_ticks()
2259    /// # }, |mut stream| async move {
2260    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2261    /// # let mut results = vec![];
2262    /// # for _ in 0..4 {
2263    /// #     results.push(stream.next().await.unwrap());
2264    /// # }
2265    /// # results.sort();
2266    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2267    /// # }));
2268    /// # }
2269    /// ```
2270    #[expect(clippy::type_complexity, reason = "retries propagation")]
2271    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2272        self,
2273        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2274    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2275    where
2276        K: Eq + Hash + Clone,
2277        V: Eq + Hash + Clone,
2278        V2: Clone,
2279        R: MinRetries<R2>,
2280    {
2281        let inverted = self
2282            .entries()
2283            .map(q!(|(key, lookup_value)| (lookup_value, key)))
2284            .into_keyed();
2285        let found = inverted
2286            .clone()
2287            .join_keyed_stream(lookup.clone())
2288            .entries()
2289            .map(q!(|(lookup_value, (key, value))| (
2290                key,
2291                (lookup_value, Some(value))
2292            )))
2293            .into_keyed();
2294        let not_found = inverted
2295            .filter_key_not_in(lookup.keys())
2296            .entries()
2297            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2298            .into_keyed();
2299
2300        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2301    }
2302}
2303
2304impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2305where
2306    L: Location<'a>,
2307{
2308    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2309    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2310    /// each key.
2311    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2312        KeyedStream::new(
2313            self.location.outer().clone(),
2314            HydroNode::YieldConcat {
2315                inner: Box::new(self.ir_node.into_inner()),
2316                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2317                    K,
2318                    V,
2319                    L,
2320                    Unbounded,
2321                    O,
2322                    R,
2323                >::collection_kind(
2324                )),
2325            },
2326        )
2327    }
2328
2329    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2330    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2331    /// each key.
2332    ///
2333    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2334    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2335    /// stream's [`Tick`] context.
2336    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2337        let out_location = Atomic {
2338            tick: self.location.clone(),
2339        };
2340
2341        KeyedStream::new(
2342            out_location.clone(),
2343            HydroNode::YieldConcat {
2344                inner: Box::new(self.ir_node.into_inner()),
2345                metadata: out_location.new_node_metadata(KeyedStream::<
2346                    K,
2347                    V,
2348                    Atomic<L>,
2349                    Unbounded,
2350                    O,
2351                    R,
2352                >::collection_kind()),
2353            },
2354        )
2355    }
2356
2357    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2358    /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2359    ///
2360    /// This API is particularly useful for stateful computation on batches of data, such as
2361    /// maintaining an accumulated state that is up to date with the current batch.
2362    ///
2363    /// # Example
2364    /// ```rust
2365    /// # #[cfg(feature = "deploy")] {
2366    /// # use hydro_lang::prelude::*;
2367    /// # use futures::StreamExt;
2368    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2369    /// let tick = process.tick();
2370    /// # // ticks are lazy by default, forces the second tick to run
2371    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2372    /// # let batch_first_tick = process
2373    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2374    /// #   .into_keyed()
2375    /// #   .batch(&tick, nondet!(/** test */));
2376    /// # let batch_second_tick = process
2377    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2378    /// #   .into_keyed()
2379    /// #   .batch(&tick, nondet!(/** test */))
2380    /// #   .defer_tick(); // appears on the second tick
2381    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2382    ///
2383    /// input.batch(&tick, nondet!(/** test */))
2384    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
2385    ///         *sum += new;
2386    ///     }))).entries().all_ticks()
2387    /// # }, |mut stream| async move {
2388    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2389    /// # let mut results = Vec::new();
2390    /// # for _ in 0..4 {
2391    /// #     results.push(stream.next().await.unwrap());
2392    /// # }
2393    /// # results.sort();
2394    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2395    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2396    /// # results.clear();
2397    /// # for _ in 0..4 {
2398    /// #     results.push(stream.next().await.unwrap());
2399    /// # }
2400    /// # results.sort();
2401    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2402    /// # }));
2403    /// # }
2404    /// ```
2405    pub fn across_ticks<Out: BatchAtomic>(
2406        self,
2407        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2408    ) -> Out::Batched {
2409        thunk(self.all_ticks_atomic()).batched_atomic()
2410    }
2411
2412    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2413    /// tick `T` always has the entries of `self` at tick `T - 1`.
2414    ///
2415    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2416    ///
2417    /// This operator enables stateful iterative processing with ticks, by sending data from one
2418    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2419    ///
2420    /// # Example
2421    /// ```rust
2422    /// # #[cfg(feature = "deploy")] {
2423    /// # use hydro_lang::prelude::*;
2424    /// # use futures::StreamExt;
2425    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2426    /// let tick = process.tick();
2427    /// # // ticks are lazy by default, forces the second tick to run
2428    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2429    /// # let batch_first_tick = process
2430    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2431    /// #   .batch(&tick, nondet!(/** test */))
2432    /// #   .into_keyed();
2433    /// # let batch_second_tick = process
2434    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2435    /// #   .batch(&tick, nondet!(/** test */))
2436    /// #   .defer_tick()
2437    /// #   .into_keyed(); // appears on the second tick
2438    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2439    /// # batch_first_tick.chain(batch_second_tick);
2440    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2441    ///     changes_across_ticks // from the current tick
2442    /// )
2443    /// # .entries().all_ticks()
2444    /// # }, |mut stream| async move {
2445    /// // First tick: { 1: [2, 3] }
2446    /// # let mut results = Vec::new();
2447    /// # for _ in 0..2 {
2448    /// #     results.push(stream.next().await.unwrap());
2449    /// # }
2450    /// # results.sort();
2451    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2452    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2453    /// # results.clear();
2454    /// # for _ in 0..4 {
2455    /// #     results.push(stream.next().await.unwrap());
2456    /// # }
2457    /// # results.sort();
2458    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2459    /// // Third tick: { 1: [4], 2: [5] }
2460    /// # results.clear();
2461    /// # for _ in 0..2 {
2462    /// #     results.push(stream.next().await.unwrap());
2463    /// # }
2464    /// # results.sort();
2465    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2466    /// # }));
2467    /// # }
2468    /// ```
2469    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2470        KeyedStream::new(
2471            self.location.clone(),
2472            HydroNode::DeferTick {
2473                input: Box::new(self.ir_node.into_inner()),
2474                metadata: self.location.new_node_metadata(KeyedStream::<
2475                    K,
2476                    V,
2477                    Tick<L>,
2478                    Bounded,
2479                    O,
2480                    R,
2481                >::collection_kind()),
2482            },
2483        )
2484    }
2485}
2486
2487#[cfg(test)]
2488mod tests {
2489    #[cfg(feature = "deploy")]
2490    use futures::{SinkExt, StreamExt};
2491    #[cfg(feature = "deploy")]
2492    use hydro_deploy::Deployment;
2493    #[cfg(any(feature = "deploy", feature = "sim"))]
2494    use stageleft::q;
2495
2496    #[cfg(any(feature = "deploy", feature = "sim"))]
2497    use crate::compile::builder::FlowBuilder;
2498    #[cfg(feature = "deploy")]
2499    use crate::live_collections::stream::ExactlyOnce;
2500    #[cfg(feature = "sim")]
2501    use crate::live_collections::stream::{NoOrder, TotalOrder};
2502    #[cfg(any(feature = "deploy", feature = "sim"))]
2503    use crate::location::Location;
2504    #[cfg(any(feature = "deploy", feature = "sim"))]
2505    use crate::nondet::nondet;
2506    #[cfg(feature = "deploy")]
2507    use crate::properties::ManualProof;
2508
2509    #[cfg(feature = "deploy")]
2510    #[tokio::test]
2511    async fn reduce_watermark_filter() {
2512        let mut deployment = Deployment::new();
2513
2514        let mut flow = FlowBuilder::new();
2515        let node = flow.process::<()>();
2516        let external = flow.external::<()>();
2517
2518        let node_tick = node.tick();
2519        let watermark = node_tick.singleton(q!(1));
2520
2521        let sum = node
2522            .source_stream(q!(tokio_stream::iter([
2523                (0, 100),
2524                (1, 101),
2525                (2, 102),
2526                (2, 102)
2527            ])))
2528            .into_keyed()
2529            .reduce_watermark(
2530                watermark,
2531                q!(|acc, v| {
2532                    *acc += v;
2533                }),
2534            )
2535            .snapshot(&node_tick, nondet!(/** test */))
2536            .entries()
2537            .all_ticks()
2538            .send_bincode_external(&external);
2539
2540        let nodes = flow
2541            .with_process(&node, deployment.Localhost())
2542            .with_external(&external, deployment.Localhost())
2543            .deploy(&mut deployment);
2544
2545        deployment.deploy().await.unwrap();
2546
2547        let mut out = nodes.connect(sum).await;
2548
2549        deployment.start().await.unwrap();
2550
2551        assert_eq!(out.next().await.unwrap(), (2, 204));
2552    }
2553
2554    #[cfg(feature = "deploy")]
2555    #[tokio::test]
2556    async fn reduce_watermark_bounded() {
2557        let mut deployment = Deployment::new();
2558
2559        let mut flow = FlowBuilder::new();
2560        let node = flow.process::<()>();
2561        let external = flow.external::<()>();
2562
2563        let node_tick = node.tick();
2564        let watermark = node_tick.singleton(q!(1));
2565
2566        let sum = node
2567            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2568            .into_keyed()
2569            .reduce_watermark(
2570                watermark,
2571                q!(|acc, v| {
2572                    *acc += v;
2573                }),
2574            )
2575            .entries()
2576            .send_bincode_external(&external);
2577
2578        let nodes = flow
2579            .with_process(&node, deployment.Localhost())
2580            .with_external(&external, deployment.Localhost())
2581            .deploy(&mut deployment);
2582
2583        deployment.deploy().await.unwrap();
2584
2585        let mut out = nodes.connect(sum).await;
2586
2587        deployment.start().await.unwrap();
2588
2589        assert_eq!(out.next().await.unwrap(), (2, 204));
2590    }
2591
2592    #[cfg(feature = "deploy")]
2593    #[tokio::test]
2594    async fn reduce_watermark_garbage_collect() {
2595        let mut deployment = Deployment::new();
2596
2597        let mut flow = FlowBuilder::new();
2598        let node = flow.process::<()>();
2599        let external = flow.external::<()>();
2600        let (tick_send, tick_trigger) =
2601            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
2602
2603        let node_tick = node.tick();
2604        let (watermark_complete_cycle, watermark) =
2605            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
2606        let next_watermark = watermark.clone().map(q!(|v| v + 1));
2607        watermark_complete_cycle.complete_next_tick(next_watermark);
2608
2609        let tick_triggered_input = node_tick
2610            .singleton(q!((3, 103)))
2611            .into_stream()
2612            .filter_if_some(
2613                tick_trigger
2614                    .clone()
2615                    .batch(&node_tick, nondet!(/** test */))
2616                    .first(),
2617            )
2618            .all_ticks();
2619
2620        let sum = node
2621            .source_stream(q!(tokio_stream::iter([
2622                (0, 100),
2623                (1, 101),
2624                (2, 102),
2625                (2, 102)
2626            ])))
2627            .interleave(tick_triggered_input)
2628            .into_keyed()
2629            .reduce_watermark(
2630                watermark,
2631                q!(
2632                    |acc, v| {
2633                        *acc += v;
2634                    },
2635                    commutative = ManualProof(/* integer addition is commutative */)
2636                ),
2637            )
2638            .snapshot(&node_tick, nondet!(/** test */))
2639            .entries()
2640            .all_ticks()
2641            .send_bincode_external(&external);
2642
2643        let nodes = flow
2644            .with_default_optimize()
2645            .with_process(&node, deployment.Localhost())
2646            .with_external(&external, deployment.Localhost())
2647            .deploy(&mut deployment);
2648
2649        deployment.deploy().await.unwrap();
2650
2651        let mut tick_send = nodes.connect(tick_send).await;
2652        let mut out_recv = nodes.connect(sum).await;
2653
2654        deployment.start().await.unwrap();
2655
2656        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
2657
2658        tick_send.send(()).await.unwrap();
2659
2660        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
2661    }
2662
2663    #[cfg(feature = "sim")]
2664    #[test]
2665    #[should_panic]
2666    fn sim_batch_nondet_size() {
2667        let mut flow = FlowBuilder::new();
2668        let node = flow.process::<()>();
2669
2670        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2671
2672        let tick = node.tick();
2673        let out_recv = input
2674            .batch(&tick, nondet!(/** test */))
2675            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2676            .entries()
2677            .all_ticks()
2678            .sim_output();
2679
2680        flow.sim().exhaustive(async || {
2681            out_recv
2682                .assert_yields_only_unordered([(1, vec![1, 2])])
2683                .await;
2684        });
2685    }
2686
2687    #[cfg(feature = "sim")]
2688    #[test]
2689    fn sim_batch_preserves_group_order() {
2690        let mut flow = FlowBuilder::new();
2691        let node = flow.process::<()>();
2692
2693        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2694
2695        let tick = node.tick();
2696        let out_recv = input
2697            .batch(&tick, nondet!(/** test */))
2698            .all_ticks()
2699            .fold_early_stop(
2700                q!(|| 0),
2701                q!(|acc, v| {
2702                    *acc = std::cmp::max(v, *acc);
2703                    *acc >= 2
2704                }),
2705            )
2706            .entries()
2707            .sim_output();
2708
2709        let instances = flow.sim().exhaustive(async || {
2710            out_recv
2711                .assert_yields_only_unordered([(1, 2), (2, 3)])
2712                .await;
2713        });
2714
2715        assert_eq!(instances, 8);
2716        // - three cases: all three in a separate tick (pick where (2, 3) is)
2717        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
2718        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
2719        // - one case: all three together
2720    }
2721
2722    #[cfg(feature = "sim")]
2723    #[test]
2724    fn sim_batch_unordered_shuffles() {
2725        let mut flow = FlowBuilder::new();
2726        let node = flow.process::<()>();
2727
2728        let input = node
2729            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
2730            .into_keyed()
2731            .weaken_ordering::<NoOrder>();
2732
2733        let tick = node.tick();
2734        let out_recv = input
2735            .batch(&tick, nondet!(/** test */))
2736            .all_ticks()
2737            .entries()
2738            .sim_output();
2739
2740        let instances = flow.sim().exhaustive(async || {
2741            out_recv
2742                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
2743                .await;
2744        });
2745
2746        assert_eq!(instances, 13);
2747        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
2748        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2749        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
2750        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2751    }
2752
2753    #[cfg(feature = "sim")]
2754    #[test]
2755    #[should_panic]
2756    fn sim_observe_order_batched() {
2757        let mut flow = FlowBuilder::new();
2758        let node = flow.process::<()>();
2759
2760        let (port, input) = node.sim_input::<_, NoOrder, _>();
2761
2762        let tick = node.tick();
2763        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2764        let out_recv = batch
2765            .assume_ordering::<TotalOrder>(nondet!(/** test */))
2766            .all_ticks()
2767            .first()
2768            .entries()
2769            .sim_output();
2770
2771        flow.sim().exhaustive(async || {
2772            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2773            out_recv
2774                .assert_yields_only_unordered([(1, 1), (2, 1)])
2775                .await; // fails with assume_ordering
2776        });
2777    }
2778
2779    #[cfg(feature = "sim")]
2780    #[test]
2781    fn sim_observe_order_batched_count() {
2782        let mut flow = FlowBuilder::new();
2783        let node = flow.process::<()>();
2784
2785        let (port, input) = node.sim_input::<_, NoOrder, _>();
2786
2787        let tick = node.tick();
2788        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2789        let out_recv = batch
2790            .assume_ordering::<TotalOrder>(nondet!(/** test */))
2791            .all_ticks()
2792            .entries()
2793            .sim_output();
2794
2795        let instance_count = flow.sim().exhaustive(async || {
2796            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2797            let _ = out_recv.collect_sorted::<Vec<_>>().await;
2798        });
2799
2800        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
2801    }
2802
2803    #[cfg(feature = "sim")]
2804    #[test]
2805    fn sim_top_level_assume_ordering() {
2806        use std::collections::HashMap;
2807
2808        let mut flow = FlowBuilder::new();
2809        let node = flow.process::<()>();
2810
2811        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2812
2813        let out_recv = input
2814            .into_keyed()
2815            .assume_ordering::<TotalOrder>(nondet!(/** test */))
2816            .fold_early_stop(
2817                q!(|| Vec::new()),
2818                q!(|acc, v| {
2819                    acc.push(v);
2820                    acc.len() >= 2
2821                }),
2822            )
2823            .entries()
2824            .sim_output();
2825
2826        let instance_count = flow.sim().exhaustive(async || {
2827            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
2828            let out: HashMap<_, _> = out_recv
2829                .collect_sorted::<Vec<_>>()
2830                .await
2831                .into_iter()
2832                .collect();
2833            // Each key accumulates its values; we get one entry per key
2834            assert_eq!(out.len(), 2);
2835        });
2836
2837        assert_eq!(instance_count, 24)
2838    }
2839
2840    #[cfg(feature = "sim")]
2841    #[test]
2842    fn sim_top_level_assume_ordering_cycle_back() {
2843        use std::collections::HashMap;
2844
2845        let mut flow = FlowBuilder::new();
2846        let node = flow.process::<()>();
2847
2848        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2849
2850        let (complete_cycle_back, cycle_back) =
2851            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
2852        let ordered = input
2853            .into_keyed()
2854            .interleave(cycle_back)
2855            .assume_ordering::<TotalOrder>(nondet!(/** test */));
2856        complete_cycle_back.complete(
2857            ordered
2858                .clone()
2859                .map(q!(|v| v + 1))
2860                .filter(q!(|v| v % 2 == 1)),
2861        );
2862
2863        let out_recv = ordered
2864            .fold_early_stop(
2865                q!(|| Vec::new()),
2866                q!(|acc, v| {
2867                    acc.push(v);
2868                    acc.len() >= 2
2869                }),
2870            )
2871            .entries()
2872            .sim_output();
2873
2874        let mut saw = false;
2875        let instance_count = flow.sim().exhaustive(async || {
2876            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
2877            // We want to see [0, 1] - the cycled back value interleaved
2878            in_send.send_many_unordered([(1, 0), (1, 2)]);
2879            let out: HashMap<_, _> = out_recv
2880                .collect_sorted::<Vec<_>>()
2881                .await
2882                .into_iter()
2883                .collect();
2884
2885            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
2886            if let Some(values) = out.get(&1)
2887                && *values == vec![0, 1]
2888            {
2889                saw = true;
2890            }
2891        });
2892
2893        assert!(
2894            saw,
2895            "did not see an instance with key 1 having [0, 1] in order"
2896        );
2897        assert_eq!(instance_count, 6);
2898    }
2899
2900    #[cfg(feature = "sim")]
2901    #[test]
2902    fn sim_top_level_assume_ordering_cross_key_cycle() {
2903        use std::collections::HashMap;
2904
2905        // This test demonstrates why releasing one entry at a time is important:
2906        // When one key's observed order cycles back into a different key, we need
2907        // to be able to interleave the cycled-back entry with pending items for
2908        // that other key.
2909        let mut flow = FlowBuilder::new();
2910        let node = flow.process::<()>();
2911
2912        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2913
2914        let (complete_cycle_back, cycle_back) =
2915            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
2916        let ordered = input
2917            .into_keyed()
2918            .interleave(cycle_back)
2919            .assume_ordering::<TotalOrder>(nondet!(/** test */));
2920
2921        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
2922        complete_cycle_back.complete(
2923            ordered
2924                .clone()
2925                .filter(q!(|v| *v == 10))
2926                .map(q!(|_| 100))
2927                .entries()
2928                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
2929                .into_keyed(),
2930        );
2931
2932        let out_recv = ordered
2933            .fold_early_stop(
2934                q!(|| Vec::new()),
2935                q!(|acc, v| {
2936                    acc.push(v);
2937                    acc.len() >= 2
2938                }),
2939            )
2940            .entries()
2941            .sim_output();
2942
2943        // We want to see an instance where:
2944        // - (1, 10) is released first
2945        // - This causes (2, 100) to be cycled back
2946        // - (2, 100) is released BEFORE (2, 20) which was already pending
2947        let mut saw_cross_key_interleave = false;
2948        let instance_count = flow.sim().exhaustive(async || {
2949            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
2950            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
2951            let out: HashMap<_, _> = out_recv
2952                .collect_sorted::<Vec<_>>()
2953                .await
2954                .into_iter()
2955                .collect();
2956
2957            // Check if we see the cross-key interleaving:
2958            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
2959            if let Some(values) = out.get(&2)
2960                && values.len() >= 2
2961                && values[0] == 100
2962            {
2963                saw_cross_key_interleave = true;
2964            }
2965        });
2966
2967        assert!(
2968            saw_cross_key_interleave,
2969            "did not see an instance where cycled-back 100 was released before pending items for key 2"
2970        );
2971        assert_eq!(instance_count, 60);
2972    }
2973
2974    #[cfg(feature = "sim")]
2975    #[test]
2976    fn sim_top_level_assume_ordering_cycle_back_tick() {
2977        use std::collections::HashMap;
2978
2979        let mut flow = FlowBuilder::new();
2980        let node = flow.process::<()>();
2981
2982        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2983
2984        let (complete_cycle_back, cycle_back) =
2985            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
2986        let ordered = input
2987            .into_keyed()
2988            .interleave(cycle_back)
2989            .assume_ordering::<TotalOrder>(nondet!(/** test */));
2990        complete_cycle_back.complete(
2991            ordered
2992                .clone()
2993                .batch(&node.tick(), nondet!(/** test */))
2994                .all_ticks()
2995                .map(q!(|v| v + 1))
2996                .filter(q!(|v| v % 2 == 1)),
2997        );
2998
2999        let out_recv = ordered
3000            .fold_early_stop(
3001                q!(|| Vec::new()),
3002                q!(|acc, v| {
3003                    acc.push(v);
3004                    acc.len() >= 2
3005                }),
3006            )
3007            .entries()
3008            .sim_output();
3009
3010        let mut saw = false;
3011        let instance_count = flow.sim().exhaustive(async || {
3012            in_send.send_many_unordered([(1, 0), (1, 2)]);
3013            let out: HashMap<_, _> = out_recv
3014                .collect_sorted::<Vec<_>>()
3015                .await
3016                .into_iter()
3017                .collect();
3018
3019            if let Some(values) = out.get(&1)
3020                && *values == vec![0, 1]
3021            {
3022                saw = true;
3023            }
3024        });
3025
3026        assert!(
3027            saw,
3028            "did not see an instance with key 1 having [0, 1] in order"
3029        );
3030        assert_eq!(instance_count, 58);
3031    }
3032}