hydro_lang/deploy/
deploy_runtime_containerized.rs

1#![allow(
2    unused,
3    reason = "unused in trybuild but the __staged version is needed"
4)]
5#![allow(missing_docs, reason = "used internally")]
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::net::SocketAddr;
10use std::ops::{Deref, DerefMut};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Duration;
15
16use bytes::BytesMut;
17use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
18use proc_macro2::Span;
19use sinktools::buffered_lazy_sink_source::BufferedLazySinkSource;
20use sinktools::demux_map_lazy::LazyDemuxSink;
21use sinktools::lazy::{LazySink, LazySource};
22use sinktools::lazy_sink_source::LazySinkSource;
23use stageleft::runtime_support::{FreeVariableWithContext, QuoteTokens};
24use stageleft::{QuotedWithContext, q};
25use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
26use tokio::net::{TcpListener, TcpStream};
27use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
28use tracing::{debug, instrument};
29
30use crate::location::dynamic::LocationId;
31use crate::location::member_id::TaglessMemberId;
32use crate::location::{MemberId, MembershipEvent};
33
34pub fn deploy_containerized_o2o(target: &str, bind_addr: &str) -> (syn::Expr, syn::Expr) {
35    (
36        q!(LazySink::<_, _, _, bytes::Bytes>::new(move || Box::pin(
37            async move {
38                let target = target;
39                debug!(name: "connecting", %target);
40                Result::<_, std::io::Error>::Ok(FramedWrite::new(
41                    TcpStream::connect(target).await?,
42                    LengthDelimitedCodec::new(),
43                ))
44            }
45        )))
46        .splice_untyped_ctx(&()),
47        q!(LazySource::new(move || Box::pin(async move {
48            let listener = TcpListener::bind(bind_addr).await?;
49            let (stream, peer) = listener.accept().await?;
50            debug!(name: "accepting", ?peer);
51            Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
52        })))
53        .splice_untyped_ctx(&()),
54    )
55}
56
57pub fn deploy_containerized_o2m(port: u16) -> (syn::Expr, syn::Expr) {
58    (
59        QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
60            q!(sinktools::demux_map_lazy::<_, _, _, _>(
61                move |key: &TaglessMemberId| {
62                    let key = key.clone();
63
64                    LazySink::<_, _, _, bytes::Bytes>::new(move || {
65                        Box::pin(async move {
66                            let port = port;
67                            debug!(name: "connecting", target = format!("{}:{}", key.get_container_name(), port));
68                            let mut sink = FramedWrite::new(
69                                TcpStream::connect(format!(
70                                    "{}:{}",
71                                    key.get_container_name(),
72                                    port
73                                ))
74                                .await?,
75                                LengthDelimitedCodec::new(),
76                            );
77
78                            Result::<_, std::io::Error>::Ok(sink)
79                        })
80                    })
81                }
82            )),
83            &(),
84        ),
85        q!(LazySource::new(move || Box::pin(async move {
86            let bind_addr = format!("0.0.0.0:{}", port);
87            debug!(name: "listening", %bind_addr);
88            let listener = TcpListener::bind(bind_addr).await?;
89            let (stream, peer) = listener.accept().await?;
90            debug!(name: "accepting", ?peer);
91
92            Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
93        })))
94        .splice_untyped_ctx(&()),
95    )
96}
97
98pub fn deploy_containerized_m2o(port: u16, target_host: &str) -> (syn::Expr, syn::Expr) {
99    (
100        q!(LazySink::<_, _, _, bytes::Bytes>::new(move || {
101            Box::pin(async move {
102                let target = format!("{}:{}", target_host, port);
103                debug!(name: "connecting", %target);
104
105                let mut sink = FramedWrite::new(
106                    TcpStream::connect(target).await?,
107                    LengthDelimitedCodec::new(),
108                );
109
110                sink.send(bytes::Bytes::from(
111                    bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
112                        .unwrap(),
113                ))
114                .await?;
115
116                Result::<_, std::io::Error>::Ok(sink)
117            })
118        }))
119        .splice_untyped_ctx(&()),
120        QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
121            q!(LazySource::new(move || Box::pin(async move {
122                let bind_addr = format!("0.0.0.0:{}", port);
123                debug!(name: "listening", %bind_addr);
124                let listener = TcpListener::bind(bind_addr).await?;
125                Result::<_, std::io::Error>::Ok(
126                    futures::stream::unfold(listener, |listener| {
127                        Box::pin(async move {
128                            let (stream, peer) = listener.accept().await.ok()?;
129                            let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
130                            let from =
131                                bincode::deserialize::<String>(&source.next().await?.ok()?[..])
132                                    .ok()?;
133
134                            debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
135
136                            Some((
137                                source.map(move |v| {
138                                    v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
139                                }),
140                                listener,
141                            ))
142                        })
143                    })
144                    .flatten_unordered(None),
145                )
146            }))),
147            &(),
148        ),
149    )
150}
151
152pub fn deploy_containerized_m2m(port: u16) -> (syn::Expr, syn::Expr) {
153    (
154        QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
155            q!(sinktools::demux_map_lazy::<_, _, _, _>(
156                move |key: &TaglessMemberId| {
157                    let key = key.clone();
158
159                    LazySink::<_, _, _, bytes::Bytes>::new(move || {
160                        Box::pin(async move {
161                            let port = port;
162                            debug!(name: "connecting", target = format!("{}:{}", key.get_container_name(), port));
163                            let mut sink = FramedWrite::new(
164                                TcpStream::connect(format!(
165                                    "{}:{}",
166                                    key.get_container_name(),
167                                    port
168                                ))
169                                .await?,
170                                LengthDelimitedCodec::new(),
171                            );
172                            debug!(name: "connected", target = format!("{}:{}", key.get_container_name(), port));
173
174                            sink.send(bytes::Bytes::from(
175                                bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
176                                    .unwrap(),
177                            ))
178                            .await?;
179
180                            Result::<_, std::io::Error>::Ok(sink)
181                        })
182                    })
183                }
184            )),
185            &(),
186        ),
187        QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
188            q!(LazySource::new(move || Box::pin(async move {
189                let bind_addr = format!("0.0.0.0:{}", port);
190                debug!(name: "listening", %bind_addr);
191                let listener = TcpListener::bind(bind_addr).await?;
192
193                Result::<_, std::io::Error>::Ok(
194                    futures::stream::unfold(listener, |listener| {
195                        Box::pin(async move {
196                            let (stream, peer) = listener.accept().await.ok()?;
197                            let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
198                            let from =
199                                bincode::deserialize::<String>(&source.next().await?.ok()?[..])
200                                    .ok()?;
201
202                            debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
203
204                            Some((
205                                source.map(move |v| {
206                                    v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
207                                }),
208                                listener,
209                            ))
210                        })
211                    })
212                    .flatten_unordered(None),
213                )
214            }))),
215            &(),
216        ),
217    )
218}
219
220pub struct SocketIdent {
221    pub socket_ident: syn::Ident,
222}
223
224impl<Ctx> FreeVariableWithContext<Ctx> for SocketIdent {
225    type O = TcpListener;
226
227    fn to_tokens(self, _ctx: &Ctx) -> QuoteTokens
228    where
229        Self: Sized,
230    {
231        let ident = self.socket_ident;
232
233        QuoteTokens {
234            prelude: None,
235            expr: Some(quote::quote! { #ident }),
236        }
237    }
238}
239
240pub fn deploy_containerized_external_sink_source_ident(socket_ident: syn::Ident) -> syn::Expr {
241    let socket_ident = SocketIdent { socket_ident };
242
243    // q!(BufferedLazySinkSource::<
244    //     bytes::Bytes,
245    //     Result<bytes::BytesMut, std::io::Error>,
246    //     std::io::Error,
247    q!(LazySinkSource::<
248        _,
249        FramedRead<OwnedReadHalf, LengthDelimitedCodec>,
250        FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>,
251        bytes::Bytes,
252        std::io::Error,
253    >::new(async move {
254        let (stream, peer) = socket_ident.accept().await?;
255        debug!(name: "external accepting", ?peer);
256        let (rx, tx) = stream.into_split();
257
258        let fr = FramedRead::new(rx, LengthDelimitedCodec::new());
259        let fw = FramedWrite::new(tx, LengthDelimitedCodec::new());
260
261        Result::<_, std::io::Error>::Ok((fr, fw))
262    },))
263    .splice_untyped_ctx(&())
264}
265
266pub fn cluster_ids<'a>() -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone {
267    // unimplemented!(); // this is unused.
268
269    // This is a dummy piece of code, since clusters are dynamic when containerized.
270    q!(Box::leak(Box::new([TaglessMemberId::from_container_name(
271        "INVALID CONTAINER NAME cluster_ids"
272    )]))
273    .as_slice())
274}
275
276pub fn cluster_self_id<'a>() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
277    q!(TaglessMemberId::from_container_name(
278        std::env::var("CONTAINER_NAME").unwrap()
279    ))
280}
281
282pub fn cluster_membership_stream<'a>(
283    location_id: &LocationId,
284) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
285{
286    let raw_id = location_id.raw_id();
287
288    q!(Box::new(self::docker_membership_stream(
289        std::env::var("DEPLOYMENT_INSTANCE").unwrap(),
290        raw_id
291    ))
292        as Box<
293            dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin,
294        >)
295}
296
297#[instrument(skip_all, fields(%deployment_instance, %location_id))]
298fn docker_membership_stream(
299    deployment_instance: String,
300    location_id: usize,
301) -> impl Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin {
302    use bollard::Docker;
303    use bollard::query_parameters::{EventsOptions, ListContainersOptions};
304    use futures::stream::{StreamExt, once};
305    let docker = Docker::connect_with_local_defaults()
306        .unwrap()
307        .with_timeout(Duration::from_secs(1));
308
309    let mut filters = HashMap::new();
310    filters.insert("type".to_string(), vec!["container".to_string()]);
311    filters.insert(
312        "event".to_string(),
313        vec!["start".to_string(), "die".to_string()],
314    );
315    let event_options = Some(EventsOptions {
316        filters: Some(filters),
317        ..Default::default()
318    });
319
320    let events = {
321        let deployment_instance = deployment_instance.clone();
322        docker.events(event_options).filter_map(move |event| {
323            std::future::ready(event.ok().and_then(|e| {
324                let name = e
325                    .actor
326                    .and_then(|a| a.attributes.and_then(|attrs| attrs.get("name").cloned()))?;
327
328                if name.contains(format!("{deployment_instance}-{location_id}").as_str()) {
329                    match e.action.as_deref() {
330                        Some("start") => Some((name.clone(), MembershipEvent::Joined)),
331                        Some("die") => Some((name, MembershipEvent::Left)),
332                        _ => None,
333                    }
334                } else {
335                    None
336                }
337            }))
338        })
339    };
340
341    let initial = once(async move {
342        let mut filters = HashMap::new();
343
344        filters.insert(
345            "name".to_string(),
346            vec![format!("{deployment_instance}-{location_id}")],
347        );
348
349        let options = Some(ListContainersOptions {
350            // all: true,
351            filters: Some(filters),
352            ..Default::default()
353        });
354
355        let ret = docker
356            .list_containers(options)
357            .await
358            .unwrap()
359            .into_iter()
360            .filter_map(|c| {
361                c.names
362                    .and_then(|names| names.first().map(|n| n.trim_start_matches('/').to_string()))
363            })
364            .map(|name| (name, MembershipEvent::Joined))
365            .collect::<Vec<_>>();
366
367        ret
368    })
369    .flat_map(futures::stream::iter);
370
371    Box::pin(
372        initial
373            .chain(events)
374            .map(|(k, v)| (TaglessMemberId::from_container_name(k), v))
375            .inspect(|(member_id, event)| debug!(name: "membership_event", ?member_id, ?event)),
376    )
377}