Skip to main content

sinktools/
demux_map.rs

1//! [`DemuxMap`] and related items.
2use core::fmt::Debug;
3use core::hash::Hash;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use std::collections::HashMap;
7
8use crate::{Sink, ready_both};
9
10/// Sink which receives keys paired with items `(Key, Item)`, and pushes to the corresponding output sink in a [`HashMap`] of sinks.
11pub struct DemuxMap<Key, Si> {
12    sinks: HashMap<Key, Si>,
13}
14
15impl<Key, Si> DemuxMap<Key, Si> {
16    /// Create with the given next `sinks` map.
17    pub fn new<Item>(sinks: impl Into<HashMap<Key, Si>>) -> Self
18    where
19        Self: Sink<(Key, Item)>,
20    {
21        Self {
22            sinks: sinks.into(),
23        }
24    }
25}
26
27impl<Key, Si, Item> Sink<(Key, Item)> for DemuxMap<Key, Si>
28where
29    Key: Eq + Hash + Debug + Unpin,
30    Si: Sink<Item> + Unpin,
31{
32    type Error = Si::Error;
33
34    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
35        #[expect(
36            clippy::disallowed_methods,
37            reason = "nondeterministic iteration order, the `try_fold` is not order-dependent"
38        )]
39        self.get_mut()
40            .sinks
41            .values_mut()
42            .try_fold(Poll::Ready(()), |poll, sink| {
43                ready_both!(poll, Pin::new(sink).poll_ready(cx)?);
44                Poll::Ready(Ok(()))
45            })
46    }
47
48    fn start_send(self: Pin<&mut Self>, item: (Key, Item)) -> Result<(), Self::Error> {
49        let sink = self
50            .get_mut()
51            .sinks
52            .get_mut(&item.0)
53            .unwrap_or_else(|| panic!("`DemuxMap` missing key {:?}", item.0));
54        Pin::new(sink).start_send(item.1)
55    }
56
57    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58        #[expect(
59            clippy::disallowed_methods,
60            reason = "nondeterministic iteration order, the `try_fold` is not order-dependent"
61        )]
62        self.get_mut()
63            .sinks
64            .values_mut()
65            .try_fold(Poll::Ready(()), |poll, sink| {
66                ready_both!(poll, Pin::new(sink).poll_flush(cx)?);
67                Poll::Ready(Ok(()))
68            })
69    }
70
71    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72        #[expect(
73            clippy::disallowed_methods,
74            reason = "nondeterministic iteration order, the `try_fold` is not order-dependent"
75        )]
76        self.get_mut()
77            .sinks
78            .values_mut()
79            .try_fold(Poll::Ready(()), |poll, sink| {
80                ready_both!(poll, Pin::new(sink).poll_close(cx)?);
81                Poll::Ready(Ok(()))
82            })
83    }
84}
85
86/// Creates a `DemuxMap` sink that sends each item to one of many outputs, depending on the key.
87///
88/// This requires sinks `Si` to be `Unpin`. If your sinks are not `Unpin`, first wrap them in `Box::pin` to make them `Unpin`.
89pub fn demux_map<Key, Si, Item>(sinks: impl Into<HashMap<Key, Si>>) -> DemuxMap<Key, Si>
90where
91    Key: Eq + Hash + Debug + Unpin,
92    Si: Sink<Item> + Unpin,
93{
94    DemuxMap::new(sinks)
95}