1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate, TracingResults};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use stageleft::{QuotedWithContext, RuntimeData};
25use syn::parse_quote;
26
27use super::deploy_runtime::*;
28use crate::compile::deploy_provider::{
29 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
30};
31use crate::compile::trybuild::generate::{HYDRO_RUNTIME_FEATURES, create_graph_trybuild};
32use crate::location::dynamic::LocationId;
33use crate::location::member_id::TaglessMemberId;
34use crate::location::{MembershipEvent, NetworkHint};
35use crate::staging_util::get_this_crate;
36
/// Uninhabited marker type whose [`Deploy`] implementation in this file selects the
/// `hydro_deploy`-backed node types: [`DeployNode`], [`DeployCluster`] and [`DeployExternal`].
pub enum HydroDeploy {}
42
43impl<'a> Deploy<'a> for HydroDeploy {
44 type InstantiateEnv = Deployment;
45 type Process = DeployNode;
46 type Cluster = DeployCluster;
47 type External = DeployExternal;
48 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
49 type GraphId = ();
50 type Port = String;
51 type ExternalRawPort = CustomClientPort;
52
53 fn allocate_process_port(process: &Self::Process) -> Self::Port {
54 process.next_port()
55 }
56
57 fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port {
58 cluster.next_port()
59 }
60
61 fn allocate_external_port(external: &Self::External) -> Self::Port {
62 external.next_port()
63 }
64
65 fn o2o_sink_source(
66 _p1: &Self::Process,
67 p1_port: &Self::Port,
68 _p2: &Self::Process,
69 p2_port: &Self::Port,
70 ) -> (syn::Expr, syn::Expr) {
71 let p1_port = p1_port.as_str();
72 let p2_port = p2_port.as_str();
73 deploy_o2o(
74 RuntimeData::new("__hydro_lang_trybuild_cli"),
75 p1_port,
76 p2_port,
77 )
78 }
79
80 fn o2o_connect(
81 p1: &Self::Process,
82 p1_port: &Self::Port,
83 p2: &Self::Process,
84 p2_port: &Self::Port,
85 ) -> Box<dyn FnOnce()> {
86 let p1 = p1.clone();
87 let p1_port = p1_port.clone();
88 let p2 = p2.clone();
89 let p2_port = p2_port.clone();
90
91 Box::new(move || {
92 let self_underlying_borrow = p1.underlying.borrow();
93 let self_underlying = self_underlying_borrow.as_ref().unwrap();
94 let source_port = self_underlying.get_port(p1_port.clone());
95
96 let other_underlying_borrow = p2.underlying.borrow();
97 let other_underlying = other_underlying_borrow.as_ref().unwrap();
98 let recipient_port = other_underlying.get_port(p2_port.clone());
99
100 source_port.send_to(&recipient_port)
101 })
102 }
103
104 fn o2m_sink_source(
105 _p1: &Self::Process,
106 p1_port: &Self::Port,
107 _c2: &Self::Cluster,
108 c2_port: &Self::Port,
109 ) -> (syn::Expr, syn::Expr) {
110 let p1_port = p1_port.as_str();
111 let c2_port = c2_port.as_str();
112 deploy_o2m(
113 RuntimeData::new("__hydro_lang_trybuild_cli"),
114 p1_port,
115 c2_port,
116 )
117 }
118
119 fn o2m_connect(
120 p1: &Self::Process,
121 p1_port: &Self::Port,
122 c2: &Self::Cluster,
123 c2_port: &Self::Port,
124 ) -> Box<dyn FnOnce()> {
125 let p1 = p1.clone();
126 let p1_port = p1_port.clone();
127 let c2 = c2.clone();
128 let c2_port = c2_port.clone();
129
130 Box::new(move || {
131 let self_underlying_borrow = p1.underlying.borrow();
132 let self_underlying = self_underlying_borrow.as_ref().unwrap();
133 let source_port = self_underlying.get_port(p1_port.clone());
134
135 let recipient_port = DemuxSink {
136 demux: c2
137 .members
138 .borrow()
139 .iter()
140 .enumerate()
141 .map(|(id, c)| {
142 (
143 id as u32,
144 Arc::new(c.underlying.get_port(c2_port.clone()))
145 as Arc<dyn RustCrateSink + 'static>,
146 )
147 })
148 .collect(),
149 };
150
151 source_port.send_to(&recipient_port)
152 })
153 }
154
155 fn m2o_sink_source(
156 _c1: &Self::Cluster,
157 c1_port: &Self::Port,
158 _p2: &Self::Process,
159 p2_port: &Self::Port,
160 ) -> (syn::Expr, syn::Expr) {
161 let c1_port = c1_port.as_str();
162 let p2_port = p2_port.as_str();
163 deploy_m2o(
164 RuntimeData::new("__hydro_lang_trybuild_cli"),
165 c1_port,
166 p2_port,
167 )
168 }
169
170 fn m2o_connect(
171 c1: &Self::Cluster,
172 c1_port: &Self::Port,
173 p2: &Self::Process,
174 p2_port: &Self::Port,
175 ) -> Box<dyn FnOnce()> {
176 let c1 = c1.clone();
177 let c1_port = c1_port.clone();
178 let p2 = p2.clone();
179 let p2_port = p2_port.clone();
180
181 Box::new(move || {
182 let other_underlying_borrow = p2.underlying.borrow();
183 let other_underlying = other_underlying_borrow.as_ref().unwrap();
184 let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
185
186 for (i, node) in c1.members.borrow().iter().enumerate() {
187 let source_port = node.underlying.get_port(c1_port.clone());
188
189 TaggedSource {
190 source: Arc::new(source_port),
191 tag: i as u32,
192 }
193 .send_to(&recipient_port);
194 }
195 })
196 }
197
198 fn m2m_sink_source(
199 _c1: &Self::Cluster,
200 c1_port: &Self::Port,
201 _c2: &Self::Cluster,
202 c2_port: &Self::Port,
203 ) -> (syn::Expr, syn::Expr) {
204 let c1_port = c1_port.as_str();
205 let c2_port = c2_port.as_str();
206 deploy_m2m(
207 RuntimeData::new("__hydro_lang_trybuild_cli"),
208 c1_port,
209 c2_port,
210 )
211 }
212
213 fn m2m_connect(
214 c1: &Self::Cluster,
215 c1_port: &Self::Port,
216 c2: &Self::Cluster,
217 c2_port: &Self::Port,
218 ) -> Box<dyn FnOnce()> {
219 let c1 = c1.clone();
220 let c1_port = c1_port.clone();
221 let c2 = c2.clone();
222 let c2_port = c2_port.clone();
223
224 Box::new(move || {
225 for (i, sender) in c1.members.borrow().iter().enumerate() {
226 let source_port = sender.underlying.get_port(c1_port.clone());
227
228 let recipient_port = DemuxSink {
229 demux: c2
230 .members
231 .borrow()
232 .iter()
233 .enumerate()
234 .map(|(id, c)| {
235 (
236 id as u32,
237 Arc::new(c.underlying.get_port(c2_port.clone()).merge())
238 as Arc<dyn RustCrateSink + 'static>,
239 )
240 })
241 .collect(),
242 };
243
244 TaggedSource {
245 source: Arc::new(source_port),
246 tag: i as u32,
247 }
248 .send_to(&recipient_port);
249 }
250 })
251 }
252
253 fn e2o_many_source(
254 extra_stmts: &mut Vec<syn::Stmt>,
255 _p2: &Self::Process,
256 p2_port: &Self::Port,
257 codec_type: &syn::Type,
258 shared_handle: String,
259 ) -> syn::Expr {
260 let connect_ident = syn::Ident::new(
261 &format!("__hydro_deploy_many_{}_connect", &shared_handle),
262 Span::call_site(),
263 );
264 let source_ident = syn::Ident::new(
265 &format!("__hydro_deploy_many_{}_source", &shared_handle),
266 Span::call_site(),
267 );
268 let sink_ident = syn::Ident::new(
269 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
270 Span::call_site(),
271 );
272 let membership_ident = syn::Ident::new(
273 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
274 Span::call_site(),
275 );
276
277 let root = get_this_crate();
278
279 extra_stmts.push(syn::parse_quote! {
280 let #connect_ident = __hydro_lang_trybuild_cli
281 .port(#p2_port)
282 .connect::<#root::runtime_support::dfir_rs::util::deploy::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
283 });
284
285 extra_stmts.push(syn::parse_quote! {
286 let #source_ident = #connect_ident.source;
287 });
288
289 extra_stmts.push(syn::parse_quote! {
290 let #sink_ident = #connect_ident.sink;
291 });
292
293 extra_stmts.push(syn::parse_quote! {
294 let #membership_ident = #connect_ident.membership;
295 });
296
297 parse_quote!(#source_ident)
298 }
299
300 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
301 let sink_ident = syn::Ident::new(
302 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
303 Span::call_site(),
304 );
305 parse_quote!(#sink_ident)
306 }
307
308 fn e2o_source(
309 extra_stmts: &mut Vec<syn::Stmt>,
310 _p1: &Self::External,
311 _p1_port: &Self::Port,
312 _p2: &Self::Process,
313 p2_port: &Self::Port,
314 codec_type: &syn::Type,
315 shared_handle: String,
316 ) -> syn::Expr {
317 let connect_ident = syn::Ident::new(
318 &format!("__hydro_deploy_{}_connect", &shared_handle),
319 Span::call_site(),
320 );
321 let source_ident = syn::Ident::new(
322 &format!("__hydro_deploy_{}_source", &shared_handle),
323 Span::call_site(),
324 );
325 let sink_ident = syn::Ident::new(
326 &format!("__hydro_deploy_{}_sink", &shared_handle),
327 Span::call_site(),
328 );
329
330 let root = get_this_crate();
331
332 extra_stmts.push(syn::parse_quote! {
333 let #connect_ident = __hydro_lang_trybuild_cli
334 .port(#p2_port)
335 .connect::<#root::runtime_support::dfir_rs::util::deploy::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
336 });
337
338 extra_stmts.push(syn::parse_quote! {
339 let #source_ident = #connect_ident.source;
340 });
341
342 extra_stmts.push(syn::parse_quote! {
343 let #sink_ident = #connect_ident.sink;
344 });
345
346 parse_quote!(#source_ident)
347 }
348
349 fn e2o_connect(
350 p1: &Self::External,
351 p1_port: &Self::Port,
352 p2: &Self::Process,
353 p2_port: &Self::Port,
354 _many: bool,
355 server_hint: NetworkHint,
356 ) -> Box<dyn FnOnce()> {
357 let p1 = p1.clone();
358 let p1_port = p1_port.clone();
359 let p2 = p2.clone();
360 let p2_port = p2_port.clone();
361
362 Box::new(move || {
363 let self_underlying_borrow = p1.underlying.borrow();
364 let self_underlying = self_underlying_borrow.as_ref().unwrap();
365 let source_port = self_underlying.declare_many_client();
366
367 let other_underlying_borrow = p2.underlying.borrow();
368 let other_underlying = other_underlying_borrow.as_ref().unwrap();
369 let recipient_port = other_underlying.get_port_with_hint(
370 p2_port.clone(),
371 match server_hint {
372 NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
373 NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
374 },
375 );
376
377 source_port.send_to(&recipient_port);
378
379 p1.client_ports
380 .borrow_mut()
381 .insert(p1_port.clone(), source_port);
382 })
383 }
384
385 fn o2e_sink(
386 _p1: &Self::Process,
387 _p1_port: &Self::Port,
388 _p2: &Self::External,
389 _p2_port: &Self::Port,
390 shared_handle: String,
391 ) -> syn::Expr {
392 let sink_ident = syn::Ident::new(
393 &format!("__hydro_deploy_{}_sink", &shared_handle),
394 Span::call_site(),
395 );
396 parse_quote!(#sink_ident)
397 }
398
399 fn cluster_ids(
400 of_cluster: usize,
401 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
402 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
403 }
404
405 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
406 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
407 }
408
409 fn cluster_membership_stream(
410 location_id: &LocationId,
411 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
412 {
413 cluster_membership_stream(location_id)
414 }
415}
416
417#[expect(missing_docs, reason = "TODO")]
418pub trait DeployCrateWrapper {
419 fn underlying(&self) -> Arc<RustCrateService>;
420
421 fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
422 self.underlying().stdout()
423 }
424
425 fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
426 self.underlying().stderr()
427 }
428
429 fn stdout_filter(
430 &self,
431 prefix: impl Into<String>,
432 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
433 self.underlying().stdout_filter(prefix.into())
434 }
435
436 fn stderr_filter(
437 &self,
438 prefix: impl Into<String>,
439 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
440 self.underlying().stderr_filter(prefix.into())
441 }
442
443 fn tracing_results(&self) -> Option<TracingResults> {
444 self.underlying().tracing_results().cloned()
445 }
446}
447
448#[expect(missing_docs, reason = "TODO")]
449#[derive(Clone)]
450pub struct TrybuildHost {
451 host: Arc<dyn Host>,
452 display_name: Option<String>,
453 rustflags: Option<String>,
454 additional_hydro_features: Vec<String>,
455 features: Vec<String>,
456 tracing: Option<TracingOptions>,
457 build_envs: Vec<(String, String)>,
458 name_hint: Option<String>,
459 cluster_idx: Option<usize>,
460}
461
462impl From<Arc<dyn Host>> for TrybuildHost {
463 fn from(host: Arc<dyn Host>) -> Self {
464 Self {
465 host,
466 display_name: None,
467 rustflags: None,
468 additional_hydro_features: vec![],
469 features: vec![],
470 tracing: None,
471 build_envs: vec![],
472 name_hint: None,
473 cluster_idx: None,
474 }
475 }
476}
477
478impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
479 fn from(host: Arc<H>) -> Self {
480 Self {
481 host,
482 display_name: None,
483 rustflags: None,
484 additional_hydro_features: vec![],
485 features: vec![],
486 tracing: None,
487 build_envs: vec![],
488 name_hint: None,
489 cluster_idx: None,
490 }
491 }
492}
493
494#[expect(missing_docs, reason = "TODO")]
495impl TrybuildHost {
496 pub fn new(host: Arc<dyn Host>) -> Self {
497 Self {
498 host,
499 display_name: None,
500 rustflags: None,
501 additional_hydro_features: vec![],
502 features: vec![],
503 tracing: None,
504 build_envs: vec![],
505 name_hint: None,
506 cluster_idx: None,
507 }
508 }
509
510 pub fn display_name(self, display_name: impl Into<String>) -> Self {
511 if self.display_name.is_some() {
512 panic!("{} already set", name_of!(display_name in Self));
513 }
514
515 Self {
516 display_name: Some(display_name.into()),
517 ..self
518 }
519 }
520
521 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
522 if self.rustflags.is_some() {
523 panic!("{} already set", name_of!(rustflags in Self));
524 }
525
526 Self {
527 rustflags: Some(rustflags.into()),
528 ..self
529 }
530 }
531
532 pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
533 Self {
534 additional_hydro_features,
535 ..self
536 }
537 }
538
539 pub fn features(self, features: Vec<String>) -> Self {
540 Self {
541 features: self.features.into_iter().chain(features).collect(),
542 ..self
543 }
544 }
545
546 pub fn tracing(self, tracing: TracingOptions) -> Self {
547 if self.tracing.is_some() {
548 panic!("{} already set", name_of!(tracing in Self));
549 }
550
551 Self {
552 tracing: Some(tracing),
553 ..self
554 }
555 }
556
557 pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
558 Self {
559 build_envs: self
560 .build_envs
561 .into_iter()
562 .chain(std::iter::once((key.into(), value.into())))
563 .collect(),
564 ..self
565 }
566 }
567}
568
569impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
570 type ProcessSpec = TrybuildHost;
571 fn into_process_spec(self) -> TrybuildHost {
572 TrybuildHost {
573 host: self,
574 display_name: None,
575 rustflags: None,
576 additional_hydro_features: vec![],
577 features: vec![],
578 tracing: None,
579 build_envs: vec![],
580 name_hint: None,
581 cluster_idx: None,
582 }
583 }
584}
585
586impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
587 type ProcessSpec = TrybuildHost;
588 fn into_process_spec(self) -> TrybuildHost {
589 TrybuildHost {
590 host: self,
591 display_name: None,
592 rustflags: None,
593 additional_hydro_features: vec![],
594 features: vec![],
595 tracing: None,
596 build_envs: vec![],
597 name_hint: None,
598 cluster_idx: None,
599 }
600 }
601}
602
603#[expect(missing_docs, reason = "TODO")]
604#[derive(Clone)]
605pub struct DeployExternal {
606 next_port: Rc<RefCell<usize>>,
607 host: Arc<dyn Host>,
608 underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
609 client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
610 allocated_ports: Rc<RefCell<HashMap<usize, String>>>,
611}
612
613impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
614 fn register(&self, key: usize, port: <HydroDeploy as Deploy>::Port) {
615 assert!(
616 self.allocated_ports
617 .borrow_mut()
618 .insert(key, port.clone())
619 .is_none_or(|old| old == port)
620 );
621 }
622
623 fn raw_port(&self, key: usize) -> <HydroDeploy as Deploy<'_>>::ExternalRawPort {
624 self.client_ports
625 .borrow()
626 .get(self.allocated_ports.borrow().get(&key).unwrap())
627 .unwrap()
628 .clone()
629 }
630
631 fn as_bytes_bidi(
632 &self,
633 key: usize,
634 ) -> impl Future<
635 Output = (
636 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
637 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
638 ),
639 > + 'a {
640 let port = self.raw_port(key);
641
642 async move {
643 let (source, sink) = port.connect().await.into_source_sink();
644 (
645 Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
646 Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
647 )
648 }
649 }
650
651 fn as_bincode_bidi<InT, OutT>(
652 &self,
653 key: usize,
654 ) -> impl Future<
655 Output = (
656 Pin<Box<dyn Stream<Item = OutT>>>,
657 Pin<Box<dyn Sink<InT, Error = Error>>>,
658 ),
659 > + 'a
660 where
661 InT: Serialize + 'static,
662 OutT: DeserializeOwned + 'static,
663 {
664 let port = self.raw_port(key);
665 async move {
666 let (source, sink) = port.connect().await.into_source_sink();
667 (
668 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
669 as Pin<Box<dyn Stream<Item = OutT>>>,
670 Box::pin(
671 sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
672 ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
673 )
674 }
675 }
676
677 fn as_bincode_sink<T: Serialize + 'static>(
678 &self,
679 key: usize,
680 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
681 let port = self.raw_port(key);
682 async move {
683 let sink = port.connect().await.into_sink();
684 Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
685 as Pin<Box<dyn Sink<T, Error = Error>>>
686 }
687 }
688
689 fn as_bincode_source<T: DeserializeOwned + 'static>(
690 &self,
691 key: usize,
692 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
693 let port = self.raw_port(key);
694 async move {
695 let source = port.connect().await.into_source();
696 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
697 as Pin<Box<dyn Stream<Item = T>>>
698 }
699 }
700}
701
702impl Node for DeployExternal {
703 type Port = String;
704 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
705 type InstantiateEnv = Deployment;
706
707 fn next_port(&self) -> Self::Port {
708 let next_port = *self.next_port.borrow();
709 *self.next_port.borrow_mut() += 1;
710
711 format!("port_{}", next_port)
712 }
713
714 fn instantiate(
715 &self,
716 env: &mut Self::InstantiateEnv,
717 _meta: &mut Self::Meta,
718 _graph: DfirGraph,
719 _extra_stmts: Vec<syn::Stmt>,
720 ) {
721 let service = env.CustomService(self.host.clone(), vec![]);
722 *self.underlying.borrow_mut() = Some(service);
723 }
724
725 fn update_meta(&self, _meta: &Self::Meta) {}
726}
727
728impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
729 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
730 DeployExternal {
731 next_port: Rc::new(RefCell::new(0)),
732 host: self,
733 underlying: Rc::new(RefCell::new(None)),
734 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
735 client_ports: Rc::new(RefCell::new(HashMap::new())),
736 }
737 }
738}
739
740impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
741 fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
742 DeployExternal {
743 next_port: Rc::new(RefCell::new(0)),
744 host: self,
745 underlying: Rc::new(RefCell::new(None)),
746 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
747 client_ports: Rc::new(RefCell::new(HashMap::new())),
748 }
749 }
750}
751
/// How the service for a node is obtained at instantiation time.
pub(crate) enum CrateOrTrybuild {
    /// A ready-made crate spec together with the host it runs on; used as-is.
    Crate(RustCrate, Arc<dyn Host>),
    /// A host spec for which a trybuild binary is generated from the node's graph.
    Trybuild(TrybuildHost),
}
756
757#[expect(missing_docs, reason = "TODO")]
758#[derive(Clone)]
759pub struct DeployNode {
760 id: usize,
761 next_port: Rc<RefCell<usize>>,
762 service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
763 underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
764}
765
766impl DeployCrateWrapper for DeployNode {
767 fn underlying(&self) -> Arc<RustCrateService> {
768 Arc::clone(self.underlying.borrow().as_ref().unwrap())
769 }
770}
771
772impl Node for DeployNode {
773 type Port = String;
774 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
775 type InstantiateEnv = Deployment;
776
777 fn next_port(&self) -> String {
778 let next_port = *self.next_port.borrow();
779 *self.next_port.borrow_mut() += 1;
780
781 format!("port_{}", next_port)
782 }
783
784 fn update_meta(&self, meta: &Self::Meta) {
785 let underlying_node = self.underlying.borrow();
786 underlying_node.as_ref().unwrap().update_meta(HydroMeta {
787 clusters: meta.clone(),
788 cluster_id: None,
789 subgraph_id: self.id,
790 });
791 }
792
793 fn instantiate(
794 &self,
795 env: &mut Self::InstantiateEnv,
796 _meta: &mut Self::Meta,
797 graph: DfirGraph,
798 extra_stmts: Vec<syn::Stmt>,
799 ) {
800 let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
801 CrateOrTrybuild::Crate(c, host) => (c, host),
802 CrateOrTrybuild::Trybuild(trybuild) => {
803 let (bin_name, config) =
804 create_graph_trybuild(graph, extra_stmts, &trybuild.name_hint, false);
805 let host = trybuild.host.clone();
806 (
807 create_trybuild_service(
808 trybuild,
809 &config.project_dir,
810 &config.target_dir,
811 &config.features,
812 &bin_name,
813 ),
814 host,
815 )
816 }
817 };
818
819 *self.underlying.borrow_mut() = Some(env.add_service(service, host));
820 }
821}
822
823#[expect(missing_docs, reason = "TODO")]
824#[derive(Clone)]
825pub struct DeployClusterNode {
826 underlying: Arc<RustCrateService>,
827}
828
829impl DeployCrateWrapper for DeployClusterNode {
830 fn underlying(&self) -> Arc<RustCrateService> {
831 self.underlying.clone()
832 }
833}
834#[expect(missing_docs, reason = "TODO")]
835#[derive(Clone)]
836pub struct DeployCluster {
837 id: usize,
838 next_port: Rc<RefCell<usize>>,
839 cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
840 members: Rc<RefCell<Vec<DeployClusterNode>>>,
841 name_hint: Option<String>,
842}
843
844impl DeployCluster {
845 #[expect(missing_docs, reason = "TODO")]
846 pub fn members(&self) -> Vec<DeployClusterNode> {
847 self.members.borrow().clone()
848 }
849}
850
851impl Node for DeployCluster {
852 type Port = String;
853 type Meta = HashMap<usize, Vec<TaglessMemberId>>;
854 type InstantiateEnv = Deployment;
855
856 fn next_port(&self) -> String {
857 let next_port = *self.next_port.borrow();
858 *self.next_port.borrow_mut() += 1;
859
860 format!("port_{}", next_port)
861 }
862
863 fn instantiate(
864 &self,
865 env: &mut Self::InstantiateEnv,
866 meta: &mut Self::Meta,
867 graph: DfirGraph,
868 extra_stmts: Vec<syn::Stmt>,
869 ) {
870 let has_trybuild = self
871 .cluster_spec
872 .borrow()
873 .as_ref()
874 .unwrap()
875 .iter()
876 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
877
878 let maybe_trybuild = if has_trybuild {
879 Some(create_graph_trybuild(
880 graph,
881 extra_stmts,
882 &self.name_hint,
883 false,
884 ))
885 } else {
886 None
887 };
888
889 let cluster_nodes = self
890 .cluster_spec
891 .borrow_mut()
892 .take()
893 .unwrap()
894 .into_iter()
895 .map(|spec| {
896 let (service, host) = match spec {
897 CrateOrTrybuild::Crate(c, host) => (c, host),
898 CrateOrTrybuild::Trybuild(trybuild) => {
899 let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
900 let host = trybuild.host.clone();
901 (
902 create_trybuild_service(
903 trybuild,
904 &config.project_dir,
905 &config.target_dir,
906 &config.features,
907 bin_name,
908 ),
909 host,
910 )
911 }
912 };
913
914 env.add_service(service, host)
915 })
916 .collect::<Vec<_>>();
917 meta.insert(
918 self.id,
919 (0..(cluster_nodes.len() as u32))
920 .map(TaglessMemberId::from_raw_id)
921 .collect(),
922 );
923 *self.members.borrow_mut() = cluster_nodes
924 .into_iter()
925 .map(|n| DeployClusterNode { underlying: n })
926 .collect();
927 }
928
929 fn update_meta(&self, meta: &Self::Meta) {
930 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
931 node.underlying.update_meta(HydroMeta {
932 clusters: meta.clone(),
933 cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
934 subgraph_id: self.id,
935 });
936 }
937 }
938}
939
940#[expect(missing_docs, reason = "TODO")]
941#[derive(Clone)]
942pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
943
944impl DeployProcessSpec {
945 #[expect(missing_docs, reason = "TODO")]
946 pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
947 Self(t, host)
948 }
949}
950
951impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
952 fn build(self, id: usize, _name_hint: &str) -> DeployNode {
953 DeployNode {
954 id,
955 next_port: Rc::new(RefCell::new(0)),
956 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
957 underlying: Rc::new(RefCell::new(None)),
958 }
959 }
960}
961
962impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
963 fn build(mut self, id: usize, name_hint: &str) -> DeployNode {
964 self.name_hint = Some(format!("{} (process {id})", name_hint));
965 DeployNode {
966 id,
967 next_port: Rc::new(RefCell::new(0)),
968 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
969 underlying: Rc::new(RefCell::new(None)),
970 }
971 }
972}
973
974#[expect(missing_docs, reason = "TODO")]
975#[derive(Clone)]
976pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
977
978impl DeployClusterSpec {
979 #[expect(missing_docs, reason = "TODO")]
980 pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
981 Self(crates)
982 }
983}
984
985impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
986 fn build(self, id: usize, _name_hint: &str) -> DeployCluster {
987 DeployCluster {
988 id,
989 next_port: Rc::new(RefCell::new(0)),
990 cluster_spec: Rc::new(RefCell::new(Some(
991 self.0
992 .into_iter()
993 .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
994 .collect(),
995 ))),
996 members: Rc::new(RefCell::new(vec![])),
997 name_hint: None,
998 }
999 }
1000}
1001
1002impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1003 fn build(self, id: usize, name_hint: &str) -> DeployCluster {
1004 let name_hint = format!("{} (cluster {id})", name_hint);
1005 DeployCluster {
1006 id,
1007 next_port: Rc::new(RefCell::new(0)),
1008 cluster_spec: Rc::new(RefCell::new(Some(
1009 self.into_iter()
1010 .enumerate()
1011 .map(|(idx, b)| {
1012 let mut b = b.into();
1013 b.name_hint = Some(name_hint.clone());
1014 b.cluster_idx = Some(idx);
1015 CrateOrTrybuild::Trybuild(b)
1016 })
1017 .collect(),
1018 ))),
1019 members: Rc::new(RefCell::new(vec![])),
1020 name_hint: Some(name_hint),
1021 }
1022 }
1023}
1024
1025fn create_trybuild_service(
1026 trybuild: TrybuildHost,
1027 dir: &std::path::PathBuf,
1028 target_dir: &std::path::PathBuf,
1029 features: &Option<Vec<String>>,
1030 bin_name: &str,
1031) -> RustCrate {
1032 let mut ret = RustCrate::new(dir)
1033 .target_dir(target_dir)
1034 .example(bin_name)
1035 .no_default_features();
1036
1037 if let Some(display_name) = trybuild.display_name {
1038 ret = ret.display_name(display_name);
1039 } else if let Some(name_hint) = trybuild.name_hint {
1040 if let Some(cluster_idx) = trybuild.cluster_idx {
1041 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1042 } else {
1043 ret = ret.display_name(name_hint);
1044 }
1045 }
1046
1047 if let Some(rustflags) = trybuild.rustflags {
1048 ret = ret.rustflags(rustflags);
1049 }
1050
1051 if let Some(tracing) = trybuild.tracing {
1052 ret = ret.tracing(tracing);
1053 }
1054
1055 ret = ret.features(
1056 vec!["hydro___feature_deploy_integration".to_string()]
1057 .into_iter()
1058 .chain(
1059 trybuild
1060 .additional_hydro_features
1061 .into_iter()
1062 .map(|runtime_feature| {
1063 assert!(
1064 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1065 "{runtime_feature} is not a valid Hydro runtime feature"
1066 );
1067 format!("hydro___feature_{runtime_feature}")
1068 }),
1069 )
1070 .chain(trybuild.features),
1071 );
1072
1073 for (key, value) in trybuild.build_envs {
1074 ret = ret.build_env(key, value);
1075 }
1076
1077 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1078 ret = ret.config("build.incremental = false");
1079
1080 if let Some(features) = features {
1081 ret = ret.features(features);
1082 }
1083
1084 ret
1085}