1#![allow(
2 unused,
3 reason = "unused in trybuild but the __staged version is needed"
4)]
5#![allow(missing_docs, reason = "used internally")]
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::net::SocketAddr;
10use std::ops::{Deref, DerefMut};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Duration;
15
16use bytes::BytesMut;
17use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
18use proc_macro2::Span;
19use sinktools::buffered_lazy_sink_source::BufferedLazySinkSource;
20use sinktools::demux_map_lazy::LazyDemuxSink;
21use sinktools::lazy::{LazySink, LazySource};
22use sinktools::lazy_sink_source::LazySinkSource;
23use stageleft::runtime_support::{FreeVariableWithContext, QuoteTokens};
24use stageleft::{QuotedWithContext, q};
25use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
26use tokio::net::{TcpListener, TcpStream};
27use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
28use tracing::{debug, instrument};
29
30use crate::location::dynamic::LocationId;
31use crate::location::member_id::TaglessMemberId;
32use crate::location::{MemberId, MembershipEvent};
33
/// Builds the staged `(sink, source)` expression pair for a one-to-one TCP channel.
///
/// * The sink expression is a `LazySink` whose initializer connects a `TcpStream`
///   to `target` and wraps it in a length-delimited `FramedWrite`.
/// * The source expression is a `LazySource` whose initializer binds a
///   `TcpListener` on `bind_addr`, accepts one connection, and wraps it in a
///   length-delimited `FramedRead`.
///
/// Connect/bind/accept failures are returned as `std::io::Error` from the
/// respective initializer.
pub fn deploy_containerized_o2o(target: &str, bind_addr: &str) -> (syn::Expr, syn::Expr) {
    (
        q!(LazySink::<_, _, _, bytes::Bytes>::new(move || Box::pin(
            async move {
                // NOTE(review): the purpose of this rebinding is not visible in this file;
                // presumably it influences how the staged quote captures `target` before
                // it is used inside `debug!` — confirm before removing.
                let target = target;
                debug!(name: "connecting", %target);
                Result::<_, std::io::Error>::Ok(FramedWrite::new(
                    TcpStream::connect(target).await?,
                    LengthDelimitedCodec::new(),
                ))
            }
        )))
        .splice_untyped_ctx(&()),
        q!(LazySource::new(move || Box::pin(async move {
            let listener = TcpListener::bind(bind_addr).await?;
            // Only a single peer is accepted; the listener is dropped when this block ends.
            let (stream, peer) = listener.accept().await?;
            debug!(name: "accepting", ?peer);
            Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
        })))
        .splice_untyped_ctx(&()),
    )
}
56
57pub fn deploy_containerized_o2m(port: u16) -> (syn::Expr, syn::Expr) {
58 (
59 QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
60 q!(sinktools::demux_map_lazy::<_, _, _, _>(
61 move |key: &TaglessMemberId| {
62 let key = key.clone();
63
64 LazySink::<_, _, _, bytes::Bytes>::new(move || {
65 Box::pin(async move {
66 let port = port;
67 debug!(name: "connecting", target = format!("{}:{}", key.get_container_name(), port));
68 let mut sink = FramedWrite::new(
69 TcpStream::connect(format!(
70 "{}:{}",
71 key.get_container_name(),
72 port
73 ))
74 .await?,
75 LengthDelimitedCodec::new(),
76 );
77
78 Result::<_, std::io::Error>::Ok(sink)
79 })
80 })
81 }
82 )),
83 &(),
84 ),
85 q!(LazySource::new(move || Box::pin(async move {
86 let bind_addr = format!("0.0.0.0:{}", port);
87 debug!(name: "listening", %bind_addr);
88 let listener = TcpListener::bind(bind_addr).await?;
89 let (stream, peer) = listener.accept().await?;
90 debug!(name: "accepting", ?peer);
91
92 Result::<_, std::io::Error>::Ok(FramedRead::new(stream, LengthDelimitedCodec::new()))
93 })))
94 .splice_untyped_ctx(&()),
95 )
96}
97
98pub fn deploy_containerized_m2o(port: u16, target_host: &str) -> (syn::Expr, syn::Expr) {
99 (
100 q!(LazySink::<_, _, _, bytes::Bytes>::new(move || {
101 Box::pin(async move {
102 let target = format!("{}:{}", target_host, port);
103 debug!(name: "connecting", %target);
104
105 let mut sink = FramedWrite::new(
106 TcpStream::connect(target).await?,
107 LengthDelimitedCodec::new(),
108 );
109
110 sink.send(bytes::Bytes::from(
111 bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
112 .unwrap(),
113 ))
114 .await?;
115
116 Result::<_, std::io::Error>::Ok(sink)
117 })
118 }))
119 .splice_untyped_ctx(&()),
120 QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
121 q!(LazySource::new(move || Box::pin(async move {
122 let bind_addr = format!("0.0.0.0:{}", port);
123 debug!(name: "listening", %bind_addr);
124 let listener = TcpListener::bind(bind_addr).await?;
125 Result::<_, std::io::Error>::Ok(
126 futures::stream::unfold(listener, |listener| {
127 Box::pin(async move {
128 let (stream, peer) = listener.accept().await.ok()?;
129 let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
130 let from =
131 bincode::deserialize::<String>(&source.next().await?.ok()?[..])
132 .ok()?;
133
134 debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
135
136 Some((
137 source.map(move |v| {
138 v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
139 }),
140 listener,
141 ))
142 })
143 })
144 .flatten_unordered(None),
145 )
146 }))),
147 &(),
148 ),
149 )
150}
151
152pub fn deploy_containerized_m2m(port: u16) -> (syn::Expr, syn::Expr) {
153 (
154 QuotedWithContext::<'static, LazyDemuxSink<TaglessMemberId, _, _>, ()>::splice_untyped_ctx(
155 q!(sinktools::demux_map_lazy::<_, _, _, _>(
156 move |key: &TaglessMemberId| {
157 let key = key.clone();
158
159 LazySink::<_, _, _, bytes::Bytes>::new(move || {
160 Box::pin(async move {
161 let port = port;
162 debug!(name: "connecting", target = format!("{}:{}", key.get_container_name(), port));
163 let mut sink = FramedWrite::new(
164 TcpStream::connect(format!(
165 "{}:{}",
166 key.get_container_name(),
167 port
168 ))
169 .await?,
170 LengthDelimitedCodec::new(),
171 );
172 debug!(name: "connected", target = format!("{}:{}", key.get_container_name(), port));
173
174 sink.send(bytes::Bytes::from(
175 bincode::serialize(&std::env::var("CONTAINER_NAME").unwrap())
176 .unwrap(),
177 ))
178 .await?;
179
180 Result::<_, std::io::Error>::Ok(sink)
181 })
182 })
183 }
184 )),
185 &(),
186 ),
187 QuotedWithContext::<'static, LazySource<_, _, _, Result<(TaglessMemberId, BytesMut), _>>, ()>::splice_untyped_ctx(
188 q!(LazySource::new(move || Box::pin(async move {
189 let bind_addr = format!("0.0.0.0:{}", port);
190 debug!(name: "listening", %bind_addr);
191 let listener = TcpListener::bind(bind_addr).await?;
192
193 Result::<_, std::io::Error>::Ok(
194 futures::stream::unfold(listener, |listener| {
195 Box::pin(async move {
196 let (stream, peer) = listener.accept().await.ok()?;
197 let mut source = FramedRead::new(stream, LengthDelimitedCodec::new());
198 let from =
199 bincode::deserialize::<String>(&source.next().await?.ok()?[..])
200 .ok()?;
201
202 debug!(name: "accepting", endpoint = format!("{}:{}", peer, from));
203
204 Some((
205 source.map(move |v| {
206 v.map(|v| (TaglessMemberId::from_container_name(from.clone()), v))
207 }),
208 listener,
209 ))
210 })
211 })
212 .flatten_unordered(None),
213 )
214 }))),
215 &(),
216 ),
217 )
218}
219
/// Free variable that stands for a `TcpListener` binding in staged code.
///
/// When spliced into a quote it expands to the bare identifier held in
/// `socket_ident` (see the `FreeVariableWithContext` impl in this file).
pub struct SocketIdent {
    /// Identifier emitted verbatim into the generated code.
    pub socket_ident: syn::Ident,
}
223
224impl<Ctx> FreeVariableWithContext<Ctx> for SocketIdent {
225 type O = TcpListener;
226
227 fn to_tokens(self, _ctx: &Ctx) -> QuoteTokens
228 where
229 Self: Sized,
230 {
231 let ident = self.socket_ident;
232
233 QuoteTokens {
234 prelude: None,
235 expr: Some(quote::quote! { #ident }),
236 }
237 }
238}
239
/// Builds the staged expression for a bidirectional external connection.
///
/// `socket_ident` names a binding in the generated code that is used as a
/// `TcpListener` (via [`SocketIdent`]). The returned expression is a
/// `LazySinkSource` whose future accepts one connection on that listener,
/// splits it into owned read/write halves, and wraps them in length-delimited
/// `FramedRead` / `FramedWrite`. An accept failure is returned as
/// `std::io::Error`.
pub fn deploy_containerized_external_sink_source_ident(socket_ident: syn::Ident) -> syn::Expr {
    // Wrap the identifier so the quote below splices it as a bare identifier.
    let socket_ident = SocketIdent { socket_ident };

    q!(LazySinkSource::<
        _,
        FramedRead<OwnedReadHalf, LengthDelimitedCodec>,
        FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>,
        bytes::Bytes,
        std::io::Error,
    >::new(async move {
        let (stream, peer) = socket_ident.accept().await?;
        debug!(name: "external accepting", ?peer);
        // Split so reading and writing can be driven independently.
        let (rx, tx) = stream.into_split();

        let fr = FramedRead::new(rx, LengthDelimitedCodec::new());
        let fw = FramedWrite::new(tx, LengthDelimitedCodec::new());

        Result::<_, std::io::Error>::Ok((fr, fw))
    },))
    .splice_untyped_ctx(&())
}
265
/// Staged expression yielding a slice of cluster member ids.
///
/// The slice always contains exactly one placeholder id built from the literal
/// `"INVALID CONTAINER NAME cluster_ids"`; the backing array is leaked so the
/// slice can be handed out by reference.
///
/// NOTE(review): presumably real membership is expected to come from
/// `cluster_membership_stream` rather than from this list — confirm with callers.
pub fn cluster_ids<'a>() -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone {
    q!(Box::leak(Box::new([TaglessMemberId::from_container_name(
        "INVALID CONTAINER NAME cluster_ids"
    )]))
    .as_slice())
}
275
/// Staged expression yielding this process's own member id.
///
/// The id is built from the `CONTAINER_NAME` environment variable, read when the
/// staged expression is evaluated.
///
/// # Panics
///
/// The staged code panics if `CONTAINER_NAME` is not set or is not valid Unicode.
pub fn cluster_self_id<'a>() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
    q!(TaglessMemberId::from_container_name(
        std::env::var("CONTAINER_NAME").unwrap()
    ))
}
281
/// Staged expression yielding a boxed stream of membership events for a location.
///
/// The raw id of `location_id` is captured when this function is called; the
/// staged code passes it, together with the `DEPLOYMENT_INSTANCE` environment
/// variable, to `docker_membership_stream`.
///
/// # Panics
///
/// The staged code panics if `DEPLOYMENT_INSTANCE` is not set or is not valid
/// Unicode.
pub fn cluster_membership_stream<'a>(
    location_id: &LocationId,
) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
{
    // Extract the plain id so the quote captures a simple value instead of `&LocationId`.
    let raw_id = location_id.raw_id();

    q!(Box::new(self::docker_membership_stream(
        std::env::var("DEPLOYMENT_INSTANCE").unwrap(),
        raw_id
    ))
        as Box<
            dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin,
        >)
}
296
297#[instrument(skip_all, fields(%deployment_instance, %location_id))]
298fn docker_membership_stream(
299 deployment_instance: String,
300 location_id: usize,
301) -> impl Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin {
302 use bollard::Docker;
303 use bollard::query_parameters::{EventsOptions, ListContainersOptions};
304 use futures::stream::{StreamExt, once};
305 let docker = Docker::connect_with_local_defaults()
306 .unwrap()
307 .with_timeout(Duration::from_secs(1));
308
309 let mut filters = HashMap::new();
310 filters.insert("type".to_string(), vec!["container".to_string()]);
311 filters.insert(
312 "event".to_string(),
313 vec!["start".to_string(), "die".to_string()],
314 );
315 let event_options = Some(EventsOptions {
316 filters: Some(filters),
317 ..Default::default()
318 });
319
320 let events = {
321 let deployment_instance = deployment_instance.clone();
322 docker.events(event_options).filter_map(move |event| {
323 std::future::ready(event.ok().and_then(|e| {
324 let name = e
325 .actor
326 .and_then(|a| a.attributes.and_then(|attrs| attrs.get("name").cloned()))?;
327
328 if name.contains(format!("{deployment_instance}-{location_id}").as_str()) {
329 match e.action.as_deref() {
330 Some("start") => Some((name.clone(), MembershipEvent::Joined)),
331 Some("die") => Some((name, MembershipEvent::Left)),
332 _ => None,
333 }
334 } else {
335 None
336 }
337 }))
338 })
339 };
340
341 let initial = once(async move {
342 let mut filters = HashMap::new();
343
344 filters.insert(
345 "name".to_string(),
346 vec![format!("{deployment_instance}-{location_id}")],
347 );
348
349 let options = Some(ListContainersOptions {
350 filters: Some(filters),
352 ..Default::default()
353 });
354
355 let ret = docker
356 .list_containers(options)
357 .await
358 .unwrap()
359 .into_iter()
360 .filter_map(|c| {
361 c.names
362 .and_then(|names| names.first().map(|n| n.trim_start_matches('/').to_string()))
363 })
364 .map(|name| (name, MembershipEvent::Joined))
365 .collect::<Vec<_>>();
366
367 ret
368 })
369 .flat_map(futures::stream::iter);
370
371 Box::pin(
372 initial
373 .chain(events)
374 .map(|(k, v)| (TaglessMemberId::from_container_name(k), v))
375 .inspect(|(member_id, event)| debug!(name: "membership_event", ?member_id, ?event)),
376 )
377}