1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{
34 ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
35};
36use crate::forward_handle::ForwardRef;
37#[cfg(stageleft_runtime)]
38use crate::forward_handle::{CycleCollection, ForwardHandle};
39use crate::live_collections::boundedness::{Bounded, Unbounded};
40use crate::live_collections::keyed_stream::KeyedStream;
41use crate::live_collections::singleton::Singleton;
42use crate::live_collections::stream::{
43 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
44};
45use crate::location::dynamic::LocationId;
46use crate::location::external_process::{
47 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
48};
49use crate::nondet::NonDet;
50#[cfg(feature = "sim")]
51use crate::sim::SimSender;
52use crate::staging_util::get_this_crate;
53
54pub mod dynamic;
55
56pub mod external_process;
57pub use external_process::External;
58
59pub mod process;
60pub use process::Process;
61
62pub mod cluster;
63pub use cluster::Cluster;
64
65pub mod member_id;
66pub use member_id::{MemberId, TaglessMemberId};
67
68pub mod tick;
69pub use tick::{Atomic, NoTick, Tick};
70
71#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
74pub enum MembershipEvent {
75 Joined,
77 Left,
79}
80
/// Hint describing how an external network port should be set up.
///
/// The hint is stored as `port_hint` on external-input IR nodes; how it is honored is
/// decided by whatever consumes those nodes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No specific requirement.
    Auto,
    /// Use a TCP port. Presumably `Some(port)` requests that exact port and `None` lets
    /// one be chosen — confirm against the deployment backend.
    TcpPort(Option<u16>),
}
96
97pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
98 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
99}
100
#[stageleft::export(LocationKey)]
new_key_type! {
    /// Slot-map key type used to identify a location (for example, `External` carries one
    /// in its `key` field).
    pub struct LocationKey;
}
106
107impl std::fmt::Display for LocationKey {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 write!(f, "loc{:?}", self.data()) }
111}
112
113impl std::str::FromStr for LocationKey {
116 type Err = Option<ParseIntError>;
117
118 fn from_str(s: &str) -> Result<Self, Self::Err> {
119 let nvn = s.strip_prefix("loc").ok_or(None)?;
120 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
121 let idx: u64 = idx.parse()?;
122 let ver: u64 = ver.parse()?;
123 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
124 }
125}
126
127impl LocationKey {
128 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
134 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); #[cfg(test)]
138 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); }
140
141impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
143 type O = LocationKey;
144
145 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
146 where
147 Self: Sized,
148 {
149 let root = get_this_crate();
150 let n = Key::data(&self).as_ffi();
151 (
152 QuoteTokens {
153 prelude: None,
154 expr: Some(quote! {
155 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
156 }),
157 },
158 (),
159 )
160 }
161}
162
163#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
165pub enum LocationType {
166 Process,
168 Cluster,
170 External,
172}
173
174#[expect(
188 private_bounds,
189 reason = "only internal Hydro code can define location types"
190)]
191pub trait Location<'a>: dynamic::DynLocation {
/// The root location type associated with this location.
/// NOTE(review): presumably `Self` for top-level locations and the enclosing top-level
/// location for nested ones such as ticks — confirm against the implementors.
type Root: Location<'a>;
197
/// Returns the root location (of type [`Location::Root`]) for this location.
fn root(&self) -> Self::Root;
203
204 fn try_tick(&self) -> Option<Tick<Self>> {
211 if Self::is_top_level() {
212 let id = self.flow_state().borrow_mut().next_clock_id();
213 Some(Tick {
214 id,
215 l: self.clone(),
216 })
217 } else {
218 None
219 }
220 }
221
222 fn id(&self) -> LocationId {
224 dynamic::DynLocation::id(self)
225 }
226
227 fn tick(&self) -> Tick<Self>
253 where
254 Self: NoTick,
255 {
256 let id = self.flow_state().borrow_mut().next_clock_id();
257 Tick {
258 id,
259 l: self.clone(),
260 }
261 }
262
263 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
288 where
289 Self: Sized + NoTick,
290 {
291 Stream::new(
292 self.clone(),
293 HydroNode::Source {
294 source: HydroSource::Spin(),
295 metadata: self.new_node_metadata(Stream::<
296 (),
297 Self,
298 Unbounded,
299 TotalOrder,
300 ExactlyOnce,
301 >::collection_kind()),
302 },
303 )
304 }
305
306 fn source_stream<T, E>(
327 &self,
328 e: impl QuotedWithContext<'a, E, Self>,
329 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
330 where
331 E: FuturesStream<Item = T> + Unpin,
332 Self: Sized + NoTick,
333 {
334 let e = e.splice_untyped_ctx(self);
335
336 Stream::new(
337 self.clone(),
338 HydroNode::Source {
339 source: HydroSource::Stream(e.into()),
340 metadata: self.new_node_metadata(Stream::<
341 T,
342 Self,
343 Unbounded,
344 TotalOrder,
345 ExactlyOnce,
346 >::collection_kind()),
347 },
348 )
349 }
350
351 fn source_iter<T, E>(
373 &self,
374 e: impl QuotedWithContext<'a, E, Self>,
375 ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
376 where
377 E: IntoIterator<Item = T>,
378 Self: Sized,
379 {
380 let e = e.splice_typed_ctx(self);
381
382 Stream::new(
383 self.clone(),
384 HydroNode::Source {
385 source: HydroSource::Iter(e.into()),
386 metadata: self.new_node_metadata(
387 Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
388 ),
389 },
390 )
391 }
392
393 fn source_cluster_members<C: 'a>(
427 &self,
428 cluster: &Cluster<'a, C>,
429 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
430 where
431 Self: Sized + NoTick,
432 {
433 Stream::new(
434 self.clone(),
435 HydroNode::Source {
436 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
437 metadata: self.new_node_metadata(Stream::<
438 (TaglessMemberId, MembershipEvent),
439 Self,
440 Unbounded,
441 TotalOrder,
442 ExactlyOnce,
443 >::collection_kind()),
444 },
445 )
446 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
447 .into_keyed()
448 }
449
450 fn source_external_bytes<L>(
458 &self,
459 from: &External<L>,
460 ) -> (
461 ExternalBytesPort,
462 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
463 )
464 where
465 Self: Sized + NoTick,
466 {
467 let (port, stream, sink) =
468 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
469
470 sink.complete(self.source_iter(q!([])));
471
472 (port, stream)
473 }
474
475 #[expect(clippy::type_complexity, reason = "stream markers")]
482 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
483 &self,
484 from: &External<L>,
485 ) -> (
486 ExternalBincodeSink<T, NotMany, O, R>,
487 Stream<T, Self, Unbounded, O, R>,
488 )
489 where
490 Self: Sized + NoTick,
491 T: Serialize + DeserializeOwned,
492 {
493 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
494 sink.complete(self.source_iter(q!([])));
495
496 (
497 ExternalBincodeSink {
498 process_key: from.key,
499 port_id: port.port_id,
500 _phantom: PhantomData,
501 },
502 stream.weaken_ordering().weaken_retries(),
503 )
504 }
505
/// Creates a simulation input: a [`SimSender`] plus the stream of values sent through it.
///
/// Internally this fabricates an `External` location keyed by [`LocationKey::FIRST`]
/// that shares this location's flow state, and wires it up through
/// [`Location::source_external_bincode`].
#[cfg(feature = "sim")]
#[expect(clippy::type_complexity, reason = "stream markers")]
fn sim_input<T, O: Ordering, R: Retries>(
    &self,
) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
where
    Self: Sized + NoTick,
    T: Serialize + DeserializeOwned,
{
    let sim_external: External<'a, ()> = External {
        key: LocationKey::FIRST,
        flow_state: self.flow_state().clone(),
        _phantom: PhantomData,
    };

    let (sink, inputs) = self.source_external_bincode(&sim_external);
    let sender = SimSender(sink.port_id, PhantomData);

    (sender, inputs)
}
529
530 fn embedded_input<T>(
536 &self,
537 name: impl Into<String>,
538 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
539 where
540 Self: Sized + NoTick,
541 {
542 let ident = syn::Ident::new(&name.into(), Span::call_site());
543
544 Stream::new(
545 self.clone(),
546 HydroNode::Source {
547 source: HydroSource::Embedded(ident),
548 metadata: self.new_node_metadata(Stream::<
549 T,
550 Self,
551 Unbounded,
552 TotalOrder,
553 ExactlyOnce,
554 >::collection_kind()),
555 },
556 )
557 }
558
559 fn embedded_singleton_input<T>(&self, name: impl Into<String>) -> Singleton<T, Self, Bounded>
565 where
566 Self: Sized + NoTick,
567 {
568 let ident = syn::Ident::new(&name.into(), Span::call_site());
569
570 Singleton::new(
571 self.clone(),
572 HydroNode::Source {
573 source: HydroSource::EmbeddedSingleton(ident),
574 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
575 },
576 )
577 }
578
579 #[expect(clippy::type_complexity, reason = "stream markers")]
624 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
625 &self,
626 from: &External<L>,
627 port_hint: NetworkHint,
628 ) -> (
629 ExternalBytesPort<NotMany>,
630 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
631 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
632 )
633 where
634 Self: Sized + NoTick,
635 {
636 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
637
638 let (fwd_ref, to_sink) =
639 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
640 let mut flow_state_borrow = self.flow_state().borrow_mut();
641
642 flow_state_borrow.push_root(HydroRoot::SendExternal {
643 to_external_key: from.key,
644 to_port_id: next_external_port_id,
645 to_many: false,
646 unpaired: false,
647 serialize_fn: None,
648 instantiate_fn: DebugInstantiate::Building,
649 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
650 op_metadata: HydroIrOpMetadata::new(),
651 });
652
653 let raw_stream: Stream<
654 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
655 Self,
656 Unbounded,
657 TotalOrder,
658 ExactlyOnce,
659 > = Stream::new(
660 self.clone(),
661 HydroNode::ExternalInput {
662 from_external_key: from.key,
663 from_port_id: next_external_port_id,
664 from_many: false,
665 codec_type: quote_type::<Codec>().into(),
666 port_hint,
667 instantiate_fn: DebugInstantiate::Building,
668 deserialize_fn: None,
669 metadata: self.new_node_metadata(Stream::<
670 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
671 Self,
672 Unbounded,
673 TotalOrder,
674 ExactlyOnce,
675 >::collection_kind()),
676 },
677 );
678
679 (
680 ExternalBytesPort {
681 process_key: from.key,
682 port_id: next_external_port_id,
683 _phantom: PhantomData,
684 },
685 raw_stream.flatten_ordered(),
686 fwd_ref,
687 )
688 }
689
690 #[expect(clippy::type_complexity, reason = "stream markers")]
700 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
701 &self,
702 from: &External<L>,
703 ) -> (
704 ExternalBincodeBidi<InT, OutT, NotMany>,
705 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
706 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
707 )
708 where
709 Self: Sized + NoTick,
710 {
711 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
712
713 let (fwd_ref, to_sink) =
714 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
715 let mut flow_state_borrow = self.flow_state().borrow_mut();
716
717 let root = get_this_crate();
718
719 let out_t_type = quote_type::<OutT>();
720 let ser_fn: syn::Expr = syn::parse_quote! {
721 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
722 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
723 )
724 };
725
726 flow_state_borrow.push_root(HydroRoot::SendExternal {
727 to_external_key: from.key,
728 to_port_id: next_external_port_id,
729 to_many: false,
730 unpaired: false,
731 serialize_fn: Some(ser_fn.into()),
732 instantiate_fn: DebugInstantiate::Building,
733 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
734 op_metadata: HydroIrOpMetadata::new(),
735 });
736
737 let in_t_type = quote_type::<InT>();
738
739 let deser_fn: syn::Expr = syn::parse_quote! {
740 |res| {
741 let b = res.unwrap();
742 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
743 }
744 };
745
746 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
747 self.clone(),
748 HydroNode::ExternalInput {
749 from_external_key: from.key,
750 from_port_id: next_external_port_id,
751 from_many: false,
752 codec_type: quote_type::<LengthDelimitedCodec>().into(),
753 port_hint: NetworkHint::Auto,
754 instantiate_fn: DebugInstantiate::Building,
755 deserialize_fn: Some(deser_fn.into()),
756 metadata: self.new_node_metadata(Stream::<
757 InT,
758 Self,
759 Unbounded,
760 TotalOrder,
761 ExactlyOnce,
762 >::collection_kind()),
763 },
764 );
765
766 (
767 ExternalBincodeBidi {
768 process_key: from.key,
769 port_id: next_external_port_id,
770 _phantom: PhantomData,
771 },
772 raw_stream,
773 fwd_ref,
774 )
775 }
776
777 #[expect(clippy::type_complexity, reason = "stream markers")]
789 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
790 &self,
791 from: &External<L>,
792 port_hint: NetworkHint,
793 ) -> (
794 ExternalBytesPort<Many>,
795 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
796 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
797 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
798 )
799 where
800 Self: Sized + NoTick,
801 {
802 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
803
804 let (fwd_ref, to_sink) =
805 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
806 let mut flow_state_borrow = self.flow_state().borrow_mut();
807
808 flow_state_borrow.push_root(HydroRoot::SendExternal {
809 to_external_key: from.key,
810 to_port_id: next_external_port_id,
811 to_many: true,
812 unpaired: false,
813 serialize_fn: None,
814 instantiate_fn: DebugInstantiate::Building,
815 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
816 op_metadata: HydroIrOpMetadata::new(),
817 });
818
819 let raw_stream: Stream<
820 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
821 Self,
822 Unbounded,
823 TotalOrder,
824 ExactlyOnce,
825 > = Stream::new(
826 self.clone(),
827 HydroNode::ExternalInput {
828 from_external_key: from.key,
829 from_port_id: next_external_port_id,
830 from_many: true,
831 codec_type: quote_type::<Codec>().into(),
832 port_hint,
833 instantiate_fn: DebugInstantiate::Building,
834 deserialize_fn: None,
835 metadata: self.new_node_metadata(Stream::<
836 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
837 Self,
838 Unbounded,
839 TotalOrder,
840 ExactlyOnce,
841 >::collection_kind()),
842 },
843 );
844
845 let membership_stream_ident = syn::Ident::new(
846 &format!(
847 "__hydro_deploy_many_{}_{}_membership",
848 from.key, next_external_port_id
849 ),
850 Span::call_site(),
851 );
852 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
853 let raw_membership_stream: KeyedStream<
854 u64,
855 bool,
856 Self,
857 Unbounded,
858 TotalOrder,
859 ExactlyOnce,
860 > = KeyedStream::new(
861 self.clone(),
862 HydroNode::Source {
863 source: HydroSource::Stream(membership_stream_expr.into()),
864 metadata: self.new_node_metadata(KeyedStream::<
865 u64,
866 bool,
867 Self,
868 Unbounded,
869 TotalOrder,
870 ExactlyOnce,
871 >::collection_kind()),
872 },
873 );
874
875 (
876 ExternalBytesPort {
877 process_key: from.key,
878 port_id: next_external_port_id,
879 _phantom: PhantomData,
880 },
881 raw_stream
882 .flatten_ordered() .into_keyed(),
884 raw_membership_stream.map(q!(|join| {
885 if join {
886 MembershipEvent::Joined
887 } else {
888 MembershipEvent::Left
889 }
890 })),
891 fwd_ref,
892 )
893 }
894
895 #[expect(clippy::type_complexity, reason = "stream markers")]
911 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
912 &self,
913 from: &External<L>,
914 ) -> (
915 ExternalBincodeBidi<InT, OutT, Many>,
916 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
917 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
918 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
919 )
920 where
921 Self: Sized + NoTick,
922 {
923 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
924
925 let (fwd_ref, to_sink) =
926 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
927 let mut flow_state_borrow = self.flow_state().borrow_mut();
928
929 let root = get_this_crate();
930
931 let out_t_type = quote_type::<OutT>();
932 let ser_fn: syn::Expr = syn::parse_quote! {
933 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
934 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
935 )
936 };
937
938 flow_state_borrow.push_root(HydroRoot::SendExternal {
939 to_external_key: from.key,
940 to_port_id: next_external_port_id,
941 to_many: true,
942 unpaired: false,
943 serialize_fn: Some(ser_fn.into()),
944 instantiate_fn: DebugInstantiate::Building,
945 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
946 op_metadata: HydroIrOpMetadata::new(),
947 });
948
949 let in_t_type = quote_type::<InT>();
950
951 let deser_fn: syn::Expr = syn::parse_quote! {
952 |res| {
953 let (id, b) = res.unwrap();
954 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
955 }
956 };
957
958 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
959 KeyedStream::new(
960 self.clone(),
961 HydroNode::ExternalInput {
962 from_external_key: from.key,
963 from_port_id: next_external_port_id,
964 from_many: true,
965 codec_type: quote_type::<LengthDelimitedCodec>().into(),
966 port_hint: NetworkHint::Auto,
967 instantiate_fn: DebugInstantiate::Building,
968 deserialize_fn: Some(deser_fn.into()),
969 metadata: self.new_node_metadata(KeyedStream::<
970 u64,
971 InT,
972 Self,
973 Unbounded,
974 TotalOrder,
975 ExactlyOnce,
976 >::collection_kind()),
977 },
978 );
979
980 let membership_stream_ident = syn::Ident::new(
981 &format!(
982 "__hydro_deploy_many_{}_{}_membership",
983 from.key, next_external_port_id
984 ),
985 Span::call_site(),
986 );
987 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
988 let raw_membership_stream: KeyedStream<
989 u64,
990 bool,
991 Self,
992 Unbounded,
993 TotalOrder,
994 ExactlyOnce,
995 > = KeyedStream::new(
996 self.clone(),
997 HydroNode::Source {
998 source: HydroSource::Stream(membership_stream_expr.into()),
999 metadata: self.new_node_metadata(KeyedStream::<
1000 u64,
1001 bool,
1002 Self,
1003 Unbounded,
1004 TotalOrder,
1005 ExactlyOnce,
1006 >::collection_kind()),
1007 },
1008 );
1009
1010 (
1011 ExternalBincodeBidi {
1012 process_key: from.key,
1013 port_id: next_external_port_id,
1014 _phantom: PhantomData,
1015 },
1016 raw_stream,
1017 raw_membership_stream.map(q!(|join| {
1018 if join {
1019 MembershipEvent::Joined
1020 } else {
1021 MembershipEvent::Left
1022 }
1023 })),
1024 fwd_ref,
1025 )
1026 }
1027
1028 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1046 where
1047 T: Clone,
1048 Self: Sized,
1049 {
1050 let e = e.splice_untyped_ctx(self);
1051
1052 Singleton::new(
1053 self.clone(),
1054 HydroNode::SingletonSource {
1055 value: e.into(),
1056 first_tick_only: false,
1057 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1058 },
1059 )
1060 }
1061
1062 fn source_interval(
1072 &self,
1073 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1074 _nondet: NonDet,
1075 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1076 where
1077 Self: Sized + NoTick,
1078 {
1079 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1080 tokio::time::interval(interval)
1081 )))
1082 }
1083
1084 fn source_interval_delayed(
1095 &self,
1096 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1097 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1098 _nondet: NonDet,
1099 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1100 where
1101 Self: Sized + NoTick,
1102 {
1103 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1104 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1105 )))
1106 }
1107
1108 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1142 where
1143 S: CycleCollection<'a, ForwardRef, Location = Self>,
1144 {
1145 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1146 (
1147 ForwardHandle::new(cycle_id, Location::id(self)),
1148 S::create_source(cycle_id, self.clone()),
1149 )
1150 }
1151}
1152
1153#[cfg(feature = "deploy")]
1154#[cfg(test)]
1155mod tests {
1156 use std::collections::HashSet;
1157
1158 use futures::{SinkExt, StreamExt};
1159 use hydro_deploy::Deployment;
1160 use stageleft::q;
1161 use tokio_util::codec::LengthDelimitedCodec;
1162
1163 use crate::compile::builder::FlowBuilder;
1164 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1165 use crate::location::{Location, NetworkHint};
1166 use crate::nondet::nondet;
1167
1168 #[tokio::test]
1169 async fn top_level_singleton_replay_cardinality() {
1170 let mut deployment = Deployment::new();
1171
1172 let mut flow = FlowBuilder::new();
1173 let node = flow.process::<()>();
1174 let external = flow.external::<()>();
1175
1176 let (in_port, input) =
1177 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1178 let singleton = node.singleton(q!(123));
1179 let tick = node.tick();
1180 let out = input
1181 .batch(&tick, nondet!())
1182 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1183 .cross_singleton(
1184 singleton
1185 .snapshot(&tick, nondet!())
1186 .into_stream()
1187 .count(),
1188 )
1189 .all_ticks()
1190 .send_bincode_external(&external);
1191
1192 let nodes = flow
1193 .with_process(&node, deployment.Localhost())
1194 .with_external(&external, deployment.Localhost())
1195 .deploy(&mut deployment);
1196
1197 deployment.deploy().await.unwrap();
1198
1199 let mut external_in = nodes.connect(in_port).await;
1200 let mut external_out = nodes.connect(out).await;
1201
1202 deployment.start().await.unwrap();
1203
1204 external_in.send(1).await.unwrap();
1205 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1206
1207 external_in.send(2).await.unwrap();
1208 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1209 }
1210
1211 #[tokio::test]
1212 async fn tick_singleton_replay_cardinality() {
1213 let mut deployment = Deployment::new();
1214
1215 let mut flow = FlowBuilder::new();
1216 let node = flow.process::<()>();
1217 let external = flow.external::<()>();
1218
1219 let (in_port, input) =
1220 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1221 let tick = node.tick();
1222 let singleton = tick.singleton(q!(123));
1223 let out = input
1224 .batch(&tick, nondet!())
1225 .cross_singleton(singleton.clone())
1226 .cross_singleton(singleton.into_stream().count())
1227 .all_ticks()
1228 .send_bincode_external(&external);
1229
1230 let nodes = flow
1231 .with_process(&node, deployment.Localhost())
1232 .with_external(&external, deployment.Localhost())
1233 .deploy(&mut deployment);
1234
1235 deployment.deploy().await.unwrap();
1236
1237 let mut external_in = nodes.connect(in_port).await;
1238 let mut external_out = nodes.connect(out).await;
1239
1240 deployment.start().await.unwrap();
1241
1242 external_in.send(1).await.unwrap();
1243 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1244
1245 external_in.send(2).await.unwrap();
1246 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1247 }
1248
1249 #[tokio::test]
1250 async fn external_bytes() {
1251 let mut deployment = Deployment::new();
1252
1253 let mut flow = FlowBuilder::new();
1254 let first_node = flow.process::<()>();
1255 let external = flow.external::<()>();
1256
1257 let (in_port, input) = first_node.source_external_bytes(&external);
1258 let out = input.send_bincode_external(&external);
1259
1260 let nodes = flow
1261 .with_process(&first_node, deployment.Localhost())
1262 .with_external(&external, deployment.Localhost())
1263 .deploy(&mut deployment);
1264
1265 deployment.deploy().await.unwrap();
1266
1267 let mut external_in = nodes.connect(in_port).await.1;
1268 let mut external_out = nodes.connect(out).await;
1269
1270 deployment.start().await.unwrap();
1271
1272 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1273
1274 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1275 }
1276
1277 #[tokio::test]
1278 async fn multi_external_source() {
1279 let mut deployment = Deployment::new();
1280
1281 let mut flow = FlowBuilder::new();
1282 let first_node = flow.process::<()>();
1283 let external = flow.external::<()>();
1284
1285 let (in_port, input, _membership, complete_sink) =
1286 first_node.bidi_external_many_bincode(&external);
1287 let out = input.entries().send_bincode_external(&external);
1288 complete_sink.complete(
1289 first_node
1290 .source_iter::<(u64, ()), _>(q!([]))
1291 .into_keyed()
1292 .weaken_ordering(),
1293 );
1294
1295 let nodes = flow
1296 .with_process(&first_node, deployment.Localhost())
1297 .with_external(&external, deployment.Localhost())
1298 .deploy(&mut deployment);
1299
1300 deployment.deploy().await.unwrap();
1301
1302 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1303 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1304 let external_out = nodes.connect(out).await;
1305
1306 deployment.start().await.unwrap();
1307
1308 external_in_1.send(123).await.unwrap();
1309 external_in_2.send(456).await.unwrap();
1310
1311 assert_eq!(
1312 external_out.take(2).collect::<HashSet<_>>().await,
1313 vec![(0, 123), (1, 456)].into_iter().collect()
1314 );
1315 }
1316
1317 #[tokio::test]
1318 async fn second_connection_only_multi_source() {
1319 let mut deployment = Deployment::new();
1320
1321 let mut flow = FlowBuilder::new();
1322 let first_node = flow.process::<()>();
1323 let external = flow.external::<()>();
1324
1325 let (in_port, input, _membership, complete_sink) =
1326 first_node.bidi_external_many_bincode(&external);
1327 let out = input.entries().send_bincode_external(&external);
1328 complete_sink.complete(
1329 first_node
1330 .source_iter::<(u64, ()), _>(q!([]))
1331 .into_keyed()
1332 .weaken_ordering(),
1333 );
1334
1335 let nodes = flow
1336 .with_process(&first_node, deployment.Localhost())
1337 .with_external(&external, deployment.Localhost())
1338 .deploy(&mut deployment);
1339
1340 deployment.deploy().await.unwrap();
1341
1342 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1344 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1345 let mut external_out = nodes.connect(out).await;
1346
1347 deployment.start().await.unwrap();
1348
1349 external_in_2.send(456).await.unwrap();
1350
1351 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1352 }
1353
1354 #[tokio::test]
1355 async fn multi_external_bytes() {
1356 let mut deployment = Deployment::new();
1357
1358 let mut flow = FlowBuilder::new();
1359 let first_node = flow.process::<()>();
1360 let external = flow.external::<()>();
1361
1362 let (in_port, input, _membership, complete_sink) = first_node
1363 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1364 let out = input.entries().send_bincode_external(&external);
1365 complete_sink.complete(
1366 first_node
1367 .source_iter(q!([]))
1368 .into_keyed()
1369 .weaken_ordering(),
1370 );
1371
1372 let nodes = flow
1373 .with_process(&first_node, deployment.Localhost())
1374 .with_external(&external, deployment.Localhost())
1375 .deploy(&mut deployment);
1376
1377 deployment.deploy().await.unwrap();
1378
1379 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1380 let mut external_in_2 = nodes.connect(in_port).await.1;
1381 let external_out = nodes.connect(out).await;
1382
1383 deployment.start().await.unwrap();
1384
1385 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1386 external_in_2.send(vec![4, 5].into()).await.unwrap();
1387
1388 assert_eq!(
1389 external_out.take(2).collect::<HashSet<_>>().await,
1390 vec![
1391 (0, (&[1u8, 2, 3] as &[u8]).into()),
1392 (1, (&[4u8, 5] as &[u8]).into())
1393 ]
1394 .into_iter()
1395 .collect()
1396 );
1397 }
1398
1399 #[tokio::test]
1400 async fn single_client_external_bytes() {
1401 let mut deployment = Deployment::new();
1402 let mut flow = FlowBuilder::new();
1403 let first_node = flow.process::<()>();
1404 let external = flow.external::<()>();
1405 let (port, input, complete_sink) = first_node
1406 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1407 complete_sink.complete(input.map(q!(|data| {
1408 let mut resp: Vec<u8> = data.into();
1409 resp.push(42);
1410 resp.into() })));
1412
1413 let nodes = flow
1414 .with_process(&first_node, deployment.Localhost())
1415 .with_external(&external, deployment.Localhost())
1416 .deploy(&mut deployment);
1417
1418 deployment.deploy().await.unwrap();
1419 deployment.start().await.unwrap();
1420
1421 let (mut external_out, mut external_in) = nodes.connect(port).await;
1422
1423 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1424 assert_eq!(
1425 external_out.next().await.unwrap().unwrap(),
1426 vec![1, 2, 3, 42]
1427 );
1428 }
1429
1430 #[tokio::test]
1431 async fn echo_external_bytes() {
1432 let mut deployment = Deployment::new();
1433
1434 let mut flow = FlowBuilder::new();
1435 let first_node = flow.process::<()>();
1436 let external = flow.external::<()>();
1437
1438 let (port, input, _membership, complete_sink) = first_node
1439 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1440 complete_sink
1441 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1442
1443 let nodes = flow
1444 .with_process(&first_node, deployment.Localhost())
1445 .with_external(&external, deployment.Localhost())
1446 .deploy(&mut deployment);
1447
1448 deployment.deploy().await.unwrap();
1449
1450 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1451 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1452
1453 deployment.start().await.unwrap();
1454
1455 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1456 external_in_2.send(vec![4, 5].into()).await.unwrap();
1457
1458 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1459 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1460 }
1461
1462 #[tokio::test]
1463 async fn echo_external_bincode() {
1464 let mut deployment = Deployment::new();
1465
1466 let mut flow = FlowBuilder::new();
1467 let first_node = flow.process::<()>();
1468 let external = flow.external::<()>();
1469
1470 let (port, input, _membership, complete_sink) =
1471 first_node.bidi_external_many_bincode(&external);
1472 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1473
1474 let nodes = flow
1475 .with_process(&first_node, deployment.Localhost())
1476 .with_external(&external, deployment.Localhost())
1477 .deploy(&mut deployment);
1478
1479 deployment.deploy().await.unwrap();
1480
1481 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1482 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1483
1484 deployment.start().await.unwrap();
1485
1486 external_in_1.send("hi".to_owned()).await.unwrap();
1487 external_in_2.send("hello".to_owned()).await.unwrap();
1488
1489 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1490 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1491 }
1492}