Skip to main content

hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::ir::{
18    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
19};
20#[cfg(stageleft_runtime)]
21use crate::forward_handle::{CycleCollection, ReceiverComplete};
22use crate::forward_handle::{ForwardRef, TickCycle};
23use crate::live_collections::stream::{Ordering, Retries};
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::DeferTick;
27use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
28use crate::manual_expr::ManualExpr;
29use crate::nondet::{NonDet, nondet};
30use crate::properties::ManualProof;
31
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
/// indicates that entries may be added over time, but once an entry is added it will never be
/// removed and its value will never change.
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The type of the keyed singleton if the value for each key is immutable.
    ///
    /// The resulting bound keeps the same `UnderlyingBound` and has a `ValueBound` of [`Bounded`].
    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;

    /// The type of the keyed singleton if the value for each key may change asynchronously.
    ///
    /// The resulting bound keeps the same `UnderlyingBound` and has a `ValueBound` of [`Unbounded`].
    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;

    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> KeyedSingletonBoundKind;
}
53
// Fully unbounded keyed singleton: both the underlying stream and each value are unbounded.
impl KeyedSingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    // Freezing the values while keeping the underlying stream unbounded yields `BoundedValue`.
    type WithBoundedValue = BoundedValue;
    type WithUnboundedValue = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Unbounded
    }
}
64
// Fully bounded keyed singleton: both the underlying stream and each value are bounded.
impl KeyedSingletonBound for Bounded {
    type UnderlyingBound = Bounded;
    type ValueBound = Bounded;
    type WithBoundedValue = Bounded;
    // A bounded underlying stream with unbounded values is represented by the hidden
    // `UnreachableBound` placeholder, whose `bound_kind` is `unreachable!`.
    type WithUnboundedValue = UnreachableBound;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Bounded
    }
}
75
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// its value is bounded and will never change. The underlying set of entries is still [`Unbounded`],
/// so new entries may appear asynchronously.
pub struct BoundedValue;
80
// Entries may keep arriving (unbounded underlying stream), but each value is immutable.
impl KeyedSingletonBound for BoundedValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Bounded;
    type WithBoundedValue = BoundedValue;
    // Allowing values to change again gives back the fully `Unbounded` variant.
    type WithUnboundedValue = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::BoundedValue
    }
}
91
/// Placeholder bound used as `Bounded::WithUnboundedValue`: a bounded underlying stream
/// combined with unbounded values. Its [`KeyedSingletonBound::bound_kind`] is `unreachable!`.
#[doc(hidden)]
pub struct UnreachableBound;
94
// Exists only so that `Bounded::WithUnboundedValue` has a type that satisfies the
// `KeyedSingletonBound<UnderlyingBound = Bounded, ValueBound = Unbounded>` constraint.
impl KeyedSingletonBound for UnreachableBound {
    type UnderlyingBound = Bounded;
    type ValueBound = Unbounded;

    type WithBoundedValue = Bounded;
    type WithUnboundedValue = UnreachableBound;

    fn bound_kind() -> KeyedSingletonBoundKind {
        // Panics if ever called: this bound is not expected to describe a real collection.
        unreachable!("UnreachableBound cannot be instantiated")
    }
}
106
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // The location where this collection is materialized.
    pub(crate) location: Loc,
    // The IR node producing this collection; kept in a `RefCell` so that `clone(&self)` can
    // swap it for a shared `Tee` node in place.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
