hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
19};
20#[cfg(stageleft_runtime)]
21use crate::forward_handle::{CycleCollection, ReceiverComplete};
22use crate::forward_handle::{ForwardRef, TickCycle};
23use crate::live_collections::stream::{Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::DeferTick;
27use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
28use crate::manual_expr::ManualExpr;
29use crate::nondet::{NonDet, nondet};
30use crate::properties::ManualProof;
31
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
/// removed / changed), this also includes an additional variant [`BoundedValue`], which
/// indicates that entries may be added over time, but once an entry is added it will never be
/// removed and its value will never change.
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The bound marker with the same [`Self::UnderlyingBound`] as this one, but where the
    /// value for each key is immutable.
    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;

    /// The bound marker with the same [`Self::UnderlyingBound`] as this one, but where the
    /// value for each key may change asynchronously.
    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;

    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> KeyedSingletonBoundKind;
}
53
// Fully unbounded: both the underlying stream and each entry's value are `Unbounded`.
impl KeyedSingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    // Fixing the values while keeping the underlying stream unbounded gives `BoundedValue`.
    type WithBoundedValue = BoundedValue;
    type WithUnboundedValue = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Unbounded
    }
}
64
// Fully bounded: the underlying stream is `Bounded` and every value is immutable.
impl KeyedSingletonBound for Bounded {
    type UnderlyingBound = Bounded;
    type ValueBound = Bounded;
    type WithBoundedValue = Bounded;
    // A bounded underlying stream with unbounded values has no public marker; the hidden
    // `UnreachableBound` fills this slot so the associated-type constraints are satisfied.
    type WithUnboundedValue = UnreachableBound;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Bounded
    }
}
75
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
/// appears, its value is bounded and will never change. The underlying stream is still
/// [`Unbounded`] (see `UnderlyingBound` in the [`KeyedSingletonBound`] impl), so new entries may
/// appear asynchronously.
pub struct BoundedValue;
80
// Entries may keep arriving (unbounded underlying stream), but each value is immutable.
impl KeyedSingletonBound for BoundedValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Bounded;
    type WithBoundedValue = BoundedValue;
    // Allowing values to change again yields the fully `Unbounded` marker.
    type WithUnboundedValue = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::BoundedValue
    }
}
91
// Stand-in marker for the combination "bounded underlying stream, unbounded values".
// Within this file it is only used to fill the `WithUnboundedValue` slot of `Bounded`
// (and of itself); its `bound_kind` panics if ever called.
#[doc(hidden)]
pub struct UnreachableBound;
94
// Exists only so that `UnreachableBound` satisfies the trait constraints on the
// `WithBoundedValue` / `WithUnboundedValue` associated types.
impl KeyedSingletonBound for UnreachableBound {
    type UnderlyingBound = Bounded;
    type ValueBound = Unbounded;

    type WithBoundedValue = Bounded;
    type WithUnboundedValue = UnreachableBound;

    fn bound_kind() -> KeyedSingletonBoundKind {
        // No `KeyedSingletonBoundKind` is returned for this marker; calling this panics.
        unreachable!("UnreachableBound cannot be instantiated")
    }
}
106
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // Location where this collection is materialized.
    pub(crate) location: Loc,
    // IR node that produces this collection. It lives in a `RefCell` so it can be swapped
    // in place through a shared reference (the `Clone` impl relies on this).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
129
130impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
131 for KeyedSingleton<K, V, Loc, Bound>
132{
133 fn clone(&self) -> Self {
134 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
135 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
136 *self.ir_node.borrow_mut() = HydroNode::Tee {
137 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
138 metadata: self.location.new_node_metadata(Self::collection_kind()),
139 };
140 }
141
142 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
143 KeyedSingleton {
144 location: self.location.clone(),
145 ir_node: HydroNode::Tee {
146 inner: TeeNode(inner.0.clone()),
147 metadata: metadata.clone(),
148 }
149 .into(),
150 _phantom: PhantomData,
151 }
152 } else {
153 unreachable!()
154 }
155 }
156}
157
158impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
159 for KeyedSingleton<K, V, L, B>
160where
161 L: Location<'a> + NoTick,
162{
163 type Location = L;
164
165 fn create_source(ident: syn::Ident, location: L) -> Self {
166 KeyedSingleton {
167 location: location.clone(),
168 ir_node: RefCell::new(HydroNode::CycleSource {
169 ident,
170 metadata: location.new_node_metadata(Self::collection_kind()),
171 }),
172 _phantom: PhantomData,
173 }
174 }
175}
176
177impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
178where
179 L: Location<'a>,
180{
181 type Location = Tick<L>;
182
183 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
184 KeyedSingleton::new(
185 location.clone(),
186 HydroNode::CycleSource {
187 ident,
188 metadata: location.new_node_metadata(Self::collection_kind()),
189 },
190 )
191 }
192}
193
impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Forwards to `KeyedSingleton::defer_tick`, whose definition is not in this part of
        // the file. NOTE(review): presumably an inherent method that takes precedence over
        // this trait method — confirm, since otherwise this call would recurse.
        KeyedSingleton::defer_tick(self)
    }
}
202
203impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
204 for KeyedSingleton<K, V, L, B>
205where
206 L: Location<'a> + NoTick,
207{
208 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
209 assert_eq!(
210 Location::id(&self.location),
211 expected_location,
212 "locations do not match"
213 );
214 self.location
215 .flow_state()
216 .borrow_mut()
217 .push_root(HydroRoot::CycleSink {
218 ident,
219 input: Box::new(self.ir_node.into_inner()),
220 op_metadata: HydroIrOpMetadata::new(),
221 });
222 }
223}
224
225impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
226where
227 L: Location<'a>,
228{
229 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
230 assert_eq!(
231 Location::id(&self.location),
232 expected_location,
233 "locations do not match"
234 );
235 self.location
236 .flow_state()
237 .borrow_mut()
238 .push_root(HydroRoot::CycleSink {
239 ident,
240 input: Box::new(self.ir_node.into_inner()),
241 op_metadata: HydroIrOpMetadata::new(),
242 });
243 }
244}
245
246impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
247 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
248 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
249 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
250
251 KeyedSingleton {
252 location,
253 ir_node: RefCell::new(ir_node),
254 _phantom: PhantomData,
255 }
256 }
257
258 /// Returns the [`Location`] where this keyed singleton is being materialized.
259 pub fn location(&self) -> &L {
260 &self.location
261 }
262}
263
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    // For a bounded keyed singleton the key count is the number of `(key, value)` entries.
    let entries = me.entries();
    entries.count()
}
270
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    // Collect every `(key, value)` entry of the bounded keyed singleton into one map.
    let ordered_entries = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
