Skip to main content

hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
33/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
34///
35/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
36/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
37/// indicates that entries may be added over time, but once an entry is added it will never be
38/// removed and its value will never change.
39pub trait KeyedSingletonBound {
40    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
41    type UnderlyingBound: Boundedness;
42    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
43    type ValueBound: Boundedness;
44
45    /// The type of the keyed singleton if the value for each key is immutable.
    ///
    /// The resulting bound keeps this bound's `UnderlyingBound` and has `ValueBound = Bounded`.
46    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
47
48    /// The type of the keyed singleton if the value for each key may change asynchronously.
    ///
    /// The resulting bound keeps this bound's `UnderlyingBound` and has `ValueBound = Unbounded`.
49    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
50
51    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    ///
    /// This is the value recorded in the `bound` field of `CollectionKind::KeyedSingleton`.
52    fn bound_kind() -> KeyedSingletonBoundKind;
53}
54
55impl KeyedSingletonBound for Unbounded {
56    type UnderlyingBound = Unbounded;
57    type ValueBound = Unbounded;
58    type WithBoundedValue = BoundedValue;
59    type WithUnboundedValue = Unbounded;
60
61    fn bound_kind() -> KeyedSingletonBoundKind {
62        KeyedSingletonBoundKind::Unbounded
63    }
64}
65
66impl KeyedSingletonBound for Bounded {
67    type UnderlyingBound = Bounded;
68    type ValueBound = Bounded;
69    type WithBoundedValue = Bounded;
70    type WithUnboundedValue = UnreachableBound;
71
72    fn bound_kind() -> KeyedSingletonBoundKind {
73        KeyedSingletonBoundKind::Bounded
74    }
75}
76
77/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
78/// its value is bounded and will never change. The underlying set of entries is still [`Unbounded`]
79/// (its `UnderlyingBound` is [`Unbounded`]), so new entries may appear asynchronously.
80pub struct BoundedValue;
81
82impl KeyedSingletonBound for BoundedValue {
83    type UnderlyingBound = Unbounded;
84    type ValueBound = Bounded;
85    type WithBoundedValue = BoundedValue;
86    type WithUnboundedValue = Unbounded;
87
88    fn bound_kind() -> KeyedSingletonBoundKind {
89        KeyedSingletonBoundKind::BoundedValue
90    }
91}
92
// Marker used as `<Bounded as KeyedSingletonBound>::WithUnboundedValue`. It pairs a bounded
// underlying stream with unbounded values, and its `bound_kind()` is `unreachable!()`.
93#[doc(hidden)]
94pub struct UnreachableBound;
95
96impl KeyedSingletonBound for UnreachableBound {
97    type UnderlyingBound = Bounded;
98    type ValueBound = Unbounded;
99
100    type WithBoundedValue = Bounded;
101    type WithUnboundedValue = UnreachableBound;
102
103    fn bound_kind() -> KeyedSingletonBoundKind {
104        unreachable!("UnreachableBound cannot be instantiated")
105    }
106}
107
108/// Mapping from keys of type `K` to values of type `V`.
109///
110/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
111/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
112/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
113/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
114/// keys cannot be removed and the value for each key is immutable.
115///
116/// Type Parameters:
117/// - `K`: the type of the key for each entry
118/// - `V`: the type of the value for each entry
119/// - `Loc`: the [`Location`] where the keyed singleton is materialized
120/// - `Bound`: tracks whether the entries are:
121///     - [`Bounded`] (local and finite)
122///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
123///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
124pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // Location handle for this collection; exposed read-only through `location()`.
125    pub(crate) location: Loc,
    // IR node backing this collection. Kept in a `RefCell` so operators (and `Drop` / `Clone`)
    // can swap it for `HydroNode::Placeholder` when the node is moved into a new IR node.
126    pub(crate) ir_node: RefCell<HydroNode>,
    // Shared flow state; `Drop` uses it to register a `HydroRoot::Null` for an unconsumed node.
127    pub(crate) flow_state: FlowState,
128
    // Carries the type parameters, which are not otherwise stored in the struct.
