hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::batch_atomic::BatchAtomic;
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::{Atomic, DeferTick, NoAtomic};
28use crate::location::{Location, NoTick, Tick, check_matching_location};
29use crate::nondet::{NonDet, nondet};
30use crate::prelude::ManualProof;
31use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
32
33pub mod networking;
34
35/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
36#[sealed::sealed]
37pub trait Ordering:
38 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
39{
40 /// The [`StreamOrder`] corresponding to this type.
41 const ORDERING_KIND: StreamOrder;
42}
43
44/// Marks the stream as being totally ordered, which means that there are
45/// no sources of non-determinism (other than intentional ones) that will
46/// affect the order of elements.
47pub enum TotalOrder {}
48
49#[sealed::sealed]
50impl Ordering for TotalOrder {
51 const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
52}
53
54/// Marks the stream as having no order, which means that the order of
55/// elements may be affected by non-determinism.
56///
57/// This restricts certain operators, such as `fold` and `reduce`, to only
58/// be used with commutative aggregation functions.
59pub enum NoOrder {}
60
61#[sealed::sealed]
62impl Ordering for NoOrder {
63 const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
64}
65
66/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
67/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
68/// have `Self` guarantees instead.
69#[sealed::sealed]
70pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
71#[sealed::sealed]
72impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
73
74/// Helper trait for determining the weakest of two orderings.
75#[sealed::sealed]
76pub trait MinOrder<Other: ?Sized> {
77 /// The weaker of the two orderings.
78 type Min: Ordering;
79}
80
81#[sealed::sealed]
82impl<O: Ordering> MinOrder<O> for TotalOrder {
83 type Min = O;
84}
85
86#[sealed::sealed]
87impl<O: Ordering> MinOrder<O> for NoOrder {
88 type Min = NoOrder;
89}
90
91/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
92#[sealed::sealed]
93pub trait Retries:
94 MinRetries<Self, Min = Self>
95 + MinRetries<ExactlyOnce, Min = Self>
96 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
97{
98 /// The [`StreamRetry`] corresponding to this type.
99 const RETRIES_KIND: StreamRetry;
100}
101
102/// Marks the stream as having deterministic message cardinality, with no
103/// possibility of duplicates.
104pub enum ExactlyOnce {}
105
106#[sealed::sealed]
107impl Retries for ExactlyOnce {
108 const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
109}
110
111/// Marks the stream as having non-deterministic message cardinality, which
112/// means that duplicates may occur, but messages will not be dropped.
113pub enum AtLeastOnce {}
114
115#[sealed::sealed]
116impl Retries for AtLeastOnce {
117 const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
118}
119
120/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
121/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
122/// have `Self` guarantees instead.
123#[sealed::sealed]
124pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
125#[sealed::sealed]
126impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
127
128/// Helper trait for determining the weakest of two retry guarantees.
129#[sealed::sealed]
130pub trait MinRetries<Other: ?Sized> {
131 /// The weaker of the two retry guarantees.
132 type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
133}
134
135#[sealed::sealed]
136impl<R: Retries> MinRetries<R> for ExactlyOnce {
137 type Min = R;
138}
139
140#[sealed::sealed]
141impl<R: Retries> MinRetries<R> for AtLeastOnce {
142 type Min = AtLeastOnce;
143}
144
145/// Streaming sequence of elements with type `Type`.
146///
147/// This live collection represents a growing sequence of elements, with new elements being
148/// asynchronously appended to the end of the sequence. This can be used to model the arrival
149/// of network input, such as API requests, or streaming ingestion.
150///
151/// By default, all streams have deterministic ordering and each element is materialized exactly
152/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
153/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
154/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
155///
156/// Type Parameters:
157/// - `Type`: the type of elements in the stream
158/// - `Loc`: the location where the stream is being materialized
159/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
160/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
161/// (default is [`TotalOrder`])
162/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
163/// [`AtLeastOnce`] (default is [`ExactlyOnce`])
164pub struct Stream<
165 Type,
166 Loc,
167 Bound: Boundedness = Unbounded,
168 Order: Ordering = TotalOrder,
169 Retry: Retries = ExactlyOnce,
170> {
171 pub(crate) location: Loc,
172 pub(crate) ir_node: RefCell<HydroNode>,
173
174 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
175}
176
177impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
178 for Stream<T, L, Unbounded, O, R>
179where
180 L: Location<'a>,
181{
182 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
183 let new_meta = stream
184 .location
185 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
186
187 Stream {
188 location: stream.location,
189 ir_node: RefCell::new(HydroNode::Cast {
190 inner: Box::new(stream.ir_node.into_inner()),
191 metadata: new_meta,
192 }),
193 _phantom: PhantomData,
194 }
195 }
196}
197
198impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
199 for Stream<T, L, B, NoOrder, R>
200where
201 L: Location<'a>,
202{
203 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
204 stream.weaken_ordering()
205 }
206}
207
208impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
209 for Stream<T, L, B, O, AtLeastOnce>
210where
211 L: Location<'a>,
212{
213 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
214 stream.weaken_retries()
215 }
216}
217
218impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
219where
220 L: Location<'a>,
221{
222 fn defer_tick(self) -> Self {
223 Stream::defer_tick(self)
224 }
225}
226
227impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
228 for Stream<T, Tick<L>, Bounded, O, R>
229where
230 L: Location<'a>,
231{
232 type Location = Tick<L>;
233
234 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
235 Stream::new(
236 location.clone(),
237 HydroNode::CycleSource {
238 ident,
239 metadata: location.new_node_metadata(Self::collection_kind()),
240 },
241 )
242 }
243}
244
245impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
246 for Stream<T, Tick<L>, Bounded, O, R>
247where
248 L: Location<'a>,
249{
250 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
251 assert_eq!(
252 Location::id(&self.location),
253 expected_location,
254 "locations do not match"
255 );
256 self.location
257 .flow_state()
258 .borrow_mut()
259 .push_root(HydroRoot::CycleSink {
260 ident,
261 input: Box::new(self.ir_node.into_inner()),
262 op_metadata: HydroIrOpMetadata::new(),
263 });
264 }
265}
266
267impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
268 for Stream<T, L, B, O, R>
269where
270 L: Location<'a> + NoTick,
271{
272 type Location = L;
273
274 fn create_source(ident: syn::Ident, location: L) -> Self {
275 Stream::new(
276 location.clone(),
277 HydroNode::CycleSource {
278 ident,
279 metadata: location.new_node_metadata(Self::collection_kind()),
280 },
281 )
282 }
283}
284
285impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
286 for Stream<T, L, B, O, R>
287where
288 L: Location<'a> + NoTick,
289{
290 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
291 assert_eq!(
292 Location::id(&self.location),
293 expected_location,
294 "locations do not match"
295 );
296 self.location
297 .flow_state()
298 .borrow_mut()
299 .push_root(HydroRoot::CycleSink {
300 ident,
301 input: Box::new(self.ir_node.into_inner()),
302 op_metadata: HydroIrOpMetadata::new(),
303 });
304 }
305}
306
307impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
308where
309 T: Clone,
310 L: Location<'a>,
311{
312 fn clone(&self) -> Self {
313 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
314 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
315 *self.ir_node.borrow_mut() = HydroNode::Tee {
316 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
317 metadata: self.location.new_node_metadata(Self::collection_kind()),
318 };
319 }
320
321 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
322 Stream {
323 location: self.location.clone(),
324 ir_node: HydroNode::Tee {
325 inner: TeeNode(inner.0.clone()),
326 metadata: metadata.clone(),
327 }
328 .into(),
329 _phantom: PhantomData,
330 }
331 } else {
332 unreachable!()
333 }
334 }
335}
336
337impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
338where
339 L: Location<'a>,
340{
341 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
342 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
343 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
344
345 Stream {
346 location,
347 ir_node: RefCell::new(ir_node),
348 _phantom: PhantomData,
349 }
350 }
351
352 /// Returns the [`Location`] where this stream is being materialized.
353 pub fn location(&self) -> &L {
354 &self.location
355 }
356
357 pub(crate) fn collection_kind() -> CollectionKind {
358 CollectionKind::Stream {
359 bound: B::BOUND_KIND,
360 order: O::ORDERING_KIND,
361 retry: R::RETRIES_KIND,
362 element_type: quote_type::<T>().into(),
363 }
364 }
365
366 /// Produces a stream based on invoking `f` on each element.
367 /// If you do not want to modify the stream and instead only want to view
368 /// each item use [`Stream::inspect`] instead.
369 ///
370 /// # Example
371 /// ```rust
372 /// # #[cfg(feature = "deploy")] {
373 /// # use hydro_lang::prelude::*;
374 /// # use futures::StreamExt;
375 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
376 /// let words = process.source_iter(q!(vec!["hello", "world"]));
377 /// words.map(q!(|x| x.to_uppercase()))
378 /// # }, |mut stream| async move {
379 /// # for w in vec!["HELLO", "WORLD"] {
380 /// # assert_eq!(stream.next().await.unwrap(), w);
381 /// # }
382 /// # }));
383 /// # }
384 /// ```
385 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
386 where
387 F: Fn(T) -> U + 'a,
388 {
389 let f = f.splice_fn1_ctx(&self.location).into();
390 Stream::new(
391 self.location.clone(),
392 HydroNode::Map {
393 f,
394 input: Box::new(self.ir_node.into_inner()),
395 metadata: self
396 .location
397 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
398 },
399 )
400 }
401
402 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
403 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
404 /// for the output type `U` must produce items in a **deterministic** order.
405 ///
406 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
407 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
408 ///
409 /// # Example
410 /// ```rust
411 /// # #[cfg(feature = "deploy")] {
412 /// # use hydro_lang::prelude::*;
413 /// # use futures::StreamExt;
414 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
415 /// process
416 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
417 /// .flat_map_ordered(q!(|x| x))
418 /// # }, |mut stream| async move {
419 /// // 1, 2, 3, 4
420 /// # for w in (1..5) {
421 /// # assert_eq!(stream.next().await.unwrap(), w);
422 /// # }
423 /// # }));
424 /// # }
425 /// ```
426 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
427 where
428 I: IntoIterator<Item = U>,
429 F: Fn(T) -> I + 'a,
430 {
431 let f = f.splice_fn1_ctx(&self.location).into();
432 Stream::new(
433 self.location.clone(),
434 HydroNode::FlatMap {
435 f,
436 input: Box::new(self.ir_node.into_inner()),
437 metadata: self
438 .location
439 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
440 },
441 )
442 }
443
444 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
445 /// for the output type `U` to produce items in any order.
446 ///
447 /// # Example
448 /// ```rust
449 /// # #[cfg(feature = "deploy")] {
450 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
451 /// # use futures::StreamExt;
452 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
453 /// process
454 /// .source_iter(q!(vec![
455 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
456 /// std::collections::HashSet::from_iter(vec![3, 4]),
457 /// ]))
458 /// .flat_map_unordered(q!(|x| x))
459 /// # }, |mut stream| async move {
460 /// // 1, 2, 3, 4, but in no particular order
461 /// # let mut results = Vec::new();
462 /// # for w in (1..5) {
463 /// # results.push(stream.next().await.unwrap());
464 /// # }
465 /// # results.sort();
466 /// # assert_eq!(results, vec![1, 2, 3, 4]);
467 /// # }));
468 /// # }
469 /// ```
470 pub fn flat_map_unordered<U, I, F>(
471 self,
472 f: impl IntoQuotedMut<'a, F, L>,
473 ) -> Stream<U, L, B, NoOrder, R>
474 where
475 I: IntoIterator<Item = U>,
476 F: Fn(T) -> I + 'a,
477 {
478 let f = f.splice_fn1_ctx(&self.location).into();
479 Stream::new(
480 self.location.clone(),
481 HydroNode::FlatMap {
482 f,
483 input: Box::new(self.ir_node.into_inner()),
484 metadata: self
485 .location
486 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
487 },
488 )
489 }
490
491 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
492 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
493 ///
494 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
495 /// not deterministic, use [`Stream::flatten_unordered`] instead.
496 ///
497 /// ```rust
498 /// # #[cfg(feature = "deploy")] {
499 /// # use hydro_lang::prelude::*;
500 /// # use futures::StreamExt;
501 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
502 /// process
503 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
504 /// .flatten_ordered()
505 /// # }, |mut stream| async move {
506 /// // 1, 2, 3, 4
507 /// # for w in (1..5) {
508 /// # assert_eq!(stream.next().await.unwrap(), w);
509 /// # }
510 /// # }));
511 /// # }
512 /// ```
513 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
514 where
515 T: IntoIterator<Item = U>,
516 {
517 self.flat_map_ordered(q!(|d| d))
518 }
519
520 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
521 /// for the element type `T` to produce items in any order.
522 ///
523 /// # Example
524 /// ```rust
525 /// # #[cfg(feature = "deploy")] {
526 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
527 /// # use futures::StreamExt;
528 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
529 /// process
530 /// .source_iter(q!(vec![
531 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
532 /// std::collections::HashSet::from_iter(vec![3, 4]),
533 /// ]))
534 /// .flatten_unordered()
535 /// # }, |mut stream| async move {
536 /// // 1, 2, 3, 4, but in no particular order
537 /// # let mut results = Vec::new();
538 /// # for w in (1..5) {
539 /// # results.push(stream.next().await.unwrap());
540 /// # }
541 /// # results.sort();
542 /// # assert_eq!(results, vec![1, 2, 3, 4]);
543 /// # }));
544 /// # }
545 /// ```
546 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
547 where
548 T: IntoIterator<Item = U>,
549 {
550 self.flat_map_unordered(q!(|d| d))
551 }
552
553 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
554 /// `f`, preserving the order of the elements.
555 ///
556 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
557 /// not modify or take ownership of the values. If you need to modify the values while filtering
558 /// use [`Stream::filter_map`] instead.
559 ///
560 /// # Example
561 /// ```rust
562 /// # #[cfg(feature = "deploy")] {
563 /// # use hydro_lang::prelude::*;
564 /// # use futures::StreamExt;
565 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
566 /// process
567 /// .source_iter(q!(vec![1, 2, 3, 4]))
568 /// .filter(q!(|&x| x > 2))
569 /// # }, |mut stream| async move {
570 /// // 3, 4
571 /// # for w in (3..5) {
572 /// # assert_eq!(stream.next().await.unwrap(), w);
573 /// # }
574 /// # }));
575 /// # }
576 /// ```
577 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
578 where
579 F: Fn(&T) -> bool + 'a,
580 {
581 let f = f.splice_fn1_borrow_ctx(&self.location).into();
582 Stream::new(
583 self.location.clone(),
584 HydroNode::Filter {
585 f,
586 input: Box::new(self.ir_node.into_inner()),
587 metadata: self.location.new_node_metadata(Self::collection_kind()),
588 },
589 )
590 }
591
592 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
593 ///
594 /// # Example
595 /// ```rust
596 /// # #[cfg(feature = "deploy")] {
597 /// # use hydro_lang::prelude::*;
598 /// # use futures::StreamExt;
599 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
600 /// process
601 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
602 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
603 /// # }, |mut stream| async move {
604 /// // 1, 2
605 /// # for w in (1..3) {
606 /// # assert_eq!(stream.next().await.unwrap(), w);
607 /// # }
608 /// # }));
609 /// # }
610 /// ```
611 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
612 where
613 F: Fn(T) -> Option<U> + 'a,
614 {
615 let f = f.splice_fn1_ctx(&self.location).into();
616 Stream::new(
617 self.location.clone(),
618 HydroNode::FilterMap {
619 f,
620 input: Box::new(self.ir_node.into_inner()),
621 metadata: self
622 .location
623 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
624 },
625 )
626 }
627
628 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
629 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
630 /// If `other` is an empty [`Optional`], no values will be produced.
631 ///
632 /// # Example
633 /// ```rust
634 /// # #[cfg(feature = "deploy")] {
635 /// # use hydro_lang::prelude::*;
636 /// # use futures::StreamExt;
637 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
638 /// let tick = process.tick();
639 /// let batch = process
640 /// .source_iter(q!(vec![1, 2, 3, 4]))
641 /// .batch(&tick, nondet!(/** test */));
642 /// let count = batch.clone().count(); // `count()` returns a singleton
643 /// batch.cross_singleton(count).all_ticks()
644 /// # }, |mut stream| async move {
645 /// // (1, 4), (2, 4), (3, 4), (4, 4)
646 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
647 /// # assert_eq!(stream.next().await.unwrap(), w);
648 /// # }
649 /// # }));
650 /// # }
651 /// ```
652 pub fn cross_singleton<O2>(
653 self,
654 other: impl Into<Optional<O2, L, Bounded>>,
655 ) -> Stream<(T, O2), L, B, O, R>
656 where
657 O2: Clone,
658 {
659 let other: Optional<O2, L, Bounded> = other.into();
660 check_matching_location(&self.location, &other.location);
661
662 Stream::new(
663 self.location.clone(),
664 HydroNode::CrossSingleton {
665 left: Box::new(self.ir_node.into_inner()),
666 right: Box::new(other.ir_node.into_inner()),
667 metadata: self
668 .location
669 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
670 },
671 )
672 }
673
674 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
675 ///
676 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
677 /// leader of a cluster.
678 ///
679 /// # Example
680 /// ```rust
681 /// # #[cfg(feature = "deploy")] {
682 /// # use hydro_lang::prelude::*;
683 /// # use futures::StreamExt;
684 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
685 /// let tick = process.tick();
686 /// // ticks are lazy by default, forces the second tick to run
687 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
688 ///
689 /// let batch_first_tick = process
690 /// .source_iter(q!(vec![1, 2, 3, 4]))
691 /// .batch(&tick, nondet!(/** test */));
692 /// let batch_second_tick = process
693 /// .source_iter(q!(vec![5, 6, 7, 8]))
694 /// .batch(&tick, nondet!(/** test */))
695 /// .defer_tick(); // appears on the second tick
696 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
697 /// batch_first_tick.chain(batch_second_tick)
698 /// .filter_if_some(some_on_first_tick)
699 /// .all_ticks()
700 /// # }, |mut stream| async move {
701 /// // [1, 2, 3, 4]
702 /// # for w in vec![1, 2, 3, 4] {
703 /// # assert_eq!(stream.next().await.unwrap(), w);
704 /// # }
705 /// # }));
706 /// # }
707 /// ```
708 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
709 self.cross_singleton(signal.map(q!(|_u| ())))
710 .map(q!(|(d, _signal)| d))
711 }
712
713 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
714 ///
715 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
716 /// some local state.
717 ///
718 /// # Example
719 /// ```rust
720 /// # #[cfg(feature = "deploy")] {
721 /// # use hydro_lang::prelude::*;
722 /// # use futures::StreamExt;
723 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
724 /// let tick = process.tick();
725 /// // ticks are lazy by default, forces the second tick to run
726 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
727 ///
728 /// let batch_first_tick = process
729 /// .source_iter(q!(vec![1, 2, 3, 4]))
730 /// .batch(&tick, nondet!(/** test */));
731 /// let batch_second_tick = process
732 /// .source_iter(q!(vec![5, 6, 7, 8]))
733 /// .batch(&tick, nondet!(/** test */))
734 /// .defer_tick(); // appears on the second tick
735 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
736 /// batch_first_tick.chain(batch_second_tick)
737 /// .filter_if_none(some_on_first_tick)
738 /// .all_ticks()
739 /// # }, |mut stream| async move {
740 /// // [5, 6, 7, 8]
741 /// # for w in vec![5, 6, 7, 8] {
742 /// # assert_eq!(stream.next().await.unwrap(), w);
743 /// # }
744 /// # }));
745 /// # }
746 /// ```
747 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
748 self.filter_if_some(
749 other
750 .map(q!(|_| ()))
751 .into_singleton()
752 .filter(q!(|o| o.is_none())),
753 )
754 }
755
756 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
757 /// tupled pairs in a non-deterministic order.
758 ///
759 /// # Example
760 /// ```rust
761 /// # #[cfg(feature = "deploy")] {
762 /// # use hydro_lang::prelude::*;
763 /// # use std::collections::HashSet;
764 /// # use futures::StreamExt;
765 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
766 /// let tick = process.tick();
767 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
768 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
769 /// stream1.cross_product(stream2)
770 /// # }, |mut stream| async move {
771 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
772 /// # stream.map(|i| assert!(expected.contains(&i)));
773 /// # }));
774 /// # }
775 /// ```
776 pub fn cross_product<T2, O2: Ordering>(
777 self,
778 other: Stream<T2, L, B, O2, R>,
779 ) -> Stream<(T, T2), L, B, NoOrder, R>
780 where
781 T: Clone,
782 T2: Clone,
783 {
784 check_matching_location(&self.location, &other.location);
785
786 Stream::new(
787 self.location.clone(),
788 HydroNode::CrossProduct {
789 left: Box::new(self.ir_node.into_inner()),
790 right: Box::new(other.ir_node.into_inner()),
791 metadata: self
792 .location
793 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
794 },
795 )
796 }
797
798 /// Takes one stream as input and filters out any duplicate occurrences. The output
799 /// contains all unique values from the input.
800 ///
801 /// # Example
802 /// ```rust
803 /// # #[cfg(feature = "deploy")] {
804 /// # use hydro_lang::prelude::*;
805 /// # use futures::StreamExt;
806 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
807 /// let tick = process.tick();
808 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
809 /// # }, |mut stream| async move {
810 /// # for w in vec![1, 2, 3, 4] {
811 /// # assert_eq!(stream.next().await.unwrap(), w);
812 /// # }
813 /// # }));
814 /// # }
815 /// ```
816 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
817 where
818 T: Eq + Hash,
819 {
820 Stream::new(
821 self.location.clone(),
822 HydroNode::Unique {
823 input: Box::new(self.ir_node.into_inner()),
824 metadata: self
825 .location
826 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
827 },
828 )
829 }
830
831 /// Outputs everything in this stream that is *not* contained in the `other` stream.
832 ///
833 /// The `other` stream must be [`Bounded`], since this function will wait until
834 /// all its elements are available before producing any output.
835 /// # Example
836 /// ```rust
837 /// # #[cfg(feature = "deploy")] {
838 /// # use hydro_lang::prelude::*;
839 /// # use futures::StreamExt;
840 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
841 /// let tick = process.tick();
842 /// let stream = process
843 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
844 /// .batch(&tick, nondet!(/** test */));
845 /// let batch = process
846 /// .source_iter(q!(vec![1, 2]))
847 /// .batch(&tick, nondet!(/** test */));
848 /// stream.filter_not_in(batch).all_ticks()
849 /// # }, |mut stream| async move {
850 /// # for w in vec![3, 4] {
851 /// # assert_eq!(stream.next().await.unwrap(), w);
852 /// # }
853 /// # }));
854 /// # }
855 /// ```
856 pub fn filter_not_in<O2: Ordering>(self, other: Stream<T, L, Bounded, O2, R>) -> Self
857 where
858 T: Eq + Hash,
859 {
860 check_matching_location(&self.location, &other.location);
861
862 Stream::new(
863 self.location.clone(),
864 HydroNode::Difference {
865 pos: Box::new(self.ir_node.into_inner()),
866 neg: Box::new(other.ir_node.into_inner()),
867 metadata: self
868 .location
869 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
870 },
871 )
872 }
873
874 /// An operator which allows you to "inspect" each element of a stream without
875 /// modifying it. The closure `f` is called on a reference to each item. This is
876 /// mainly useful for debugging, and should not be used to generate side-effects.
877 ///
878 /// # Example
879 /// ```rust
880 /// # #[cfg(feature = "deploy")] {
881 /// # use hydro_lang::prelude::*;
882 /// # use futures::StreamExt;
883 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884 /// let nums = process.source_iter(q!(vec![1, 2]));
885 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
886 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
887 /// # }, |mut stream| async move {
888 /// # for w in vec![1, 2] {
889 /// # assert_eq!(stream.next().await.unwrap(), w);
890 /// # }
891 /// # }));
892 /// # }
893 /// ```
894 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
895 where
896 F: Fn(&T) + 'a,
897 {
898 let f = f.splice_fn1_borrow_ctx(&self.location).into();
899
900 Stream::new(
901 self.location.clone(),
902 HydroNode::Inspect {
903 f,
904 input: Box::new(self.ir_node.into_inner()),
905 metadata: self.location.new_node_metadata(Self::collection_kind()),
906 },
907 )
908 }
909
910 /// An operator which allows you to "name" a `HydroNode`.
911 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
912 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
913 {
914 let mut node = self.ir_node.borrow_mut();
915 let metadata = node.metadata_mut();
916 metadata.tag = Some(name.to_owned());
917 }
918 self
919 }
920
921 /// Explicitly "casts" the stream to a type with a different ordering
922 /// guarantee. Useful in unsafe code where the ordering cannot be proven
923 /// by the type-system.
924 ///
925 /// # Non-Determinism
926 /// This function is used as an escape hatch, and any mistakes in the
927 /// provided ordering guarantee will propagate into the guarantees
928 /// for the rest of the program.
929 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
930 if O::ORDERING_KIND == O2::ORDERING_KIND {
931 Stream::new(self.location, self.ir_node.into_inner())
932 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
933 // We can always weaken the ordering guarantee
934 Stream::new(
935 self.location.clone(),
936 HydroNode::Cast {
937 inner: Box::new(self.ir_node.into_inner()),
938 metadata: self
939 .location
940 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
941 },
942 )
943 } else {
944 Stream::new(
945 self.location.clone(),
946 HydroNode::ObserveNonDet {
947 inner: Box::new(self.ir_node.into_inner()),
948 trusted: false,
949 metadata: self
950 .location
951 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
952 },
953 )
954 }
955 }
956
957 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
958 // intermediate states will not be revealed
959 fn assume_ordering_trusted_bounded<O2: Ordering>(
960 self,
961 nondet: NonDet,
962 ) -> Stream<T, L, B, O2, R> {
963 if B::BOUNDED {
964 self.assume_ordering_trusted(nondet)
965 } else {
966 self.assume_ordering(nondet)
967 }
968 }
969
970 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
971 // is not observable
972 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
973 self,
974 _nondet: NonDet,
975 ) -> Stream<T, L, B, O2, R> {
976 if O::ORDERING_KIND == O2::ORDERING_KIND {
977 Stream::new(self.location, self.ir_node.into_inner())
978 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
979 // We can always weaken the ordering guarantee
980 Stream::new(
981 self.location.clone(),
982 HydroNode::Cast {
983 inner: Box::new(self.ir_node.into_inner()),
984 metadata: self
985 .location
986 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
987 },
988 )
989 } else {
990 Stream::new(
991 self.location.clone(),
992 HydroNode::ObserveNonDet {
993 inner: Box::new(self.ir_node.into_inner()),
994 trusted: true,
995 metadata: self
996 .location
997 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
998 },
999 )
1000 }
1001 }
1002
1003 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1004 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1005 /// which is always safe because that is the weakest possible guarantee.
1006 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1007 self.weaken_ordering::<NoOrder>()
1008 }
1009
1010 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1011 /// enforcing that `O2` is weaker than the input ordering guarantee.
1012 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1013 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1014 self.assume_ordering::<O2>(nondet)
1015 }
1016
1017 /// Explicitly "casts" the stream to a type with a different retries
1018 /// guarantee. Useful in unsafe code where the lack of retries cannot
1019 /// be proven by the type-system.
1020 ///
1021 /// # Non-Determinism
1022 /// This function is used as an escape hatch, and any mistakes in the
1023 /// provided retries guarantee will propagate into the guarantees
1024 /// for the rest of the program.
1025 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1026 if R::RETRIES_KIND == R2::RETRIES_KIND {
1027 Stream::new(self.location, self.ir_node.into_inner())
1028 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1029 // We can always weaken the retries guarantee
1030 Stream::new(
1031 self.location.clone(),
1032 HydroNode::Cast {
1033 inner: Box::new(self.ir_node.into_inner()),
1034 metadata: self
1035 .location
1036 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1037 },
1038 )
1039 } else {
1040 Stream::new(
1041 self.location.clone(),
1042 HydroNode::ObserveNonDet {
1043 inner: Box::new(self.ir_node.into_inner()),
1044 trusted: false,
1045 metadata: self
1046 .location
1047 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1048 },
1049 )
1050 }
1051 }
1052
1053 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1054 // is not observable
1055 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1056 if R::RETRIES_KIND == R2::RETRIES_KIND {
1057 Stream::new(self.location, self.ir_node.into_inner())
1058 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1059 // We can always weaken the retries guarantee
1060 Stream::new(
1061 self.location.clone(),
1062 HydroNode::Cast {
1063 inner: Box::new(self.ir_node.into_inner()),
1064 metadata: self
1065 .location
1066 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1067 },
1068 )
1069 } else {
1070 Stream::new(
1071 self.location.clone(),
1072 HydroNode::ObserveNonDet {
1073 inner: Box::new(self.ir_node.into_inner()),
1074 trusted: true,
1075 metadata: self
1076 .location
1077 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1078 },
1079 )
1080 }
1081 }
1082
1083 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1084 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1085 /// which is always safe because that is the weakest possible guarantee.
1086 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1087 self.weaken_retries::<AtLeastOnce>()
1088 }
1089
1090 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1091 /// enforcing that `R2` is weaker than the input retries guarantee.
1092 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1093 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1094 self.assume_retries::<R2>(nondet)
1095 }
1096}
1097
1098impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1099where
1100 L: Location<'a>,
1101{
1102 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1103 ///
1104 /// # Example
1105 /// ```rust
1106 /// # #[cfg(feature = "deploy")] {
1107 /// # use hydro_lang::prelude::*;
1108 /// # use futures::StreamExt;
1109 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1110 /// process.source_iter(q!(&[1, 2, 3])).cloned()
1111 /// # }, |mut stream| async move {
1112 /// // 1, 2, 3
1113 /// # for w in vec![1, 2, 3] {
1114 /// # assert_eq!(stream.next().await.unwrap(), w);
1115 /// # }
1116 /// # }));
1117 /// # }
1118 /// ```
1119 pub fn cloned(self) -> Stream<T, L, B, O, R>
1120 where
1121 T: Clone,
1122 {
1123 self.map(q!(|d| d.clone()))
1124 }
1125}
1126
1127impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1128where
1129 L: Location<'a>,
1130{
1131 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1132 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1133 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1134 ///
1135 /// Depending on the input stream guarantees, the closure may need to be commutative
1136 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1137 ///
1138 /// # Example
1139 /// ```rust
1140 /// # #[cfg(feature = "deploy")] {
1141 /// # use hydro_lang::prelude::*;
1142 /// # use futures::StreamExt;
1143 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1144 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1145 /// words
1146 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1147 /// .into_stream()
1148 /// # }, |mut stream| async move {
1149 /// // "HELLOWORLD"
1150 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1151 /// # }));
1152 /// # }
1153 /// ```
1154 pub fn fold<A, I, F, C, Idemp>(
1155 self,
1156 init: impl IntoQuotedMut<'a, I, L>,
1157 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1158 ) -> Singleton<A, L, B>
1159 where
1160 I: Fn() -> A + 'a,
1161 F: Fn(&mut A, T),
1162 C: ValidCommutativityFor<O>,
1163 Idemp: ValidIdempotenceFor<R>,
1164 {
1165 let init = init.splice_fn0_ctx(&self.location).into();
1166 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1167 proof.register_proof(&comb);
1168
1169 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1170 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1171
1172 let core = HydroNode::Fold {
1173 init,
1174 acc: comb.into(),
1175 input: Box::new(ordered_etc.ir_node.into_inner()),
1176 metadata: ordered_etc
1177 .location
1178 .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1179 };
1180
1181 Singleton::new(ordered_etc.location, core)
1182 }
1183
1184 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1185 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1186 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1187 /// reference, so that it can be modified in place.
1188 ///
1189 /// Depending on the input stream guarantees, the closure may need to be commutative
1190 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1191 ///
1192 /// # Example
1193 /// ```rust
1194 /// # #[cfg(feature = "deploy")] {
1195 /// # use hydro_lang::prelude::*;
1196 /// # use futures::StreamExt;
1197 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1198 /// let bools = process.source_iter(q!(vec![false, true, false]));
1199 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1200 /// # }, |mut stream| async move {
1201 /// // true
1202 /// # assert_eq!(stream.next().await.unwrap(), true);
1203 /// # }));
1204 /// # }
1205 /// ```
1206 pub fn reduce<F, C, Idemp>(
1207 self,
1208 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1209 ) -> Optional<T, L, B>
1210 where
1211 F: Fn(&mut T, T) + 'a,
1212 C: ValidCommutativityFor<O>,
1213 Idemp: ValidIdempotenceFor<R>,
1214 {
1215 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1216 proof.register_proof(&f);
1217
1218 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1219 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1220
1221 let core = HydroNode::Reduce {
1222 f: f.into(),
1223 input: Box::new(ordered_etc.ir_node.into_inner()),
1224 metadata: ordered_etc
1225 .location
1226 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1227 };
1228
1229 Optional::new(ordered_etc.location, core)
1230 }
1231
1232 /// Computes the maximum element in the stream as an [`Optional`], which
1233 /// will be empty until the first element in the input arrives.
1234 ///
1235 /// # Example
1236 /// ```rust
1237 /// # #[cfg(feature = "deploy")] {
1238 /// # use hydro_lang::prelude::*;
1239 /// # use futures::StreamExt;
1240 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1241 /// let tick = process.tick();
1242 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1243 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1244 /// batch.max().all_ticks()
1245 /// # }, |mut stream| async move {
1246 /// // 4
1247 /// # assert_eq!(stream.next().await.unwrap(), 4);
1248 /// # }));
1249 /// # }
1250 /// ```
1251 pub fn max(self) -> Optional<T, L, B>
1252 where
1253 T: Ord,
1254 {
1255 self.assume_retries_trusted(nondet!(/** max is idempotent */))
1256 .assume_ordering_trusted_bounded(
1257 nondet!(/** max is commutative, but order affects intermediates */),
1258 )
1259 .reduce(q!(|curr, new| {
1260 if new > *curr {
1261 *curr = new;
1262 }
1263 }))
1264 }
1265
1266 /// Computes the minimum element in the stream as an [`Optional`], which
1267 /// will be empty until the first element in the input arrives.
1268 ///
1269 /// # Example
1270 /// ```rust
1271 /// # #[cfg(feature = "deploy")] {
1272 /// # use hydro_lang::prelude::*;
1273 /// # use futures::StreamExt;
1274 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1275 /// let tick = process.tick();
1276 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1277 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1278 /// batch.min().all_ticks()
1279 /// # }, |mut stream| async move {
1280 /// // 1
1281 /// # assert_eq!(stream.next().await.unwrap(), 1);
1282 /// # }));
1283 /// # }
1284 /// ```
1285 pub fn min(self) -> Optional<T, L, B>
1286 where
1287 T: Ord,
1288 {
1289 self.assume_retries_trusted(nondet!(/** min is idempotent */))
1290 .assume_ordering_trusted_bounded(
1291 nondet!(/** max is commutative, but order affects intermediates */),
1292 )
1293 .reduce(q!(|curr, new| {
1294 if new < *curr {
1295 *curr = new;
1296 }
1297 }))
1298 }
1299}
1300
1301impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1302where
1303 L: Location<'a>,
1304{
1305 /// Computes the number of elements in the stream as a [`Singleton`].
1306 ///
1307 /// # Example
1308 /// ```rust
1309 /// # #[cfg(feature = "deploy")] {
1310 /// # use hydro_lang::prelude::*;
1311 /// # use futures::StreamExt;
1312 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1313 /// let tick = process.tick();
1314 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1315 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1316 /// batch.count().all_ticks()
1317 /// # }, |mut stream| async move {
1318 /// // 4
1319 /// # assert_eq!(stream.next().await.unwrap(), 4);
1320 /// # }));
1321 /// # }
1322 /// ```
1323 pub fn count(self) -> Singleton<usize, L, B> {
1324 self.assume_ordering_trusted(nondet!(
1325 /// Order does not affect eventual count, and also does not affect intermediate states.
1326 ))
1327 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1328 }
1329}
1330
1331impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1332where
1333 L: Location<'a>,
1334{
1335 /// Computes the first element in the stream as an [`Optional`], which
1336 /// will be empty until the first element in the input arrives.
1337 ///
1338 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1339 /// re-ordering of elements may cause the first element to change.
1340 ///
1341 /// # Example
1342 /// ```rust
1343 /// # #[cfg(feature = "deploy")] {
1344 /// # use hydro_lang::prelude::*;
1345 /// # use futures::StreamExt;
1346 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1347 /// let tick = process.tick();
1348 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1349 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1350 /// batch.first().all_ticks()
1351 /// # }, |mut stream| async move {
1352 /// // 1
1353 /// # assert_eq!(stream.next().await.unwrap(), 1);
1354 /// # }));
1355 /// # }
1356 /// ```
1357 pub fn first(self) -> Optional<T, L, B> {
1358 self.assume_retries_trusted(nondet!(/** first is idempotent */))
1359 .reduce(q!(|_, _| {}))
1360 }
1361
1362 /// Computes the last element in the stream as an [`Optional`], which
1363 /// will be empty until an element in the input arrives.
1364 ///
1365 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1366 /// re-ordering of elements may cause the last element to change.
1367 ///
1368 /// # Example
1369 /// ```rust
1370 /// # #[cfg(feature = "deploy")] {
1371 /// # use hydro_lang::prelude::*;
1372 /// # use futures::StreamExt;
1373 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1374 /// let tick = process.tick();
1375 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1376 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1377 /// batch.last().all_ticks()
1378 /// # }, |mut stream| async move {
1379 /// // 4
1380 /// # assert_eq!(stream.next().await.unwrap(), 4);
1381 /// # }));
1382 /// # }
1383 /// ```
1384 pub fn last(self) -> Optional<T, L, B> {
1385 self.assume_retries_trusted(nondet!(/** last is idempotent */))
1386 .reduce(q!(|curr, new| *curr = new))
1387 }
1388}
1389
1390impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1391where
1392 L: Location<'a>,
1393{
1394 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1395 ///
1396 /// # Example
1397 /// ```rust
1398 /// # #[cfg(feature = "deploy")] {
1399 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1400 /// # use futures::StreamExt;
1401 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1402 /// let tick = process.tick();
1403 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1404 /// numbers.enumerate()
1405 /// # }, |mut stream| async move {
1406 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1407 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1408 /// # assert_eq!(stream.next().await.unwrap(), w);
1409 /// # }
1410 /// # }));
1411 /// # }
1412 /// ```
1413 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1414 Stream::new(
1415 self.location.clone(),
1416 HydroNode::Enumerate {
1417 input: Box::new(self.ir_node.into_inner()),
1418 metadata: self.location.new_node_metadata(Stream::<
1419 (usize, T),
1420 L,
1421 B,
1422 TotalOrder,
1423 ExactlyOnce,
1424 >::collection_kind()),
1425 },
1426 )
1427 }
1428
1429 /// Collects all the elements of this stream into a single [`Vec`] element.
1430 ///
1431 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1432 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1433 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1434 /// the vector at an arbitrary point in time.
1435 ///
1436 /// # Example
1437 /// ```rust
1438 /// # #[cfg(feature = "deploy")] {
1439 /// # use hydro_lang::prelude::*;
1440 /// # use futures::StreamExt;
1441 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1442 /// let tick = process.tick();
1443 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1444 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1445 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1446 /// # }, |mut stream| async move {
1447 /// // [ vec![1, 2, 3, 4] ]
1448 /// # for w in vec![vec![1, 2, 3, 4]] {
1449 /// # assert_eq!(stream.next().await.unwrap(), w);
1450 /// # }
1451 /// # }));
1452 /// # }
1453 /// ```
1454 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1455 self.fold(
1456 q!(|| vec![]),
1457 q!(|acc, v| {
1458 acc.push(v);
1459 }),
1460 )
1461 }
1462
1463 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1464 /// and emitting each intermediate result.
1465 ///
1466 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1467 /// containing all intermediate accumulated values. The scan operation can also terminate early
1468 /// by returning `None`.
1469 ///
1470 /// The function takes a mutable reference to the accumulator and the current element, and returns
1471 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1472 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1473 ///
1474 /// # Examples
1475 ///
1476 /// Basic usage - running sum:
1477 /// ```rust
1478 /// # #[cfg(feature = "deploy")] {
1479 /// # use hydro_lang::prelude::*;
1480 /// # use futures::StreamExt;
1481 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1482 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1483 /// q!(|| 0),
1484 /// q!(|acc, x| {
1485 /// *acc += x;
1486 /// Some(*acc)
1487 /// }),
1488 /// )
1489 /// # }, |mut stream| async move {
1490 /// // Output: 1, 3, 6, 10
1491 /// # for w in vec![1, 3, 6, 10] {
1492 /// # assert_eq!(stream.next().await.unwrap(), w);
1493 /// # }
1494 /// # }));
1495 /// # }
1496 /// ```
1497 ///
1498 /// Early termination example:
1499 /// ```rust
1500 /// # #[cfg(feature = "deploy")] {
1501 /// # use hydro_lang::prelude::*;
1502 /// # use futures::StreamExt;
1503 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1504 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1505 /// q!(|| 1),
1506 /// q!(|state, x| {
1507 /// *state = *state * x;
1508 /// if *state > 6 {
1509 /// None // Terminate the stream
1510 /// } else {
1511 /// Some(-*state)
1512 /// }
1513 /// }),
1514 /// )
1515 /// # }, |mut stream| async move {
1516 /// // Output: -1, -2, -6
1517 /// # for w in vec![-1, -2, -6] {
1518 /// # assert_eq!(stream.next().await.unwrap(), w);
1519 /// # }
1520 /// # }));
1521 /// # }
1522 /// ```
1523 pub fn scan<A, U, I, F>(
1524 self,
1525 init: impl IntoQuotedMut<'a, I, L>,
1526 f: impl IntoQuotedMut<'a, F, L>,
1527 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1528 where
1529 I: Fn() -> A + 'a,
1530 F: Fn(&mut A, T) -> Option<U> + 'a,
1531 {
1532 let init = init.splice_fn0_ctx(&self.location).into();
1533 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1534
1535 Stream::new(
1536 self.location.clone(),
1537 HydroNode::Scan {
1538 init,
1539 acc: f,
1540 input: Box::new(self.ir_node.into_inner()),
1541 metadata: self.location.new_node_metadata(
1542 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1543 ),
1544 },
1545 )
1546 }
1547}
1548
1549impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1550 /// Produces a new stream that interleaves the elements of the two input streams.
1551 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1552 ///
1553 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1554 /// [`Bounded`], you can use [`Stream::chain`] instead.
1555 ///
1556 /// # Example
1557 /// ```rust
1558 /// # #[cfg(feature = "deploy")] {
1559 /// # use hydro_lang::prelude::*;
1560 /// # use futures::StreamExt;
1561 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1562 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
1563 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
1564 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1565 /// # }, |mut stream| async move {
1566 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1567 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1568 /// # assert_eq!(stream.next().await.unwrap(), w);
1569 /// # }
1570 /// # }));
1571 /// # }
1572 /// ```
1573 pub fn interleave<O2: Ordering, R2: Retries>(
1574 self,
1575 other: Stream<T, L, Unbounded, O2, R2>,
1576 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1577 where
1578 R: MinRetries<R2>,
1579 {
1580 Stream::new(
1581 self.location.clone(),
1582 HydroNode::Chain {
1583 first: Box::new(self.ir_node.into_inner()),
1584 second: Box::new(other.ir_node.into_inner()),
1585 metadata: self.location.new_node_metadata(Stream::<
1586 T,
1587 L,
1588 Unbounded,
1589 NoOrder,
1590 <R as MinRetries<R2>>::Min,
1591 >::collection_kind()),
1592 },
1593 )
1594 }
1595}
1596
1597impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1598 /// Produces a new stream that combines the elements of the two input streams,
1599 /// preserving the relative order of elements within each input.
1600 ///
1601 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1602 /// [`Bounded`], you can use [`Stream::chain`] instead.
1603 ///
1604 /// # Non-Determinism
1605 /// The order in which elements *across* the two streams will be interleaved is
1606 /// non-deterministic, so the order of elements will vary across runs. If the output order
1607 /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1608 /// unordered stream.
1609 ///
1610 /// # Example
1611 /// ```rust
1612 /// # #[cfg(feature = "deploy")] {
1613 /// # use hydro_lang::prelude::*;
1614 /// # use futures::StreamExt;
1615 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1616 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
1617 /// # process.source_iter(q!(vec![1, 3])).into();
1618 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1619 /// # }, |mut stream| async move {
1620 /// // 1, 3 and 2, 4 in some order, preserving the original local order
1621 /// # for w in vec![1, 3, 2, 4] {
1622 /// # assert_eq!(stream.next().await.unwrap(), w);
1623 /// # }
1624 /// # }));
1625 /// # }
1626 /// ```
1627 pub fn merge_ordered<R2: Retries>(
1628 self,
1629 other: Stream<T, L, Unbounded, TotalOrder, R2>,
1630 _nondet: NonDet,
1631 ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1632 where
1633 R: MinRetries<R2>,
1634 {
1635 Stream::new(
1636 self.location.clone(),
1637 HydroNode::Chain {
1638 first: Box::new(self.ir_node.into_inner()),
1639 second: Box::new(other.ir_node.into_inner()),
1640 metadata: self.location.new_node_metadata(Stream::<
1641 T,
1642 L,
1643 Unbounded,
1644 TotalOrder,
1645 <R as MinRetries<R2>>::Min,
1646 >::collection_kind()),
1647 },
1648 )
1649 }
1650}
1651
1652impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1653where
1654 L: Location<'a>,
1655{
1656 /// Produces a new stream that emits the input elements in sorted order.
1657 ///
1658 /// The input stream can have any ordering guarantee, but the output stream
1659 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1660 /// elements in the input stream are available, so it requires the input stream
1661 /// to be [`Bounded`].
1662 ///
1663 /// # Example
1664 /// ```rust
1665 /// # #[cfg(feature = "deploy")] {
1666 /// # use hydro_lang::prelude::*;
1667 /// # use futures::StreamExt;
1668 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1669 /// let tick = process.tick();
1670 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1671 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1672 /// batch.sort().all_ticks()
1673 /// # }, |mut stream| async move {
1674 /// // 1, 2, 3, 4
1675 /// # for w in (1..5) {
1676 /// # assert_eq!(stream.next().await.unwrap(), w);
1677 /// # }
1678 /// # }));
1679 /// # }
1680 /// ```
1681 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1682 where
1683 T: Ord,
1684 {
1685 Stream::new(
1686 self.location.clone(),
1687 HydroNode::Sort {
1688 input: Box::new(self.ir_node.into_inner()),
1689 metadata: self
1690 .location
1691 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1692 },
1693 )
1694 }
1695
1696 /// Produces a new stream that first emits the elements of the `self` stream,
1697 /// and then emits the elements of the `other` stream. The output stream has
1698 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1699 /// [`TotalOrder`] guarantee.
1700 ///
1701 /// Currently, both input streams must be [`Bounded`]. This operator will block
1702 /// on the first stream until all its elements are available. In a future version,
1703 /// we will relax the requirement on the `other` stream.
1704 ///
1705 /// # Example
1706 /// ```rust
1707 /// # #[cfg(feature = "deploy")] {
1708 /// # use hydro_lang::prelude::*;
1709 /// # use futures::StreamExt;
1710 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1711 /// let tick = process.tick();
1712 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1713 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1714 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1715 /// # }, |mut stream| async move {
1716 /// // 2, 3, 4, 5, 1, 2, 3, 4
1717 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1718 /// # assert_eq!(stream.next().await.unwrap(), w);
1719 /// # }
1720 /// # }));
1721 /// # }
1722 /// ```
1723 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
1724 self,
1725 other: Stream<T, L, B2, O2, R2>,
1726 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1727 where
1728 O: MinOrder<O2>,
1729 R: MinRetries<R2>,
1730 {
1731 check_matching_location(&self.location, &other.location);
1732
1733 Stream::new(
1734 self.location.clone(),
1735 HydroNode::Chain {
1736 first: Box::new(self.ir_node.into_inner()),
1737 second: Box::new(other.ir_node.into_inner()),
1738 metadata: self.location.new_node_metadata(Stream::<
1739 T,
1740 L,
1741 B2,
1742 <O as MinOrder<O2>>::Min,
1743 <R as MinRetries<R2>>::Min,
1744 >::collection_kind()),
1745 },
1746 )
1747 }
1748
1749 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1750 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1751 /// because this is compiled into a nested loop.
1752 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1753 self,
1754 other: Stream<T2, L, Bounded, O2, R>,
1755 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1756 where
1757 T: Clone,
1758 T2: Clone,
1759 {
1760 check_matching_location(&self.location, &other.location);
1761
1762 Stream::new(
1763 self.location.clone(),
1764 HydroNode::CrossProduct {
1765 left: Box::new(self.ir_node.into_inner()),
1766 right: Box::new(other.ir_node.into_inner()),
1767 metadata: self.location.new_node_metadata(Stream::<
1768 (T, T2),
1769 L,
1770 Bounded,
1771 <O2 as MinOrder<O>>::Min,
1772 R,
1773 >::collection_kind()),
1774 },
1775 )
1776 }
1777
1778 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1779 /// `self` used as the values for *each* key.
1780 ///
1781 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1782 /// values. For example, it can be used to send the same set of elements to several cluster
1783 /// members, if the membership information is available as a [`KeyedSingleton`].
1784 ///
1785 /// # Example
1786 /// ```rust
1787 /// # #[cfg(feature = "deploy")] {
1788 /// # use hydro_lang::prelude::*;
1789 /// # use futures::StreamExt;
1790 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1791 /// # let tick = process.tick();
1792 /// let keyed_singleton = // { 1: (), 2: () }
1793 /// # process
1794 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
1795 /// # .into_keyed()
1796 /// # .batch(&tick, nondet!(/** test */))
1797 /// # .first();
1798 /// let stream = // [ "a", "b" ]
1799 /// # process
1800 /// # .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
1801 /// # .batch(&tick, nondet!(/** test */));
1802 /// stream.repeat_with_keys(keyed_singleton)
1803 /// # .entries().all_ticks()
1804 /// # }, |mut stream| async move {
1805 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1806 /// # let mut results = Vec::new();
1807 /// # for _ in 0..4 {
1808 /// # results.push(stream.next().await.unwrap());
1809 /// # }
1810 /// # results.sort();
1811 /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
1812 /// # }));
1813 /// # }
1814 /// ```
1815 pub fn repeat_with_keys<K, V2>(
1816 self,
1817 keys: KeyedSingleton<K, V2, L, Bounded>,
1818 ) -> KeyedStream<K, T, L, Bounded, O, R>
1819 where
1820 K: Clone,
1821 T: Clone,
1822 {
1823 keys.keys()
1824 .weaken_retries()
1825 .assume_ordering_trusted::<TotalOrder>(
1826 nondet!(/** keyed stream does not depend on ordering of keys */),
1827 )
1828 .cross_product_nested_loop(self)
1829 .into_keyed()
1830 }
1831}
1832
1833impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1834where
1835 L: Location<'a>,
1836{
1837 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1838 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1839 /// by equi-joining the two streams on the key attribute `K`.
1840 ///
1841 /// # Example
1842 /// ```rust
1843 /// # #[cfg(feature = "deploy")] {
1844 /// # use hydro_lang::prelude::*;
1845 /// # use std::collections::HashSet;
1846 /// # use futures::StreamExt;
1847 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1848 /// let tick = process.tick();
1849 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1850 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1851 /// stream1.join(stream2)
1852 /// # }, |mut stream| async move {
1853 /// // (1, ('a', 'x')), (2, ('b', 'y'))
1854 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1855 /// # stream.map(|i| assert!(expected.contains(&i)));
1856 /// # }));
1857 /// # }
1858 pub fn join<V2, O2: Ordering, R2: Retries>(
1859 self,
1860 n: Stream<(K, V2), L, B, O2, R2>,
1861 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1862 where
1863 K: Eq + Hash,
1864 R: MinRetries<R2>,
1865 {
1866 check_matching_location(&self.location, &n.location);
1867
1868 Stream::new(
1869 self.location.clone(),
1870 HydroNode::Join {
1871 left: Box::new(self.ir_node.into_inner()),
1872 right: Box::new(n.ir_node.into_inner()),
1873 metadata: self.location.new_node_metadata(Stream::<
1874 (K, (V1, V2)),
1875 L,
1876 B,
1877 NoOrder,
1878 <R as MinRetries<R2>>::Min,
1879 >::collection_kind()),
1880 },
1881 )
1882 }
1883
1884 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1885 /// computes the anti-join of the items in the input -- i.e. returns
1886 /// unique items in the first input that do not have a matching key
1887 /// in the second input.
1888 ///
1889 /// # Example
1890 /// ```rust
1891 /// # #[cfg(feature = "deploy")] {
1892 /// # use hydro_lang::prelude::*;
1893 /// # use futures::StreamExt;
1894 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1895 /// let tick = process.tick();
1896 /// let stream = process
1897 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1898 /// .batch(&tick, nondet!(/** test */));
1899 /// let batch = process
1900 /// .source_iter(q!(vec![1, 2]))
1901 /// .batch(&tick, nondet!(/** test */));
1902 /// stream.anti_join(batch).all_ticks()
1903 /// # }, |mut stream| async move {
1904 /// # for w in vec![(3, 'c'), (4, 'd')] {
1905 /// # assert_eq!(stream.next().await.unwrap(), w);
1906 /// # }
1907 /// # }));
1908 /// # }
1909 pub fn anti_join<O2: Ordering, R2: Retries>(
1910 self,
1911 n: Stream<K, L, Bounded, O2, R2>,
1912 ) -> Stream<(K, V1), L, B, O, R>
1913 where
1914 K: Eq + Hash,
1915 {
1916 check_matching_location(&self.location, &n.location);
1917
1918 Stream::new(
1919 self.location.clone(),
1920 HydroNode::AntiJoin {
1921 pos: Box::new(self.ir_node.into_inner()),
1922 neg: Box::new(n.ir_node.into_inner()),
1923 metadata: self
1924 .location
1925 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
1926 },
1927 )
1928 }
1929}
1930
1931impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1932 Stream<(K, V), L, B, O, R>
1933{
1934 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1935 /// is used as the key and the second element is added to the entries associated with that key.
1936 ///
1937 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1938 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1939 /// performing grouped aggregations, but also for more precise ordering guarantees such as
1940 /// total ordering _within_ each group but no ordering _across_ groups.
1941 ///
1942 /// # Example
1943 /// ```rust
1944 /// # #[cfg(feature = "deploy")] {
1945 /// # use hydro_lang::prelude::*;
1946 /// # use futures::StreamExt;
1947 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1948 /// process
1949 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1950 /// .into_keyed()
1951 /// # .entries()
1952 /// # }, |mut stream| async move {
1953 /// // { 1: [2, 3], 2: [4] }
1954 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1955 /// # assert_eq!(stream.next().await.unwrap(), w);
1956 /// # }
1957 /// # }));
1958 /// # }
1959 /// ```
1960 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1961 KeyedStream::new(
1962 self.location.clone(),
1963 HydroNode::Cast {
1964 inner: Box::new(self.ir_node.into_inner()),
1965 metadata: self
1966 .location
1967 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
1968 },
1969 )
1970 }
1971}
1972
1973impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
1974where
1975 K: Eq + Hash,
1976 L: Location<'a>,
1977{
1978 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
1979 /// # Example
1980 /// ```rust
1981 /// # #[cfg(feature = "deploy")] {
1982 /// # use hydro_lang::prelude::*;
1983 /// # use futures::StreamExt;
1984 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1985 /// let tick = process.tick();
1986 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1987 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1988 /// batch.keys().all_ticks()
1989 /// # }, |mut stream| async move {
1990 /// // 1, 2
1991 /// # assert_eq!(stream.next().await.unwrap(), 1);
1992 /// # assert_eq!(stream.next().await.unwrap(), 2);
1993 /// # }));
1994 /// # }
1995 /// ```
1996 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
1997 self.into_keyed()
1998 .fold(
1999 q!(|| ()),
2000 q!(
2001 |_, _| {},
2002 commutative = ManualProof(/* values are ignored */),
2003 idempotent = ManualProof(/* values are ignored */)
2004 ),
2005 )
2006 .keys()
2007 }
2008}
2009
2010impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2011where
2012 L: Location<'a> + NoTick,
2013{
2014 /// Returns a stream corresponding to the latest batch of elements being atomically
2015 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2016 /// the order of the input.
2017 ///
2018 /// # Non-Determinism
2019 /// The batch boundaries are non-deterministic and may change across executions.
2020 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2021 Stream::new(
2022 self.location.clone().tick,
2023 HydroNode::Batch {
2024 inner: Box::new(self.ir_node.into_inner()),
2025 metadata: self
2026 .location
2027 .tick
2028 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2029 },
2030 )
2031 }
2032
2033 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2034 /// See [`Stream::atomic`] for more details.
2035 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2036 Stream::new(
2037 self.location.tick.l.clone(),
2038 HydroNode::EndAtomic {
2039 inner: Box::new(self.ir_node.into_inner()),
2040 metadata: self
2041 .location
2042 .tick
2043 .l
2044 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2045 },
2046 )
2047 }
2048}
2049
2050impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2051where
2052 L: Location<'a>,
2053{
2054 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2055 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2056 ///
2057 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2058 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2059 /// argument that declares where the stream will be atomically processed. Batching a stream into
2060 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2061 /// [`Tick`] will introduce asynchrony.
2062 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2063 let out_location = Atomic { tick: tick.clone() };
2064 Stream::new(
2065 out_location.clone(),
2066 HydroNode::BeginAtomic {
2067 inner: Box::new(self.ir_node.into_inner()),
2068 metadata: out_location
2069 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2070 },
2071 )
2072 }
2073
2074 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2075 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2076 /// the order of the input. The output stream will execute in the [`Tick`] that was
2077 /// used to create the atomic section.
2078 ///
2079 /// # Non-Determinism
2080 /// The batch boundaries are non-deterministic and may change across executions.
2081 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2082 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2083 Stream::new(
2084 tick.clone(),
2085 HydroNode::Batch {
2086 inner: Box::new(self.ir_node.into_inner()),
2087 metadata: tick
2088 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2089 },
2090 )
2091 }
2092
2093 /// Given a time interval, returns a stream corresponding to samples taken from the
2094 /// stream roughly at that interval. The output will have elements in the same order
2095 /// as the input, but with arbitrary elements skipped between samples. There is also
2096 /// no guarantee on the exact timing of the samples.
2097 ///
2098 /// # Non-Determinism
2099 /// The output stream is non-deterministic in which elements are sampled, since this
2100 /// is controlled by a clock.
2101 pub fn sample_every(
2102 self,
2103 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2104 nondet: NonDet,
2105 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2106 where
2107 L: NoTick + NoAtomic,
2108 {
2109 let samples = self.location.source_interval(interval, nondet);
2110
2111 let tick = self.location.tick();
2112 self.batch(&tick, nondet)
2113 .filter_if_some(samples.batch(&tick, nondet).first())
2114 .all_ticks()
2115 .weaken_retries()
2116 }
2117
2118 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2119 /// stream has not emitted a value since that duration.
2120 ///
2121 /// # Non-Determinism
2122 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2123 /// samples take place, timeouts may be non-deterministically generated or missed,
2124 /// and the notification of the timeout may be delayed as well. There is also no
2125 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2126 /// detected based on when the next sample is taken.
2127 pub fn timeout(
2128 self,
2129 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2130 nondet: NonDet,
2131 ) -> Optional<(), L, Unbounded>
2132 where
2133 L: NoTick + NoAtomic,
2134 {
2135 let tick = self.location.tick();
2136
2137 let latest_received = self.assume_retries(nondet).fold(
2138 q!(|| None),
2139 q!(
2140 |latest, _| {
2141 *latest = Some(Instant::now());
2142 },
2143 commutative = ManualProof(/* TODO */)
2144 ),
2145 );
2146
2147 latest_received
2148 .snapshot(&tick, nondet)
2149 .filter_map(q!(move |latest_received| {
2150 if let Some(latest_received) = latest_received {
2151 if Instant::now().duration_since(latest_received) > duration {
2152 Some(())
2153 } else {
2154 None
2155 }
2156 } else {
2157 Some(())
2158 }
2159 }))
2160 .latest()
2161 }
2162}
2163
2164impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2165where
2166 L: Location<'a> + NoTick + NoAtomic,
2167 F: Future<Output = T>,
2168{
2169 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2170 /// Future outputs are produced as available, regardless of input arrival order.
2171 ///
2172 /// # Example
2173 /// ```rust
2174 /// # #[cfg(feature = "deploy")] {
2175 /// # use std::collections::HashSet;
2176 /// # use futures::StreamExt;
2177 /// # use hydro_lang::prelude::*;
2178 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2179 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2180 /// .map(q!(|x| async move {
2181 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2182 /// x
2183 /// }))
2184 /// .resolve_futures()
2185 /// # },
2186 /// # |mut stream| async move {
2187 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2188 /// # let mut output = HashSet::new();
2189 /// # for _ in 1..10 {
2190 /// # output.insert(stream.next().await.unwrap());
2191 /// # }
2192 /// # assert_eq!(
2193 /// # output,
2194 /// # HashSet::<i32>::from_iter(1..10)
2195 /// # );
2196 /// # },
2197 /// # ));
2198 /// # }
2199 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2200 Stream::new(
2201 self.location.clone(),
2202 HydroNode::ResolveFutures {
2203 input: Box::new(self.ir_node.into_inner()),
2204 metadata: self
2205 .location
2206 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2207 },
2208 )
2209 }
2210
2211 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2212 /// Future outputs are produced in the same order as the input stream.
2213 ///
2214 /// # Example
2215 /// ```rust
2216 /// # #[cfg(feature = "deploy")] {
2217 /// # use std::collections::HashSet;
2218 /// # use futures::StreamExt;
2219 /// # use hydro_lang::prelude::*;
2220 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2221 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2222 /// .map(q!(|x| async move {
2223 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2224 /// x
2225 /// }))
2226 /// .resolve_futures_ordered()
2227 /// # },
2228 /// # |mut stream| async move {
2229 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2230 /// # let mut output = Vec::new();
2231 /// # for _ in 1..10 {
2232 /// # output.push(stream.next().await.unwrap());
2233 /// # }
2234 /// # assert_eq!(
2235 /// # output,
2236 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2237 /// # );
2238 /// # },
2239 /// # ));
2240 /// # }
2241 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2242 Stream::new(
2243 self.location.clone(),
2244 HydroNode::ResolveFuturesOrdered {
2245 input: Box::new(self.ir_node.into_inner()),
2246 metadata: self
2247 .location
2248 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2249 },
2250 )
2251 }
2252}
2253
2254impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2255where
2256 L: Location<'a> + NoTick,
2257{
2258 /// Executes the provided closure for every element in this stream.
2259 ///
2260 /// Because the closure may have side effects, the stream must have deterministic order
2261 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2262 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2263 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2264 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2265 let f = f.splice_fn1_ctx(&self.location).into();
2266 self.location
2267 .flow_state()
2268 .borrow_mut()
2269 .push_root(HydroRoot::ForEach {
2270 input: Box::new(self.ir_node.into_inner()),
2271 f,
2272 op_metadata: HydroIrOpMetadata::new(),
2273 });
2274 }
2275
2276 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2277 /// TCP socket to some other server. You should _not_ use this API for interacting with
2278 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2279 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2280 /// interaction with asynchronous sinks.
2281 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2282 where
2283 S: 'a + futures::Sink<T> + Unpin,
2284 {
2285 self.location
2286 .flow_state()
2287 .borrow_mut()
2288 .push_root(HydroRoot::DestSink {
2289 sink: sink.splice_typed_ctx(&self.location).into(),
2290 input: Box::new(self.ir_node.into_inner()),
2291 op_metadata: HydroIrOpMetadata::new(),
2292 });
2293 }
2294}
2295
2296impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2297where
2298 L: Location<'a>,
2299{
2300 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2301 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2302 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2303 Stream::new(
2304 self.location.outer().clone(),
2305 HydroNode::YieldConcat {
2306 inner: Box::new(self.ir_node.into_inner()),
2307 metadata: self
2308 .location
2309 .outer()
2310 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2311 },
2312 )
2313 }
2314
2315 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2316 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2317 ///
2318 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2319 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2320 /// stream's [`Tick`] context.
2321 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2322 let out_location = Atomic {
2323 tick: self.location.clone(),
2324 };
2325
2326 Stream::new(
2327 out_location.clone(),
2328 HydroNode::YieldConcat {
2329 inner: Box::new(self.ir_node.into_inner()),
2330 metadata: out_location
2331 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2332 },
2333 )
2334 }
2335
2336 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2337 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2338 /// input.
2339 ///
2340 /// This API is particularly useful for stateful computation on batches of data, such as
2341 /// maintaining an accumulated state that is up to date with the current batch.
2342 ///
2343 /// # Example
2344 /// ```rust
2345 /// # #[cfg(feature = "deploy")] {
2346 /// # use hydro_lang::prelude::*;
2347 /// # use futures::StreamExt;
2348 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2349 /// let tick = process.tick();
2350 /// # // ticks are lazy by default, forces the second tick to run
2351 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2352 /// # let batch_first_tick = process
2353 /// # .source_iter(q!(vec![1, 2, 3, 4]))
2354 /// # .batch(&tick, nondet!(/** test */));
2355 /// # let batch_second_tick = process
2356 /// # .source_iter(q!(vec![5, 6, 7]))
2357 /// # .batch(&tick, nondet!(/** test */))
2358 /// # .defer_tick(); // appears on the second tick
2359 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2360 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2361 ///
2362 /// input.batch(&tick, nondet!(/** test */))
2363 /// .across_ticks(|s| s.count()).all_ticks()
2364 /// # }, |mut stream| async move {
2365 /// // [4, 7]
2366 /// assert_eq!(stream.next().await.unwrap(), 4);
2367 /// assert_eq!(stream.next().await.unwrap(), 7);
2368 /// # }));
2369 /// # }
2370 /// ```
2371 pub fn across_ticks<Out: BatchAtomic>(
2372 self,
2373 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2374 ) -> Out::Batched {
2375 thunk(self.all_ticks_atomic()).batched_atomic()
2376 }
2377
2378 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2379 /// always has the elements of `self` at tick `T - 1`.
2380 ///
2381 /// At tick `0`, the output stream is empty, since there is no previous tick.
2382 ///
2383 /// This operator enables stateful iterative processing with ticks, by sending data from one
2384 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2385 ///
2386 /// # Example
2387 /// ```rust
2388 /// # #[cfg(feature = "deploy")] {
2389 /// # use hydro_lang::prelude::*;
2390 /// # use futures::StreamExt;
2391 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2392 /// let tick = process.tick();
2393 /// // ticks are lazy by default, forces the second tick to run
2394 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2395 ///
2396 /// let batch_first_tick = process
2397 /// .source_iter(q!(vec![1, 2, 3, 4]))
2398 /// .batch(&tick, nondet!(/** test */));
2399 /// let batch_second_tick = process
2400 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2401 /// .batch(&tick, nondet!(/** test */))
2402 /// .defer_tick(); // appears on the second tick
2403 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2404 ///
2405 /// changes_across_ticks.clone().filter_not_in(
2406 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2407 /// ).all_ticks()
2408 /// # }, |mut stream| async move {
2409 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2410 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2411 /// # assert_eq!(stream.next().await.unwrap(), w);
2412 /// # }
2413 /// # }));
2414 /// # }
2415 /// ```
2416 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2417 Stream::new(
2418 self.location.clone(),
2419 HydroNode::DeferTick {
2420 input: Box::new(self.ir_node.into_inner()),
2421 metadata: self
2422 .location
2423 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2424 },
2425 )
2426 }
2427}
2428
2429#[cfg(test)]
2430mod tests {
2431 #[cfg(feature = "deploy")]
2432 use futures::{SinkExt, StreamExt};
2433 #[cfg(feature = "deploy")]
2434 use hydro_deploy::Deployment;
2435 #[cfg(feature = "deploy")]
2436 use serde::{Deserialize, Serialize};
2437 #[cfg(any(feature = "deploy", feature = "sim"))]
2438 use stageleft::q;
2439
2440 #[cfg(any(feature = "deploy", feature = "sim"))]
2441 use crate::compile::builder::FlowBuilder;
2442 #[cfg(feature = "deploy")]
2443 use crate::live_collections::sliced::sliced;
2444 #[cfg(feature = "deploy")]
2445 use crate::live_collections::stream::ExactlyOnce;
2446 #[cfg(feature = "sim")]
2447 use crate::live_collections::stream::NoOrder;
2448 #[cfg(any(feature = "deploy", feature = "sim"))]
2449 use crate::live_collections::stream::TotalOrder;
2450 #[cfg(any(feature = "deploy", feature = "sim"))]
2451 use crate::location::Location;
2452 #[cfg(any(feature = "deploy", feature = "sim"))]
2453 use crate::nondet::nondet;
2454
2455 mod backtrace_chained_ops;
2456
2457 #[cfg(feature = "deploy")]
2458 struct P1 {}
2459 #[cfg(feature = "deploy")]
2460 struct P2 {}
2461
2462 #[cfg(feature = "deploy")]
2463 #[derive(Serialize, Deserialize, Debug)]
2464 struct SendOverNetwork {
2465 n: u32,
2466 }
2467
2468 #[cfg(feature = "deploy")]
2469 #[tokio::test]
2470 async fn first_ten_distributed() {
2471 use crate::networking::TCP;
2472
2473 let mut deployment = Deployment::new();
2474
2475 let mut flow = FlowBuilder::new();
2476 let first_node = flow.process::<P1>();
2477 let second_node = flow.process::<P2>();
2478 let external = flow.external::<P2>();
2479
2480 let numbers = first_node.source_iter(q!(0..10));
2481 let out_port = numbers
2482 .map(q!(|n| SendOverNetwork { n }))
2483 .send(&second_node, TCP.bincode())
2484 .send_bincode_external(&external);
2485
2486 let nodes = flow
2487 .with_process(&first_node, deployment.Localhost())
2488 .with_process(&second_node, deployment.Localhost())
2489 .with_external(&external, deployment.Localhost())
2490 .deploy(&mut deployment);
2491
2492 deployment.deploy().await.unwrap();
2493
2494 let mut external_out = nodes.connect(out_port).await;
2495
2496 deployment.start().await.unwrap();
2497
2498 for i in 0..10 {
2499 assert_eq!(external_out.next().await.unwrap().n, i);
2500 }
2501 }
2502
2503 #[cfg(feature = "deploy")]
2504 #[tokio::test]
2505 async fn first_cardinality() {
2506 let mut deployment = Deployment::new();
2507
2508 let mut flow = FlowBuilder::new();
2509 let node = flow.process::<()>();
2510 let external = flow.external::<()>();
2511
2512 let node_tick = node.tick();
2513 let count = node_tick
2514 .singleton(q!([1, 2, 3]))
2515 .into_stream()
2516 .flatten_ordered()
2517 .first()
2518 .into_stream()
2519 .count()
2520 .all_ticks()
2521 .send_bincode_external(&external);
2522
2523 let nodes = flow
2524 .with_process(&node, deployment.Localhost())
2525 .with_external(&external, deployment.Localhost())
2526 .deploy(&mut deployment);
2527
2528 deployment.deploy().await.unwrap();
2529
2530 let mut external_out = nodes.connect(count).await;
2531
2532 deployment.start().await.unwrap();
2533
2534 assert_eq!(external_out.next().await.unwrap(), 1);
2535 }
2536
2537 #[cfg(feature = "deploy")]
2538 #[tokio::test]
2539 async fn unbounded_reduce_remembers_state() {
2540 let mut deployment = Deployment::new();
2541
2542 let mut flow = FlowBuilder::new();
2543 let node = flow.process::<()>();
2544 let external = flow.external::<()>();
2545
2546 let (input_port, input) = node.source_external_bincode(&external);
2547 let out = input
2548 .reduce(q!(|acc, v| *acc += v))
2549 .sample_eager(nondet!(/** test */))
2550 .send_bincode_external(&external);
2551
2552 let nodes = flow
2553 .with_process(&node, deployment.Localhost())
2554 .with_external(&external, deployment.Localhost())
2555 .deploy(&mut deployment);
2556
2557 deployment.deploy().await.unwrap();
2558
2559 let mut external_in = nodes.connect(input_port).await;
2560 let mut external_out = nodes.connect(out).await;
2561
2562 deployment.start().await.unwrap();
2563
2564 external_in.send(1).await.unwrap();
2565 assert_eq!(external_out.next().await.unwrap(), 1);
2566
2567 external_in.send(2).await.unwrap();
2568 assert_eq!(external_out.next().await.unwrap(), 3);
2569 }
2570
2571 #[cfg(feature = "deploy")]
2572 #[tokio::test]
2573 async fn top_level_bounded_cross_singleton() {
2574 let mut deployment = Deployment::new();
2575
2576 let mut flow = FlowBuilder::new();
2577 let node = flow.process::<()>();
2578 let external = flow.external::<()>();
2579
2580 let (input_port, input) =
2581 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2582
2583 let out = input
2584 .cross_singleton(
2585 node.source_iter(q!(vec![1, 2, 3]))
2586 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
2587 )
2588 .send_bincode_external(&external);
2589
2590 let nodes = flow
2591 .with_process(&node, deployment.Localhost())
2592 .with_external(&external, deployment.Localhost())
2593 .deploy(&mut deployment);
2594
2595 deployment.deploy().await.unwrap();
2596
2597 let mut external_in = nodes.connect(input_port).await;
2598 let mut external_out = nodes.connect(out).await;
2599
2600 deployment.start().await.unwrap();
2601
2602 external_in.send(1).await.unwrap();
2603 assert_eq!(external_out.next().await.unwrap(), (1, 6));
2604
2605 external_in.send(2).await.unwrap();
2606 assert_eq!(external_out.next().await.unwrap(), (2, 6));
2607 }
2608
2609 #[cfg(feature = "deploy")]
2610 #[tokio::test]
2611 async fn top_level_bounded_reduce_cardinality() {
2612 let mut deployment = Deployment::new();
2613
2614 let mut flow = FlowBuilder::new();
2615 let node = flow.process::<()>();
2616 let external = flow.external::<()>();
2617
2618 let (input_port, input) =
2619 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2620
2621 let out = sliced! {
2622 let input = use(input, nondet!(/** test */));
2623 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
2624 input.cross_singleton(v.into_stream().count())
2625 }
2626 .send_bincode_external(&external);
2627
2628 let nodes = flow
2629 .with_process(&node, deployment.Localhost())
2630 .with_external(&external, deployment.Localhost())
2631 .deploy(&mut deployment);
2632
2633 deployment.deploy().await.unwrap();
2634
2635 let mut external_in = nodes.connect(input_port).await;
2636 let mut external_out = nodes.connect(out).await;
2637
2638 deployment.start().await.unwrap();
2639
2640 external_in.send(1).await.unwrap();
2641 assert_eq!(external_out.next().await.unwrap(), (1, 1));
2642
2643 external_in.send(2).await.unwrap();
2644 assert_eq!(external_out.next().await.unwrap(), (2, 1));
2645 }
2646
2647 #[cfg(feature = "deploy")]
2648 #[tokio::test]
2649 async fn top_level_bounded_into_singleton_cardinality() {
2650 let mut deployment = Deployment::new();
2651
2652 let mut flow = FlowBuilder::new();
2653 let node = flow.process::<()>();
2654 let external = flow.external::<()>();
2655
2656 let (input_port, input) =
2657 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2658
2659 let out = sliced! {
2660 let input = use(input, nondet!(/** test */));
2661 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
2662 input.cross_singleton(v.into_stream().count())
2663 }
2664 .send_bincode_external(&external);
2665
2666 let nodes = flow
2667 .with_process(&node, deployment.Localhost())
2668 .with_external(&external, deployment.Localhost())
2669 .deploy(&mut deployment);
2670
2671 deployment.deploy().await.unwrap();
2672
2673 let mut external_in = nodes.connect(input_port).await;
2674 let mut external_out = nodes.connect(out).await;
2675
2676 deployment.start().await.unwrap();
2677
2678 external_in.send(1).await.unwrap();
2679 assert_eq!(external_out.next().await.unwrap(), (1, 1));
2680
2681 external_in.send(2).await.unwrap();
2682 assert_eq!(external_out.next().await.unwrap(), (2, 1));
2683 }
2684
2685 #[cfg(feature = "deploy")]
2686 #[tokio::test]
2687 async fn atomic_fold_replays_each_tick() {
2688 let mut deployment = Deployment::new();
2689
2690 let mut flow = FlowBuilder::new();
2691 let node = flow.process::<()>();
2692 let external = flow.external::<()>();
2693
2694 let (input_port, input) =
2695 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2696 let tick = node.tick();
2697
2698 let out = input
2699 .batch(&tick, nondet!(/** test */))
2700 .cross_singleton(
2701 node.source_iter(q!(vec![1, 2, 3]))
2702 .atomic(&tick)
2703 .fold(q!(|| 0), q!(|acc, v| *acc += v))
2704 .snapshot_atomic(nondet!(/** test */)),
2705 )
2706 .all_ticks()
2707 .send_bincode_external(&external);
2708
2709 let nodes = flow
2710 .with_process(&node, deployment.Localhost())
2711 .with_external(&external, deployment.Localhost())
2712 .deploy(&mut deployment);
2713
2714 deployment.deploy().await.unwrap();
2715
2716 let mut external_in = nodes.connect(input_port).await;
2717 let mut external_out = nodes.connect(out).await;
2718
2719 deployment.start().await.unwrap();
2720
2721 external_in.send(1).await.unwrap();
2722 assert_eq!(external_out.next().await.unwrap(), (1, 6));
2723
2724 external_in.send(2).await.unwrap();
2725 assert_eq!(external_out.next().await.unwrap(), (2, 6));
2726 }
2727
2728 #[cfg(feature = "deploy")]
2729 #[tokio::test]
2730 async fn unbounded_scan_remembers_state() {
2731 let mut deployment = Deployment::new();
2732
2733 let mut flow = FlowBuilder::new();
2734 let node = flow.process::<()>();
2735 let external = flow.external::<()>();
2736
2737 let (input_port, input) = node.source_external_bincode(&external);
2738 let out = input
2739 .scan(
2740 q!(|| 0),
2741 q!(|acc, v| {
2742 *acc += v;
2743 Some(*acc)
2744 }),
2745 )
2746 .send_bincode_external(&external);
2747
2748 let nodes = flow
2749 .with_process(&node, deployment.Localhost())
2750 .with_external(&external, deployment.Localhost())
2751 .deploy(&mut deployment);
2752
2753 deployment.deploy().await.unwrap();
2754
2755 let mut external_in = nodes.connect(input_port).await;
2756 let mut external_out = nodes.connect(out).await;
2757
2758 deployment.start().await.unwrap();
2759
2760 external_in.send(1).await.unwrap();
2761 assert_eq!(external_out.next().await.unwrap(), 1);
2762
2763 external_in.send(2).await.unwrap();
2764 assert_eq!(external_out.next().await.unwrap(), 3);
2765 }
2766
2767 #[cfg(feature = "deploy")]
2768 #[tokio::test]
2769 async fn unbounded_enumerate_remembers_state() {
2770 let mut deployment = Deployment::new();
2771
2772 let mut flow = FlowBuilder::new();
2773 let node = flow.process::<()>();
2774 let external = flow.external::<()>();
2775
2776 let (input_port, input) = node.source_external_bincode(&external);
2777 let out = input.enumerate().send_bincode_external(&external);
2778
2779 let nodes = flow
2780 .with_process(&node, deployment.Localhost())
2781 .with_external(&external, deployment.Localhost())
2782 .deploy(&mut deployment);
2783
2784 deployment.deploy().await.unwrap();
2785
2786 let mut external_in = nodes.connect(input_port).await;
2787 let mut external_out = nodes.connect(out).await;
2788
2789 deployment.start().await.unwrap();
2790
2791 external_in.send(1).await.unwrap();
2792 assert_eq!(external_out.next().await.unwrap(), (0, 1));
2793
2794 external_in.send(2).await.unwrap();
2795 assert_eq!(external_out.next().await.unwrap(), (1, 2));
2796 }
2797
2798 #[cfg(feature = "deploy")]
2799 #[tokio::test]
2800 async fn unbounded_unique_remembers_state() {
2801 let mut deployment = Deployment::new();
2802
2803 let mut flow = FlowBuilder::new();
2804 let node = flow.process::<()>();
2805 let external = flow.external::<()>();
2806
2807 let (input_port, input) =
2808 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2809 let out = input.unique().send_bincode_external(&external);
2810
2811 let nodes = flow
2812 .with_process(&node, deployment.Localhost())
2813 .with_external(&external, deployment.Localhost())
2814 .deploy(&mut deployment);
2815
2816 deployment.deploy().await.unwrap();
2817
2818 let mut external_in = nodes.connect(input_port).await;
2819 let mut external_out = nodes.connect(out).await;
2820
2821 deployment.start().await.unwrap();
2822
2823 external_in.send(1).await.unwrap();
2824 assert_eq!(external_out.next().await.unwrap(), 1);
2825
2826 external_in.send(2).await.unwrap();
2827 assert_eq!(external_out.next().await.unwrap(), 2);
2828
2829 external_in.send(1).await.unwrap();
2830 external_in.send(3).await.unwrap();
2831 assert_eq!(external_out.next().await.unwrap(), 3);
2832 }
2833
2834 #[cfg(feature = "sim")]
2835 #[test]
2836 #[should_panic]
2837 fn sim_batch_nondet_size() {
2838 let mut flow = FlowBuilder::new();
2839 let node = flow.process::<()>();
2840
2841 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
2842
2843 let tick = node.tick();
2844 let out_recv = input
2845 .batch(&tick, nondet!(/** test */))
2846 .count()
2847 .all_ticks()
2848 .sim_output();
2849
2850 flow.sim().exhaustive(async || {
2851 in_send.send(());
2852 in_send.send(());
2853 in_send.send(());
2854
2855 assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
2856 });
2857 }
2858
2859 #[cfg(feature = "sim")]
2860 #[test]
2861 fn sim_batch_preserves_order() {
2862 let mut flow = FlowBuilder::new();
2863 let node = flow.process::<()>();
2864
2865 let (in_send, input) = node.sim_input();
2866
2867 let tick = node.tick();
2868 let out_recv = input
2869 .batch(&tick, nondet!(/** test */))
2870 .all_ticks()
2871 .sim_output();
2872
2873 flow.sim().exhaustive(async || {
2874 in_send.send(1);
2875 in_send.send(2);
2876 in_send.send(3);
2877
2878 out_recv.assert_yields_only([1, 2, 3]).await;
2879 });
2880 }
2881
2882 #[cfg(feature = "sim")]
2883 #[test]
2884 #[should_panic]
2885 fn sim_batch_unordered_shuffles() {
2886 let mut flow = FlowBuilder::new();
2887 let node = flow.process::<()>();
2888
2889 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2890
2891 let tick = node.tick();
2892 let batch = input.batch(&tick, nondet!(/** test */));
2893 let out_recv = batch
2894 .clone()
2895 .min()
2896 .zip(batch.max())
2897 .all_ticks()
2898 .sim_output();
2899
2900 flow.sim().exhaustive(async || {
2901 in_send.send_many_unordered([1, 2, 3]);
2902
2903 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
2904 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
2905 }
2906 });
2907 }
2908
2909 #[cfg(feature = "sim")]
2910 #[test]
2911 fn sim_batch_unordered_shuffles_count() {
2912 let mut flow = FlowBuilder::new();
2913 let node = flow.process::<()>();
2914
2915 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2916
2917 let tick = node.tick();
2918 let batch = input.batch(&tick, nondet!(/** test */));
2919 let out_recv = batch.all_ticks().sim_output();
2920
2921 let instance_count = flow.sim().exhaustive(async || {
2922 in_send.send_many_unordered([1, 2, 3, 4]);
2923 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
2924 });
2925
2926 assert_eq!(
2927 instance_count,
2928 75 // ∑ (k=1 to 4) S(4,k) × k! = 75
2929 )
2930 }
2931
2932 #[cfg(feature = "sim")]
2933 #[test]
2934 #[should_panic]
2935 fn sim_observe_order_batched() {
2936 let mut flow = FlowBuilder::new();
2937 let node = flow.process::<()>();
2938
2939 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2940
2941 let tick = node.tick();
2942 let batch = input.batch(&tick, nondet!(/** test */));
2943 let out_recv = batch
2944 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2945 .all_ticks()
2946 .sim_output();
2947
2948 flow.sim().exhaustive(async || {
2949 in_send.send_many_unordered([1, 2, 3, 4]);
2950 out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
2951 });
2952 }
2953
2954 #[cfg(feature = "sim")]
2955 #[test]
2956 fn sim_observe_order_batched_count() {
2957 let mut flow = FlowBuilder::new();
2958 let node = flow.process::<()>();
2959
2960 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2961
2962 let tick = node.tick();
2963 let batch = input.batch(&tick, nondet!(/** test */));
2964 let out_recv = batch
2965 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2966 .all_ticks()
2967 .sim_output();
2968
2969 let instance_count = flow.sim().exhaustive(async || {
2970 in_send.send_many_unordered([1, 2, 3, 4]);
2971 let _ = out_recv.collect::<Vec<_>>().await;
2972 });
2973
2974 assert_eq!(
2975 instance_count,
2976 192 // 4! * 2^{4 - 1}
2977 )
2978 }
2979
2980 #[cfg(feature = "sim")]
2981 #[test]
2982 fn sim_unordered_count_instance_count() {
2983 let mut flow = FlowBuilder::new();
2984 let node = flow.process::<()>();
2985
2986 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2987
2988 let tick = node.tick();
2989 let out_recv = input
2990 .count()
2991 .snapshot(&tick, nondet!(/** test */))
2992 .all_ticks()
2993 .sim_output();
2994
2995 let instance_count = flow.sim().exhaustive(async || {
2996 in_send.send_many_unordered([1, 2, 3, 4]);
2997 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
2998 });
2999
3000 assert_eq!(
3001 instance_count,
3002 16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3003 )
3004 }
3005
3006 #[cfg(feature = "sim")]
3007 #[test]
3008 fn sim_top_level_assume_ordering() {
3009 let mut flow = FlowBuilder::new();
3010 let node = flow.process::<()>();
3011
3012 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3013
3014 let out_recv = input
3015 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3016 .sim_output();
3017
3018 let instance_count = flow.sim().exhaustive(async || {
3019 in_send.send_many_unordered([1, 2, 3]);
3020 let mut out = out_recv.collect::<Vec<_>>().await;
3021 out.sort();
3022 assert_eq!(out, vec![1, 2, 3]);
3023 });
3024
3025 assert_eq!(instance_count, 6)
3026 }
3027
3028 #[cfg(feature = "sim")]
3029 #[test]
3030 fn sim_top_level_assume_ordering_cycle_back() {
3031 let mut flow = FlowBuilder::new();
3032 let node = flow.process::<()>();
3033
3034 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3035
3036 let (complete_cycle_back, cycle_back) =
3037 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3038 let ordered = input
3039 .interleave(cycle_back)
3040 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3041 complete_cycle_back.complete(
3042 ordered
3043 .clone()
3044 .map(q!(|v| v + 1))
3045 .filter(q!(|v| v % 2 == 1)),
3046 );
3047
3048 let out_recv = ordered.sim_output();
3049
3050 let mut saw = false;
3051 let instance_count = flow.sim().exhaustive(async || {
3052 in_send.send_many_unordered([0, 2]);
3053 let out = out_recv.collect::<Vec<_>>().await;
3054
3055 if out.starts_with(&[0, 1, 2]) {
3056 saw = true;
3057 }
3058 });
3059
3060 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3061 assert_eq!(instance_count, 6)
3062 }
3063
3064 #[cfg(feature = "sim")]
3065 #[test]
3066 fn sim_top_level_assume_ordering_cycle_back_tick() {
3067 let mut flow = FlowBuilder::new();
3068 let node = flow.process::<()>();
3069
3070 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3071
3072 let (complete_cycle_back, cycle_back) =
3073 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3074 let ordered = input
3075 .interleave(cycle_back)
3076 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3077 complete_cycle_back.complete(
3078 ordered
3079 .clone()
3080 .batch(&node.tick(), nondet!(/** test */))
3081 .all_ticks()
3082 .map(q!(|v| v + 1))
3083 .filter(q!(|v| v % 2 == 1)),
3084 );
3085
3086 let out_recv = ordered.sim_output();
3087
3088 let mut saw = false;
3089 let instance_count = flow.sim().exhaustive(async || {
3090 in_send.send_many_unordered([0, 2]);
3091 let out = out_recv.collect::<Vec<_>>().await;
3092
3093 if out.starts_with(&[0, 1, 2]) {
3094 saw = true;
3095 }
3096 });
3097
3098 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3099 assert_eq!(instance_count, 58)
3100 }
3101
3102 #[cfg(feature = "sim")]
3103 #[test]
3104 fn sim_top_level_assume_ordering_multiple() {
3105 let mut flow = FlowBuilder::new();
3106 let node = flow.process::<()>();
3107
3108 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3109 let (_, input2) = node.sim_input::<_, NoOrder, _>();
3110
3111 let (complete_cycle_back, cycle_back) =
3112 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3113 let input1_ordered = input
3114 .clone()
3115 .interleave(cycle_back)
3116 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3117 let foo = input1_ordered
3118 .clone()
3119 .map(q!(|v| v + 3))
3120 .weaken_ordering::<NoOrder>()
3121 .interleave(input2)
3122 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3123
3124 complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));
3125
3126 let out_recv = input1_ordered.sim_output();
3127
3128 let mut saw = false;
3129 let instance_count = flow.sim().exhaustive(async || {
3130 in_send.send_many_unordered([0, 1]);
3131 let out = out_recv.collect::<Vec<_>>().await;
3132
3133 if out.starts_with(&[0, 3, 1]) {
3134 saw = true;
3135 }
3136 });
3137
3138 assert!(saw, "did not see an instance with 0, 3, 1 in order");
3139 assert_eq!(instance_count, 24)
3140 }
3141
3142 #[cfg(feature = "sim")]
3143 #[test]
3144 fn sim_atomic_assume_ordering_cycle_back() {
3145 let mut flow = FlowBuilder::new();
3146 let node = flow.process::<()>();
3147
3148 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3149
3150 let (complete_cycle_back, cycle_back) =
3151 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3152 let ordered = input
3153 .interleave(cycle_back)
3154 .atomic(&node.tick())
3155 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3156 .end_atomic();
3157 complete_cycle_back.complete(
3158 ordered
3159 .clone()
3160 .map(q!(|v| v + 1))
3161 .filter(q!(|v| v % 2 == 1)),
3162 );
3163
3164 let out_recv = ordered.sim_output();
3165
3166 let instance_count = flow.sim().exhaustive(async || {
3167 in_send.send_many_unordered([0, 2]);
3168 let out = out_recv.collect::<Vec<_>>().await;
3169 assert_eq!(out.len(), 4);
3170 });
3171
3172 assert_eq!(instance_count, 22)
3173 }
3174}