289
290impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
291 pub(crate) fn collection_kind() -> CollectionKind {
292 CollectionKind::KeyedSingleton {
293 bound: B::bound_kind(),
294 key_type: stageleft::quote_type::<K>().into(),
295 value_type: stageleft::quote_type::<V>().into(),
296 }
297 }
298
299 /// Transforms each value by invoking `f` on each element, with keys staying the same
300 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
301 ///
302 /// If you do not want to modify the stream and instead only want to view
303 /// each item use [`KeyedSingleton::inspect`] instead.
304 ///
305 /// # Example
306 /// ```rust
307 /// # #[cfg(feature = "deploy")] {
308 /// # use hydro_lang::prelude::*;
309 /// # use futures::StreamExt;
310 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
311 /// let keyed_singleton = // { 1: 2, 2: 4 }
312 /// # process
313 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
314 /// # .into_keyed()
315 /// # .first();
316 /// keyed_singleton.map(q!(|v| v + 1))
317 /// # .entries()
318 /// # }, |mut stream| async move {
319 /// // { 1: 3, 2: 5 }
320 /// # let mut results = Vec::new();
321 /// # for _ in 0..2 {
322 /// # results.push(stream.next().await.unwrap());
323 /// # }
324 /// # results.sort();
325 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
326 /// # }));
327 /// # }
328 /// ```
329 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
330 where
331 F: Fn(V) -> U + 'a,
332 {
333 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
334 let map_f = q!({
335 let orig = f;
336 move |(k, v)| (k, orig(v))
337 })
338 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
339 .into();
340
341 KeyedSingleton::new(
342 self.location.clone(),
343 HydroNode::Map {
344 f: map_f,
345 input: Box::new(self.ir_node.into_inner()),
346 metadata: self
347 .location
348 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
349 },
350 )
351 }
352
353 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
354 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
355 ///
356 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
357 /// the new value `U`. The key remains unchanged in the output.
358 ///
359 /// # Example
360 /// ```rust
361 /// # #[cfg(feature = "deploy")] {
362 /// # use hydro_lang::prelude::*;
363 /// # use futures::StreamExt;
364 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
365 /// let keyed_singleton = // { 1: 2, 2: 4 }
366 /// # process
367 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
368 /// # .into_keyed()
369 /// # .first();
370 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
371 /// # .entries()
372 /// # }, |mut stream| async move {
373 /// // { 1: 3, 2: 6 }
374 /// # let mut results = Vec::new();
375 /// # for _ in 0..2 {
376 /// # results.push(stream.next().await.unwrap());
377 /// # }
378 /// # results.sort();
379 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
380 /// # }));
381 /// # }
382 /// ```
383 pub fn map_with_key<U, F>(
384 self,
385 f: impl IntoQuotedMut<'a, F, L> + Copy,
386 ) -> KeyedSingleton<K, U, L, B>
387 where
388 F: Fn((K, V)) -> U + 'a,
389 K: Clone,
390 {
391 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
392 let map_f = q!({
393 let orig = f;
394 move |(k, v)| {
395 let out = orig((Clone::clone(&k), v));
396 (k, out)
397 }
398 })
399 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
400 .into();
401
402 KeyedSingleton::new(
403 self.location.clone(),
404 HydroNode::Map {
405 f: map_f,
406 input: Box::new(self.ir_node.into_inner()),
407 metadata: self
408 .location
409 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
410 },
411 )
412 }
413
414 /// Gets the number of keys in the keyed singleton.
415 ///
416 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
417 /// since keys may be added / removed over time. When the set of keys changes, the count will
418 /// be asynchronously updated.
419 ///
420 /// # Example
421 /// ```rust
422 /// # #[cfg(feature = "deploy")] {
423 /// # use hydro_lang::prelude::*;
424 /// # use futures::StreamExt;
425 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
426 /// # let tick = process.tick();
427 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
428 /// # process
429 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
430 /// # .into_keyed()
431 /// # .batch(&tick, nondet!(/** test */))
432 /// # .first();
433 /// keyed_singleton.key_count()
434 /// # .all_ticks()
435 /// # }, |mut stream| async move {
436 /// // 3
437 /// # assert_eq!(stream.next().await.unwrap(), 3);
438 /// # }));
439 /// # }
440 /// ```
    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
        if B::ValueBound::BOUNDED {
            // Values are immutable, so `entries()` is available and can be counted directly.
            // Rebuilding the struct only changes the type-level bound to
            // `B::WithBoundedValue`; the location and IR node are carried over unchanged.
            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                location: self.location,
                ir_node: self.ir_node,
                _phantom: PhantomData,
            };

            me.entries().count()
        } else if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            // Values may still change and we are at a top-level location: take a snapshot
            // inside `tick`, count its keys there, and bring the result back out with
            // `latest()` (presumably the most recent per-tick count — TODO confirm).
            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
                location: self.location,
                ir_node: self.ir_node,
                _phantom: PhantomData,
            };

            let out =
                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
                    .latest();
            // Re-wrap the IR node so the result has the `B::UnderlyingBound` type that the
            // signature promises.
            Singleton::new(out.location, out.ir_node.into_inner())
        } else {
            // Neither a bounded-value collection nor a top-level location with a tick.
            panic!("Unbounded KeyedSingleton inside a tick");
        }
    }
