1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use slotmap::SparseSecondaryMap;
25use stageleft::{QuotedWithContext, RuntimeData};
26use syn::parse_quote;
27
28use super::deploy_runtime::*;
29use crate::compile::builder::ExternalPortId;
30use crate::compile::deploy_provider::{
31 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
32};
33use crate::compile::trybuild::generate::{
34 HYDRO_RUNTIME_FEATURES, LinkingMode, create_graph_trybuild,
35};
36use crate::location::dynamic::LocationId;
37use crate::location::member_id::TaglessMemberId;
38use crate::location::{LocationKey, MembershipEvent, NetworkHint};
39use crate::staging_util::get_this_crate;
40
41pub enum HydroDeploy {}
46
47impl<'a> Deploy<'a> for HydroDeploy {
48 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
50 type InstantiateEnv = Deployment;
51
52 type Process = DeployNode;
53 type Cluster = DeployCluster;
54 type External = DeployExternal;
55
56 fn o2o_sink_source(
57 _p1: &Self::Process,
58 p1_port: &<Self::Process as Node>::Port,
59 _p2: &Self::Process,
60 p2_port: &<Self::Process as Node>::Port,
61 ) -> (syn::Expr, syn::Expr) {
62 let p1_port = p1_port.as_str();
63 let p2_port = p2_port.as_str();
64 deploy_o2o(
65 RuntimeData::new("__hydro_lang_trybuild_cli"),
66 p1_port,
67 p2_port,
68 )
69 }
70
71 fn o2o_connect(
72 p1: &Self::Process,
73 p1_port: &<Self::Process as Node>::Port,
74 p2: &Self::Process,
75 p2_port: &<Self::Process as Node>::Port,
76 ) -> Box<dyn FnOnce()> {
77 let p1 = p1.clone();
78 let p1_port = p1_port.clone();
79 let p2 = p2.clone();
80 let p2_port = p2_port.clone();
81
82 Box::new(move || {
83 let self_underlying_borrow = p1.underlying.borrow();
84 let self_underlying = self_underlying_borrow.as_ref().unwrap();
85 let source_port = self_underlying.get_port(p1_port.clone());
86
87 let other_underlying_borrow = p2.underlying.borrow();
88 let other_underlying = other_underlying_borrow.as_ref().unwrap();
89 let recipient_port = other_underlying.get_port(p2_port.clone());
90
91 source_port.send_to(&recipient_port)
92 })
93 }
94
95 fn o2m_sink_source(
96 _p1: &Self::Process,
97 p1_port: &<Self::Process as Node>::Port,
98 _c2: &Self::Cluster,
99 c2_port: &<Self::Cluster as Node>::Port,
100 ) -> (syn::Expr, syn::Expr) {
101 let p1_port = p1_port.as_str();
102 let c2_port = c2_port.as_str();
103 deploy_o2m(
104 RuntimeData::new("__hydro_lang_trybuild_cli"),
105 p1_port,
106 c2_port,
107 )
108 }
109
110 fn o2m_connect(
111 p1: &Self::Process,
112 p1_port: &<Self::Process as Node>::Port,
113 c2: &Self::Cluster,
114 c2_port: &<Self::Cluster as Node>::Port,
115 ) -> Box<dyn FnOnce()> {
116 let p1 = p1.clone();
117 let p1_port = p1_port.clone();
118 let c2 = c2.clone();
119 let c2_port = c2_port.clone();
120
121 Box::new(move || {
122 let self_underlying_borrow = p1.underlying.borrow();
123 let self_underlying = self_underlying_borrow.as_ref().unwrap();
124 let source_port = self_underlying.get_port(p1_port.clone());
125
126 let recipient_port = DemuxSink {
127 demux: c2
128 .members
129 .borrow()
130 .iter()
131 .enumerate()
132 .map(|(id, c)| {
133 (
134 id as u32,
135 Arc::new(c.underlying.get_port(c2_port.clone()))
136 as Arc<dyn RustCrateSink + 'static>,
137 )
138 })
139 .collect(),
140 };
141
142 source_port.send_to(&recipient_port)
143 })
144 }
145
146 fn m2o_sink_source(
147 _c1: &Self::Cluster,
148 c1_port: &<Self::Cluster as Node>::Port,
149 _p2: &Self::Process,
150 p2_port: &<Self::Process as Node>::Port,
151 ) -> (syn::Expr, syn::Expr) {
152 let c1_port = c1_port.as_str();
153 let p2_port = p2_port.as_str();
154 deploy_m2o(
155 RuntimeData::new("__hydro_lang_trybuild_cli"),
156 c1_port,
157 p2_port,
158 )
159 }
160
161 fn m2o_connect(
162 c1: &Self::Cluster,
163 c1_port: &<Self::Cluster as Node>::Port,
164 p2: &Self::Process,
165 p2_port: &<Self::Process as Node>::Port,
166 ) -> Box<dyn FnOnce()> {
167 let c1 = c1.clone();
168 let c1_port = c1_port.clone();
169 let p2 = p2.clone();
170 let p2_port = p2_port.clone();
171
172 Box::new(move || {
173 let other_underlying_borrow = p2.underlying.borrow();
174 let other_underlying = other_underlying_borrow.as_ref().unwrap();
175 let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
176
177 for (i, node) in c1.members.borrow().iter().enumerate() {
178 let source_port = node.underlying.get_port(c1_port.clone());
179
180 TaggedSource {
181 source: Arc::new(source_port),
182 tag: i as u32,
183 }
184 .send_to(&recipient_port);
185 }
186 })
187 }
188
189 fn m2m_sink_source(
190 _c1: &Self::Cluster,
191 c1_port: &<Self::Cluster as Node>::Port,
192 _c2: &Self::Cluster,
193 c2_port: &<Self::Cluster as Node>::Port,
194 ) -> (syn::Expr, syn::Expr) {
195 let c1_port = c1_port.as_str();
196 let c2_port = c2_port.as_str();
197 deploy_m2m(
198 RuntimeData::new("__hydro_lang_trybuild_cli"),
199 c1_port,
200 c2_port,
201 )
202 }
203
204 fn m2m_connect(
205 c1: &Self::Cluster,
206 c1_port: &<Self::Cluster as Node>::Port,
207 c2: &Self::Cluster,
208 c2_port: &<Self::Cluster as Node>::Port,
209 ) -> Box<dyn FnOnce()> {
210 let c1 = c1.clone();
211 let c1_port = c1_port.clone();
212 let c2 = c2.clone();
213 let c2_port = c2_port.clone();
214
215 Box::new(move || {
216 for (i, sender) in c1.members.borrow().iter().enumerate() {
217 let source_port = sender.underlying.get_port(c1_port.clone());
218
219 let recipient_port = DemuxSink {
220 demux: c2
221 .members
222 .borrow()
223 .iter()
224 .enumerate()
225 .map(|(id, c)| {
226 (
227 id as u32,
228 Arc::new(c.underlying.get_port(c2_port.clone()).merge())
229 as Arc<dyn RustCrateSink + 'static>,
230 )
231 })
232 .collect(),
233 };
234
235 TaggedSource {
236 source: Arc::new(source_port),
237 tag: i as u32,
238 }
239 .send_to(&recipient_port);
240 }
241 })
242 }
243
244 fn e2o_many_source(
245 extra_stmts: &mut Vec<syn::Stmt>,
246 _p2: &Self::Process,
247 p2_port: &<Self::Process as Node>::Port,
248 codec_type: &syn::Type,
249 shared_handle: String,
250 ) -> syn::Expr {
251 let connect_ident = syn::Ident::new(
252 &format!("__hydro_deploy_many_{}_connect", &shared_handle),
253 Span::call_site(),
254 );
255 let source_ident = syn::Ident::new(
256 &format!("__hydro_deploy_many_{}_source", &shared_handle),
257 Span::call_site(),
258 );
259 let sink_ident = syn::Ident::new(
260 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
261 Span::call_site(),
262 );
263 let membership_ident = syn::Ident::new(
264 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
265 Span::call_site(),
266 );
267
268 let root = get_this_crate();
269
270 extra_stmts.push(syn::parse_quote! {
271 let #connect_ident = __hydro_lang_trybuild_cli
272 .port(#p2_port)
273 .connect::<#root::runtime_support::hydro_deploy_integration::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
274 });
275
276 extra_stmts.push(syn::parse_quote! {
277 let #source_ident = #connect_ident.source;
278 });
279
280 extra_stmts.push(syn::parse_quote! {
281 let #sink_ident = #connect_ident.sink;
282 });
283
284 extra_stmts.push(syn::parse_quote! {
285 let #membership_ident = #connect_ident.membership;
286 });
287
288 parse_quote!(#source_ident)
289 }
290
291 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
292 let sink_ident = syn::Ident::new(
293 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
294 Span::call_site(),
295 );
296 parse_quote!(#sink_ident)
297 }
298
299 fn e2o_source(
300 extra_stmts: &mut Vec<syn::Stmt>,
301 _p1: &Self::External,
302 _p1_port: &<Self::External as Node>::Port,
303 _p2: &Self::Process,
304 p2_port: &<Self::Process as Node>::Port,
305 codec_type: &syn::Type,
306 shared_handle: String,
307 ) -> syn::Expr {
308 let connect_ident = syn::Ident::new(
309 &format!("__hydro_deploy_{}_connect", &shared_handle),
310 Span::call_site(),
311 );
312 let source_ident = syn::Ident::new(
313 &format!("__hydro_deploy_{}_source", &shared_handle),
314 Span::call_site(),
315 );
316 let sink_ident = syn::Ident::new(
317 &format!("__hydro_deploy_{}_sink", &shared_handle),
318 Span::call_site(),
319 );
320
321 let root = get_this_crate();
322
323 extra_stmts.push(syn::parse_quote! {
324 let #connect_ident = __hydro_lang_trybuild_cli
325 .port(#p2_port)
326 .connect::<#root::runtime_support::hydro_deploy_integration::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
327 });
328
329 extra_stmts.push(syn::parse_quote! {
330 let #source_ident = #connect_ident.source;
331 });
332
333 extra_stmts.push(syn::parse_quote! {
334 let #sink_ident = #connect_ident.sink;
335 });
336
337 parse_quote!(#source_ident)
338 }
339
340 fn e2o_connect(
341 p1: &Self::External,
342 p1_port: &<Self::External as Node>::Port,
343 p2: &Self::Process,
344 p2_port: &<Self::Process as Node>::Port,
345 _many: bool,
346 server_hint: NetworkHint,
347 ) -> Box<dyn FnOnce()> {
348 let p1 = p1.clone();
349 let p1_port = p1_port.clone();
350 let p2 = p2.clone();
351 let p2_port = p2_port.clone();
352
353 Box::new(move || {
354 let self_underlying_borrow = p1.underlying.borrow();
355 let self_underlying = self_underlying_borrow.as_ref().unwrap();
356 let source_port = self_underlying.declare_many_client();
357
358 let other_underlying_borrow = p2.underlying.borrow();
359 let other_underlying = other_underlying_borrow.as_ref().unwrap();
360 let recipient_port = other_underlying.get_port_with_hint(
361 p2_port.clone(),
362 match server_hint {
363 NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
364 NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
365 },
366 );
367
368 source_port.send_to(&recipient_port);
369
370 p1.client_ports
371 .borrow_mut()
372 .insert(p1_port.clone(), source_port);
373 })
374 }
375
376 fn o2e_sink(
377 _p1: &Self::Process,
378 _p1_port: &<Self::Process as Node>::Port,
379 _p2: &Self::External,
380 _p2_port: &<Self::External as Node>::Port,
381 shared_handle: String,
382 ) -> syn::Expr {
383 let sink_ident = syn::Ident::new(
384 &format!("__hydro_deploy_{}_sink", &shared_handle),
385 Span::call_site(),
386 );
387 parse_quote!(#sink_ident)
388 }
389
390 fn cluster_ids(
391 of_cluster: LocationKey,
392 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
393 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
394 }
395
396 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
397 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
398 }
399
400 fn cluster_membership_stream(
401 location_id: &LocationId,
402 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
403 {
404 cluster_membership_stream(location_id)
405 }
406}
407
408#[expect(missing_docs, reason = "TODO")]
409pub trait DeployCrateWrapper {
410 fn underlying(&self) -> Arc<RustCrateService>;
411
412 fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
413 self.underlying().stdout()
414 }
415
416 fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
417 self.underlying().stderr()
418 }
419
420 fn stdout_filter(
421 &self,
422 prefix: impl Into<String>,
423 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
424 self.underlying().stdout_filter(prefix.into())
425 }
426
427 fn stderr_filter(
428 &self,
429 prefix: impl Into<String>,
430 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
431 self.underlying().stderr_filter(prefix.into())
432 }
433}
434
435#[expect(missing_docs, reason = "TODO")]
436#[derive(Clone)]
437pub struct TrybuildHost {
438 host: Arc<dyn Host>,
439 display_name: Option<String>,
440 rustflags: Option<String>,
441 profile: Option<String>,
442 additional_hydro_features: Vec<String>,
443 features: Vec<String>,
444 tracing: Option<TracingOptions>,
445 build_envs: Vec<(String, String)>,
446 name_hint: Option<String>,
447 cluster_idx: Option<usize>,
448}
449
450impl From<Arc<dyn Host>> for TrybuildHost {
451 fn from(host: Arc<dyn Host>) -> Self {
452 Self {
453 host,
454 display_name: None,
455 rustflags: None,
456 profile: None,
457 additional_hydro_features: vec![],
458 features: vec![],
459 tracing: None,
460 build_envs: vec![],
461 name_hint: None,
462 cluster_idx: None,
463 }
464 }
465}
466
467impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
468 fn from(host: Arc<H>) -> Self {
469 Self {
470 host,
471 display_name: None,
472 rustflags: None,
473 profile: None,
474 additional_hydro_features: vec![],
475 features: vec![],
476 tracing: None,
477 build_envs: vec![],
478 name_hint: None,
479 cluster_idx: None,
480 }
481 }
482}
483
484#[expect(missing_docs, reason = "TODO")]
485impl TrybuildHost {
486 pub fn new(host: Arc<dyn Host>) -> Self {
487 Self {
488 host,
489 display_name: None,
490 rustflags: None,
491 profile: None,
492 additional_hydro_features: vec![],
493 features: vec![],
494 tracing: None,
495 build_envs: vec![],
496 name_hint: None,
497 cluster_idx: None,
498 }
499 }
500
501 pub fn display_name(self, display_name: impl Into<String>) -> Self {
502 if self.display_name.is_some() {
503 panic!("{} already set", name_of!(display_name in Self));
504 }
505
506 Self {
507 display_name: Some(display_name.into()),
508 ..self
509 }
510 }
511
512 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
513 if self.rustflags.is_some() {
514 panic!("{} already set", name_of!(rustflags in Self));
515 }
516
517 Self {
518 rustflags: Some(rustflags.into()),
519 ..self
520 }
521 }
522
523 pub fn profile(self, profile: impl Into<String>) -> Self {
524 if self.profile.is_some() {
525 panic!("{} already set", name_of!(profile in Self));
526 }
527
528 Self {
529 profile: Some(profile.into()),
530 ..self
531 }
532 }
533
534 pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
535 Self {
536 additional_hydro_features,
537 ..self
538 }
539 }
540
541 pub fn features(self, features: Vec<String>) -> Self {
542 Self {
543 features: self.features.into_iter().chain(features).collect(),
544 ..self
545 }
546 }
547
548 pub fn tracing(self, tracing: TracingOptions) -> Self {
549 if self.tracing.is_some() {
550 panic!("{} already set", name_of!(tracing in Self));
551 }
552
553 Self {
554 tracing: Some(tracing),
555 ..self
556 }
557 }
558
559 pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
560 Self {
561 build_envs: self
562 .build_envs
563 .into_iter()
564 .chain(std::iter::once((key.into(), value.into())))
565 .collect(),
566 ..self
567 }
568 }
569}
570
571impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
572 type ProcessSpec = TrybuildHost;
573 fn into_process_spec(self) -> TrybuildHost {
574 TrybuildHost {
575 host: self,
576 display_name: None,
577 rustflags: None,
578 profile: None,
579 additional_hydro_features: vec![],
580 features: vec![],
581 tracing: None,
582 build_envs: vec![],
583 name_hint: None,
584 cluster_idx: None,
585 }
586 }
587}
588
589impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
590 type ProcessSpec = TrybuildHost;
591 fn into_process_spec(self) -> TrybuildHost {
592 TrybuildHost {
593 host: self,
594 display_name: None,
595 rustflags: None,
596 profile: None,
597 additional_hydro_features: vec![],
598 features: vec![],
599 tracing: None,
600 build_envs: vec![],
601 name_hint: None,
602 cluster_idx: None,
603 }
604 }
605}
606
607#[expect(missing_docs, reason = "TODO")]
608#[derive(Clone)]
609pub struct DeployExternal {
610 next_port: Rc<RefCell<usize>>,
611 host: Arc<dyn Host>,
612 underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
613 client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
614 allocated_ports: Rc<RefCell<HashMap<ExternalPortId, String>>>,
615}
616
617impl DeployExternal {
618 pub(crate) fn raw_port(&self, external_port_id: ExternalPortId) -> CustomClientPort {
619 self.client_ports
620 .borrow()
621 .get(
622 self.allocated_ports
623 .borrow()
624 .get(&external_port_id)
625 .unwrap(),
626 )
627 .unwrap()
628 .clone()
629 }
630}
631
632impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
633 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
634 assert!(
635 self.allocated_ports
636 .borrow_mut()
637 .insert(external_port_id, port.clone())
638 .is_none_or(|old| old == port)
639 );
640 }
641
642 fn as_bytes_bidi(
643 &self,
644 external_port_id: ExternalPortId,
645 ) -> impl Future<
646 Output = (
647 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
648 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
649 ),
650 > + 'a {
651 let port = self.raw_port(external_port_id);
652
653 async move {
654 let (source, sink) = port.connect().await.into_source_sink();
655 (
656 Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
657 Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
658 )
659 }
660 }
661
662 fn as_bincode_bidi<InT, OutT>(
663 &self,
664 external_port_id: ExternalPortId,
665 ) -> impl Future<
666 Output = (
667 Pin<Box<dyn Stream<Item = OutT>>>,
668 Pin<Box<dyn Sink<InT, Error = Error>>>,
669 ),
670 > + 'a
671 where
672 InT: Serialize + 'static,
673 OutT: DeserializeOwned + 'static,
674 {
675 let port = self.raw_port(external_port_id);
676 async move {
677 let (source, sink) = port.connect().await.into_source_sink();
678 (
679 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
680 as Pin<Box<dyn Stream<Item = OutT>>>,
681 Box::pin(
682 sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
683 ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
684 )
685 }
686 }
687
688 fn as_bincode_sink<T: Serialize + 'static>(
689 &self,
690 external_port_id: ExternalPortId,
691 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
692 let port = self.raw_port(external_port_id);
693 async move {
694 let sink = port.connect().await.into_sink();
695 Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
696 as Pin<Box<dyn Sink<T, Error = Error>>>
697 }
698 }
699
700 fn as_bincode_source<T: DeserializeOwned + 'static>(
701 &self,
702 external_port_id: ExternalPortId,
703 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
704 let port = self.raw_port(external_port_id);
705 async move {
706 let source = port.connect().await.into_source();
707 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
708 as Pin<Box<dyn Stream<Item = T>>>
709 }
710 }
711}
712
713impl Node for DeployExternal {
714 type Port = String;
715 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
717 type InstantiateEnv = Deployment;
718
719 fn next_port(&self) -> Self::Port {
720 let next_port = *self.next_port.borrow();
721 *self.next_port.borrow_mut() += 1;
722
723 format!("port_{}", next_port)
724 }
725
726 fn instantiate(
727 &self,
728 env: &mut Self::InstantiateEnv,
729 _meta: &mut Self::Meta,
730 _graph: DfirGraph,
731 extra_stmts: &[syn::Stmt],
732 sidecars: &[syn::Expr],
733 ) {
734 assert!(extra_stmts.is_empty());
735 assert!(sidecars.is_empty());
736 let service = env.CustomService(self.host.clone(), vec![]);
737 *self.underlying.borrow_mut() = Some(service);
738 }
739
740 fn update_meta(&self, _meta: &Self::Meta) {}
741}
742
743impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
744 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
745 DeployExternal {
746 next_port: Rc::new(RefCell::new(0)),
747 host: self,
748 underlying: Rc::new(RefCell::new(None)),
749 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
750 client_ports: Rc::new(RefCell::new(HashMap::new())),
751 }
752 }
753}
754
755impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
756 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
757 DeployExternal {
758 next_port: Rc::new(RefCell::new(0)),
759 host: self,
760 underlying: Rc::new(RefCell::new(None)),
761 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
762 client_ports: Rc::new(RefCell::new(HashMap::new())),
763 }
764 }
765}
766
767pub(crate) enum CrateOrTrybuild {
768 Crate(RustCrate, Arc<dyn Host>),
769 Trybuild(TrybuildHost),
770}
771
772#[expect(missing_docs, reason = "TODO")]
773#[derive(Clone)]
774pub struct DeployNode {
775 next_port: Rc<RefCell<usize>>,
776 service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
777 underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
778}
779
780impl DeployCrateWrapper for DeployNode {
781 fn underlying(&self) -> Arc<RustCrateService> {
782 Arc::clone(self.underlying.borrow().as_ref().unwrap())
783 }
784}
785
786impl Node for DeployNode {
787 type Port = String;
788 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
790 type InstantiateEnv = Deployment;
791
792 fn next_port(&self) -> String {
793 let next_port = *self.next_port.borrow();
794 *self.next_port.borrow_mut() += 1;
795
796 format!("port_{}", next_port)
797 }
798
799 fn update_meta(&self, meta: &Self::Meta) {
800 let underlying_node = self.underlying.borrow();
801 underlying_node.as_ref().unwrap().update_meta(HydroMeta {
802 clusters: meta.clone(),
803 cluster_id: None,
804 });
805 }
806
807 fn instantiate(
808 &self,
809 env: &mut Self::InstantiateEnv,
810 _meta: &mut Self::Meta,
811 graph: DfirGraph,
812 extra_stmts: &[syn::Stmt],
813 sidecars: &[syn::Expr],
814 ) {
815 let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
816 CrateOrTrybuild::Crate(c, host) => (c, host),
817 CrateOrTrybuild::Trybuild(trybuild) => {
818 let linking_mode = if !cfg!(target_os = "windows")
820 && trybuild.host.target_type() == hydro_deploy::HostTargetType::Local
821 {
822 LinkingMode::Dynamic
825 } else {
826 LinkingMode::Static
827 };
828 let (bin_name, config) = create_graph_trybuild(
829 graph,
830 extra_stmts,
831 sidecars,
832 trybuild.name_hint.as_deref(),
833 crate::compile::trybuild::generate::DeployMode::HydroDeploy,
834 linking_mode,
835 );
836 let host = trybuild.host.clone();
837 (
838 create_trybuild_service(
839 trybuild,
840 &config.project_dir,
841 &config.target_dir,
842 config.features.as_deref(),
843 &bin_name,
844 &config.linking_mode,
845 ),
846 host,
847 )
848 }
849 };
850
851 *self.underlying.borrow_mut() = Some(env.add_service(service, host));
852 }
853}
854
855#[expect(missing_docs, reason = "TODO")]
856#[derive(Clone)]
857pub struct DeployClusterNode {
858 underlying: Arc<RustCrateService>,
859}
860
861impl DeployCrateWrapper for DeployClusterNode {
862 fn underlying(&self) -> Arc<RustCrateService> {
863 self.underlying.clone()
864 }
865}
866#[expect(missing_docs, reason = "TODO")]
867#[derive(Clone)]
868pub struct DeployCluster {
869 key: LocationKey,
870 next_port: Rc<RefCell<usize>>,
871 cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
872 members: Rc<RefCell<Vec<DeployClusterNode>>>,
873 name_hint: Option<String>,
874}
875
876impl DeployCluster {
877 #[expect(missing_docs, reason = "TODO")]
878 pub fn members(&self) -> Vec<DeployClusterNode> {
879 self.members.borrow().clone()
880 }
881}
882
883impl Node for DeployCluster {
884 type Port = String;
885 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
887 type InstantiateEnv = Deployment;
888
889 fn next_port(&self) -> String {
890 let next_port = *self.next_port.borrow();
891 *self.next_port.borrow_mut() += 1;
892
893 format!("port_{}", next_port)
894 }
895
896 fn instantiate(
897 &self,
898 env: &mut Self::InstantiateEnv,
899 meta: &mut Self::Meta,
900 graph: DfirGraph,
901 extra_stmts: &[syn::Stmt],
902 sidecars: &[syn::Expr],
903 ) {
904 let has_trybuild = self
905 .cluster_spec
906 .borrow()
907 .as_ref()
908 .unwrap()
909 .iter()
910 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
911
912 let linking_mode = if !cfg!(target_os = "windows")
914 && self
915 .cluster_spec
916 .borrow()
917 .as_ref()
918 .unwrap()
919 .iter()
920 .all(|spec| match spec {
921 CrateOrTrybuild::Crate(_, _) => true, CrateOrTrybuild::Trybuild(t) => {
923 t.host.target_type() == hydro_deploy::HostTargetType::Local
924 }
925 }) {
926 LinkingMode::Dynamic
928 } else {
929 LinkingMode::Static
930 };
931
932 let maybe_trybuild = if has_trybuild {
933 Some(create_graph_trybuild(
934 graph,
935 extra_stmts,
936 sidecars,
937 self.name_hint.as_deref(),
938 crate::compile::trybuild::generate::DeployMode::HydroDeploy,
939 linking_mode,
940 ))
941 } else {
942 None
943 };
944
945 let cluster_nodes = self
946 .cluster_spec
947 .borrow_mut()
948 .take()
949 .unwrap()
950 .into_iter()
951 .map(|spec| {
952 let (service, host) = match spec {
953 CrateOrTrybuild::Crate(c, host) => (c, host),
954 CrateOrTrybuild::Trybuild(trybuild) => {
955 let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
956 let host = trybuild.host.clone();
957 (
958 create_trybuild_service(
959 trybuild,
960 &config.project_dir,
961 &config.target_dir,
962 config.features.as_deref(),
963 bin_name,
964 &config.linking_mode,
965 ),
966 host,
967 )
968 }
969 };
970
971 env.add_service(service, host)
972 })
973 .collect::<Vec<_>>();
974 meta.insert(
975 self.key,
976 (0..(cluster_nodes.len() as u32))
977 .map(TaglessMemberId::from_raw_id)
978 .collect(),
979 );
980 *self.members.borrow_mut() = cluster_nodes
981 .into_iter()
982 .map(|n| DeployClusterNode { underlying: n })
983 .collect();
984 }
985
986 fn update_meta(&self, meta: &Self::Meta) {
987 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
988 node.underlying.update_meta(HydroMeta {
989 clusters: meta.clone(),
990 cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
991 });
992 }
993 }
994}
995
996#[expect(missing_docs, reason = "TODO")]
997#[derive(Clone)]
998pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
999
1000impl DeployProcessSpec {
1001 #[expect(missing_docs, reason = "TODO")]
1002 pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
1003 Self(t, host)
1004 }
1005}
1006
1007impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
1008 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployNode {
1009 DeployNode {
1010 next_port: Rc::new(RefCell::new(0)),
1011 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
1012 underlying: Rc::new(RefCell::new(None)),
1013 }
1014 }
1015}
1016
1017impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
1018 fn build(mut self, key: LocationKey, name_hint: &str) -> DeployNode {
1019 self.name_hint = Some(format!("{} (process {})", name_hint, key));
1020 DeployNode {
1021 next_port: Rc::new(RefCell::new(0)),
1022 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
1023 underlying: Rc::new(RefCell::new(None)),
1024 }
1025 }
1026}
1027
1028#[expect(missing_docs, reason = "TODO")]
1029#[derive(Clone)]
1030pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1031
1032impl DeployClusterSpec {
1033 #[expect(missing_docs, reason = "TODO")]
1034 pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
1035 Self(crates)
1036 }
1037}
1038
1039impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1040 fn build(self, key: LocationKey, _name_hint: &str) -> DeployCluster {
1041 DeployCluster {
1042 key,
1043 next_port: Rc::new(RefCell::new(0)),
1044 cluster_spec: Rc::new(RefCell::new(Some(
1045 self.0
1046 .into_iter()
1047 .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1048 .collect(),
1049 ))),
1050 members: Rc::new(RefCell::new(vec![])),
1051 name_hint: None,
1052 }
1053 }
1054}
1055
1056impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1057 fn build(self, key: LocationKey, name_hint: &str) -> DeployCluster {
1058 let name_hint = format!("{} (cluster {})", name_hint, key);
1059 DeployCluster {
1060 key,
1061 next_port: Rc::new(RefCell::new(0)),
1062 cluster_spec: Rc::new(RefCell::new(Some(
1063 self.into_iter()
1064 .enumerate()
1065 .map(|(idx, b)| {
1066 let mut b = b.into();
1067 b.name_hint = Some(name_hint.clone());
1068 b.cluster_idx = Some(idx);
1069 CrateOrTrybuild::Trybuild(b)
1070 })
1071 .collect(),
1072 ))),
1073 members: Rc::new(RefCell::new(vec![])),
1074 name_hint: Some(name_hint),
1075 }
1076 }
1077}
1078
1079fn create_trybuild_service(
1080 trybuild: TrybuildHost,
1081 dir: &std::path::Path,
1082 target_dir: &std::path::PathBuf,
1083 features: Option<&[String]>,
1084 bin_name: &str,
1085 linking_mode: &LinkingMode,
1086) -> RustCrate {
1087 let crate_dir = match linking_mode {
1089 LinkingMode::Dynamic => dir.join("dylib-examples"),
1090 LinkingMode::Static => dir.to_path_buf(),
1091 };
1092
1093 let mut ret = RustCrate::new(&crate_dir)
1094 .target_dir(target_dir)
1095 .example(bin_name)
1096 .no_default_features();
1097
1098 ret = ret.set_is_dylib(matches!(linking_mode, LinkingMode::Dynamic));
1099
1100 if let Some(display_name) = trybuild.display_name {
1101 ret = ret.display_name(display_name);
1102 } else if let Some(name_hint) = trybuild.name_hint {
1103 if let Some(cluster_idx) = trybuild.cluster_idx {
1104 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1105 } else {
1106 ret = ret.display_name(name_hint);
1107 }
1108 }
1109
1110 if let Some(rustflags) = trybuild.rustflags {
1111 ret = ret.rustflags(rustflags);
1112 }
1113
1114 if let Some(profile) = trybuild.profile {
1115 ret = ret.profile(profile);
1116 }
1117
1118 if let Some(tracing) = trybuild.tracing {
1119 ret = ret.tracing(tracing);
1120 }
1121
1122 ret = ret.features(
1123 vec!["hydro___feature_deploy_integration".to_owned()]
1124 .into_iter()
1125 .chain(
1126 trybuild
1127 .additional_hydro_features
1128 .into_iter()
1129 .map(|runtime_feature| {
1130 assert!(
1131 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1132 "{runtime_feature} is not a valid Hydro runtime feature"
1133 );
1134 format!("hydro___feature_{runtime_feature}")
1135 }),
1136 )
1137 .chain(trybuild.features),
1138 );
1139
1140 for (key, value) in trybuild.build_envs {
1141 ret = ret.build_env(key, value);
1142 }
1143
1144 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1145 ret = ret.config("build.incremental = false");
1146
1147 if let Some(features) = features {
1148 ret = ret.features(features);
1149 }
1150
1151 ret
1152}