hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
19};
20#[cfg(stageleft_runtime)]
21use crate::forward_handle::{CycleCollection, ReceiverComplete};
22use crate::forward_handle::{ForwardRef, TickCycle};
23use crate::live_collections::stream::{Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::DeferTick;
27use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
28use crate::manual_expr::ManualExpr;
29use crate::nondet::{NonDet, nondet};
30
31/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
32///
33/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
34/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
35/// indicates that entries may be added over time, but once an entry is added it will never be
36/// removed and its value will never change.
37pub trait KeyedSingletonBound {
38    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
39    type UnderlyingBound: Boundedness;
40    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
41    type ValueBound: Boundedness;
42
43    /// The type of the keyed singleton if the value for each key is immutable.
44    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
45
46    /// The type of the keyed singleton if the value for each key may change asynchronously.
47    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
48
49    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
50    fn bound_kind() -> KeyedSingletonBoundKind;
51}
52
53impl KeyedSingletonBound for Unbounded {
54    type UnderlyingBound = Unbounded;
55    type ValueBound = Unbounded;
56    type WithBoundedValue = BoundedValue;
57    type WithUnboundedValue = Unbounded;
58
59    fn bound_kind() -> KeyedSingletonBoundKind {
60        KeyedSingletonBoundKind::Unbounded
61    }
62}
63
64impl KeyedSingletonBound for Bounded {
65    type UnderlyingBound = Bounded;
66    type ValueBound = Bounded;
67    type WithBoundedValue = Bounded;
68    type WithUnboundedValue = UnreachableBound;
69
70    fn bound_kind() -> KeyedSingletonBoundKind {
71        KeyedSingletonBoundKind::Bounded
72    }
73}
74
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
/// appears, its value is bounded and will never change. The set of entries itself is still
/// [`Unbounded`]: new entries may appear asynchronously, but existing entries are never removed.
pub struct BoundedValue;
79
80impl KeyedSingletonBound for BoundedValue {
81    type UnderlyingBound = Unbounded;
82    type ValueBound = Bounded;
83    type WithBoundedValue = BoundedValue;
84    type WithUnboundedValue = Unbounded;
85
86    fn bound_kind() -> KeyedSingletonBoundKind {
87        KeyedSingletonBoundKind::BoundedValue
88    }
89}
90
/// Placeholder marker used as `<Bounded as KeyedSingletonBound>::WithUnboundedValue`.
///
/// It pairs a bounded underlying stream with unbounded values; its `bound_kind` panics with
/// `unreachable!`.
#[doc(hidden)]
pub struct UnreachableBound;
93
94impl KeyedSingletonBound for UnreachableBound {
95    type UnderlyingBound = Bounded;
96    type ValueBound = Unbounded;
97
98    type WithBoundedValue = Bounded;
99    type WithUnboundedValue = UnreachableBound;
100
101    fn bound_kind() -> KeyedSingletonBoundKind {
102        unreachable!("UnreachableBound cannot be instantiated")
103    }
104}
105
106/// Mapping from keys of type `K` to values of type `V`.
107///
108/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
109/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
110/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
111/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
112/// keys cannot be removed and the value for each key is immutable.
113///
114/// Type Parameters:
115/// - `K`: the type of the key for each entry
116/// - `V`: the type of the value for each entry
117/// - `Loc`: the [`Location`] where the keyed singleton is materialized
118/// - `Bound`: tracks whether the entries are:
119///     - [`Bounded`] (local and finite)
120///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
121///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
122pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
123    pub(crate) location: Loc,
124    pub(crate) ir_node: RefCell<HydroNode>,
125
126    _phantom: PhantomData<(K, V, Loc, Bound)>,
127}
128
129impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
130    for KeyedSingleton<K, V, Loc, Bound>
131{
132    fn clone(&self) -> Self {
133        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
134            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
135            *self.ir_node.borrow_mut() = HydroNode::Tee {
136                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
137                metadata: self.location.new_node_metadata(Self::collection_kind()),
138            };
139        }
140
141        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
142            KeyedSingleton {
143                location: self.location.clone(),
144                ir_node: HydroNode::Tee {
145                    inner: TeeNode(inner.0.clone()),
146                    metadata: metadata.clone(),
147                }
148                .into(),
149                _phantom: PhantomData,
150            }
151        } else {
152            unreachable!()
153        }
154    }
155}
156
157impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
158    for KeyedSingleton<K, V, L, B>
159where
160    L: Location<'a> + NoTick,
161{
162    type Location = L;
163
164    fn create_source(ident: syn::Ident, location: L) -> Self {
165        KeyedSingleton {
166            location: location.clone(),
167            ir_node: RefCell::new(HydroNode::CycleSource {
168                ident,
169                metadata: location.new_node_metadata(Self::collection_kind()),
170            }),
171            _phantom: PhantomData,
172        }
173    }
174}
175
176impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
177where
178    L: Location<'a>,
179{
180    type Location = Tick<L>;
181
182    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
183        KeyedSingleton::new(
184            location.clone(),
185            HydroNode::CycleSource {
186                ident,
187                metadata: location.new_node_metadata(Self::collection_kind()),
188            },
189        )
190    }
191}
192
193impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
194where
195    L: Location<'a>,
196{
197    fn defer_tick(self) -> Self {
198        KeyedSingleton::defer_tick(self)
199    }
200}
201
202impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
203    for KeyedSingleton<K, V, L, B>
204where
205    L: Location<'a> + NoTick,
206{
207    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
208        assert_eq!(
209            Location::id(&self.location),
210            expected_location,
211            "locations do not match"
212        );
213        self.location
214            .flow_state()
215            .borrow_mut()
216            .push_root(HydroRoot::CycleSink {
217                ident,
218                input: Box::new(self.ir_node.into_inner()),
219                op_metadata: HydroIrOpMetadata::new(),
220            });
221    }
222}
223
224impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
225where
226    L: Location<'a>,
227{
228    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
229        assert_eq!(
230            Location::id(&self.location),
231            expected_location,
232            "locations do not match"
233        );
234        self.location
235            .flow_state()
236            .borrow_mut()
237            .push_root(HydroRoot::CycleSink {
238                ident,
239                input: Box::new(self.ir_node.into_inner()),
240                op_metadata: HydroIrOpMetadata::new(),
241            });
242    }
243}
244
245impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
246    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
247        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
248        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
249
250        KeyedSingleton {
251            location,
252            ir_node: RefCell::new(ir_node),
253            _phantom: PhantomData,
254        }
255    }
256
257    /// Returns the [`Location`] where this keyed singleton is being materialized.
258    pub fn location(&self) -> &L {
259        &self.location
260    }
261}
262
/// Counts the entries of a bounded keyed singleton.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
269
/// Folds the entries of a bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    ordered.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
288
289impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
290    pub(crate) fn collection_kind() -> CollectionKind {
291        CollectionKind::KeyedSingleton {
292            bound: B::bound_kind(),
293            key_type: stageleft::quote_type::<K>().into(),
294            value_type: stageleft::quote_type::<V>().into(),
295        }
296    }
297
298    /// Transforms each value by invoking `f` on each element, with keys staying the same
299    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
300    ///
301    /// If you do not want to modify the stream and instead only want to view
302    /// each item use [`KeyedSingleton::inspect`] instead.
303    ///
304    /// # Example
305    /// ```rust
306    /// # #[cfg(feature = "deploy")] {
307    /// # use hydro_lang::prelude::*;
308    /// # use futures::StreamExt;
309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310    /// let keyed_singleton = // { 1: 2, 2: 4 }
311    /// # process
312    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
313    /// #     .into_keyed()
314    /// #     .first();
315    /// keyed_singleton.map(q!(|v| v + 1))
316    /// #   .entries()
317    /// # }, |mut stream| async move {
318    /// // { 1: 3, 2: 5 }
319    /// # let mut results = Vec::new();
320    /// # for _ in 0..2 {
321    /// #     results.push(stream.next().await.unwrap());
322    /// # }
323    /// # results.sort();
324    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
325    /// # }));
326    /// # }
327    /// ```
328    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
329    where
330        F: Fn(V) -> U + 'a,
331    {
332        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
333        let map_f = q!({
334            let orig = f;
335            move |(k, v)| (k, orig(v))
336        })
337        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
338        .into();
339
340        KeyedSingleton::new(
341            self.location.clone(),
342            HydroNode::Map {
343                f: map_f,
344                input: Box::new(self.ir_node.into_inner()),
345                metadata: self
346                    .location
347                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
348            },
349        )
350    }
351
352    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
353    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
354    ///
355    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
356    /// the new value `U`. The key remains unchanged in the output.
357    ///
358    /// # Example
359    /// ```rust
360    /// # #[cfg(feature = "deploy")] {
361    /// # use hydro_lang::prelude::*;
362    /// # use futures::StreamExt;
363    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
364    /// let keyed_singleton = // { 1: 2, 2: 4 }
365    /// # process
366    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
367    /// #     .into_keyed()
368    /// #     .first();
369    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
370    /// #   .entries()
371    /// # }, |mut stream| async move {
372    /// // { 1: 3, 2: 6 }
373    /// # let mut results = Vec::new();
374    /// # for _ in 0..2 {
375    /// #     results.push(stream.next().await.unwrap());
376    /// # }
377    /// # results.sort();
378    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
379    /// # }));
380    /// # }
381    /// ```
382    pub fn map_with_key<U, F>(
383        self,
384        f: impl IntoQuotedMut<'a, F, L> + Copy,
385    ) -> KeyedSingleton<K, U, L, B>
386    where
387        F: Fn((K, V)) -> U + 'a,
388        K: Clone,
389    {
390        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
391        let map_f = q!({
392            let orig = f;
393            move |(k, v)| {
394                let out = orig((Clone::clone(&k), v));
395                (k, out)
396            }
397        })
398        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
399        .into();
400
401        KeyedSingleton::new(
402            self.location.clone(),
403            HydroNode::Map {
404                f: map_f,
405                input: Box::new(self.ir_node.into_inner()),
406                metadata: self
407                    .location
408                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
409            },
410        )
411    }
412
413    /// Gets the number of keys in the keyed singleton.
414    ///
415    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
416    /// since keys may be added / removed over time. When the set of keys changes, the count will
417    /// be asynchronously updated.
418    ///
419    /// # Example
420    /// ```rust
421    /// # #[cfg(feature = "deploy")] {
422    /// # use hydro_lang::prelude::*;
423    /// # use futures::StreamExt;
424    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
425    /// # let tick = process.tick();
426    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
427    /// # process
428    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
429    /// #     .into_keyed()
430    /// #     .batch(&tick, nondet!(/** test */))
431    /// #     .first();
432    /// keyed_singleton.key_count()
433    /// # .all_ticks()
434    /// # }, |mut stream| async move {
435    /// // 3
436    /// # assert_eq!(stream.next().await.unwrap(), 3);
437    /// # }));
438    /// # }
439    /// ```
440    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
441        if B::ValueBound::BOUNDED {
442            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
443                location: self.location,
444                ir_node: self.ir_node,
445                _phantom: PhantomData,
446            };
447
448            me.entries().count()
449        } else if L::is_top_level()
450            && let Some(tick) = self.location.try_tick()
451        {
452            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
453                location: self.location,
454                ir_node: self.ir_node,
455                _phantom: PhantomData,
456            };
457
458            let out =
459                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
460                    .latest();
461            Singleton::new(out.location, out.ir_node.into_inner())
462        } else {
463            panic!("Unbounded KeyedSingleton inside a tick");
464        }
465    }
466
467    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
468    ///
469    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
470    /// asynchronously as well.
471    ///
472    /// # Example
473    /// ```rust
474    /// # #[cfg(feature = "deploy")] {
475    /// # use hydro_lang::prelude::*;
476    /// # use futures::StreamExt;
477    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
478    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
479    /// # process
480    /// #     .source_iter(q!(vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())]))
481    /// #     .into_keyed()
482    /// #     .batch(&process.tick(), nondet!(/** test */))
483    /// #     .first();
484    /// keyed_singleton.into_singleton()
485    /// # .all_ticks()
486    /// # }, |mut stream| async move {
487    /// // { 1: "a", 2: "b", 3: "c" }
488    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_string()), (2, "b".to_string()), (3, "c".to_string())].into_iter().collect());
489    /// # }));
490    /// # }
491    /// ```
492    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
493    where
494        K: Eq + Hash,
495    {
496        if B::ValueBound::BOUNDED {
497            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
498                location: self.location,
499                ir_node: self.ir_node,
500                _phantom: PhantomData,
501            };
502
503            me.entries()
504                .assume_ordering(nondet!(
505                    /// Because this is a keyed singleton, there is only one value per key.
506                ))
507                .fold(
508                    q!(|| HashMap::new()),
509                    q!(|map, (k, v)| {
510                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
511                        map.insert(k, v);
512                    }),
513                )
514        } else if L::is_top_level()
515            && let Some(tick) = self.location.try_tick()
516        {
517            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
518                location: self.location,
519                ir_node: self.ir_node,
520                _phantom: PhantomData,
521            };
522
523            let out = into_singleton_inside_tick(
524                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
525            )
526            .latest();
527            Singleton::new(out.location, out.ir_node.into_inner())
528        } else {
529            panic!("Unbounded KeyedSingleton inside a tick");
530        }
531    }
532
533    /// An operator which allows you to "name" a `HydroNode`.
534    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
535    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
536        {
537            let mut node = self.ir_node.borrow_mut();
538            let metadata = node.metadata_mut();
539            metadata.tag = Some(name.to_string());
540        }
541        self
542    }
543}
544
545impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
546    KeyedSingleton<K, V, L, B>
547{
548    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
549    ///
550    /// The value for each key must be bounded, otherwise the resulting stream elements would be
551    /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
552    /// into the output.
553    ///
554    /// # Example
555    /// ```rust
556    /// # #[cfg(feature = "deploy")] {
557    /// # use hydro_lang::prelude::*;
558    /// # use futures::StreamExt;
559    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
560    /// let keyed_singleton = // { 1: 2, 2: 4 }
561    /// # process
562    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
563    /// #     .into_keyed()
564    /// #     .first();
565    /// keyed_singleton.entries()
566    /// # }, |mut stream| async move {
567    /// // (1, 2), (2, 4) in any order
568    /// # let mut results = Vec::new();
569    /// # for _ in 0..2 {
570    /// #     results.push(stream.next().await.unwrap());
571    /// # }
572    /// # results.sort();
573    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
574    /// # }));
575    /// # }
576    /// ```
577    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
578        self.into_keyed_stream().entries()
579    }
580
581    /// Flattens the keyed singleton into an unordered stream of just the values.
582    ///
583    /// The value for each key must be bounded, otherwise the resulting stream elements would be
584    /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
585    /// into the output.
586    ///
587    /// # Example
588    /// ```rust
589    /// # #[cfg(feature = "deploy")] {
590    /// # use hydro_lang::prelude::*;
591    /// # use futures::StreamExt;
592    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
593    /// let keyed_singleton = // { 1: 2, 2: 4 }
594    /// # process
595    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
596    /// #     .into_keyed()
597    /// #     .first();
598    /// keyed_singleton.values()
599    /// # }, |mut stream| async move {
600    /// // 2, 4 in any order
601    /// # let mut results = Vec::new();
602    /// # for _ in 0..2 {
603    /// #     results.push(stream.next().await.unwrap());
604    /// # }
605    /// # results.sort();
606    /// # assert_eq!(results, vec![2, 4]);
607    /// # }));
608    /// # }
609    /// ```
610    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
611        let map_f = q!(|(_, v)| v)
612            .splice_fn1_ctx::<(K, V), V>(&self.location)
613            .into();
614
615        Stream::new(
616            self.location.clone(),
617            HydroNode::Map {
618                f: map_f,
619                input: Box::new(self.ir_node.into_inner()),
620                metadata: self.location.new_node_metadata(Stream::<
621                    V,
622                    L,
623                    B::UnderlyingBound,
624                    NoOrder,
625                    ExactlyOnce,
626                >::collection_kind()),
627            },
628        )
629    }
630
631    /// Flattens the keyed singleton into an unordered stream of just the keys.
632    ///
633    /// The value for each key must be bounded, otherwise the removal of keys would result in
634    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
635    /// into the output.
636    ///
637    /// # Example
638    /// ```rust
639    /// # #[cfg(feature = "deploy")] {
640    /// # use hydro_lang::prelude::*;
641    /// # use futures::StreamExt;
642    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
643    /// let keyed_singleton = // { 1: 2, 2: 4 }
644    /// # process
645    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
646    /// #     .into_keyed()
647    /// #     .first();
648    /// keyed_singleton.keys()
649    /// # }, |mut stream| async move {
650    /// // 1, 2 in any order
651    /// # let mut results = Vec::new();
652    /// # for _ in 0..2 {
653    /// #     results.push(stream.next().await.unwrap());
654    /// # }
655    /// # results.sort();
656    /// # assert_eq!(results, vec![1, 2]);
657    /// # }));
658    /// # }
659    /// ```
660    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
661        self.entries().map(q!(|(k, _)| k))
662    }
663
664    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
665    /// entries whose keys are not in the provided stream.
666    ///
667    /// # Example
668    /// ```rust
669    /// # #[cfg(feature = "deploy")] {
670    /// # use hydro_lang::prelude::*;
671    /// # use futures::StreamExt;
672    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
673    /// let tick = process.tick();
674    /// let keyed_singleton = // { 1: 2, 2: 4 }
675    /// # process
676    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
677    /// #     .into_keyed()
678    /// #     .first()
679    /// #     .batch(&tick, nondet!(/** test */));
680    /// let keys_to_remove = process
681    ///     .source_iter(q!(vec![1]))
682    ///     .batch(&tick, nondet!(/** test */));
683    /// keyed_singleton.filter_key_not_in(keys_to_remove)
684    /// #   .entries().all_ticks()
685    /// # }, |mut stream| async move {
686    /// // { 2: 4 }
687    /// # for w in vec![(2, 4)] {
688    /// #     assert_eq!(stream.next().await.unwrap(), w);
689    /// # }
690    /// # }));
691    /// # }
692    /// ```
693    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
694        self,
695        other: Stream<K, L, Bounded, O2, R2>,
696    ) -> Self
697    where
698        K: Hash + Eq,
699    {
700        check_matching_location(&self.location, &other.location);
701
702        KeyedSingleton::new(
703            self.location.clone(),
704            HydroNode::AntiJoin {
705                pos: Box::new(self.ir_node.into_inner()),
706                neg: Box::new(other.ir_node.into_inner()),
707                metadata: self.location.new_node_metadata(Self::collection_kind()),
708            },
709        )
710    }
711
712    /// An operator which allows you to "inspect" each value of a keyed singleton without
713    /// modifying it. The closure `f` is called on a reference to each value. This is
714    /// mainly useful for debugging, and should not be used to generate side-effects.
715    ///
716    /// # Example
717    /// ```rust
718    /// # #[cfg(feature = "deploy")] {
719    /// # use hydro_lang::prelude::*;
720    /// # use futures::StreamExt;
721    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
722    /// let keyed_singleton = // { 1: 2, 2: 4 }
723    /// # process
724    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
725    /// #     .into_keyed()
726    /// #     .first();
727    /// keyed_singleton
728    ///     .inspect(q!(|v| println!("{}", v)))
729    /// #   .entries()
730    /// # }, |mut stream| async move {
731    /// // { 1: 2, 2: 4 }
732    /// # for w in vec![(1, 2), (2, 4)] {
733    /// #     assert_eq!(stream.next().await.unwrap(), w);
734    /// # }
735    /// # }));
736    /// # }
737    /// ```
738    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
739    where
740        F: Fn(&V) + 'a,
741    {
742        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
743        let inspect_f = q!({
744            let orig = f;
745            move |t: &(_, _)| orig(&t.1)
746        })
747        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
748        .into();
749
750        KeyedSingleton::new(
751            self.location.clone(),
752            HydroNode::Inspect {
753                f: inspect_f,
754                input: Box::new(self.ir_node.into_inner()),
755                metadata: self.location.new_node_metadata(Self::collection_kind()),
756            },
757        )
758    }
759
760    /// An operator which allows you to "inspect" each entry of a keyed singleton without
761    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
762    /// mainly useful for debugging, and should not be used to generate side-effects.
763    ///
764    /// # Example
765    /// ```rust
766    /// # #[cfg(feature = "deploy")] {
767    /// # use hydro_lang::prelude::*;
768    /// # use futures::StreamExt;
769    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770    /// let keyed_singleton = // { 1: 2, 2: 4 }
771    /// # process
772    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
773    /// #     .into_keyed()
774    /// #     .first();
775    /// keyed_singleton
776    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
777    /// #   .entries()
778    /// # }, |mut stream| async move {
779    /// // { 1: 2, 2: 4 }
780    /// # for w in vec![(1, 2), (2, 4)] {
781    /// #     assert_eq!(stream.next().await.unwrap(), w);
782    /// # }
783    /// # }));
784    /// # }
785    /// ```
786    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
787    where
788        F: Fn(&(K, V)) + 'a,
789    {
790        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
791
792        KeyedSingleton::new(
793            self.location.clone(),
794            HydroNode::Inspect {
795                f: inspect_f,
796                input: Box::new(self.ir_node.into_inner()),
797                metadata: self.location.new_node_metadata(Self::collection_kind()),
798            },
799        )
800    }
801
802    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
803    ///
804    /// Because this method requires values to be bounded, the output [`Optional`] will only be
805    /// asynchronously updated if a new key is added that is higher than the previous max key.
806    ///
807    /// # Example
808    /// ```rust
809    /// # #[cfg(feature = "deploy")] {
810    /// # use hydro_lang::prelude::*;
811    /// # use futures::StreamExt;
812    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
813    /// let tick = process.tick();
814    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
815    /// # process
816    /// #     .source_iter(q!(vec![(1, 123), (2, 456), (0, 789)]))
817    /// #     .into_keyed()
818    /// #     .first();
819    /// keyed_singleton.get_max_key()
820    /// # .sample_eager(nondet!(/** test */))
821    /// # }, |mut stream| async move {
822    /// // (2, 456)
823    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
824    /// # }));
825    /// # }
826    /// ```
827    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
828    where
829        K: Ord,
830    {
831        self.entries()
832            .assume_ordering(nondet!(
833                /// There is only one element associated with each key, and the keys are totallly
834                /// ordered so we will produce a deterministic value. We can't call
835                /// `reduce_commutative_idempotent` because the closure technically isn't commutative
836                /// in the case where both passed entries have the same key but different values.
837                ///
838                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
839                /// the two inputs do not have the same key.
840            ))
841            .reduce_idempotent(q!({
842                move |curr, new| {
843                    if new.0 > curr.0 {
844                        *curr = new;
845                    }
846                }
847            }))
848    }
849
850    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
851    /// element, the value.
852    ///
853    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
854    ///
855    /// # Example
856    /// ```rust
857    /// # #[cfg(feature = "deploy")] {
858    /// # use hydro_lang::prelude::*;
859    /// # use futures::StreamExt;
860    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
861    /// let keyed_singleton = // { 1: 2, 2: 4 }
862    /// # process
863    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
864    /// #     .into_keyed()
865    /// #     .first();
866    /// keyed_singleton
867    ///     .clone()
868    ///     .into_keyed_stream()
869    ///     .interleave(
870    ///         keyed_singleton.into_keyed_stream()
871    ///     )
872    /// #   .entries()
873    /// # }, |mut stream| async move {
874    /// /// // { 1: [2, 2], 2: [4, 4] }
875    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
876    /// #     assert_eq!(stream.next().await.unwrap(), w);
877    /// # }
878    /// # }));
879    /// # }
880    /// ```
881    pub fn into_keyed_stream(
882        self,
883    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
884        KeyedStream::new(
885            self.location.clone(),
886            HydroNode::Cast {
887                inner: Box::new(self.ir_node.into_inner()),
888                metadata: self.location.new_node_metadata(KeyedStream::<
889                    K,
890                    V,
891                    L,
892                    B::UnderlyingBound,
893                    TotalOrder,
894                    ExactlyOnce,
895                >::collection_kind()),
896            },
897        )
898    }
899}
900
901impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
902    /// Gets the value associated with a specific key from the keyed singleton.
903    ///
904    /// # Example
905    /// ```rust
906    /// # #[cfg(feature = "deploy")] {
907    /// # use hydro_lang::prelude::*;
908    /// # use futures::StreamExt;
909    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
910    /// let tick = process.tick();
911    /// let keyed_data = process
912    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
913    ///     .into_keyed()
914    ///     .batch(&tick, nondet!(/** test */))
915    ///     .first();
916    /// let key = tick.singleton(q!(1));
917    /// keyed_data.get(key).all_ticks()
918    /// # }, |mut stream| async move {
919    /// // 2
920    /// # assert_eq!(stream.next().await.unwrap(), 2);
921    /// # }));
922    /// # }
923    /// ```
924    pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
925        self.entries()
926            .join(key.into_stream().map(q!(|k| (k, ()))))
927            .map(q!(|(_, (v, _))| v))
928            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
929            .first()
930    }
931
932    /// Given a keyed stream of lookup requests, where the key is the lookup and the value
933    /// is some additional metadata, emits a keyed stream of lookup results where the key
934    /// is the same as before, but the value is a tuple of the lookup result and the metadata
935    /// of the request. If the key is not found, no output will be produced.
936    ///
937    /// # Example
938    /// ```rust
939    /// # #[cfg(feature = "deploy")] {
940    /// # use hydro_lang::prelude::*;
941    /// # use futures::StreamExt;
942    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
943    /// let tick = process.tick();
944    /// let keyed_data = process
945    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
946    ///     .into_keyed()
947    ///     .batch(&tick, nondet!(/** test */))
948    ///     .first();
949    /// let other_data = process
950    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
951    ///     .into_keyed()
952    ///     .batch(&tick, nondet!(/** test */));
953    /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
954    /// # }, |mut stream| async move {
955    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
956    /// # let mut results = vec![];
957    /// # for _ in 0..3 {
958    /// #     results.push(stream.next().await.unwrap());
959    /// # }
960    /// # results.sort();
961    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
962    /// # }));
963    /// # }
964    /// ```
965    pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
966        self,
967        requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
968    ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
969        self.entries()
970            .weaker_retries::<R2>()
971            .join(requests.entries())
972            .into_keyed()
973    }
974
975    /// Given a keyed stream of lookup requests, where the key is the lookup and the value
976    /// is some additional metadata, emits a keyed stream of lookup results where the key
977    /// is the same as before, but the value is a tuple of the lookup result (as `Option<V>`)
978    /// and the metadata of the request. Unlike `get_many_if_present`, this returns all request
979    /// keys, with `None` for keys that are not found.
980    ///
981    /// # Example
982    /// ```rust
983    /// # #[cfg(feature = "deploy")] {
984    /// # use hydro_lang::prelude::*;
985    /// # use futures::StreamExt;
986    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
987    /// let tick = process.tick();
988    /// let keyed_data = process
989    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
990    ///     .into_keyed()
991    ///     .batch(&tick, nondet!(/** test */))
992    ///     .first();
993    /// let other_data = process
994    ///     .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
995    ///     .into_keyed()
996    ///     .batch(&tick, nondet!(/** test */));
997    /// keyed_data.get_many(other_data).entries().all_ticks()
998    /// # }, |mut stream| async move {
999    /// // { 1: [(Some(10), 100)], 2: [(Some(20), 200)], 3: [(None, 300)] } in any order
1000    /// # let mut results = vec![];
1001    /// # for _ in 0..3 {
1002    /// #     results.push(stream.next().await.unwrap());
1003    /// # }
1004    /// # results.sort();
1005    /// # assert_eq!(results, vec![(1, (Some(10), 100)), (2, (Some(20), 200)), (3, (None, 300))]);
1006    /// # }));
1007    /// # }
1008    /// ```
1009    #[expect(clippy::type_complexity, reason = "stream types")]
1010    pub fn get_many<O2: Ordering, R2: Retries, V2>(
1011        self,
1012        requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
1013    ) -> KeyedStream<K, (Option<V>, V2), Tick<L>, Bounded, NoOrder, R2>
1014    where
1015        K: Clone,
1016        V: Clone,
1017        V2: Clone,
1018    {
1019        let lookup_result = self.clone().get_many_if_present(requests.clone());
1020        let missing_keys = requests.filter_key_not_in(self.keys()).weakest_ordering();
1021
1022        lookup_result
1023            .map(q!(|(v, v2)| (Some(v), v2)))
1024            .chain(missing_keys.map(q!(|v2| (None, v2))))
1025    }
1026
1027    /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
1028    /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
1029    /// containing the value from `self` and an option of the value from `from`. If the key is not
1030    /// present in `from`, the option will be [`None`].
1031    ///
1032    /// # Example
1033    /// ```rust
1034    /// # #[cfg(feature = "deploy")] {
1035    /// # use hydro_lang::prelude::*;
1036    /// # use futures::StreamExt;
1037    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1038    /// # let tick = process.tick();
1039    /// let requests = // { 1: 10, 2: 20 }
1040    /// # process
1041    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
1042    /// #     .into_keyed()
1043    /// #     .batch(&tick, nondet!(/** test */))
1044    /// #     .first();
1045    /// let other_data = // { 10: 100, 11: 101 }
1046    /// # process
1047    /// #     .source_iter(q!(vec![(10, 100), (11, 101)]))
1048    /// #     .into_keyed()
1049    /// #     .batch(&tick, nondet!(/** test */))
1050    /// #     .first();
1051    /// requests.get_from(other_data)
1052    /// # .entries().all_ticks()
1053    /// # }, |mut stream| async move {
1054    /// // { 1: (10, Some(100)), 2: (20, None) }
1055    /// # let mut results = vec![];
1056    /// # for _ in 0..2 {
1057    /// #     results.push(stream.next().await.unwrap());
1058    /// # }
1059    /// # results.sort();
1060    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
1061    /// # }));
1062    /// # }
1063    /// ```
1064    pub fn get_from<V2: Clone>(
1065        self,
1066        from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
1067    ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
1068    where
1069        K: Clone,
1070        V: Hash + Eq + Clone,
1071    {
1072        let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
1073        let lookup_result = from.get_many_if_present(to_lookup.clone());
1074        let missing_values =
1075            to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
1076        let result_stream = lookup_result
1077            .entries()
1078            .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
1079            .into_keyed()
1080            .chain(
1081                missing_values
1082                    .entries()
1083                    .map(q!(|(v, k)| (k, (v, None))))
1084                    .into_keyed(),
1085            );
1086
1087        KeyedSingleton::new(
1088            result_stream.location.clone(),
1089            HydroNode::Cast {
1090                inner: Box::new(result_stream.ir_node.into_inner()),
1091                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
1092                    K,
1093                    (V, Option<V2>),
1094                    Tick<L>,
1095                    Bounded,
1096                >::collection_kind(
1097                )),
1098            },
1099        )
1100    }
1101}
1102
1103impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1104where
1105    L: Location<'a>,
1106{
1107    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1108    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1109    ///
1110    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1111    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1112    /// argument that declares where the keyed singleton will be atomically processed. Batching a
1113    /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1114    /// batching into a different [`Tick`] will introduce asynchrony.
1115    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1116        let out_location = Atomic { tick: tick.clone() };
1117        KeyedSingleton::new(
1118            out_location.clone(),
1119            HydroNode::BeginAtomic {
1120                inner: Box::new(self.ir_node.into_inner()),
1121                metadata: out_location
1122                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1123            },
1124        )
1125    }
1126}
1127
1128impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1129where
1130    L: Location<'a> + NoTick,
1131{
1132    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1133    /// See [`KeyedSingleton::atomic`] for more details.
1134    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1135        KeyedSingleton::new(
1136            self.location.tick.l.clone(),
1137            HydroNode::EndAtomic {
1138                inner: Box::new(self.ir_node.into_inner()),
1139                metadata: self
1140                    .location
1141                    .tick
1142                    .l
1143                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1144            },
1145        )
1146    }
1147}
1148
1149impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1150    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1151    /// tick `T` always has the entries of `self` at tick `T - 1`.
1152    ///
1153    /// At tick `0`, the output has no entries, since there is no previous tick.
1154    ///
1155    /// This operator enables stateful iterative processing with ticks, by sending data from one
1156    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1157    ///
1158    /// # Example
1159    /// ```rust
1160    /// # #[cfg(feature = "deploy")] {
1161    /// # use hydro_lang::prelude::*;
1162    /// # use futures::StreamExt;
1163    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1164    /// let tick = process.tick();
1165    /// # // ticks are lazy by default, forces the second tick to run
1166    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1167    /// # let batch_first_tick = process
1168    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1169    /// #   .batch(&tick, nondet!(/** test */))
1170    /// #   .into_keyed();
1171    /// # let batch_second_tick = process
1172    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1173    /// #   .batch(&tick, nondet!(/** test */))
1174    /// #   .into_keyed()
1175    /// #   .defer_tick(); // appears on the second tick
1176    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1177    /// # batch_first_tick.chain(batch_second_tick).first();
1178    /// input_batch.clone().filter_key_not_in(
1179    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1180    /// )
1181    /// # .entries().all_ticks()
1182    /// # }, |mut stream| async move {
1183    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1184    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1185    /// #     assert_eq!(stream.next().await.unwrap(), w);
1186    /// # }
1187    /// # }));
1188    /// # }
1189    /// ```
1190    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1191        KeyedSingleton::new(
1192            self.location.clone(),
1193            HydroNode::DeferTick {
1194                input: Box::new(self.ir_node.into_inner()),
1195                metadata: self
1196                    .location
1197                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1198            },
1199        )
1200    }
1201}
1202
1203impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1204where
1205    L: Location<'a>,
1206{
1207    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1208    /// point in time.
1209    ///
1210    /// # Non-Determinism
1211    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1212    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1213    pub fn snapshot(
1214        self,
1215        tick: &Tick<L>,
1216        _nondet: NonDet,
1217    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1218        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1219        KeyedSingleton::new(
1220            tick.clone(),
1221            HydroNode::Batch {
1222                inner: Box::new(self.ir_node.into_inner()),
1223                metadata: tick
1224                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1225            },
1226        )
1227    }
1228}
1229
1230impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1231where
1232    L: Location<'a> + NoTick,
1233{
1234    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1235    /// state of the keyed singleton being atomically processed.
1236    ///
1237    /// # Non-Determinism
1238    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1239    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1240    pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1241        KeyedSingleton::new(
1242            self.location.clone().tick,
1243            HydroNode::Batch {
1244                inner: Box::new(self.ir_node.into_inner()),
1245                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1246                    K,
1247                    V,
1248                    Tick<L>,
1249                    Bounded,
1250                >::collection_kind(
1251                )),
1252            },
1253        )
1254    }
1255}
1256
1257impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1258where
1259    L: Location<'a>,
1260{
1261    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1262    ///
1263    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1264    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1265    /// is filtered out.
1266    ///
1267    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1268    /// not modify or take ownership of the values. If you need to modify the values while filtering
1269    /// use [`KeyedSingleton::filter_map`] instead.
1270    ///
1271    /// # Example
1272    /// ```rust
1273    /// # #[cfg(feature = "deploy")] {
1274    /// # use hydro_lang::prelude::*;
1275    /// # use futures::StreamExt;
1276    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1277    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1278    /// # process
1279    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1280    /// #     .into_keyed()
1281    /// #     .first();
1282    /// keyed_singleton.filter(q!(|&v| v > 1))
1283    /// #   .entries()
1284    /// # }, |mut stream| async move {
1285    /// // { 1: 2, 2: 4 }
1286    /// # let mut results = Vec::new();
1287    /// # for _ in 0..2 {
1288    /// #     results.push(stream.next().await.unwrap());
1289    /// # }
1290    /// # results.sort();
1291    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1292    /// # }));
1293    /// # }
1294    /// ```
1295    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1296    where
1297        F: Fn(&V) -> bool + 'a,
1298    {
1299        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1300        let filter_f = q!({
1301            let orig = f;
1302            move |t: &(_, _)| orig(&t.1)
1303        })
1304        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1305        .into();
1306
1307        KeyedSingleton::new(
1308            self.location.clone(),
1309            HydroNode::Filter {
1310                f: filter_f,
1311                input: Box::new(self.ir_node.into_inner()),
1312                metadata: self
1313                    .location
1314                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1315            },
1316        )
1317    }
1318
1319    /// An operator that both filters and maps values. It yields only the key-value pairs where
1320    /// the supplied closure `f` returns `Some(value)`.
1321    ///
1322    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1323    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1324    /// If it returns `None`, the key-value pair is filtered out.
1325    ///
1326    /// # Example
1327    /// ```rust
1328    /// # #[cfg(feature = "deploy")] {
1329    /// # use hydro_lang::prelude::*;
1330    /// # use futures::StreamExt;
1331    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1332    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1333    /// # process
1334    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1335    /// #     .into_keyed()
1336    /// #     .first();
1337    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1338    /// #   .entries()
1339    /// # }, |mut stream| async move {
1340    /// // { 1: 42, 3: 100 }
1341    /// # let mut results = Vec::new();
1342    /// # for _ in 0..2 {
1343    /// #     results.push(stream.next().await.unwrap());
1344    /// # }
1345    /// # results.sort();
1346    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1347    /// # }));
1348    /// # }
1349    /// ```
1350    pub fn filter_map<F, U>(
1351        self,
1352        f: impl IntoQuotedMut<'a, F, L> + Copy,
1353    ) -> KeyedSingleton<K, U, L, B>
1354    where
1355        F: Fn(V) -> Option<U> + 'a,
1356    {
1357        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1358        let filter_map_f = q!({
1359            let orig = f;
1360            move |(k, v)| orig(v).map(|o| (k, o))
1361        })
1362        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1363        .into();
1364
1365        KeyedSingleton::new(
1366            self.location.clone(),
1367            HydroNode::FilterMap {
1368                f: filter_map_f,
1369                input: Box::new(self.ir_node.into_inner()),
1370                metadata: self
1371                    .location
1372                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1373            },
1374        )
1375    }
1376
1377    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1378    /// arrived since the previous batch was released.
1379    ///
1380    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1381    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1382    ///
1383    /// # Non-Determinism
1384    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1385    /// has a non-deterministic set of key-value pairs.
1386    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1387    where
1388        L: NoTick,
1389    {
1390        self.atomic(tick).batch_atomic(nondet)
1391    }
1392}
1393
1394impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1395where
1396    L: Location<'a> + NoTick,
1397{
1398    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1399    /// atomically processed.
1400    ///
1401    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1402    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1403    ///
1404    /// # Non-Determinism
1405    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1406    /// has a non-deterministic set of key-value pairs.
1407    pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1408        let _ = nondet;
1409        KeyedSingleton::new(
1410            self.location.clone().tick,
1411            HydroNode::Batch {
1412                inner: Box::new(self.ir_node.into_inner()),
1413                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1414                    K,
1415                    V,
1416                    Tick<L>,
1417                    Bounded,
1418                >::collection_kind(
1419                )),
1420            },
1421        )
1422    }
1423}
1424
1425#[cfg(test)]
1426mod tests {
1427    #[cfg(feature = "deploy")]
1428    use futures::{SinkExt, StreamExt};
1429    #[cfg(feature = "deploy")]
1430    use hydro_deploy::Deployment;
1431    #[cfg(any(feature = "deploy", feature = "sim"))]
1432    use stageleft::q;
1433
1434    #[cfg(any(feature = "deploy", feature = "sim"))]
1435    use crate::compile::builder::FlowBuilder;
1436    #[cfg(any(feature = "deploy", feature = "sim"))]
1437    use crate::location::Location;
1438    #[cfg(any(feature = "deploy", feature = "sim"))]
1439    use crate::nondet::nondet;
1440
1441    #[cfg(feature = "deploy")]
1442    #[tokio::test]
1443    async fn key_count_bounded_value() {
1444        let mut deployment = Deployment::new();
1445
1446        let flow = FlowBuilder::new();
1447        let node = flow.process::<()>();
1448        let external = flow.external::<()>();
1449
1450        let (input_port, input) = node.source_external_bincode(&external);
1451        let out = input
1452            .into_keyed()
1453            .first()
1454            .key_count()
1455            .sample_eager(nondet!(/** test */))
1456            .send_bincode_external(&external);
1457
1458        let nodes = flow
1459            .with_process(&node, deployment.Localhost())
1460            .with_external(&external, deployment.Localhost())
1461            .deploy(&mut deployment);
1462
1463        deployment.deploy().await.unwrap();
1464
1465        let mut external_in = nodes.connect(input_port).await;
1466        let mut external_out = nodes.connect(out).await;
1467
1468        deployment.start().await.unwrap();
1469
1470        assert_eq!(external_out.next().await.unwrap(), 0);
1471
1472        external_in.send((1, 1)).await.unwrap();
1473        assert_eq!(external_out.next().await.unwrap(), 1);
1474
1475        external_in.send((2, 2)).await.unwrap();
1476        assert_eq!(external_out.next().await.unwrap(), 2);
1477    }
1478
1479    #[cfg(feature = "deploy")]
1480    #[tokio::test]
1481    async fn key_count_unbounded_value() {
1482        let mut deployment = Deployment::new();
1483
1484        let flow = FlowBuilder::new();
1485        let node = flow.process::<()>();
1486        let external = flow.external::<()>();
1487
1488        let (input_port, input) = node.source_external_bincode(&external);
1489        let out = input
1490            .into_keyed()
1491            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1492            .key_count()
1493            .sample_eager(nondet!(/** test */))
1494            .send_bincode_external(&external);
1495
1496        let nodes = flow
1497            .with_process(&node, deployment.Localhost())
1498            .with_external(&external, deployment.Localhost())
1499            .deploy(&mut deployment);
1500
1501        deployment.deploy().await.unwrap();
1502
1503        let mut external_in = nodes.connect(input_port).await;
1504        let mut external_out = nodes.connect(out).await;
1505
1506        deployment.start().await.unwrap();
1507
1508        assert_eq!(external_out.next().await.unwrap(), 0);
1509
1510        external_in.send((1, 1)).await.unwrap();
1511        assert_eq!(external_out.next().await.unwrap(), 1);
1512
1513        external_in.send((1, 2)).await.unwrap();
1514        assert_eq!(external_out.next().await.unwrap(), 1);
1515
1516        external_in.send((2, 2)).await.unwrap();
1517        assert_eq!(external_out.next().await.unwrap(), 2);
1518
1519        external_in.send((1, 1)).await.unwrap();
1520        assert_eq!(external_out.next().await.unwrap(), 2);
1521
1522        external_in.send((3, 1)).await.unwrap();
1523        assert_eq!(external_out.next().await.unwrap(), 3);
1524    }
1525
1526    #[cfg(feature = "deploy")]
1527    #[tokio::test]
1528    async fn into_singleton_bounded_value() {
1529        let mut deployment = Deployment::new();
1530
1531        let flow = FlowBuilder::new();
1532        let node = flow.process::<()>();
1533        let external = flow.external::<()>();
1534
1535        let (input_port, input) = node.source_external_bincode(&external);
1536        let out = input
1537            .into_keyed()
1538            .first()
1539            .into_singleton()
1540            .sample_eager(nondet!(/** test */))
1541            .send_bincode_external(&external);
1542
1543        let nodes = flow
1544            .with_process(&node, deployment.Localhost())
1545            .with_external(&external, deployment.Localhost())
1546            .deploy(&mut deployment);
1547
1548        deployment.deploy().await.unwrap();
1549
1550        let mut external_in = nodes.connect(input_port).await;
1551        let mut external_out = nodes.connect(out).await;
1552
1553        deployment.start().await.unwrap();
1554
1555        assert_eq!(
1556            external_out.next().await.unwrap(),
1557            std::collections::HashMap::new()
1558        );
1559
1560        external_in.send((1, 1)).await.unwrap();
1561        assert_eq!(
1562            external_out.next().await.unwrap(),
1563            vec![(1, 1)].into_iter().collect()
1564        );
1565
1566        external_in.send((2, 2)).await.unwrap();
1567        assert_eq!(
1568            external_out.next().await.unwrap(),
1569            vec![(1, 1), (2, 2)].into_iter().collect()
1570        );
1571    }
1572
1573    #[cfg(feature = "deploy")]
1574    #[tokio::test]
1575    async fn into_singleton_unbounded_value() {
1576        let mut deployment = Deployment::new();
1577
1578        let flow = FlowBuilder::new();
1579        let node = flow.process::<()>();
1580        let external = flow.external::<()>();
1581
1582        let (input_port, input) = node.source_external_bincode(&external);
1583        let out = input
1584            .into_keyed()
1585            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1586            .into_singleton()
1587            .sample_eager(nondet!(/** test */))
1588            .send_bincode_external(&external);
1589
1590        let nodes = flow
1591            .with_process(&node, deployment.Localhost())
1592            .with_external(&external, deployment.Localhost())
1593            .deploy(&mut deployment);
1594
1595        deployment.deploy().await.unwrap();
1596
1597        let mut external_in = nodes.connect(input_port).await;
1598        let mut external_out = nodes.connect(out).await;
1599
1600        deployment.start().await.unwrap();
1601
1602        assert_eq!(
1603            external_out.next().await.unwrap(),
1604            std::collections::HashMap::new()
1605        );
1606
1607        external_in.send((1, 1)).await.unwrap();
1608        assert_eq!(
1609            external_out.next().await.unwrap(),
1610            vec![(1, 1)].into_iter().collect()
1611        );
1612
1613        external_in.send((1, 2)).await.unwrap();
1614        assert_eq!(
1615            external_out.next().await.unwrap(),
1616            vec![(1, 2)].into_iter().collect()
1617        );
1618
1619        external_in.send((2, 2)).await.unwrap();
1620        assert_eq!(
1621            external_out.next().await.unwrap(),
1622            vec![(1, 2), (2, 1)].into_iter().collect()
1623        );
1624
1625        external_in.send((1, 1)).await.unwrap();
1626        assert_eq!(
1627            external_out.next().await.unwrap(),
1628            vec![(1, 3), (2, 1)].into_iter().collect()
1629        );
1630
1631        external_in.send((3, 1)).await.unwrap();
1632        assert_eq!(
1633            external_out.next().await.unwrap(),
1634            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1635        );
1636    }
1637
1638    #[cfg(feature = "sim")]
1639    #[test]
1640    fn sim_unbounded_singleton_snapshot() {
1641        let flow = FlowBuilder::new();
1642        let node = flow.process::<()>();
1643
1644        let (input_port, input) = node.sim_input();
1645        let output = input
1646            .into_keyed()
1647            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1648            .snapshot(&node.tick(), nondet!(/** test */))
1649            .entries()
1650            .all_ticks()
1651            .sim_output();
1652
1653        let count = flow.sim().exhaustive(async || {
1654            input_port.send((1, 123));
1655            input_port.send((1, 456));
1656            input_port.send((2, 123));
1657
1658            let all = output.collect_sorted::<Vec<_>>().await;
1659            assert_eq!(all.last().unwrap(), &(2, 1));
1660        });
1661
1662        assert_eq!(count, 8);
1663    }
1664
1665    #[cfg(feature = "deploy")]
1666    #[tokio::test]
1667    async fn get_many_outer_join() {
1668        let mut deployment = Deployment::new();
1669
1670        let flow = FlowBuilder::new();
1671        let node = flow.process::<()>();
1672        let external = flow.external::<()>();
1673
1674        let tick = node.tick();
1675        let keyed_data = node
1676            .source_iter(q!(vec![(1, 10), (2, 20)]))
1677            .into_keyed()
1678            .batch(&tick, nondet!(/** test */))
1679            .first();
1680        let requests = node
1681            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1682            .into_keyed()
1683            .batch(&tick, nondet!(/** test */));
1684
1685        let out = keyed_data
1686            .get_many(requests)
1687            .entries()
1688            .all_ticks()
1689            .send_bincode_external(&external);
1690
1691        let nodes = flow
1692            .with_process(&node, deployment.Localhost())
1693            .with_external(&external, deployment.Localhost())
1694            .deploy(&mut deployment);
1695
1696        deployment.deploy().await.unwrap();
1697
1698        let mut external_out = nodes.connect(out).await;
1699
1700        deployment.start().await.unwrap();
1701
1702        let mut results = vec![];
1703        for _ in 0..3 {
1704            results.push(external_out.next().await.unwrap());
1705        }
1706        results.sort();
1707
1708        assert_eq!(
1709            results,
1710            vec![(1, (Some(10), 100)), (2, (Some(20), 200)), (3, (None, 300))]
1711        );
1712    }
1713}