hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16 ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::stream::{
27 AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
28};
29#[cfg(stageleft_runtime)]
30use crate::location::dynamic::{DynLocation, LocationId};
31use crate::location::tick::DeferTick;
32use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
33use crate::manual_expr::ManualExpr;
34use crate::nondet::{NonDet, nondet};
35use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
36
37pub mod networking;
38
39/// Streaming elements of type `V` grouped by a key of type `K`.
40///
41/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
42/// order of keys is non-deterministic but the order *within* each group may be deterministic.
43///
44/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
45/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
46/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
47///
48/// Type Parameters:
49/// - `K`: the type of the key for each group
50/// - `V`: the type of the elements inside each group
51/// - `Loc`: the [`Location`] where the keyed stream is materialized
52/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
53/// - `Order`: tracks whether the elements within each group have deterministic order
54/// ([`TotalOrder`]) or not ([`NoOrder`])
55/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
56/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
57pub struct KeyedStream<
58 K,
59 V,
60 Loc,
61 Bound: Boundedness = Unbounded,
62 Order: Ordering = TotalOrder,
63 Retry: Retries = ExactlyOnce,
64> {
65 pub(crate) location: Loc,
66 pub(crate) ir_node: RefCell<HydroNode>,
67 pub(crate) flow_state: FlowState,
68
69 _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
70}
71
72impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
73 fn drop(&mut self) {
74 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
75 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
76 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
77 input: Box::new(ir_node),
78 op_metadata: HydroIrOpMetadata::new(),
79 });
80 }
81 }
82}
83
84impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
85 for KeyedStream<K, V, L, Unbounded, O, R>
86where
87 L: Location<'a>,
88{
89 fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
90 let new_meta = stream
91 .location
92 .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
93
94 KeyedStream {
95 location: stream.location.clone(),
96 flow_state: stream.flow_state.clone(),
97 ir_node: RefCell::new(HydroNode::Cast {
98 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
99 metadata: new_meta,
100 }),
101 _phantom: PhantomData,
102 }
103 }
104}
105
106impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
107 for KeyedStream<K, V, L, B, NoOrder, R>
108where
109 L: Location<'a>,
110{
111 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
112 stream.weaken_ordering()
113 }
114}
115
116impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
117where
118 L: Location<'a>,
119{
120 fn defer_tick(self) -> Self {
121 KeyedStream::defer_tick(self)
122 }
123}
124
125impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
126 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
127where
128 L: Location<'a>,
129{
130 type Location = Tick<L>;
131
132 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
133 KeyedStream {
134 flow_state: location.flow_state().clone(),
135 location: location.clone(),
136 ir_node: RefCell::new(HydroNode::CycleSource {
137 cycle_id,
138 metadata: location.new_node_metadata(
139 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
140 ),
141 }),
142 _phantom: PhantomData,
143 }
144 }
145}
146
147impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
148 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
149where
150 L: Location<'a>,
151{
152 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
153 assert_eq!(
154 Location::id(&self.location),
155 expected_location,
156 "locations do not match"
157 );
158
159 self.location
160 .flow_state()
161 .borrow_mut()
162 .push_root(HydroRoot::CycleSink {
163 cycle_id,
164 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
165 op_metadata: HydroIrOpMetadata::new(),
166 });
167 }
168}
169
170impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
171 for KeyedStream<K, V, L, B, O, R>
172where
173 L: Location<'a> + NoTick,
174{
175 type Location = L;
176
177 fn create_source(cycle_id: CycleId, location: L) -> Self {
178 KeyedStream {
179 flow_state: location.flow_state().clone(),
180 location: location.clone(),
181 ir_node: RefCell::new(HydroNode::CycleSource {
182 cycle_id,
183 metadata: location
184 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
185 }),
186 _phantom: PhantomData,
187 }
188 }
189}
190
191impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
192 for KeyedStream<K, V, L, B, O, R>
193where
194 L: Location<'a> + NoTick,
195{
196 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
197 assert_eq!(
198 Location::id(&self.location),
199 expected_location,
200 "locations do not match"
201 );
202 self.location
203 .flow_state()
204 .borrow_mut()
205 .push_root(HydroRoot::CycleSink {
206 cycle_id,
207 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
208 op_metadata: HydroIrOpMetadata::new(),
209 });
210 }
211}
212
213impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
214 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
215{
216 fn clone(&self) -> Self {
217 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
218 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
219 *self.ir_node.borrow_mut() = HydroNode::Tee {
220 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
221 metadata: self.location.new_node_metadata(Self::collection_kind()),
222 };
223 }
224
225 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
226 KeyedStream {
227 location: self.location.clone(),
228 flow_state: self.flow_state.clone(),
229 ir_node: HydroNode::Tee {
230 inner: SharedNode(inner.0.clone()),
231 metadata: metadata.clone(),
232 }
233 .into(),
234 _phantom: PhantomData,
235 }
236 } else {
237 unreachable!()
238 }
239 }
240}
241
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Derives the standard value traits (conditionally on `T`), so generator results can be
/// compared, hashed, copied and debug-printed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
254
255impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
256 KeyedStream<K, V, L, B, O, R>
257{
258 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
259 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
260 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
261
262 let flow_state = location.flow_state().clone();
263 KeyedStream {
264 location,
265 flow_state,
266 ir_node: RefCell::new(ir_node),
267 _phantom: PhantomData,
268 }
269 }
270
271 /// Returns the [`CollectionKind`] corresponding to this type.
272 pub fn collection_kind() -> CollectionKind {
273 CollectionKind::KeyedStream {
274 bound: B::BOUND_KIND,
275 value_order: O::ORDERING_KIND,
276 value_retry: R::RETRIES_KIND,
277 key_type: stageleft::quote_type::<K>().into(),
278 value_type: stageleft::quote_type::<V>().into(),
279 }
280 }
281
282 /// Returns the [`Location`] where this keyed stream is being materialized.
283 pub fn location(&self) -> &L {
284 &self.location
285 }
286
287 /// Explicitly "casts" the keyed stream to a type with a different ordering
288 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
289 /// by the type-system.
290 ///
291 /// # Non-Determinism
292 /// This function is used as an escape hatch, and any mistakes in the
293 /// provided ordering guarantee will propagate into the guarantees
294 /// for the rest of the program.
295 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
296 if O::ORDERING_KIND == O2::ORDERING_KIND {
297 KeyedStream::new(
298 self.location.clone(),
299 self.ir_node.replace(HydroNode::Placeholder),
300 )
301 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
302 // We can always weaken the ordering guarantee
303 KeyedStream::new(
304 self.location.clone(),
305 HydroNode::Cast {
306 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
307 metadata: self
308 .location
309 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
310 },
311 )
312 } else {
313 KeyedStream::new(
314 self.location.clone(),
315 HydroNode::ObserveNonDet {
316 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
317 trusted: false,
318 metadata: self
319 .location
320 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
321 },
322 )
323 }
324 }
325
326 fn assume_ordering_trusted<O2: Ordering>(
327 self,
328 _nondet: NonDet,
329 ) -> KeyedStream<K, V, L, B, O2, R> {
330 if O::ORDERING_KIND == O2::ORDERING_KIND {
331 KeyedStream::new(
332 self.location.clone(),
333 self.ir_node.replace(HydroNode::Placeholder),
334 )
335 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
336 // We can always weaken the ordering guarantee
337 KeyedStream::new(
338 self.location.clone(),
339 HydroNode::Cast {
340 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
341 metadata: self
342 .location
343 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
344 },
345 )
346 } else {
347 KeyedStream::new(
348 self.location.clone(),
349 HydroNode::ObserveNonDet {
350 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
351 trusted: true,
352 metadata: self
353 .location
354 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
355 },
356 )
357 }
358 }
359
360 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
361 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
362 /// which is always safe because that is the weakest possible guarantee.
363 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
364 self.weaken_ordering::<NoOrder>()
365 }
366
367 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
368 /// enforcing that `O2` is weaker than the input ordering guarantee.
369 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
370 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
371 self.assume_ordering::<O2>(nondet)
372 }
373
374 /// Explicitly "casts" the keyed stream to a type with a different retries
375 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
376 /// be proven by the type-system.
377 ///
378 /// # Non-Determinism
379 /// This function is used as an escape hatch, and any mistakes in the
380 /// provided retries guarantee will propagate into the guarantees
381 /// for the rest of the program.
382 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
383 if R::RETRIES_KIND == R2::RETRIES_KIND {
384 KeyedStream::new(
385 self.location.clone(),
386 self.ir_node.replace(HydroNode::Placeholder),
387 )
388 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
389 // We can always weaken the retries guarantee
390 KeyedStream::new(
391 self.location.clone(),
392 HydroNode::Cast {
393 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
394 metadata: self
395 .location
396 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
397 },
398 )
399 } else {
400 KeyedStream::new(
401 self.location.clone(),
402 HydroNode::ObserveNonDet {
403 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
404 trusted: false,
405 metadata: self
406 .location
407 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
408 },
409 )
410 }
411 }
412
413 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
414 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
415 /// which is always safe because that is the weakest possible guarantee.
416 pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
417 self.weaken_retries::<AtLeastOnce>()
418 }
419
420 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
421 /// enforcing that `R2` is weaker than the input retries guarantee.
422 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
423 let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
424 self.assume_retries::<R2>(nondet)
425 }
426
427 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
428 /// implies that `O == TotalOrder`.
429 pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
430 where
431 O: IsOrdered,
432 {
433 self.assume_ordering(nondet!(/** no-op */))
434 }
435
436 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
437 /// implies that `R == ExactlyOnce`.
438 pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
439 where
440 R: IsExactlyOnce,
441 {
442 self.assume_retries(nondet!(/** no-op */))
443 }
444
445 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
446 /// implies that `B == Bounded`.
447 pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
448 where
449 B: IsBounded,
450 {
451 KeyedStream::new(
452 self.location.clone(),
453 self.ir_node.replace(HydroNode::Placeholder),
454 )
455 }
456
457 /// Flattens the keyed stream into an unordered stream of key-value pairs.
458 ///
459 /// # Example
460 /// ```rust
461 /// # #[cfg(feature = "deploy")] {
462 /// # use hydro_lang::prelude::*;
463 /// # use futures::StreamExt;
464 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465 /// process
466 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
467 /// .into_keyed()
468 /// .entries()
469 /// # }, |mut stream| async move {
470 /// // (1, 2), (1, 3), (2, 4) in any order
471 /// # let mut results = Vec::new();
472 /// # for _ in 0..3 {
473 /// # results.push(stream.next().await.unwrap());
474 /// # }
475 /// # results.sort();
476 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
477 /// # }));
478 /// # }
479 /// ```
480 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
481 Stream::new(
482 self.location.clone(),
483 HydroNode::Cast {
484 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
485 metadata: self
486 .location
487 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
488 },
489 )
490 }
491
492 /// Flattens the keyed stream into an unordered stream of only the values.
493 ///
494 /// # Example
495 /// ```rust
496 /// # #[cfg(feature = "deploy")] {
497 /// # use hydro_lang::prelude::*;
498 /// # use futures::StreamExt;
499 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
500 /// process
501 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
502 /// .into_keyed()
503 /// .values()
504 /// # }, |mut stream| async move {
505 /// // 2, 3, 4 in any order
506 /// # let mut results = Vec::new();
507 /// # for _ in 0..3 {
508 /// # results.push(stream.next().await.unwrap());
509 /// # }
510 /// # results.sort();
511 /// # assert_eq!(results, vec![2, 3, 4]);
512 /// # }));
513 /// # }
514 /// ```
515 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
516 self.entries().map(q!(|(_, v)| v))
517 }
518
519 /// Flattens the keyed stream into an unordered stream of just the keys.
520 ///
521 /// # Example
522 /// ```rust
523 /// # #[cfg(feature = "deploy")] {
524 /// # use hydro_lang::prelude::*;
525 /// # use futures::StreamExt;
526 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
527 /// # process
528 /// # .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
529 /// # .into_keyed()
530 /// # .keys()
531 /// # }, |mut stream| async move {
532 /// // 1, 2 in any order
533 /// # let mut results = Vec::new();
534 /// # for _ in 0..2 {
535 /// # results.push(stream.next().await.unwrap());
536 /// # }
537 /// # results.sort();
538 /// # assert_eq!(results, vec![1, 2]);
539 /// # }));
540 /// # }
541 /// ```
542 pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
543 where
544 K: Eq + Hash,
545 {
546 self.entries().map(q!(|(k, _)| k)).unique()
547 }
548
549 /// Transforms each value by invoking `f` on each element, with keys staying the same
550 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
551 ///
552 /// If you do not want to modify the stream and instead only want to view
553 /// each item use [`KeyedStream::inspect`] instead.
554 ///
555 /// # Example
556 /// ```rust
557 /// # #[cfg(feature = "deploy")] {
558 /// # use hydro_lang::prelude::*;
559 /// # use futures::StreamExt;
560 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
561 /// process
562 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
563 /// .into_keyed()
564 /// .map(q!(|v| v + 1))
565 /// # .entries()
566 /// # }, |mut stream| async move {
567 /// // { 1: [3, 4], 2: [5] }
568 /// # let mut results = Vec::new();
569 /// # for _ in 0..3 {
570 /// # results.push(stream.next().await.unwrap());
571 /// # }
572 /// # results.sort();
573 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
574 /// # }));
575 /// # }
576 /// ```
577 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
578 where
579 F: Fn(V) -> U + 'a,
580 {
581 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
582 let map_f = q!({
583 let orig = f;
584 move |(k, v)| (k, orig(v))
585 })
586 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
587 .into();
588
589 KeyedStream::new(
590 self.location.clone(),
591 HydroNode::Map {
592 f: map_f,
593 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
594 metadata: self
595 .location
596 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
597 },
598 )
599 }
600
601 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
602 /// re-grouped even they are tuples; instead they will be grouped under the original key.
603 ///
604 /// If you do not want to modify the stream and instead only want to view
605 /// each item use [`KeyedStream::inspect_with_key`] instead.
606 ///
607 /// # Example
608 /// ```rust
609 /// # #[cfg(feature = "deploy")] {
610 /// # use hydro_lang::prelude::*;
611 /// # use futures::StreamExt;
612 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
613 /// process
614 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
615 /// .into_keyed()
616 /// .map_with_key(q!(|(k, v)| k + v))
617 /// # .entries()
618 /// # }, |mut stream| async move {
619 /// // { 1: [3, 4], 2: [6] }
620 /// # let mut results = Vec::new();
621 /// # for _ in 0..3 {
622 /// # results.push(stream.next().await.unwrap());
623 /// # }
624 /// # results.sort();
625 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
626 /// # }));
627 /// # }
628 /// ```
629 pub fn map_with_key<U, F>(
630 self,
631 f: impl IntoQuotedMut<'a, F, L> + Copy,
632 ) -> KeyedStream<K, U, L, B, O, R>
633 where
634 F: Fn((K, V)) -> U + 'a,
635 K: Clone,
636 {
637 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
638 let map_f = q!({
639 let orig = f;
640 move |(k, v)| {
641 let out = orig((Clone::clone(&k), v));
642 (k, out)
643 }
644 })
645 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
646 .into();
647
648 KeyedStream::new(
649 self.location.clone(),
650 HydroNode::Map {
651 f: map_f,
652 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
653 metadata: self
654 .location
655 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
656 },
657 )
658 }
659
660 /// Prepends a new value to the key of each element in the stream, producing a new
661 /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
662 /// occurs and the elements in each group preserve their original order.
663 ///
664 /// # Example
665 /// ```rust
666 /// # #[cfg(feature = "deploy")] {
667 /// # use hydro_lang::prelude::*;
668 /// # use futures::StreamExt;
669 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
670 /// process
671 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
672 /// .into_keyed()
673 /// .prefix_key(q!(|&(k, _)| k % 2))
674 /// # .entries()
675 /// # }, |mut stream| async move {
676 /// // { (1, 1): [2, 3], (0, 2): [4] }
677 /// # let mut results = Vec::new();
678 /// # for _ in 0..3 {
679 /// # results.push(stream.next().await.unwrap());
680 /// # }
681 /// # results.sort();
682 /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
683 /// # }));
684 /// # }
685 /// ```
686 pub fn prefix_key<K2, F>(
687 self,
688 f: impl IntoQuotedMut<'a, F, L> + Copy,
689 ) -> KeyedStream<(K2, K), V, L, B, O, R>
690 where
691 F: Fn(&(K, V)) -> K2 + 'a,
692 {
693 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
694 let map_f = q!({
695 let orig = f;
696 move |kv| {
697 let out = orig(&kv);
698 ((out, kv.0), kv.1)
699 }
700 })
701 .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
702 .into();
703
704 KeyedStream::new(
705 self.location.clone(),
706 HydroNode::Map {
707 f: map_f,
708 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
709 metadata: self
710 .location
711 .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
712 },
713 )
714 }
715
716 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
717 /// `f`, preserving the order of the elements within the group.
718 ///
719 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
720 /// not modify or take ownership of the values. If you need to modify the values while filtering
721 /// use [`KeyedStream::filter_map`] instead.
722 ///
723 /// # Example
724 /// ```rust
725 /// # #[cfg(feature = "deploy")] {
726 /// # use hydro_lang::prelude::*;
727 /// # use futures::StreamExt;
728 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
729 /// process
730 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
731 /// .into_keyed()
732 /// .filter(q!(|&x| x > 2))
733 /// # .entries()
734 /// # }, |mut stream| async move {
735 /// // { 1: [3], 2: [4] }
736 /// # let mut results = Vec::new();
737 /// # for _ in 0..2 {
738 /// # results.push(stream.next().await.unwrap());
739 /// # }
740 /// # results.sort();
741 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
742 /// # }));
743 /// # }
744 /// ```
745 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
746 where
747 F: Fn(&V) -> bool + 'a,
748 {
749 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
750 let filter_f = q!({
751 let orig = f;
752 move |t: &(_, _)| orig(&t.1)
753 })
754 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
755 .into();
756
757 KeyedStream::new(
758 self.location.clone(),
759 HydroNode::Filter {
760 f: filter_f,
761 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
762 metadata: self.location.new_node_metadata(Self::collection_kind()),
763 },
764 )
765 }
766
767 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
768 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
769 ///
770 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
771 /// not modify or take ownership of the values. If you need to modify the values while filtering
772 /// use [`KeyedStream::filter_map_with_key`] instead.
773 ///
774 /// # Example
775 /// ```rust
776 /// # #[cfg(feature = "deploy")] {
777 /// # use hydro_lang::prelude::*;
778 /// # use futures::StreamExt;
779 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
780 /// process
781 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
782 /// .into_keyed()
783 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
784 /// # .entries()
785 /// # }, |mut stream| async move {
786 /// // { 1: [3], 2: [4] }
787 /// # let mut results = Vec::new();
788 /// # for _ in 0..2 {
789 /// # results.push(stream.next().await.unwrap());
790 /// # }
791 /// # results.sort();
792 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
793 /// # }));
794 /// # }
795 /// ```
796 pub fn filter_with_key<F>(
797 self,
798 f: impl IntoQuotedMut<'a, F, L> + Copy,
799 ) -> KeyedStream<K, V, L, B, O, R>
800 where
801 F: Fn(&(K, V)) -> bool + 'a,
802 {
803 let filter_f = f
804 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
805 .into();
806
807 KeyedStream::new(
808 self.location.clone(),
809 HydroNode::Filter {
810 f: filter_f,
811 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
812 metadata: self.location.new_node_metadata(Self::collection_kind()),
813 },
814 )
815 }
816
817 /// An operator that both filters and maps each value, with keys staying the same.
818 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
819 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
820 ///
821 /// # Example
822 /// ```rust
823 /// # #[cfg(feature = "deploy")] {
824 /// # use hydro_lang::prelude::*;
825 /// # use futures::StreamExt;
826 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
827 /// process
828 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
829 /// .into_keyed()
830 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
831 /// # .entries()
832 /// # }, |mut stream| async move {
833 /// // { 1: [2], 2: [4] }
834 /// # let mut results = Vec::new();
835 /// # for _ in 0..2 {
836 /// # results.push(stream.next().await.unwrap());
837 /// # }
838 /// # results.sort();
839 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
840 /// # }));
841 /// # }
842 /// ```
843 pub fn filter_map<U, F>(
844 self,
845 f: impl IntoQuotedMut<'a, F, L> + Copy,
846 ) -> KeyedStream<K, U, L, B, O, R>
847 where
848 F: Fn(V) -> Option<U> + 'a,
849 {
850 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
851 let filter_map_f = q!({
852 let orig = f;
853 move |(k, v)| orig(v).map(|o| (k, o))
854 })
855 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
856 .into();
857
858 KeyedStream::new(
859 self.location.clone(),
860 HydroNode::FilterMap {
861 f: filter_map_f,
862 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
863 metadata: self
864 .location
865 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
866 },
867 )
868 }
869
870 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
871 /// re-grouped even they are tuples; instead they will be grouped under the original key.
872 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
873 ///
874 /// # Example
875 /// ```rust
876 /// # #[cfg(feature = "deploy")] {
877 /// # use hydro_lang::prelude::*;
878 /// # use futures::StreamExt;
879 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
880 /// process
881 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
882 /// .into_keyed()
883 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
884 /// # .entries()
885 /// # }, |mut stream| async move {
886 /// // { 2: [2] }
887 /// # let mut results = Vec::new();
888 /// # for _ in 0..1 {
889 /// # results.push(stream.next().await.unwrap());
890 /// # }
891 /// # results.sort();
892 /// # assert_eq!(results, vec![(2, 2)]);
893 /// # }));
894 /// # }
895 /// ```
896 pub fn filter_map_with_key<U, F>(
897 self,
898 f: impl IntoQuotedMut<'a, F, L> + Copy,
899 ) -> KeyedStream<K, U, L, B, O, R>
900 where
901 F: Fn((K, V)) -> Option<U> + 'a,
902 K: Clone,
903 {
904 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
905 let filter_map_f = q!({
906 let orig = f;
907 move |(k, v)| {
908 let out = orig((Clone::clone(&k), v));
909 out.map(|o| (k, o))
910 }
911 })
912 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
913 .into();
914
915 KeyedStream::new(
916 self.location.clone(),
917 HydroNode::FilterMap {
918 f: filter_map_f,
919 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
920 metadata: self
921 .location
922 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
923 },
924 )
925 }
926
927 /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
928 /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
929 /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
930 ///
931 /// # Example
932 /// ```rust
933 /// # #[cfg(feature = "deploy")] {
934 /// # use hydro_lang::prelude::*;
935 /// # use futures::StreamExt;
936 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
937 /// let tick = process.tick();
938 /// let batch = process
939 /// .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
940 /// .into_keyed()
941 /// .batch(&tick, nondet!(/** test */));
942 /// let count = batch.clone().entries().count(); // `count()` returns a singleton
943 /// batch.cross_singleton(count).all_ticks().entries()
944 /// # }, |mut stream| async move {
945 /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
946 /// # let mut results = Vec::new();
947 /// # for _ in 0..3 {
948 /// # results.push(stream.next().await.unwrap());
949 /// # }
950 /// # results.sort();
951 /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
952 /// # }));
953 /// # }
954 /// ```
955 pub fn cross_singleton<O2>(
956 self,
957 other: impl Into<Optional<O2, L, Bounded>>,
958 ) -> KeyedStream<K, (V, O2), L, B, O, R>
959 where
960 O2: Clone,
961 {
962 let other: Optional<O2, L, Bounded> = other.into();
963 check_matching_location(&self.location, &other.location);
964
965 Stream::new(
966 self.location.clone(),
967 HydroNode::CrossSingleton {
968 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
969 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
970 metadata: self
971 .location
972 .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
973 },
974 )
975 .map(q!(|((k, v), o2)| (k, (v, o2))))
976 .into_keyed()
977 }
978
979 /// For each value `v` in each group, transform `v` using `f` and then treat the
980 /// result as an [`Iterator`] to produce values one by one within the same group.
981 /// The implementation for [`Iterator`] for the output type `I` must produce items
982 /// in a **deterministic** order.
983 ///
984 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
985 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
986 ///
987 /// # Example
988 /// ```rust
989 /// # #[cfg(feature = "deploy")] {
990 /// # use hydro_lang::prelude::*;
991 /// # use futures::StreamExt;
992 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
993 /// process
994 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
995 /// .into_keyed()
996 /// .flat_map_ordered(q!(|x| x))
997 /// # .entries()
998 /// # }, |mut stream| async move {
999 /// // { 1: [2, 3, 4], 2: [5, 6] }
1000 /// # let mut results = Vec::new();
1001 /// # for _ in 0..5 {
1002 /// # results.push(stream.next().await.unwrap());
1003 /// # }
1004 /// # results.sort();
1005 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1006 /// # }));
1007 /// # }
1008 /// ```
1009 pub fn flat_map_ordered<U, I, F>(
1010 self,
1011 f: impl IntoQuotedMut<'a, F, L> + Copy,
1012 ) -> KeyedStream<K, U, L, B, O, R>
1013 where
1014 I: IntoIterator<Item = U>,
1015 F: Fn(V) -> I + 'a,
1016 K: Clone,
1017 {
1018 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1019 let flat_map_f = q!({
1020 let orig = f;
1021 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1022 })
1023 .splice_fn1_ctx::<(K, V), _>(&self.location)
1024 .into();
1025
1026 KeyedStream::new(
1027 self.location.clone(),
1028 HydroNode::FlatMap {
1029 f: flat_map_f,
1030 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1031 metadata: self
1032 .location
1033 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1034 },
1035 )
1036 }
1037
1038 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1039 /// for the output type `I` to produce items in any order.
1040 ///
1041 /// # Example
1042 /// ```rust
1043 /// # #[cfg(feature = "deploy")] {
1044 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1045 /// # use futures::StreamExt;
1046 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1047 /// process
1048 /// .source_iter(q!(vec![
1049 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1050 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1051 /// ]))
1052 /// .into_keyed()
1053 /// .flat_map_unordered(q!(|x| x))
1054 /// # .entries()
1055 /// # }, |mut stream| async move {
1056 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1057 /// # let mut results = Vec::new();
1058 /// # for _ in 0..4 {
1059 /// # results.push(stream.next().await.unwrap());
1060 /// # }
1061 /// # results.sort();
1062 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1063 /// # }));
1064 /// # }
1065 /// ```
1066 pub fn flat_map_unordered<U, I, F>(
1067 self,
1068 f: impl IntoQuotedMut<'a, F, L> + Copy,
1069 ) -> KeyedStream<K, U, L, B, NoOrder, R>
1070 where
1071 I: IntoIterator<Item = U>,
1072 F: Fn(V) -> I + 'a,
1073 K: Clone,
1074 {
1075 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1076 let flat_map_f = q!({
1077 let orig = f;
1078 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1079 })
1080 .splice_fn1_ctx::<(K, V), _>(&self.location)
1081 .into();
1082
1083 KeyedStream::new(
1084 self.location.clone(),
1085 HydroNode::FlatMap {
1086 f: flat_map_f,
1087 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1088 metadata: self
1089 .location
1090 .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1091 },
1092 )
1093 }
1094
1095 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1096 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1097 /// items in a **deterministic** order.
1098 ///
1099 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1100 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1101 ///
1102 /// # Example
1103 /// ```rust
1104 /// # #[cfg(feature = "deploy")] {
1105 /// # use hydro_lang::prelude::*;
1106 /// # use futures::StreamExt;
1107 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1108 /// process
1109 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1110 /// .into_keyed()
1111 /// .flatten_ordered()
1112 /// # .entries()
1113 /// # }, |mut stream| async move {
1114 /// // { 1: [2, 3, 4], 2: [5, 6] }
1115 /// # let mut results = Vec::new();
1116 /// # for _ in 0..5 {
1117 /// # results.push(stream.next().await.unwrap());
1118 /// # }
1119 /// # results.sort();
1120 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1121 /// # }));
1122 /// # }
1123 /// ```
1124 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1125 where
1126 V: IntoIterator<Item = U>,
1127 K: Clone,
1128 {
1129 self.flat_map_ordered(q!(|d| d))
1130 }
1131
1132 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1133 /// for the value type `V` to produce items in any order.
1134 ///
1135 /// # Example
1136 /// ```rust
1137 /// # #[cfg(feature = "deploy")] {
1138 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1139 /// # use futures::StreamExt;
1140 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1141 /// process
1142 /// .source_iter(q!(vec![
1143 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1144 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1145 /// ]))
1146 /// .into_keyed()
1147 /// .flatten_unordered()
1148 /// # .entries()
1149 /// # }, |mut stream| async move {
1150 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1151 /// # let mut results = Vec::new();
1152 /// # for _ in 0..4 {
1153 /// # results.push(stream.next().await.unwrap());
1154 /// # }
1155 /// # results.sort();
1156 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1157 /// # }));
1158 /// # }
1159 /// ```
1160 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1161 where
1162 V: IntoIterator<Item = U>,
1163 K: Clone,
1164 {
1165 self.flat_map_unordered(q!(|d| d))
1166 }
1167
1168 /// An operator which allows you to "inspect" each element of a stream without
1169 /// modifying it. The closure `f` is called on a reference to each value. This is
1170 /// mainly useful for debugging, and should not be used to generate side-effects.
1171 ///
1172 /// # Example
1173 /// ```rust
1174 /// # #[cfg(feature = "deploy")] {
1175 /// # use hydro_lang::prelude::*;
1176 /// # use futures::StreamExt;
1177 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1178 /// process
1179 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1180 /// .into_keyed()
1181 /// .inspect(q!(|v| println!("{}", v)))
1182 /// # .entries()
1183 /// # }, |mut stream| async move {
1184 /// # let mut results = Vec::new();
1185 /// # for _ in 0..3 {
1186 /// # results.push(stream.next().await.unwrap());
1187 /// # }
1188 /// # results.sort();
1189 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1190 /// # }));
1191 /// # }
1192 /// ```
1193 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1194 where
1195 F: Fn(&V) + 'a,
1196 {
1197 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1198 let inspect_f = q!({
1199 let orig = f;
1200 move |t: &(_, _)| orig(&t.1)
1201 })
1202 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1203 .into();
1204
1205 KeyedStream::new(
1206 self.location.clone(),
1207 HydroNode::Inspect {
1208 f: inspect_f,
1209 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1210 metadata: self.location.new_node_metadata(Self::collection_kind()),
1211 },
1212 )
1213 }
1214
1215 /// An operator which allows you to "inspect" each element of a stream without
1216 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1217 /// mainly useful for debugging, and should not be used to generate side-effects.
1218 ///
1219 /// # Example
1220 /// ```rust
1221 /// # #[cfg(feature = "deploy")] {
1222 /// # use hydro_lang::prelude::*;
1223 /// # use futures::StreamExt;
1224 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1225 /// process
1226 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1227 /// .into_keyed()
1228 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1229 /// # .entries()
1230 /// # }, |mut stream| async move {
1231 /// # let mut results = Vec::new();
1232 /// # for _ in 0..3 {
1233 /// # results.push(stream.next().await.unwrap());
1234 /// # }
1235 /// # results.sort();
1236 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1237 /// # }));
1238 /// # }
1239 /// ```
1240 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1241 where
1242 F: Fn(&(K, V)) + 'a,
1243 {
1244 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1245
1246 KeyedStream::new(
1247 self.location.clone(),
1248 HydroNode::Inspect {
1249 f: inspect_f,
1250 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1251 metadata: self.location.new_node_metadata(Self::collection_kind()),
1252 },
1253 )
1254 }
1255
1256 /// An operator which allows you to "name" a `HydroNode`.
1257 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1258 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1259 {
1260 let mut node = self.ir_node.borrow_mut();
1261 let metadata = node.metadata_mut();
1262 metadata.tag = Some(name.to_owned());
1263 }
1264 self
1265 }
1266
1267 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1268 ///
1269 /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1270 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1271 /// early by returning `None`.
1272 ///
1273 /// The function takes a mutable reference to the accumulator and the current element, and returns
1274 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1275 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1276 ///
1277 /// # Example
1278 /// ```rust
1279 /// # #[cfg(feature = "deploy")] {
1280 /// # use hydro_lang::prelude::*;
1281 /// # use futures::StreamExt;
1282 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1283 /// process
1284 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1285 /// .into_keyed()
1286 /// .scan(
1287 /// q!(|| 0),
1288 /// q!(|acc, x| {
1289 /// *acc += x;
1290 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1291 /// }),
1292 /// )
1293 /// # .entries()
1294 /// # }, |mut stream| async move {
1295 /// // Output: { 0: [1], 1: [3, 7] }
1296 /// # let mut results = Vec::new();
1297 /// # for _ in 0..3 {
1298 /// # results.push(stream.next().await.unwrap());
1299 /// # }
1300 /// # results.sort();
1301 /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1302 /// # }));
1303 /// # }
1304 /// ```
1305 pub fn scan<A, U, I, F>(
1306 self,
1307 init: impl IntoQuotedMut<'a, I, L> + Copy,
1308 f: impl IntoQuotedMut<'a, F, L> + Copy,
1309 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1310 where
1311 O: IsOrdered,
1312 R: IsExactlyOnce,
1313 K: Clone + Eq + Hash,
1314 I: Fn() -> A + 'a,
1315 F: Fn(&mut A, V) -> Option<U> + 'a,
1316 {
1317 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1318 self.make_totally_ordered().make_exactly_once().generator(
1319 init,
1320 q!({
1321 let orig = f;
1322 move |state, v| {
1323 if let Some(out) = orig(state, v) {
1324 Generate::Yield(out)
1325 } else {
1326 Generate::Break
1327 }
1328 }
1329 }),
1330 )
1331 }
1332
    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed.
    ///
    /// Once `f` returns [`Generate::Return`] or [`Generate::Break`] for a group, every later
    /// value with that key is ignored. Other groups are unaffected. A terminated group still
    /// keeps an entry in the operator's internal per-key map (there is no garbage collection of
    /// finished groups yet).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_owned()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_owned()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // Scan state: one entry per key seen so far. `Some(state)` means the group is still
        // active; `None` means the group has returned or broken and ignores further values.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        // Per-element step. The outer `Option` is always `Some`, so the scan as a whole keeps
        // running even after individual groups terminate. (This presumes `HydroNode::Scan`
        // follows `Iterator::scan`, where `None` would end the whole scan; verify.) The inner
        // `Option` is `Some((k, out))` only when an item should be emitted.
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            // First value for a key lazily creates that group's state via the user's `init`.
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        // Emit the final item, then mark the group as finished.
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        // Mark the group as finished without emitting anything.
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // Group already finished: drop the value silently.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten away the inner `Option`, keeping only the emitted `(K, U)` entries.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location.clone(), flatten_node)
    }