467
468 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
469 ///
470 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
471 /// asynchronously as well.
472 ///
473 /// # Example
474 /// ```rust
475 /// # #[cfg(feature = "deploy")] {
476 /// # use hydro_lang::prelude::*;
477 /// # use futures::StreamExt;
478 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
479 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
480 /// # process
481 /// # .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
482 /// # .into_keyed()
483 /// # .batch(&process.tick(), nondet!(/** test */))
484 /// # .first();
485 /// keyed_singleton.into_singleton()
486 /// # .all_ticks()
487 /// # }, |mut stream| async move {
488 /// // { 1: "a", 2: "b", 3: "c" }
489 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
490 /// # }));
491 /// # }
492 /// ```
    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
    where
        K: Eq + Hash,
    {
        if B::ValueBound::BOUNDED {
            // Values are immutable, so the entries can be streamed out and folded into a map.
            // Rebuilding the struct only changes the type-level bound to
            // `B::WithBoundedValue`; the location and IR node are carried over unchanged.
            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                location: self.location,
                ir_node: self.ir_node,
                _phantom: PhantomData,
            };

            me.entries()
                .assume_ordering(nondet!(
                    /// Because this is a keyed singleton, there is only one value per key.
                ))
                .fold(
                    q!(|| HashMap::new()),
                    q!(|map, (k, v)| {
                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
                        map.insert(k, v);
                    }),
                )
        } else if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            // Values may still change and we are at a top-level location: take a snapshot
            // inside `tick`, build the map there, and bring the result back out with
            // `latest()` (presumably the most recent per-tick map — TODO confirm).
            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
                location: self.location,
                ir_node: self.ir_node,
                _phantom: PhantomData,
            };

            let out = into_singleton_inside_tick(
                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();
            // Re-wrap the IR node so the result has the `B::UnderlyingBound` type that the
            // signature promises.
            Singleton::new(out.location, out.ir_node.into_inner())
        } else {
            // Neither a bounded-value collection nor a top-level location with a tick.
            panic!("Unbounded KeyedSingleton inside a tick");
        }
    }
533
534 /// An operator which allows you to "name" a `HydroNode`.
535 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
536 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
537 {
538 let mut node = self.ir_node.borrow_mut();
539 let metadata = node.metadata_mut();
540 metadata.tag = Some(name.to_owned());
541 }
542 self
543 }
544}
545
546impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
547 KeyedSingleton<K, V, L, B>
548{
549 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
550 ///
551 /// The value for each key must be bounded, otherwise the resulting stream elements would be
552 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
553 /// into the output.
554 ///
555 /// # Example
556 /// ```rust
557 /// # #[cfg(feature = "deploy")] {
558 /// # use hydro_lang::prelude::*;
559 /// # use futures::StreamExt;
560 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
561 /// let keyed_singleton = // { 1: 2, 2: 4 }
562 /// # process
563 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
564 /// # .into_keyed()
565 /// # .first();
566 /// keyed_singleton.entries()
567 /// # }, |mut stream| async move {
568 /// // (1, 2), (2, 4) in any order
569 /// # let mut results = Vec::new();
570 /// # for _ in 0..2 {
571 /// # results.push(stream.next().await.unwrap());
572 /// # }
573 /// # results.sort();
574 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
575 /// # }));
576 /// # }
577 /// ```
578 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
579 self.into_keyed_stream().entries()
580 }
581
582 /// Flattens the keyed singleton into an unordered stream of just the values.
583 ///
584 /// The value for each key must be bounded, otherwise the resulting stream elements would be
585 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
586 /// into the output.
587 ///
588 /// # Example
589 /// ```rust
590 /// # #[cfg(feature = "deploy")] {
591 /// # use hydro_lang::prelude::*;
592 /// # use futures::StreamExt;
593 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
594 /// let keyed_singleton = // { 1: 2, 2: 4 }
595 /// # process
596 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
597 /// # .into_keyed()
598 /// # .first();
599 /// keyed_singleton.values()
600 /// # }, |mut stream| async move {
601 /// // 2, 4 in any order
602 /// # let mut results = Vec::new();
603 /// # for _ in 0..2 {
604 /// # results.push(stream.next().await.unwrap());
605 /// # }
606 /// # results.sort();
607 /// # assert_eq!(results, vec![2, 4]);
608 /// # }));
609 /// # }
610 /// ```
611 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
612 let map_f = q!(|(_, v)| v)
613 .splice_fn1_ctx::<(K, V), V>(&self.location)
614 .into();
615
616 Stream::new(
617 self.location.clone(),
618 HydroNode::Map {
619 f: map_f,
620 input: Box::new(self.ir_node.into_inner()),
621 metadata: self.location.new_node_metadata(Stream::<
622 V,
623 L,
624 B::UnderlyingBound,
625 NoOrder,
626 ExactlyOnce,
627 >::collection_kind()),
628 },
629 )
630 }
631
632 /// Flattens the keyed singleton into an unordered stream of just the keys.
633 ///
634 /// The value for each key must be bounded, otherwise the removal of keys would result in
635 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
636 /// into the output.
637 ///
638 /// # Example
639 /// ```rust
640 /// # #[cfg(feature = "deploy")] {
641 /// # use hydro_lang::prelude::*;
642 /// # use futures::StreamExt;
643 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
644 /// let keyed_singleton = // { 1: 2, 2: 4 }
645 /// # process
646 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
647 /// # .into_keyed()
648 /// # .first();
649 /// keyed_singleton.keys()
650 /// # }, |mut stream| async move {
651 /// // 1, 2 in any order
652 /// # let mut results = Vec::new();
653 /// # for _ in 0..2 {
654 /// # results.push(stream.next().await.unwrap());
655 /// # }
656 /// # results.sort();
657 /// # assert_eq!(results, vec![1, 2]);
658 /// # }));
659 /// # }
660 /// ```
661 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
662 self.entries().map(q!(|(k, _)| k))
663 }
664
665 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
666 /// entries whose keys are not in the provided stream.
667 ///
668 /// # Example
669 /// ```rust
670 /// # #[cfg(feature = "deploy")] {
671 /// # use hydro_lang::prelude::*;
672 /// # use futures::StreamExt;
673 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
674 /// let tick = process.tick();
675 /// let keyed_singleton = // { 1: 2, 2: 4 }
676 /// # process
677 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
678 /// # .into_keyed()
679 /// # .first()
680 /// # .batch(&tick, nondet!(/** test */));
681 /// let keys_to_remove = process
682 /// .source_iter(q!(vec![1]))
683 /// .batch(&tick, nondet!(/** test */));
684 /// keyed_singleton.filter_key_not_in(keys_to_remove)
685 /// # .entries().all_ticks()
686 /// # }, |mut stream| async move {
687 /// // { 2: 4 }
688 /// # for w in vec![(2, 4)] {
689 /// # assert_eq!(stream.next().await.unwrap(), w);
690 /// # }
691 /// # }));
692 /// # }
693 /// ```
694 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
695 self,
696 other: Stream<K, L, Bounded, O2, R2>,
697 ) -> Self
698 where
699 K: Hash + Eq,
700 {
701 check_matching_location(&self.location, &other.location);
702
703 KeyedSingleton::new(
704 self.location.clone(),
705 HydroNode::AntiJoin {
706 pos: Box::new(self.ir_node.into_inner()),
707 neg: Box::new(other.ir_node.into_inner()),
708 metadata: self.location.new_node_metadata(Self::collection_kind()),
709 },
710 )
711 }
712
713 /// An operator which allows you to "inspect" each value of a keyed singleton without
714 /// modifying it. The closure `f` is called on a reference to each value. This is
715 /// mainly useful for debugging, and should not be used to generate side-effects.
716 ///
717 /// # Example
718 /// ```rust
719 /// # #[cfg(feature = "deploy")] {
720 /// # use hydro_lang::prelude::*;
721 /// # use futures::StreamExt;
722 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
723 /// let keyed_singleton = // { 1: 2, 2: 4 }
724 /// # process
725 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
726 /// # .into_keyed()
727 /// # .first();
728 /// keyed_singleton
729 /// .inspect(q!(|v| println!("{}", v)))
730 /// # .entries()
731 /// # }, |mut stream| async move {
732 /// // { 1: 2, 2: 4 }
733 /// # for w in vec![(1, 2), (2, 4)] {
734 /// # assert_eq!(stream.next().await.unwrap(), w);
735 /// # }
736 /// # }));
737 /// # }
738 /// ```
739 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
740 where
741 F: Fn(&V) + 'a,
742 {
743 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
744 let inspect_f = q!({
745 let orig = f;
746 move |t: &(_, _)| orig(&t.1)
747 })
748 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
749 .into();
750
751 KeyedSingleton::new(
752 self.location.clone(),
753 HydroNode::Inspect {
754 f: inspect_f,
755 input: Box::new(self.ir_node.into_inner()),
756 metadata: self.location.new_node_metadata(Self::collection_kind()),
757 },
758 )
759 }
760
761 /// An operator which allows you to "inspect" each entry of a keyed singleton without
762 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
763 /// mainly useful for debugging, and should not be used to generate side-effects.
764 ///
765 /// # Example
766 /// ```rust
767 /// # #[cfg(feature = "deploy")] {
768 /// # use hydro_lang::prelude::*;
769 /// # use futures::StreamExt;
770 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
771 /// let keyed_singleton = // { 1: 2, 2: 4 }
772 /// # process
773 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
774 /// # .into_keyed()
775 /// # .first();
776 /// keyed_singleton
777 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
778 /// # .entries()
779 /// # }, |mut stream| async move {
780 /// // { 1: 2, 2: 4 }
781 /// # for w in vec![(1, 2), (2, 4)] {
782 /// # assert_eq!(stream.next().await.unwrap(), w);
783 /// # }
784 /// # }));
785 /// # }
786 /// ```
787 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
788 where
789 F: Fn(&(K, V)) + 'a,
790 {
791 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
792
793 KeyedSingleton::new(
794 self.location.clone(),
795 HydroNode::Inspect {
796 f: inspect_f,
797 input: Box::new(self.ir_node.into_inner()),
798 metadata: self.location.new_node_metadata(Self::collection_kind()),
799 },
800 )
801 }
802
803 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
804 ///
805 /// Because this method requires values to be bounded, the output [`Optional`] will only be
806 /// asynchronously updated if a new key is added that is higher than the previous max key.
807 ///
808 /// # Example
809 /// ```rust
810 /// # #[cfg(feature = "deploy")] {
811 /// # use hydro_lang::prelude::*;
812 /// # use futures::StreamExt;
813 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
814 /// let tick = process.tick();
815 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
816 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
817 /// # .into_keyed()
818 /// # .first();
819 /// keyed_singleton.get_max_key()
820 /// # .sample_eager(nondet!(/** test */))
821 /// # }, |mut stream| async move {
822 /// // (2, 456)
823 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
824 /// # }));
825 /// # }
826 /// ```
    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
    where
        K: Ord,
    {
        // Reduce over all entries, keeping the one whose key compares greatest.
        self.entries()
            .assume_ordering_trusted(nondet!(
                /// There is only one element associated with each key, and the keys are totally
                /// ordered so we will produce a deterministic value. The closure technically
                /// isn't commutative in the case where both passed entries have the same key
                /// but different values.
                ///
                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
                /// the two inputs do not have the same key.
            ))
            .reduce(q!(
                move |curr, new| {
                    // Strict comparison: an entry with an equal key never replaces `curr`.
                    if new.0 > curr.0 {
                        *curr = new;
                    }
                },
                idempotent = ManualProof(/* repeated elements are ignored */)
            ))
    }
