hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
19};
20#[cfg(stageleft_runtime)]
21use crate::forward_handle::{CycleCollection, ReceiverComplete};
22use crate::forward_handle::{ForwardRef, TickCycle};
23use crate::live_collections::stream::{Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::DeferTick;
27use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
28use crate::manual_expr::ManualExpr;
29use crate::nondet::{NonDet, nondet};
30
31/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
32///
33/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
34/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
35/// indicates that entries may be added over time, but once an entry is added it will never be
36/// removed and its value will never change.
37pub trait KeyedSingletonBound {
38 /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
39 type UnderlyingBound: Boundedness;
40 /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
41 type ValueBound: Boundedness;
42
43 /// The type of the keyed singleton if the value for each key is immutable.
44 type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
45
46 /// The type of the keyed singleton if the value for each key may change asynchronously.
47 type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
48
49 /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
50 fn bound_kind() -> KeyedSingletonBoundKind;
51}
52
53impl KeyedSingletonBound for Unbounded {
54 type UnderlyingBound = Unbounded;
55 type ValueBound = Unbounded;
56 type WithBoundedValue = BoundedValue;
57 type WithUnboundedValue = Unbounded;
58
59 fn bound_kind() -> KeyedSingletonBoundKind {
60 KeyedSingletonBoundKind::Unbounded
61 }
62}
63
64impl KeyedSingletonBound for Bounded {
65 type UnderlyingBound = Bounded;
66 type ValueBound = Bounded;
67 type WithBoundedValue = Bounded;
68 type WithUnboundedValue = UnreachableBound;
69
70 fn bound_kind() -> KeyedSingletonBoundKind {
71 KeyedSingletonBoundKind::Bounded
72 }
73}
74
75/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
76/// its value is bounded and will never change. If the `KeyBound` is [`Bounded`], then the entire set of entries
77/// is bounded, but if it is [`Unbounded`], then new entries may appear asynchronously.
78pub struct BoundedValue;
79
80impl KeyedSingletonBound for BoundedValue {
81 type UnderlyingBound = Unbounded;
82 type ValueBound = Bounded;
83 type WithBoundedValue = BoundedValue;
84 type WithUnboundedValue = Unbounded;
85
86 fn bound_kind() -> KeyedSingletonBoundKind {
87 KeyedSingletonBoundKind::BoundedValue
88 }
89}
90
91#[doc(hidden)]
92pub struct UnreachableBound;
93
94impl KeyedSingletonBound for UnreachableBound {
95 type UnderlyingBound = Bounded;
96 type ValueBound = Unbounded;
97
98 type WithBoundedValue = Bounded;
99 type WithUnboundedValue = UnreachableBound;
100
101 fn bound_kind() -> KeyedSingletonBoundKind {
102 unreachable!("UnreachableBound cannot be instantiated")
103 }
104}
105
106/// Mapping from keys of type `K` to values of type `V`.
107///
108/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
109/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
110/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
111/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
112/// keys cannot be removed and the value for each key is immutable.
113///
114/// Type Parameters:
115/// - `K`: the type of the key for each entry
116/// - `V`: the type of the value for each entry
117/// - `Loc`: the [`Location`] where the keyed singleton is materialized
118/// - `Bound`: tracks whether the entries are:
119/// - [`Bounded`] (local and finite)
120/// - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
121/// - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
122pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
123 pub(crate) location: Loc,
124 pub(crate) ir_node: RefCell<HydroNode>,
125
126 _phantom: PhantomData<(K, V, Loc, Bound)>,
127}
128
129impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
130 for KeyedSingleton<K, V, Loc, Bound>
131{
132 fn clone(&self) -> Self {
133 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
134 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
135 *self.ir_node.borrow_mut() = HydroNode::Tee {
136 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
137 metadata: self.location.new_node_metadata(Self::collection_kind()),
138 };
139 }
140
141 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
142 KeyedSingleton {
143 location: self.location.clone(),
144 ir_node: HydroNode::Tee {
145 inner: TeeNode(inner.0.clone()),
146 metadata: metadata.clone(),
147 }
148 .into(),
149 _phantom: PhantomData,
150 }
151 } else {
152 unreachable!()
153 }
154 }
155}
156
157impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
158 for KeyedSingleton<K, V, L, B>
159where
160 L: Location<'a> + NoTick,
161{
162 type Location = L;
163
164 fn create_source(ident: syn::Ident, location: L) -> Self {
165 KeyedSingleton {
166 location: location.clone(),
167 ir_node: RefCell::new(HydroNode::CycleSource {
168 ident,
169 metadata: location.new_node_metadata(Self::collection_kind()),
170 }),
171 _phantom: PhantomData,
172 }
173 }
174}
175
176impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
177where
178 L: Location<'a>,
179{
180 type Location = Tick<L>;
181
182 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
183 KeyedSingleton::new(
184 location.clone(),
185 HydroNode::CycleSource {
186 ident,
187 metadata: location.new_node_metadata(Self::collection_kind()),
188 },
189 )
190 }
191}
192
/// [`DeferTick`] support for bounded keyed singletons inside a tick.
impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    /// Forwards to `KeyedSingleton::defer_tick`.
    ///
    /// NOTE(review): this path is expected to resolve to an inherent `defer_tick` method
    /// defined elsewhere in this file (not visible here). If it ever resolved to this trait
    /// method instead, the call would recurse forever. Confirm the inherent method exists.
    fn defer_tick(self) -> Self {
        KeyedSingleton::defer_tick(self)
    }
}
201
202impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
203 for KeyedSingleton<K, V, L, B>
204where
205 L: Location<'a> + NoTick,
206{
207 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
208 assert_eq!(
209 Location::id(&self.location),
210 expected_location,
211 "locations do not match"
212 );
213 self.location
214 .flow_state()
215 .borrow_mut()
216 .push_root(HydroRoot::CycleSink {
217 ident,
218 input: Box::new(self.ir_node.into_inner()),
219 op_metadata: HydroIrOpMetadata::new(),
220 });
221 }
222}
223
224impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
225where
226 L: Location<'a>,
227{
228 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
229 assert_eq!(
230 Location::id(&self.location),
231 expected_location,
232 "locations do not match"
233 );
234 self.location
235 .flow_state()
236 .borrow_mut()
237 .push_root(HydroRoot::CycleSink {
238 ident,
239 input: Box::new(self.ir_node.into_inner()),
240 op_metadata: HydroIrOpMetadata::new(),
241 });
242 }
243}
244
245impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
246 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
247 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
248 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
249
250 KeyedSingleton {
251 location,
252 ir_node: RefCell::new(ir_node),
253 _phantom: PhantomData,
254 }
255 }
256
257 /// Returns the [`Location`] where this keyed singleton is being materialized.
258 pub fn location(&self) -> &L {
259 &self.location
260 }
261}
262
/// Counts the entries of a bounded keyed singleton. This is used by `key_count` on the snapshot
/// taken inside a tick.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
269
/// Folds a bounded keyed singleton into a single `HashMap`. This is used by `into_singleton`
/// on the snapshot taken inside a tick.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));
    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