1453 }
1454
1455 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1456 /// in-order across the values in each group. But the aggregation function returns a boolean,
1457 /// which when true indicates that the aggregated result is complete and can be released to
1458 /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1459 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1460 /// normal stream elements.
1461 ///
1462 /// # Example
1463 /// ```rust
1464 /// # #[cfg(feature = "deploy")] {
1465 /// # use hydro_lang::prelude::*;
1466 /// # use futures::StreamExt;
1467 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1468 /// process
1469 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1470 /// .into_keyed()
1471 /// .fold_early_stop(
1472 /// q!(|| 0),
1473 /// q!(|acc, x| {
1474 /// *acc += x;
1475 /// x % 2 == 0
1476 /// }),
1477 /// )
1478 /// # .entries()
1479 /// # }, |mut stream| async move {
1480 /// // Output: { 0: 2, 1: 9 }
1481 /// # let mut results = Vec::new();
1482 /// # for _ in 0..2 {
1483 /// # results.push(stream.next().await.unwrap());
1484 /// # }
1485 /// # results.sort();
1486 /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1487 /// # }));
1488 /// # }
1489 /// ```
1490 pub fn fold_early_stop<A, I, F>(
1491 self,
1492 init: impl IntoQuotedMut<'a, I, L> + Copy,
1493 f: impl IntoQuotedMut<'a, F, L> + Copy,
1494 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
1495 where
1496 O: IsOrdered,
1497 R: IsExactlyOnce,
1498 K: Clone + Eq + Hash,
1499 I: Fn() -> A + 'a,
1500 F: Fn(&mut A, V) -> bool + 'a,
1501 {
1502 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1503 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1504 let out_without_bound_cast = self.generator(
1505 q!(move || Some(init())),
1506 q!(move |key_state, v| {
1507 if let Some(key_state_value) = key_state.as_mut() {
1508 if f(key_state_value, v) {
1509 Generate::Return(key_state.take().unwrap())
1510 } else {
1511 Generate::Continue
1512 }
1513 } else {
1514 unreachable!()
1515 }
1516 }),
1517 );
1518
1519 KeyedSingleton::new(
1520 out_without_bound_cast.location.clone(),
1521 HydroNode::Cast {
1522 inner: Box::new(
1523 out_without_bound_cast
1524 .ir_node
1525 .replace(HydroNode::Placeholder),
1526 ),
1527 metadata: out_without_bound_cast
1528 .location
1529 .new_node_metadata(
1530 KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
1531 ),
1532 },
1533 )
1534 }
1535
1536 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1537 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1538 /// otherwise the first element would be non-deterministic.
1539 ///
1540 /// # Example
1541 /// ```rust
1542 /// # #[cfg(feature = "deploy")] {
1543 /// # use hydro_lang::prelude::*;
1544 /// # use futures::StreamExt;
1545 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1546 /// process
1547 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1548 /// .into_keyed()
1549 /// .first()
1550 /// # .entries()
1551 /// # }, |mut stream| async move {
1552 /// // Output: { 0: 2, 1: 3 }
1553 /// # let mut results = Vec::new();
1554 /// # for _ in 0..2 {
1555 /// # results.push(stream.next().await.unwrap());
1556 /// # }
1557 /// # results.sort();
1558 /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1559 /// # }));
1560 /// # }
1561 /// ```
1562 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1563 where
1564 O: IsOrdered,
1565 R: IsExactlyOnce,
1566 K: Clone + Eq + Hash,
1567 {
1568 self.fold_early_stop(
1569 q!(|| None),
1570 q!(|acc, v| {
1571 *acc = Some(v);
1572 true
1573 }),
1574 )
1575 .map(q!(|v| v.unwrap()))
1576 }
1577
1578 /// Assigns a zero-based index to each value within each key group, emitting
1579 /// `(K, (index, V))` tuples with per-key sequential indices.
1580 ///
1581 /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1582 /// This is a streaming operator that processes elements as they arrive.
1583 ///
1584 /// # Example
1585 /// ```rust
1586 /// # #[cfg(feature = "deploy")] {
1587 /// # use hydro_lang::prelude::*;
1588 /// # use futures::StreamExt;
1589 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1590 /// process
1591 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1592 /// .into_keyed()
1593 /// .enumerate()
1594 /// # .entries()
1595 /// # }, |mut stream| async move {
1596 /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1597 /// # let mut results = Vec::new();
1598 /// # for _ in 0..3 {
1599 /// # results.push(stream.next().await.unwrap());
1600 /// # }
1601 /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1602 /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1603 /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1604 /// # assert_eq!(key2, vec![(0, 20)]);
1605 /// # }));
1606 /// # }
1607 /// ```
1608 pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1609 where
1610 O: IsOrdered,
1611 R: IsExactlyOnce,
1612 K: Eq + Hash + Clone,
1613 {
1614 self.scan(
1615 q!(|| 0),
1616 q!(|acc, next| {
1617 let curr = *acc;
1618 *acc += 1;
1619 Some((curr, next))
1620 }),
1621 )
1622 }
1623
1624 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1625 ///
1626 /// # Example
1627 /// ```rust
1628 /// # #[cfg(feature = "deploy")] {
1629 /// # use hydro_lang::prelude::*;
1630 /// # use futures::StreamExt;
1631 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1632 /// let tick = process.tick();
1633 /// let numbers = process
1634 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1635 /// .into_keyed();
1636 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1637 /// batch
1638 /// .value_counts()
1639 /// .entries()
1640 /// .all_ticks()
1641 /// # }, |mut stream| async move {
1642 /// // (1, 3), (2, 2)
1643 /// # let mut results = Vec::new();
1644 /// # for _ in 0..2 {
1645 /// # results.push(stream.next().await.unwrap());
1646 /// # }
1647 /// # results.sort();
1648 /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1649 /// # }));
1650 /// # }
1651 /// ```
1652 pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1653 where
1654 R: IsExactlyOnce,
1655 K: Eq + Hash,
1656 {
1657 self.make_exactly_once()
1658 .assume_ordering_trusted(
1659 nondet!(/** ordering within each group affects neither result nor intermediates */),
1660 )
1661 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1662 }
1663
1664 /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1665 /// group via the `comb` closure.
1666 ///
1667 /// Depending on the input stream guarantees, the closure may need to be commutative
1668 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1669 ///
1670 /// If the input and output value types are the same and do not require initialization then use
1671 /// [`KeyedStream::reduce`].
1672 ///
1673 /// # Example
1674 /// ```rust
1675 /// # #[cfg(feature = "deploy")] {
1676 /// # use hydro_lang::prelude::*;
1677 /// # use futures::StreamExt;
1678 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1679 /// let tick = process.tick();
1680 /// let numbers = process
1681 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1682 /// .into_keyed();
1683 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1684 /// batch
1685 /// .fold(q!(|| false), q!(|acc, x| *acc |= x))
1686 /// .entries()
1687 /// .all_ticks()
1688 /// # }, |mut stream| async move {
1689 /// // (1, false), (2, true)
1690 /// # let mut results = Vec::new();
1691 /// # for _ in 0..2 {
1692 /// # results.push(stream.next().await.unwrap());
1693 /// # }
1694 /// # results.sort();
1695 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1696 /// # }));
1697 /// # }
1698 /// ```
1699 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
1700 self,
1701 init: impl IntoQuotedMut<'a, I, L>,
1702 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1703 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1704 where
1705 K: Eq + Hash,
1706 C: ValidCommutativityFor<O>,
1707 Idemp: ValidIdempotenceFor<R>,
1708 {
1709 let init = init.splice_fn0_ctx(&self.location).into();
1710 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1711 proof.register_proof(&comb);
1712
1713 let ordered = self
1714 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1715 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1716
1717 KeyedSingleton::new(
1718 ordered.location.clone(),
1719 HydroNode::FoldKeyed {
1720 init,
1721 acc: comb.into(),
1722 input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1723 metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1724 K,
1725 A,
1726 L,
1727 B::WhenValueUnbounded,
1728 >::collection_kind()),
1729 },
1730 )
1731 }
1732
1733 /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1734 /// group via the `comb` closure.
1735 ///
1736 /// Depending on the input stream guarantees, the closure may need to be commutative
1737 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1738 ///
1739 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1740 ///
1741 /// # Example
1742 /// ```rust
1743 /// # #[cfg(feature = "deploy")] {
1744 /// # use hydro_lang::prelude::*;
1745 /// # use futures::StreamExt;
1746 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1747 /// let tick = process.tick();
1748 /// let numbers = process
1749 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1750 /// .into_keyed();
1751 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1752 /// batch
1753 /// .reduce(q!(|acc, x| *acc |= x))
1754 /// .entries()
1755 /// .all_ticks()
1756 /// # }, |mut stream| async move {
1757 /// // (1, false), (2, true)
1758 /// # let mut results = Vec::new();
1759 /// # for _ in 0..2 {
1760 /// # results.push(stream.next().await.unwrap());
1761 /// # }
1762 /// # results.sort();
1763 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1764 /// # }));
1765 /// # }
1766 /// ```
1767 pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1768 self,
1769 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1770 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1771 where
1772 K: Eq + Hash,
1773 C: ValidCommutativityFor<O>,
1774 Idemp: ValidIdempotenceFor<R>,
1775 {
1776 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1777 proof.register_proof(&f);
1778
1779 let ordered = self
1780 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1781 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1782
1783 KeyedSingleton::new(
1784 ordered.location.clone(),
1785 HydroNode::ReduceKeyed {
1786 f: f.into(),
1787 input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1788 metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1789 K,
1790 V,
1791 L,
1792 B::WhenValueUnbounded,
1793 >::collection_kind()),
1794 },
1795 )
1796 }
1797
1798 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1799 /// are automatically deleted.
1800 ///
1801 /// Depending on the input stream guarantees, the closure may need to be commutative
1802 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1803 ///
1804 /// # Example
1805 /// ```rust
1806 /// # #[cfg(feature = "deploy")] {
1807 /// # use hydro_lang::prelude::*;
1808 /// # use futures::StreamExt;
1809 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1810 /// let tick = process.tick();
1811 /// let watermark = tick.singleton(q!(2));
1812 /// let numbers = process
1813 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1814 /// .into_keyed();
1815 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1816 /// batch
1817 /// .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1818 /// .entries()
1819 /// .all_ticks()
1820 /// # }, |mut stream| async move {
1821 /// // (2, true)
1822 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1823 /// # }));
1824 /// # }
1825 /// ```
1826 pub fn reduce_watermark<O2, F, C, Idemp>(
1827 self,
1828 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1829 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1830 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1831 where
1832 K: Eq + Hash,
1833 O2: Clone,
1834 F: Fn(&mut V, V) + 'a,
1835 C: ValidCommutativityFor<O>,
1836 Idemp: ValidIdempotenceFor<R>,
1837 {
1838 let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1839 check_matching_location(&self.location.root(), other.location.outer());
1840 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1841 proof.register_proof(&f);
1842
1843 let ordered = self
1844 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1845 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1846
1847 KeyedSingleton::new(
1848 ordered.location.clone(),
1849 HydroNode::ReduceKeyedWatermark {
1850 f: f.into(),
1851 input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1852 watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1853 metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1854 K,
1855 V,
1856 L,
1857 B::WhenValueUnbounded,
1858 >::collection_kind()),
1859 },
1860 )
1861 }
1862
1863 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1864 /// whose keys are not in the bounded stream.
1865 ///
1866 /// # Example
1867 /// ```rust
1868 /// # #[cfg(feature = "deploy")] {
1869 /// # use hydro_lang::prelude::*;
1870 /// # use futures::StreamExt;
1871 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1872 /// let tick = process.tick();
1873 /// let keyed_stream = process
1874 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1875 /// .batch(&tick, nondet!(/** test */))
1876 /// .into_keyed();
1877 /// let keys_to_remove = process
1878 /// .source_iter(q!(vec![1, 2]))
1879 /// .batch(&tick, nondet!(/** test */));
1880 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1881 /// # .entries()
1882 /// # }, |mut stream| async move {
1883 /// // { 3: ['c'], 4: ['d'] }
1884 /// # let mut results = Vec::new();
1885 /// # for _ in 0..2 {
1886 /// # results.push(stream.next().await.unwrap());
1887 /// # }
1888 /// # results.sort();
1889 /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1890 /// # }));
1891 /// # }
1892 /// ```
1893 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1894 self,
1895 other: Stream<K, L, Bounded, O2, R2>,
1896 ) -> Self
1897 where
1898 K: Eq + Hash,
1899 {
1900 check_matching_location(&self.location, &other.location);
1901
1902 KeyedStream::new(
1903 self.location.clone(),
1904 HydroNode::AntiJoin {
1905 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1906 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1907 metadata: self.location.new_node_metadata(Self::collection_kind()),
1908 },
1909 )
1910 }
1911
1912 /// Emit a keyed stream containing keys shared between two keyed streams,
1913 /// where each value in the output keyed stream is a tuple of
1914 /// (self's value, other's value).
1915 /// If there are multiple values for the same key, this performs a cross product
1916 /// for each matching key.
1917 ///
1918 /// # Example
1919 /// ```rust
1920 /// # #[cfg(feature = "deploy")] {
1921 /// # use hydro_lang::prelude::*;
1922 /// # use futures::StreamExt;
1923 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1924 /// let tick = process.tick();
1925 /// let keyed_data = process
1926 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1927 /// .into_keyed()
1928 /// .batch(&tick, nondet!(/** test */));
1929 /// let other_data = process
1930 /// .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1931 /// .into_keyed()
1932 /// .batch(&tick, nondet!(/** test */));
1933 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1934 /// # }, |mut stream| async move {
1935 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1936 /// # let mut results = vec![];
1937 /// # for _ in 0..4 {
1938 /// # results.push(stream.next().await.unwrap());
1939 /// # }
1940 /// # results.sort();
1941 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
1942 /// # }));
1943 /// # }
1944 /// ```
1945 pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
1946 self,
1947 other: KeyedStream<K, V2, L, B, O2, R2>,
1948 ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1949 where
1950 K: Eq + Hash,
1951 R: MinRetries<R2>,
1952 {
1953 self.entries().join(other.entries()).into_keyed()
1954 }
1955
1956 /// Deduplicates values within each key group, emitting each unique value per key
1957 /// exactly once.
1958 ///
1959 /// # Example
1960 /// ```rust
1961 /// # #[cfg(feature = "deploy")] {
1962 /// # use hydro_lang::prelude::*;
1963 /// # use futures::StreamExt;
1964 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1965 /// process
1966 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
1967 /// .into_keyed()
1968 /// .unique()
1969 /// # .entries()
1970 /// # }, |mut stream| async move {
1971 /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
1972 /// # let mut results = Vec::new();
1973 /// # for _ in 0..4 {
1974 /// # results.push(stream.next().await.unwrap());
1975 /// # }
1976 /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1977 /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1978 /// # key1.sort();
1979 /// # key2.sort();
1980 /// # assert_eq!(key1, vec![10, 20]);
1981 /// # assert_eq!(key2, vec![20, 30]);
1982 /// # }));
1983 /// # }
1984 /// ```
1985 pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
1986 where
1987 K: Eq + Hash + Clone,
1988 V: Eq + Hash + Clone,
1989 {
1990 self.entries().unique().into_keyed()
1991 }
1992
1993 /// Sorts the values within each key group in ascending order.
1994 ///
1995 /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
1996 /// each group. This operator will block until all elements in the input stream
1997 /// are available, so it requires the input stream to be [`Bounded`].
1998 ///
1999 /// # Example
2000 /// ```rust
2001 /// # #[cfg(feature = "deploy")] {
2002 /// # use hydro_lang::prelude::*;
2003 /// # use futures::StreamExt;
2004 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2005 /// let tick = process.tick();
2006 /// let numbers = process
2007 /// .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2008 /// .into_keyed();
2009 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2010 /// batch.sort().all_ticks()
2011 /// # .entries()
2012 /// # }, |mut stream| async move {
2013 /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2014 /// # let mut results = Vec::new();
2015 /// # for _ in 0..4 {
2016 /// # results.push(stream.next().await.unwrap());
2017 /// # }
2018 /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2019 /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2020 /// # assert_eq!(key1_vals, vec![1, 3]);
2021 /// # assert_eq!(key2_vals, vec![1, 2]);
2022 /// # }));
2023 /// # }
2024 /// ```
2025 pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2026 where
2027 B: IsBounded,
2028 K: Ord,
2029 V: Ord,
2030 {
2031 self.entries().sort().into_keyed()
2032 }
2033
2034 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
    /// elements of the `self` stream, and then emitting the elements of the `other` stream (if a key
