hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::batch_atomic::BatchAtomic;
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::{Atomic, DeferTick, NoAtomic};
28use crate::location::{Location, NoTick, Tick, check_matching_location};
29use crate::nondet::{NonDet, nondet};
30
31pub mod networking;
32
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds encode how orderings combine through [`MinOrder`]:
/// - combining an ordering with itself, or with [`TotalOrder`], leaves it unchanged;
/// - combining any ordering with [`NoOrder`] always produces [`NoOrder`].
///
/// The trait is sealed via `#[sealed::sealed]`, which restricts which types may implement it.
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
41
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited (variant-less) enum; it is only ever used as a type parameter.
pub enum TotalOrder {}

// Maps the `TotalOrder` marker to its runtime IR representation.
#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
51
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// Like [`TotalOrder`], this is an uninhabited enum used purely at the type level.
pub enum NoOrder {}

// Maps the `NoOrder` marker to its runtime IR representation.
#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
63
/// Helper trait for determining the weakest of two orderings.
///
/// `Self` is one ordering marker and `Other` is the second; the associated `Min` type is
/// whichever of the two is weaker. The four impls below cover every combination of
/// [`TotalOrder`] and [`NoOrder`].
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// TotalOrder combined with NoOrder -> NoOrder (any unordered side wins).
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}

// TotalOrder combined with TotalOrder -> TotalOrder (the only totally ordered result).
#[sealed::sealed]
impl MinOrder<TotalOrder> for TotalOrder {
    type Min = TotalOrder;
}

// NoOrder combined with TotalOrder -> NoOrder.
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}

// NoOrder combined with NoOrder -> NoOrder.
#[sealed::sealed]
impl MinOrder<NoOrder> for NoOrder {
    type Min = NoOrder;
}
90
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds encode how retry guarantees combine through [`MinRetries`]:
/// - combining a guarantee with itself, or with [`ExactlyOnce`], leaves it unchanged;
/// - combining any guarantee with [`AtLeastOnce`] always produces [`AtLeastOnce`].
///
/// The trait is sealed via `#[sealed::sealed]`, which restricts which types may implement it.
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
101
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited enum used purely at the type level.
pub enum ExactlyOnce {}

// Maps the `ExactlyOnce` marker to its runtime IR representation.
#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
110
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited enum used purely at the type level.
pub enum AtLeastOnce {}

// Maps the `AtLeastOnce` marker to its runtime IR representation.
#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
119
/// Helper trait for determining the weakest of two retry guarantees.
///
/// `Self` is one retries marker and `Other` is the second; the associated `Min` type is
/// whichever of the two is weaker. The four impls below cover every combination of
/// [`ExactlyOnce`] and [`AtLeastOnce`].
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries;
}

// ExactlyOnce combined with AtLeastOnce -> AtLeastOnce (possible duplicates win).
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}

// ExactlyOnce combined with ExactlyOnce -> ExactlyOnce (the only duplicate-free result).
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for ExactlyOnce {
    type Min = ExactlyOnce;
}

// AtLeastOnce combined with ExactlyOnce -> AtLeastOnce.
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}

// AtLeastOnce combined with AtLeastOnce -> AtLeastOnce.
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
146
147/// Streaming sequence of elements with type `Type`.
148///
149/// This live collection represents a growing sequence of elements, with new elements being
150/// asynchronously appended to the end of the sequence. This can be used to model the arrival
151/// of network input, such as API requests, or streaming ingestion.
152///
153/// By default, all streams have deterministic ordering and each element is materialized exactly
154/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
155/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
156/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
157///
158/// Type Parameters:
159/// - `Type`: the type of elements in the stream
160/// - `Loc`: the location where the stream is being materialized
161/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
162/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
163/// (default is [`TotalOrder`])
164/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
165/// [`AtLeastOnce`] (default is [`ExactlyOnce`])
166pub struct Stream<
167 Type,
168 Loc,
169 Bound: Boundedness = Unbounded,
170 Order: Ordering = TotalOrder,
171 Retry: Retries = ExactlyOnce,
172> {
173 pub(crate) location: Loc,
174 pub(crate) ir_node: RefCell<HydroNode>,
175
176 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
177}
178
179impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
180 for Stream<T, L, Unbounded, O, R>
181where
182 L: Location<'a>,
183{
184 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
185 Stream {
186 location: stream.location,
187 ir_node: stream.ir_node,
188 _phantom: PhantomData,
189 }
190 }
191}
192
193impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
194 for Stream<T, L, B, NoOrder, R>
195where
196 L: Location<'a>,
197{
198 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
199 Stream {
200 location: stream.location,
201 ir_node: stream.ir_node,
202 _phantom: PhantomData,
203 }
204 }
205}
206
207impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
208 for Stream<T, L, B, O, AtLeastOnce>
209where
210 L: Location<'a>,
211{
212 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
213 Stream {
214 location: stream.location,
215 ir_node: stream.ir_node,
216 _phantom: PhantomData,
217 }
218 }
219}
220
/// Allows bounded streams inside a [`Tick`] to be used wherever a [`DeferTick`] collection
/// is expected, by delegating to `Stream::defer_tick`.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Inherent associated functions take precedence over trait ones, so this path is
        // expected to hit an inherent `Stream::defer_tick` defined elsewhere in this file
        // rather than recursing into this trait method. NOTE(review): confirm that inherent
        // method exists for `Stream<T, Tick<L>, Bounded, O, R>`.
        Stream::defer_tick(self)
    }
}
229
230impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
231 for Stream<T, Tick<L>, Bounded, O, R>
232where
233 L: Location<'a>,
234{
235 type Location = Tick<L>;
236
237 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
238 Stream::new(
239 location.clone(),
240 HydroNode::CycleSource {
241 ident,
242 metadata: location.new_node_metadata(Self::collection_kind()),
243 },
244 )
245 }
246}
247
248impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
249 for Stream<T, Tick<L>, Bounded, O, R>
250where
251 L: Location<'a>,
252{
253 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
254 assert_eq!(
255 Location::id(&self.location),
256 expected_location,
257 "locations do not match"
258 );
259 self.location
260 .flow_state()
261 .borrow_mut()
262 .push_root(HydroRoot::CycleSink {
263 ident,
264 input: Box::new(self.ir_node.into_inner()),
265 op_metadata: HydroIrOpMetadata::new(),
266 });
267 }
268}
269
270impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
271 for Stream<T, L, B, O, R>
272where
273 L: Location<'a> + NoTick,
274{
275 type Location = L;
276
277 fn create_source(ident: syn::Ident, location: L) -> Self {
278 Stream::new(
279 location.clone(),
280 HydroNode::CycleSource {
281 ident,
282 metadata: location.new_node_metadata(Self::collection_kind()),
283 },
284 )
285 }
286}
287
288impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
289 for Stream<T, L, B, O, R>
290where
291 L: Location<'a> + NoTick,
292{
293 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
294 assert_eq!(
295 Location::id(&self.location),
296 expected_location,
297 "locations do not match"
298 );
299 self.location
300 .flow_state()
301 .borrow_mut()
302 .push_root(HydroRoot::CycleSink {
303 ident,
304 input: Box::new(self.ir_node.into_inner()),
305 op_metadata: HydroIrOpMetadata::new(),
306 });
307 }
308}
309
310impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
311where
312 T: Clone,
313 L: Location<'a>,
314{
315 fn clone(&self) -> Self {
316 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
317 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
318 *self.ir_node.borrow_mut() = HydroNode::Tee {
319 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
320 metadata: self.location.new_node_metadata(Self::collection_kind()),
321 };
322 }
323
324 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
325 Stream {
326 location: self.location.clone(),
327 ir_node: HydroNode::Tee {
328 inner: TeeNode(inner.0.clone()),
329 metadata: metadata.clone(),
330 }
331 .into(),
332 _phantom: PhantomData,
333 }
334 } else {
335 unreachable!()
336 }
337 }
338}
339
340impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
341where
342 L: Location<'a>,
343{
344 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
345 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
346 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
347
348 Stream {
349 location,
350 ir_node: RefCell::new(ir_node),
351 _phantom: PhantomData,
352 }
353 }
354
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// This only borrows the stored location; the stream itself is not consumed.
    pub fn location(&self) -> &L {
        &self.location
    }