129    _phantom: PhantomData<(K, V, Loc, Bound)>,
130}
131
132impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
133    fn drop(&mut self) {
134        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
135        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
136            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
137                input: Box::new(ir_node),
138                op_metadata: HydroIrOpMetadata::new(),
139            });
140        }
141    }
142}
143
144impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
145    for KeyedSingleton<K, V, Loc, Bound>
146{
147    fn clone(&self) -> Self {
148        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
149            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
150            *self.ir_node.borrow_mut() = HydroNode::Tee {
151                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
152                metadata: self.location.new_node_metadata(Self::collection_kind()),
153            };
154        }
155
156        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
157            KeyedSingleton {
158                location: self.location.clone(),
159                flow_state: self.flow_state.clone(),
160                ir_node: HydroNode::Tee {
161                    inner: SharedNode(inner.0.clone()),
162                    metadata: metadata.clone(),
163                }
164                .into(),
165                _phantom: PhantomData,
166            }
167        } else {
168            unreachable!()
169        }
170    }
171}
172
173impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
174    for KeyedSingleton<K, V, L, B>
175where
176    L: Location<'a> + NoTick,
177{
178    type Location = L;
179
180    fn create_source(cycle_id: CycleId, location: L) -> Self {
181        KeyedSingleton {
182            flow_state: location.flow_state().clone(),
183            location: location.clone(),
184            ir_node: RefCell::new(HydroNode::CycleSource {
185                cycle_id,
186                metadata: location.new_node_metadata(Self::collection_kind()),
187            }),
188            _phantom: PhantomData,
189        }
190    }
191}
192
193impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
194where
195    L: Location<'a>,
196{
197    type Location = Tick<L>;
198
199    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
200        KeyedSingleton::new(
201            location.clone(),
202            HydroNode::CycleSource {
203                cycle_id,
204                metadata: location.new_node_metadata(Self::collection_kind()),
205            },
206        )
207    }
208}
209
// `DeferTick` for bounded keyed singletons inside a tick.
210impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
211where
212    L: Location<'a>,
213{
214    fn defer_tick(self) -> Self {
        // Forwards to `KeyedSingleton::defer_tick`.
        // NOTE(review): presumably this resolves to an inherent `defer_tick` defined elsewhere
        // in this file rather than back to this trait method — confirm.
215        KeyedSingleton::defer_tick(self)
216    }
217}
218
219impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
220    for KeyedSingleton<K, V, L, B>
221where
222    L: Location<'a> + NoTick,
223{
224    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
225        assert_eq!(
226            Location::id(&self.location),
227            expected_location,
228            "locations do not match"
229        );
230        self.location
231            .flow_state()
232            .borrow_mut()
233            .push_root(HydroRoot::CycleSink {
234                cycle_id,
235                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
236                op_metadata: HydroIrOpMetadata::new(),
237            });
238    }
239}
240
241impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
242where
243    L: Location<'a>,
244{
245    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
246        assert_eq!(
247            Location::id(&self.location),
248            expected_location,
249            "locations do not match"
250        );
251        self.location
252            .flow_state()
253            .borrow_mut()
254            .push_root(HydroRoot::CycleSink {
255                cycle_id,
256                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
257                op_metadata: HydroIrOpMetadata::new(),
258            });
259    }
260}
261
262impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
263    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
264        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
265        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
266
267        let flow_state = location.flow_state().clone();
268        KeyedSingleton {
269            location,
270            flow_state,
271            ir_node: RefCell::new(ir_node),
272            _phantom: PhantomData,
273        }
274    }
275
276    /// Returns the [`Location`] where this keyed singleton is being materialized.
277    pub fn location(&self) -> &L {
278        &self.location
279    }
280}
281
/// Counts the entries of a bounded keyed singleton.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
288
/// Folds the entries of a bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let entries = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
307
308impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
309    pub(crate) fn collection_kind() -> CollectionKind {
310        CollectionKind::KeyedSingleton {
311            bound: B::bound_kind(),
312            key_type: stageleft::quote_type::<K>().into(),
313            value_type: stageleft::quote_type::<V>().into(),
314        }
315    }
316
317    /// Transforms each value by invoking `f` on each element, with keys staying the same
318    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
319    ///
320    /// If you do not want to modify the stream and instead only want to view
321    /// each item use [`KeyedSingleton::inspect`] instead.
322    ///
323    /// # Example
324    /// ```rust
325    /// # #[cfg(feature = "deploy")] {
326    /// # use hydro_lang::prelude::*;
327    /// # use futures::StreamExt;
328    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
329    /// let keyed_singleton = // { 1: 2, 2: 4 }
330    /// # process
331    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
332    /// #     .into_keyed()
333    /// #     .first();
334    /// keyed_singleton.map(q!(|v| v + 1))
335    /// #   .entries()
336    /// # }, |mut stream| async move {
337    /// // { 1: 3, 2: 5 }
338    /// # let mut results = Vec::new();
339    /// # for _ in 0..2 {
340    /// #     results.push(stream.next().await.unwrap());
341    /// # }
342    /// # results.sort();
343    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
344    /// # }));
345    /// # }
346    /// ```
347    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
348    where
349        F: Fn(V) -> U + 'a,
350    {
351        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
352        let map_f = q!({
353            let orig = f;
354            move |(k, v)| (k, orig(v))
355        })
356        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
357        .into();
358
359        KeyedSingleton::new(
360            self.location.clone(),
361            HydroNode::Map {
362                f: map_f,
363                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
364                metadata: self
365                    .location
366                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
367            },
368        )
369    }
370
371    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
372    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
373    ///
374    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
375    /// the new value `U`. The key remains unchanged in the output.
376    ///
377    /// # Example
378    /// ```rust
379    /// # #[cfg(feature = "deploy")] {
380    /// # use hydro_lang::prelude::*;
381    /// # use futures::StreamExt;
382    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
383    /// let keyed_singleton = // { 1: 2, 2: 4 }
384    /// # process
385    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
386    /// #     .into_keyed()
387    /// #     .first();
388    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
389    /// #   .entries()
390    /// # }, |mut stream| async move {
391    /// // { 1: 3, 2: 6 }
392    /// # let mut results = Vec::new();
393    /// # for _ in 0..2 {
394    /// #     results.push(stream.next().await.unwrap());
395    /// # }
396    /// # results.sort();
397    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
398    /// # }));
399    /// # }
400    /// ```
401    pub fn map_with_key<U, F>(
402        self,
403        f: impl IntoQuotedMut<'a, F, L> + Copy,
404    ) -> KeyedSingleton<K, U, L, B>
405    where
406        F: Fn((K, V)) -> U + 'a,
407        K: Clone,
408    {
409        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
410        let map_f = q!({
411            let orig = f;
412            move |(k, v)| {
413                let out = orig((Clone::clone(&k), v));
414                (k, out)
415            }
416        })
417        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
418        .into();
419
420        KeyedSingleton::new(
421            self.location.clone(),
422            HydroNode::Map {
423                f: map_f,
424                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
425                metadata: self
426                    .location
427                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
428            },
429        )
430    }
431
432    /// Gets the number of keys in the keyed singleton.
433    ///
434    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
435    /// since keys may be added / removed over time. When the set of keys changes, the count will
436    /// be asynchronously updated.
437    ///
438    /// # Example
439    /// ```rust
440    /// # #[cfg(feature = "deploy")] {
441    /// # use hydro_lang::prelude::*;
442    /// # use futures::StreamExt;
443    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
444    /// # let tick = process.tick();
445    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
446    /// # process
447    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
448    /// #     .into_keyed()
449    /// #     .batch(&tick, nondet!(/** test */))
450    /// #     .first();
451    /// keyed_singleton.key_count()
452    /// # .all_ticks()
453    /// # }, |mut stream| async move {
454    /// // 3
455    /// # assert_eq!(stream.next().await.unwrap(), 3);
456    /// # }));
457    /// # }
458    /// ```
459    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
460        if B::ValueBound::BOUNDED {
461            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
462                location: self.location.clone(),
463                flow_state: self.flow_state.clone(),
464                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
465                _phantom: PhantomData,
466            };
467
468            me.entries().count()
469        } else if L::is_top_level()
470            && let Some(tick) = self.location.try_tick()
471        {
472            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
473                location: self.location.clone(),
474                flow_state: self.flow_state.clone(),
475                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
476                _phantom: PhantomData,
477            };
478
479            let out =
480                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
481                    .latest();
482            Singleton::new(
483                out.location.clone(),
484                out.ir_node.replace(HydroNode::Placeholder),
485            )
486        } else {
487            panic!("Unbounded KeyedSingleton inside a tick");
488        }
489    }
490
491    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
492    ///
493    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
494    /// asynchronously as well.
495    ///
496    /// # Example
497    /// ```rust
498    /// # #[cfg(feature = "deploy")] {
499    /// # use hydro_lang::prelude::*;
500    /// # use futures::StreamExt;
501    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
502    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
503    /// # process
504    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
505    /// #     .into_keyed()
506    /// #     .batch(&process.tick(), nondet!(/** test */))
507    /// #     .first();
508    /// keyed_singleton.into_singleton()
509    /// # .all_ticks()
510    /// # }, |mut stream| async move {
511    /// // { 1: "a", 2: "b", 3: "c" }
512    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
513    /// # }));
514    /// # }
515    /// ```
516    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
517    where
518        K: Eq + Hash,
519    {
520        if B::ValueBound::BOUNDED {
521            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
522                location: self.location.clone(),
523                flow_state: self.flow_state.clone(),
524                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
525                _phantom: PhantomData,
526            };
527
528            me.entries()
529                .assume_ordering(nondet!(
530                    /// Because this is a keyed singleton, there is only one value per key.
531                ))
532                .fold(
533                    q!(|| HashMap::new()),
534                    q!(|map, (k, v)| {
535                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
536                        map.insert(k, v);
537                    }),
538                )
539        } else if L::is_top_level()
540            && let Some(tick) = self.location.try_tick()
541        {
542            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
543                location: self.location.clone(),
544                flow_state: self.flow_state.clone(),
545                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
546                _phantom: PhantomData,
547            };
548
549            let out = into_singleton_inside_tick(
550                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
551            )
552            .latest();
553            Singleton::new(
554                out.location.clone(),
555                out.ir_node.replace(HydroNode::Placeholder),
556            )
557        } else {
558            panic!("Unbounded KeyedSingleton inside a tick");
559        }
560    }
561
562    /// An operator which allows you to "name" a `HydroNode`.
563    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
564    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
565        {
566            let mut node = self.ir_node.borrow_mut();
567            let metadata = node.metadata_mut();
568            metadata.tag = Some(name.to_owned());
569        }
570        self
571    }
572
573    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
574    /// implies that `B == Bounded`.
575    pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
576    where
577        B: IsBounded,
578    {
579        KeyedSingleton::new(
580            self.location.clone(),
581            self.ir_node.replace(HydroNode::Placeholder),
582        )
583    }
584
585    /// Gets the value associated with a specific key from the keyed singleton.
586    /// Returns `None` if the key is `None` or there is no associated value.
587    ///
588    /// # Example
589    /// ```rust
590    /// # #[cfg(feature = "deploy")] {
591    /// # use hydro_lang::prelude::*;
592    /// # use futures::StreamExt;
593    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
594    /// let tick = process.tick();
595    /// let keyed_data = process
596    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
597    ///     .into_keyed()
598    ///     .batch(&tick, nondet!(/** test */))
599    ///     .first();
600    /// let key = tick.singleton(q!(1));
601    /// keyed_data.get(key).all_ticks()
602    /// # }, |mut stream| async move {
603    /// // 2
604    /// # assert_eq!(stream.next().await.unwrap(), 2);
605    /// # }));
606    /// # }
607    /// ```
608    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
609    where
610        B: IsBounded,
611        K: Hash + Eq,
612    {
613        self.make_bounded()
614            .into_keyed_stream()
615            .get(key)
616            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
617            .first()
618    }
619
620    /// Emit a keyed stream containing keys shared between the keyed singleton and the
621    /// keyed stream, where each value in the output keyed stream is a tuple of
622    /// (the keyed singleton's value, the keyed stream's value).
623    ///
624    /// # Example
625    /// ```rust
626    /// # #[cfg(feature = "deploy")] {
627    /// # use hydro_lang::prelude::*;
628    /// # use futures::StreamExt;
629    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
630    /// let tick = process.tick();
631    /// let keyed_data = process
632    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
633    ///     .into_keyed()
634    ///     .batch(&tick, nondet!(/** test */))
635    ///     .first();
636    /// let other_data = process
637    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
638    ///     .into_keyed()
639    ///     .batch(&tick, nondet!(/** test */));
640    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
641    /// # }, |mut stream| async move {
642    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
643    /// # let mut results = vec![];
644    /// # for _ in 0..3 {
645    /// #     results.push(stream.next().await.unwrap());
646    /// # }
647    /// # results.sort();
648    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
649    /// # }));
650    /// # }
651    /// ```
652    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2>(
653        self,
654        keyed_stream: KeyedStream<K, V2, L, Bounded, O2, R2>,
655    ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R2>
656    where
657        B: IsBounded,
658        K: Eq + Hash,
659    {
660        self.make_bounded()
661            .entries()
662            .weaken_retries::<R2>()
663            .join(keyed_stream.entries())
664            .into_keyed()
665    }
666
667    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
668    /// where each value in the output keyed singleton is a tuple of
669    /// (self.value, other.value).
670    ///
671    /// # Example
672    /// ```rust
673    /// # #[cfg(feature = "deploy")] {
674    /// # use hydro_lang::prelude::*;
675    /// # use futures::StreamExt;
676    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
677    /// # let tick = process.tick();
678    /// let requests = // { 1: 10, 2: 20, 3: 30 }
679    /// # process
680    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
681    /// #     .into_keyed()
682    /// #     .batch(&tick, nondet!(/** test */))
683    /// #     .first();
684    /// let other = // { 1: 100, 2: 200, 4: 400 }
685    /// # process
686    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
687    /// #     .into_keyed()
688    /// #     .batch(&tick, nondet!(/** test */))
689    /// #     .first();
690    /// requests.join_keyed_singleton(other)
691    /// # .entries().all_ticks()
692    /// # }, |mut stream| async move {
693    /// // { 1: (10, 100), 2: (20, 200) }
694    /// # let mut results = vec![];
695    /// # for _ in 0..2 {
696    /// #     results.push(stream.next().await.unwrap());
697    /// # }
698    /// # results.sort();
699    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
700    /// # }));
701    /// # }
702    /// ```
703    pub fn join_keyed_singleton<V2: Clone>(
704        self,
705        other: KeyedSingleton<K, V2, L, Bounded>,
706    ) -> KeyedSingleton<K, (V, V2), L, Bounded>
707    where
708        B: IsBounded,
709        K: Eq + Hash,
710    {
711        let result_stream = self
712            .make_bounded()
713            .entries()
714            .join(other.entries())
715            .into_keyed();
716
717        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
718        KeyedSingleton::new(
719            result_stream.location.clone(),
720            HydroNode::Cast {
721                inner: Box::new(result_stream.ir_node.replace(HydroNode::Placeholder)),
722                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
723                    K,
724                    (V, V2),
725                    L,
726                    Bounded,
727                >::collection_kind(
728                )),
729            },
730        )
731    }
732
733    /// For each value in `self`, find the matching key in `lookup`.
734    /// The output is a keyed singleton with the key from `self`, and a value
735    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
736    /// If the key is not present in `lookup`, the option will be [`None`].
737    ///
738    /// # Example
739    /// ```rust
740    /// # #[cfg(feature = "deploy")] {
741    /// # use hydro_lang::prelude::*;
742    /// # use futures::StreamExt;
743    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
744    /// # let tick = process.tick();
745    /// let requests = // { 1: 10, 2: 20 }
746    /// # process
747    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
748    /// #     .into_keyed()
749    /// #     .batch(&tick, nondet!(/** test */))
750    /// #     .first();
751    /// let other_data = // { 10: 100, 11: 110 }
752    /// # process
753    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
754    /// #     .into_keyed()
755    /// #     .batch(&tick, nondet!(/** test */))
756    /// #     .first();
757    /// requests.lookup_keyed_singleton(other_data)
758    /// # .entries().all_ticks()
759    /// # }, |mut stream| async move {
760    /// // { 1: (10, Some(100)), 2: (20, None) }
761    /// # let mut results = vec![];
762    /// # for _ in 0..2 {
763    /// #     results.push(stream.next().await.unwrap());
764    /// # }
765    /// # results.sort();
766    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
767    /// # }));
768    /// # }
769    /// ```
770    pub fn lookup_keyed_singleton<V2>(
771        self,
772        lookup: KeyedSingleton<V, V2, L, Bounded>,
773    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
774    where
775        B: IsBounded,
776        K: Eq + Hash + Clone,
777        V: Eq + Hash + Clone,
778        V2: Clone,
779    {
780        let result_stream = self
781            .make_bounded()
782            .into_keyed_stream()
783            .lookup_keyed_stream(lookup.into_keyed_stream());
784
785        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
786        KeyedSingleton::new(
787            result_stream.location.clone(),
788            HydroNode::Cast {
789                inner: Box::new(result_stream.ir_node.replace(HydroNode::Placeholder)),
790                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
791                    K,
792                    (V, Option<V2>),
793                    L,
794                    Bounded,
795                >::collection_kind(
796                )),
797            },
798        )
799    }
800
801    /// For each value in `self`, find the matching key in `lookup`.
802    /// The output is a keyed stream with the key from `self`, and a value
803    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
804    /// If the key is not present in `lookup`, the option will be [`None`].
805    ///
806    /// # Example
807    /// ```rust
808    /// # #[cfg(feature = "deploy")] {
809    /// # use hydro_lang::prelude::*;
810    /// # use futures::StreamExt;
811    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
812    /// # let tick = process.tick();
813    /// let requests = // { 1: 10, 2: 20 }
814    /// # process
815    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
816    /// #     .into_keyed()
817    /// #     .batch(&tick, nondet!(/** test */))
818    /// #     .first();
819    /// let other_data = // { 10: 100, 10: 110 }
820    /// # process
821    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
822    /// #     .into_keyed()
823    /// #     .batch(&tick, nondet!(/** test */));
824    /// requests.lookup_keyed_stream(other_data)
825    /// # .entries().all_ticks()
826    /// # }, |mut stream| async move {
827    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
828    /// # let mut results = vec![];
829    /// # for _ in 0..3 {
830    /// #     results.push(stream.next().await.unwrap());
831    /// # }
832    /// # results.sort();
833    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
834    /// # }));
835    /// # }
836    /// ```
837    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
838        self,
839        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
840    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
841    where
842        B: IsBounded,
843        K: Eq + Hash + Clone,
844        V: Eq + Hash + Clone,
845        V2: Clone,
846    {
847        self.make_bounded()
848            .entries()
849            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
850            .into_keyed()
851            .lookup_keyed_stream(lookup)
852    }
853}
854
855impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
856    KeyedSingleton<K, V, L, B>
857{
858    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
859    ///
860    /// The value for each key must be bounded, otherwise the resulting stream elements would be
861    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
862    /// into the output.
863    ///
864    /// # Example
865    /// ```rust
866    /// # #[cfg(feature = "deploy")] {
867    /// # use hydro_lang::prelude::*;
868    /// # use futures::StreamExt;
869    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
870    /// let keyed_singleton = // { 1: 2, 2: 4 }
871    /// # process
872    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
873    /// #     .into_keyed()
874    /// #     .first();
875    /// keyed_singleton.entries()
876    /// # }, |mut stream| async move {
877    /// // (1, 2), (2, 4) in any order
878    /// # let mut results = Vec::new();
879    /// # for _ in 0..2 {
880    /// #     results.push(stream.next().await.unwrap());
881    /// # }
882    /// # results.sort();
883    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
884    /// # }));
885    /// # }
886    /// ```
887    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
888        self.into_keyed_stream().entries()
889    }
890
891    /// Flattens the keyed singleton into an unordered stream of just the values.
892    ///
893    /// The value for each key must be bounded, otherwise the resulting stream elements would be
894    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
895    /// into the output.
896    ///
897    /// # Example
898    /// ```rust
899    /// # #[cfg(feature = "deploy")] {
900    /// # use hydro_lang::prelude::*;
901    /// # use futures::StreamExt;
902    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
903    /// let keyed_singleton = // { 1: 2, 2: 4 }
904    /// # process
905    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
906    /// #     .into_keyed()
907    /// #     .first();
908    /// keyed_singleton.values()
909    /// # }, |mut stream| async move {
910    /// // 2, 4 in any order
911    /// # let mut results = Vec::new();
912    /// # for _ in 0..2 {
913    /// #     results.push(stream.next().await.unwrap());
914    /// # }
915    /// # results.sort();
916    /// # assert_eq!(results, vec![2, 4]);
917    /// # }));
918    /// # }
919    /// ```
920    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
921        let map_f = q!(|(_, v)| v)
922            .splice_fn1_ctx::<(K, V), V>(&self.location)
923            .into();
924
925        Stream::new(
926            self.location.clone(),
927            HydroNode::Map {
928                f: map_f,
929                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
930                metadata: self.location.new_node_metadata(Stream::<
931                    V,
932                    L,
933                    B::UnderlyingBound,
934                    NoOrder,
935                    ExactlyOnce,
936                >::collection_kind()),
937            },
938        )
939    }
940
941    /// Flattens the keyed singleton into an unordered stream of just the keys.
942    ///
943    /// The value for each key must be bounded, otherwise the removal of keys would result in
944    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
945    /// into the output.
946    ///
947    /// # Example
948    /// ```rust
949    /// # #[cfg(feature = "deploy")] {
950    /// # use hydro_lang::prelude::*;
951    /// # use futures::StreamExt;
952    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
953    /// let keyed_singleton = // { 1: 2, 2: 4 }
954    /// # process
955    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
956    /// #     .into_keyed()
957    /// #     .first();
958    /// keyed_singleton.keys()
959    /// # }, |mut stream| async move {
960    /// // 1, 2 in any order
961    /// # let mut results = Vec::new();
962    /// # for _ in 0..2 {
963    /// #     results.push(stream.next().await.unwrap());
964    /// # }
965    /// # results.sort();
966    /// # assert_eq!(results, vec![1, 2]);
967    /// # }));
968    /// # }
969    /// ```
970    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
971        self.entries().map(q!(|(k, _)| k))
972    }
973
974    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
975    /// entries whose keys are not in the provided stream.
976    ///
977    /// # Example
978    /// ```rust
979    /// # #[cfg(feature = "deploy")] {
980    /// # use hydro_lang::prelude::*;
981    /// # use futures::StreamExt;
982    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
983    /// let tick = process.tick();
984    /// let keyed_singleton = // { 1: 2, 2: 4 }
985    /// # process
986    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
987    /// #     .into_keyed()
988    /// #     .first()
989    /// #     .batch(&tick, nondet!(/** test */));
990    /// let keys_to_remove = process
991    ///     .source_iter(q!(vec![1]))
992    ///     .batch(&tick, nondet!(/** test */));
993    /// keyed_singleton.filter_key_not_in(keys_to_remove)
994    /// #   .entries().all_ticks()
995    /// # }, |mut stream| async move {
996    /// // { 2: 4 }
997    /// # for w in vec![(2, 4)] {
998    /// #     assert_eq!(stream.next().await.unwrap(), w);
999    /// # }
1000    /// # }));
1001    /// # }
1002    /// ```
1003    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1004        self,
1005        other: Stream<K, L, Bounded, O2, R2>,
1006    ) -> Self
1007    where
1008        K: Hash + Eq,
1009    {
1010        check_matching_location(&self.location, &other.location);
1011
1012        KeyedSingleton::new(
1013            self.location.clone(),
1014            HydroNode::AntiJoin {
1015                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1016                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1017                metadata: self.location.new_node_metadata(Self::collection_kind()),
1018            },
1019        )
1020    }
1021
1022    /// An operator which allows you to "inspect" each value of a keyed singleton without
1023    /// modifying it. The closure `f` is called on a reference to each value. This is
1024    /// mainly useful for debugging, and should not be used to generate side-effects.
1025    ///
1026    /// # Example
1027    /// ```rust
1028    /// # #[cfg(feature = "deploy")] {
1029    /// # use hydro_lang::prelude::*;
1030    /// # use futures::StreamExt;
1031    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1032    /// let keyed_singleton = // { 1: 2, 2: 4 }
1033    /// # process
1034    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1035    /// #     .into_keyed()
1036    /// #     .first();
1037    /// keyed_singleton
1038    ///     .inspect(q!(|v| println!("{}", v)))
1039    /// #   .entries()
1040    /// # }, |mut stream| async move {
1041    /// // { 1: 2, 2: 4 }
1042    /// # for w in vec![(1, 2), (2, 4)] {
1043    /// #     assert_eq!(stream.next().await.unwrap(), w);
1044    /// # }
1045    /// # }));
1046    /// # }
1047    /// ```
1048    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1049    where
1050        F: Fn(&V) + 'a,
1051    {
1052        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1053        let inspect_f = q!({
1054            let orig = f;
1055            move |t: &(_, _)| orig(&t.1)
1056        })
1057        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1058        .into();
1059
1060        KeyedSingleton::new(
1061            self.location.clone(),
1062            HydroNode::Inspect {
1063                f: inspect_f,
1064                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1065                metadata: self.location.new_node_metadata(Self::collection_kind()),
1066            },
1067        )
1068    }
1069
1070    /// An operator which allows you to "inspect" each entry of a keyed singleton without
1071    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1072    /// mainly useful for debugging, and should not be used to generate side-effects.
1073    ///
1074    /// # Example
1075    /// ```rust
1076    /// # #[cfg(feature = "deploy")] {
1077    /// # use hydro_lang::prelude::*;
1078    /// # use futures::StreamExt;
1079    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1080    /// let keyed_singleton = // { 1: 2, 2: 4 }
1081    /// # process
1082    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1083    /// #     .into_keyed()
1084    /// #     .first();
1085    /// keyed_singleton
1086    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1087    /// #   .entries()
1088    /// # }, |mut stream| async move {
1089    /// // { 1: 2, 2: 4 }
1090    /// # for w in vec![(1, 2), (2, 4)] {
1091    /// #     assert_eq!(stream.next().await.unwrap(), w);
1092    /// # }
1093    /// # }));
1094    /// # }
1095    /// ```
1096    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1097    where
1098        F: Fn(&(K, V)) + 'a,
1099    {
1100        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1101
1102        KeyedSingleton::new(
1103            self.location.clone(),
1104            HydroNode::Inspect {
1105                f: inspect_f,
1106                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1107                metadata: self.location.new_node_metadata(Self::collection_kind()),
1108            },
1109        )
1110    }
1111
1112    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1113    ///
1114    /// Because this method requires values to be bounded, the output [`Optional`] will only be
1115    /// asynchronously updated if a new key is added that is higher than the previous max key.
1116    ///
1117    /// # Example
1118    /// ```rust
1119    /// # #[cfg(feature = "deploy")] {
1120    /// # use hydro_lang::prelude::*;
1121    /// # use futures::StreamExt;
1122    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1123    /// let tick = process.tick();
1124    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1125    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1126    /// #     .into_keyed()
1127    /// #     .first();
1128    /// keyed_singleton.get_max_key()
1129    /// # .sample_eager(nondet!(/** test */))
1130    /// # }, |mut stream| async move {
1131    /// // (2, 456)
1132    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1133    /// # }));
1134    /// # }
1135    /// ```
1136    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1137    where
1138        K: Ord,
1139    {
1140        self.entries()
1141            .assume_ordering_trusted(nondet!(
1142                /// There is only one element associated with each key, and the keys are totallly
1143                /// ordered so we will produce a deterministic value. The closure technically
1144                /// isn't commutative in the case where both passed entries have the same key
1145                /// but different values.
1146                ///
1147                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1148                /// the two inputs do not have the same key.
1149            ))
1150            .reduce(q!(
1151                move |curr, new| {
1152                    if new.0 > curr.0 {
1153                        *curr = new;
1154                    }
1155                },
1156                idempotent = manual_proof!(/** repeated elements are ignored */)
1157            ))
1158    }
1159
1160    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1161    /// element, the value.
1162    ///
1163    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1164    ///
1165    /// # Example
1166    /// ```rust
1167    /// # #[cfg(feature = "deploy")] {
1168    /// # use hydro_lang::prelude::*;
1169    /// # use futures::StreamExt;
1170    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1171    /// let keyed_singleton = // { 1: 2, 2: 4 }
1172    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1173    /// #     .into_keyed()
1174    /// #     .first();
1175    /// keyed_singleton
1176    ///     .clone()
1177    ///     .into_keyed_stream()
1178    ///     .merge_unordered(
1179    ///         keyed_singleton.into_keyed_stream()
1180    ///     )
1181    /// #   .entries()
1182    /// # }, |mut stream| async move {
1183    /// /// // { 1: [2, 2], 2: [4, 4] }
1184    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1185    /// #     assert_eq!(stream.next().await.unwrap(), w);
1186    /// # }
1187    /// # }));
1188    /// # }
1189    /// ```
1190    pub fn into_keyed_stream(
1191        self,
1192    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1193        KeyedStream::new(
1194            self.location.clone(),
1195            HydroNode::Cast {
1196                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1197                metadata: self.location.new_node_metadata(KeyedStream::<
1198                    K,
1199                    V,
1200                    L,
1201                    B::UnderlyingBound,
1202                    TotalOrder,
1203                    ExactlyOnce,
1204                >::collection_kind()),
1205            },
1206        )
1207    }
1208}
1209
1210impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1211where
1212    L: Location<'a>,
1213{
1214    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1215    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1216    ///
1217    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1218    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1219    /// argument that declares where the keyed singleton will be atomically processed. Batching a
1220    /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1221    /// batching into a different [`Tick`] will introduce asynchrony.
1222    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1223        let out_location = Atomic { tick: tick.clone() };
1224        KeyedSingleton::new(
1225            out_location.clone(),
1226            HydroNode::BeginAtomic {
1227                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1228                metadata: out_location
1229                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1230            },
1231        )
1232    }
1233}
1234
1235impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1236where
1237    L: Location<'a> + NoTick,
1238{
1239    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1240    /// See [`KeyedSingleton::atomic`] for more details.
1241    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1242        KeyedSingleton::new(
1243            self.location.tick.l.clone(),
1244            HydroNode::EndAtomic {
1245                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1246                metadata: self
1247                    .location
1248                    .tick
1249                    .l
1250                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1251            },
1252        )
1253    }
1254}
1255
1256impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1257    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1258    /// tick `T` always has the entries of `self` at tick `T - 1`.
1259    ///
1260    /// At tick `0`, the output has no entries, since there is no previous tick.
1261    ///
1262    /// This operator enables stateful iterative processing with ticks, by sending data from one
1263    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1264    ///
1265    /// # Example
1266    /// ```rust
1267    /// # #[cfg(feature = "deploy")] {
1268    /// # use hydro_lang::prelude::*;
1269    /// # use futures::StreamExt;
1270    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1271    /// let tick = process.tick();
1272    /// # // ticks are lazy by default, forces the second tick to run
1273    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1274    /// # let batch_first_tick = process
1275    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1276    /// #   .batch(&tick, nondet!(/** test */))
1277    /// #   .into_keyed();
1278    /// # let batch_second_tick = process
1279    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1280    /// #   .batch(&tick, nondet!(/** test */))
1281    /// #   .into_keyed()
1282    /// #   .defer_tick(); // appears on the second tick
1283    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1284    /// # batch_first_tick.chain(batch_second_tick).first();
1285    /// input_batch.clone().filter_key_not_in(
1286    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1287    /// )
1288    /// # .entries().all_ticks()
1289    /// # }, |mut stream| async move {
1290    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1291    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1292    /// #     assert_eq!(stream.next().await.unwrap(), w);
1293    /// # }
1294    /// # }));
1295    /// # }
1296    /// ```
1297    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1298        KeyedSingleton::new(
1299            self.location.clone(),
1300            HydroNode::DeferTick {
1301                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1302                metadata: self
1303                    .location
1304                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1305            },
1306        )
1307    }
1308}
1309
1310impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1311where
1312    L: Location<'a>,
1313{
1314    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1315    /// point in time.
1316    ///
1317    /// # Non-Determinism
1318    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1319    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1320    pub fn snapshot(
1321        self,
1322        tick: &Tick<L>,
1323        _nondet: NonDet,
1324    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1325        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1326        KeyedSingleton::new(
1327            tick.clone(),
1328            HydroNode::Batch {
1329                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1330                metadata: tick
1331                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1332            },
1333        )
1334    }
1335}
1336
1337impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1338where
1339    L: Location<'a> + NoTick,
1340{
1341    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1342    /// state of the keyed singleton being atomically processed.
1343    ///
1344    /// # Non-Determinism
1345    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1346    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1347    pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1348        KeyedSingleton::new(
1349            self.location.clone().tick,
1350            HydroNode::Batch {
1351                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1352                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1353                    K,
1354                    V,
1355                    Tick<L>,
1356                    Bounded,
1357                >::collection_kind(
1358                )),
1359            },
1360        )
1361    }
1362}
1363
1364impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1365where
1366    L: Location<'a>,
1367{
1368    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1369    ///
1370    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1371    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1372    /// is filtered out.
1373    ///
1374    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1375    /// not modify or take ownership of the values. If you need to modify the values while filtering
1376    /// use [`KeyedSingleton::filter_map`] instead.
1377    ///
1378    /// # Example
1379    /// ```rust
1380    /// # #[cfg(feature = "deploy")] {
1381    /// # use hydro_lang::prelude::*;
1382    /// # use futures::StreamExt;
1383    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1384    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1385    /// # process
1386    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1387    /// #     .into_keyed()
1388    /// #     .first();
1389    /// keyed_singleton.filter(q!(|&v| v > 1))
1390    /// #   .entries()
1391    /// # }, |mut stream| async move {
1392    /// // { 1: 2, 2: 4 }
1393    /// # let mut results = Vec::new();
1394    /// # for _ in 0..2 {
1395    /// #     results.push(stream.next().await.unwrap());
1396    /// # }
1397    /// # results.sort();
1398    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1399    /// # }));
1400    /// # }
1401    /// ```
1402    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1403    where
1404        F: Fn(&V) -> bool + 'a,
1405    {
1406        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1407        let filter_f = q!({
1408            let orig = f;
1409            move |t: &(_, _)| orig(&t.1)
1410        })
1411        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1412        .into();
1413
1414        KeyedSingleton::new(
1415            self.location.clone(),
1416            HydroNode::Filter {
1417                f: filter_f,
1418                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1419                metadata: self
1420                    .location
1421                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1422            },
1423        )
1424    }
1425
1426    /// An operator that both filters and maps values. It yields only the key-value pairs where
1427    /// the supplied closure `f` returns `Some(value)`.
1428    ///
1429    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1430    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1431    /// If it returns `None`, the key-value pair is filtered out.
1432    ///
1433    /// # Example
1434    /// ```rust
1435    /// # #[cfg(feature = "deploy")] {
1436    /// # use hydro_lang::prelude::*;
1437    /// # use futures::StreamExt;
1438    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1439    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1440    /// # process
1441    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1442    /// #     .into_keyed()
1443    /// #     .first();
1444    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1445    /// #   .entries()
1446    /// # }, |mut stream| async move {
1447    /// // { 1: 42, 3: 100 }
1448    /// # let mut results = Vec::new();
1449    /// # for _ in 0..2 {
1450    /// #     results.push(stream.next().await.unwrap());
1451    /// # }
1452    /// # results.sort();
1453    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1454    /// # }));
1455    /// # }
1456    /// ```
1457    pub fn filter_map<F, U>(
1458        self,
1459        f: impl IntoQuotedMut<'a, F, L> + Copy,
1460    ) -> KeyedSingleton<K, U, L, B>
1461    where
1462        F: Fn(V) -> Option<U> + 'a,
1463    {
1464        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1465        let filter_map_f = q!({
1466            let orig = f;
1467            move |(k, v)| orig(v).map(|o| (k, o))
1468        })
1469        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1470        .into();
1471
1472        KeyedSingleton::new(
1473            self.location.clone(),
1474            HydroNode::FilterMap {
1475                f: filter_map_f,
1476                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1477                metadata: self
1478                    .location
1479                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1480            },
1481        )
1482    }
1483
1484    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1485    /// arrived since the previous batch was released.
1486    ///
1487    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1488    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1489    ///
1490    /// # Non-Determinism
1491    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1492    /// has a non-deterministic set of key-value pairs.
1493    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1494    where
1495        L: NoTick,
1496    {
1497        self.atomic(tick).batch_atomic(nondet)
1498    }
1499}
1500
1501impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1502where
1503    L: Location<'a> + NoTick,
1504{
1505    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1506    /// atomically processed.
1507    ///
1508    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1509    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1510    ///
1511    /// # Non-Determinism
1512    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1513    /// has a non-deterministic set of key-value pairs.
1514    pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1515        let _ = nondet;
1516        KeyedSingleton::new(
1517            self.location.clone().tick,
1518            HydroNode::Batch {
1519                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1520                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1521                    K,
1522                    V,
1523                    Tick<L>,
1524                    Bounded,
1525                >::collection_kind(
1526                )),
1527            },
1528        )
1529    }
1530}
1531
1532#[cfg(test)]
1533mod tests {
1534    #[cfg(feature = "deploy")]
1535    use futures::{SinkExt, StreamExt};
1536    #[cfg(feature = "deploy")]
1537    use hydro_deploy::Deployment;
1538    #[cfg(any(feature = "deploy", feature = "sim"))]
1539    use stageleft::q;
1540
1541    #[cfg(any(feature = "deploy", feature = "sim"))]
1542    use crate::compile::builder::FlowBuilder;
1543    #[cfg(any(feature = "deploy", feature = "sim"))]
1544    use crate::location::Location;
1545    #[cfg(any(feature = "deploy", feature = "sim"))]
1546    use crate::nondet::nondet;
1547
1548    #[cfg(feature = "deploy")]
1549    #[tokio::test]
1550    async fn key_count_bounded_value() {
1551        let mut deployment = Deployment::new();
1552
1553        let mut flow = FlowBuilder::new();
1554        let node = flow.process::<()>();
1555        let external = flow.external::<()>();
1556
1557        let (input_port, input) = node.source_external_bincode(&external);
1558        let out = input
1559            .into_keyed()
1560            .first()
1561            .key_count()
1562            .sample_eager(nondet!(/** test */))
1563            .send_bincode_external(&external);
1564
1565        let nodes = flow
1566            .with_process(&node, deployment.Localhost())
1567            .with_external(&external, deployment.Localhost())
1568            .deploy(&mut deployment);
1569
1570        deployment.deploy().await.unwrap();
1571
1572        let mut external_in = nodes.connect(input_port).await;
1573        let mut external_out = nodes.connect(out).await;
1574
1575        deployment.start().await.unwrap();
1576
1577        assert_eq!(external_out.next().await.unwrap(), 0);
1578
1579        external_in.send((1, 1)).await.unwrap();
1580        assert_eq!(external_out.next().await.unwrap(), 1);
1581
1582        external_in.send((2, 2)).await.unwrap();
1583        assert_eq!(external_out.next().await.unwrap(), 2);
1584    }
1585
1586    #[cfg(feature = "deploy")]
1587    #[tokio::test]
1588    async fn key_count_unbounded_value() {
1589        let mut deployment = Deployment::new();
1590
1591        let mut flow = FlowBuilder::new();
1592        let node = flow.process::<()>();
1593        let external = flow.external::<()>();
1594
1595        let (input_port, input) = node.source_external_bincode(&external);
1596        let out = input
1597            .into_keyed()
1598            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1599            .key_count()
1600            .sample_eager(nondet!(/** test */))
1601            .send_bincode_external(&external);
1602
1603        let nodes = flow
1604            .with_process(&node, deployment.Localhost())
1605            .with_external(&external, deployment.Localhost())
1606            .deploy(&mut deployment);
1607
1608        deployment.deploy().await.unwrap();
1609
1610        let mut external_in = nodes.connect(input_port).await;
1611        let mut external_out = nodes.connect(out).await;
1612
1613        deployment.start().await.unwrap();
1614
1615        assert_eq!(external_out.next().await.unwrap(), 0);
1616
1617        external_in.send((1, 1)).await.unwrap();
1618        assert_eq!(external_out.next().await.unwrap(), 1);
1619
1620        external_in.send((1, 2)).await.unwrap();
1621        assert_eq!(external_out.next().await.unwrap(), 1);
1622
1623        external_in.send((2, 2)).await.unwrap();
1624        assert_eq!(external_out.next().await.unwrap(), 2);
1625
1626        external_in.send((1, 1)).await.unwrap();
1627        assert_eq!(external_out.next().await.unwrap(), 2);
1628
1629        external_in.send((3, 1)).await.unwrap();
1630        assert_eq!(external_out.next().await.unwrap(), 3);
1631    }
1632
1633    #[cfg(feature = "deploy")]
1634    #[tokio::test]
1635    async fn into_singleton_bounded_value() {
1636        let mut deployment = Deployment::new();
1637
1638        let mut flow = FlowBuilder::new();
1639        let node = flow.process::<()>();
1640        let external = flow.external::<()>();
1641
1642        let (input_port, input) = node.source_external_bincode(&external);
1643        let out = input
1644            .into_keyed()
1645            .first()
1646            .into_singleton()
1647            .sample_eager(nondet!(/** test */))
1648            .send_bincode_external(&external);
1649
1650        let nodes = flow
1651            .with_process(&node, deployment.Localhost())
1652            .with_external(&external, deployment.Localhost())
1653            .deploy(&mut deployment);
1654
1655        deployment.deploy().await.unwrap();
1656
1657        let mut external_in = nodes.connect(input_port).await;
1658        let mut external_out = nodes.connect(out).await;
1659
1660        deployment.start().await.unwrap();
1661
1662        assert_eq!(
1663            external_out.next().await.unwrap(),
1664            std::collections::HashMap::new()
1665        );
1666
1667        external_in.send((1, 1)).await.unwrap();
1668        assert_eq!(
1669            external_out.next().await.unwrap(),
1670            vec![(1, 1)].into_iter().collect()
1671        );
1672
1673        external_in.send((2, 2)).await.unwrap();
1674        assert_eq!(
1675            external_out.next().await.unwrap(),
1676            vec![(1, 1), (2, 2)].into_iter().collect()
1677        );
1678    }
1679
1680    #[cfg(feature = "deploy")]
1681    #[tokio::test]
1682    async fn into_singleton_unbounded_value() {
1683        let mut deployment = Deployment::new();
1684
1685        let mut flow = FlowBuilder::new();
1686        let node = flow.process::<()>();
1687        let external = flow.external::<()>();
1688
1689        let (input_port, input) = node.source_external_bincode(&external);
1690        let out = input
1691            .into_keyed()
1692            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1693            .into_singleton()
1694            .sample_eager(nondet!(/** test */))
1695            .send_bincode_external(&external);
1696
1697        let nodes = flow
1698            .with_process(&node, deployment.Localhost())
1699            .with_external(&external, deployment.Localhost())
1700            .deploy(&mut deployment);
1701
1702        deployment.deploy().await.unwrap();
1703
1704        let mut external_in = nodes.connect(input_port).await;
1705        let mut external_out = nodes.connect(out).await;
1706
1707        deployment.start().await.unwrap();
1708
1709        assert_eq!(
1710            external_out.next().await.unwrap(),
1711            std::collections::HashMap::new()
1712        );
1713
1714        external_in.send((1, 1)).await.unwrap();
1715        assert_eq!(
1716            external_out.next().await.unwrap(),
1717            vec![(1, 1)].into_iter().collect()
1718        );
1719
1720        external_in.send((1, 2)).await.unwrap();
1721        assert_eq!(
1722            external_out.next().await.unwrap(),
1723            vec![(1, 2)].into_iter().collect()
1724        );
1725
1726        external_in.send((2, 2)).await.unwrap();
1727        assert_eq!(
1728            external_out.next().await.unwrap(),
1729            vec![(1, 2), (2, 1)].into_iter().collect()
1730        );
1731
1732        external_in.send((1, 1)).await.unwrap();
1733        assert_eq!(
1734            external_out.next().await.unwrap(),
1735            vec![(1, 3), (2, 1)].into_iter().collect()
1736        );
1737
1738        external_in.send((3, 1)).await.unwrap();
1739        assert_eq!(
1740            external_out.next().await.unwrap(),
1741            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1742        );
1743    }
1744
1745    #[cfg(feature = "sim")]
1746    #[test]
1747    fn sim_unbounded_singleton_snapshot() {
1748        let mut flow = FlowBuilder::new();
1749        let node = flow.process::<()>();
1750
1751        let (input_port, input) = node.sim_input();
1752        let output = input
1753            .into_keyed()
1754            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1755            .snapshot(&node.tick(), nondet!(/** test */))
1756            .entries()
1757            .all_ticks()
1758            .sim_output();
1759
1760        let count = flow.sim().exhaustive(async || {
1761            input_port.send((1, 123));
1762            input_port.send((1, 456));
1763            input_port.send((2, 123));
1764
1765            let all = output.collect_sorted::<Vec<_>>().await;
1766            assert_eq!(all.last().unwrap(), &(2, 1));
1767        });
1768
1769        assert_eq!(count, 8);
1770    }
1771
1772    #[cfg(feature = "deploy")]
1773    #[tokio::test]
1774    async fn join_keyed_stream() {
1775        let mut deployment = Deployment::new();
1776
1777        let mut flow = FlowBuilder::new();
1778        let node = flow.process::<()>();
1779        let external = flow.external::<()>();
1780
1781        let tick = node.tick();
1782        let keyed_data = node
1783            .source_iter(q!(vec![(1, 10), (2, 20)]))
1784            .into_keyed()
1785            .batch(&tick, nondet!(/** test */))
1786            .first();
1787        let requests = node
1788            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1789            .into_keyed()
1790            .batch(&tick, nondet!(/** test */));
1791
1792        let out = keyed_data
1793            .join_keyed_stream(requests)
1794            .entries()
1795            .all_ticks()
1796            .send_bincode_external(&external);
1797
1798        let nodes = flow
1799            .with_process(&node, deployment.Localhost())
1800            .with_external(&external, deployment.Localhost())
1801            .deploy(&mut deployment);
1802
1803        deployment.deploy().await.unwrap();
1804
1805        let mut external_out = nodes.connect(out).await;
1806
1807        deployment.start().await.unwrap();
1808
1809        let mut results = vec![];
1810        for _ in 0..2 {
1811            results.push(external_out.next().await.unwrap());
1812        }
1813        results.sort();
1814
1815        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1816    }
1817}