129
130impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
131    for KeyedSingleton<K, V, Loc, Bound>
132{
133    fn clone(&self) -> Self {
134        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
135            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
136            *self.ir_node.borrow_mut() = HydroNode::Tee {
137                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
138                metadata: self.location.new_node_metadata(Self::collection_kind()),
139            };
140        }
141
142        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
143            KeyedSingleton {
144                location: self.location.clone(),
145                ir_node: HydroNode::Tee {
146                    inner: TeeNode(inner.0.clone()),
147                    metadata: metadata.clone(),
148                }
149                .into(),
150                _phantom: PhantomData,
151            }
152        } else {
153            unreachable!()
154        }
155    }
156}
157
158impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
159    for KeyedSingleton<K, V, L, B>
160where
161    L: Location<'a> + NoTick,
162{
163    type Location = L;
164
165    fn create_source(ident: syn::Ident, location: L) -> Self {
166        KeyedSingleton {
167            location: location.clone(),
168            ir_node: RefCell::new(HydroNode::CycleSource {
169                ident,
170                metadata: location.new_node_metadata(Self::collection_kind()),
171            }),
172            _phantom: PhantomData,
173        }
174    }
175}
176
177impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
178where
179    L: Location<'a>,
180{
181    type Location = Tick<L>;
182
183    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
184        KeyedSingleton::new(
185            location.clone(),
186            HydroNode::CycleSource {
187                ident,
188                metadata: location.new_node_metadata(Self::collection_kind()),
189            },
190        )
191    }
192}
193
impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    /// Forwards to `KeyedSingleton::defer_tick`.
    // NOTE(review): presumably this resolves to an inherent `defer_tick` method defined
    // elsewhere in this file (not visible here) — confirm it does not recurse into this impl.
    fn defer_tick(self) -> Self {
        KeyedSingleton::defer_tick(self)
    }
}
202
203impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
204    for KeyedSingleton<K, V, L, B>
205where
206    L: Location<'a> + NoTick,
207{
208    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
209        assert_eq!(
210            Location::id(&self.location),
211            expected_location,
212            "locations do not match"
213        );
214        self.location
215            .flow_state()
216            .borrow_mut()
217            .push_root(HydroRoot::CycleSink {
218                ident,
219                input: Box::new(self.ir_node.into_inner()),
220                op_metadata: HydroIrOpMetadata::new(),
221            });
222    }
223}
224
225impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
226where
227    L: Location<'a>,
228{
229    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
230        assert_eq!(
231            Location::id(&self.location),
232            expected_location,
233            "locations do not match"
234        );
235        self.location
236            .flow_state()
237            .borrow_mut()
238            .push_root(HydroRoot::CycleSink {
239                ident,
240                input: Box::new(self.ir_node.into_inner()),
241                op_metadata: HydroIrOpMetadata::new(),
242            });
243    }
244}
245
246impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
247    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
248        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
249        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
250
251        KeyedSingleton {
252            location,
253            ir_node: RefCell::new(ir_node),
254            _phantom: PhantomData,
255        }
256    }
257
258    /// Returns the [`Location`] where this keyed singleton is being materialized.
259    pub fn location(&self) -> &L {
260        &self.location
261    }
262}
263
/// Counts the entries of a bounded keyed singleton.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let all_entries = me.entries();
    all_entries.count()
}
270
/// Collects the entries of a bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
289
290impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
291    pub(crate) fn collection_kind() -> CollectionKind {
292        CollectionKind::KeyedSingleton {
293            bound: B::bound_kind(),
294            key_type: stageleft::quote_type::<K>().into(),
295            value_type: stageleft::quote_type::<V>().into(),
296        }
297    }
298
299    /// Transforms each value by invoking `f` on each element, with keys staying the same
300    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
301    ///
302    /// If you do not want to modify the stream and instead only want to view
303    /// each item use [`KeyedSingleton::inspect`] instead.
304    ///
305    /// # Example
306    /// ```rust
307    /// # #[cfg(feature = "deploy")] {
308    /// # use hydro_lang::prelude::*;
309    /// # use futures::StreamExt;
310    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
311    /// let keyed_singleton = // { 1: 2, 2: 4 }
312    /// # process
313    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
314    /// #     .into_keyed()
315    /// #     .first();
316    /// keyed_singleton.map(q!(|v| v + 1))
317    /// #   .entries()
318    /// # }, |mut stream| async move {
319    /// // { 1: 3, 2: 5 }
320    /// # let mut results = Vec::new();
321    /// # for _ in 0..2 {
322    /// #     results.push(stream.next().await.unwrap());
323    /// # }
324    /// # results.sort();
325    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
326    /// # }));
327    /// # }
328    /// ```
329    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
330    where
331        F: Fn(V) -> U + 'a,
332    {
333        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
334        let map_f = q!({
335            let orig = f;
336            move |(k, v)| (k, orig(v))
337        })
338        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
339        .into();
340
341        KeyedSingleton::new(
342            self.location.clone(),
343            HydroNode::Map {
344                f: map_f,
345                input: Box::new(self.ir_node.into_inner()),
346                metadata: self
347                    .location
348                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
349            },
350        )
351    }
352
353    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
354    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
355    ///
356    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
357    /// the new value `U`. The key remains unchanged in the output.
358    ///
359    /// # Example
360    /// ```rust
361    /// # #[cfg(feature = "deploy")] {
362    /// # use hydro_lang::prelude::*;
363    /// # use futures::StreamExt;
364    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
365    /// let keyed_singleton = // { 1: 2, 2: 4 }
366    /// # process
367    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
368    /// #     .into_keyed()
369    /// #     .first();
370    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
371    /// #   .entries()
372    /// # }, |mut stream| async move {
373    /// // { 1: 3, 2: 6 }
374    /// # let mut results = Vec::new();
375    /// # for _ in 0..2 {
376    /// #     results.push(stream.next().await.unwrap());
377    /// # }
378    /// # results.sort();
379    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
380    /// # }));
381    /// # }
382    /// ```
383    pub fn map_with_key<U, F>(
384        self,
385        f: impl IntoQuotedMut<'a, F, L> + Copy,
386    ) -> KeyedSingleton<K, U, L, B>
387    where
388        F: Fn((K, V)) -> U + 'a,
389        K: Clone,
390    {
391        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
392        let map_f = q!({
393            let orig = f;
394            move |(k, v)| {
395                let out = orig((Clone::clone(&k), v));
396                (k, out)
397            }
398        })
399        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
400        .into();
401
402        KeyedSingleton::new(
403            self.location.clone(),
404            HydroNode::Map {
405                f: map_f,
406                input: Box::new(self.ir_node.into_inner()),
407                metadata: self
408                    .location
409                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
410            },
411        )
412    }
413
414    /// Gets the number of keys in the keyed singleton.
415    ///
416    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
417    /// since keys may be added / removed over time. When the set of keys changes, the count will
418    /// be asynchronously updated.
419    ///
420    /// # Example
421    /// ```rust
422    /// # #[cfg(feature = "deploy")] {
423    /// # use hydro_lang::prelude::*;
424    /// # use futures::StreamExt;
425    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
426    /// # let tick = process.tick();
427    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
428    /// # process
429    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
430    /// #     .into_keyed()
431    /// #     .batch(&tick, nondet!(/** test */))
432    /// #     .first();
433    /// keyed_singleton.key_count()
434    /// # .all_ticks()
435    /// # }, |mut stream| async move {
436    /// // 3
437    /// # assert_eq!(stream.next().await.unwrap(), 3);
438    /// # }));
439    /// # }
440    /// ```
441    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
442        if B::ValueBound::BOUNDED {
443            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
444                location: self.location,
445                ir_node: self.ir_node,
446                _phantom: PhantomData,
447            };
448
449            me.entries().count()
450        } else if L::is_top_level()
451            && let Some(tick) = self.location.try_tick()
452        {
453            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
454                location: self.location,
455                ir_node: self.ir_node,
456                _phantom: PhantomData,
457            };
458
459            let out =
460                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
461                    .latest();
462            Singleton::new(out.location, out.ir_node.into_inner())
463        } else {
464            panic!("Unbounded KeyedSingleton inside a tick");
465        }
466    }
467
468    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
469    ///
470    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
471    /// asynchronously as well.
472    ///
473    /// # Example
474    /// ```rust
475    /// # #[cfg(feature = "deploy")] {
476    /// # use hydro_lang::prelude::*;
477    /// # use futures::StreamExt;
478    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
479    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
480    /// # process
481    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
482    /// #     .into_keyed()
483    /// #     .batch(&process.tick(), nondet!(/** test */))
484    /// #     .first();
485    /// keyed_singleton.into_singleton()
486    /// # .all_ticks()
487    /// # }, |mut stream| async move {
488    /// // { 1: "a", 2: "b", 3: "c" }
489    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
490    /// # }));
491    /// # }
492    /// ```
493    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
494    where
495        K: Eq + Hash,
496    {
497        if B::ValueBound::BOUNDED {
498            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
499                location: self.location,
500                ir_node: self.ir_node,
501                _phantom: PhantomData,
502            };
503
504            me.entries()
505                .assume_ordering(nondet!(
506                    /// Because this is a keyed singleton, there is only one value per key.
507                ))
508                .fold(
509                    q!(|| HashMap::new()),
510                    q!(|map, (k, v)| {
511                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
512                        map.insert(k, v);
513                    }),
514                )
515        } else if L::is_top_level()
516            && let Some(tick) = self.location.try_tick()
517        {
518            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
519                location: self.location,
520                ir_node: self.ir_node,
521                _phantom: PhantomData,
522            };
523
524            let out = into_singleton_inside_tick(
525                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
526            )
527            .latest();
528            Singleton::new(out.location, out.ir_node.into_inner())
529        } else {
530            panic!("Unbounded KeyedSingleton inside a tick");
531        }
532    }
533
534    /// An operator which allows you to "name" a `HydroNode`.
535    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
536    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
537        {
538            let mut node = self.ir_node.borrow_mut();
539            let metadata = node.metadata_mut();
540            metadata.tag = Some(name.to_owned());
541        }
542        self
543    }
544}
545
546impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
547    KeyedSingleton<K, V, L, B>
548{
549    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
550    ///
551    /// The value for each key must be bounded, otherwise the resulting stream elements would be
552    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
553    /// into the output.
554    ///
555    /// # Example
556    /// ```rust
557    /// # #[cfg(feature = "deploy")] {
558    /// # use hydro_lang::prelude::*;
559    /// # use futures::StreamExt;
560    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
561    /// let keyed_singleton = // { 1: 2, 2: 4 }
562    /// # process
563    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
564    /// #     .into_keyed()
565    /// #     .first();
566    /// keyed_singleton.entries()
567    /// # }, |mut stream| async move {
568    /// // (1, 2), (2, 4) in any order
569    /// # let mut results = Vec::new();
570    /// # for _ in 0..2 {
571    /// #     results.push(stream.next().await.unwrap());
572    /// # }
573    /// # results.sort();
574    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
575    /// # }));
576    /// # }
577    /// ```
578    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
579        self.into_keyed_stream().entries()
580    }
581
582    /// Flattens the keyed singleton into an unordered stream of just the values.
583    ///
584    /// The value for each key must be bounded, otherwise the resulting stream elements would be
585    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
586    /// into the output.
587    ///
588    /// # Example
589    /// ```rust
590    /// # #[cfg(feature = "deploy")] {
591    /// # use hydro_lang::prelude::*;
592    /// # use futures::StreamExt;
593    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
594    /// let keyed_singleton = // { 1: 2, 2: 4 }
595    /// # process
596    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
597    /// #     .into_keyed()
598    /// #     .first();
599    /// keyed_singleton.values()
600    /// # }, |mut stream| async move {
601    /// // 2, 4 in any order
602    /// # let mut results = Vec::new();
603    /// # for _ in 0..2 {
604    /// #     results.push(stream.next().await.unwrap());
605    /// # }
606    /// # results.sort();
607    /// # assert_eq!(results, vec![2, 4]);
608    /// # }));
609    /// # }
610    /// ```
611    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
612        let map_f = q!(|(_, v)| v)
613            .splice_fn1_ctx::<(K, V), V>(&self.location)
614            .into();
615
616        Stream::new(
617            self.location.clone(),
618            HydroNode::Map {
619                f: map_f,
620                input: Box::new(self.ir_node.into_inner()),
621                metadata: self.location.new_node_metadata(Stream::<
622                    V,
623                    L,
624                    B::UnderlyingBound,
625                    NoOrder,
626                    ExactlyOnce,
627                >::collection_kind()),
628            },
629        )
630    }
631
632    /// Flattens the keyed singleton into an unordered stream of just the keys.
633    ///
634    /// The value for each key must be bounded, otherwise the removal of keys would result in
635    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
636    /// into the output.
637    ///
638    /// # Example
639    /// ```rust
640    /// # #[cfg(feature = "deploy")] {
641    /// # use hydro_lang::prelude::*;
642    /// # use futures::StreamExt;
643    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
644    /// let keyed_singleton = // { 1: 2, 2: 4 }
645    /// # process
646    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
647    /// #     .into_keyed()
648    /// #     .first();
649    /// keyed_singleton.keys()
650    /// # }, |mut stream| async move {
651    /// // 1, 2 in any order
652    /// # let mut results = Vec::new();
653    /// # for _ in 0..2 {
654    /// #     results.push(stream.next().await.unwrap());
655    /// # }
656    /// # results.sort();
657    /// # assert_eq!(results, vec![1, 2]);
658    /// # }));
659    /// # }
660    /// ```
661    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
662        self.entries().map(q!(|(k, _)| k))
663    }
664
665    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
666    /// entries whose keys are not in the provided stream.
667    ///
668    /// # Example
669    /// ```rust
670    /// # #[cfg(feature = "deploy")] {
671    /// # use hydro_lang::prelude::*;
672    /// # use futures::StreamExt;
673    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
674    /// let tick = process.tick();
675    /// let keyed_singleton = // { 1: 2, 2: 4 }
676    /// # process
677    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
678    /// #     .into_keyed()
679    /// #     .first()
680    /// #     .batch(&tick, nondet!(/** test */));
681    /// let keys_to_remove = process
682    ///     .source_iter(q!(vec![1]))
683    ///     .batch(&tick, nondet!(/** test */));
684    /// keyed_singleton.filter_key_not_in(keys_to_remove)
685    /// #   .entries().all_ticks()
686    /// # }, |mut stream| async move {
687    /// // { 2: 4 }
688    /// # for w in vec![(2, 4)] {
689    /// #     assert_eq!(stream.next().await.unwrap(), w);
690    /// # }
691    /// # }));
692    /// # }
693    /// ```
694    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
695        self,
696        other: Stream<K, L, Bounded, O2, R2>,
697    ) -> Self
698    where
699        K: Hash + Eq,
700    {
701        check_matching_location(&self.location, &other.location);
702
703        KeyedSingleton::new(
704            self.location.clone(),
705            HydroNode::AntiJoin {
706                pos: Box::new(self.ir_node.into_inner()),
707                neg: Box::new(other.ir_node.into_inner()),
708                metadata: self.location.new_node_metadata(Self::collection_kind()),
709            },
710        )
711    }
712
713    /// An operator which allows you to "inspect" each value of a keyed singleton without
714    /// modifying it. The closure `f` is called on a reference to each value. This is
715    /// mainly useful for debugging, and should not be used to generate side-effects.
716    ///
717    /// # Example
718    /// ```rust
719    /// # #[cfg(feature = "deploy")] {
720    /// # use hydro_lang::prelude::*;
721    /// # use futures::StreamExt;
722    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
723    /// let keyed_singleton = // { 1: 2, 2: 4 }
724    /// # process
725    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
726    /// #     .into_keyed()
727    /// #     .first();
728    /// keyed_singleton
729    ///     .inspect(q!(|v| println!("{}", v)))
730    /// #   .entries()
731    /// # }, |mut stream| async move {
732    /// // { 1: 2, 2: 4 }
733    /// # for w in vec![(1, 2), (2, 4)] {
734    /// #     assert_eq!(stream.next().await.unwrap(), w);
735    /// # }
736    /// # }));
737    /// # }
738    /// ```
739    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
740    where
741        F: Fn(&V) + 'a,
742    {
743        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
744        let inspect_f = q!({
745            let orig = f;
746            move |t: &(_, _)| orig(&t.1)
747        })
748        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
749        .into();
750
751        KeyedSingleton::new(
752            self.location.clone(),
753            HydroNode::Inspect {
754                f: inspect_f,
755                input: Box::new(self.ir_node.into_inner()),
756                metadata: self.location.new_node_metadata(Self::collection_kind()),
757            },
758        )
759    }
760
761    /// An operator which allows you to "inspect" each entry of a keyed singleton without
762    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
763    /// mainly useful for debugging, and should not be used to generate side-effects.
764    ///
765    /// # Example
766    /// ```rust
767    /// # #[cfg(feature = "deploy")] {
768    /// # use hydro_lang::prelude::*;
769    /// # use futures::StreamExt;
770    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
771    /// let keyed_singleton = // { 1: 2, 2: 4 }
772    /// # process
773    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
774    /// #     .into_keyed()
775    /// #     .first();
776    /// keyed_singleton
777    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
778    /// #   .entries()
779    /// # }, |mut stream| async move {
780    /// // { 1: 2, 2: 4 }
781    /// # for w in vec![(1, 2), (2, 4)] {
782    /// #     assert_eq!(stream.next().await.unwrap(), w);
783    /// # }
784    /// # }));
785    /// # }
786    /// ```
787    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
788    where
789        F: Fn(&(K, V)) + 'a,
790    {
791        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
792
793        KeyedSingleton::new(
794            self.location.clone(),
795            HydroNode::Inspect {
796                f: inspect_f,
797                input: Box::new(self.ir_node.into_inner()),
798                metadata: self.location.new_node_metadata(Self::collection_kind()),
799            },
800        )
801    }
802
803    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
804    ///
805    /// Because this method requires values to be bounded, the output [`Optional`] will only be
806    /// asynchronously updated if a new key is added that is higher than the previous max key.
807    ///
808    /// # Example
809    /// ```rust
810    /// # #[cfg(feature = "deploy")] {
811    /// # use hydro_lang::prelude::*;
812    /// # use futures::StreamExt;
813    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
814    /// let tick = process.tick();
815    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
816    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
817    /// #     .into_keyed()
818    /// #     .first();
819    /// keyed_singleton.get_max_key()
820    /// # .sample_eager(nondet!(/** test */))
821    /// # }, |mut stream| async move {
822    /// // (2, 456)
823    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
824    /// # }));
825    /// # }
826    /// ```
    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
    where
        K: Ord,
    {
        self.entries()
            .assume_ordering_trusted(nondet!(
                /// There is only one element associated with each key, and the keys are totally
                /// ordered so we will produce a deterministic value. The closure technically
                /// isn't commutative in the case where both passed entries have the same key
                /// but different values.
                ///
                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
                /// the two inputs do not have the same key.
            ))
            // Keep whichever entry has the strictly larger key; on equal keys the current
            // entry is retained.
            .reduce(q!(
                move |curr, new| {
                    if new.0 > curr.0 {
                        *curr = new;
                    }
                },
                idempotent = ManualProof(/* repeated elements are ignored */)
            ))
    }