288
289impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
290 pub(crate) fn collection_kind() -> CollectionKind {
291 CollectionKind::KeyedSingleton {
292 bound: B::bound_kind(),
293 key_type: stageleft::quote_type::<K>().into(),
294 value_type: stageleft::quote_type::<V>().into(),
295 }
296 }
297
298 /// Transforms each value by invoking `f` on each element, with keys staying the same
299 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
300 ///
301 /// If you do not want to modify the stream and instead only want to view
302 /// each item use [`KeyedSingleton::inspect`] instead.
303 ///
304 /// # Example
305 /// ```rust
306 /// # #[cfg(feature = "deploy")] {
307 /// # use hydro_lang::prelude::*;
308 /// # use futures::StreamExt;
309 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310 /// let keyed_singleton = // { 1: 2, 2: 4 }
311 /// # process
312 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
313 /// # .into_keyed()
314 /// # .first();
315 /// keyed_singleton.map(q!(|v| v + 1))
316 /// # .entries()
317 /// # }, |mut stream| async move {
318 /// // { 1: 3, 2: 5 }
319 /// # let mut results = Vec::new();
320 /// # for _ in 0..2 {
321 /// # results.push(stream.next().await.unwrap());
322 /// # }
323 /// # results.sort();
324 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
325 /// # }));
326 /// # }
327 /// ```
328 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
329 where
330 F: Fn(V) -> U + 'a,
331 {
332 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
333 let map_f = q!({
334 let orig = f;
335 move |(k, v)| (k, orig(v))
336 })
337 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
338 .into();
339
340 KeyedSingleton::new(
341 self.location.clone(),
342 HydroNode::Map {
343 f: map_f,
344 input: Box::new(self.ir_node.into_inner()),
345 metadata: self
346 .location
347 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
348 },
349 )
350 }
351
352 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
353 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
354 ///
355 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
356 /// the new value `U`. The key remains unchanged in the output.
357 ///
358 /// # Example
359 /// ```rust
360 /// # #[cfg(feature = "deploy")] {
361 /// # use hydro_lang::prelude::*;
362 /// # use futures::StreamExt;
363 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
364 /// let keyed_singleton = // { 1: 2, 2: 4 }
365 /// # process
366 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
367 /// # .into_keyed()
368 /// # .first();
369 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
370 /// # .entries()
371 /// # }, |mut stream| async move {
372 /// // { 1: 3, 2: 6 }
373 /// # let mut results = Vec::new();
374 /// # for _ in 0..2 {
375 /// # results.push(stream.next().await.unwrap());
376 /// # }
377 /// # results.sort();
378 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
379 /// # }));
380 /// # }
381 /// ```
382 pub fn map_with_key<U, F>(
383 self,
384 f: impl IntoQuotedMut<'a, F, L> + Copy,
385 ) -> KeyedSingleton<K, U, L, B>
386 where
387 F: Fn((K, V)) -> U + 'a,
388 K: Clone,
389 {
390 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
391 let map_f = q!({
392 let orig = f;
393 move |(k, v)| {
394 let out = orig((Clone::clone(&k), v));
395 (k, out)
396 }
397 })
398 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
399 .into();
400
401 KeyedSingleton::new(
402 self.location.clone(),
403 HydroNode::Map {
404 f: map_f,
405 input: Box::new(self.ir_node.into_inner()),
406 metadata: self
407 .location
408 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
409 },
410 )
411 }
412
413 /// Gets the number of keys in the keyed singleton.
414 ///
415 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
416 /// since keys may be added / removed over time. When the set of keys changes, the count will
417 /// be asynchronously updated.
418 ///
419 /// # Example
420 /// ```rust
421 /// # #[cfg(feature = "deploy")] {
422 /// # use hydro_lang::prelude::*;
423 /// # use futures::StreamExt;
424 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
425 /// # let tick = process.tick();
426 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
427 /// # process
428 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
429 /// # .into_keyed()
430 /// # .batch(&tick, nondet!(/** test */))
431 /// # .first();
432 /// keyed_singleton.key_count()
433 /// # .all_ticks()
434 /// # }, |mut stream| async move {
435 /// // 3
436 /// # assert_eq!(stream.next().await.unwrap(), 3);
437 /// # }));
438 /// # }
439 /// ```
440 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
441 if B::ValueBound::BOUNDED {
442 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
443 location: self.location,
444 ir_node: self.ir_node,
445 _phantom: PhantomData,
446 };
447
448 me.entries().count()
449 } else if L::is_top_level()
450 && let Some(tick) = self.location.try_tick()
451 {
452 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
453 location: self.location,
454 ir_node: self.ir_node,
455 _phantom: PhantomData,
456 };
457
458 let out =
459 key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
460 .latest();
461 Singleton::new(out.location, out.ir_node.into_inner())
462 } else {
463 panic!("Unbounded KeyedSingleton inside a tick");
464 }
465 }
466
467 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
468 ///
469 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
470 /// asynchronously as well.
471 ///
472 /// # Example
473 /// ```rust
474 /// # #[cfg(feature = "deploy")] {
475 /// # use hydro_lang::prelude::*;
476 /// # use futures::StreamExt;
477 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
478 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
479 /// # process
480 /// # .source_iter(q!(vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())]))
481 /// # .into_keyed()
482 /// # .batch(&process.tick(), nondet!(/** test */))
483 /// # .first();
484 /// keyed_singleton.into_singleton()
485 /// # .all_ticks()
486 /// # }, |mut stream| async move {
487 /// // { 1: "a", 2: "b", 3: "c" }
488 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())].into_iter().collect());
489 /// # }));
490 /// # }
491 /// ```
492 pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
493 where
494 K: Eq + Hash,
495 {
496 if B::ValueBound::BOUNDED {
497 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
498 location: self.location,
499 ir_node: self.ir_node,
500 _phantom: PhantomData,
501 };
502
503 me.entries()
504 .assume_ordering(nondet!(
505 /// Because this is a keyed singleton, there is only one value per key.
506 ))
507 .fold(
508 q!(|| HashMap::new()),
509 q!(|map, (k, v)| {
510 // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
511 map.insert(k, v);
512 }),
513 )
514 } else if L::is_top_level()
515 && let Some(tick) = self.location.try_tick()
516 {
517 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
518 location: self.location,
519 ir_node: self.ir_node,
520 _phantom: PhantomData,
521 };
522
523 let out = into_singleton_inside_tick(
524 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
525 )
526 .latest();
527 Singleton::new(out.location, out.ir_node.into_inner())
528 } else {
529 panic!("Unbounded KeyedSingleton inside a tick");
530 }
531 }
532
533 /// An operator which allows you to "name" a `HydroNode`.
534 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
535 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
536 {
537 let mut node = self.ir_node.borrow_mut();
538 let metadata = node.metadata_mut();
539 metadata.tag = Some(name.to_string());
540 }
541 self
542 }
543}
544
545impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
546 KeyedSingleton<K, V, L, B>
547{
548 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
549 ///
550 /// The value for each key must be bounded, otherwise the resulting stream elements would be
551 /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
552 /// into the output.
553 ///
554 /// # Example
555 /// ```rust
556 /// # #[cfg(feature = "deploy")] {
557 /// # use hydro_lang::prelude::*;
558 /// # use futures::StreamExt;
559 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
560 /// let keyed_singleton = // { 1: 2, 2: 4 }
561 /// # process
562 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
563 /// # .into_keyed()
564 /// # .first();
565 /// keyed_singleton.entries()
566 /// # }, |mut stream| async move {
567 /// // (1, 2), (2, 4) in any order
568 /// # let mut results = Vec::new();
569 /// # for _ in 0..2 {
570 /// # results.push(stream.next().await.unwrap());
571 /// # }
572 /// # results.sort();
573 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
574 /// # }));
575 /// # }
576 /// ```
577 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
578 self.into_keyed_stream().entries()
579 }
580
581 /// Flattens the keyed singleton into an unordered stream of just the values.
582 ///
583 /// The value for each key must be bounded, otherwise the resulting stream elements would be
584 /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
585 /// into the output.
586 ///
587 /// # Example
588 /// ```rust
589 /// # #[cfg(feature = "deploy")] {
590 /// # use hydro_lang::prelude::*;
591 /// # use futures::StreamExt;
592 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
593 /// let keyed_singleton = // { 1: 2, 2: 4 }
594 /// # process
595 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
596 /// # .into_keyed()
597 /// # .first();
598 /// keyed_singleton.values()
599 /// # }, |mut stream| async move {
600 /// // 2, 4 in any order
601 /// # let mut results = Vec::new();
602 /// # for _ in 0..2 {
603 /// # results.push(stream.next().await.unwrap());
604 /// # }
605 /// # results.sort();
606 /// # assert_eq!(results, vec![2, 4]);
607 /// # }));
608 /// # }
609 /// ```
610 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
611 let map_f = q!(|(_, v)| v)
612 .splice_fn1_ctx::<(K, V), V>(&self.location)
613 .into();
614
615 Stream::new(
616 self.location.clone(),
617 HydroNode::Map {
618 f: map_f,
619 input: Box::new(self.ir_node.into_inner()),
620 metadata: self.location.new_node_metadata(Stream::<
621 V,
622 L,
623 B::UnderlyingBound,
624 NoOrder,
625 ExactlyOnce,
626 >::collection_kind()),
627 },
628 )
629 }
630
631 /// Flattens the keyed singleton into an unordered stream of just the keys.
632 ///
633 /// The value for each key must be bounded, otherwise the removal of keys would result in
634 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
635 /// into the output.
636 ///
637 /// # Example
638 /// ```rust
639 /// # #[cfg(feature = "deploy")] {
640 /// # use hydro_lang::prelude::*;
641 /// # use futures::StreamExt;
642 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
643 /// let keyed_singleton = // { 1: 2, 2: 4 }
644 /// # process
645 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
646 /// # .into_keyed()
647 /// # .first();
648 /// keyed_singleton.keys()
649 /// # }, |mut stream| async move {
650 /// // 1, 2 in any order
651 /// # let mut results = Vec::new();
652 /// # for _ in 0..2 {
653 /// # results.push(stream.next().await.unwrap());
654 /// # }
655 /// # results.sort();
656 /// # assert_eq!(results, vec![1, 2]);
657 /// # }));
658 /// # }
659 /// ```
660 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
661 self.entries().map(q!(|(k, _)| k))
662 }
663
664 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
665 /// entries whose keys are not in the provided stream.
666 ///
667 /// # Example
668 /// ```rust
669 /// # #[cfg(feature = "deploy")] {
670 /// # use hydro_lang::prelude::*;
671 /// # use futures::StreamExt;
672 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
673 /// let tick = process.tick();
674 /// let keyed_singleton = // { 1: 2, 2: 4 }
675 /// # process
676 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
677 /// # .into_keyed()
678 /// # .first()
679 /// # .batch(&tick, nondet!(/** test */));
680 /// let keys_to_remove = process
681 /// .source_iter(q!(vec![1]))
682 /// .batch(&tick, nondet!(/** test */));
683 /// keyed_singleton.filter_key_not_in(keys_to_remove)
684 /// # .entries().all_ticks()
685 /// # }, |mut stream| async move {
686 /// // { 2: 4 }
687 /// # for w in vec![(2, 4)] {
688 /// # assert_eq!(stream.next().await.unwrap(), w);
689 /// # }
690 /// # }));
691 /// # }
692 /// ```
693 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
694 self,
695 other: Stream<K, L, Bounded, O2, R2>,
696 ) -> Self
697 where
698 K: Hash + Eq,
699 {
700 check_matching_location(&self.location, &other.location);
701
702 KeyedSingleton::new(
703 self.location.clone(),
704 HydroNode::AntiJoin {
705 pos: Box::new(self.ir_node.into_inner()),
706 neg: Box::new(other.ir_node.into_inner()),
707 metadata: self.location.new_node_metadata(Self::collection_kind()),
708 },
709 )
710 }
711
712 /// An operator which allows you to "inspect" each value of a keyed singleton without
713 /// modifying it. The closure `f` is called on a reference to each value. This is
714 /// mainly useful for debugging, and should not be used to generate side-effects.
715 ///
716 /// # Example
717 /// ```rust
718 /// # #[cfg(feature = "deploy")] {
719 /// # use hydro_lang::prelude::*;
720 /// # use futures::StreamExt;
721 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
722 /// let keyed_singleton = // { 1: 2, 2: 4 }
723 /// # process
724 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
725 /// # .into_keyed()
726 /// # .first();
727 /// keyed_singleton
728 /// .inspect(q!(|v| println!("{}", v)))
729 /// # .entries()
730 /// # }, |mut stream| async move {
731 /// // { 1: 2, 2: 4 }
732 /// # for w in vec![(1, 2), (2, 4)] {
733 /// # assert_eq!(stream.next().await.unwrap(), w);
734 /// # }
735 /// # }));
736 /// # }
737 /// ```
738 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
739 where
740 F: Fn(&V) + 'a,
741 {
742 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
743 let inspect_f = q!({
744 let orig = f;
745 move |t: &(_, _)| orig(&t.1)
746 })
747 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
748 .into();
749
750 KeyedSingleton::new(
751 self.location.clone(),
752 HydroNode::Inspect {
753 f: inspect_f,
754 input: Box::new(self.ir_node.into_inner()),
755 metadata: self.location.new_node_metadata(Self::collection_kind()),
756 },
757 )
758 }
759
760 /// An operator which allows you to "inspect" each entry of a keyed singleton without
761 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
762 /// mainly useful for debugging, and should not be used to generate side-effects.
763 ///
764 /// # Example
765 /// ```rust
766 /// # #[cfg(feature = "deploy")] {
767 /// # use hydro_lang::prelude::*;
768 /// # use futures::StreamExt;
769 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770 /// let keyed_singleton = // { 1: 2, 2: 4 }
771 /// # process
772 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
773 /// # .into_keyed()
774 /// # .first();
775 /// keyed_singleton
776 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
777 /// # .entries()
778 /// # }, |mut stream| async move {
779 /// // { 1: 2, 2: 4 }
780 /// # for w in vec![(1, 2), (2, 4)] {
781 /// # assert_eq!(stream.next().await.unwrap(), w);
782 /// # }
783 /// # }));
784 /// # }
785 /// ```
786 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
787 where
788 F: Fn(&(K, V)) + 'a,
789 {
790 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
791
792 KeyedSingleton::new(
793 self.location.clone(),
794 HydroNode::Inspect {
795 f: inspect_f,
796 input: Box::new(self.ir_node.into_inner()),
797 metadata: self.location.new_node_metadata(Self::collection_kind()),
798 },
799 )
800 }
801
802 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
803 ///
804 /// Because this method requires values to be bounded, the output [`Optional`] will only be
805 /// asynchronously updated if a new key is added that is higher than the previous max key.
806 ///
807 /// # Example
808 /// ```rust
809 /// # #[cfg(feature = "deploy")] {
810 /// # use hydro_lang::prelude::*;
811 /// # use futures::StreamExt;
812 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
813 /// let tick = process.tick();
814 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
815 /// # process
816 /// # .source_iter(q!(vec![(1, 123), (2, 456), (0, 789)]))
817 /// # .into_keyed()
818 /// # .first();
819 /// keyed_singleton.get_max_key()
820 /// # .sample_eager(nondet!(/** test */))
821 /// # }, |mut stream| async move {
822 /// // (2, 456)
823 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
824 /// # }));
825 /// # }
826 /// ```
827 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
828 where
829 K: Ord,
830 {
831 self.entries()
832 .assume_ordering(nondet!(
833 /// There is only one element associated with each key, and the keys are totallly
834 /// ordered so we will produce a deterministic value. We can't call
835 /// `reduce_commutative_idempotent` because the closure technically isn't commutative
836 /// in the case where both passed entries have the same key but different values.
837 ///
838 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
839 /// the two inputs do not have the same key.
840 ))
841 .reduce_idempotent(q!({
842 move |curr, new| {
843 if new.0 > curr.0 {
844 *curr = new;
845 }
846 }
847 }))
848 }
849
850 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
851 /// element, the value.
852 ///
853 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
854 ///
855 /// # Example
856 /// ```rust
857 /// # #[cfg(feature = "deploy")] {
858 /// # use hydro_lang::prelude::*;
859 /// # use futures::StreamExt;
860 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
861 /// let keyed_singleton = // { 1: 2, 2: 4 }
862 /// # process
863 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
864 /// # .into_keyed()
865 /// # .first();
866 /// keyed_singleton
867 /// .clone()
868 /// .into_keyed_stream()
869 /// .interleave(
870 /// keyed_singleton.into_keyed_stream()
871 /// )
872 /// # .entries()
873 /// # }, |mut stream| async move {
874 /// /// // { 1: [2, 2], 2: [4, 4] }
875 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
876 /// # assert_eq!(stream.next().await.unwrap(), w);
877 /// # }
878 /// # }));
879 /// # }
880 /// ```
881 pub fn into_keyed_stream(
882 self,
883 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
884 KeyedStream::new(
885 self.location.clone(),
886 HydroNode::Cast {
887 inner: Box::new(self.ir_node.into_inner()),
888 metadata: self.location.new_node_metadata(KeyedStream::<
889 K,
890 V,
891 L,
892 B::UnderlyingBound,
893 TotalOrder,
894 ExactlyOnce,
895 >::collection_kind()),
896 },
897 )
898 }
899}
900
901impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
902 /// Gets the value associated with a specific key from the keyed singleton.
903 ///
904 /// # Example
905 /// ```rust
906 /// # #[cfg(feature = "deploy")] {
907 /// # use hydro_lang::prelude::*;
908 /// # use futures::StreamExt;
909 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
910 /// let tick = process.tick();
911 /// let keyed_data = process
912 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
913 /// .into_keyed()
914 /// .batch(&tick, nondet!(/** test */))
915 /// .first();
916 /// let key = tick.singleton(q!(1));
917 /// keyed_data.get(key).all_ticks()
918 /// # }, |mut stream| async move {
919 /// // 2
920 /// # assert_eq!(stream.next().await.unwrap(), 2);
921 /// # }));
922 /// # }
923 /// ```
924 pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
925 self.entries()
926 .join(key.into_stream().map(q!(|k| (k, ()))))
927 .map(q!(|(_, (v, _))| v))
928 .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
929 .first()
930 }
931
932 /// Given a keyed stream of lookup requests, where the key is the lookup and the value
933 /// is some additional metadata, emits a keyed stream of lookup results where the key
934 /// is the same as before, but the value is a tuple of the lookup result and the metadata
935 /// of the request. If the key is not found, no output will be produced.
936 ///
937 /// # Example
938 /// ```rust
939 /// # #[cfg(feature = "deploy")] {
940 /// # use hydro_lang::prelude::*;
941 /// # use futures::StreamExt;
942 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
943 /// let tick = process.tick();
944 /// let keyed_data = process
945 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
946 /// .into_keyed()
947 /// .batch(&tick, nondet!(/** test */))
948 /// .first();
949 /// let other_data = process
950 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
951 /// .into_keyed()
952 /// .batch(&tick, nondet!(/** test */));
953 /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
954 /// # }, |mut stream| async move {
955 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
956 /// # let mut results = vec![];
957 /// # for _ in 0..3 {
958 /// # results.push(stream.next().await.unwrap());
959 /// # }
960 /// # results.sort();
961 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
962 /// # }));
963 /// # }
964 /// ```
965 pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
966 self,
967 requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
968 ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
969 self.entries()
970 .weaker_retries::<R2>()
971 .join(requests.entries())
972 .into_keyed()
973 }
974
975 /// Given a keyed stream of lookup requests, where the key is the lookup and the value
976 /// is some additional metadata, emits a keyed stream of lookup results where the key
977 /// is the same as before, but the value is a tuple of the lookup result (as `Option<V>`)
978 /// and the metadata of the request. Unlike `get_many_if_present`, this returns all request
979 /// keys, with `None` for keys that are not found.
980 ///
981 /// # Example
982 /// ```rust
983 /// # #[cfg(feature = "deploy")] {
984 /// # use hydro_lang::prelude::*;
985 /// # use futures::StreamExt;
986 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
987 /// let tick = process.tick();
988 /// let keyed_data = process
989 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
990 /// .into_keyed()
991 /// .batch(&tick, nondet!(/** test */))
992 /// .first();
993 /// let other_data = process
994 /// .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
995 /// .into_keyed()
996 /// .batch(&tick, nondet!(/** test */));
997 /// keyed_data.get_many(other_data).entries().all_ticks()
998 /// # }, |mut stream| async move {
999 /// // { 1: [(Some(10), 100)], 2: [(Some(20), 200)], 3: [(None, 300)] } in any order
1000 /// # let mut results = vec![];
1001 /// # for _ in 0..3 {
1002 /// # results.push(stream.next().await.unwrap());
1003 /// # }
1004 /// # results.sort();
1005 /// # assert_eq!(results, vec![(1, (Some(10), 100)), (2, (Some(20), 200)), (3, (None, 300))]);
1006 /// # }));
1007 /// # }
1008 /// ```
1009 #[expect(clippy::type_complexity, reason = "stream types")]
1010 pub fn get_many<O2: Ordering, R2: Retries, V2>(
1011 self,
1012 requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
1013 ) -> KeyedStream<K, (Option<V>, V2), Tick<L>, Bounded, NoOrder, R2>
1014 where
1015 K: Clone,
1016 V: Clone,
1017 V2: Clone,
1018 {
1019 let lookup_result = self.clone().get_many_if_present(requests.clone());
1020 let missing_keys = requests.filter_key_not_in(self.keys()).weakest_ordering();
1021
1022 lookup_result
1023 .map(q!(|(v, v2)| (Some(v), v2)))
1024 .chain(missing_keys.map(q!(|v2| (None, v2))))
1025 }
1026
1027 /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
1028 /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
1029 /// containing the value from `self` and an option of the value from `from`. If the key is not
1030 /// present in `from`, the option will be [`None`].
1031 ///
1032 /// # Example
1033 /// ```rust
1034 /// # #[cfg(feature = "deploy")] {
1035 /// # use hydro_lang::prelude::*;
1036 /// # use futures::StreamExt;
1037 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1038 /// # let tick = process.tick();
1039 /// let requests = // { 1: 10, 2: 20 }
1040 /// # process
1041 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
1042 /// # .into_keyed()
1043 /// # .batch(&tick, nondet!(/** test */))
1044 /// # .first();
1045 /// let other_data = // { 10: 100, 11: 101 }
1046 /// # process
1047 /// # .source_iter(q!(vec![(10, 100), (11, 101)]))
1048 /// # .into_keyed()
1049 /// # .batch(&tick, nondet!(/** test */))
1050 /// # .first();
1051 /// requests.get_from(other_data)
1052 /// # .entries().all_ticks()
1053 /// # }, |mut stream| async move {
1054 /// // { 1: (10, Some(100)), 2: (20, None) }
1055 /// # let mut results = vec![];
1056 /// # for _ in 0..2 {
1057 /// # results.push(stream.next().await.unwrap());
1058 /// # }
1059 /// # results.sort();
1060 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
1061 /// # }));
1062 /// # }
1063 /// ```
1064 pub fn get_from<V2: Clone>(
1065 self,
1066 from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
1067 ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
1068 where
1069 K: Clone,
1070 V: Hash + Eq + Clone,
1071 {
1072 let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
1073 let lookup_result = from.get_many_if_present(to_lookup.clone());
1074 let missing_values =
1075 to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
1076 let result_stream = lookup_result
1077 .entries()
1078 .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
1079 .into_keyed()
1080 .chain(
1081 missing_values
1082 .entries()
1083 .map(q!(|(v, k)| (k, (v, None))))
1084 .into_keyed(),
1085 );
1086
1087 KeyedSingleton::new(
1088 result_stream.location.clone(),
1089 HydroNode::Cast {
1090 inner: Box::new(result_stream.ir_node.into_inner()),
1091 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
1092 K,
1093 (V, Option<V2>),
1094 Tick<L>,
1095 Bounded,
1096 >::collection_kind(
1097 )),
1098 },
1099 )
1100 }
1101}
1102
1103impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1104where
1105 L: Location<'a>,
1106{
1107 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1108 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1109 ///
1110 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1111 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1112 /// argument that declares where the keyed singleton will be atomically processed. Batching a
1113 /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1114 /// batching into a different [`Tick`] will introduce asynchrony.
1115 pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1116 let out_location = Atomic { tick: tick.clone() };
1117 KeyedSingleton::new(
1118 out_location.clone(),
1119 HydroNode::BeginAtomic {
1120 inner: Box::new(self.ir_node.into_inner()),
1121 metadata: out_location
1122 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1123 },
1124 )
1125 }
1126}
1127
1128impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1129where
1130 L: Location<'a> + NoTick,
1131{
1132 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1133 /// See [`KeyedSingleton::atomic`] for more details.
1134 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1135 KeyedSingleton::new(
1136 self.location.tick.l.clone(),
1137 HydroNode::EndAtomic {
1138 inner: Box::new(self.ir_node.into_inner()),
1139 metadata: self
1140 .location
1141 .tick
1142 .l
1143 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1144 },
1145 )
1146 }
1147}
1148
1149impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1150 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1151 /// tick `T` always has the entries of `self` at tick `T - 1`.
1152 ///
1153 /// At tick `0`, the output has no entries, since there is no previous tick.
1154 ///
1155 /// This operator enables stateful iterative processing with ticks, by sending data from one
1156 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1157 ///
1158 /// # Example
1159 /// ```rust
1160 /// # #[cfg(feature = "deploy")] {
1161 /// # use hydro_lang::prelude::*;
1162 /// # use futures::StreamExt;
1163 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1164 /// let tick = process.tick();
1165 /// # // ticks are lazy by default, forces the second tick to run
1166 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1167 /// # let batch_first_tick = process
1168 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1169 /// # .batch(&tick, nondet!(/** test */))
1170 /// # .into_keyed();
1171 /// # let batch_second_tick = process
1172 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1173 /// # .batch(&tick, nondet!(/** test */))
1174 /// # .into_keyed()
1175 /// # .defer_tick(); // appears on the second tick
1176 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1177 /// # batch_first_tick.chain(batch_second_tick).first();
1178 /// input_batch.clone().filter_key_not_in(
1179 /// input_batch.defer_tick().keys() // keys present in the previous tick
1180 /// )
1181 /// # .entries().all_ticks()
1182 /// # }, |mut stream| async move {
1183 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1184 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1185 /// # assert_eq!(stream.next().await.unwrap(), w);
1186 /// # }
1187 /// # }));
1188 /// # }
1189 /// ```
1190 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1191 KeyedSingleton::new(
1192 self.location.clone(),
1193 HydroNode::DeferTick {
1194 input: Box::new(self.ir_node.into_inner()),
1195 metadata: self
1196 .location
1197 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1198 },
1199 )
1200 }
1201}
1202
1203impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1204where
1205 L: Location<'a>,
1206{
1207 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1208 /// point in time.
1209 ///
1210 /// # Non-Determinism
1211 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1212 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1213 pub fn snapshot(
1214 self,
1215 tick: &Tick<L>,
1216 _nondet: NonDet,
1217 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1218 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1219 KeyedSingleton::new(
1220 tick.clone(),
1221 HydroNode::Batch {
1222 inner: Box::new(self.ir_node.into_inner()),
1223 metadata: tick
1224 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1225 },
1226 )
1227 }
1228}
1229
1230impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1231where
1232 L: Location<'a> + NoTick,
1233{
1234 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1235 /// state of the keyed singleton being atomically processed.
1236 ///
1237 /// # Non-Determinism
1238 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1239 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1240 pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1241 KeyedSingleton::new(
1242 self.location.clone().tick,
1243 HydroNode::Batch {
1244 inner: Box::new(self.ir_node.into_inner()),
1245 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1246 K,
1247 V,
1248 Tick<L>,
1249 Bounded,
1250 >::collection_kind(
1251 )),
1252 },
1253 )
1254 }
1255}
1256
1257impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1258where
1259 L: Location<'a>,
1260{
1261 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1262 ///
1263 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1264 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1265 /// is filtered out.
1266 ///
1267 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1268 /// not modify or take ownership of the values. If you need to modify the values while filtering
1269 /// use [`KeyedSingleton::filter_map`] instead.
1270 ///
1271 /// # Example
1272 /// ```rust
1273 /// # #[cfg(feature = "deploy")] {
1274 /// # use hydro_lang::prelude::*;
1275 /// # use futures::StreamExt;
1276 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1277 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1278 /// # process
1279 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1280 /// # .into_keyed()
1281 /// # .first();
1282 /// keyed_singleton.filter(q!(|&v| v > 1))
1283 /// # .entries()
1284 /// # }, |mut stream| async move {
1285 /// // { 1: 2, 2: 4 }
1286 /// # let mut results = Vec::new();
1287 /// # for _ in 0..2 {
1288 /// # results.push(stream.next().await.unwrap());
1289 /// # }
1290 /// # results.sort();
1291 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1292 /// # }));
1293 /// # }
1294 /// ```
1295 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1296 where
1297 F: Fn(&V) -> bool + 'a,
1298 {
1299 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1300 let filter_f = q!({
1301 let orig = f;
1302 move |t: &(_, _)| orig(&t.1)
1303 })
1304 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1305 .into();
1306
1307 KeyedSingleton::new(
1308 self.location.clone(),
1309 HydroNode::Filter {
1310 f: filter_f,
1311 input: Box::new(self.ir_node.into_inner()),
1312 metadata: self
1313 .location
1314 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1315 },
1316 )
1317 }
1318
1319 /// An operator that both filters and maps values. It yields only the key-value pairs where
1320 /// the supplied closure `f` returns `Some(value)`.
1321 ///
1322 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1323 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1324 /// If it returns `None`, the key-value pair is filtered out.
1325 ///
1326 /// # Example
1327 /// ```rust
1328 /// # #[cfg(feature = "deploy")] {
1329 /// # use hydro_lang::prelude::*;
1330 /// # use futures::StreamExt;
1331 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1332 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1333 /// # process
1334 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1335 /// # .into_keyed()
1336 /// # .first();
1337 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1338 /// # .entries()
1339 /// # }, |mut stream| async move {
1340 /// // { 1: 42, 3: 100 }
1341 /// # let mut results = Vec::new();
1342 /// # for _ in 0..2 {
1343 /// # results.push(stream.next().await.unwrap());
1344 /// # }
1345 /// # results.sort();
1346 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1347 /// # }));
1348 /// # }
1349 /// ```
1350 pub fn filter_map<F, U>(
1351 self,
1352 f: impl IntoQuotedMut<'a, F, L> + Copy,
1353 ) -> KeyedSingleton<K, U, L, B>
1354 where
1355 F: Fn(V) -> Option<U> + 'a,
1356 {
1357 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1358 let filter_map_f = q!({
1359 let orig = f;
1360 move |(k, v)| orig(v).map(|o| (k, o))
1361 })
1362 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1363 .into();
1364
1365 KeyedSingleton::new(
1366 self.location.clone(),
1367 HydroNode::FilterMap {
1368 f: filter_map_f,
1369 input: Box::new(self.ir_node.into_inner()),
1370 metadata: self
1371 .location
1372 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1373 },
1374 )
1375 }
1376
1377 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1378 /// arrived since the previous batch was released.
1379 ///
1380 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1381 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1382 ///
1383 /// # Non-Determinism
1384 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1385 /// has a non-deterministic set of key-value pairs.
1386 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1387 where
1388 L: NoTick,
1389 {
1390 self.atomic(tick).batch_atomic(nondet)
1391 }
1392}
1393
1394impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1395where
1396 L: Location<'a> + NoTick,
1397{
1398 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1399 /// atomically processed.
1400 ///
1401 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1402 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1403 ///
1404 /// # Non-Determinism
1405 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1406 /// has a non-deterministic set of key-value pairs.
1407 pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1408 let _ = nondet;
1409 KeyedSingleton::new(
1410 self.location.clone().tick,
1411 HydroNode::Batch {
1412 inner: Box::new(self.ir_node.into_inner()),
1413 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1414 K,
1415 V,
1416 Tick<L>,
1417 Bounded,
1418 >::collection_kind(
1419 )),
1420 },
1421 )
1422 }
1423}
1424
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    /// `key_count` over a keyed singleton built with `first`, so values never change.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input arrives there are no keys.
        assert_eq!(external_out.next().await.unwrap(), 0);

        // Each step: (entry sent, key count expected afterwards).
        for (entry, expected_keys) in [((1, 1), 1), ((2, 2), 2)] {
            external_in.send(entry).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected_keys);
        }
    }

    /// `key_count` over a keyed singleton built with `fold`, so values keep changing.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input arrives there are no keys.
        assert_eq!(external_out.next().await.unwrap(), 0);

        // Each step: (entry sent, key count expected afterwards). Repeated keys must not raise
        // the count.
        let steps = [
            ((1, 1), 1),
            ((1, 2), 1),
            ((2, 2), 2),
            ((1, 1), 2),
            ((3, 1), 3),
        ];
        for (entry, expected_keys) in steps {
            external_in.send(entry).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected_keys);
        }
    }

    /// `into_singleton` over a keyed singleton built with `first`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input arrives the map is empty.
        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        // Each step: (entry sent, full map contents expected afterwards).
        let steps = [
            ((1, 1), vec![(1, 1)]),
            ((2, 2), vec![(1, 1), (2, 2)]),
        ];
        for (entry, expected_pairs) in steps {
            external_in.send(entry).await.unwrap();
            let expected: std::collections::HashMap<_, _> = expected_pairs.into_iter().collect();
            assert_eq!(external_out.next().await.unwrap(), expected);
        }
    }

    /// `into_singleton` over a keyed singleton built with `fold`, counting entries per key.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input arrives the map is empty.
        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        // Each step: (entry sent, per-key counts expected afterwards).
        let steps = [
            ((1, 1), vec![(1, 1)]),
            ((1, 2), vec![(1, 2)]),
            ((2, 2), vec![(1, 2), (2, 1)]),
            ((1, 1), vec![(1, 3), (2, 1)]),
            ((3, 1), vec![(1, 3), (2, 1), (3, 1)]),
        ];
        for (entry, expected_pairs) in steps {
            external_in.send(entry).await.unwrap();
            let expected: std::collections::HashMap<_, _> = expected_pairs.into_iter().collect();
            assert_eq!(external_out.next().await.unwrap(), expected);
        }
    }

    /// Exhaustively simulates snapshots of a folded keyed singleton.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input();
        let output = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .snapshot(&node.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .sim_output();

        let explored = flow.sim().exhaustive(async || {
            for msg in [(1, 123), (1, 456), (2, 123)] {
                input_port.send(msg);
            }

            // Key 2 only ever receives one element. After sorting, the largest snapshot entry
            // must therefore be `(2, 1)`.
            let all = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(all.last().unwrap(), &(2, 1));
        });

        assert_eq!(explored, 8);
    }

    /// `get_many` keeps every request, pairing missing keys with `None`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn get_many_outer_join() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = node.tick();
        let keyed_data = node
            .source_iter(q!(vec![(1, 10), (2, 20)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */))
            .first();
        let requests = node
            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */));

        let out = keyed_data
            .get_many(requests)
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Output order is unspecified, so gather all three results and sort them.
        let mut results = Vec::new();
        while results.len() < 3 {
            results.push(external_out.next().await.unwrap());
        }
        results.sort();

        assert_eq!(
            results,
            vec![(1, (Some(10), 100)), (2, (Some(20), 200)), (3, (None, 300))]
        );
    }
}