850
851 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
852 /// element, the value.
853 ///
854 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
855 ///
856 /// # Example
857 /// ```rust
858 /// # #[cfg(feature = "deploy")] {
859 /// # use hydro_lang::prelude::*;
860 /// # use futures::StreamExt;
861 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862 /// let keyed_singleton = // { 1: 2, 2: 4 }
863 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
864 /// # .into_keyed()
865 /// # .first();
866 /// keyed_singleton
867 /// .clone()
868 /// .into_keyed_stream()
869 /// .interleave(
870 /// keyed_singleton.into_keyed_stream()
871 /// )
872 /// # .entries()
873 /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
875 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
876 /// # assert_eq!(stream.next().await.unwrap(), w);
877 /// # }
878 /// # }));
879 /// # }
880 /// ```
881 pub fn into_keyed_stream(
882 self,
883 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
884 KeyedStream::new(
885 self.location.clone(),
886 HydroNode::Cast {
887 inner: Box::new(self.ir_node.into_inner()),
888 metadata: self.location.new_node_metadata(KeyedStream::<
889 K,
890 V,
891 L,
892 B::UnderlyingBound,
893 TotalOrder,
894 ExactlyOnce,
895 >::collection_kind()),
896 },
897 )
898 }
899}
900
901impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, L, Bounded> {
902 /// Gets the value associated with a specific key from the keyed singleton.
903 ///
904 /// # Example
905 /// ```rust
906 /// # #[cfg(feature = "deploy")] {
907 /// # use hydro_lang::prelude::*;
908 /// # use futures::StreamExt;
909 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
910 /// let tick = process.tick();
911 /// let keyed_data = process
912 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
913 /// .into_keyed()
914 /// .batch(&tick, nondet!(/** test */))
915 /// .first();
916 /// let key = tick.singleton(q!(1));
917 /// keyed_data.get(key).all_ticks()
918 /// # }, |mut stream| async move {
919 /// // 2
920 /// # assert_eq!(stream.next().await.unwrap(), 2);
921 /// # }));
922 /// # }
923 /// ```
924 pub fn get(self, key: Singleton<K, L, Bounded>) -> Optional<V, L, Bounded> {
925 self.into_keyed_stream()
926 .get(key)
927 .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
928 .first()
929 }
930
931 /// Emit a keyed stream containing keys shared between the keyed singleton and the
932 /// keyed stream, where each value in the output keyed stream is a tuple of
933 /// (the keyed singleton's value, the keyed stream's value).
934 ///
935 /// # Example
936 /// ```rust
937 /// # #[cfg(feature = "deploy")] {
938 /// # use hydro_lang::prelude::*;
939 /// # use futures::StreamExt;
940 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
941 /// let tick = process.tick();
942 /// let keyed_data = process
943 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
944 /// .into_keyed()
945 /// .batch(&tick, nondet!(/** test */))
946 /// .first();
947 /// let other_data = process
948 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
949 /// .into_keyed()
950 /// .batch(&tick, nondet!(/** test */));
951 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
952 /// # }, |mut stream| async move {
953 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
954 /// # let mut results = vec![];
955 /// # for _ in 0..3 {
956 /// # results.push(stream.next().await.unwrap());
957 /// # }
958 /// # results.sort();
959 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
960 /// # }));
961 /// # }
962 /// ```
963 pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2>(
964 self,
965 keyed_stream: KeyedStream<K, V2, L, Bounded, O2, R2>,
966 ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R2> {
967 self.entries()
968 .weaken_retries::<R2>()
969 .join(keyed_stream.entries())
970 .into_keyed()
971 }
972
973 /// Emit a keyed singleton containing all keys shared between two keyed singletons,
974 /// where each value in the output keyed singleton is a tuple of
975 /// (self.value, other.value).
976 ///
977 /// # Example
978 /// ```rust
979 /// # #[cfg(feature = "deploy")] {
980 /// # use hydro_lang::prelude::*;
981 /// # use futures::StreamExt;
982 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
983 /// # let tick = process.tick();
984 /// let requests = // { 1: 10, 2: 20, 3: 30 }
985 /// # process
986 /// # .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
987 /// # .into_keyed()
988 /// # .batch(&tick, nondet!(/** test */))
989 /// # .first();
990 /// let other = // { 1: 100, 2: 200, 4: 400 }
991 /// # process
992 /// # .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
993 /// # .into_keyed()
994 /// # .batch(&tick, nondet!(/** test */))
995 /// # .first();
996 /// requests.join_keyed_singleton(other)
997 /// # .entries().all_ticks()
998 /// # }, |mut stream| async move {
999 /// // { 1: (10, 100), 2: (20, 200) }
1000 /// # let mut results = vec![];
1001 /// # for _ in 0..2 {
1002 /// # results.push(stream.next().await.unwrap());
1003 /// # }
1004 /// # results.sort();
1005 /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1006 /// # }));
1007 /// # }
1008 /// ```
1009 pub fn join_keyed_singleton<V2: Clone>(
1010 self,
1011 other: KeyedSingleton<K, V2, L, Bounded>,
1012 ) -> KeyedSingleton<K, (V, V2), L, Bounded> {
1013 let result_stream = self.entries().join(other.entries()).into_keyed();
1014
1015 // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
1016 KeyedSingleton::new(
1017 result_stream.location.clone(),
1018 HydroNode::Cast {
1019 inner: Box::new(result_stream.ir_node.into_inner()),
1020 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
1021 K,
1022 (V, V2),
1023 L,
1024 Bounded,
1025 >::collection_kind(
1026 )),
1027 },
1028 )
1029 }
1030
1031 /// For each value in `self`, find the matching key in `lookup`.
1032 /// The output is a keyed singleton with the key from `self`, and a value
1033 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
1034 /// If the key is not present in `lookup`, the option will be [`None`].
1035 ///
1036 /// # Example
1037 /// ```rust
1038 /// # #[cfg(feature = "deploy")] {
1039 /// # use hydro_lang::prelude::*;
1040 /// # use futures::StreamExt;
1041 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1042 /// # let tick = process.tick();
1043 /// let requests = // { 1: 10, 2: 20 }
1044 /// # process
1045 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
1046 /// # .into_keyed()
1047 /// # .batch(&tick, nondet!(/** test */))
1048 /// # .first();
1049 /// let other_data = // { 10: 100, 11: 110 }
1050 /// # process
1051 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
1052 /// # .into_keyed()
1053 /// # .batch(&tick, nondet!(/** test */))
1054 /// # .first();
1055 /// requests.lookup_keyed_singleton(other_data)
1056 /// # .entries().all_ticks()
1057 /// # }, |mut stream| async move {
1058 /// // { 1: (10, Some(100)), 2: (20, None) }
1059 /// # let mut results = vec![];
1060 /// # for _ in 0..2 {
1061 /// # results.push(stream.next().await.unwrap());
1062 /// # }
1063 /// # results.sort();
1064 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
1065 /// # }));
1066 /// # }
1067 /// ```
1068 pub fn lookup_keyed_singleton<V2>(
1069 self,
1070 lookup: KeyedSingleton<V, V2, L, Bounded>,
1071 ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
1072 where
1073 K: Eq + Hash + Clone,
1074 V: Eq + Hash + Clone,
1075 V2: Clone,
1076 {
1077 let result_stream = self
1078 .into_keyed_stream()
1079 .lookup_keyed_stream(lookup.into_keyed_stream());
1080
1081 // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
1082 KeyedSingleton::new(
1083 result_stream.location.clone(),
1084 HydroNode::Cast {
1085 inner: Box::new(result_stream.ir_node.into_inner()),
1086 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
1087 K,
1088 (V, Option<V2>),
1089 L,
1090 Bounded,
1091 >::collection_kind(
1092 )),
1093 },
1094 )
1095 }
1096
1097 /// For each value in `self`, find the matching key in `lookup`.
1098 /// The output is a keyed stream with the key from `self`, and a value
1099 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
1100 /// If the key is not present in `lookup`, the option will be [`None`].
1101 ///
1102 /// # Example
1103 /// ```rust
1104 /// # #[cfg(feature = "deploy")] {
1105 /// # use hydro_lang::prelude::*;
1106 /// # use futures::StreamExt;
1107 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1108 /// # let tick = process.tick();
1109 /// let requests = // { 1: 10, 2: 20 }
1110 /// # process
1111 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
1112 /// # .into_keyed()
1113 /// # .batch(&tick, nondet!(/** test */))
1114 /// # .first();
    /// let other_data = // { 10: [100, 110] }
1116 /// # process
1117 /// # .source_iter(q!(vec![(10, 100), (10, 110)]))
1118 /// # .into_keyed()
1119 /// # .batch(&tick, nondet!(/** test */));
1120 /// requests.lookup_keyed_stream(other_data)
1121 /// # .entries().all_ticks()
1122 /// # }, |mut stream| async move {
    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: [(20, None)] }
