Skip to main content

dfir_rs/compiled/pull/
lattice_bimorphism.rs

1use std::cell::RefCell;
2use std::pin::Pin;
3
4use dfir_pipes::pull::{FusedPull, Pull, PullStep};
5use dfir_pipes::{Context, Toggle};
6use lattices::{LatticeBimorphism, Merge};
7use pin_project_lite::pin_project;
8
9pin_project! {
10    /// Pull combinator for lattice bimorphism operations.
11    #[must_use = "pull do nothing unless polled"]
12    pub struct LatticeBimorphismPull<'a, Func, LhsPrev, RhsPrev, LhsState, RhsState, Output> {
13        #[pin]
14        lhs_prev: LhsPrev,
15        #[pin]
16        rhs_prev: RhsPrev,
17
18        func: Func,
19
20        lhs_state: &'a RefCell<LhsState>,
21        rhs_state: &'a RefCell<RhsState>,
22
23        output: Option<Output>,
24    }
25}
26
27impl<'a, Func, LhsPrev, RhsPrev, LhsState, RhsState, Output>
28    LatticeBimorphismPull<'a, Func, LhsPrev, RhsPrev, LhsState, RhsState, Output>
29where
30    Func: 'a
31        + LatticeBimorphism<LhsState, RhsPrev::Item, Output = Output>
32        + LatticeBimorphism<LhsPrev::Item, RhsState, Output = Output>,
33    LhsPrev: 'a + FusedPull,
34    RhsPrev: 'a + FusedPull,
35    LhsState: 'static + Clone,
36    RhsState: 'static + Clone,
37    Output: Merge<Output>,
38{
39    /// Creates a new `LatticeBimorphismPull`.
40    pub fn new(
41        lhs_prev: LhsPrev,
42        rhs_prev: RhsPrev,
43        func: Func,
44        lhs_state: &'a RefCell<LhsState>,
45        rhs_state: &'a RefCell<RhsState>,
46    ) -> Self {
47        Self {
48            lhs_prev,
49            rhs_prev,
50            func,
51            lhs_state,
52            rhs_state,
53            output: None,
54        }
55    }
56}
57
58impl<'a, Func, LhsPrev, RhsPrev, LhsState, RhsState, Output> Pull
59    for LatticeBimorphismPull<'a, Func, LhsPrev, RhsPrev, LhsState, RhsState, Output>
60where
61    Func: 'a
62        + LatticeBimorphism<LhsState, RhsPrev::Item, Output = Output>
63        + LatticeBimorphism<LhsPrev::Item, RhsState, Output = Output>,
64    LhsPrev: 'a + FusedPull,
65    RhsPrev: 'a + FusedPull,
66    LhsState: 'static + Clone,
67    RhsState: 'static + Clone,
68    Output: Merge<Output>,
69{
70    type Ctx<'ctx> = <LhsPrev::Ctx<'ctx> as Context<'ctx>>::Merged<RhsPrev::Ctx<'ctx>>;
71
72    type Item = Output;
73    type Meta = ();
74    type CanPend = <LhsPrev::CanPend as Toggle>::Or<RhsPrev::CanPend>;
75    type CanEnd = <LhsPrev::CanEnd as Toggle>::And<RhsPrev::CanEnd>;
76
77    fn pull(
78        self: Pin<&mut Self>,
79        ctx: &mut Self::Ctx<'_>,
80    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
81        let mut this = self.project();
82
83        // Both streams may continue to be polled EOS (`None`) on subsequent loops or calls, so they must be fused.
84        loop {
85            let mut progress = false;
86
87            let lhs_step = this
88                .lhs_prev
89                .as_mut()
90                .pull(<LhsPrev::Ctx<'_> as Context<'_>>::unmerge_self(ctx));
91            let lhs_pending = matches!(lhs_step, PullStep::Pending(_));
92            if let PullStep::Ready(lhs_item, _meta) = lhs_step {
93                progress = true;
94                let delta = this.func.call(lhs_item, this.rhs_state.borrow().clone());
95                if let Some(output) = this.output.as_mut() {
96                    output.merge(delta);
97                } else {
98                    *this.output = Some(delta);
99                }
100            }
101
102            let rhs_step = this
103                .rhs_prev
104                .as_mut()
105                .pull(<LhsPrev::Ctx<'_> as Context<'_>>::unmerge_other(ctx));
106            let rhs_pending = matches!(rhs_step, PullStep::Pending(_));
107            if let PullStep::Ready(rhs_item, _meta) = rhs_step {
108                progress = true;
109                let delta = this.func.call(this.lhs_state.borrow().clone(), rhs_item);
110                if let Some(output) = this.output.as_mut() {
111                    output.merge(delta);
112                } else {
113                    *this.output = Some(delta);
114                }
115            }
116
117            if lhs_pending && rhs_pending {
118                return PullStep::pending();
119            }
120
121            // Exit EOS condition.
122            if !progress {
123                // Never spin, always exit if no progress has been made.
124                return if lhs_pending || rhs_pending {
125                    PullStep::pending()
126                } else {
127                    // EXIT: Release output once, then EOS.
128                    if let Some(output) = this.output.take() {
129                        PullStep::Ready(output, ())
130                    } else {
131                        PullStep::ended()
132                    }
133                };
134            }
135        }
136    }
137}