359
360 pub(crate) fn collection_kind() -> CollectionKind {
361 CollectionKind::Stream {
362 bound: B::BOUND_KIND,
363 order: O::ORDERING_KIND,
364 retry: R::RETRIES_KIND,
365 element_type: quote_type::<T>().into(),
366 }
367 }
368
369 /// Produces a stream based on invoking `f` on each element.
370 /// If you do not want to modify the stream and instead only want to view
371 /// each item use [`Stream::inspect`] instead.
372 ///
373 /// # Example
374 /// ```rust
375 /// # #[cfg(feature = "deploy")] {
376 /// # use hydro_lang::prelude::*;
377 /// # use futures::StreamExt;
378 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
379 /// let words = process.source_iter(q!(vec!["hello", "world"]));
380 /// words.map(q!(|x| x.to_uppercase()))
381 /// # }, |mut stream| async move {
382 /// # for w in vec!["HELLO", "WORLD"] {
383 /// # assert_eq!(stream.next().await.unwrap(), w);
384 /// # }
385 /// # }));
386 /// # }
387 /// ```
388 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
389 where
390 F: Fn(T) -> U + 'a,
391 {
392 let f = f.splice_fn1_ctx(&self.location).into();
393 Stream::new(
394 self.location.clone(),
395 HydroNode::Map {
396 f,
397 input: Box::new(self.ir_node.into_inner()),
398 metadata: self
399 .location
400 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
401 },
402 )
403 }
404
405 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
406 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
407 /// for the output type `U` must produce items in a **deterministic** order.
408 ///
409 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
410 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
411 ///
412 /// # Example
413 /// ```rust
414 /// # #[cfg(feature = "deploy")] {
415 /// # use hydro_lang::prelude::*;
416 /// # use futures::StreamExt;
417 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
418 /// process
419 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
420 /// .flat_map_ordered(q!(|x| x))
421 /// # }, |mut stream| async move {
422 /// // 1, 2, 3, 4
423 /// # for w in (1..5) {
424 /// # assert_eq!(stream.next().await.unwrap(), w);
425 /// # }
426 /// # }));
427 /// # }
428 /// ```
429 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
430 where
431 I: IntoIterator<Item = U>,
432 F: Fn(T) -> I + 'a,
433 {
434 let f = f.splice_fn1_ctx(&self.location).into();
435 Stream::new(
436 self.location.clone(),
437 HydroNode::FlatMap {
438 f,
439 input: Box::new(self.ir_node.into_inner()),
440 metadata: self
441 .location
442 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
443 },
444 )
445 }
446
447 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
448 /// for the output type `U` to produce items in any order.
449 ///
450 /// # Example
451 /// ```rust
452 /// # #[cfg(feature = "deploy")] {
453 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
454 /// # use futures::StreamExt;
455 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
456 /// process
457 /// .source_iter(q!(vec![
458 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
459 /// std::collections::HashSet::from_iter(vec![3, 4]),
460 /// ]))
461 /// .flat_map_unordered(q!(|x| x))
462 /// # }, |mut stream| async move {
463 /// // 1, 2, 3, 4, but in no particular order
464 /// # let mut results = Vec::new();
465 /// # for w in (1..5) {
466 /// # results.push(stream.next().await.unwrap());
467 /// # }
468 /// # results.sort();
469 /// # assert_eq!(results, vec![1, 2, 3, 4]);
470 /// # }));
471 /// # }
472 /// ```
473 pub fn flat_map_unordered<U, I, F>(
474 self,
475 f: impl IntoQuotedMut<'a, F, L>,
476 ) -> Stream<U, L, B, NoOrder, R>
477 where
478 I: IntoIterator<Item = U>,
479 F: Fn(T) -> I + 'a,
480 {
481 let f = f.splice_fn1_ctx(&self.location).into();
482 Stream::new(
483 self.location.clone(),
484 HydroNode::FlatMap {
485 f,
486 input: Box::new(self.ir_node.into_inner()),
487 metadata: self
488 .location
489 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
490 },
491 )
492 }
493
494 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
495 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
496 ///
497 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
498 /// not deterministic, use [`Stream::flatten_unordered`] instead.
499 ///
500 /// ```rust
501 /// # #[cfg(feature = "deploy")] {
502 /// # use hydro_lang::prelude::*;
503 /// # use futures::StreamExt;
504 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
505 /// process
506 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
507 /// .flatten_ordered()
508 /// # }, |mut stream| async move {
509 /// // 1, 2, 3, 4
510 /// # for w in (1..5) {
511 /// # assert_eq!(stream.next().await.unwrap(), w);
512 /// # }
513 /// # }));
514 /// # }
515 /// ```
516 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
517 where
518 T: IntoIterator<Item = U>,
519 {
520 self.flat_map_ordered(q!(|d| d))
521 }
522
523 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
524 /// for the element type `T` to produce items in any order.
525 ///
526 /// # Example
527 /// ```rust
528 /// # #[cfg(feature = "deploy")] {
529 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
530 /// # use futures::StreamExt;
531 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
532 /// process
533 /// .source_iter(q!(vec![
534 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
535 /// std::collections::HashSet::from_iter(vec![3, 4]),
536 /// ]))
537 /// .flatten_unordered()
538 /// # }, |mut stream| async move {
539 /// // 1, 2, 3, 4, but in no particular order
540 /// # let mut results = Vec::new();
541 /// # for w in (1..5) {
542 /// # results.push(stream.next().await.unwrap());
543 /// # }
544 /// # results.sort();
545 /// # assert_eq!(results, vec![1, 2, 3, 4]);
546 /// # }));
547 /// # }
548 /// ```
549 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
550 where
551 T: IntoIterator<Item = U>,
552 {
553 self.flat_map_unordered(q!(|d| d))
554 }
555
556 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
557 /// `f`, preserving the order of the elements.
558 ///
559 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
560 /// not modify or take ownership of the values. If you need to modify the values while filtering
561 /// use [`Stream::filter_map`] instead.
562 ///
563 /// # Example
564 /// ```rust
565 /// # #[cfg(feature = "deploy")] {
566 /// # use hydro_lang::prelude::*;
567 /// # use futures::StreamExt;
568 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
569 /// process
570 /// .source_iter(q!(vec![1, 2, 3, 4]))
571 /// .filter(q!(|&x| x > 2))
572 /// # }, |mut stream| async move {
573 /// // 3, 4
574 /// # for w in (3..5) {
575 /// # assert_eq!(stream.next().await.unwrap(), w);
576 /// # }
577 /// # }));
578 /// # }
579 /// ```
580 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
581 where
582 F: Fn(&T) -> bool + 'a,
583 {
584 let f = f.splice_fn1_borrow_ctx(&self.location).into();
585 Stream::new(
586 self.location.clone(),
587 HydroNode::Filter {
588 f,
589 input: Box::new(self.ir_node.into_inner()),
590 metadata: self.location.new_node_metadata(Self::collection_kind()),
591 },
592 )
593 }
594
595 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
596 ///
597 /// # Example
598 /// ```rust
599 /// # #[cfg(feature = "deploy")] {
600 /// # use hydro_lang::prelude::*;
601 /// # use futures::StreamExt;
602 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
603 /// process
604 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
605 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
606 /// # }, |mut stream| async move {
607 /// // 1, 2
608 /// # for w in (1..3) {
609 /// # assert_eq!(stream.next().await.unwrap(), w);
610 /// # }
611 /// # }));
612 /// # }
613 /// ```
614 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
615 where
616 F: Fn(T) -> Option<U> + 'a,
617 {
618 let f = f.splice_fn1_ctx(&self.location).into();
619 Stream::new(
620 self.location.clone(),
621 HydroNode::FilterMap {
622 f,
623 input: Box::new(self.ir_node.into_inner()),
624 metadata: self
625 .location
626 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
627 },
628 )
629 }
630
631 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
632 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
633 /// If `other` is an empty [`Optional`], no values will be produced.
634 ///
635 /// # Example
636 /// ```rust
637 /// # #[cfg(feature = "deploy")] {
638 /// # use hydro_lang::prelude::*;
639 /// # use futures::StreamExt;
640 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
641 /// let tick = process.tick();
642 /// let batch = process
643 /// .source_iter(q!(vec![1, 2, 3, 4]))
644 /// .batch(&tick, nondet!(/** test */));
645 /// let count = batch.clone().count(); // `count()` returns a singleton
646 /// batch.cross_singleton(count).all_ticks()
647 /// # }, |mut stream| async move {
648 /// // (1, 4), (2, 4), (3, 4), (4, 4)
649 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
650 /// # assert_eq!(stream.next().await.unwrap(), w);
651 /// # }
652 /// # }));
653 /// # }
654 /// ```
655 pub fn cross_singleton<O2>(
656 self,
657 other: impl Into<Optional<O2, L, Bounded>>,
658 ) -> Stream<(T, O2), L, B, O, R>
659 where
660 O2: Clone,
661 {
662 let other: Optional<O2, L, Bounded> = other.into();
663 check_matching_location(&self.location, &other.location);
664
665 Stream::new(
666 self.location.clone(),
667 HydroNode::CrossSingleton {
668 left: Box::new(self.ir_node.into_inner()),
669 right: Box::new(other.ir_node.into_inner()),
670 metadata: self
671 .location
672 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
673 },
674 )
675 }
676
677 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
678 ///
679 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
680 /// leader of a cluster.
681 ///
682 /// # Example
683 /// ```rust
684 /// # #[cfg(feature = "deploy")] {
685 /// # use hydro_lang::prelude::*;
686 /// # use futures::StreamExt;
687 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
688 /// let tick = process.tick();
689 /// // ticks are lazy by default, forces the second tick to run
690 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
691 ///
692 /// let batch_first_tick = process
693 /// .source_iter(q!(vec![1, 2, 3, 4]))
694 /// .batch(&tick, nondet!(/** test */));
695 /// let batch_second_tick = process
696 /// .source_iter(q!(vec![5, 6, 7, 8]))
697 /// .batch(&tick, nondet!(/** test */))
698 /// .defer_tick(); // appears on the second tick
699 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
700 /// batch_first_tick.chain(batch_second_tick)
701 /// .filter_if_some(some_on_first_tick)
702 /// .all_ticks()
703 /// # }, |mut stream| async move {
704 /// // [1, 2, 3, 4]
705 /// # for w in vec![1, 2, 3, 4] {
706 /// # assert_eq!(stream.next().await.unwrap(), w);
707 /// # }
708 /// # }));
709 /// # }
710 /// ```
711 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
712 self.cross_singleton(signal.map(q!(|_u| ())))
713 .map(q!(|(d, _signal)| d))
714 }
715
716 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
717 ///
718 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
719 /// some local state.
720 ///
721 /// # Example
722 /// ```rust
723 /// # #[cfg(feature = "deploy")] {
724 /// # use hydro_lang::prelude::*;
725 /// # use futures::StreamExt;
726 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
727 /// let tick = process.tick();
728 /// // ticks are lazy by default, forces the second tick to run
729 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
730 ///
731 /// let batch_first_tick = process
732 /// .source_iter(q!(vec![1, 2, 3, 4]))
733 /// .batch(&tick, nondet!(/** test */));
734 /// let batch_second_tick = process
735 /// .source_iter(q!(vec![5, 6, 7, 8]))
736 /// .batch(&tick, nondet!(/** test */))
737 /// .defer_tick(); // appears on the second tick
738 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
739 /// batch_first_tick.chain(batch_second_tick)
740 /// .filter_if_none(some_on_first_tick)
741 /// .all_ticks()
742 /// # }, |mut stream| async move {
743 /// // [5, 6, 7, 8]
744 /// # for w in vec![5, 6, 7, 8] {
745 /// # assert_eq!(stream.next().await.unwrap(), w);
746 /// # }
747 /// # }));
748 /// # }
749 /// ```
750 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
751 self.filter_if_some(
752 other
753 .map(q!(|_| ()))
754 .into_singleton()
755 .filter(q!(|o| o.is_none())),
756 )
757 }
758
759 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
760 /// tupled pairs in a non-deterministic order.
761 ///
762 /// # Example
763 /// ```rust
764 /// # #[cfg(feature = "deploy")] {
765 /// # use hydro_lang::prelude::*;
766 /// # use std::collections::HashSet;
767 /// # use futures::StreamExt;
768 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769 /// let tick = process.tick();
770 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
771 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
772 /// stream1.cross_product(stream2)
773 /// # }, |mut stream| async move {
774 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
775 /// # stream.map(|i| assert!(expected.contains(&i)));
776 /// # }));
777 /// # }
778 /// ```
779 pub fn cross_product<T2, O2: Ordering>(
780 self,
781 other: Stream<T2, L, B, O2, R>,
782 ) -> Stream<(T, T2), L, B, NoOrder, R>
783 where
784 T: Clone,
785 T2: Clone,
786 {
787 check_matching_location(&self.location, &other.location);
788
789 Stream::new(
790 self.location.clone(),
791 HydroNode::CrossProduct {
792 left: Box::new(self.ir_node.into_inner()),
793 right: Box::new(other.ir_node.into_inner()),
794 metadata: self
795 .location
796 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
797 },
798 )
799 }
800
801 /// Takes one stream as input and filters out any duplicate occurrences. The output
802 /// contains all unique values from the input.
803 ///
804 /// # Example
805 /// ```rust
806 /// # #[cfg(feature = "deploy")] {
807 /// # use hydro_lang::prelude::*;
808 /// # use futures::StreamExt;
809 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
810 /// let tick = process.tick();
811 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
812 /// # }, |mut stream| async move {
813 /// # for w in vec![1, 2, 3, 4] {
814 /// # assert_eq!(stream.next().await.unwrap(), w);
815 /// # }
816 /// # }));
817 /// # }
818 /// ```
819 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
820 where
821 T: Eq + Hash,
822 {
823 Stream::new(
824 self.location.clone(),
825 HydroNode::Unique {
826 input: Box::new(self.ir_node.into_inner()),
827 metadata: self
828 .location
829 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
830 },
831 )
832 }
833
834 /// Outputs everything in this stream that is *not* contained in the `other` stream.
835 ///
836 /// The `other` stream must be [`Bounded`], since this function will wait until
837 /// all its elements are available before producing any output.
838 /// # Example
839 /// ```rust
840 /// # #[cfg(feature = "deploy")] {
841 /// # use hydro_lang::prelude::*;
842 /// # use futures::StreamExt;
843 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
844 /// let tick = process.tick();
845 /// let stream = process
846 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
847 /// .batch(&tick, nondet!(/** test */));
848 /// let batch = process
849 /// .source_iter(q!(vec![1, 2]))
850 /// .batch(&tick, nondet!(/** test */));
851 /// stream.filter_not_in(batch).all_ticks()
852 /// # }, |mut stream| async move {
853 /// # for w in vec![3, 4] {
854 /// # assert_eq!(stream.next().await.unwrap(), w);
855 /// # }
856 /// # }));
857 /// # }
858 /// ```
859 pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
860 where
861 T: Eq + Hash,
862 {
863 check_matching_location(&self.location, &other.location);
864
865 Stream::new(
866 self.location.clone(),
867 HydroNode::Difference {
868 pos: Box::new(self.ir_node.into_inner()),
869 neg: Box::new(other.ir_node.into_inner()),
870 metadata: self
871 .location
872 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
873 },
874 )
875 }
876
877 /// An operator which allows you to "inspect" each element of a stream without
878 /// modifying it. The closure `f` is called on a reference to each item. This is
879 /// mainly useful for debugging, and should not be used to generate side-effects.
880 ///
881 /// # Example
882 /// ```rust
883 /// # #[cfg(feature = "deploy")] {
884 /// # use hydro_lang::prelude::*;
885 /// # use futures::StreamExt;
886 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
887 /// let nums = process.source_iter(q!(vec![1, 2]));
888 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
889 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
890 /// # }, |mut stream| async move {
891 /// # for w in vec![1, 2] {
892 /// # assert_eq!(stream.next().await.unwrap(), w);
893 /// # }
894 /// # }));
895 /// # }
896 /// ```
897 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
898 where
899 F: Fn(&T) + 'a,
900 {
901 let f = f.splice_fn1_borrow_ctx(&self.location).into();
902
903 Stream::new(
904 self.location.clone(),
905 HydroNode::Inspect {
906 f,
907 input: Box::new(self.ir_node.into_inner()),
908 metadata: self.location.new_node_metadata(Self::collection_kind()),
909 },
910 )
911 }
912
913 /// An operator which allows you to "name" a `HydroNode`.
914 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
915 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
916 {
917 let mut node = self.ir_node.borrow_mut();
918 let metadata = node.metadata_mut();
919 metadata.tag = Some(name.to_string());
920 }
921 self
922 }
923
924 /// Explicitly "casts" the stream to a type with a different ordering
925 /// guarantee. Useful in unsafe code where the ordering cannot be proven
926 /// by the type-system.
927 ///
928 /// # Non-Determinism
929 /// This function is used as an escape hatch, and any mistakes in the
930 /// provided ordering guarantee will propagate into the guarantees
931 /// for the rest of the program.
932 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
933 if O::ORDERING_KIND == O2::ORDERING_KIND {
934 Stream::new(self.location, self.ir_node.into_inner())
935 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
936 // We can always weaken the ordering guarantee
937 Stream::new(
938 self.location.clone(),
939 HydroNode::Cast {
940 inner: Box::new(self.ir_node.into_inner()),
941 metadata: self
942 .location
943 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
944 },
945 )
946 } else {
947 Stream::new(
948 self.location.clone(),
949 HydroNode::ObserveNonDet {
950 inner: Box::new(self.ir_node.into_inner()),
951 trusted: false,
952 metadata: self
953 .location
954 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
955 },
956 )
957 }
958 }
959
960 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
961 // is not observable
962 fn assume_ordering_trusted<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
963 if O::ORDERING_KIND == O2::ORDERING_KIND {
964 Stream::new(self.location, self.ir_node.into_inner())
965 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
966 // We can always weaken the ordering guarantee
967 Stream::new(
968 self.location.clone(),
969 HydroNode::Cast {
970 inner: Box::new(self.ir_node.into_inner()),
971 metadata: self
972 .location
973 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
974 },
975 )
976 } else {
977 Stream::new(
978 self.location.clone(),
979 HydroNode::ObserveNonDet {
980 inner: Box::new(self.ir_node.into_inner()),
981 trusted: true,
982 metadata: self
983 .location
984 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
985 },
986 )
987 }
988 }
989
990 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
991 /// which is always safe because that is the weakest possible guarantee.
992 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
993 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
994 self.assume_ordering::<NoOrder>(nondet)
995 }
996
997 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
998 /// enforcing that `O2` is weaker than the input ordering guarantee.
999 pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
1000 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1001 self.assume_ordering::<O2>(nondet)
1002 }
1003
1004 /// Explicitly "casts" the stream to a type with a different retries
1005 /// guarantee. Useful in unsafe code where the lack of retries cannot
1006 /// be proven by the type-system.
1007 ///
1008 /// # Non-Determinism
1009 /// This function is used as an escape hatch, and any mistakes in the
1010 /// provided retries guarantee will propagate into the guarantees
1011 /// for the rest of the program.
1012 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1013 if R::RETRIES_KIND == R2::RETRIES_KIND {
1014 Stream::new(self.location, self.ir_node.into_inner())
1015 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1016 // We can always weaken the retries guarantee
1017 Stream::new(
1018 self.location.clone(),
1019 HydroNode::Cast {
1020 inner: Box::new(self.ir_node.into_inner()),
1021 metadata: self
1022 .location
1023 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1024 },
1025 )
1026 } else {
1027 Stream::new(
1028 self.location.clone(),
1029 HydroNode::ObserveNonDet {
1030 inner: Box::new(self.ir_node.into_inner()),
1031 trusted: false,
1032 metadata: self
1033 .location
1034 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1035 },
1036 )
1037 }
1038 }
1039
1040 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1041 // is not observable
1042 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1043 if R::RETRIES_KIND == R2::RETRIES_KIND {
1044 Stream::new(self.location, self.ir_node.into_inner())
1045 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1046 // We can always weaken the retries guarantee
1047 Stream::new(
1048 self.location.clone(),
1049 HydroNode::Cast {
1050 inner: Box::new(self.ir_node.into_inner()),
1051 metadata: self
1052 .location
1053 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1054 },
1055 )
1056 } else {
1057 Stream::new(
1058 self.location.clone(),
1059 HydroNode::ObserveNonDet {
1060 inner: Box::new(self.ir_node.into_inner()),
1061 trusted: true,
1062 metadata: self
1063 .location
1064 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1065 },
1066 )
1067 }
1068 }
1069
1070 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1071 /// which is always safe because that is the weakest possible guarantee.
1072 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1073 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1074 self.assume_retries::<AtLeastOnce>(nondet)
1075 }
1076
1077 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1078 /// enforcing that `R2` is weaker than the input retries guarantee.
1079 pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
1080 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1081 self.assume_retries::<R2>(nondet)
1082 }
1083}
1084
1085impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1086where
1087 L: Location<'a>,
1088{
1089 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
1090 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
1091 pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
1092 self.assume_retries(
1093 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
1094 )
1095 }
1096}
1097
1098impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1099where
1100 L: Location<'a>,
1101{
1102 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1103 ///
1104 /// # Example
1105 /// ```rust
1106 /// # #[cfg(feature = "deploy")] {
1107 /// # use hydro_lang::prelude::*;
1108 /// # use futures::StreamExt;
1109 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1110 /// process.source_iter(q!(&[1, 2, 3])).cloned()
1111 /// # }, |mut stream| async move {
1112 /// // 1, 2, 3
1113 /// # for w in vec![1, 2, 3] {
1114 /// # assert_eq!(stream.next().await.unwrap(), w);
1115 /// # }
1116 /// # }));
1117 /// # }
1118 /// ```
1119 pub fn cloned(self) -> Stream<T, L, B, O, R>
1120 where
1121 T: Clone,
1122 {
1123 self.map(q!(|d| d.clone()))
1124 }
1125}
1126
1127impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1128where
1129 L: Location<'a>,
1130{
1131 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1132 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1133 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1134 ///
1135 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1136 /// and there may be duplicates.
1137 ///
1138 /// # Example
1139 /// ```rust
1140 /// # #[cfg(feature = "deploy")] {
1141 /// # use hydro_lang::prelude::*;
1142 /// # use futures::StreamExt;
1143 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1144 /// let tick = process.tick();
1145 /// let bools = process.source_iter(q!(vec![false, true, false]));
1146 /// let batch = bools.batch(&tick, nondet!(/** test */));
1147 /// batch
1148 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1149 /// .all_ticks()
1150 /// # }, |mut stream| async move {
1151 /// // true
1152 /// # assert_eq!(stream.next().await.unwrap(), true);
1153 /// # }));
1154 /// # }
1155 /// ```
1156 pub fn fold_commutative_idempotent<A, I, F>(
1157 self,
1158 init: impl IntoQuotedMut<'a, I, L>,
1159 comb: impl IntoQuotedMut<'a, F, L>,
1160 ) -> Singleton<A, L, B>
1161 where
1162 I: Fn() -> A + 'a,
1163 F: Fn(&mut A, T),
1164 {
1165 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1166 self.assume_ordering(nondet)
1167 .assume_retries(nondet)
1168 .fold(init, comb)
1169 }
1170
1171 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1172 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1173 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1174 /// reference, so that it can be modified in place.
1175 ///
1176 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
1177 /// and there may be duplicates.
1178 ///
1179 /// # Example
1180 /// ```rust
1181 /// # #[cfg(feature = "deploy")] {
1182 /// # use hydro_lang::prelude::*;
1183 /// # use futures::StreamExt;
1184 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1185 /// let tick = process.tick();
1186 /// let bools = process.source_iter(q!(vec![false, true, false]));
1187 /// let batch = bools.batch(&tick, nondet!(/** test */));
1188 /// batch
1189 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1190 /// .all_ticks()
1191 /// # }, |mut stream| async move {
1192 /// // true
1193 /// # assert_eq!(stream.next().await.unwrap(), true);
1194 /// # }));
1195 /// # }
1196 /// ```
1197 pub fn reduce_commutative_idempotent<F>(
1198 self,
1199 comb: impl IntoQuotedMut<'a, F, L>,
1200 ) -> Optional<T, L, B>
1201 where
1202 F: Fn(&mut T, T) + 'a,
1203 {
1204 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1205 self.assume_retries(nondet).reduce_commutative(comb)
1206 }
1207
1208 // only for internal APIs that have been carefully vetted, will eventually be removed once we
1209 // have algebraic verification of these properties
1210 fn reduce_commutative_idempotent_trusted<F>(
1211 self,
1212 comb: impl IntoQuotedMut<'a, F, L>,
1213 ) -> Optional<T, L, B>
1214 where
1215 F: Fn(&mut T, T) + 'a,
1216 {
1217 self.assume_retries_trusted(nondet!(/** because the closure is trusted idempotent, retries don't affect intermediate states */))
1218 .reduce_commutative_trusted(comb)
1219 }
1220
1221 /// Computes the maximum element in the stream as an [`Optional`], which
1222 /// will be empty until the first element in the input arrives.
1223 ///
1224 /// # Example
1225 /// ```rust
1226 /// # #[cfg(feature = "deploy")] {
1227 /// # use hydro_lang::prelude::*;
1228 /// # use futures::StreamExt;
1229 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1230 /// let tick = process.tick();
1231 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1232 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1233 /// batch.max().all_ticks()
1234 /// # }, |mut stream| async move {
1235 /// // 4
1236 /// # assert_eq!(stream.next().await.unwrap(), 4);
1237 /// # }));
1238 /// # }
1239 /// ```
1240 pub fn max(self) -> Optional<T, L, B>
1241 where
1242 T: Ord,
1243 {
1244 self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1245 if new > *curr {
1246 *curr = new;
1247 }
1248 }))
1249 }
1250
1251 /// Computes the minimum element in the stream as an [`Optional`], which
1252 /// will be empty until the first element in the input arrives.
1253 ///
1254 /// # Example
1255 /// ```rust
1256 /// # #[cfg(feature = "deploy")] {
1257 /// # use hydro_lang::prelude::*;
1258 /// # use futures::StreamExt;
1259 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1260 /// let tick = process.tick();
1261 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1262 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1263 /// batch.min().all_ticks()
1264 /// # }, |mut stream| async move {
1265 /// // 1
1266 /// # assert_eq!(stream.next().await.unwrap(), 1);
1267 /// # }));
1268 /// # }
1269 /// ```
1270 pub fn min(self) -> Optional<T, L, B>
1271 where
1272 T: Ord,
1273 {
1274 self.reduce_commutative_idempotent_trusted(q!(|curr, new| {
1275 if new < *curr {
1276 *curr = new;
1277 }
1278 }))
1279 }
1280}
1281
1282impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1283where
1284 L: Location<'a>,
1285{
1286 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1287 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1288 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1289 ///
1290 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1291 ///
1292 /// # Example
1293 /// ```rust
1294 /// # #[cfg(feature = "deploy")] {
1295 /// # use hydro_lang::prelude::*;
1296 /// # use futures::StreamExt;
1297 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1298 /// let tick = process.tick();
1299 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1300 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1301 /// batch
1302 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1303 /// .all_ticks()
1304 /// # }, |mut stream| async move {
1305 /// // 10
1306 /// # assert_eq!(stream.next().await.unwrap(), 10);
1307 /// # }));
1308 /// # }
1309 /// ```
1310 pub fn fold_commutative<A, I, F>(
1311 self,
1312 init: impl IntoQuotedMut<'a, I, L>,
1313 comb: impl IntoQuotedMut<'a, F, L>,
1314 ) -> Singleton<A, L, B>
1315 where
1316 I: Fn() -> A + 'a,
1317 F: Fn(&mut A, T),
1318 {
1319 let nondet = nondet!(/** the combinator function is commutative */);
1320 self.assume_ordering(nondet).fold(init, comb)
1321 }
1322
1323 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1324 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1325 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1326 /// reference, so that it can be modified in place.
1327 ///
1328 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1329 ///
1330 /// # Example
1331 /// ```rust
1332 /// # #[cfg(feature = "deploy")] {
1333 /// # use hydro_lang::prelude::*;
1334 /// # use futures::StreamExt;
1335 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1336 /// let tick = process.tick();
1337 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1338 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1339 /// batch
1340 /// .reduce_commutative(q!(|curr, new| *curr += new))
1341 /// .all_ticks()
1342 /// # }, |mut stream| async move {
1343 /// // 10
1344 /// # assert_eq!(stream.next().await.unwrap(), 10);
1345 /// # }));
1346 /// # }
1347 /// ```
1348 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1349 where
1350 F: Fn(&mut T, T) + 'a,
1351 {
1352 let nondet = nondet!(/** the combinator function is commutative */);
1353 self.assume_ordering(nondet).reduce(comb)
1354 }
1355
1356 fn reduce_commutative_trusted<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1357 where
1358 F: Fn(&mut T, T) + 'a,
1359 {
1360 let ordered = if B::BOUNDED {
1361 self.assume_ordering_trusted(nondet!(/** if bounded, there are no intermediate states and output is deterministic because trusted commutative */))
1362 } else {
1363 self.assume_ordering(nondet!(/** if unbounded, ordering affects intermediate states */))
1364 };
1365
1366 ordered.reduce(comb)
1367 }
1368
1369 /// Computes the number of elements in the stream as a [`Singleton`].
1370 ///
1371 /// # Example
1372 /// ```rust
1373 /// # #[cfg(feature = "deploy")] {
1374 /// # use hydro_lang::prelude::*;
1375 /// # use futures::StreamExt;
1376 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1377 /// let tick = process.tick();
1378 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1379 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1380 /// batch.count().all_ticks()
1381 /// # }, |mut stream| async move {
1382 /// // 4
1383 /// # assert_eq!(stream.next().await.unwrap(), 4);
1384 /// # }));
1385 /// # }
1386 /// ```
1387 pub fn count(self) -> Singleton<usize, L, B> {
1388 self.assume_ordering_trusted(nondet!(
1389 /// Order does not affect eventual count, and also does not affect intermediate states.
1390 ))
1391 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1392 }
1393}
1394
1395impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1396where
1397 L: Location<'a>,
1398{
1399 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1400 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1401 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1402 ///
1403 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1404 ///
1405 /// # Example
1406 /// ```rust
1407 /// # #[cfg(feature = "deploy")] {
1408 /// # use hydro_lang::prelude::*;
1409 /// # use futures::StreamExt;
1410 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1411 /// let tick = process.tick();
1412 /// let bools = process.source_iter(q!(vec![false, true, false]));
1413 /// let batch = bools.batch(&tick, nondet!(/** test */));
1414 /// batch
1415 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1416 /// .all_ticks()
1417 /// # }, |mut stream| async move {
1418 /// // true
1419 /// # assert_eq!(stream.next().await.unwrap(), true);
1420 /// # }));
1421 /// # }
1422 /// ```
1423 pub fn fold_idempotent<A, I, F>(
1424 self,
1425 init: impl IntoQuotedMut<'a, I, L>,
1426 comb: impl IntoQuotedMut<'a, F, L>,
1427 ) -> Singleton<A, L, B>
1428 where
1429 I: Fn() -> A + 'a,
1430 F: Fn(&mut A, T),
1431 {
1432 let nondet = nondet!(/** the combinator function is idempotent */);
1433 self.assume_retries(nondet).fold(init, comb)
1434 }
1435
1436 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1437 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1438 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1439 /// reference, so that it can be modified in place.
1440 ///
1441 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1442 ///
1443 /// # Example
1444 /// ```rust
1445 /// # #[cfg(feature = "deploy")] {
1446 /// # use hydro_lang::prelude::*;
1447 /// # use futures::StreamExt;
1448 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1449 /// let tick = process.tick();
1450 /// let bools = process.source_iter(q!(vec![false, true, false]));
1451 /// let batch = bools.batch(&tick, nondet!(/** test */));
1452 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1453 /// # }, |mut stream| async move {
1454 /// // true
1455 /// # assert_eq!(stream.next().await.unwrap(), true);
1456 /// # }));
1457 /// # }
1458 /// ```
1459 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1460 where
1461 F: Fn(&mut T, T) + 'a,
1462 {
1463 let nondet = nondet!(/** the combinator function is idempotent */);
1464 self.assume_retries(nondet).reduce(comb)
1465 }
1466
1467 /// Computes the first element in the stream as an [`Optional`], which
1468 /// will be empty until the first element in the input arrives.
1469 ///
1470 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1471 /// re-ordering of elements may cause the first element to change.
1472 ///
1473 /// # Example
1474 /// ```rust
1475 /// # #[cfg(feature = "deploy")] {
1476 /// # use hydro_lang::prelude::*;
1477 /// # use futures::StreamExt;
1478 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1479 /// let tick = process.tick();
1480 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1481 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1482 /// batch.first().all_ticks()
1483 /// # }, |mut stream| async move {
1484 /// // 1
1485 /// # assert_eq!(stream.next().await.unwrap(), 1);
1486 /// # }));
1487 /// # }
1488 /// ```
1489 pub fn first(self) -> Optional<T, L, B> {
1490 self.reduce_idempotent(q!(|_, _| {}))
1491 }
1492
1493 /// Computes the last element in the stream as an [`Optional`], which
1494 /// will be empty until an element in the input arrives.
1495 ///
1496 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1497 /// re-ordering of elements may cause the last element to change.
1498 ///
1499 /// # Example
1500 /// ```rust
1501 /// # #[cfg(feature = "deploy")] {
1502 /// # use hydro_lang::prelude::*;
1503 /// # use futures::StreamExt;
1504 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1505 /// let tick = process.tick();
1506 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1507 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1508 /// batch.last().all_ticks()
1509 /// # }, |mut stream| async move {
1510 /// // 4
1511 /// # assert_eq!(stream.next().await.unwrap(), 4);
1512 /// # }));
1513 /// # }
1514 /// ```
1515 pub fn last(self) -> Optional<T, L, B> {
1516 self.reduce_idempotent(q!(|curr, new| *curr = new))
1517 }
1518}
1519
1520impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1521where
1522 L: Location<'a>,
1523{
1524 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1525 ///
1526 /// # Example
1527 /// ```rust
1528 /// # #[cfg(feature = "deploy")] {
1529 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1530 /// # use futures::StreamExt;
1531 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1532 /// let tick = process.tick();
1533 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1534 /// numbers.enumerate()
1535 /// # }, |mut stream| async move {
1536 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1537 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1538 /// # assert_eq!(stream.next().await.unwrap(), w);
1539 /// # }
1540 /// # }));
1541 /// # }
1542 /// ```
1543 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1544 Stream::new(
1545 self.location.clone(),
1546 HydroNode::Enumerate {
1547 input: Box::new(self.ir_node.into_inner()),
1548 metadata: self.location.new_node_metadata(Stream::<
1549 (usize, T),
1550 L,
1551 B,
1552 TotalOrder,
1553 ExactlyOnce,
1554 >::collection_kind()),
1555 },
1556 )
1557 }
1558
1559 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1560 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1561 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1562 ///
1563 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1564 /// to depend on the order of elements in the stream.
1565 ///
1566 /// # Example
1567 /// ```rust
1568 /// # #[cfg(feature = "deploy")] {
1569 /// # use hydro_lang::prelude::*;
1570 /// # use futures::StreamExt;
1571 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1572 /// let tick = process.tick();
1573 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1574 /// let batch = words.batch(&tick, nondet!(/** test */));
1575 /// batch
1576 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1577 /// .all_ticks()
1578 /// # }, |mut stream| async move {
1579 /// // "HELLOWORLD"
1580 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1581 /// # }));
1582 /// # }
1583 /// ```
1584 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1585 self,
1586 init: impl IntoQuotedMut<'a, I, L>,
1587 comb: impl IntoQuotedMut<'a, F, L>,
1588 ) -> Singleton<A, L, B> {
1589 let init = init.splice_fn0_ctx(&self.location).into();
1590 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1591
1592 let core = HydroNode::Fold {
1593 init,
1594 acc: comb,
1595 input: Box::new(self.ir_node.into_inner()),
1596 metadata: self
1597 .location
1598 .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1599 };
1600
1601 Singleton::new(self.location, core)
1602 }
1603
1604 /// Collects all the elements of this stream into a single [`Vec`] element.
1605 ///
1606 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1607 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1608 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1609 /// the vector at an arbitrary point in time.
1610 ///
1611 /// # Example
1612 /// ```rust
1613 /// # #[cfg(feature = "deploy")] {
1614 /// # use hydro_lang::prelude::*;
1615 /// # use futures::StreamExt;
1616 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1617 /// let tick = process.tick();
1618 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1619 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1620 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1621 /// # }, |mut stream| async move {
1622 /// // [ vec![1, 2, 3, 4] ]
1623 /// # for w in vec![vec![1, 2, 3, 4]] {
1624 /// # assert_eq!(stream.next().await.unwrap(), w);
1625 /// # }
1626 /// # }));
1627 /// # }
1628 /// ```
1629 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1630 self.fold(
1631 q!(|| vec![]),
1632 q!(|acc, v| {
1633 acc.push(v);
1634 }),
1635 )
1636 }
1637
1638 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1639 /// and emitting each intermediate result.
1640 ///
1641 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1642 /// containing all intermediate accumulated values. The scan operation can also terminate early
1643 /// by returning `None`.
1644 ///
1645 /// The function takes a mutable reference to the accumulator and the current element, and returns
1646 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1647 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1648 ///
1649 /// # Examples
1650 ///
1651 /// Basic usage - running sum:
1652 /// ```rust
1653 /// # #[cfg(feature = "deploy")] {
1654 /// # use hydro_lang::prelude::*;
1655 /// # use futures::StreamExt;
1656 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1657 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1658 /// q!(|| 0),
1659 /// q!(|acc, x| {
1660 /// *acc += x;
1661 /// Some(*acc)
1662 /// }),
1663 /// )
1664 /// # }, |mut stream| async move {
1665 /// // Output: 1, 3, 6, 10
1666 /// # for w in vec![1, 3, 6, 10] {
1667 /// # assert_eq!(stream.next().await.unwrap(), w);
1668 /// # }
1669 /// # }));
1670 /// # }
1671 /// ```
1672 ///
1673 /// Early termination example:
1674 /// ```rust
1675 /// # #[cfg(feature = "deploy")] {
1676 /// # use hydro_lang::prelude::*;
1677 /// # use futures::StreamExt;
1678 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1679 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1680 /// q!(|| 1),
1681 /// q!(|state, x| {
1682 /// *state = *state * x;
1683 /// if *state > 6 {
1684 /// None // Terminate the stream
1685 /// } else {
1686 /// Some(-*state)
1687 /// }
1688 /// }),
1689 /// )
1690 /// # }, |mut stream| async move {
1691 /// // Output: -1, -2, -6
1692 /// # for w in vec![-1, -2, -6] {
1693 /// # assert_eq!(stream.next().await.unwrap(), w);
1694 /// # }
1695 /// # }));
1696 /// # }
1697 /// ```
1698 pub fn scan<A, U, I, F>(
1699 self,
1700 init: impl IntoQuotedMut<'a, I, L>,
1701 f: impl IntoQuotedMut<'a, F, L>,
1702 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1703 where
1704 I: Fn() -> A + 'a,
1705 F: Fn(&mut A, T) -> Option<U> + 'a,
1706 {
1707 let init = init.splice_fn0_ctx(&self.location).into();
1708 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1709
1710 Stream::new(
1711 self.location.clone(),
1712 HydroNode::Scan {
1713 init,
1714 acc: f,
1715 input: Box::new(self.ir_node.into_inner()),
1716 metadata: self.location.new_node_metadata(
1717 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1718 ),
1719 },
1720 )
1721 }
1722
1723 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1724 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1725 /// until the first element in the input arrives.
1726 ///
1727 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1728 /// to depend on the order of elements in the stream.
1729 ///
1730 /// # Example
1731 /// ```rust
1732 /// # #[cfg(feature = "deploy")] {
1733 /// # use hydro_lang::prelude::*;
1734 /// # use futures::StreamExt;
1735 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1736 /// let tick = process.tick();
1737 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1738 /// let batch = words.batch(&tick, nondet!(/** test */));
1739 /// batch
1740 /// .map(q!(|x| x.to_string()))
1741 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1742 /// .all_ticks()
1743 /// # }, |mut stream| async move {
1744 /// // "HELLOWORLD"
1745 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1746 /// # }));
1747 /// # }
1748 /// ```
1749 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1750 self,
1751 comb: impl IntoQuotedMut<'a, F, L>,
1752 ) -> Optional<T, L, B> {
1753 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1754 let core = HydroNode::Reduce {
1755 f,
1756 input: Box::new(self.ir_node.into_inner()),
1757 metadata: self
1758 .location
1759 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1760 };
1761
1762 Optional::new(self.location, core)
1763 }
1764}
1765
1766impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1767 /// Produces a new stream that interleaves the elements of the two input streams.
1768 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1769 ///
1770 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1771 /// [`Bounded`], you can use [`Stream::chain`] instead.
1772 ///
1773 /// # Example
1774 /// ```rust
1775 /// # #[cfg(feature = "deploy")] {
1776 /// # use hydro_lang::prelude::*;
1777 /// # use futures::StreamExt;
1778 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1779 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1780 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1781 /// # }, |mut stream| async move {
1782 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1783 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1784 /// # assert_eq!(stream.next().await.unwrap(), w);
1785 /// # }
1786 /// # }));
1787 /// # }
1788 /// ```
1789 pub fn interleave<O2: Ordering, R2: Retries>(
1790 self,
1791 other: Stream<T, L, Unbounded, O2, R2>,
1792 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1793 where
1794 R: MinRetries<R2>,
1795 {
1796 Stream::new(
1797 self.location.clone(),
1798 HydroNode::Chain {
1799 first: Box::new(self.ir_node.into_inner()),
1800 second: Box::new(other.ir_node.into_inner()),
1801 metadata: self.location.new_node_metadata(Stream::<
1802 T,
1803 L,
1804 Unbounded,
1805 NoOrder,
1806 <R as MinRetries<R2>>::Min,
1807 >::collection_kind()),
1808 },
1809 )
1810 }
1811}
1812
1813impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1814 /// Produces a new stream that combines the elements of the two input streams,
1815 /// preserving the relative order of elements within each input.
1816 ///
1817 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1818 /// [`Bounded`], you can use [`Stream::chain`] instead.
1819 ///
1820 /// # Non-Determinism
1821 /// The order in which elements *across* the two streams will be interleaved is
1822 /// non-deterministic, so the order of elements will vary across runs. If the output order
1823 /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1824 /// unordered stream.
1825 ///
1826 /// # Example
1827 /// ```rust
1828 /// # #[cfg(feature = "deploy")] {
1829 /// # use hydro_lang::prelude::*;
1830 /// # use futures::StreamExt;
1831 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1832 /// let numbers = process.source_iter(q!(vec![1, 3]));
1833 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1834 /// # }, |mut stream| async move {
1835 /// // 1, 3 and 2, 4 in some order, preserving the original local order
1836 /// # for w in vec![1, 3, 2, 4] {
1837 /// # assert_eq!(stream.next().await.unwrap(), w);
1838 /// # }
1839 /// # }));
1840 /// # }
1841 /// ```
1842 pub fn merge_ordered<R2: Retries>(
1843 self,
1844 other: Stream<T, L, Unbounded, TotalOrder, R2>,
1845 _nondet: NonDet,
1846 ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1847 where
1848 R: MinRetries<R2>,
1849 {
1850 Stream::new(
1851 self.location.clone(),
1852 HydroNode::Chain {
1853 first: Box::new(self.ir_node.into_inner()),
1854 second: Box::new(other.ir_node.into_inner()),
1855 metadata: self.location.new_node_metadata(Stream::<
1856 T,
1857 L,
1858 Unbounded,
1859 TotalOrder,
1860 <R as MinRetries<R2>>::Min,
1861 >::collection_kind()),
1862 },
1863 )
1864 }
1865}
1866
1867impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1868where
1869 L: Location<'a>,
1870{
1871 /// Produces a new stream that emits the input elements in sorted order.
1872 ///
1873 /// The input stream can have any ordering guarantee, but the output stream
1874 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1875 /// elements in the input stream are available, so it requires the input stream
1876 /// to be [`Bounded`].
1877 ///
1878 /// # Example
1879 /// ```rust
1880 /// # #[cfg(feature = "deploy")] {
1881 /// # use hydro_lang::prelude::*;
1882 /// # use futures::StreamExt;
1883 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1884 /// let tick = process.tick();
1885 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1886 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1887 /// batch.sort().all_ticks()
1888 /// # }, |mut stream| async move {
1889 /// // 1, 2, 3, 4
1890 /// # for w in (1..5) {
1891 /// # assert_eq!(stream.next().await.unwrap(), w);
1892 /// # }
1893 /// # }));
1894 /// # }
1895 /// ```
1896 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1897 where
1898 T: Ord,
1899 {
1900 Stream::new(
1901 self.location.clone(),
1902 HydroNode::Sort {
1903 input: Box::new(self.ir_node.into_inner()),
1904 metadata: self
1905 .location
1906 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1907 },
1908 )
1909 }
1910
1911 /// Produces a new stream that first emits the elements of the `self` stream,
1912 /// and then emits the elements of the `other` stream. The output stream has
1913 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1914 /// [`TotalOrder`] guarantee.
1915 ///
1916 /// Currently, both input streams must be [`Bounded`]. This operator will block
1917 /// on the first stream until all its elements are available. In a future version,
1918 /// we will relax the requirement on the `other` stream.
1919 ///
1920 /// # Example
1921 /// ```rust
1922 /// # #[cfg(feature = "deploy")] {
1923 /// # use hydro_lang::prelude::*;
1924 /// # use futures::StreamExt;
1925 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1926 /// let tick = process.tick();
1927 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1928 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1929 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1930 /// # }, |mut stream| async move {
1931 /// // 2, 3, 4, 5, 1, 2, 3, 4
1932 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1933 /// # assert_eq!(stream.next().await.unwrap(), w);
1934 /// # }
1935 /// # }));
1936 /// # }
1937 /// ```
1938 pub fn chain<O2: Ordering, R2: Retries>(
1939 self,
1940 other: Stream<T, L, Bounded, O2, R2>,
1941 ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1942 where
1943 O: MinOrder<O2>,
1944 R: MinRetries<R2>,
1945 {
1946 check_matching_location(&self.location, &other.location);
1947
1948 Stream::new(
1949 self.location.clone(),
1950 HydroNode::Chain {
1951 first: Box::new(self.ir_node.into_inner()),
1952 second: Box::new(other.ir_node.into_inner()),
1953 metadata: self.location.new_node_metadata(Stream::<
1954 T,
1955 L,
1956 Bounded,
1957 <O as MinOrder<O2>>::Min,
1958 <R as MinRetries<R2>>::Min,
1959 >::collection_kind()),
1960 },
1961 )
1962 }
1963
1964 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1965 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1966 /// because this is compiled into a nested loop.
1967 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1968 self,
1969 other: Stream<T2, L, Bounded, O2, R>,
1970 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1971 where
1972 T: Clone,
1973 T2: Clone,
1974 {
1975 check_matching_location(&self.location, &other.location);
1976
1977 Stream::new(
1978 self.location.clone(),
1979 HydroNode::CrossProduct {
1980 left: Box::new(self.ir_node.into_inner()),
1981 right: Box::new(other.ir_node.into_inner()),
1982 metadata: self.location.new_node_metadata(Stream::<
1983 (T, T2),
1984 L,
1985 Bounded,
1986 <O2 as MinOrder<O>>::Min,
1987 R,
1988 >::collection_kind()),
1989 },
1990 )
1991 }
1992
1993 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1994 /// `self` used as the values for *each* key.
1995 ///
1996 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1997 /// values. For example, it can be used to send the same set of elements to several cluster
1998 /// members, if the membership information is available as a [`KeyedSingleton`].
1999 ///
2000 /// # Example
2001 /// ```rust
2002 /// # #[cfg(feature = "deploy")] {
2003 /// # use hydro_lang::prelude::*;
2004 /// # use futures::StreamExt;
2005 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2006 /// # let tick = process.tick();
2007 /// let keyed_singleton = // { 1: (), 2: () }
2008 /// # process
2009 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
2010 /// # .into_keyed()
2011 /// # .batch(&tick, nondet!(/** test */))
2012 /// # .first();
2013 /// let stream = // [ "a", "b" ]
2014 /// # process
2015 /// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
2016 /// # .batch(&tick, nondet!(/** test */));
2017 /// stream.repeat_with_keys(keyed_singleton)
2018 /// # .entries().all_ticks()
2019 /// # }, |mut stream| async move {
2020 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2021 /// # let mut results = Vec::new();
2022 /// # for _ in 0..4 {
2023 /// # results.push(stream.next().await.unwrap());
2024 /// # }
2025 /// # results.sort();
2026 /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
2027 /// # }));
2028 /// # }
2029 /// ```
2030 pub fn repeat_with_keys<K, V2>(
2031 self,
2032 keys: KeyedSingleton<K, V2, L, Bounded>,
2033 ) -> KeyedStream<K, T, L, Bounded, O, R>
2034 where
2035 K: Clone,
2036 T: Clone,
2037 {
2038 keys.keys()
2039 .weaken_retries()
2040 .assume_ordering_trusted::<TotalOrder>(
2041 nondet!(/** keyed stream does not depend on ordering of keys */),
2042 )
2043 .cross_product_nested_loop(self)
2044 .into_keyed()
2045 }
2046}
2047
2048impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2049where
2050 L: Location<'a>,
2051{
2052 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2053 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2054 /// by equi-joining the two streams on the key attribute `K`.
2055 ///
2056 /// # Example
2057 /// ```rust
2058 /// # #[cfg(feature = "deploy")] {
2059 /// # use hydro_lang::prelude::*;
2060 /// # use std::collections::HashSet;
2061 /// # use futures::StreamExt;
2062 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2063 /// let tick = process.tick();
2064 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2065 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2066 /// stream1.join(stream2)
2067 /// # }, |mut stream| async move {
2068 /// // (1, ('a', 'x')), (2, ('b', 'y'))
2069 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2070 /// # stream.map(|i| assert!(expected.contains(&i)));
2071 /// # }));
2072 /// # }
2073 pub fn join<V2, O2: Ordering, R2: Retries>(
2074 self,
2075 n: Stream<(K, V2), L, B, O2, R2>,
2076 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2077 where
2078 K: Eq + Hash,
2079 R: MinRetries<R2>,
2080 {
2081 check_matching_location(&self.location, &n.location);
2082
2083 Stream::new(
2084 self.location.clone(),
2085 HydroNode::Join {
2086 left: Box::new(self.ir_node.into_inner()),
2087 right: Box::new(n.ir_node.into_inner()),
2088 metadata: self.location.new_node_metadata(Stream::<
2089 (K, (V1, V2)),
2090 L,
2091 B,
2092 NoOrder,
2093 <R as MinRetries<R2>>::Min,
2094 >::collection_kind()),
2095 },
2096 )
2097 }
2098
2099 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2100 /// computes the anti-join of the items in the input -- i.e. returns
2101 /// unique items in the first input that do not have a matching key
2102 /// in the second input.
2103 ///
2104 /// # Example
2105 /// ```rust
2106 /// # #[cfg(feature = "deploy")] {
2107 /// # use hydro_lang::prelude::*;
2108 /// # use futures::StreamExt;
2109 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2110 /// let tick = process.tick();
2111 /// let stream = process
2112 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2113 /// .batch(&tick, nondet!(/** test */));
2114 /// let batch = process
2115 /// .source_iter(q!(vec![1, 2]))
2116 /// .batch(&tick, nondet!(/** test */));
2117 /// stream.anti_join(batch).all_ticks()
2118 /// # }, |mut stream| async move {
2119 /// # for w in vec![(3, 'c'), (4, 'd')] {
2120 /// # assert_eq!(stream.next().await.unwrap(), w);
2121 /// # }
2122 /// # }));
2123 /// # }
2124 pub fn anti_join<O2: Ordering, R2: Retries>(
2125 self,
2126 n: Stream<K, L, Bounded, O2, R2>,
2127 ) -> Stream<(K, V1), L, B, O, R>
2128 where
2129 K: Eq + Hash,
2130 {
2131 check_matching_location(&self.location, &n.location);
2132
2133 Stream::new(
2134 self.location.clone(),
2135 HydroNode::AntiJoin {
2136 pos: Box::new(self.ir_node.into_inner()),
2137 neg: Box::new(n.ir_node.into_inner()),
2138 metadata: self
2139 .location
2140 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2141 },
2142 )
2143 }
2144}
2145
2146impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2147 Stream<(K, V), L, B, O, R>
2148{
2149 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2150 /// is used as the key and the second element is added to the entries associated with that key.
2151 ///
2152 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2153 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2154 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2155 /// total ordering _within_ each group but no ordering _across_ groups.
2156 ///
2157 /// # Example
2158 /// ```rust
2159 /// # #[cfg(feature = "deploy")] {
2160 /// # use hydro_lang::prelude::*;
2161 /// # use futures::StreamExt;
2162 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2163 /// process
2164 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2165 /// .into_keyed()
2166 /// # .entries()
2167 /// # }, |mut stream| async move {
2168 /// // { 1: [2, 3], 2: [4] }
2169 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2170 /// # assert_eq!(stream.next().await.unwrap(), w);
2171 /// # }
2172 /// # }));
2173 /// # }
2174 /// ```
2175 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2176 KeyedStream::new(
2177 self.location.clone(),
2178 HydroNode::Cast {
2179 inner: Box::new(self.ir_node.into_inner()),
2180 metadata: self
2181 .location
2182 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2183 },
2184 )
2185 }
2186}
2187
2188impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
2189where
2190 K: Eq + Hash,
2191 L: Location<'a>,
2192{
2193 #[deprecated = "use .into_keyed().fold(...) instead"]
2194 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2195 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2196 /// in the second element are accumulated via the `comb` closure.
2197 ///
2198 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2199 /// to depend on the order of elements in the stream.
2200 ///
2201 /// If the input and output value types are the same and do not require initialization then use
2202 /// [`Stream::reduce_keyed`].
2203 ///
2204 /// # Example
2205 /// ```rust
2206 /// # #[cfg(feature = "deploy")] {
2207 /// # use hydro_lang::prelude::*;
2208 /// # use futures::StreamExt;
2209 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2210 /// let tick = process.tick();
2211 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2212 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2213 /// batch
2214 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
2215 /// .all_ticks()
2216 /// # }, |mut stream| async move {
2217 /// // (1, 5), (2, 7)
2218 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2219 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2220 /// # }));
2221 /// # }
2222 /// ```
2223 pub fn fold_keyed<A, I, F>(
2224 self,
2225 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2226 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2227 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2228 where
2229 I: Fn() -> A + 'a,
2230 F: Fn(&mut A, V) + 'a,
2231 {
2232 self.into_keyed().fold(init, comb).entries()
2233 }
2234
2235 #[deprecated = "use .into_keyed().reduce(...) instead"]
2236 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2237 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2238 /// in the second element are accumulated via the `comb` closure.
2239 ///
2240 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
2241 /// to depend on the order of elements in the stream.
2242 ///
2243 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
2244 ///
2245 /// # Example
2246 /// ```rust
2247 /// # #[cfg(feature = "deploy")] {
2248 /// # use hydro_lang::prelude::*;
2249 /// # use futures::StreamExt;
2250 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2251 /// let tick = process.tick();
2252 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2253 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2254 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
2255 /// # }, |mut stream| async move {
2256 /// // (1, 5), (2, 7)
2257 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2258 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2259 /// # }));
2260 /// # }
2261 /// ```
2262 pub fn reduce_keyed<F>(
2263 self,
2264 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2265 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2266 where
2267 F: Fn(&mut V, V) + 'a,
2268 {
2269 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
2270
2271 Stream::new(
2272 self.location.clone(),
2273 HydroNode::ReduceKeyed {
2274 f,
2275 input: Box::new(self.ir_node.into_inner()),
2276 metadata: self.location.new_node_metadata(Stream::<
2277 (K, V),
2278 Tick<L>,
2279 Bounded,
2280 NoOrder,
2281 ExactlyOnce,
2282 >::collection_kind()),
2283 },
2284 )
2285 }
2286}
2287
2288impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2289where
2290 K: Eq + Hash,
2291 L: Location<'a>,
2292{
2293 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
2294 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2295 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2296 /// in the second element are accumulated via the `comb` closure.
2297 ///
2298 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2299 /// as there may be non-deterministic duplicates.
2300 ///
2301 /// If the input and output value types are the same and do not require initialization then use
2302 /// [`Stream::reduce_keyed_commutative_idempotent`].
2303 ///
2304 /// # Example
2305 /// ```rust
2306 /// # #[cfg(feature = "deploy")] {
2307 /// # use hydro_lang::prelude::*;
2308 /// # use futures::StreamExt;
2309 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2310 /// let tick = process.tick();
2311 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2312 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2313 /// batch
2314 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2315 /// .all_ticks()
2316 /// # }, |mut stream| async move {
2317 /// // (1, false), (2, true)
2318 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2319 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2320 /// # }));
2321 /// # }
2322 /// ```
2323 pub fn fold_keyed_commutative_idempotent<A, I, F>(
2324 self,
2325 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2326 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2327 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2328 where
2329 I: Fn() -> A + 'a,
2330 F: Fn(&mut A, V) + 'a,
2331 {
2332 self.into_keyed()
2333 .fold_commutative_idempotent(init, comb)
2334 .entries()
2335 }
2336
2337 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2338 /// # Example
2339 /// ```rust
2340 /// # #[cfg(feature = "deploy")] {
2341 /// # use hydro_lang::prelude::*;
2342 /// # use futures::StreamExt;
2343 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2344 /// let tick = process.tick();
2345 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2346 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2347 /// batch.keys().all_ticks()
2348 /// # }, |mut stream| async move {
2349 /// // 1, 2
2350 /// # assert_eq!(stream.next().await.unwrap(), 1);
2351 /// # assert_eq!(stream.next().await.unwrap(), 2);
2352 /// # }));
2353 /// # }
2354 /// ```
2355 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2356 self.into_keyed()
2357 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2358 .keys()
2359 }
2360
2361 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2362 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2363 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2364 /// in the second element are accumulated via the `comb` closure.
2365 ///
2366 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2367 /// as there may be non-deterministic duplicates.
2368 ///
2369 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2370 ///
2371 /// # Example
2372 /// ```rust
2373 /// # #[cfg(feature = "deploy")] {
2374 /// # use hydro_lang::prelude::*;
2375 /// # use futures::StreamExt;
2376 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2377 /// let tick = process.tick();
2378 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2379 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2380 /// batch
2381 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2382 /// .all_ticks()
2383 /// # }, |mut stream| async move {
2384 /// // (1, false), (2, true)
2385 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2386 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2387 /// # }));
2388 /// # }
2389 /// ```
2390 pub fn reduce_keyed_commutative_idempotent<F>(
2391 self,
2392 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2393 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2394 where
2395 F: Fn(&mut V, V) + 'a,
2396 {
2397 self.into_keyed()
2398 .reduce_commutative_idempotent(comb)
2399 .entries()
2400 }
2401}
2402
2403impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2404where
2405 K: Eq + Hash,
2406 L: Location<'a>,
2407{
2408 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2409 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2410 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2411 /// in the second element are accumulated via the `comb` closure.
2412 ///
2413 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2414 ///
2415 /// If the input and output value types are the same and do not require initialization then use
2416 /// [`Stream::reduce_keyed_commutative`].
2417 ///
2418 /// # Example
2419 /// ```rust
2420 /// # #[cfg(feature = "deploy")] {
2421 /// # use hydro_lang::prelude::*;
2422 /// # use futures::StreamExt;
2423 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2424 /// let tick = process.tick();
2425 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2426 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2427 /// batch
2428 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2429 /// .all_ticks()
2430 /// # }, |mut stream| async move {
2431 /// // (1, 5), (2, 7)
2432 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2433 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2434 /// # }));
2435 /// # }
2436 /// ```
2437 pub fn fold_keyed_commutative<A, I, F>(
2438 self,
2439 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2440 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2441 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2442 where
2443 I: Fn() -> A + 'a,
2444 F: Fn(&mut A, V) + 'a,
2445 {
2446 self.into_keyed().fold_commutative(init, comb).entries()
2447 }
2448
2449 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2450 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2451 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2452 /// in the second element are accumulated via the `comb` closure.
2453 ///
2454 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2455 ///
2456 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2457 ///
2458 /// # Example
2459 /// ```rust
2460 /// # #[cfg(feature = "deploy")] {
2461 /// # use hydro_lang::prelude::*;
2462 /// # use futures::StreamExt;
2463 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2464 /// let tick = process.tick();
2465 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2466 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2467 /// batch
2468 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2469 /// .all_ticks()
2470 /// # }, |mut stream| async move {
2471 /// // (1, 5), (2, 7)
2472 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2473 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2474 /// # }));
2475 /// # }
2476 /// ```
2477 pub fn reduce_keyed_commutative<F>(
2478 self,
2479 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2480 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2481 where
2482 F: Fn(&mut V, V) + 'a,
2483 {
2484 self.into_keyed().reduce_commutative(comb).entries()
2485 }
2486}
2487
2488impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2489where
2490 K: Eq + Hash,
2491 L: Location<'a>,
2492{
2493 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2494 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2495 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2496 /// in the second element are accumulated via the `comb` closure.
2497 ///
2498 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2499 ///
2500 /// If the input and output value types are the same and do not require initialization then use
2501 /// [`Stream::reduce_keyed_idempotent`].
2502 ///
2503 /// # Example
2504 /// ```rust
2505 /// # #[cfg(feature = "deploy")] {
2506 /// # use hydro_lang::prelude::*;
2507 /// # use futures::StreamExt;
2508 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2509 /// let tick = process.tick();
2510 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2511 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2512 /// batch
2513 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2514 /// .all_ticks()
2515 /// # }, |mut stream| async move {
2516 /// // (1, false), (2, true)
2517 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2518 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2519 /// # }));
2520 /// # }
2521 /// ```
2522 pub fn fold_keyed_idempotent<A, I, F>(
2523 self,
2524 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2525 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2526 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2527 where
2528 I: Fn() -> A + 'a,
2529 F: Fn(&mut A, V) + 'a,
2530 {
2531 self.into_keyed().fold_idempotent(init, comb).entries()
2532 }
2533
2534 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2535 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2536 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2537 /// in the second element are accumulated via the `comb` closure.
2538 ///
2539 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2540 ///
2541 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2542 ///
2543 /// # Example
2544 /// ```rust
2545 /// # #[cfg(feature = "deploy")] {
2546 /// # use hydro_lang::prelude::*;
2547 /// # use futures::StreamExt;
2548 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2549 /// let tick = process.tick();
2550 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2551 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2552 /// batch
2553 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2554 /// .all_ticks()
2555 /// # }, |mut stream| async move {
2556 /// // (1, false), (2, true)
2557 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2558 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2559 /// # }));
2560 /// # }
2561 /// ```
2562 pub fn reduce_keyed_idempotent<F>(
2563 self,
2564 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2565 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2566 where
2567 F: Fn(&mut V, V) + 'a,
2568 {
2569 self.into_keyed().reduce_idempotent(comb).entries()
2570 }
2571}
2572
2573impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2574where
2575 L: Location<'a> + NoTick,
2576{
2577 /// Returns a stream corresponding to the latest batch of elements being atomically
2578 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2579 /// the order of the input.
2580 ///
2581 /// # Non-Determinism
2582 /// The batch boundaries are non-deterministic and may change across executions.
2583 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2584 Stream::new(
2585 self.location.clone().tick,
2586 HydroNode::Batch {
2587 inner: Box::new(self.ir_node.into_inner()),
2588 metadata: self
2589 .location
2590 .tick
2591 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2592 },
2593 )
2594 }
2595
2596 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2597 /// See [`Stream::atomic`] for more details.
2598 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2599 Stream::new(
2600 self.location.tick.l.clone(),
2601 HydroNode::EndAtomic {
2602 inner: Box::new(self.ir_node.into_inner()),
2603 metadata: self
2604 .location
2605 .tick
2606 .l
2607 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2608 },
2609 )
2610 }
2611}
2612
2613impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2614where
2615 L: Location<'a>,
2616{
2617 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2618 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2619 ///
2620 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2621 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2622 /// argument that declares where the stream will be atomically processed. Batching a stream into
2623 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2624 /// [`Tick`] will introduce asynchrony.
2625 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2626 let out_location = Atomic { tick: tick.clone() };
2627 Stream::new(
2628 out_location.clone(),
2629 HydroNode::BeginAtomic {
2630 inner: Box::new(self.ir_node.into_inner()),
2631 metadata: out_location
2632 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2633 },
2634 )
2635 }
2636
2637 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2638 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2639 /// the order of the input. The output stream will execute in the [`Tick`] that was
2640 /// used to create the atomic section.
2641 ///
2642 /// # Non-Determinism
2643 /// The batch boundaries are non-deterministic and may change across executions.
2644 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2645 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2646 Stream::new(
2647 tick.clone(),
2648 HydroNode::Batch {
2649 inner: Box::new(self.ir_node.into_inner()),
2650 metadata: tick
2651 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2652 },
2653 )
2654 }
2655
2656 /// Given a time interval, returns a stream corresponding to samples taken from the
2657 /// stream roughly at that interval. The output will have elements in the same order
2658 /// as the input, but with arbitrary elements skipped between samples. There is also
2659 /// no guarantee on the exact timing of the samples.
2660 ///
2661 /// # Non-Determinism
2662 /// The output stream is non-deterministic in which elements are sampled, since this
2663 /// is controlled by a clock.
2664 pub fn sample_every(
2665 self,
2666 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2667 nondet: NonDet,
2668 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2669 where
2670 L: NoTick + NoAtomic,
2671 {
2672 let samples = self.location.source_interval(interval, nondet);
2673
2674 let tick = self.location.tick();
2675 self.batch(&tick, nondet)
2676 .filter_if_some(samples.batch(&tick, nondet).first())
2677 .all_ticks()
2678 .weakest_retries()
2679 }
2680
2681 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2682 /// stream has not emitted a value since that duration.
2683 ///
2684 /// # Non-Determinism
2685 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2686 /// samples take place, timeouts may be non-deterministically generated or missed,
2687 /// and the notification of the timeout may be delayed as well. There is also no
2688 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2689 /// detected based on when the next sample is taken.
2690 pub fn timeout(
2691 self,
2692 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2693 nondet: NonDet,
2694 ) -> Optional<(), L, Unbounded>
2695 where
2696 L: NoTick + NoAtomic,
2697 {
2698 let tick = self.location.tick();
2699
2700 let latest_received = self.assume_retries(nondet).fold_commutative(
2701 q!(|| None),
2702 q!(|latest, _| {
2703 *latest = Some(Instant::now());
2704 }),
2705 );
2706
2707 latest_received
2708 .snapshot(&tick, nondet)
2709 .filter_map(q!(move |latest_received| {
2710 if let Some(latest_received) = latest_received {
2711 if Instant::now().duration_since(latest_received) > duration {
2712 Some(())
2713 } else {
2714 None
2715 }
2716 } else {
2717 Some(())
2718 }
2719 }))
2720 .latest()
2721 }
2722}
2723
2724impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2725where
2726 L: Location<'a> + NoTick + NoAtomic,
2727 F: Future<Output = T>,
2728{
2729 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2730 /// Future outputs are produced as available, regardless of input arrival order.
2731 ///
2732 /// # Example
2733 /// ```rust
2734 /// # #[cfg(feature = "deploy")] {
2735 /// # use std::collections::HashSet;
2736 /// # use futures::StreamExt;
2737 /// # use hydro_lang::prelude::*;
2738 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2739 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2740 /// .map(q!(|x| async move {
2741 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2742 /// x
2743 /// }))
2744 /// .resolve_futures()
2745 /// # },
2746 /// # |mut stream| async move {
2747 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2748 /// # let mut output = HashSet::new();
2749 /// # for _ in 1..10 {
2750 /// # output.insert(stream.next().await.unwrap());
2751 /// # }
2752 /// # assert_eq!(
2753 /// # output,
2754 /// # HashSet::<i32>::from_iter(1..10)
2755 /// # );
2756 /// # },
2757 /// # ));
2758 /// # }
2759 pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2760 Stream::new(
2761 self.location.clone(),
2762 HydroNode::ResolveFutures {
2763 input: Box::new(self.ir_node.into_inner()),
2764 metadata: self
2765 .location
2766 .new_node_metadata(Stream::<T, L, B, NoOrder, R>::collection_kind()),
2767 },
2768 )
2769 }
2770
2771 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2772 /// Future outputs are produced in the same order as the input stream.
2773 ///
2774 /// # Example
2775 /// ```rust
2776 /// # #[cfg(feature = "deploy")] {
2777 /// # use std::collections::HashSet;
2778 /// # use futures::StreamExt;
2779 /// # use hydro_lang::prelude::*;
2780 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2781 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2782 /// .map(q!(|x| async move {
2783 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2784 /// x
2785 /// }))
2786 /// .resolve_futures_ordered()
2787 /// # },
2788 /// # |mut stream| async move {
2789 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2790 /// # let mut output = Vec::new();
2791 /// # for _ in 1..10 {
2792 /// # output.push(stream.next().await.unwrap());
2793 /// # }
2794 /// # assert_eq!(
2795 /// # output,
2796 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2797 /// # );
2798 /// # },
2799 /// # ));
2800 /// # }
2801 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2802 Stream::new(
2803 self.location.clone(),
2804 HydroNode::ResolveFuturesOrdered {
2805 input: Box::new(self.ir_node.into_inner()),
2806 metadata: self
2807 .location
2808 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2809 },
2810 )
2811 }
2812}
2813
2814impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2815where
2816 L: Location<'a> + NoTick,
2817{
2818 /// Executes the provided closure for every element in this stream.
2819 ///
2820 /// Because the closure may have side effects, the stream must have deterministic order
2821 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2822 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2823 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2824 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2825 let f = f.splice_fn1_ctx(&self.location).into();
2826 self.location
2827 .flow_state()
2828 .borrow_mut()
2829 .push_root(HydroRoot::ForEach {
2830 input: Box::new(self.ir_node.into_inner()),
2831 f,
2832 op_metadata: HydroIrOpMetadata::new(),
2833 });
2834 }
2835
2836 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2837 /// TCP socket to some other server. You should _not_ use this API for interacting with
2838 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2839 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2840 /// interaction with asynchronous sinks.
2841 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2842 where
2843 S: 'a + futures::Sink<T> + Unpin,
2844 {
2845 self.location
2846 .flow_state()
2847 .borrow_mut()
2848 .push_root(HydroRoot::DestSink {
2849 sink: sink.splice_typed_ctx(&self.location).into(),
2850 input: Box::new(self.ir_node.into_inner()),
2851 op_metadata: HydroIrOpMetadata::new(),
2852 });
2853 }
2854}
2855
2856impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2857where
2858 L: Location<'a>,
2859{
2860 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2861 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2862 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2863 Stream::new(
2864 self.location.outer().clone(),
2865 HydroNode::YieldConcat {
2866 inner: Box::new(self.ir_node.into_inner()),
2867 metadata: self
2868 .location
2869 .outer()
2870 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2871 },
2872 )
2873 }
2874
2875 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2876 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2877 ///
2878 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2879 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2880 /// stream's [`Tick`] context.
2881 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2882 let out_location = Atomic {
2883 tick: self.location.clone(),
2884 };
2885
2886 Stream::new(
2887 out_location.clone(),
2888 HydroNode::YieldConcat {
2889 inner: Box::new(self.ir_node.into_inner()),
2890 metadata: out_location
2891 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2892 },
2893 )
2894 }
2895
2896 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2897 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2898 /// input.
2899 ///
2900 /// This API is particularly useful for stateful computation on batches of data, such as
2901 /// maintaining an accumulated state that is up to date with the current batch.
2902 ///
2903 /// # Example
2904 /// ```rust
2905 /// # #[cfg(feature = "deploy")] {
2906 /// # use hydro_lang::prelude::*;
2907 /// # use futures::StreamExt;
2908 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2909 /// let tick = process.tick();
2910 /// # // ticks are lazy by default, forces the second tick to run
2911 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2912 /// # let batch_first_tick = process
2913 /// # .source_iter(q!(vec![1, 2, 3, 4]))
2914 /// # .batch(&tick, nondet!(/** test */));
2915 /// # let batch_second_tick = process
2916 /// # .source_iter(q!(vec![5, 6, 7]))
2917 /// # .batch(&tick, nondet!(/** test */))
2918 /// # .defer_tick(); // appears on the second tick
2919 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2920 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2921 ///
2922 /// input.batch(&tick, nondet!(/** test */))
2923 /// .across_ticks(|s| s.count()).all_ticks()
2924 /// # }, |mut stream| async move {
2925 /// // [4, 7]
2926 /// assert_eq!(stream.next().await.unwrap(), 4);
2927 /// assert_eq!(stream.next().await.unwrap(), 7);
2928 /// # }));
2929 /// # }
2930 /// ```
2931 pub fn across_ticks<Out: BatchAtomic>(
2932 self,
2933 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2934 ) -> Out::Batched {
2935 thunk(self.all_ticks_atomic()).batched_atomic()
2936 }
2937
2938 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2939 /// always has the elements of `self` at tick `T - 1`.
2940 ///
2941 /// At tick `0`, the output stream is empty, since there is no previous tick.
2942 ///
2943 /// This operator enables stateful iterative processing with ticks, by sending data from one
2944 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2945 ///
2946 /// # Example
2947 /// ```rust
2948 /// # #[cfg(feature = "deploy")] {
2949 /// # use hydro_lang::prelude::*;
2950 /// # use futures::StreamExt;
2951 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2952 /// let tick = process.tick();
2953 /// // ticks are lazy by default, forces the second tick to run
2954 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2955 ///
2956 /// let batch_first_tick = process
2957 /// .source_iter(q!(vec![1, 2, 3, 4]))
2958 /// .batch(&tick, nondet!(/** test */));
2959 /// let batch_second_tick = process
2960 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2961 /// .batch(&tick, nondet!(/** test */))
2962 /// .defer_tick(); // appears on the second tick
2963 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2964 ///
2965 /// changes_across_ticks.clone().filter_not_in(
2966 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2967 /// ).all_ticks()
2968 /// # }, |mut stream| async move {
2969 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2970 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2971 /// # assert_eq!(stream.next().await.unwrap(), w);
2972 /// # }
2973 /// # }));
2974 /// # }
2975 /// ```
2976 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2977 Stream::new(
2978 self.location.clone(),
2979 HydroNode::DeferTick {
2980 input: Box::new(self.ir_node.into_inner()),
2981 metadata: self
2982 .location
2983 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2984 },
2985 )
2986 }
2987}
2988
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(feature = "deploy")]
    use serde::{Deserialize, Serialize};
    #[cfg(feature = "deploy")]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(feature = "sim")]
    use crate::live_collections::stream::NoOrder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::live_collections::stream::TotalOrder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    mod backtrace_chained_ops;

    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends `0..10` from one process to another and checks they reach the external
    /// output in order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&second_node)
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }

    /// Taking `first()` of the flattened `[1, 2, 3]` and counting the result yields 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    /// `reduce` over an unbounded input keeps its accumulator across inputs (1, then 1 + 2).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    /// An atomic `fold` over the source `[1, 2, 3]` is observed as 6 alongside every
    /// batched input.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic(&tick)
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }

    /// `scan` over an unbounded input keeps its running sum across inputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    /// `enumerate` over an unbounded input keeps counting indices across inputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }

    /// `unique` over an unbounded input suppresses a repeated `1`, so the next output after
    /// sending `1` and `3` is `3`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_unique_remembers_state() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let out = input.unique().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send(1).await.unwrap();
        external_in.send(3).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    /// Exhaustive simulation must find a batching in which the three inputs are not all in
    /// the first batch, so the assertion is expected to fail.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }

    /// Batching a totally ordered input never reorders its elements.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_order() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }

    /// With an unordered input, simulation reaches an execution where `1` and `3` share a
    /// batch before `2`, which is only possible if batching reorders; expected to panic.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }

    /// Batching 4 unordered elements explores every ordered set partition of them.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        );
    }

    /// `assume_ordering` on an unordered batch lets simulation observe an order other than
    /// `[1, 2, 3, 4]`; expected to panic.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
        });
    }

    /// Observing order after batching explores every permutation times every split into
    /// contiguous batches.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        );
    }

    /// Snapshotting the running count of 4 unordered elements: the final snapshot is always 4,
    /// and every subset of the earlier counts can appear as snapshots.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unordered_count_instance_count() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            assert_eq!(out_recv.collect::<Vec<_>>().await.last(), Some(&4));
        });

        assert_eq!(
            instance_count,
            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
        );
    }
}
3447}