Skip to main content

hydro_lang/location/
external_process.rs

1use std::marker::PhantomData;
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5
6use crate::compile::builder::{ExternalPortId, FlowState};
7use crate::live_collections::stream::{ExactlyOnce, Ordering, Retries, TotalOrder};
8use crate::location::LocationKey;
9use crate::staging_util::Invariant;
10
/// Type-level marker used as the default for the `Many` parameter of the
/// external port types in this module.
///
/// The enum has no variants, so it can never be instantiated; it exists only
/// to be named in type position.
pub enum NotMany {}

/// Type-level marker selecting the "many" flavour of an external port type.
///
/// In this module the port types implement `Clone` only when their marker
/// parameter is `Many`. Like [`NotMany`], this enum has no variants.
// NOTE(review): presumably "many" means the port may be shared by several
// external connections — confirm against the code that creates these ports.
pub enum Many {}
13
14pub struct ExternalBytesPort<Many = NotMany> {
15    pub(crate) process_key: LocationKey,
16    pub(crate) port_id: ExternalPortId,
17    pub(crate) _phantom: PhantomData<Many>,
18}
19
20impl Clone for ExternalBytesPort<Many> {
21    fn clone(&self) -> Self {
22        Self {
23            process_key: self.process_key,
24            port_id: self.port_id,
25            _phantom: Default::default(),
26        }
27    }
28}
29
30pub struct ExternalBincodeSink<
31    Type,
32    Many = NotMany,
33    O: Ordering = TotalOrder,
34    R: Retries = ExactlyOnce,
35> where
36    Type: Serialize,
37{
38    pub(crate) process_key: LocationKey,
39    pub(crate) port_id: ExternalPortId,
40    pub(crate) _phantom: PhantomData<(Type, Many, O, R)>,
41}
42
43impl<T: Serialize, O: Ordering, R: Retries> Clone for ExternalBincodeSink<T, Many, O, R> {
44    fn clone(&self) -> Self {
45        Self {
46            process_key: self.process_key,
47            port_id: self.port_id,
48            _phantom: Default::default(),
49        }
50    }
51}
52
53pub struct ExternalBincodeBidi<InType, OutType, Many = NotMany> {
54    pub(crate) process_key: LocationKey,
55    pub(crate) port_id: ExternalPortId,
56    pub(crate) _phantom: PhantomData<(InType, OutType, Many)>,
57}
58
59impl<InT, OutT> Clone for ExternalBincodeBidi<InT, OutT, Many> {
60    fn clone(&self) -> Self {
61        Self {
62            process_key: self.process_key,
63            port_id: self.port_id,
64            _phantom: Default::default(),
65        }
66    }
67}
68
69pub struct ExternalBincodeStream<Type, O: Ordering = TotalOrder, R: Retries = ExactlyOnce>
70where
71    Type: DeserializeOwned,
72{
73    #[cfg_attr(
74        not(feature = "build"),
75        expect(unused, reason = "unused without feature")
76    )]
77    pub(crate) process_key: LocationKey,
78    #[cfg_attr(
79        not(feature = "build"),
80        expect(unused, reason = "unused without feature")
81    )]
82    pub(crate) port_id: ExternalPortId,
83    pub(crate) _phantom: PhantomData<(Type, O, R)>,
84}
85
86pub struct External<'a, Tag> {
87    pub(crate) key: LocationKey,
88
89    pub(crate) flow_state: FlowState,
90
91    pub(crate) _phantom: Invariant<'a, Tag>,
92}
93
94impl<P> Clone for External<'_, P> {
95    fn clone(&self) -> Self {
96        External {
97            key: self.key,
98            flow_state: self.flow_state.clone(),
99            _phantom: PhantomData,
100        }
101    }
102}