1use core::fmt::Debug;
3use core::hash::Hash;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use std::collections::HashMap;
7
8use crate::{Sink, ready_both};
9
/// Sink which receives `(Key, Item)` pairs and forwards each item to the sink stored under
/// its key in a [`HashMap`] of sinks.
///
/// `Debug` is derived so the adaptor can be inspected in logs and assertion output; the
/// derive only applies when both `Key` and `Si` are themselves `Debug`.
#[derive(Debug)]
pub struct DemuxMap<Key, Si> {
    /// Output sinks, looked up by the key that accompanies each incoming item.
    sinks: HashMap<Key, Si>,
}
14
15impl<Key, Si> DemuxMap<Key, Si> {
16 pub fn new<Item>(sinks: impl Into<HashMap<Key, Si>>) -> Self
18 where
19 Self: Sink<(Key, Item)>,
20 {
21 Self {
22 sinks: sinks.into(),
23 }
24 }
25}
26
27impl<Key, Si, Item> Sink<(Key, Item)> for DemuxMap<Key, Si>
28where
29 Key: Eq + Hash + Debug + Unpin,
30 Si: Sink<Item> + Unpin,
31{
32 type Error = Si::Error;
33
34 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
35 #[expect(
36 clippy::disallowed_methods,
37 reason = "nondeterministic iteration order, the `try_fold` is not order-dependent"
38 )]
39 self.get_mut()
40 .sinks
41 .values_mut()
42 .try_fold(Poll::Ready(()), |poll, sink| {
43 ready_both!(poll, Pin::new(sink).poll_ready(cx)?);
44 Poll::Ready(Ok(()))
45 })
46 }
47
48 fn start_send(self: Pin<&mut Self>, item: (Key, Item)) -> Result<(), Self::Error> {
49 let sink = self
50 .get_mut()
51 .sinks
52 .get_mut(&item.0)
53 .unwrap_or_else(|| panic!("`DemuxMap` missing key {:?}", item.0));
54 Pin::new(sink).start_send(item.1)
55 }
56
57 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58 #[expect(
59 clippy::disallowed_methods,
60 reason = "nondeterministic iteration order, the `try_fold` is not order-dependent"
61 )]
62 self.get_mut()
63 .sinks
64 .values_mut()
65 .try_fold(Poll::Ready(()), |poll, sink| {
66 ready_both!(poll, Pin::new(sink).poll_flush(cx)?);
67 Poll::Ready(Ok(()))
68 })
69 }
70
71 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72 #[expect(
73 clippy::disallowed_methods,
74 reason = "nondeterministic iteration order, the `try_fold` is not order-dependent"
75 )]
76 self.get_mut()
77 .sinks
78 .values_mut()
79 .try_fold(Poll::Ready(()), |poll, sink| {
80 ready_both!(poll, Pin::new(sink).poll_close(cx)?);
81 Poll::Ready(Ok(()))
82 })
83 }
84}
85
86pub fn demux_map<Key, Si, Item>(sinks: impl Into<HashMap<Key, Si>>) -> DemuxMap<Key, Si>
90where
91 Key: Eq + Hash + Debug + Unpin,
92 Si: Sink<Item> + Unpin,
93{
94 DemuxMap::new(sinks)
95}