850
851    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
852    /// element, the value.
853    ///
854    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
855    ///
856    /// # Example
857    /// ```rust
858    /// # #[cfg(feature = "deploy")] {
859    /// # use hydro_lang::prelude::*;
860    /// # use futures::StreamExt;
861    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862    /// let keyed_singleton = // { 1: 2, 2: 4 }
863    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
864    /// #     .into_keyed()
865    /// #     .first();
866    /// keyed_singleton
867    ///     .clone()
868    ///     .into_keyed_stream()
869    ///     .interleave(
870    ///         keyed_singleton.into_keyed_stream()
871    ///     )
872    /// #   .entries()
873    /// # }, |mut stream| async move {
874    /// /// // { 1: [2, 2], 2: [4, 4] }
875    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
876    /// #     assert_eq!(stream.next().await.unwrap(), w);
877    /// # }
878    /// # }));
879    /// # }
880    /// ```
881    pub fn into_keyed_stream(
882        self,
883    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
884        KeyedStream::new(
885            self.location.clone(),
886            HydroNode::Cast {
887                inner: Box::new(self.ir_node.into_inner()),
888                metadata: self.location.new_node_metadata(KeyedStream::<
889                    K,
890                    V,
891                    L,
892                    B::UnderlyingBound,
893                    TotalOrder,
894                    ExactlyOnce,
895                >::collection_kind()),
896            },
897        )
898    }
899}
900
901impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, L, Bounded> {
902    /// Gets the value associated with a specific key from the keyed singleton.
903    ///
904    /// # Example
905    /// ```rust
906    /// # #[cfg(feature = "deploy")] {
907    /// # use hydro_lang::prelude::*;
908    /// # use futures::StreamExt;
909    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
910    /// let tick = process.tick();
911    /// let keyed_data = process
912    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
913    ///     .into_keyed()
914    ///     .batch(&tick, nondet!(/** test */))
915    ///     .first();
916    /// let key = tick.singleton(q!(1));
917    /// keyed_data.get(key).all_ticks()
918    /// # }, |mut stream| async move {
919    /// // 2
920    /// # assert_eq!(stream.next().await.unwrap(), 2);
921    /// # }));
922    /// # }
923    /// ```
924    pub fn get(self, key: Singleton<K, L, Bounded>) -> Optional<V, L, Bounded> {
925        self.into_keyed_stream()
926            .get(key)
927            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
928            .first()
929    }
930
931    /// Emit a keyed stream containing keys shared between the keyed singleton and the
932    /// keyed stream, where each value in the output keyed stream is a tuple of
933    /// (the keyed singleton's value, the keyed stream's value).
934    ///
935    /// # Example
936    /// ```rust
937    /// # #[cfg(feature = "deploy")] {
938    /// # use hydro_lang::prelude::*;
939    /// # use futures::StreamExt;
940    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
941    /// let tick = process.tick();
942    /// let keyed_data = process
943    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
944    ///     .into_keyed()
945    ///     .batch(&tick, nondet!(/** test */))
946    ///     .first();
947    /// let other_data = process
948    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
949    ///     .into_keyed()
950    ///     .batch(&tick, nondet!(/** test */));
951    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
952    /// # }, |mut stream| async move {
953    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
954    /// # let mut results = vec![];
955    /// # for _ in 0..3 {
956    /// #     results.push(stream.next().await.unwrap());
957    /// # }
958    /// # results.sort();
959    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
960    /// # }));
961    /// # }
962    /// ```
963    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2>(
964        self,
965        keyed_stream: KeyedStream<K, V2, L, Bounded, O2, R2>,
966    ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R2> {
967        self.entries()
968            .weaken_retries::<R2>()
969            .join(keyed_stream.entries())
970            .into_keyed()
971    }
972
973    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
974    /// where each value in the output keyed singleton is a tuple of
975    /// (self.value, other.value).
976    ///
977    /// # Example
978    /// ```rust
979    /// # #[cfg(feature = "deploy")] {
980    /// # use hydro_lang::prelude::*;
981    /// # use futures::StreamExt;
982    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
983    /// # let tick = process.tick();
984    /// let requests = // { 1: 10, 2: 20, 3: 30 }
985    /// # process
986    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
987    /// #     .into_keyed()
988    /// #     .batch(&tick, nondet!(/** test */))
989    /// #     .first();
990    /// let other = // { 1: 100, 2: 200, 4: 400 }
991    /// # process
992    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
993    /// #     .into_keyed()
994    /// #     .batch(&tick, nondet!(/** test */))
995    /// #     .first();
996    /// requests.join_keyed_singleton(other)
997    /// # .entries().all_ticks()
998    /// # }, |mut stream| async move {
999    /// // { 1: (10, 100), 2: (20, 200) }
1000    /// # let mut results = vec![];
1001    /// # for _ in 0..2 {
1002    /// #     results.push(stream.next().await.unwrap());
1003    /// # }
1004    /// # results.sort();
1005    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1006    /// # }));
1007    /// # }
1008    /// ```
1009    pub fn join_keyed_singleton<V2: Clone>(
1010        self,
1011        other: KeyedSingleton<K, V2, L, Bounded>,
1012    ) -> KeyedSingleton<K, (V, V2), L, Bounded> {
1013        let result_stream = self.entries().join(other.entries()).into_keyed();
1014
1015        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
1016        KeyedSingleton::new(
1017            result_stream.location.clone(),
1018            HydroNode::Cast {
1019                inner: Box::new(result_stream.ir_node.into_inner()),
1020                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
1021                    K,
1022                    (V, V2),
1023                    L,
1024                    Bounded,
1025                >::collection_kind(
1026                )),
1027            },
1028        )
1029    }
1030
1031    /// For each value in `self`, find the matching key in `lookup`.
1032    /// The output is a keyed singleton with the key from `self`, and a value
1033    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
1034    /// If the key is not present in `lookup`, the option will be [`None`].
1035    ///
1036    /// # Example
1037    /// ```rust
1038    /// # #[cfg(feature = "deploy")] {
1039    /// # use hydro_lang::prelude::*;
1040    /// # use futures::StreamExt;
1041    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1042    /// # let tick = process.tick();
1043    /// let requests = // { 1: 10, 2: 20 }
1044    /// # process
1045    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
1046    /// #     .into_keyed()
1047    /// #     .batch(&tick, nondet!(/** test */))
1048    /// #     .first();
1049    /// let other_data = // { 10: 100, 11: 110 }
1050    /// # process
1051    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
1052    /// #     .into_keyed()
1053    /// #     .batch(&tick, nondet!(/** test */))
1054    /// #     .first();
1055    /// requests.lookup_keyed_singleton(other_data)
1056    /// # .entries().all_ticks()
1057    /// # }, |mut stream| async move {
1058    /// // { 1: (10, Some(100)), 2: (20, None) }
1059    /// # let mut results = vec![];
1060    /// # for _ in 0..2 {
1061    /// #     results.push(stream.next().await.unwrap());
1062    /// # }
1063    /// # results.sort();
1064    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
1065    /// # }));
1066    /// # }
1067    /// ```
1068    pub fn lookup_keyed_singleton<V2>(
1069        self,
1070        lookup: KeyedSingleton<V, V2, L, Bounded>,
1071    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
1072    where
1073        K: Eq + Hash + Clone,
1074        V: Eq + Hash + Clone,
1075        V2: Clone,
1076    {
1077        let result_stream = self
1078            .into_keyed_stream()
1079            .lookup_keyed_stream(lookup.into_keyed_stream());
1080
1081        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
1082        KeyedSingleton::new(
1083            result_stream.location.clone(),
1084            HydroNode::Cast {
1085                inner: Box::new(result_stream.ir_node.into_inner()),
1086                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
1087                    K,
1088                    (V, Option<V2>),
1089                    L,
1090                    Bounded,
1091                >::collection_kind(
1092                )),
1093            },
1094        )
1095    }
1096
1097    /// For each value in `self`, find the matching key in `lookup`.
1098    /// The output is a keyed stream with the key from `self`, and a value
1099    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
1100    /// If the key is not present in `lookup`, the option will be [`None`].
1101    ///
1102    /// # Example
1103    /// ```rust
1104    /// # #[cfg(feature = "deploy")] {
1105    /// # use hydro_lang::prelude::*;
1106    /// # use futures::StreamExt;
1107    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1108    /// # let tick = process.tick();
1109    /// let requests = // { 1: 10, 2: 20 }
1110    /// # process
1111    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
1112    /// #     .into_keyed()
1113    /// #     .batch(&tick, nondet!(/** test */))
1114    /// #     .first();
1115    /// let other_data = // { 10: 100, 10: 110 }
1116    /// # process
1117    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
1118    /// #     .into_keyed()
1119    /// #     .batch(&tick, nondet!(/** test */));
1120    /// requests.lookup_keyed_stream(other_data)
1121    /// # .entries().all_ticks()
1122    /// # }, |mut stream| async move {
1123    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
1124    /// # let mut results = vec![];
1125    /// # for _ in 0..3 {
1126    /// #     results.push(stream.next().await.unwrap());
1127    /// # }
1128    /// # results.sort();
1129    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
1130    /// # }));
1131    /// # }
1132    /// ```
1133    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
1134        self,
1135        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
1136    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
1137    where
1138        K: Eq + Hash + Clone,
1139        V: Eq + Hash + Clone,
1140        V2: Clone,
1141    {
1142        self.entries()
1143            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
1144            .into_keyed()
1145            .lookup_keyed_stream(lookup)
1146    }
1147}
1148
1149impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1150where
1151    L: Location<'a>,
1152{
1153    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1154    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1155    ///
1156    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1157    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1158    /// argument that declares where the keyed singleton will be atomically processed. Batching a
1159    /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1160    /// batching into a different [`Tick`] will introduce asynchrony.
1161    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1162        let out_location = Atomic { tick: tick.clone() };
1163        KeyedSingleton::new(
1164            out_location.clone(),
1165            HydroNode::BeginAtomic {
1166                inner: Box::new(self.ir_node.into_inner()),
1167                metadata: out_location
1168                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1169            },
1170        )
1171    }
1172}
1173
1174impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1175where
1176    L: Location<'a> + NoTick,
1177{
1178    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1179    /// See [`KeyedSingleton::atomic`] for more details.
1180    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1181        KeyedSingleton::new(
1182            self.location.tick.l.clone(),
1183            HydroNode::EndAtomic {
1184                inner: Box::new(self.ir_node.into_inner()),
1185                metadata: self
1186                    .location
1187                    .tick
1188                    .l
1189                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1190            },
1191        )
1192    }
1193}
1194
1195impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1196    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1197    /// tick `T` always has the entries of `self` at tick `T - 1`.
1198    ///
1199    /// At tick `0`, the output has no entries, since there is no previous tick.
1200    ///
1201    /// This operator enables stateful iterative processing with ticks, by sending data from one
1202    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1203    ///
1204    /// # Example
1205    /// ```rust
1206    /// # #[cfg(feature = "deploy")] {
1207    /// # use hydro_lang::prelude::*;
1208    /// # use futures::StreamExt;
1209    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1210    /// let tick = process.tick();
1211    /// # // ticks are lazy by default, forces the second tick to run
1212    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1213    /// # let batch_first_tick = process
1214    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1215    /// #   .batch(&tick, nondet!(/** test */))
1216    /// #   .into_keyed();
1217    /// # let batch_second_tick = process
1218    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1219    /// #   .batch(&tick, nondet!(/** test */))
1220    /// #   .into_keyed()
1221    /// #   .defer_tick(); // appears on the second tick
1222    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1223    /// # batch_first_tick.chain(batch_second_tick).first();
1224    /// input_batch.clone().filter_key_not_in(
1225    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1226    /// )
1227    /// # .entries().all_ticks()
1228    /// # }, |mut stream| async move {
1229    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1230    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1231    /// #     assert_eq!(stream.next().await.unwrap(), w);
1232    /// # }
1233    /// # }));
1234    /// # }
1235    /// ```
1236    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1237        KeyedSingleton::new(
1238            self.location.clone(),
1239            HydroNode::DeferTick {
1240                input: Box::new(self.ir_node.into_inner()),
1241                metadata: self
1242                    .location
1243                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1244            },
1245        )
1246    }
1247}
1248
1249impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1250where
1251    L: Location<'a>,
1252{
1253    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1254    /// point in time.
1255    ///
1256    /// # Non-Determinism
1257    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1258    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1259    pub fn snapshot(
1260        self,
1261        tick: &Tick<L>,
1262        _nondet: NonDet,
1263    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1264        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1265        KeyedSingleton::new(
1266            tick.clone(),
1267            HydroNode::Batch {
1268                inner: Box::new(self.ir_node.into_inner()),
1269                metadata: tick
1270                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1271            },
1272        )
1273    }
1274}
1275
1276impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1277where
1278    L: Location<'a> + NoTick,
1279{
1280    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1281    /// state of the keyed singleton being atomically processed.
1282    ///
1283    /// # Non-Determinism
1284    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1285    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1286    pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1287        KeyedSingleton::new(
1288            self.location.clone().tick,
1289            HydroNode::Batch {
1290                inner: Box::new(self.ir_node.into_inner()),
1291                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1292                    K,
1293                    V,
1294                    Tick<L>,
1295                    Bounded,
1296                >::collection_kind(
1297                )),
1298            },
1299        )
1300    }
1301}
1302
1303impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1304where
1305    L: Location<'a>,
1306{
1307    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1308    ///
1309    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1310    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1311    /// is filtered out.
1312    ///
1313    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1314    /// not modify or take ownership of the values. If you need to modify the values while filtering
1315    /// use [`KeyedSingleton::filter_map`] instead.
1316    ///
1317    /// # Example
1318    /// ```rust
1319    /// # #[cfg(feature = "deploy")] {
1320    /// # use hydro_lang::prelude::*;
1321    /// # use futures::StreamExt;
1322    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1323    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1324    /// # process
1325    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1326    /// #     .into_keyed()
1327    /// #     .first();
1328    /// keyed_singleton.filter(q!(|&v| v > 1))
1329    /// #   .entries()
1330    /// # }, |mut stream| async move {
1331    /// // { 1: 2, 2: 4 }
1332    /// # let mut results = Vec::new();
1333    /// # for _ in 0..2 {
1334    /// #     results.push(stream.next().await.unwrap());
1335    /// # }
1336    /// # results.sort();
1337    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1338    /// # }));
1339    /// # }
1340    /// ```
1341    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1342    where
1343        F: Fn(&V) -> bool + 'a,
1344    {
1345        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1346        let filter_f = q!({
1347            let orig = f;
1348            move |t: &(_, _)| orig(&t.1)
1349        })
1350        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1351        .into();
1352
1353        KeyedSingleton::new(
1354            self.location.clone(),
1355            HydroNode::Filter {
1356                f: filter_f,
1357                input: Box::new(self.ir_node.into_inner()),
1358                metadata: self
1359                    .location
1360                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1361            },
1362        )
1363    }
1364
1365    /// An operator that both filters and maps values. It yields only the key-value pairs where
1366    /// the supplied closure `f` returns `Some(value)`.
1367    ///
1368    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1369    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1370    /// If it returns `None`, the key-value pair is filtered out.
1371    ///
1372    /// # Example
1373    /// ```rust
1374    /// # #[cfg(feature = "deploy")] {
1375    /// # use hydro_lang::prelude::*;
1376    /// # use futures::StreamExt;
1377    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1378    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1379    /// # process
1380    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1381    /// #     .into_keyed()
1382    /// #     .first();
1383    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1384    /// #   .entries()
1385    /// # }, |mut stream| async move {
1386    /// // { 1: 42, 3: 100 }
1387    /// # let mut results = Vec::new();
1388    /// # for _ in 0..2 {
1389    /// #     results.push(stream.next().await.unwrap());
1390    /// # }
1391    /// # results.sort();
1392    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1393    /// # }));
1394    /// # }
1395    /// ```
1396    pub fn filter_map<F, U>(
1397        self,
1398        f: impl IntoQuotedMut<'a, F, L> + Copy,
1399    ) -> KeyedSingleton<K, U, L, B>
1400    where
1401        F: Fn(V) -> Option<U> + 'a,
1402    {
1403        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1404        let filter_map_f = q!({
1405            let orig = f;
1406            move |(k, v)| orig(v).map(|o| (k, o))
1407        })
1408        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1409        .into();
1410
1411        KeyedSingleton::new(
1412            self.location.clone(),
1413            HydroNode::FilterMap {
1414                f: filter_map_f,
1415                input: Box::new(self.ir_node.into_inner()),
1416                metadata: self
1417                    .location
1418                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1419            },
1420        )
1421    }
1422
1423    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1424    /// arrived since the previous batch was released.
1425    ///
1426    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1427    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1428    ///
1429    /// # Non-Determinism
1430    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1431    /// has a non-deterministic set of key-value pairs.
1432    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1433    where
1434        L: NoTick,
1435    {
1436        self.atomic(tick).batch_atomic(nondet)
1437    }
1438}
1439
1440impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1441where
1442    L: Location<'a> + NoTick,
1443{
1444    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1445    /// atomically processed.
1446    ///
1447    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1448    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1449    ///
1450    /// # Non-Determinism
1451    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1452    /// has a non-deterministic set of key-value pairs.
1453    pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1454        let _ = nondet;
1455        KeyedSingleton::new(
1456            self.location.clone().tick,
1457            HydroNode::Batch {
1458                inner: Box::new(self.ir_node.into_inner()),
1459                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1460                    K,
1461                    V,
1462                    Tick<L>,
1463                    Bounded,
1464                >::collection_kind(
1465                )),
1466            },
1467        )
1468    }
1469}
1470
1471#[cfg(test)]
1472mod tests {
1473    #[cfg(feature = "deploy")]
1474    use futures::{SinkExt, StreamExt};
1475    #[cfg(feature = "deploy")]
1476    use hydro_deploy::Deployment;
1477    #[cfg(any(feature = "deploy", feature = "sim"))]
1478    use stageleft::q;
1479
1480    #[cfg(any(feature = "deploy", feature = "sim"))]
1481    use crate::compile::builder::FlowBuilder;
1482    #[cfg(any(feature = "deploy", feature = "sim"))]
1483    use crate::location::Location;
1484    #[cfg(any(feature = "deploy", feature = "sim"))]
1485    use crate::nondet::nondet;
1486
1487    #[cfg(feature = "deploy")]
1488    #[tokio::test]
1489    async fn key_count_bounded_value() {
1490        let mut deployment = Deployment::new();
1491
1492        let mut flow = FlowBuilder::new();
1493        let node = flow.process::<()>();
1494        let external = flow.external::<()>();
1495
1496        let (input_port, input) = node.source_external_bincode(&external);
1497        let out = input
1498            .into_keyed()
1499            .first()
1500            .key_count()
1501            .sample_eager(nondet!(/** test */))
1502            .send_bincode_external(&external);
1503
1504        let nodes = flow
1505            .with_process(&node, deployment.Localhost())
1506            .with_external(&external, deployment.Localhost())
1507            .deploy(&mut deployment);
1508
1509        deployment.deploy().await.unwrap();
1510
1511        let mut external_in = nodes.connect(input_port).await;
1512        let mut external_out = nodes.connect(out).await;
1513
1514        deployment.start().await.unwrap();
1515
1516        assert_eq!(external_out.next().await.unwrap(), 0);
1517
1518        external_in.send((1, 1)).await.unwrap();
1519        assert_eq!(external_out.next().await.unwrap(), 1);
1520
1521        external_in.send((2, 2)).await.unwrap();
1522        assert_eq!(external_out.next().await.unwrap(), 2);
1523    }
1524
1525    #[cfg(feature = "deploy")]
1526    #[tokio::test]
1527    async fn key_count_unbounded_value() {
1528        let mut deployment = Deployment::new();
1529
1530        let mut flow = FlowBuilder::new();
1531        let node = flow.process::<()>();
1532        let external = flow.external::<()>();
1533
1534        let (input_port, input) = node.source_external_bincode(&external);
1535        let out = input
1536            .into_keyed()
1537            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1538            .key_count()
1539            .sample_eager(nondet!(/** test */))
1540            .send_bincode_external(&external);
1541
1542        let nodes = flow
1543            .with_process(&node, deployment.Localhost())
1544            .with_external(&external, deployment.Localhost())
1545            .deploy(&mut deployment);
1546
1547        deployment.deploy().await.unwrap();
1548
1549        let mut external_in = nodes.connect(input_port).await;
1550        let mut external_out = nodes.connect(out).await;
1551
1552        deployment.start().await.unwrap();
1553
1554        assert_eq!(external_out.next().await.unwrap(), 0);
1555
1556        external_in.send((1, 1)).await.unwrap();
1557        assert_eq!(external_out.next().await.unwrap(), 1);
1558
1559        external_in.send((1, 2)).await.unwrap();
1560        assert_eq!(external_out.next().await.unwrap(), 1);
1561
1562        external_in.send((2, 2)).await.unwrap();
1563        assert_eq!(external_out.next().await.unwrap(), 2);
1564
1565        external_in.send((1, 1)).await.unwrap();
1566        assert_eq!(external_out.next().await.unwrap(), 2);
1567
1568        external_in.send((3, 1)).await.unwrap();
1569        assert_eq!(external_out.next().await.unwrap(), 3);
1570    }
1571
1572    #[cfg(feature = "deploy")]
1573    #[tokio::test]
1574    async fn into_singleton_bounded_value() {
1575        let mut deployment = Deployment::new();
1576
1577        let mut flow = FlowBuilder::new();
1578        let node = flow.process::<()>();
1579        let external = flow.external::<()>();
1580
1581        let (input_port, input) = node.source_external_bincode(&external);
1582        let out = input
1583            .into_keyed()
1584            .first()
1585            .into_singleton()
1586            .sample_eager(nondet!(/** test */))
1587            .send_bincode_external(&external);
1588
1589        let nodes = flow
1590            .with_process(&node, deployment.Localhost())
1591            .with_external(&external, deployment.Localhost())
1592            .deploy(&mut deployment);
1593
1594        deployment.deploy().await.unwrap();
1595
1596        let mut external_in = nodes.connect(input_port).await;
1597        let mut external_out = nodes.connect(out).await;
1598
1599        deployment.start().await.unwrap();
1600
1601        assert_eq!(
1602            external_out.next().await.unwrap(),
1603            std::collections::HashMap::new()
1604        );
1605
1606        external_in.send((1, 1)).await.unwrap();
1607        assert_eq!(
1608            external_out.next().await.unwrap(),
1609            vec![(1, 1)].into_iter().collect()
1610        );
1611
1612        external_in.send((2, 2)).await.unwrap();
1613        assert_eq!(
1614            external_out.next().await.unwrap(),
1615            vec![(1, 1), (2, 2)].into_iter().collect()
1616        );
1617    }
1618
1619    #[cfg(feature = "deploy")]
1620    #[tokio::test]
1621    async fn into_singleton_unbounded_value() {
1622        let mut deployment = Deployment::new();
1623
1624        let mut flow = FlowBuilder::new();
1625        let node = flow.process::<()>();
1626        let external = flow.external::<()>();
1627
1628        let (input_port, input) = node.source_external_bincode(&external);
1629        let out = input
1630            .into_keyed()
1631            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1632            .into_singleton()
1633            .sample_eager(nondet!(/** test */))
1634            .send_bincode_external(&external);
1635
1636        let nodes = flow
1637            .with_process(&node, deployment.Localhost())
1638            .with_external(&external, deployment.Localhost())
1639            .deploy(&mut deployment);
1640
1641        deployment.deploy().await.unwrap();
1642
1643        let mut external_in = nodes.connect(input_port).await;
1644        let mut external_out = nodes.connect(out).await;
1645
1646        deployment.start().await.unwrap();
1647
1648        assert_eq!(
1649            external_out.next().await.unwrap(),
1650            std::collections::HashMap::new()
1651        );
1652
1653        external_in.send((1, 1)).await.unwrap();
1654        assert_eq!(
1655            external_out.next().await.unwrap(),
1656            vec![(1, 1)].into_iter().collect()
1657        );
1658
1659        external_in.send((1, 2)).await.unwrap();
1660        assert_eq!(
1661            external_out.next().await.unwrap(),
1662            vec![(1, 2)].into_iter().collect()
1663        );
1664
1665        external_in.send((2, 2)).await.unwrap();
1666        assert_eq!(
1667            external_out.next().await.unwrap(),
1668            vec![(1, 2), (2, 1)].into_iter().collect()
1669        );
1670
1671        external_in.send((1, 1)).await.unwrap();
1672        assert_eq!(
1673            external_out.next().await.unwrap(),
1674            vec![(1, 3), (2, 1)].into_iter().collect()
1675        );
1676
1677        external_in.send((3, 1)).await.unwrap();
1678        assert_eq!(
1679            external_out.next().await.unwrap(),
1680            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1681        );
1682    }
1683
1684    #[cfg(feature = "sim")]
1685    #[test]
1686    fn sim_unbounded_singleton_snapshot() {
1687        let mut flow = FlowBuilder::new();
1688        let node = flow.process::<()>();
1689
1690        let (input_port, input) = node.sim_input();
1691        let output = input
1692            .into_keyed()
1693            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1694            .snapshot(&node.tick(), nondet!(/** test */))
1695            .entries()
1696            .all_ticks()
1697            .sim_output();
1698
1699        let count = flow.sim().exhaustive(async || {
1700            input_port.send((1, 123));
1701            input_port.send((1, 456));
1702            input_port.send((2, 123));
1703
1704            let all = output.collect_sorted::<Vec<_>>().await;
1705            assert_eq!(all.last().unwrap(), &(2, 1));
1706        });
1707
1708        assert_eq!(count, 8);
1709    }
1710
1711    #[cfg(feature = "deploy")]
1712    #[tokio::test]
1713    async fn join_keyed_stream() {
1714        let mut deployment = Deployment::new();
1715
1716        let mut flow = FlowBuilder::new();
1717        let node = flow.process::<()>();
1718        let external = flow.external::<()>();
1719
1720        let tick = node.tick();
1721        let keyed_data = node
1722            .source_iter(q!(vec![(1, 10), (2, 20)]))
1723            .into_keyed()
1724            .batch(&tick, nondet!(/** test */))
1725            .first();
1726        let requests = node
1727            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1728            .into_keyed()
1729            .batch(&tick, nondet!(/** test */));
1730
1731        let out = keyed_data
1732            .join_keyed_stream(requests)
1733            .entries()
1734            .all_ticks()
1735            .send_bincode_external(&external);
1736
1737        let nodes = flow
1738            .with_process(&node, deployment.Localhost())
1739            .with_external(&external, deployment.Localhost())
1740            .deploy(&mut deployment);
1741
1742        deployment.deploy().await.unwrap();
1743
1744        let mut external_out = nodes.connect(out).await;
1745
1746        deployment.start().await.unwrap();
1747
1748        let mut results = vec![];
1749        for _ in 0..2 {
1750            results.push(external_out.next().await.unwrap());
1751        }
1752        results.sort();
1753
1754        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1755    }
1756}