2036 /// is only present in one of the inputs, its values are passed through as-is). The output has
2037 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2038 ///
2039 /// Currently, both input streams must be [`Bounded`]. This operator will block
2040 /// on the first stream until all its elements are available. In a future version,
2041 /// we will relax the requirement on the `other` stream.
2042 ///
2043 /// # Example
2044 /// ```rust
2045 /// # #[cfg(feature = "deploy")] {
2046 /// # use hydro_lang::prelude::*;
2047 /// # use futures::StreamExt;
2048 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2049 /// let tick = process.tick();
2050 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2051 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2052 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2053 /// # .entries()
2054 /// # }, |mut stream| async move {
2055 /// // { 0: [2, 1], 1: [4, 3] }
2056 /// # let mut results = Vec::new();
2057 /// # for _ in 0..4 {
2058 /// # results.push(stream.next().await.unwrap());
2059 /// # }
2060 /// # results.sort();
2061 /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2062 /// # }));
2063 /// # }
2064 /// ```
2065 pub fn chain<O2: Ordering, R2: Retries>(
2066 self,
2067 other: KeyedStream<K, V, L, Bounded, O2, R2>,
2068 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2069 where
2070 B: IsBounded,
2071 O: MinOrder<O2>,
2072 R: MinRetries<R2>,
2073 {
2074 let this = self.make_bounded();
2075 check_matching_location(&this.location, &other.location);
2076
2077 KeyedStream::new(
2078 this.location.clone(),
2079 HydroNode::Chain {
2080 first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2081 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2082 metadata: this.location.new_node_metadata(KeyedStream::<
2083 K,
2084 V,
2085 L,
2086 Bounded,
2087 <O as MinOrder<O2>>::Min,
2088 <R as MinRetries<R2>>::Min,
2089 >::collection_kind()),
2090 },
2091 )
2092 }
2093
2094 /// Emit a keyed stream containing keys shared between the keyed stream and the
2095 /// keyed singleton, where each value in the output keyed stream is a tuple of
2096 /// (the keyed stream's value, the keyed singleton's value).
2097 ///
2098 /// # Example
2099 /// ```rust
2100 /// # #[cfg(feature = "deploy")] {
2101 /// # use hydro_lang::prelude::*;
2102 /// # use futures::StreamExt;
2103 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2104 /// let tick = process.tick();
2105 /// let keyed_data = process
2106 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2107 /// .into_keyed()
2108 /// .batch(&tick, nondet!(/** test */));
2109 /// let singleton_data = process
2110 /// .source_iter(q!(vec![(1, 100), (2, 200)]))
2111 /// .into_keyed()
2112 /// .batch(&tick, nondet!(/** test */))
2113 /// .first();
2114 /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2115 /// # }, |mut stream| async move {
2116 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2117 /// # let mut results = vec![];
2118 /// # for _ in 0..3 {
2119 /// # results.push(stream.next().await.unwrap());
2120 /// # }
2121 /// # results.sort();
2122 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2123 /// # }));
2124 /// # }
2125 /// ```
2126 pub fn join_keyed_singleton<V2: Clone>(
2127 self,
2128 keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
2129 ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
2130 where
2131 B: IsBounded,
2132 K: Eq + Hash,
2133 {
2134 keyed_singleton
2135 .join_keyed_stream(self.make_bounded())
2136 .map(q!(|(v2, v)| (v, v2)))
2137 }
2138
2139 /// Gets the values associated with a specific key from the keyed stream.
2140 /// Returns an empty stream if the key is `None` or there are no associated values.
2141 ///
2142 /// # Example
2143 /// ```rust
2144 /// # #[cfg(feature = "deploy")] {
2145 /// # use hydro_lang::prelude::*;
2146 /// # use futures::StreamExt;
2147 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2148 /// let tick = process.tick();
2149 /// let keyed_data = process
2150 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2151 /// .into_keyed()
2152 /// .batch(&tick, nondet!(/** test */));
2153 /// let key = tick.singleton(q!(1));
2154 /// keyed_data.get(key).all_ticks()
2155 /// # }, |mut stream| async move {
2156 /// // 10, 11 in any order
2157 /// # let mut results = vec![];
2158 /// # for _ in 0..2 {
2159 /// # results.push(stream.next().await.unwrap());
2160 /// # }
2161 /// # results.sort();
2162 /// # assert_eq!(results, vec![10, 11]);
2163 /// # }));
2164 /// # }
2165 /// ```
2166 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, Bounded, NoOrder, R>
2167 where
2168 B: IsBounded,
2169 K: Eq + Hash,
2170 {
2171 self.make_bounded()
2172 .entries()
2173 .join(key.into().into_stream().map(q!(|k| (k, ()))))
2174 .map(q!(|(_, (v, _))| v))
2175 }
2176
2177 /// For each value in `self`, find the matching key in `lookup`.
2178 /// The output is a keyed stream with the key from `self`, and a value
2179 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2180 /// If the key is not present in `lookup`, the option will be [`None`].
2181 ///
2182 /// # Example
2183 /// ```rust
2184 /// # #[cfg(feature = "deploy")] {
2185 /// # use hydro_lang::prelude::*;
2186 /// # use futures::StreamExt;
2187 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2188 /// # let tick = process.tick();
    /// let requests = // { 1: [10, 11], 2: [20] }
2190 /// # process
2191 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2192 /// # .into_keyed()
2193 /// # .batch(&tick, nondet!(/** test */));
2194 /// let other_data = // { 10: 100, 11: 110 }
2195 /// # process
2196 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
2197 /// # .into_keyed()
2198 /// # .batch(&tick, nondet!(/** test */))
2199 /// # .first();
2200 /// requests.lookup_keyed_singleton(other_data)
2201 /// # .entries().all_ticks()
2202 /// # }, |mut stream| async move {
    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: [(20, None)] }
2204 /// # let mut results = vec![];
2205 /// # for _ in 0..3 {
2206 /// # results.push(stream.next().await.unwrap());
2207 /// # }
2208 /// # results.sort();
2209 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2210 /// # }));
2211 /// # }
2212 /// ```
2213 pub fn lookup_keyed_singleton<V2>(
2214 self,
2215 lookup: KeyedSingleton<V, V2, L, Bounded>,
2216 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2217 where
2218 B: IsBounded,
2219 K: Eq + Hash + Clone,
2220 V: Eq + Hash + Clone,
2221 V2: Clone,
2222 {
2223 self.lookup_keyed_stream(
2224 lookup
2225 .into_keyed_stream()
2226 .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2227 )
2228 }
2229
2230 /// For each value in `self`, find the matching key in `lookup`.
2231 /// The output is a keyed stream with the key from `self`, and a value
2232 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2233 /// If the key is not present in `lookup`, the option will be [`None`].
2234 ///
2235 /// # Example
2236 /// ```rust
2237 /// # #[cfg(feature = "deploy")] {
2238 /// # use hydro_lang::prelude::*;
2239 /// # use futures::StreamExt;
2240 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2241 /// # let tick = process.tick();
    /// let requests = // { 1: [10, 11], 2: [20] }
2243 /// # process
2244 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2245 /// # .into_keyed()
2246 /// # .batch(&tick, nondet!(/** test */));
    /// let other_data = // { 10: [100, 101], 11: [110] }
2248 /// # process
2249 /// # .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2250 /// # .into_keyed()
2251 /// # .batch(&tick, nondet!(/** test */));
2252 /// requests.lookup_keyed_stream(other_data)
2253 /// # .entries().all_ticks()
2254 /// # }, |mut stream| async move {
    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: [(20, None)] }
2256 /// # let mut results = vec![];
2257 /// # for _ in 0..4 {
2258 /// # results.push(stream.next().await.unwrap());
2259 /// # }
2260 /// # results.sort();
2261 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2262 /// # }));
2263 /// # }
2264 /// ```
2265 #[expect(clippy::type_complexity, reason = "retries propagation")]
2266 pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2267 self,
2268 lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2269 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2270 where
2271 B: IsBounded,
2272 K: Eq + Hash + Clone,
2273 V: Eq + Hash + Clone,
2274 V2: Clone,
2275 R: MinRetries<R2>,
2276 {
2277 let inverted = self
2278 .make_bounded()
2279 .entries()
2280 .map(q!(|(key, lookup_value)| (lookup_value, key)))
2281 .into_keyed();
2282 let found = inverted
2283 .clone()
2284 .join_keyed_stream(lookup.clone())
2285 .entries()
2286 .map(q!(|(lookup_value, (key, value))| (
2287 key,
2288 (lookup_value, Some(value))
2289 )))
2290 .into_keyed();
2291 let not_found = inverted
2292 .filter_key_not_in(lookup.keys())
2293 .entries()
2294 .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2295 .into_keyed();
2296
2297 found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2298 }
2299
2300 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2301 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2302 ///
2303 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2304 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2305 /// argument that declares where the stream will be atomically processed. Batching a stream into
2306 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2307 /// [`Tick`] will introduce asynchrony.
2308 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2309 let out_location = Atomic { tick: tick.clone() };
2310 KeyedStream::new(
2311 out_location.clone(),
2312 HydroNode::BeginAtomic {
2313 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2314 metadata: out_location
2315 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2316 },
2317 )
2318 }
2319
2320 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2321 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2322 /// the order of the input.
2323 ///
2324 /// # Non-Determinism
2325 /// The batch boundaries are non-deterministic and may change across executions.
2326 pub fn batch(
2327 self,
2328 tick: &Tick<L>,
2329 nondet: NonDet,
2330 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2331 let _ = nondet;
2332 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2333 KeyedStream::new(
2334 tick.clone(),
2335 HydroNode::Batch {
2336 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2337 metadata: tick.new_node_metadata(
2338 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2339 ),
2340 },
2341 )
2342 }
2343}
2344
2345impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2346 KeyedStream<(K1, K2), V, L, B, O, R>
2347{
2348 /// Produces a new keyed stream by dropping the first element of the compound key.
2349 ///
2350 /// Because multiple keys may share the same suffix, this operation results in re-grouping
2351 /// of the values under the new keys. The values across groups with the same new key
2352 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2353 ///
2354 /// # Example
2355 /// ```rust
2356 /// # #[cfg(feature = "deploy")] {
2357 /// # use hydro_lang::prelude::*;
2358 /// # use futures::StreamExt;
2359 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2360 /// process
2361 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2362 /// .into_keyed()
2363 /// .drop_key_prefix()
2364 /// # .entries()
2365 /// # }, |mut stream| async move {
2366 /// // { 10: [2, 3], 20: [4] }
2367 /// # let mut results = Vec::new();
2368 /// # for _ in 0..3 {
2369 /// # results.push(stream.next().await.unwrap());
2370 /// # }
2371 /// # results.sort();
2372 /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2373 /// # }));
2374 /// # }
2375 /// ```
2376 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2377 self.entries()
2378 .map(q!(|((_k1, k2), v)| (k2, v)))
2379 .into_keyed()
2380 }
2381}
2382
2383impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2384 KeyedStream<K, V, L, Unbounded, O, R>
2385{
2386 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2387 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2388 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2389 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2390 ///
2391 /// Currently, both input streams must be [`Unbounded`].
2392 ///
2393 /// # Example
2394 /// ```rust
2395 /// # #[cfg(feature = "deploy")] {
2396 /// # use hydro_lang::prelude::*;
2397 /// # use futures::StreamExt;
2398 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2399 /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2400 /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2401 /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2402 /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2403 /// numbers1.merge_unordered(numbers2)
2404 /// # .entries()
2405 /// # }, |mut stream| async move {
2406 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2407 /// # let mut results = Vec::new();
2408 /// # for _ in 0..4 {
2409 /// # results.push(stream.next().await.unwrap());
2410 /// # }
2411 /// # results.sort();
2412 /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2413 /// # }));
2414 /// # }
2415 /// ```
2416 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2417 self,
2418 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2419 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2420 where
2421 R: MinRetries<R2>,
2422 {
2423 KeyedStream::new(
2424 self.location.clone(),
2425 HydroNode::Chain {
2426 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2427 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2428 metadata: self.location.new_node_metadata(KeyedStream::<
2429 K,
2430 V,
2431 L,
2432 Unbounded,
2433 NoOrder,
2434 <R as MinRetries<R2>>::Min,
2435 >::collection_kind()),
2436 },
2437 )
2438 }
2439
2440 /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2441 #[deprecated(note = "use `merge_unordered` instead")]
2442 pub fn interleave<O2: Ordering, R2: Retries>(
2443 self,
2444 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2445 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2446 where
2447 R: MinRetries<R2>,
2448 {
2449 self.merge_unordered(other)
2450 }
2451}
2452
2453impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2454where
2455 L: Location<'a> + NoTick,
2456{
2457 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2458 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2459 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2460 /// used to create the atomic section.
2461 ///
2462 /// # Non-Determinism
2463 /// The batch boundaries are non-deterministic and may change across executions.
2464 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2465 let _ = nondet;
2466 KeyedStream::new(
2467 self.location.clone().tick,
2468 HydroNode::Batch {
2469 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2470 metadata: self.location.tick.new_node_metadata(KeyedStream::<
2471 K,
2472 V,
2473 Tick<L>,
2474 Bounded,
2475 O,
2476 R,
2477 >::collection_kind(
2478 )),
2479 },
2480 )
2481 }
2482
2483 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2484 /// See [`KeyedStream::atomic`] for more details.
2485 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2486 KeyedStream::new(
2487 self.location.tick.l.clone(),
2488 HydroNode::EndAtomic {
2489 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2490 metadata: self
2491 .location
2492 .tick
2493 .l
2494 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2495 },
2496 )
2497 }
2498}
2499
2500impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2501where
2502 L: Location<'a>,
2503{
2504 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2505 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2506 /// each key.
2507 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2508 KeyedStream::new(
2509 self.location.outer().clone(),
2510 HydroNode::YieldConcat {
2511 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2512 metadata: self.location.outer().new_node_metadata(KeyedStream::<
2513 K,
2514 V,
2515 L,
2516 Unbounded,
2517 O,
2518 R,
2519 >::collection_kind(
2520 )),
2521 },
2522 )
2523 }
2524
2525 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2526 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2527 /// each key.
2528 ///
2529 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2530 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2531 /// stream's [`Tick`] context.
2532 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2533 let out_location = Atomic {
2534 tick: self.location.clone(),
2535 };
2536
2537 KeyedStream::new(
2538 out_location.clone(),
2539 HydroNode::YieldConcat {
2540 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2541 metadata: out_location.new_node_metadata(KeyedStream::<
2542 K,
2543 V,
2544 Atomic<L>,
2545 Unbounded,
2546 O,
2547 R,
2548 >::collection_kind()),
2549 },
2550 )
2551 }
2552
2553 /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2554 /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2555 ///
2556 /// This API is particularly useful for stateful computation on batches of data, such as
2557 /// maintaining an accumulated state that is up to date with the current batch.
2558 ///
2559 /// # Example
2560 /// ```rust
2561 /// # #[cfg(feature = "deploy")] {
2562 /// # use hydro_lang::prelude::*;
2563 /// # use futures::StreamExt;
2564 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2565 /// let tick = process.tick();
2566 /// # // ticks are lazy by default, forces the second tick to run
2567 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2568 /// # let batch_first_tick = process
2569 /// # .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2570 /// # .into_keyed()
2571 /// # .batch(&tick, nondet!(/** test */));
2572 /// # let batch_second_tick = process
2573 /// # .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2574 /// # .into_keyed()
2575 /// # .batch(&tick, nondet!(/** test */))
2576 /// # .defer_tick(); // appears on the second tick
2577 /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2578 ///
2579 /// input.batch(&tick, nondet!(/** test */))
2580 /// .across_ticks(|s| s.reduce(q!(|sum, new| {
2581 /// *sum += new;
2582 /// }))).entries().all_ticks()
2583 /// # }, |mut stream| async move {
2584 /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2585 /// # let mut results = Vec::new();
2586 /// # for _ in 0..4 {
2587 /// # results.push(stream.next().await.unwrap());
2588 /// # }
2589 /// # results.sort();
2590 /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2591 /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2592 /// # results.clear();
2593 /// # for _ in 0..4 {
2594 /// # results.push(stream.next().await.unwrap());
2595 /// # }
2596 /// # results.sort();
2597 /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2598 /// # }));
2599 /// # }
2600 /// ```
2601 pub fn across_ticks<Out: BatchAtomic>(
2602 self,
2603 thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2604 ) -> Out::Batched {
2605 thunk(self.all_ticks_atomic()).batched_atomic()
2606 }
2607
2608 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2609 /// tick `T` always has the entries of `self` at tick `T - 1`.
2610 ///
2611 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2612 ///
2613 /// This operator enables stateful iterative processing with ticks, by sending data from one
2614 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2615 ///
2616 /// # Example
2617 /// ```rust
2618 /// # #[cfg(feature = "deploy")] {
2619 /// # use hydro_lang::prelude::*;
2620 /// # use futures::StreamExt;
2621 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2622 /// let tick = process.tick();
2623 /// # // ticks are lazy by default, forces the second tick to run
2624 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2625 /// # let batch_first_tick = process
2626 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2627 /// # .batch(&tick, nondet!(/** test */))
2628 /// # .into_keyed();
2629 /// # let batch_second_tick = process
2630 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2631 /// # .batch(&tick, nondet!(/** test */))
2632 /// # .defer_tick()
2633 /// # .into_keyed(); // appears on the second tick
2634 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2635 /// # batch_first_tick.chain(batch_second_tick);
2636 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2637 /// changes_across_ticks // from the current tick
2638 /// )
2639 /// # .entries().all_ticks()
2640 /// # }, |mut stream| async move {
2641 /// // First tick: { 1: [2, 3] }
2642 /// # let mut results = Vec::new();
2643 /// # for _ in 0..2 {
2644 /// # results.push(stream.next().await.unwrap());
2645 /// # }
2646 /// # results.sort();
2647 /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2648 /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2649 /// # results.clear();
2650 /// # for _ in 0..4 {
2651 /// # results.push(stream.next().await.unwrap());
2652 /// # }
2653 /// # results.sort();
2654 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2655 /// // Third tick: { 1: [4], 2: [5] }
2656 /// # results.clear();
2657 /// # for _ in 0..2 {
2658 /// # results.push(stream.next().await.unwrap());
2659 /// # }
2660 /// # results.sort();
2661 /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2662 /// # }));
2663 /// # }
2664 /// ```
2665 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2666 KeyedStream::new(
2667 self.location.clone(),
2668 HydroNode::DeferTick {
2669 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2670 metadata: self.location.new_node_metadata(KeyedStream::<
2671 K,
2672 V,
2673 Tick<L>,
2674 Bounded,
2675 O,
2676 R,
2677 >::collection_kind()),
2678 },
2679 )
2680 }
2681}
2682
2683#[cfg(test)]
2684mod tests {
2685 #[cfg(feature = "deploy")]
2686 use futures::{SinkExt, StreamExt};
2687 #[cfg(feature = "deploy")]
2688 use hydro_deploy::Deployment;
2689 #[cfg(any(feature = "deploy", feature = "sim"))]
2690 use stageleft::q;
2691
2692 #[cfg(any(feature = "deploy", feature = "sim"))]
2693 use crate::compile::builder::FlowBuilder;
2694 #[cfg(feature = "deploy")]
2695 use crate::live_collections::stream::ExactlyOnce;
2696 #[cfg(feature = "sim")]
2697 use crate::live_collections::stream::{NoOrder, TotalOrder};
2698 #[cfg(any(feature = "deploy", feature = "sim"))]
2699 use crate::location::Location;
2700 #[cfg(any(feature = "deploy", feature = "sim"))]
2701 use crate::nondet::nondet;
2702 #[cfg(feature = "deploy")]
2703 use crate::properties::manual_proof;
2704
2705 #[cfg(feature = "deploy")]
2706 #[tokio::test]
2707 async fn reduce_watermark_filter() {
2708 let mut deployment = Deployment::new();
2709
2710 let mut flow = FlowBuilder::new();
2711 let node = flow.process::<()>();
2712 let external = flow.external::<()>();
2713
2714 let node_tick = node.tick();
2715 let watermark = node_tick.singleton(q!(2));
2716
2717 let sum = node
2718 .source_stream(q!(tokio_stream::iter([
2719 (0, 100),
2720 (1, 101),
2721 (2, 102),
2722 (2, 102)
2723 ])))
2724 .into_keyed()
2725 .reduce_watermark(
2726 watermark,
2727 q!(|acc, v| {
2728 *acc += v;
2729 }),
2730 )
2731 .snapshot(&node_tick, nondet!(/** test */))
2732 .entries()
2733 .all_ticks()
2734 .send_bincode_external(&external);
2735
2736 let nodes = flow
2737 .with_process(&node, deployment.Localhost())
2738 .with_external(&external, deployment.Localhost())
2739 .deploy(&mut deployment);
2740
2741 deployment.deploy().await.unwrap();
2742
2743 let mut out = nodes.connect(sum).await;
2744
2745 deployment.start().await.unwrap();
2746
2747 assert_eq!(out.next().await.unwrap(), (2, 204));
2748 }
2749
2750 #[cfg(feature = "deploy")]
2751 #[tokio::test]
2752 async fn reduce_watermark_bounded() {
2753 let mut deployment = Deployment::new();
2754
2755 let mut flow = FlowBuilder::new();
2756 let node = flow.process::<()>();
2757 let external = flow.external::<()>();
2758
2759 let node_tick = node.tick();
2760 let watermark = node_tick.singleton(q!(2));
2761
2762 let sum = node
2763 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2764 .into_keyed()
2765 .reduce_watermark(
2766 watermark,
2767 q!(|acc, v| {
2768 *acc += v;
2769 }),
2770 )
2771 .entries()
2772 .send_bincode_external(&external);
2773
2774 let nodes = flow
2775 .with_process(&node, deployment.Localhost())
2776 .with_external(&external, deployment.Localhost())
2777 .deploy(&mut deployment);
2778
2779 deployment.deploy().await.unwrap();
2780
2781 let mut out = nodes.connect(sum).await;
2782
2783 deployment.start().await.unwrap();
2784
2785 assert_eq!(out.next().await.unwrap(), (2, 204));
2786 }
2787
2788 #[cfg(feature = "deploy")]
2789 #[tokio::test]
2790 async fn reduce_watermark_garbage_collect() {
2791 let mut deployment = Deployment::new();
2792
2793 let mut flow = FlowBuilder::new();
2794 let node = flow.process::<()>();
2795 let external = flow.external::<()>();
2796 let (tick_send, tick_trigger) =
2797 node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
2798
2799 let node_tick = node.tick();
2800 let (watermark_complete_cycle, watermark) =
2801 node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
2802 let next_watermark = watermark.clone().map(q!(|v| v + 1));
2803 watermark_complete_cycle.complete_next_tick(next_watermark);
2804
2805 let tick_triggered_input = node_tick
2806 .singleton(q!((3, 103)))
2807 .into_stream()
2808 .filter_if(
2809 tick_trigger
2810 .clone()
2811 .batch(&node_tick, nondet!(/** test */))
2812 .first()
2813 .is_some(),
2814 )
2815 .all_ticks();
2816
2817 let sum = node
2818 .source_stream(q!(tokio_stream::iter([
2819 (0, 100),
2820 (1, 101),
2821 (2, 102),
2822 (2, 102)
2823 ])))
2824 .merge_unordered(tick_triggered_input)
2825 .into_keyed()
2826 .reduce_watermark(
2827 watermark,
2828 q!(
2829 |acc, v| {
2830 *acc += v;
2831 },
2832 commutative = manual_proof!(/** integer addition is commutative */)
2833 ),
2834 )
2835 .snapshot(&node_tick, nondet!(/** test */))
2836 .entries()
2837 .all_ticks()
2838 .send_bincode_external(&external);
2839
2840 let nodes = flow
2841 .with_default_optimize()
2842 .with_process(&node, deployment.Localhost())
2843 .with_external(&external, deployment.Localhost())
2844 .deploy(&mut deployment);
2845
2846 deployment.deploy().await.unwrap();
2847
2848 let mut tick_send = nodes.connect(tick_send).await;
2849 let mut out_recv = nodes.connect(sum).await;
2850
2851 deployment.start().await.unwrap();
2852
2853 assert_eq!(out_recv.next().await.unwrap(), (2, 204));
2854
2855 tick_send.send(()).await.unwrap();
2856
2857 assert_eq!(out_recv.next().await.unwrap(), (3, 103));
2858 }
2859
2860 #[cfg(feature = "sim")]
2861 #[test]
2862 #[should_panic]
2863 fn sim_batch_nondet_size() {
2864 let mut flow = FlowBuilder::new();
2865 let node = flow.process::<()>();
2866
2867 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2868
2869 let tick = node.tick();
2870 let out_recv = input
2871 .batch(&tick, nondet!(/** test */))
2872 .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2873 .entries()
2874 .all_ticks()
2875 .sim_output();
2876
2877 flow.sim().exhaustive(async || {
2878 out_recv
2879 .assert_yields_only_unordered([(1, vec![1, 2])])
2880 .await;
2881 });
2882 }
2883
2884 #[cfg(feature = "sim")]
2885 #[test]
2886 fn sim_batch_preserves_group_order() {
2887 let mut flow = FlowBuilder::new();
2888 let node = flow.process::<()>();
2889
2890 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2891
2892 let tick = node.tick();
2893 let out_recv = input
2894 .batch(&tick, nondet!(/** test */))
2895 .all_ticks()
2896 .fold_early_stop(
2897 q!(|| 0),
2898 q!(|acc, v| {
2899 *acc = std::cmp::max(v, *acc);
2900 *acc >= 2
2901 }),
2902 )
2903 .entries()
2904 .sim_output();
2905
2906 let instances = flow.sim().exhaustive(async || {
2907 out_recv
2908 .assert_yields_only_unordered([(1, 2), (2, 3)])
2909 .await;
2910 });
2911
2912 assert_eq!(instances, 8);
2913 // - three cases: all three in a separate tick (pick where (2, 3) is)
2914 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
2915 // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
2916 // - one case: all three together
2917 }
2918
2919 #[cfg(feature = "sim")]
2920 #[test]
2921 fn sim_batch_unordered_shuffles() {
2922 let mut flow = FlowBuilder::new();
2923 let node = flow.process::<()>();
2924
2925 let input = node
2926 .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
2927 .into_keyed()
2928 .weaken_ordering::<NoOrder>();
2929
2930 let tick = node.tick();
2931 let out_recv = input
2932 .batch(&tick, nondet!(/** test */))
2933 .all_ticks()
2934 .entries()
2935 .sim_output();
2936
2937 let instances = flow.sim().exhaustive(async || {
2938 out_recv
2939 .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
2940 .await;
2941 });
2942
2943 assert_eq!(instances, 13);
2944 // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
2945 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2946 // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
2947 // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2948 }
2949
2950 #[cfg(feature = "sim")]
2951 #[test]
2952 #[should_panic]
2953 fn sim_observe_order_batched() {
2954 let mut flow = FlowBuilder::new();
2955 let node = flow.process::<()>();
2956
2957 let (port, input) = node.sim_input::<_, NoOrder, _>();
2958
2959 let tick = node.tick();
2960 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2961 let out_recv = batch
2962 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2963 .all_ticks()
2964 .first()
2965 .entries()
2966 .sim_output();
2967
2968 flow.sim().exhaustive(async || {
2969 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2970 out_recv
2971 .assert_yields_only_unordered([(1, 1), (2, 1)])
2972 .await; // fails with assume_ordering
2973 });
2974 }
2975
2976 #[cfg(feature = "sim")]
2977 #[test]
2978 fn sim_observe_order_batched_count() {
2979 let mut flow = FlowBuilder::new();
2980 let node = flow.process::<()>();
2981
2982 let (port, input) = node.sim_input::<_, NoOrder, _>();
2983
2984 let tick = node.tick();
2985 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2986 let out_recv = batch
2987 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2988 .all_ticks()
2989 .entries()
2990 .sim_output();
2991
2992 let instance_count = flow.sim().exhaustive(async || {
2993 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2994 let _ = out_recv.collect_sorted::<Vec<_>>().await;
2995 });
2996
2997 assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
2998 }
2999
3000 #[cfg(feature = "sim")]
3001 #[test]
3002 fn sim_top_level_assume_ordering() {
3003 use std::collections::HashMap;
3004
3005 let mut flow = FlowBuilder::new();
3006 let node = flow.process::<()>();
3007
3008 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3009
3010 let out_recv = input
3011 .into_keyed()
3012 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3013 .fold_early_stop(
3014 q!(|| Vec::new()),
3015 q!(|acc, v| {
3016 acc.push(v);
3017 acc.len() >= 2
3018 }),
3019 )
3020 .entries()
3021 .sim_output();
3022
3023 let instance_count = flow.sim().exhaustive(async || {
3024 in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
3025 let out: HashMap<_, _> = out_recv
3026 .collect_sorted::<Vec<_>>()
3027 .await
3028 .into_iter()
3029 .collect();
3030 // Each key accumulates its values; we get one entry per key
3031 assert_eq!(out.len(), 2);
3032 });
3033
3034 assert_eq!(instance_count, 24)
3035 }
3036
3037 #[cfg(feature = "sim")]
3038 #[test]
3039 fn sim_top_level_assume_ordering_cycle_back() {
3040 use std::collections::HashMap;
3041
3042 let mut flow = FlowBuilder::new();
3043 let node = flow.process::<()>();
3044
3045 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3046
3047 let (complete_cycle_back, cycle_back) =
3048 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3049 let ordered = input
3050 .into_keyed()
3051 .merge_unordered(cycle_back)
3052 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3053 complete_cycle_back.complete(
3054 ordered
3055 .clone()
3056 .map(q!(|v| v + 1))
3057 .filter(q!(|v| v % 2 == 1)),
3058 );
3059
3060 let out_recv = ordered
3061 .fold_early_stop(
3062 q!(|| Vec::new()),
3063 q!(|acc, v| {
3064 acc.push(v);
3065 acc.len() >= 2
3066 }),
3067 )
3068 .entries()
3069 .sim_output();
3070
3071 let mut saw = false;
3072 let instance_count = flow.sim().exhaustive(async || {
3073 // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
3074 // We want to see [0, 1] - the cycled back value interleaved
3075 in_send.send_many_unordered([(1, 0), (1, 2)]);
3076 let out: HashMap<_, _> = out_recv
3077 .collect_sorted::<Vec<_>>()
3078 .await
3079 .into_iter()
3080 .collect();
3081
3082 // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
3083 if let Some(values) = out.get(&1)
3084 && *values == vec![0, 1]
3085 {
3086 saw = true;
3087 }
3088 });
3089
3090 assert!(
3091 saw,
3092 "did not see an instance with key 1 having [0, 1] in order"
3093 );
3094 assert_eq!(instance_count, 6);
3095 }
3096
3097 #[cfg(feature = "sim")]
3098 #[test]
3099 fn sim_top_level_assume_ordering_cross_key_cycle() {
3100 use std::collections::HashMap;
3101
3102 // This test demonstrates why releasing one entry at a time is important:
3103 // When one key's observed order cycles back into a different key, we need
3104 // to be able to interleave the cycled-back entry with pending items for
3105 // that other key.
3106 let mut flow = FlowBuilder::new();
3107 let node = flow.process::<()>();
3108
3109 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3110
3111 let (complete_cycle_back, cycle_back) =
3112 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3113 let ordered = input
3114 .into_keyed()
3115 .merge_unordered(cycle_back)
3116 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3117
3118 // Cycle back: when we see (1, 10), emit (2, 100) to key 2
3119 complete_cycle_back.complete(
3120 ordered
3121 .clone()
3122 .filter(q!(|v| *v == 10))
3123 .map(q!(|_| 100))
3124 .entries()
3125 .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
3126 .into_keyed(),
3127 );
3128
3129 let out_recv = ordered
3130 .fold_early_stop(
3131 q!(|| Vec::new()),
3132 q!(|acc, v| {
3133 acc.push(v);
3134 acc.len() >= 2
3135 }),
3136 )
3137 .entries()
3138 .sim_output();
3139
3140 // We want to see an instance where:
3141 // - (1, 10) is released first
3142 // - This causes (2, 100) to be cycled back
3143 // - (2, 100) is released BEFORE (2, 20) which was already pending
3144 let mut saw_cross_key_interleave = false;
3145 let instance_count = flow.sim().exhaustive(async || {
3146 // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
3147 in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
3148 let out: HashMap<_, _> = out_recv
3149 .collect_sorted::<Vec<_>>()
3150 .await
3151 .into_iter()
3152 .collect();
3153
3154 // Check if we see the cross-key interleaving:
3155 // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
3156 if let Some(values) = out.get(&2)
3157 && values.len() >= 2
3158 && values[0] == 100
3159 {
3160 saw_cross_key_interleave = true;
3161 }
3162 });
3163
3164 assert!(
3165 saw_cross_key_interleave,
3166 "did not see an instance where cycled-back 100 was released before pending items for key 2"
3167 );
3168 assert_eq!(instance_count, 60);
3169 }
3170
3171 #[cfg(feature = "sim")]
3172 #[test]
3173 fn sim_top_level_assume_ordering_cycle_back_tick() {
3174 use std::collections::HashMap;
3175
3176 let mut flow = FlowBuilder::new();
3177 let node = flow.process::<()>();
3178
3179 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3180
3181 let (complete_cycle_back, cycle_back) =
3182 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3183 let ordered = input
3184 .into_keyed()
3185 .merge_unordered(cycle_back)
3186 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3187 complete_cycle_back.complete(
3188 ordered
3189 .clone()
3190 .batch(&node.tick(), nondet!(/** test */))
3191 .all_ticks()
3192 .map(q!(|v| v + 1))
3193 .filter(q!(|v| v % 2 == 1)),
3194 );
3195
3196 let out_recv = ordered
3197 .fold_early_stop(
3198 q!(|| Vec::new()),
3199 q!(|acc, v| {
3200 acc.push(v);
3201 acc.len() >= 2
3202 }),
3203 )
3204 .entries()
3205 .sim_output();
3206
3207 let mut saw = false;
3208 let instance_count = flow.sim().exhaustive(async || {
3209 in_send.send_many_unordered([(1, 0), (1, 2)]);
3210 let out: HashMap<_, _> = out_recv
3211 .collect_sorted::<Vec<_>>()
3212 .await
3213 .into_iter()
3214 .collect();
3215
3216 if let Some(values) = out.get(&1)
3217 && *values == vec![0, 1]
3218 {
3219 saw = true;
3220 }
3221 });
3222
3223 assert!(
3224 saw,
3225 "did not see an instance with key 1 having [0, 1] in order"
3226 );
3227 assert_eq!(instance_count, 58);
3228 }
3229}