hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
19};
20#[cfg(stageleft_runtime)]
21use crate::forward_handle::{CycleCollection, ReceiverComplete};
22use crate::forward_handle::{ForwardRef, TickCycle};
23use crate::live_collections::batch_atomic::BatchAtomic;
24use crate::live_collections::stream::{
25 AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
26};
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::DeferTick;
30use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
34
35pub mod networking;
36
37/// Streaming elements of type `V` grouped by a key of type `K`.
38///
39/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
40/// order of keys is non-deterministic but the order *within* each group may be deterministic.
41///
42/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
43/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
44/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
45///
46/// Type Parameters:
47/// - `K`: the type of the key for each group
48/// - `V`: the type of the elements inside each group
49/// - `Loc`: the [`Location`] where the keyed stream is materialized
50/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
51/// - `Order`: tracks whether the elements within each group have deterministic order
52/// ([`TotalOrder`]) or not ([`NoOrder`])
53/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
54/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
55pub struct KeyedStream<
56 K,
57 V,
58 Loc,
59 Bound: Boundedness = Unbounded,
60 Order: Ordering = TotalOrder,
61 Retry: Retries = ExactlyOnce,
62> {
63 pub(crate) location: Loc,
64 pub(crate) ir_node: RefCell<HydroNode>,
65
66 _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
67}
68
69impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
70 for KeyedStream<K, V, L, Unbounded, O, R>
71where
72 L: Location<'a>,
73{
74 fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
75 let new_meta = stream
76 .location
77 .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
78
79 KeyedStream {
80 location: stream.location,
81 ir_node: RefCell::new(HydroNode::Cast {
82 inner: Box::new(stream.ir_node.into_inner()),
83 metadata: new_meta,
84 }),
85 _phantom: PhantomData,
86 }
87 }
88}
89
90impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
91 for KeyedStream<K, V, L, B, NoOrder, R>
92where
93 L: Location<'a>,
94{
95 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
96 stream.weaken_ordering()
97 }
98}
99
100impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
101where
102 L: Location<'a>,
103{
104 fn defer_tick(self) -> Self {
105 KeyedStream::defer_tick(self)
106 }
107}
108
109impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
110 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
111where
112 L: Location<'a>,
113{
114 type Location = Tick<L>;
115
116 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
117 KeyedStream {
118 location: location.clone(),
119 ir_node: RefCell::new(HydroNode::CycleSource {
120 ident,
121 metadata: location.new_node_metadata(
122 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
123 ),
124 }),
125 _phantom: PhantomData,
126 }
127 }
128}
129
130impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
131 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
132where
133 L: Location<'a>,
134{
135 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
136 assert_eq!(
137 Location::id(&self.location),
138 expected_location,
139 "locations do not match"
140 );
141
142 self.location
143 .flow_state()
144 .borrow_mut()
145 .push_root(HydroRoot::CycleSink {
146 ident,
147 input: Box::new(self.ir_node.into_inner()),
148 op_metadata: HydroIrOpMetadata::new(),
149 });
150 }
151}
152
153impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
154 for KeyedStream<K, V, L, B, O, R>
155where
156 L: Location<'a> + NoTick,
157{
158 type Location = L;
159
160 fn create_source(ident: syn::Ident, location: L) -> Self {
161 KeyedStream {
162 location: location.clone(),
163 ir_node: RefCell::new(HydroNode::CycleSource {
164 ident,
165 metadata: location
166 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
167 }),
168 _phantom: PhantomData,
169 }
170 }
171}
172
173impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
174 for KeyedStream<K, V, L, B, O, R>
175where
176 L: Location<'a> + NoTick,
177{
178 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
179 assert_eq!(
180 Location::id(&self.location),
181 expected_location,
182 "locations do not match"
183 );
184 self.location
185 .flow_state()
186 .borrow_mut()
187 .push_root(HydroRoot::CycleSink {
188 ident,
189 input: Box::new(self.ir_node.into_inner()),
190 op_metadata: HydroIrOpMetadata::new(),
191 });
192 }
193}
194
195impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
196 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
197{
198 fn clone(&self) -> Self {
199 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
200 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
201 *self.ir_node.borrow_mut() = HydroNode::Tee {
202 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
203 metadata: self.location.new_node_metadata(Self::collection_kind()),
204 };
205 }
206
207 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
208 KeyedStream {
209 location: self.location.clone(),
210 ir_node: HydroNode::Tee {
211 inner: TeeNode(inner.0.clone()),
212 metadata: metadata.clone(),
213 }
214 .into(),
215 _phantom: PhantomData,
216 }
217 } else {
218 unreachable!()
219 }
220 }
221}
222
223impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
224 KeyedStream<K, V, L, B, O, R>
225{
226 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
227 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
228 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
229
230 KeyedStream {
231 location,
232 ir_node: RefCell::new(ir_node),
233 _phantom: PhantomData,
234 }
235 }
236
237 /// Returns the [`CollectionKind`] corresponding to this type.
238 pub fn collection_kind() -> CollectionKind {
239 CollectionKind::KeyedStream {
240 bound: B::BOUND_KIND,
241 value_order: O::ORDERING_KIND,
242 value_retry: R::RETRIES_KIND,
243 key_type: stageleft::quote_type::<K>().into(),
244 value_type: stageleft::quote_type::<V>().into(),
245 }
246 }
247
248 /// Returns the [`Location`] where this keyed stream is being materialized.
249 pub fn location(&self) -> &L {
250 &self.location
251 }
252
253 /// Explicitly "casts" the keyed stream to a type with a different ordering
254 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
255 /// by the type-system.
256 ///
257 /// # Non-Determinism
258 /// This function is used as an escape hatch, and any mistakes in the
259 /// provided ordering guarantee will propagate into the guarantees
260 /// for the rest of the program.
261 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
262 if O::ORDERING_KIND == O2::ORDERING_KIND {
263 KeyedStream::new(self.location, self.ir_node.into_inner())
264 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
265 // We can always weaken the ordering guarantee
266 KeyedStream::new(
267 self.location.clone(),
268 HydroNode::Cast {
269 inner: Box::new(self.ir_node.into_inner()),
270 metadata: self
271 .location
272 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
273 },
274 )
275 } else {
276 KeyedStream::new(
277 self.location.clone(),
278 HydroNode::ObserveNonDet {
279 inner: Box::new(self.ir_node.into_inner()),
280 trusted: false,
281 metadata: self
282 .location
283 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
284 },
285 )
286 }
287 }
288
289 fn assume_ordering_trusted<O2: Ordering>(
290 self,
291 _nondet: NonDet,
292 ) -> KeyedStream<K, V, L, B, O2, R> {
293 if O::ORDERING_KIND == O2::ORDERING_KIND {
294 KeyedStream::new(self.location, self.ir_node.into_inner())
295 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
296 // We can always weaken the ordering guarantee
297 KeyedStream::new(
298 self.location.clone(),
299 HydroNode::Cast {
300 inner: Box::new(self.ir_node.into_inner()),
301 metadata: self
302 .location
303 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
304 },
305 )
306 } else {
307 KeyedStream::new(
308 self.location.clone(),
309 HydroNode::ObserveNonDet {
310 inner: Box::new(self.ir_node.into_inner()),
311 trusted: true,
312 metadata: self
313 .location
314 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
315 },
316 )
317 }
318 }
319
320 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
321 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
322 /// which is always safe because that is the weakest possible guarantee.
323 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
324 self.weaken_ordering::<NoOrder>()
325 }
326
327 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
328 /// enforcing that `O2` is weaker than the input ordering guarantee.
329 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
330 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
331 self.assume_ordering::<O2>(nondet)
332 }
333
334 /// Explicitly "casts" the keyed stream to a type with a different retries
335 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
336 /// be proven by the type-system.
337 ///
338 /// # Non-Determinism
339 /// This function is used as an escape hatch, and any mistakes in the
340 /// provided retries guarantee will propagate into the guarantees
341 /// for the rest of the program.
342 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
343 if R::RETRIES_KIND == R2::RETRIES_KIND {
344 KeyedStream::new(self.location, self.ir_node.into_inner())
345 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
346 // We can always weaken the retries guarantee
347 KeyedStream::new(
348 self.location.clone(),
349 HydroNode::Cast {
350 inner: Box::new(self.ir_node.into_inner()),
351 metadata: self
352 .location
353 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
354 },
355 )
356 } else {
357 KeyedStream::new(
358 self.location.clone(),
359 HydroNode::ObserveNonDet {
360 inner: Box::new(self.ir_node.into_inner()),
361 trusted: false,
362 metadata: self
363 .location
364 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
365 },
366 )
367 }
368 }
369
370 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
371 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
372 /// which is always safe because that is the weakest possible guarantee.
373 pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
374 self.weaken_retries::<AtLeastOnce>()
375 }
376
377 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
378 /// enforcing that `R2` is weaker than the input retries guarantee.
379 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
380 let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
381 self.assume_retries::<R2>(nondet)
382 }
383
384 /// Flattens the keyed stream into an unordered stream of key-value pairs.
385 ///
386 /// # Example
387 /// ```rust
388 /// # #[cfg(feature = "deploy")] {
389 /// # use hydro_lang::prelude::*;
390 /// # use futures::StreamExt;
391 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
392 /// process
393 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
394 /// .into_keyed()
395 /// .entries()
396 /// # }, |mut stream| async move {
397 /// // (1, 2), (1, 3), (2, 4) in any order
398 /// # let mut results = Vec::new();
399 /// # for _ in 0..3 {
400 /// # results.push(stream.next().await.unwrap());
401 /// # }
402 /// # results.sort();
403 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
404 /// # }));
405 /// # }
406 /// ```
407 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
408 Stream::new(
409 self.location.clone(),
410 HydroNode::Cast {
411 inner: Box::new(self.ir_node.into_inner()),
412 metadata: self
413 .location
414 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
415 },
416 )
417 }
418
419 /// Flattens the keyed stream into an unordered stream of only the values.
420 ///
421 /// # Example
422 /// ```rust
423 /// # #[cfg(feature = "deploy")] {
424 /// # use hydro_lang::prelude::*;
425 /// # use futures::StreamExt;
426 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
427 /// process
428 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
429 /// .into_keyed()
430 /// .values()
431 /// # }, |mut stream| async move {
432 /// // 2, 3, 4 in any order
433 /// # let mut results = Vec::new();
434 /// # for _ in 0..3 {
435 /// # results.push(stream.next().await.unwrap());
436 /// # }
437 /// # results.sort();
438 /// # assert_eq!(results, vec![2, 3, 4]);
439 /// # }));
440 /// # }
441 /// ```
442 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
443 self.entries().map(q!(|(_, v)| v))
444 }
445
446 /// Flattens the keyed stream into an unordered stream of just the keys.
447 ///
448 /// # Example
449 /// ```rust
450 /// # #[cfg(feature = "deploy")] {
451 /// # use hydro_lang::prelude::*;
452 /// # use futures::StreamExt;
453 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
454 /// # process
455 /// # .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
456 /// # .into_keyed()
457 /// # .keys()
458 /// # }, |mut stream| async move {
459 /// // 1, 2 in any order
460 /// # let mut results = Vec::new();
461 /// # for _ in 0..2 {
462 /// # results.push(stream.next().await.unwrap());
463 /// # }
464 /// # results.sort();
465 /// # assert_eq!(results, vec![1, 2]);
466 /// # }));
467 /// # }
468 /// ```
469 pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
470 where
471 K: Eq + Hash,
472 {
473 self.entries().map(q!(|(k, _)| k)).unique()
474 }
475
476 /// Transforms each value by invoking `f` on each element, with keys staying the same
477 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
478 ///
479 /// If you do not want to modify the stream and instead only want to view
480 /// each item use [`KeyedStream::inspect`] instead.
481 ///
482 /// # Example
483 /// ```rust
484 /// # #[cfg(feature = "deploy")] {
485 /// # use hydro_lang::prelude::*;
486 /// # use futures::StreamExt;
487 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
488 /// process
489 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
490 /// .into_keyed()
491 /// .map(q!(|v| v + 1))
492 /// # .entries()
493 /// # }, |mut stream| async move {
494 /// // { 1: [3, 4], 2: [5] }
495 /// # let mut results = Vec::new();
496 /// # for _ in 0..3 {
497 /// # results.push(stream.next().await.unwrap());
498 /// # }
499 /// # results.sort();
500 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
501 /// # }));
502 /// # }
503 /// ```
504 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
505 where
506 F: Fn(V) -> U + 'a,
507 {
508 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
509 let map_f = q!({
510 let orig = f;
511 move |(k, v)| (k, orig(v))
512 })
513 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
514 .into();
515
516 KeyedStream::new(
517 self.location.clone(),
518 HydroNode::Map {
519 f: map_f,
520 input: Box::new(self.ir_node.into_inner()),
521 metadata: self
522 .location
523 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
524 },
525 )
526 }
527
528 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
529 /// re-grouped even they are tuples; instead they will be grouped under the original key.
530 ///
531 /// If you do not want to modify the stream and instead only want to view
532 /// each item use [`KeyedStream::inspect_with_key`] instead.
533 ///
534 /// # Example
535 /// ```rust
536 /// # #[cfg(feature = "deploy")] {
537 /// # use hydro_lang::prelude::*;
538 /// # use futures::StreamExt;
539 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
540 /// process
541 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
542 /// .into_keyed()
543 /// .map_with_key(q!(|(k, v)| k + v))
544 /// # .entries()
545 /// # }, |mut stream| async move {
546 /// // { 1: [3, 4], 2: [6] }
547 /// # let mut results = Vec::new();
548 /// # for _ in 0..3 {
549 /// # results.push(stream.next().await.unwrap());
550 /// # }
551 /// # results.sort();
552 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
553 /// # }));
554 /// # }
555 /// ```
556 pub fn map_with_key<U, F>(
557 self,
558 f: impl IntoQuotedMut<'a, F, L> + Copy,
559 ) -> KeyedStream<K, U, L, B, O, R>
560 where
561 F: Fn((K, V)) -> U + 'a,
562 K: Clone,
563 {
564 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
565 let map_f = q!({
566 let orig = f;
567 move |(k, v)| {
568 let out = orig((Clone::clone(&k), v));
569 (k, out)
570 }
571 })
572 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
573 .into();
574
575 KeyedStream::new(
576 self.location.clone(),
577 HydroNode::Map {
578 f: map_f,
579 input: Box::new(self.ir_node.into_inner()),
580 metadata: self
581 .location
582 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
583 },
584 )
585 }
586
587 /// Prepends a new value to the key of each element in the stream, producing a new
588 /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
589 /// occurs and the elements in each group preserve their original order.
590 ///
591 /// # Example
592 /// ```rust
593 /// # #[cfg(feature = "deploy")] {
594 /// # use hydro_lang::prelude::*;
595 /// # use futures::StreamExt;
596 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
597 /// process
598 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
599 /// .into_keyed()
600 /// .prefix_key(q!(|&(k, _)| k % 2))
601 /// # .entries()
602 /// # }, |mut stream| async move {
603 /// // { (1, 1): [2, 3], (0, 2): [4] }
604 /// # let mut results = Vec::new();
605 /// # for _ in 0..3 {
606 /// # results.push(stream.next().await.unwrap());
607 /// # }
608 /// # results.sort();
609 /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
610 /// # }));
611 /// # }
612 /// ```
613 pub fn prefix_key<K2, F>(
614 self,
615 f: impl IntoQuotedMut<'a, F, L> + Copy,
616 ) -> KeyedStream<(K2, K), V, L, B, O, R>
617 where
618 F: Fn(&(K, V)) -> K2 + 'a,
619 {
620 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
621 let map_f = q!({
622 let orig = f;
623 move |kv| {
624 let out = orig(&kv);
625 ((out, kv.0), kv.1)
626 }
627 })
628 .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
629 .into();
630
631 KeyedStream::new(
632 self.location.clone(),
633 HydroNode::Map {
634 f: map_f,
635 input: Box::new(self.ir_node.into_inner()),
636 metadata: self
637 .location
638 .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
639 },
640 )
641 }
642
643 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
644 /// `f`, preserving the order of the elements within the group.
645 ///
646 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
647 /// not modify or take ownership of the values. If you need to modify the values while filtering
648 /// use [`KeyedStream::filter_map`] instead.
649 ///
650 /// # Example
651 /// ```rust
652 /// # #[cfg(feature = "deploy")] {
653 /// # use hydro_lang::prelude::*;
654 /// # use futures::StreamExt;
655 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
656 /// process
657 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
658 /// .into_keyed()
659 /// .filter(q!(|&x| x > 2))
660 /// # .entries()
661 /// # }, |mut stream| async move {
662 /// // { 1: [3], 2: [4] }
663 /// # let mut results = Vec::new();
664 /// # for _ in 0..2 {
665 /// # results.push(stream.next().await.unwrap());
666 /// # }
667 /// # results.sort();
668 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
669 /// # }));
670 /// # }
671 /// ```
672 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
673 where
674 F: Fn(&V) -> bool + 'a,
675 {
676 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
677 let filter_f = q!({
678 let orig = f;
679 move |t: &(_, _)| orig(&t.1)
680 })
681 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
682 .into();
683
684 KeyedStream::new(
685 self.location.clone(),
686 HydroNode::Filter {
687 f: filter_f,
688 input: Box::new(self.ir_node.into_inner()),
689 metadata: self.location.new_node_metadata(Self::collection_kind()),
690 },
691 )
692 }
693
694 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
695 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
696 ///
697 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
698 /// not modify or take ownership of the values. If you need to modify the values while filtering
699 /// use [`KeyedStream::filter_map_with_key`] instead.
700 ///
701 /// # Example
702 /// ```rust
703 /// # #[cfg(feature = "deploy")] {
704 /// # use hydro_lang::prelude::*;
705 /// # use futures::StreamExt;
706 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
707 /// process
708 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
709 /// .into_keyed()
710 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
711 /// # .entries()
712 /// # }, |mut stream| async move {
713 /// // { 1: [3], 2: [4] }
714 /// # let mut results = Vec::new();
715 /// # for _ in 0..2 {
716 /// # results.push(stream.next().await.unwrap());
717 /// # }
718 /// # results.sort();
719 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
720 /// # }));
721 /// # }
722 /// ```
723 pub fn filter_with_key<F>(
724 self,
725 f: impl IntoQuotedMut<'a, F, L> + Copy,
726 ) -> KeyedStream<K, V, L, B, O, R>
727 where
728 F: Fn(&(K, V)) -> bool + 'a,
729 {
730 let filter_f = f
731 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
732 .into();
733
734 KeyedStream::new(
735 self.location.clone(),
736 HydroNode::Filter {
737 f: filter_f,
738 input: Box::new(self.ir_node.into_inner()),
739 metadata: self.location.new_node_metadata(Self::collection_kind()),
740 },
741 )
742 }
743
744 /// An operator that both filters and maps each value, with keys staying the same.
745 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
746 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
747 ///
748 /// # Example
749 /// ```rust
750 /// # #[cfg(feature = "deploy")] {
751 /// # use hydro_lang::prelude::*;
752 /// # use futures::StreamExt;
753 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
754 /// process
755 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
756 /// .into_keyed()
757 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
758 /// # .entries()
759 /// # }, |mut stream| async move {
760 /// // { 1: [2], 2: [4] }
761 /// # let mut results = Vec::new();
762 /// # for _ in 0..2 {
763 /// # results.push(stream.next().await.unwrap());
764 /// # }
765 /// # results.sort();
766 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
767 /// # }));
768 /// # }
769 /// ```
770 pub fn filter_map<U, F>(
771 self,
772 f: impl IntoQuotedMut<'a, F, L> + Copy,
773 ) -> KeyedStream<K, U, L, B, O, R>
774 where
775 F: Fn(V) -> Option<U> + 'a,
776 {
777 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
778 let filter_map_f = q!({
779 let orig = f;
780 move |(k, v)| orig(v).map(|o| (k, o))
781 })
782 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
783 .into();
784
785 KeyedStream::new(
786 self.location.clone(),
787 HydroNode::FilterMap {
788 f: filter_map_f,
789 input: Box::new(self.ir_node.into_inner()),
790 metadata: self
791 .location
792 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
793 },
794 )
795 }
796
797 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
798 /// re-grouped even they are tuples; instead they will be grouped under the original key.
799 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
800 ///
801 /// # Example
802 /// ```rust
803 /// # #[cfg(feature = "deploy")] {
804 /// # use hydro_lang::prelude::*;
805 /// # use futures::StreamExt;
806 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
807 /// process
808 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
809 /// .into_keyed()
810 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
811 /// # .entries()
812 /// # }, |mut stream| async move {
813 /// // { 2: [2] }
814 /// # let mut results = Vec::new();
815 /// # for _ in 0..1 {
816 /// # results.push(stream.next().await.unwrap());
817 /// # }
818 /// # results.sort();
819 /// # assert_eq!(results, vec![(2, 2)]);
820 /// # }));
821 /// # }
822 /// ```
823 pub fn filter_map_with_key<U, F>(
824 self,
825 f: impl IntoQuotedMut<'a, F, L> + Copy,
826 ) -> KeyedStream<K, U, L, B, O, R>
827 where
828 F: Fn((K, V)) -> Option<U> + 'a,
829 K: Clone,
830 {
831 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
832 let filter_map_f = q!({
833 let orig = f;
834 move |(k, v)| {
835 let out = orig((Clone::clone(&k), v));
836 out.map(|o| (k, o))
837 }
838 })
839 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
840 .into();
841
842 KeyedStream::new(
843 self.location.clone(),
844 HydroNode::FilterMap {
845 f: filter_map_f,
846 input: Box::new(self.ir_node.into_inner()),
847 metadata: self
848 .location
849 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
850 },
851 )
852 }
853
854 /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
855 /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
856 /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
857 ///
858 /// # Example
859 /// ```rust
860 /// # #[cfg(feature = "deploy")] {
861 /// # use hydro_lang::prelude::*;
862 /// # use futures::StreamExt;
863 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
864 /// let tick = process.tick();
865 /// let batch = process
866 /// .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
867 /// .into_keyed()
868 /// .batch(&tick, nondet!(/** test */));
869 /// let count = batch.clone().entries().count(); // `count()` returns a singleton
870 /// batch.cross_singleton(count).all_ticks().entries()
871 /// # }, |mut stream| async move {
872 /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
873 /// # let mut results = Vec::new();
874 /// # for _ in 0..3 {
875 /// # results.push(stream.next().await.unwrap());
876 /// # }
877 /// # results.sort();
878 /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
879 /// # }));
880 /// # }
881 /// ```
882 pub fn cross_singleton<O2>(
883 self,
884 other: impl Into<Optional<O2, L, Bounded>>,
885 ) -> KeyedStream<K, (V, O2), L, B, O, R>
886 where
887 O2: Clone,
888 {
889 let other: Optional<O2, L, Bounded> = other.into();
890 check_matching_location(&self.location, &other.location);
891
892 Stream::new(
893 self.location.clone(),
894 HydroNode::CrossSingleton {
895 left: Box::new(self.ir_node.into_inner()),
896 right: Box::new(other.ir_node.into_inner()),
897 metadata: self
898 .location
899 .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
900 },
901 )
902 .map(q!(|((k, v), o2)| (k, (v, o2))))
903 .into_keyed()
904 }
905
906 /// For each value `v` in each group, transform `v` using `f` and then treat the
907 /// result as an [`Iterator`] to produce values one by one within the same group.
908 /// The implementation for [`Iterator`] for the output type `I` must produce items
909 /// in a **deterministic** order.
910 ///
911 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
912 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
913 ///
914 /// # Example
915 /// ```rust
916 /// # #[cfg(feature = "deploy")] {
917 /// # use hydro_lang::prelude::*;
918 /// # use futures::StreamExt;
919 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
920 /// process
921 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
922 /// .into_keyed()
923 /// .flat_map_ordered(q!(|x| x))
924 /// # .entries()
925 /// # }, |mut stream| async move {
926 /// // { 1: [2, 3, 4], 2: [5, 6] }
927 /// # let mut results = Vec::new();
928 /// # for _ in 0..5 {
929 /// # results.push(stream.next().await.unwrap());
930 /// # }
931 /// # results.sort();
932 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
933 /// # }));
934 /// # }
935 /// ```
936 pub fn flat_map_ordered<U, I, F>(
937 self,
938 f: impl IntoQuotedMut<'a, F, L> + Copy,
939 ) -> KeyedStream<K, U, L, B, O, R>
940 where
941 I: IntoIterator<Item = U>,
942 F: Fn(V) -> I + 'a,
943 K: Clone,
944 {
945 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
946 let flat_map_f = q!({
947 let orig = f;
948 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
949 })
950 .splice_fn1_ctx::<(K, V), _>(&self.location)
951 .into();
952
953 KeyedStream::new(
954 self.location.clone(),
955 HydroNode::FlatMap {
956 f: flat_map_f,
957 input: Box::new(self.ir_node.into_inner()),
958 metadata: self
959 .location
960 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
961 },
962 )
963 }
964
965 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
966 /// for the output type `I` to produce items in any order.
967 ///
968 /// # Example
969 /// ```rust
970 /// # #[cfg(feature = "deploy")] {
971 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
972 /// # use futures::StreamExt;
973 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
974 /// process
975 /// .source_iter(q!(vec![
976 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
977 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
978 /// ]))
979 /// .into_keyed()
980 /// .flat_map_unordered(q!(|x| x))
981 /// # .entries()
982 /// # }, |mut stream| async move {
983 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
984 /// # let mut results = Vec::new();
985 /// # for _ in 0..4 {
986 /// # results.push(stream.next().await.unwrap());
987 /// # }
988 /// # results.sort();
989 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
990 /// # }));
991 /// # }
992 /// ```
993 pub fn flat_map_unordered<U, I, F>(
994 self,
995 f: impl IntoQuotedMut<'a, F, L> + Copy,
996 ) -> KeyedStream<K, U, L, B, NoOrder, R>
997 where
998 I: IntoIterator<Item = U>,
999 F: Fn(V) -> I + 'a,
1000 K: Clone,
1001 {
1002 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1003 let flat_map_f = q!({
1004 let orig = f;
1005 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1006 })
1007 .splice_fn1_ctx::<(K, V), _>(&self.location)
1008 .into();
1009
1010 KeyedStream::new(
1011 self.location.clone(),
1012 HydroNode::FlatMap {
1013 f: flat_map_f,
1014 input: Box::new(self.ir_node.into_inner()),
1015 metadata: self
1016 .location
1017 .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1018 },
1019 )
1020 }
1021
1022 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1023 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1024 /// items in a **deterministic** order.
1025 ///
1026 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1027 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1028 ///
1029 /// # Example
1030 /// ```rust
1031 /// # #[cfg(feature = "deploy")] {
1032 /// # use hydro_lang::prelude::*;
1033 /// # use futures::StreamExt;
1034 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1035 /// process
1036 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1037 /// .into_keyed()
1038 /// .flatten_ordered()
1039 /// # .entries()
1040 /// # }, |mut stream| async move {
1041 /// // { 1: [2, 3, 4], 2: [5, 6] }
1042 /// # let mut results = Vec::new();
1043 /// # for _ in 0..5 {
1044 /// # results.push(stream.next().await.unwrap());
1045 /// # }
1046 /// # results.sort();
1047 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1048 /// # }));
1049 /// # }
1050 /// ```
1051 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1052 where
1053 V: IntoIterator<Item = U>,
1054 K: Clone,
1055 {
1056 self.flat_map_ordered(q!(|d| d))
1057 }
1058
1059 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1060 /// for the value type `V` to produce items in any order.
1061 ///
1062 /// # Example
1063 /// ```rust
1064 /// # #[cfg(feature = "deploy")] {
1065 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1066 /// # use futures::StreamExt;
1067 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1068 /// process
1069 /// .source_iter(q!(vec![
1070 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1071 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1072 /// ]))
1073 /// .into_keyed()
1074 /// .flatten_unordered()
1075 /// # .entries()
1076 /// # }, |mut stream| async move {
1077 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1078 /// # let mut results = Vec::new();
1079 /// # for _ in 0..4 {
1080 /// # results.push(stream.next().await.unwrap());
1081 /// # }
1082 /// # results.sort();
1083 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1084 /// # }));
1085 /// # }
1086 /// ```
1087 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1088 where
1089 V: IntoIterator<Item = U>,
1090 K: Clone,
1091 {
1092 self.flat_map_unordered(q!(|d| d))
1093 }
1094
1095 /// An operator which allows you to "inspect" each element of a stream without
1096 /// modifying it. The closure `f` is called on a reference to each value. This is
1097 /// mainly useful for debugging, and should not be used to generate side-effects.
1098 ///
1099 /// # Example
1100 /// ```rust
1101 /// # #[cfg(feature = "deploy")] {
1102 /// # use hydro_lang::prelude::*;
1103 /// # use futures::StreamExt;
1104 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1105 /// process
1106 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1107 /// .into_keyed()
1108 /// .inspect(q!(|v| println!("{}", v)))
1109 /// # .entries()
1110 /// # }, |mut stream| async move {
1111 /// # let mut results = Vec::new();
1112 /// # for _ in 0..3 {
1113 /// # results.push(stream.next().await.unwrap());
1114 /// # }
1115 /// # results.sort();
1116 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1117 /// # }));
1118 /// # }
1119 /// ```
1120 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1121 where
1122 F: Fn(&V) + 'a,
1123 {
1124 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1125 let inspect_f = q!({
1126 let orig = f;
1127 move |t: &(_, _)| orig(&t.1)
1128 })
1129 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1130 .into();
1131
1132 KeyedStream::new(
1133 self.location.clone(),
1134 HydroNode::Inspect {
1135 f: inspect_f,
1136 input: Box::new(self.ir_node.into_inner()),
1137 metadata: self.location.new_node_metadata(Self::collection_kind()),
1138 },
1139 )
1140 }
1141
1142 /// An operator which allows you to "inspect" each element of a stream without
1143 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1144 /// mainly useful for debugging, and should not be used to generate side-effects.
1145 ///
1146 /// # Example
1147 /// ```rust
1148 /// # #[cfg(feature = "deploy")] {
1149 /// # use hydro_lang::prelude::*;
1150 /// # use futures::StreamExt;
1151 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1152 /// process
1153 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1154 /// .into_keyed()
1155 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1156 /// # .entries()
1157 /// # }, |mut stream| async move {
1158 /// # let mut results = Vec::new();
1159 /// # for _ in 0..3 {
1160 /// # results.push(stream.next().await.unwrap());
1161 /// # }
1162 /// # results.sort();
1163 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1164 /// # }));
1165 /// # }
1166 /// ```
1167 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1168 where
1169 F: Fn(&(K, V)) + 'a,
1170 {
1171 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1172
1173 KeyedStream::new(
1174 self.location.clone(),
1175 HydroNode::Inspect {
1176 f: inspect_f,
1177 input: Box::new(self.ir_node.into_inner()),
1178 metadata: self.location.new_node_metadata(Self::collection_kind()),
1179 },
1180 )
1181 }
1182
1183 /// An operator which allows you to "name" a `HydroNode`.
1184 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1185 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1186 {
1187 let mut node = self.ir_node.borrow_mut();
1188 let metadata = node.metadata_mut();
1189 metadata.tag = Some(name.to_owned());
1190 }
1191 self
1192 }
1193}
1194
1195impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1196 KeyedStream<(K1, K2), V, L, B, O, R>
1197{
1198 /// Produces a new keyed stream by dropping the first element of the compound key.
1199 ///
1200 /// Because multiple keys may share the same suffix, this operation results in re-grouping
1201 /// of the values under the new keys. The values across groups with the same new key
1202 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
1203 ///
1204 /// # Example
1205 /// ```rust
1206 /// # #[cfg(feature = "deploy")] {
1207 /// # use hydro_lang::prelude::*;
1208 /// # use futures::StreamExt;
1209 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1210 /// process
1211 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
1212 /// .into_keyed()
1213 /// .drop_key_prefix()
1214 /// # .entries()
1215 /// # }, |mut stream| async move {
1216 /// // { 10: [2, 3], 20: [4] }
1217 /// # let mut results = Vec::new();
1218 /// # for _ in 0..3 {
1219 /// # results.push(stream.next().await.unwrap());
1220 /// # }
1221 /// # results.sort();
1222 /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
1223 /// # }));
1224 /// # }
1225 /// ```
1226 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
1227 self.entries()
1228 .map(q!(|((_k1, k2), v)| (k2, v)))
1229 .into_keyed()
1230 }
1231}
1232
1233impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
1234 KeyedStream<K, V, L, Unbounded, O, R>
1235{
1236 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
1237 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
1238 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
1239 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
1240 ///
1241 /// Currently, both input streams must be [`Unbounded`].
1242 ///
1243 /// # Example
1244 /// ```rust
1245 /// # #[cfg(feature = "deploy")] {
1246 /// # use hydro_lang::prelude::*;
1247 /// # use futures::StreamExt;
1248 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1249 /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
1250 /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
1251 /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
1252 /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
1253 /// numbers1.interleave(numbers2)
1254 /// # .entries()
1255 /// # }, |mut stream| async move {
1256 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
1257 /// # let mut results = Vec::new();
1258 /// # for _ in 0..4 {
1259 /// # results.push(stream.next().await.unwrap());
1260 /// # }
1261 /// # results.sort();
1262 /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
1263 /// # }));
1264 /// # }
1265 /// ```
1266 pub fn interleave<O2: Ordering, R2: Retries>(
1267 self,
1268 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
1269 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1270 where
1271 R: MinRetries<R2>,
1272 {
1273 KeyedStream::new(
1274 self.location.clone(),
1275 HydroNode::Chain {
1276 first: Box::new(self.ir_node.into_inner()),
1277 second: Box::new(other.ir_node.into_inner()),
1278 metadata: self.location.new_node_metadata(KeyedStream::<
1279 K,
1280 V,
1281 L,
1282 Unbounded,
1283 NoOrder,
1284 <R as MinRetries<R2>>::Min,
1285 >::collection_kind()),
1286 },
1287 )
1288 }
1289}
1290
1291/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
1292/// control the processing of future elements.
1293pub enum Generate<T> {
1294 /// Emit the provided element, and keep processing future inputs.
1295 Yield(T),
1296 /// Emit the provided element as the _final_ element, do not process future inputs.
1297 Return(T),
1298 /// Do not emit anything, but continue processing future inputs.
1299 Continue,
1300 /// Do not emit anything, and do not process further inputs.
1301 Break,
1302}
1303
1304impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1305where
1306 L: Location<'a>,
1307{
1308 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1309 ///
1310 /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1311 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1312 /// early by returning `None`.
1313 ///
1314 /// The function takes a mutable reference to the accumulator and the current element, and returns
1315 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1316 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1317 ///
1318 /// # Example
1319 /// ```rust
1320 /// # #[cfg(feature = "deploy")] {
1321 /// # use hydro_lang::prelude::*;
1322 /// # use futures::StreamExt;
1323 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1324 /// process
1325 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1326 /// .into_keyed()
1327 /// .scan(
1328 /// q!(|| 0),
1329 /// q!(|acc, x| {
1330 /// *acc += x;
1331 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1332 /// }),
1333 /// )
1334 /// # .entries()
1335 /// # }, |mut stream| async move {
1336 /// // Output: { 0: [1], 1: [3, 7] }
1337 /// # let mut results = Vec::new();
1338 /// # for _ in 0..3 {
1339 /// # results.push(stream.next().await.unwrap());
1340 /// # }
1341 /// # results.sort();
1342 /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1343 /// # }));
1344 /// # }
1345 /// ```
1346 pub fn scan<A, U, I, F>(
1347 self,
1348 init: impl IntoQuotedMut<'a, I, L> + Copy,
1349 f: impl IntoQuotedMut<'a, F, L> + Copy,
1350 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1351 where
1352 K: Clone + Eq + Hash,
1353 I: Fn() -> A + 'a,
1354 F: Fn(&mut A, V) -> Option<U> + 'a,
1355 {
1356 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1357 self.generator(
1358 init,
1359 q!({
1360 let orig = f;
1361 move |state, v| {
1362 if let Some(out) = orig(state, v) {
1363 Generate::Yield(out)
1364 } else {
1365 Generate::Break
1366 }
1367 }
1368 }),
1369 )
1370 }
1371
1372 /// Iteratively processes the elements in each group using a state machine that can yield
1373 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1374 /// syntax in Rust, without requiring special syntax.
1375 ///
1376 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1377 /// state for each group. The second argument defines the processing logic, taking in a
1378 /// mutable reference to the group's state and the value to be processed. It emits a
1379 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1380 /// should be processed.
1381 ///
1382 /// # Example
1383 /// ```rust
1384 /// # #[cfg(feature = "deploy")] {
1385 /// # use hydro_lang::prelude::*;
1386 /// # use futures::StreamExt;
1387 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1388 /// process
1389 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1390 /// .into_keyed()
1391 /// .generator(
1392 /// q!(|| 0),
1393 /// q!(|acc, x| {
1394 /// *acc += x;
1395 /// if *acc > 100 {
1396 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
1397 /// "done!".to_owned()
1398 /// )
1399 /// } else if *acc % 2 == 0 {
1400 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
1401 /// "even".to_owned()
1402 /// )
1403 /// } else {
1404 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1405 /// }
1406 /// }),
1407 /// )
1408 /// # .entries()
1409 /// # }, |mut stream| async move {
1410 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1411 /// # let mut results = Vec::new();
1412 /// # for _ in 0..3 {
1413 /// # results.push(stream.next().await.unwrap());
1414 /// # }
1415 /// # results.sort();
1416 /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1417 /// # }));
1418 /// # }
1419 /// ```
1420 pub fn generator<A, U, I, F>(
1421 self,
1422 init: impl IntoQuotedMut<'a, I, L> + Copy,
1423 f: impl IntoQuotedMut<'a, F, L> + Copy,
1424 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1425 where
1426 K: Clone + Eq + Hash,
1427 I: Fn() -> A + 'a,
1428 F: Fn(&mut A, V) -> Generate<U> + 'a,
1429 {
1430 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1431 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1432
1433 let scan_init = q!(|| HashMap::new())
1434 .splice_fn0_ctx::<HashMap<K, Option<A>>>(&self.location)
1435 .into();
1436 let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1437 let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1438 if let Some(existing_state_value) = existing_state {
1439 match f(existing_state_value, v) {
1440 Generate::Yield(out) => Some(Some((k, out))),
1441 Generate::Return(out) => {
1442 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1443 Some(Some((k, out)))
1444 }
1445 Generate::Break => {
1446 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1447 Some(None)
1448 }
1449 Generate::Continue => Some(None),
1450 }
1451 } else {
1452 Some(None)
1453 }
1454 })
1455 .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&self.location)
1456 .into();
1457
1458 let scan_node = HydroNode::Scan {
1459 init: scan_init,
1460 acc: scan_f,
1461 input: Box::new(self.ir_node.into_inner()),
1462 metadata: self.location.new_node_metadata(Stream::<
1463 Option<(K, U)>,
1464 L,
1465 B,
1466 TotalOrder,
1467 ExactlyOnce,
1468 >::collection_kind()),
1469 };
1470
1471 let flatten_f = q!(|d| d)
1472 .splice_fn1_ctx::<Option<(K, U)>, _>(&self.location)
1473 .into();
1474 let flatten_node = HydroNode::FlatMap {
1475 f: flatten_f,
1476 input: Box::new(scan_node),
1477 metadata: self.location.new_node_metadata(KeyedStream::<
1478 K,
1479 U,
1480 L,
1481 B,
1482 TotalOrder,
1483 ExactlyOnce,
1484 >::collection_kind()),
1485 };
1486
1487 KeyedStream::new(self.location, flatten_node)
1488 }
1489
1490 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1491 /// in-order across the values in each group. But the aggregation function returns a boolean,
1492 /// which when true indicates that the aggregated result is complete and can be released to
1493 /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1494 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1495 /// normal stream elements.
1496 ///
1497 /// # Example
1498 /// ```rust
1499 /// # #[cfg(feature = "deploy")] {
1500 /// # use hydro_lang::prelude::*;
1501 /// # use futures::StreamExt;
1502 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1503 /// process
1504 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1505 /// .into_keyed()
1506 /// .fold_early_stop(
1507 /// q!(|| 0),
1508 /// q!(|acc, x| {
1509 /// *acc += x;
1510 /// x % 2 == 0
1511 /// }),
1512 /// )
1513 /// # .entries()
1514 /// # }, |mut stream| async move {
1515 /// // Output: { 0: 2, 1: 9 }
1516 /// # let mut results = Vec::new();
1517 /// # for _ in 0..2 {
1518 /// # results.push(stream.next().await.unwrap());
1519 /// # }
1520 /// # results.sort();
1521 /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1522 /// # }));
1523 /// # }
1524 /// ```
1525 pub fn fold_early_stop<A, I, F>(
1526 self,
1527 init: impl IntoQuotedMut<'a, I, L> + Copy,
1528 f: impl IntoQuotedMut<'a, F, L> + Copy,
1529 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1530 where
1531 K: Clone + Eq + Hash,
1532 I: Fn() -> A + 'a,
1533 F: Fn(&mut A, V) -> bool + 'a,
1534 {
1535 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1536 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1537 let out_without_bound_cast = self.generator(
1538 q!(move || Some(init())),
1539 q!(move |key_state, v| {
1540 if let Some(key_state_value) = key_state.as_mut() {
1541 if f(key_state_value, v) {
1542 Generate::Return(key_state.take().unwrap())
1543 } else {
1544 Generate::Continue
1545 }
1546 } else {
1547 unreachable!()
1548 }
1549 }),
1550 );
1551
1552 KeyedSingleton::new(
1553 out_without_bound_cast.location.clone(),
1554 HydroNode::Cast {
1555 inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
1556 metadata: out_without_bound_cast
1557 .location
1558 .new_node_metadata(
1559 KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1560 ),
1561 },
1562 )
1563 }
1564
1565 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1566 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1567 /// otherwise the first element would be non-deterministic.
1568 ///
1569 /// # Example
1570 /// ```rust
1571 /// # #[cfg(feature = "deploy")] {
1572 /// # use hydro_lang::prelude::*;
1573 /// # use futures::StreamExt;
1574 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1575 /// process
1576 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1577 /// .into_keyed()
1578 /// .first()
1579 /// # .entries()
1580 /// # }, |mut stream| async move {
1581 /// // Output: { 0: 2, 1: 3 }
1582 /// # let mut results = Vec::new();
1583 /// # for _ in 0..2 {
1584 /// # results.push(stream.next().await.unwrap());
1585 /// # }
1586 /// # results.sort();
1587 /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1588 /// # }));
1589 /// # }
1590 /// ```
1591 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1592 where
1593 K: Clone + Eq + Hash,
1594 {
1595 self.fold_early_stop(
1596 q!(|| None),
1597 q!(|acc, v| {
1598 *acc = Some(v);
1599 true
1600 }),
1601 )
1602 .map(q!(|v| v.unwrap()))
1603 }
1604}
1605
1606impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1607where
1608 L: Location<'a>,
1609{
1610 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1611 ///
1612 /// # Example
1613 /// ```rust
1614 /// # #[cfg(feature = "deploy")] {
1615 /// # use hydro_lang::prelude::*;
1616 /// # use futures::StreamExt;
1617 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1618 /// let tick = process.tick();
1619 /// let numbers = process
1620 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1621 /// .into_keyed();
1622 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1623 /// batch
1624 /// .value_counts()
1625 /// .entries()
1626 /// .all_ticks()
1627 /// # }, |mut stream| async move {
1628 /// // (1, 3), (2, 2)
1629 /// # let mut results = Vec::new();
1630 /// # for _ in 0..2 {
1631 /// # results.push(stream.next().await.unwrap());
1632 /// # }
1633 /// # results.sort();
1634 /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1635 /// # }));
1636 /// # }
1637 /// ```
1638 pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1639 where
1640 K: Eq + Hash,
1641 {
1642 self.assume_ordering_trusted(
1643 nondet!(/** ordering within each group affects neither result nor intermediates */),
1644 )
1645 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1646 }
1647}
1648
1649impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1650where
1651 L: Location<'a>,
1652{
1653 /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1654 /// group via the `comb` closure.
1655 ///
1656 /// Depending on the input stream guarantees, the closure may need to be commutative
1657 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1658 ///
1659 /// If the input and output value types are the same and do not require initialization then use
1660 /// [`KeyedStream::reduce`].
1661 ///
1662 /// # Example
1663 /// ```rust
1664 /// # #[cfg(feature = "deploy")] {
1665 /// # use hydro_lang::prelude::*;
1666 /// # use futures::StreamExt;
1667 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1668 /// let tick = process.tick();
1669 /// let numbers = process
1670 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1671 /// .into_keyed();
1672 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1673 /// batch
1674 /// .fold(q!(|| false), q!(|acc, x| *acc |= x))
1675 /// .entries()
1676 /// .all_ticks()
1677 /// # }, |mut stream| async move {
1678 /// // (1, false), (2, true)
1679 /// # let mut results = Vec::new();
1680 /// # for _ in 0..2 {
1681 /// # results.push(stream.next().await.unwrap());
1682 /// # }
1683 /// # results.sort();
1684 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1685 /// # }));
1686 /// # }
1687 /// ```
1688 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
1689 self,
1690 init: impl IntoQuotedMut<'a, I, L>,
1691 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1692 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1693 where
1694 K: Eq + Hash,
1695 C: ValidCommutativityFor<O>,
1696 Idemp: ValidIdempotenceFor<R>,
1697 {
1698 let init = init.splice_fn0_ctx(&self.location).into();
1699 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1700 proof.register_proof(&comb);
1701
1702 let ordered = self
1703 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1704 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1705
1706 KeyedSingleton::new(
1707 ordered.location.clone(),
1708 HydroNode::FoldKeyed {
1709 init,
1710 acc: comb.into(),
1711 input: Box::new(ordered.ir_node.into_inner()),
1712 metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1713 K,
1714 A,
1715 L,
1716 B::WhenValueUnbounded,
1717 >::collection_kind()),
1718 },
1719 )
1720 }
1721
1722 /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1723 /// group via the `comb` closure.
1724 ///
1725 /// Depending on the input stream guarantees, the closure may need to be commutative
1726 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1727 ///
1728 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1729 ///
1730 /// # Example
1731 /// ```rust
1732 /// # #[cfg(feature = "deploy")] {
1733 /// # use hydro_lang::prelude::*;
1734 /// # use futures::StreamExt;
1735 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1736 /// let tick = process.tick();
1737 /// let numbers = process
1738 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1739 /// .into_keyed();
1740 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1741 /// batch
1742 /// .reduce(q!(|acc, x| *acc |= x))
1743 /// .entries()
1744 /// .all_ticks()
1745 /// # }, |mut stream| async move {
1746 /// // (1, false), (2, true)
1747 /// # let mut results = Vec::new();
1748 /// # for _ in 0..2 {
1749 /// # results.push(stream.next().await.unwrap());
1750 /// # }
1751 /// # results.sort();
1752 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1753 /// # }));
1754 /// # }
1755 /// ```
1756 pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1757 self,
1758 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1759 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1760 where
1761 K: Eq + Hash,
1762 C: ValidCommutativityFor<O>,
1763 Idemp: ValidIdempotenceFor<R>,
1764 {
1765 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1766 proof.register_proof(&f);
1767
1768 let ordered = self
1769 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1770 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1771
1772 KeyedSingleton::new(
1773 ordered.location.clone(),
1774 HydroNode::ReduceKeyed {
1775 f: f.into(),
1776 input: Box::new(ordered.ir_node.into_inner()),
1777 metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1778 K,
1779 V,
1780 L,
1781 B::WhenValueUnbounded,
1782 >::collection_kind()),
1783 },
1784 )
1785 }
1786
1787 /// A special case of [`KeyedStream::reduce`] where entries whose keys are less than the
1788 /// watermark are automatically deleted, allowing their state to be garbage collected.
1789 ///
1790 /// Depending on the input stream guarantees, the closure may need to be commutative
1791 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1792 ///
1793 /// # Example
1794 /// ```rust
1795 /// # #[cfg(feature = "deploy")] {
1796 /// # use hydro_lang::prelude::*;
1797 /// # use futures::StreamExt;
1798 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1799 /// let tick = process.tick();
1800 /// let watermark = tick.singleton(q!(1));
1801 /// let numbers = process
1802 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1803 /// .into_keyed();
1804 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1805 /// batch
1806 /// .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1807 /// .entries()
1808 /// .all_ticks()
1809 /// # }, |mut stream| async move {
1810 /// // (2, true)
1811 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1812 /// # }));
1813 /// # }
1814 /// ```
1815 pub fn reduce_watermark<O2, F, C, Idemp>(
1816 self,
1817 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1818 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1819 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1820 where
1821 K: Eq + Hash,
1822 O2: Clone,
1823 F: Fn(&mut V, V) + 'a,
1824 C: ValidCommutativityFor<O>,
1825 Idemp: ValidIdempotenceFor<R>,
1826 {
1827 let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1828 check_matching_location(&self.location.root(), other.location.outer());
1829 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1830 proof.register_proof(&f);
1831
1832 let ordered = self
1833 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1834 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1835
1836 KeyedSingleton::new(
1837 ordered.location.clone(),
1838 HydroNode::ReduceKeyedWatermark {
1839 f: f.into(),
1840 input: Box::new(ordered.ir_node.into_inner()),
1841 watermark: Box::new(other.ir_node.into_inner()),
1842 metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1843 K,
1844 V,
1845 L,
1846 B::WhenValueUnbounded,
1847 >::collection_kind()),
1848 },
1849 )
1850 }
1851
1852 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1853 /// whose keys are not in the bounded stream.
1854 ///
1855 /// # Example
1856 /// ```rust
1857 /// # #[cfg(feature = "deploy")] {
1858 /// # use hydro_lang::prelude::*;
1859 /// # use futures::StreamExt;
1860 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1861 /// let tick = process.tick();
1862 /// let keyed_stream = process
1863 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1864 /// .batch(&tick, nondet!(/** test */))
1865 /// .into_keyed();
1866 /// let keys_to_remove = process
1867 /// .source_iter(q!(vec![1, 2]))
1868 /// .batch(&tick, nondet!(/** test */));
1869 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1870 /// # .entries()
1871 /// # }, |mut stream| async move {
1872 /// // { 3: ['c'], 4: ['d'] }
1873 /// # let mut results = Vec::new();
1874 /// # for _ in 0..2 {
1875 /// # results.push(stream.next().await.unwrap());
1876 /// # }
1877 /// # results.sort();
1878 /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1879 /// # }));
1880 /// # }
1881 /// ```
1882 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1883 self,
1884 other: Stream<K, L, Bounded, O2, R2>,
1885 ) -> Self
1886 where
1887 K: Eq + Hash,
1888 {
1889 check_matching_location(&self.location, &other.location);
1890
1891 KeyedStream::new(
1892 self.location.clone(),
1893 HydroNode::AntiJoin {
1894 pos: Box::new(self.ir_node.into_inner()),
1895 neg: Box::new(other.ir_node.into_inner()),
1896 metadata: self.location.new_node_metadata(Self::collection_kind()),
1897 },
1898 )
1899 }
1900
1901 /// Emits a keyed stream containing the keys shared between the two keyed streams,
1902 /// where each value in the output keyed stream is a tuple of
1903 /// (`self`'s value, `other`'s value).
1904 /// If there are multiple values for the same key, this emits the cross product
1905 /// of the values for each matching key.
1906 ///
1907 /// # Example
1908 /// ```rust
1909 /// # #[cfg(feature = "deploy")] {
1910 /// # use hydro_lang::prelude::*;
1911 /// # use futures::StreamExt;
1912 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1913 /// let tick = process.tick();
1914 /// let keyed_data = process
1915 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1916 /// .into_keyed()
1917 /// .batch(&tick, nondet!(/** test */));
1918 /// let other_data = process
1919 /// .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1920 /// .into_keyed()
1921 /// .batch(&tick, nondet!(/** test */));
1922 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1923 /// # }, |mut stream| async move {
1924 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1925 /// # let mut results = vec![];
1926 /// # for _ in 0..4 {
1927 /// # results.push(stream.next().await.unwrap());
1928 /// # }
1929 /// # results.sort();
1930 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
1931 /// # }));
1932 /// # }
1933 /// ```
1934 pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
1935 self,
1936 other: KeyedStream<K, V2, L, B, O2, R2>,
1937 ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1938 where
1939 K: Eq + Hash,
1940 R: MinRetries<R2>,
1941 {
1942 self.entries().join(other.entries()).into_keyed()
1943 }
1944}
1945
1946impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1947where
1948 L: Location<'a>,
1949{
1950 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1951 /// will be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1952 ///
1953 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1954 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1955 /// argument that declares where the stream will be atomically processed. Batching a stream into
1956 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1957 /// [`Tick`] will introduce asynchrony.
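///
/// # Example
/// A minimal sketch, mirroring the other doctests in this module: the stream enters an atomic
/// section and immediately leaves it via [`KeyedStream::end_atomic`], so the values pass through
/// unchanged (the consistency guarantees only matter once other logic is placed inside the
/// atomic section).
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// process
/// .source_iter(q!(vec![(0, 1), (1, 2)]))
/// .into_keyed()
/// .atomic(&tick)
/// .end_atomic()
/// # .entries()
/// # }, |mut stream| async move {
/// // { 0: [1], 1: [2] }
/// # let mut results = Vec::new();
/// # for _ in 0..2 {
/// # results.push(stream.next().await.unwrap());
/// # }
/// # results.sort();
/// # assert_eq!(results, vec![(0, 1), (1, 2)]);
/// # }));
/// # }
/// ```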
1958 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1959 let out_location = Atomic { tick: tick.clone() };
1960 KeyedStream::new(
1961 out_location.clone(),
1962 HydroNode::BeginAtomic {
1963 inner: Box::new(self.ir_node.into_inner()),
1964 metadata: out_location
1965 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
1966 },
1967 )
1968 }
1969
1970 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1971 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1972 /// the order of the input.
1973 ///
1974 /// # Non-Determinism
1975 /// The batch boundaries are non-deterministic and may change across executions.
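///
/// # Example
/// A minimal sketch, mirroring the other doctests in this module: the keyed stream is split into
/// per-tick batches and immediately re-concatenated with [`KeyedStream::all_ticks`], so every
/// element is eventually emitted no matter where the non-deterministic batch boundaries fall.
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// process
/// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
/// .into_keyed()
/// .batch(&tick, nondet!(/** test */))
/// .all_ticks()
/// # .entries()
/// # }, |mut stream| async move {
/// // { 1: [2, 3], 2: [4] }
/// # let mut results = Vec::new();
/// # for _ in 0..3 {
/// # results.push(stream.next().await.unwrap());
/// # }
/// # results.sort();
/// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
/// # }));
/// # }
/// ```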
1976 pub fn batch(
1977 self,
1978 tick: &Tick<L>,
1979 nondet: NonDet,
1980 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1981 let _ = nondet;
1982 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1983 KeyedStream::new(
1984 tick.clone(),
1985 HydroNode::Batch {
1986 inner: Box::new(self.ir_node.into_inner()),
1987 metadata: tick.new_node_metadata(
1988 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
1989 ),
1990 },
1991 )
1992 }
1993}
1994
1995impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1996where
1997 L: Location<'a> + NoTick,
1998{
1999 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2000 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2001 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2002 /// used to create the atomic section.
2003 ///
2004 /// # Non-Determinism
2005 /// The batch boundaries are non-deterministic and may change across executions.
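///
/// # Example
/// A minimal sketch, mirroring the other doctests in this module: the stream enters an atomic
/// section, is batched within that section, and the batches are then re-concatenated, so the
/// values pass through unchanged.
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// process
/// .source_iter(q!(vec![(1, 2), (2, 3)]))
/// .into_keyed()
/// .atomic(&tick)
/// .batch_atomic(nondet!(/** test */))
/// .all_ticks()
/// # .entries()
/// # }, |mut stream| async move {
/// // { 1: [2], 2: [3] }
/// # let mut results = Vec::new();
/// # for _ in 0..2 {
/// # results.push(stream.next().await.unwrap());
/// # }
/// # results.sort();
/// # assert_eq!(results, vec![(1, 2), (2, 3)]);
/// # }));
/// # }
/// ```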
2006 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2007 let _ = nondet;
2008 KeyedStream::new(
2009 self.location.clone().tick,
2010 HydroNode::Batch {
2011 inner: Box::new(self.ir_node.into_inner()),
2012 metadata: self.location.tick.new_node_metadata(KeyedStream::<
2013 K,
2014 V,
2015 Tick<L>,
2016 Bounded,
2017 O,
2018 R,
2019 >::collection_kind(
2020 )),
2021 },
2022 )
2023 }
2024
2025 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2026 /// See [`KeyedStream::atomic`] for more details.
2027 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2028 KeyedStream::new(
2029 self.location.tick.l.clone(),
2030 HydroNode::EndAtomic {
2031 inner: Box::new(self.ir_node.into_inner()),
2032 metadata: self
2033 .location
2034 .tick
2035 .l
2036 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2037 },
2038 )
2039 }
2040}
2041
2042impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
2043where
2044 L: Location<'a>,
2045{
2046 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2047 /// elements of the `self` stream and then emitting the elements of the `other` stream (if a key
2048 /// is only present in one of the inputs, its values are passed through as-is). The output has
2049 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2050 ///
2051 /// Currently, both input streams must be [`Bounded`]. This operator will block
2052 /// on the first stream until all its elements are available. In a future version,
2053 /// we will relax the requirement on the `other` stream.
2054 ///
2055 /// # Example
2056 /// ```rust
2057 /// # #[cfg(feature = "deploy")] {
2058 /// # use hydro_lang::prelude::*;
2059 /// # use futures::StreamExt;
2060 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2061 /// let tick = process.tick();
2062 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2063 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2064 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2065 /// # .entries()
2066 /// # }, |mut stream| async move {
2067 /// // { 0: [2, 1], 1: [4, 3] }
2068 /// # let mut results = Vec::new();
2069 /// # for _ in 0..4 {
2070 /// # results.push(stream.next().await.unwrap());
2071 /// # }
2072 /// # results.sort();
2073 /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2074 /// # }));
2075 /// # }
2076 /// ```
2077 pub fn chain<O2: Ordering, R2: Retries>(
2078 self,
2079 other: KeyedStream<K, V, L, Bounded, O2, R2>,
2080 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2081 where
2082 O: MinOrder<O2>,
2083 R: MinRetries<R2>,
2084 {
2085 check_matching_location(&self.location, &other.location);
2086
2087 KeyedStream::new(
2088 self.location.clone(),
2089 HydroNode::Chain {
2090 first: Box::new(self.ir_node.into_inner()),
2091 second: Box::new(other.ir_node.into_inner()),
2092 metadata: self.location.new_node_metadata(KeyedStream::<
2093 K,
2094 V,
2095 L,
2096 Bounded,
2097 <O as MinOrder<O2>>::Min,
2098 <R as MinRetries<R2>>::Min,
2099 >::collection_kind()),
2100 },
2101 )
2102 }
2103
2104 /// Emits a keyed stream containing the keys shared between the keyed stream and the
2105 /// keyed singleton, where each value in the output keyed stream is a tuple of
2106 /// (the keyed stream's value, the keyed singleton's value).
2107 ///
2108 /// # Example
2109 /// ```rust
2110 /// # #[cfg(feature = "deploy")] {
2111 /// # use hydro_lang::prelude::*;
2112 /// # use futures::StreamExt;
2113 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2114 /// let tick = process.tick();
2115 /// let keyed_data = process
2116 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2117 /// .into_keyed()
2118 /// .batch(&tick, nondet!(/** test */));
2119 /// let singleton_data = process
2120 /// .source_iter(q!(vec![(1, 100), (2, 200)]))
2121 /// .into_keyed()
2122 /// .batch(&tick, nondet!(/** test */))
2123 /// .first();
2124 /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2125 /// # }, |mut stream| async move {
2126 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2127 /// # let mut results = vec![];
2128 /// # for _ in 0..3 {
2129 /// # results.push(stream.next().await.unwrap());
2130 /// # }
2131 /// # results.sort();
2132 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2133 /// # }));
2134 /// # }
2135 /// ```
2136 pub fn join_keyed_singleton<V2: Clone>(
2137 self,
2138 keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
2139 ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
2140 where
2141 K: Eq + Hash,
2142 {
2143 keyed_singleton
2144 .join_keyed_stream(self)
2145 .map(q!(|(v2, v)| (v, v2)))
2146 }
2147
2148 /// Gets the values associated with a specific key from the keyed stream.
2149 ///
2150 /// # Example
2151 /// ```rust
2152 /// # #[cfg(feature = "deploy")] {
2153 /// # use hydro_lang::prelude::*;
2154 /// # use futures::StreamExt;
2155 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2156 /// let tick = process.tick();
2157 /// let keyed_data = process
2158 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2159 /// .into_keyed()
2160 /// .batch(&tick, nondet!(/** test */));
2161 /// let key = tick.singleton(q!(1));
2162 /// keyed_data.get(key).all_ticks()
2163 /// # }, |mut stream| async move {
2164 /// // 10, 11 in any order
2165 /// # let mut results = vec![];
2166 /// # for _ in 0..2 {
2167 /// # results.push(stream.next().await.unwrap());
2168 /// # }
2169 /// # results.sort();
2170 /// # assert_eq!(results, vec![10, 11]);
2171 /// # }));
2172 /// # }
2173 /// ```
2174 pub fn get(self, key: Singleton<K, L, Bounded>) -> Stream<V, L, Bounded, NoOrder, R>
2175 where
2176 K: Eq + Hash,
2177 {
2178 self.entries()
2179 .join(key.into_stream().map(q!(|k| (k, ()))))
2180 .map(q!(|(_, (v, _))| v))
2181 }
2182
2183 /// For each value in `self`, looks up that value as a key in `lookup`.
2184 /// The output is a keyed stream with the key from `self`, and a value
2185 /// that is a tuple of (`self`'s value, `Option` of `lookup`'s value).
2186 /// If the value has no matching key in `lookup`, the option will be [`None`].
2187 ///
2188 /// # Example
2189 /// ```rust
2190 /// # #[cfg(feature = "deploy")] {
2191 /// # use hydro_lang::prelude::*;
2192 /// # use futures::StreamExt;
2193 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2194 /// # let tick = process.tick();
2195 /// let requests = // { 1: [10, 11], 2: 20 }
2196 /// # process
2197 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2198 /// # .into_keyed()
2199 /// # .batch(&tick, nondet!(/** test */));
2200 /// let other_data = // { 10: 100, 11: 110 }
2201 /// # process
2202 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
2203 /// # .into_keyed()
2204 /// # .batch(&tick, nondet!(/** test */))
2205 /// # .first();
2206 /// requests.lookup_keyed_singleton(other_data)
2207 /// # .entries().all_ticks()
2208 /// # }, |mut stream| async move {
2209 /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2210 /// # let mut results = vec![];
2211 /// # for _ in 0..3 {
2212 /// # results.push(stream.next().await.unwrap());
2213 /// # }
2214 /// # results.sort();
2215 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2216 /// # }));
2217 /// # }
2218 /// ```
2219 pub fn lookup_keyed_singleton<V2>(
2220 self,
2221 lookup: KeyedSingleton<V, V2, L, Bounded>,
2222 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2223 where
2224 K: Eq + Hash + Clone,
2225 V: Eq + Hash + Clone,
2226 V2: Clone,
2227 {
2228 self.lookup_keyed_stream(
2229 lookup
2230 .into_keyed_stream()
2231 .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2232 )
2233 }
2234
2235 /// For each value in `self`, looks up that value as a key in `lookup`.
2236 /// The output is a keyed stream with the key from `self`, and a value
2237 /// that is a tuple of (`self`'s value, `Option` of `lookup`'s value).
2238 /// If the value has no matching key in `lookup`, the option will be [`None`].
2239 ///
2240 /// # Example
2241 /// ```rust
2242 /// # #[cfg(feature = "deploy")] {
2243 /// # use hydro_lang::prelude::*;
2244 /// # use futures::StreamExt;
2245 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2246 /// # let tick = process.tick();
2247 /// let requests = // { 1: [10, 11], 2: 20 }
2248 /// # process
2249 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2250 /// # .into_keyed()
2251 /// # .batch(&tick, nondet!(/** test */));
2252 /// let other_data = // { 10: [100, 101], 11: 110 }
2253 /// # process
2254 /// # .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2255 /// # .into_keyed()
2256 /// # .batch(&tick, nondet!(/** test */));
2257 /// requests.lookup_keyed_stream(other_data)
2258 /// # .entries().all_ticks()
2259 /// # }, |mut stream| async move {
2260 /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2261 /// # let mut results = vec![];
2262 /// # for _ in 0..4 {
2263 /// # results.push(stream.next().await.unwrap());
2264 /// # }
2265 /// # results.sort();
2266 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2267 /// # }));
2268 /// # }
2269 /// ```
2270 #[expect(clippy::type_complexity, reason = "retries propagation")]
2271 pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2272 self,
2273 lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2274 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2275 where
2276 K: Eq + Hash + Clone,
2277 V: Eq + Hash + Clone,
2278 V2: Clone,
2279 R: MinRetries<R2>,
2280 {
2281 let inverted = self
2282 .entries()
2283 .map(q!(|(key, lookup_value)| (lookup_value, key)))
2284 .into_keyed();
2285 let found = inverted
2286 .clone()
2287 .join_keyed_stream(lookup.clone())
2288 .entries()
2289 .map(q!(|(lookup_value, (key, value))| (
2290 key,
2291 (lookup_value, Some(value))
2292 )))
2293 .into_keyed();
2294 let not_found = inverted
2295 .filter_key_not_in(lookup.keys())
2296 .entries()
2297 .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2298 .into_keyed();
2299
2300 found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2301 }
2302}
2303
2304impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2305where
2306 L: Location<'a>,
2307{
2308 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2309 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2310 /// each key.
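///
/// # Example
/// A minimal sketch, mirroring the other doctests in this module: each batch is transformed
/// inside the tick and then released with `all_ticks`, which concatenates the per-key batches
/// back into an unbounded keyed stream.
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let batch = process
/// .source_iter(q!(vec![(1, 2), (2, 3)]))
/// .into_keyed()
/// .batch(&tick, nondet!(/** test */));
/// batch.map(q!(|v| v * 10)).all_ticks()
/// # .entries()
/// # }, |mut stream| async move {
/// // { 1: [20], 2: [30] }
/// # let mut results = Vec::new();
/// # for _ in 0..2 {
/// # results.push(stream.next().await.unwrap());
/// # }
/// # results.sort();
/// # assert_eq!(results, vec![(1, 20), (2, 30)]);
/// # }));
/// # }
/// ```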
2311 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2312 KeyedStream::new(
2313 self.location.outer().clone(),
2314 HydroNode::YieldConcat {
2315 inner: Box::new(self.ir_node.into_inner()),
2316 metadata: self.location.outer().new_node_metadata(KeyedStream::<
2317 K,
2318 V,
2319 L,
2320 Unbounded,
2321 O,
2322 R,
2323 >::collection_kind(
2324 )),
2325 },
2326 )
2327 }
2328
2329 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2330 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2331 /// each key.
2332 ///
2333 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2334 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2335 /// stream's [`Tick`] context.
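///
/// # Example
/// A minimal sketch, mirroring the other doctests in this module: the batches are released into
/// an [`Atomic`] context and then yielded back to the top level with [`KeyedStream::end_atomic`],
/// so the values pass through unchanged.
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// process
/// .source_iter(q!(vec![(1, 2), (2, 3)]))
/// .into_keyed()
/// .batch(&tick, nondet!(/** test */))
/// .all_ticks_atomic()
/// .end_atomic()
/// # .entries()
/// # }, |mut stream| async move {
/// // { 1: [2], 2: [3] }
/// # let mut results = Vec::new();
/// # for _ in 0..2 {
/// # results.push(stream.next().await.unwrap());
/// # }
/// # results.sort();
/// # assert_eq!(results, vec![(1, 2), (2, 3)]);
/// # }));
/// # }
/// ```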
2336 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2337 let out_location = Atomic {
2338 tick: self.location.clone(),
2339 };
2340
2341 KeyedStream::new(
2342 out_location.clone(),
2343 HydroNode::YieldConcat {
2344 inner: Box::new(self.ir_node.into_inner()),
2345 metadata: out_location.new_node_metadata(KeyedStream::<
2346 K,
2347 V,
2348 Atomic<L>,
2349 Unbounded,
2350 O,
2351 R,
2352 >::collection_kind()),
2353 },
2354 )
2355 }
2356
2357 /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2358 /// such as `fold` retain their memory for each key across ticks rather than resetting with each batch.
2359 ///
2360 /// This API is particularly useful for stateful computation on batches of data, such as
2361 /// maintaining an accumulated state that is up to date with the current batch.
2362 ///
2363 /// # Example
2364 /// ```rust
2365 /// # #[cfg(feature = "deploy")] {
2366 /// # use hydro_lang::prelude::*;
2367 /// # use futures::StreamExt;
2368 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2369 /// let tick = process.tick();
2370 /// # // ticks are lazy by default, forces the second tick to run
2371 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2372 /// # let batch_first_tick = process
2373 /// # .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2374 /// # .into_keyed()
2375 /// # .batch(&tick, nondet!(/** test */));
2376 /// # let batch_second_tick = process
2377 /// # .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2378 /// # .into_keyed()
2379 /// # .batch(&tick, nondet!(/** test */))
2380 /// # .defer_tick(); // appears on the second tick
2381 /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2382 ///
2383 /// input.batch(&tick, nondet!(/** test */))
2384 /// .across_ticks(|s| s.reduce(q!(|sum, new| {
2385 /// *sum += new;
2386 /// }))).entries().all_ticks()
2387 /// # }, |mut stream| async move {
2388 /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2389 /// # let mut results = Vec::new();
2390 /// # for _ in 0..4 {
2391 /// # results.push(stream.next().await.unwrap());
2392 /// # }
2393 /// # results.sort();
2394 /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2395 /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2396 /// # results.clear();
2397 /// # for _ in 0..4 {
2398 /// # results.push(stream.next().await.unwrap());
2399 /// # }
2400 /// # results.sort();
2401 /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2402 /// # }));
2403 /// # }
2404 /// ```
2405 pub fn across_ticks<Out: BatchAtomic>(
2406 self,
2407 thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2408 ) -> Out::Batched {
2409 thunk(self.all_ticks_atomic()).batched_atomic()
2410 }
2411
2412 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2413 /// tick `T` always has the entries of `self` at tick `T - 1`.
2414 ///
2415 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2416 ///
2417 /// This operator enables stateful iterative processing with ticks, by sending data from one
2418 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2419 ///
2420 /// # Example
2421 /// ```rust
2422 /// # #[cfg(feature = "deploy")] {
2423 /// # use hydro_lang::prelude::*;
2424 /// # use futures::StreamExt;
2425 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2426 /// let tick = process.tick();
2427 /// # // ticks are lazy by default, forces the second tick to run
2428 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2429 /// # let batch_first_tick = process
2430 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2431 /// # .batch(&tick, nondet!(/** test */))
2432 /// # .into_keyed();
2433 /// # let batch_second_tick = process
2434 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2435 /// # .batch(&tick, nondet!(/** test */))
2436 /// # .defer_tick()
2437 /// # .into_keyed(); // appears on the second tick
2438 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2439 /// # batch_first_tick.chain(batch_second_tick);
2440 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2441 /// changes_across_ticks // from the current tick
2442 /// )
2443 /// # .entries().all_ticks()
2444 /// # }, |mut stream| async move {
2445 /// // First tick: { 1: [2, 3] }
2446 /// # let mut results = Vec::new();
2447 /// # for _ in 0..2 {
2448 /// # results.push(stream.next().await.unwrap());
2449 /// # }
2450 /// # results.sort();
2451 /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2452 /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2453 /// # results.clear();
2454 /// # for _ in 0..4 {
2455 /// # results.push(stream.next().await.unwrap());
2456 /// # }
2457 /// # results.sort();
2458 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2459 /// // Third tick: { 1: [4], 2: [5] }
2460 /// # results.clear();
2461 /// # for _ in 0..2 {
2462 /// # results.push(stream.next().await.unwrap());
2463 /// # }
2464 /// # results.sort();
2465 /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2466 /// # }));
2467 /// # }
2468 /// ```
2469 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2470 KeyedStream::new(
2471 self.location.clone(),
2472 HydroNode::DeferTick {
2473 input: Box::new(self.ir_node.into_inner()),
2474 metadata: self.location.new_node_metadata(KeyedStream::<
2475 K,
2476 V,
2477 Tick<L>,
2478 Bounded,
2479 O,
2480 R,
2481 >::collection_kind()),
2482 },
2483 )
2484 }
2485}
2486
2487#[cfg(test)]
2488mod tests {
2489 #[cfg(feature = "deploy")]
2490 use futures::{SinkExt, StreamExt};
2491 #[cfg(feature = "deploy")]
2492 use hydro_deploy::Deployment;
2493 #[cfg(any(feature = "deploy", feature = "sim"))]
2494 use stageleft::q;
2495
2496 #[cfg(any(feature = "deploy", feature = "sim"))]
2497 use crate::compile::builder::FlowBuilder;
2498 #[cfg(feature = "deploy")]
2499 use crate::live_collections::stream::ExactlyOnce;
2500 #[cfg(feature = "sim")]
2501 use crate::live_collections::stream::{NoOrder, TotalOrder};
2502 #[cfg(any(feature = "deploy", feature = "sim"))]
2503 use crate::location::Location;
2504 #[cfg(any(feature = "deploy", feature = "sim"))]
2505 use crate::nondet::nondet;
2506 #[cfg(feature = "deploy")]
2507 use crate::properties::ManualProof;
2508
2509 #[cfg(feature = "deploy")]
2510 #[tokio::test]
2511 async fn reduce_watermark_filter() {
2512 let mut deployment = Deployment::new();
2513
2514 let mut flow = FlowBuilder::new();
2515 let node = flow.process::<()>();
2516 let external = flow.external::<()>();
2517
2518 let node_tick = node.tick();
2519 let watermark = node_tick.singleton(q!(1));
2520
2521 let sum = node
2522 .source_stream(q!(tokio_stream::iter([
2523 (0, 100),
2524 (1, 101),
2525 (2, 102),
2526 (2, 102)
2527 ])))
2528 .into_keyed()
2529 .reduce_watermark(
2530 watermark,
2531 q!(|acc, v| {
2532 *acc += v;
2533 }),
2534 )
2535 .snapshot(&node_tick, nondet!(/** test */))
2536 .entries()
2537 .all_ticks()
2538 .send_bincode_external(&external);
2539
2540 let nodes = flow
2541 .with_process(&node, deployment.Localhost())
2542 .with_external(&external, deployment.Localhost())
2543 .deploy(&mut deployment);
2544
2545 deployment.deploy().await.unwrap();
2546
2547 let mut out = nodes.connect(sum).await;
2548
2549 deployment.start().await.unwrap();
2550
2551 assert_eq!(out.next().await.unwrap(), (2, 204));
2552 }
2553
2554 #[cfg(feature = "deploy")]
2555 #[tokio::test]
2556 async fn reduce_watermark_bounded() {
2557 let mut deployment = Deployment::new();
2558
2559 let mut flow = FlowBuilder::new();
2560 let node = flow.process::<()>();
2561 let external = flow.external::<()>();
2562
2563 let node_tick = node.tick();
2564 let watermark = node_tick.singleton(q!(1));
2565
2566 let sum = node
2567 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2568 .into_keyed()
2569 .reduce_watermark(
2570 watermark,
2571 q!(|acc, v| {
2572 *acc += v;
2573 }),
2574 )
2575 .entries()
2576 .send_bincode_external(&external);
2577
2578 let nodes = flow
2579 .with_process(&node, deployment.Localhost())
2580 .with_external(&external, deployment.Localhost())
2581 .deploy(&mut deployment);
2582
2583 deployment.deploy().await.unwrap();
2584
2585 let mut out = nodes.connect(sum).await;
2586
2587 deployment.start().await.unwrap();
2588
2589 assert_eq!(out.next().await.unwrap(), (2, 204));
2590 }
2591
2592 #[cfg(feature = "deploy")]
2593 #[tokio::test]
2594 async fn reduce_watermark_garbage_collect() {
2595 let mut deployment = Deployment::new();
2596
2597 let mut flow = FlowBuilder::new();
2598 let node = flow.process::<()>();
2599 let external = flow.external::<()>();
2600 let (tick_send, tick_trigger) =
2601 node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
2602
2603 let node_tick = node.tick();
2604 let (watermark_complete_cycle, watermark) =
2605 node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
2606 let next_watermark = watermark.clone().map(q!(|v| v + 1));
2607 watermark_complete_cycle.complete_next_tick(next_watermark);
2608
2609 let tick_triggered_input = node_tick
2610 .singleton(q!((3, 103)))
2611 .into_stream()
2612 .filter_if_some(
2613 tick_trigger
2614 .clone()
2615 .batch(&node_tick, nondet!(/** test */))
2616 .first(),
2617 )
2618 .all_ticks();
2619
2620 let sum = node
2621 .source_stream(q!(tokio_stream::iter([
2622 (0, 100),
2623 (1, 101),
2624 (2, 102),
2625 (2, 102)
2626 ])))
2627 .interleave(tick_triggered_input)
2628 .into_keyed()
2629 .reduce_watermark(
2630 watermark,
2631 q!(
2632 |acc, v| {
2633 *acc += v;
2634 },
2635 commutative = ManualProof(/* integer addition is commutative */)
2636 ),
2637 )
2638 .snapshot(&node_tick, nondet!(/** test */))
2639 .entries()
2640 .all_ticks()
2641 .send_bincode_external(&external);
2642
2643 let nodes = flow
2644 .with_default_optimize()
2645 .with_process(&node, deployment.Localhost())
2646 .with_external(&external, deployment.Localhost())
2647 .deploy(&mut deployment);
2648
2649 deployment.deploy().await.unwrap();
2650
2651 let mut tick_send = nodes.connect(tick_send).await;
2652 let mut out_recv = nodes.connect(sum).await;
2653
2654 deployment.start().await.unwrap();
2655
2656 assert_eq!(out_recv.next().await.unwrap(), (2, 204));
2657
2658 tick_send.send(()).await.unwrap();
2659
2660 assert_eq!(out_recv.next().await.unwrap(), (3, 103));
2661 }
2662
2663 #[cfg(feature = "sim")]
2664 #[test]
2665 #[should_panic]
2666 fn sim_batch_nondet_size() {
2667 let mut flow = FlowBuilder::new();
2668 let node = flow.process::<()>();
2669
2670 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2671
2672 let tick = node.tick();
2673 let out_recv = input
2674 .batch(&tick, nondet!(/** test */))
2675 .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2676 .entries()
2677 .all_ticks()
2678 .sim_output();
2679
2680 flow.sim().exhaustive(async || {
2681 out_recv
2682 .assert_yields_only_unordered([(1, vec![1, 2])])
2683 .await;
2684 });
2685 }
2686
2687 #[cfg(feature = "sim")]
2688 #[test]
2689 fn sim_batch_preserves_group_order() {
2690 let mut flow = FlowBuilder::new();
2691 let node = flow.process::<()>();
2692
2693 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2694
2695 let tick = node.tick();
2696 let out_recv = input
2697 .batch(&tick, nondet!(/** test */))
2698 .all_ticks()
2699 .fold_early_stop(
2700 q!(|| 0),
2701 q!(|acc, v| {
2702 *acc = std::cmp::max(v, *acc);
2703 *acc >= 2
2704 }),
2705 )
2706 .entries()
2707 .sim_output();
2708
2709 let instances = flow.sim().exhaustive(async || {
2710 out_recv
2711 .assert_yields_only_unordered([(1, 2), (2, 3)])
2712 .await;
2713 });
2714
2715 assert_eq!(instances, 8);
2716 // - three cases: all three in a separate tick (pick where (2, 3) is)
2717 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
2718 // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
2719 // - one case: all three together
2720 }
2721
2722 #[cfg(feature = "sim")]
2723 #[test]
2724 fn sim_batch_unordered_shuffles() {
2725 let mut flow = FlowBuilder::new();
2726 let node = flow.process::<()>();
2727
2728 let input = node
2729 .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
2730 .into_keyed()
2731 .weaken_ordering::<NoOrder>();
2732
2733 let tick = node.tick();
2734 let out_recv = input
2735 .batch(&tick, nondet!(/** test */))
2736 .all_ticks()
2737 .entries()
2738 .sim_output();
2739
2740 let instances = flow.sim().exhaustive(async || {
2741 out_recv
2742 .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
2743 .await;
2744 });
2745
2746 assert_eq!(instances, 13);
2747 // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
2748 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2749 // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
2750 // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2751 }
2752
2753 #[cfg(feature = "sim")]
2754 #[test]
2755 #[should_panic]
2756 fn sim_observe_order_batched() {
2757 let mut flow = FlowBuilder::new();
2758 let node = flow.process::<()>();
2759
2760 let (port, input) = node.sim_input::<_, NoOrder, _>();
2761
2762 let tick = node.tick();
2763 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2764 let out_recv = batch
2765 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2766 .all_ticks()
2767 .first()
2768 .entries()
2769 .sim_output();
2770
2771 flow.sim().exhaustive(async || {
2772 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2773 out_recv
2774 .assert_yields_only_unordered([(1, 1), (2, 1)])
2775 .await; // fails with assume_ordering
2776 });
2777 }
2778
2779 #[cfg(feature = "sim")]
2780 #[test]
2781 fn sim_observe_order_batched_count() {
2782 let mut flow = FlowBuilder::new();
2783 let node = flow.process::<()>();
2784
2785 let (port, input) = node.sim_input::<_, NoOrder, _>();
2786
2787 let tick = node.tick();
2788 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2789 let out_recv = batch
2790 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2791 .all_ticks()
2792 .entries()
2793 .sim_output();
2794
2795 let instance_count = flow.sim().exhaustive(async || {
2796 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2797 let _ = out_recv.collect_sorted::<Vec<_>>().await;
2798 });
2799
2800 assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
2801 }
2802
2803 #[cfg(feature = "sim")]
2804 #[test]
2805 fn sim_top_level_assume_ordering() {
2806 use std::collections::HashMap;
2807
2808 let mut flow = FlowBuilder::new();
2809 let node = flow.process::<()>();
2810
2811 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2812
2813 let out_recv = input
2814 .into_keyed()
2815 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2816 .fold_early_stop(
2817 q!(|| Vec::new()),
2818 q!(|acc, v| {
2819 acc.push(v);
2820 acc.len() >= 2
2821 }),
2822 )
2823 .entries()
2824 .sim_output();
2825
2826 let instance_count = flow.sim().exhaustive(async || {
2827 in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
2828 let out: HashMap<_, _> = out_recv
2829 .collect_sorted::<Vec<_>>()
2830 .await
2831 .into_iter()
2832 .collect();
2833 // Each key accumulates its values; we get one entry per key
2834 assert_eq!(out.len(), 2);
2835 });
2836
2837 assert_eq!(instance_count, 24);
2838 }
2839
2840 #[cfg(feature = "sim")]
2841 #[test]
2842 fn sim_top_level_assume_ordering_cycle_back() {
2843 use std::collections::HashMap;
2844
2845 let mut flow = FlowBuilder::new();
2846 let node = flow.process::<()>();
2847
2848 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2849
2850 let (complete_cycle_back, cycle_back) =
2851 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
2852 let ordered = input
2853 .into_keyed()
2854 .interleave(cycle_back)
2855 .assume_ordering::<TotalOrder>(nondet!(/** test */));
2856 complete_cycle_back.complete(
2857 ordered
2858 .clone()
2859 .map(q!(|v| v + 1))
2860 .filter(q!(|v| v % 2 == 1)),
2861 );
2862
2863 let out_recv = ordered
2864 .fold_early_stop(
2865 q!(|| Vec::new()),
2866 q!(|acc, v| {
2867 acc.push(v);
2868 acc.len() >= 2
2869 }),
2870 )
2871 .entries()
2872 .sim_output();
2873
2874 let mut saw = false;
2875 let instance_count = flow.sim().exhaustive(async || {
2876 // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
2877 // We want to see [0, 1] - the cycled back value interleaved
2878 in_send.send_many_unordered([(1, 0), (1, 2)]);
2879 let out: HashMap<_, _> = out_recv
2880 .collect_sorted::<Vec<_>>()
2881 .await
2882 .into_iter()
2883 .collect();
2884
2885 // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
2886 if let Some(values) = out.get(&1)
2887 && *values == vec![0, 1]
2888 {
2889 saw = true;
2890 }
2891 });
2892
2893 assert!(
2894 saw,
2895 "did not see an instance with key 1 having [0, 1] in order"
2896 );
2897 assert_eq!(instance_count, 6);
2898 }
2899
2900 #[cfg(feature = "sim")]
2901 #[test]
2902 fn sim_top_level_assume_ordering_cross_key_cycle() {
2903 use std::collections::HashMap;
2904
2905 // This test demonstrates why releasing one entry at a time is important:
2906 // When one key's observed order cycles back into a different key, we need
2907 // to be able to interleave the cycled-back entry with pending items for
2908 // that other key.
2909 let mut flow = FlowBuilder::new();
2910 let node = flow.process::<()>();
2911
2912 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2913
2914 let (complete_cycle_back, cycle_back) =
2915 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
2916 let ordered = input
2917 .into_keyed()
2918 .interleave(cycle_back)
2919 .assume_ordering::<TotalOrder>(nondet!(/** test */));
2920
2921 // Cycle back: when we see (1, 10), emit (2, 100) to key 2
2922 complete_cycle_back.complete(
2923 ordered
2924 .clone()
2925 .filter(q!(|v| *v == 10))
2926 .map(q!(|_| 100))
2927 .entries()
2928 .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
2929 .into_keyed(),
2930 );
2931
2932 let out_recv = ordered
2933 .fold_early_stop(
2934 q!(|| Vec::new()),
2935 q!(|acc, v| {
2936 acc.push(v);
2937 acc.len() >= 2
2938 }),
2939 )
2940 .entries()
2941 .sim_output();
2942
2943 // We want to see an instance where:
2944 // - (1, 10) is released first
2945 // - This causes (2, 100) to be cycled back
2946 // - (2, 100) is released BEFORE (2, 20) which was already pending
2947 let mut saw_cross_key_interleave = false;
2948 let instance_count = flow.sim().exhaustive(async || {
2949 // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
2950 in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
2951 let out: HashMap<_, _> = out_recv
2952 .collect_sorted::<Vec<_>>()
2953 .await
2954 .into_iter()
2955 .collect();
2956
2957 // Check if we see the cross-key interleaving:
2958 // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
2959 if let Some(values) = out.get(&2)
2960 && values.len() >= 2
2961 && values[0] == 100
2962 {
2963 saw_cross_key_interleave = true;
2964 }
2965 });
2966
2967 assert!(
2968 saw_cross_key_interleave,
2969 "did not see an instance where cycled-back 100 was released before pending items for key 2"
2970 );
2971 assert_eq!(instance_count, 60);
2972 }
2973
2974 #[cfg(feature = "sim")]
2975 #[test]
2976 fn sim_top_level_assume_ordering_cycle_back_tick() {
2977 use std::collections::HashMap;
2978
2979 let mut flow = FlowBuilder::new();
2980 let node = flow.process::<()>();
2981
2982 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2983
2984 let (complete_cycle_back, cycle_back) =
2985 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
2986 let ordered = input
2987 .into_keyed()
2988 .interleave(cycle_back)
2989 .assume_ordering::<TotalOrder>(nondet!(/** test */));
2990 complete_cycle_back.complete(
2991 ordered
2992 .clone()
2993 .batch(&node.tick(), nondet!(/** test */))
2994 .all_ticks()
2995 .map(q!(|v| v + 1))
2996 .filter(q!(|v| v % 2 == 1)),
2997 );
2998
2999 let out_recv = ordered
3000 .fold_early_stop(
3001 q!(|| Vec::new()),
3002 q!(|acc, v| {
3003 acc.push(v);
3004 acc.len() >= 2
3005 }),
3006 )
3007 .entries()
3008 .sim_output();
3009
3010 let mut saw = false;
3011 let instance_count = flow.sim().exhaustive(async || {
3012 in_send.send_many_unordered([(1, 0), (1, 2)]);
3013 let out: HashMap<_, _> = out_recv
3014 .collect_sorted::<Vec<_>>()
3015 .await
3016 .into_iter()
3017 .collect();
3018
3019 if let Some(values) = out.get(&1)
3020 && *values == vec![0, 1]
3021 {
3022 saw = true;
3023 }
3024 });
3025
3026 assert!(
3027 saw,
3028 "did not see an instance with key 1 having [0, 1] in order"
3029 );
3030 assert_eq!(instance_count, 58);
3031 }
3032}