1124 /// # let mut results = vec![];
1125 /// # for _ in 0..3 {
1126 /// # results.push(stream.next().await.unwrap());
1127 /// # }
1128 /// # results.sort();
1129 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
1130 /// # }));
1131 /// # }
1132 /// ```
1133 pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
1134 self,
1135 lookup: KeyedStream<V, V2, L, Bounded, O, R>,
1136 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
1137 where
1138 K: Eq + Hash + Clone,
1139 V: Eq + Hash + Clone,
1140 V2: Clone,
1141 {
1142 self.entries()
1143 .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
1144 .into_keyed()
1145 .lookup_keyed_stream(lookup)
1146 }
1147}
1148
1149impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1150where
1151 L: Location<'a>,
1152{
1153 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1154 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1155 ///
1156 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1157 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1158 /// argument that declares where the keyed singleton will be atomically processed. Batching a
1159 /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1160 /// batching into a different [`Tick`] will introduce asynchrony.
1161 pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1162 let out_location = Atomic { tick: tick.clone() };
1163 KeyedSingleton::new(
1164 out_location.clone(),
1165 HydroNode::BeginAtomic {
1166 inner: Box::new(self.ir_node.into_inner()),
1167 metadata: out_location
1168 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1169 },
1170 )
1171 }
1172}
1173
1174impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1175where
1176 L: Location<'a> + NoTick,
1177{
1178 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1179 /// See [`KeyedSingleton::atomic`] for more details.
1180 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1181 KeyedSingleton::new(
1182 self.location.tick.l.clone(),
1183 HydroNode::EndAtomic {
1184 inner: Box::new(self.ir_node.into_inner()),
1185 metadata: self
1186 .location
1187 .tick
1188 .l
1189 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1190 },
1191 )
1192 }
1193}
1194
1195impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1196 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1197 /// tick `T` always has the entries of `self` at tick `T - 1`.
1198 ///
1199 /// At tick `0`, the output has no entries, since there is no previous tick.
1200 ///
1201 /// This operator enables stateful iterative processing with ticks, by sending data from one
1202 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1203 ///
1204 /// # Example
1205 /// ```rust
1206 /// # #[cfg(feature = "deploy")] {
1207 /// # use hydro_lang::prelude::*;
1208 /// # use futures::StreamExt;
1209 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1210 /// let tick = process.tick();
1211 /// # // ticks are lazy by default, forces the second tick to run
1212 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1213 /// # let batch_first_tick = process
1214 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1215 /// # .batch(&tick, nondet!(/** test */))
1216 /// # .into_keyed();
1217 /// # let batch_second_tick = process
1218 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1219 /// # .batch(&tick, nondet!(/** test */))
1220 /// # .into_keyed()
1221 /// # .defer_tick(); // appears on the second tick
1222 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1223 /// # batch_first_tick.chain(batch_second_tick).first();
1224 /// input_batch.clone().filter_key_not_in(
1225 /// input_batch.defer_tick().keys() // keys present in the previous tick
1226 /// )
1227 /// # .entries().all_ticks()
1228 /// # }, |mut stream| async move {
1229 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1230 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1231 /// # assert_eq!(stream.next().await.unwrap(), w);
1232 /// # }
1233 /// # }));
1234 /// # }
1235 /// ```
1236 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1237 KeyedSingleton::new(
1238 self.location.clone(),
1239 HydroNode::DeferTick {
1240 input: Box::new(self.ir_node.into_inner()),
1241 metadata: self
1242 .location
1243 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1244 },
1245 )
1246 }
1247}
1248
1249impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1250where
1251 L: Location<'a>,
1252{
1253 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1254 /// point in time.
1255 ///
1256 /// # Non-Determinism
1257 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1258 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1259 pub fn snapshot(
1260 self,
1261 tick: &Tick<L>,
1262 _nondet: NonDet,
1263 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1264 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1265 KeyedSingleton::new(
1266 tick.clone(),
1267 HydroNode::Batch {
1268 inner: Box::new(self.ir_node.into_inner()),
1269 metadata: tick
1270 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1271 },
1272 )
1273 }
1274}
1275
1276impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1277where
1278 L: Location<'a> + NoTick,
1279{
1280 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1281 /// state of the keyed singleton being atomically processed.
1282 ///
1283 /// # Non-Determinism
1284 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1285 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1286 pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1287 KeyedSingleton::new(
1288 self.location.clone().tick,
1289 HydroNode::Batch {
1290 inner: Box::new(self.ir_node.into_inner()),
1291 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1292 K,
1293 V,
1294 Tick<L>,
1295 Bounded,
1296 >::collection_kind(
1297 )),
1298 },
1299 )
1300 }
1301}
1302
1303impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1304where
1305 L: Location<'a>,
1306{
1307 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1308 ///
1309 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1310 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1311 /// is filtered out.
1312 ///
1313 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while
    /// filtering, use [`KeyedSingleton::filter_map`] instead.
1316 ///
1317 /// # Example
1318 /// ```rust
1319 /// # #[cfg(feature = "deploy")] {
1320 /// # use hydro_lang::prelude::*;
1321 /// # use futures::StreamExt;
1322 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1323 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1324 /// # process
1325 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1326 /// # .into_keyed()
1327 /// # .first();
1328 /// keyed_singleton.filter(q!(|&v| v > 1))
1329 /// # .entries()
1330 /// # }, |mut stream| async move {
1331 /// // { 1: 2, 2: 4 }
1332 /// # let mut results = Vec::new();
1333 /// # for _ in 0..2 {
1334 /// # results.push(stream.next().await.unwrap());
1335 /// # }
1336 /// # results.sort();
1337 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1338 /// # }));
1339 /// # }
1340 /// ```
1341 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1342 where
1343 F: Fn(&V) -> bool + 'a,
1344 {
1345 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1346 let filter_f = q!({
1347 let orig = f;
1348 move |t: &(_, _)| orig(&t.1)
1349 })
1350 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1351 .into();
1352
1353 KeyedSingleton::new(
1354 self.location.clone(),
1355 HydroNode::Filter {
1356 f: filter_f,
1357 input: Box::new(self.ir_node.into_inner()),
1358 metadata: self
1359 .location
1360 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1361 },
1362 )
1363 }
1364
1365 /// An operator that both filters and maps values. It yields only the key-value pairs where
1366 /// the supplied closure `f` returns `Some(value)`.
1367 ///
1368 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1369 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1370 /// If it returns `None`, the key-value pair is filtered out.
1371 ///
1372 /// # Example
1373 /// ```rust
1374 /// # #[cfg(feature = "deploy")] {
1375 /// # use hydro_lang::prelude::*;
1376 /// # use futures::StreamExt;
1377 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1378 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1379 /// # process
1380 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1381 /// # .into_keyed()
1382 /// # .first();
1383 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1384 /// # .entries()
1385 /// # }, |mut stream| async move {
1386 /// // { 1: 42, 3: 100 }
1387 /// # let mut results = Vec::new();
1388 /// # for _ in 0..2 {
1389 /// # results.push(stream.next().await.unwrap());
1390 /// # }
1391 /// # results.sort();
1392 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1393 /// # }));
1394 /// # }
1395 /// ```
1396 pub fn filter_map<F, U>(
1397 self,
1398 f: impl IntoQuotedMut<'a, F, L> + Copy,
1399 ) -> KeyedSingleton<K, U, L, B>
1400 where
1401 F: Fn(V) -> Option<U> + 'a,
1402 {
1403 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1404 let filter_map_f = q!({
1405 let orig = f;
1406 move |(k, v)| orig(v).map(|o| (k, o))
1407 })
1408 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1409 .into();
1410
1411 KeyedSingleton::new(
1412 self.location.clone(),
1413 HydroNode::FilterMap {
1414 f: filter_map_f,
1415 input: Box::new(self.ir_node.into_inner()),
1416 metadata: self
1417 .location
1418 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1419 },
1420 )
1421 }
1422
1423 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1424 /// arrived since the previous batch was released.
1425 ///
    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`]; instead, you may want to
    /// use [`KeyedSingleton::into_keyed_stream`] and then yield with [`KeyedStream::all_ticks`].
1428 ///
1429 /// # Non-Determinism
1430 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1431 /// has a non-deterministic set of key-value pairs.
1432 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1433 where
1434 L: NoTick,
1435 {
1436 self.atomic(tick).batch_atomic(nondet)
1437 }
1438}
1439
1440impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1441where
1442 L: Location<'a> + NoTick,
1443{
1444 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1445 /// atomically processed.
1446 ///
1447 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1448 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1449 ///
1450 /// # Non-Determinism
1451 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1452 /// has a non-deterministic set of key-value pairs.
1453 pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1454 let _ = nondet;
1455 KeyedSingleton::new(
1456 self.location.clone().tick,
1457 HydroNode::Batch {
1458 inner: Box::new(self.ir_node.into_inner()),
1459 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1460 K,
1461 V,
1462 Tick<L>,
1463 Bounded,
1464 >::collection_kind(
1465 )),
1466 },
1467 )
1468 }
1469}
1470
1471#[cfg(test)]
1472mod tests {
1473 #[cfg(feature = "deploy")]
1474 use futures::{SinkExt, StreamExt};
1475 #[cfg(feature = "deploy")]
1476 use hydro_deploy::Deployment;
1477 #[cfg(any(feature = "deploy", feature = "sim"))]
1478 use stageleft::q;
1479
1480 #[cfg(any(feature = "deploy", feature = "sim"))]
1481 use crate::compile::builder::FlowBuilder;
1482 #[cfg(any(feature = "deploy", feature = "sim"))]
1483 use crate::location::Location;
1484 #[cfg(any(feature = "deploy", feature = "sim"))]
1485 use crate::nondet::nondet;
1486
1487 #[cfg(feature = "deploy")]
1488 #[tokio::test]
1489 async fn key_count_bounded_value() {
1490 let mut deployment = Deployment::new();
1491
1492 let mut flow = FlowBuilder::new();
1493 let node = flow.process::<()>();
1494 let external = flow.external::<()>();
1495
1496 let (input_port, input) = node.source_external_bincode(&external);
1497 let out = input
1498 .into_keyed()
1499 .first()
1500 .key_count()
1501 .sample_eager(nondet!(/** test */))
1502 .send_bincode_external(&external);
1503
1504 let nodes = flow
1505 .with_process(&node, deployment.Localhost())
1506 .with_external(&external, deployment.Localhost())
1507 .deploy(&mut deployment);
1508
1509 deployment.deploy().await.unwrap();
1510
1511 let mut external_in = nodes.connect(input_port).await;
1512 let mut external_out = nodes.connect(out).await;
1513
1514 deployment.start().await.unwrap();
1515
1516 assert_eq!(external_out.next().await.unwrap(), 0);
1517
1518 external_in.send((1, 1)).await.unwrap();
1519 assert_eq!(external_out.next().await.unwrap(), 1);
1520
1521 external_in.send((2, 2)).await.unwrap();
1522 assert_eq!(external_out.next().await.unwrap(), 2);
1523 }
1524
1525 #[cfg(feature = "deploy")]
1526 #[tokio::test]
1527 async fn key_count_unbounded_value() {
1528 let mut deployment = Deployment::new();
1529
1530 let mut flow = FlowBuilder::new();
1531 let node = flow.process::<()>();
1532 let external = flow.external::<()>();
1533
1534 let (input_port, input) = node.source_external_bincode(&external);
1535 let out = input
1536 .into_keyed()
1537 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1538 .key_count()
1539 .sample_eager(nondet!(/** test */))
1540 .send_bincode_external(&external);
1541
1542 let nodes = flow
1543 .with_process(&node, deployment.Localhost())
1544 .with_external(&external, deployment.Localhost())
1545 .deploy(&mut deployment);
1546
1547 deployment.deploy().await.unwrap();
1548
1549 let mut external_in = nodes.connect(input_port).await;
1550 let mut external_out = nodes.connect(out).await;
1551
1552 deployment.start().await.unwrap();
1553
1554 assert_eq!(external_out.next().await.unwrap(), 0);
1555
1556 external_in.send((1, 1)).await.unwrap();
1557 assert_eq!(external_out.next().await.unwrap(), 1);
1558
1559 external_in.send((1, 2)).await.unwrap();
1560 assert_eq!(external_out.next().await.unwrap(), 1);
1561
1562 external_in.send((2, 2)).await.unwrap();
1563 assert_eq!(external_out.next().await.unwrap(), 2);
1564
1565 external_in.send((1, 1)).await.unwrap();
1566 assert_eq!(external_out.next().await.unwrap(), 2);
1567
1568 external_in.send((3, 1)).await.unwrap();
1569 assert_eq!(external_out.next().await.unwrap(), 3);
1570 }
1571
1572 #[cfg(feature = "deploy")]
1573 #[tokio::test]
1574 async fn into_singleton_bounded_value() {
1575 let mut deployment = Deployment::new();
1576
1577 let mut flow = FlowBuilder::new();
1578 let node = flow.process::<()>();
1579 let external = flow.external::<()>();
1580
1581 let (input_port, input) = node.source_external_bincode(&external);
1582 let out = input
1583 .into_keyed()
1584 .first()
1585 .into_singleton()
1586 .sample_eager(nondet!(/** test */))
1587 .send_bincode_external(&external);
1588
1589 let nodes = flow
1590 .with_process(&node, deployment.Localhost())
1591 .with_external(&external, deployment.Localhost())
1592 .deploy(&mut deployment);
1593
1594 deployment.deploy().await.unwrap();
1595
1596 let mut external_in = nodes.connect(input_port).await;
1597 let mut external_out = nodes.connect(out).await;
1598
1599 deployment.start().await.unwrap();
1600
1601 assert_eq!(
1602 external_out.next().await.unwrap(),
1603 std::collections::HashMap::new()
1604 );
1605
1606 external_in.send((1, 1)).await.unwrap();
1607 assert_eq!(
1608 external_out.next().await.unwrap(),
1609 vec![(1, 1)].into_iter().collect()
1610 );
1611
1612 external_in.send((2, 2)).await.unwrap();
1613 assert_eq!(
1614 external_out.next().await.unwrap(),
1615 vec![(1, 1), (2, 2)].into_iter().collect()
1616 );
1617 }
1618
1619 #[cfg(feature = "deploy")]
1620 #[tokio::test]
1621 async fn into_singleton_unbounded_value() {
1622 let mut deployment = Deployment::new();
1623
1624 let mut flow = FlowBuilder::new();
1625 let node = flow.process::<()>();
1626 let external = flow.external::<()>();
1627
1628 let (input_port, input) = node.source_external_bincode(&external);
1629 let out = input
1630 .into_keyed()
1631 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1632 .into_singleton()
1633 .sample_eager(nondet!(/** test */))
1634 .send_bincode_external(&external);
1635
1636 let nodes = flow
1637 .with_process(&node, deployment.Localhost())
1638 .with_external(&external, deployment.Localhost())
1639 .deploy(&mut deployment);
1640
1641 deployment.deploy().await.unwrap();
1642
1643 let mut external_in = nodes.connect(input_port).await;
1644 let mut external_out = nodes.connect(out).await;
1645
1646 deployment.start().await.unwrap();
1647
1648 assert_eq!(
1649 external_out.next().await.unwrap(),
1650 std::collections::HashMap::new()
1651 );
1652
1653 external_in.send((1, 1)).await.unwrap();
1654 assert_eq!(
1655 external_out.next().await.unwrap(),
1656 vec![(1, 1)].into_iter().collect()
1657 );
1658
1659 external_in.send((1, 2)).await.unwrap();
1660 assert_eq!(
1661 external_out.next().await.unwrap(),
1662 vec![(1, 2)].into_iter().collect()
1663 );
1664
1665 external_in.send((2, 2)).await.unwrap();
1666 assert_eq!(
1667 external_out.next().await.unwrap(),
1668 vec![(1, 2), (2, 1)].into_iter().collect()
1669 );
1670
1671 external_in.send((1, 1)).await.unwrap();
1672 assert_eq!(
1673 external_out.next().await.unwrap(),
1674 vec![(1, 3), (2, 1)].into_iter().collect()
1675 );
1676
1677 external_in.send((3, 1)).await.unwrap();
1678 assert_eq!(
1679 external_out.next().await.unwrap(),
1680 vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1681 );
1682 }
1683
1684 #[cfg(feature = "sim")]
1685 #[test]
1686 fn sim_unbounded_singleton_snapshot() {
1687 let mut flow = FlowBuilder::new();
1688 let node = flow.process::<()>();
1689
1690 let (input_port, input) = node.sim_input();
1691 let output = input
1692 .into_keyed()
1693 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1694 .snapshot(&node.tick(), nondet!(/** test */))
1695 .entries()
1696 .all_ticks()
1697 .sim_output();
1698
1699 let count = flow.sim().exhaustive(async || {
1700 input_port.send((1, 123));
1701 input_port.send((1, 456));
1702 input_port.send((2, 123));
1703
1704 let all = output.collect_sorted::<Vec<_>>().await;
1705 assert_eq!(all.last().unwrap(), &(2, 1));
1706 });
1707
1708 assert_eq!(count, 8);
1709 }
1710
1711 #[cfg(feature = "deploy")]
1712 #[tokio::test]
1713 async fn join_keyed_stream() {
1714 let mut deployment = Deployment::new();
1715
1716 let mut flow = FlowBuilder::new();
1717 let node = flow.process::<()>();
1718 let external = flow.external::<()>();
1719
1720 let tick = node.tick();
1721 let keyed_data = node
1722 .source_iter(q!(vec![(1, 10), (2, 20)]))
1723 .into_keyed()
1724 .batch(&tick, nondet!(/** test */))
1725 .first();
1726 let requests = node
1727 .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1728 .into_keyed()
1729 .batch(&tick, nondet!(/** test */));
1730
1731 let out = keyed_data
1732 .join_keyed_stream(requests)
1733 .entries()
1734 .all_ticks()
1735 .send_bincode_external(&external);
1736
1737 let nodes = flow
1738 .with_process(&node, deployment.Localhost())
1739 .with_external(&external, deployment.Localhost())
1740 .deploy(&mut deployment);
1741
1742 deployment.deploy().await.unwrap();
1743
1744 let mut external_out = nodes.connect(out).await;
1745
1746 deployment.start().await.unwrap();
1747
1748 let mut results = vec![];
1749 for _ in 0..2 {
1750 results.push(external_out.next().await.unwrap());
1751 }
1752 results.sort();
1753
1754 assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1755 }
1756}