1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
34use crate::forward_handle::ForwardRef;
35#[cfg(stageleft_runtime)]
36use crate::forward_handle::{CycleCollection, ForwardHandle};
37use crate::live_collections::boundedness::{Bounded, Unbounded};
38use crate::live_collections::keyed_stream::KeyedStream;
39use crate::live_collections::singleton::Singleton;
40use crate::live_collections::stream::{
41 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
42};
43use crate::location::dynamic::LocationId;
44use crate::location::external_process::{
45 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
46};
47use crate::nondet::NonDet;
48#[cfg(feature = "sim")]
49use crate::sim::SimSender;
50use crate::staging_util::get_this_crate;
51
52pub mod dynamic;
53
54#[expect(missing_docs, reason = "TODO")]
55pub mod external_process;
56pub use external_process::External;
57
58#[expect(missing_docs, reason = "TODO")]
59pub mod process;
60pub use process::Process;
61
62#[expect(missing_docs, reason = "TODO")]
63pub mod cluster;
64pub use cluster::Cluster;
65
66#[expect(missing_docs, reason = "TODO")]
67pub mod member_id;
68pub use member_id::{MemberId, TaglessMemberId};
69
70#[expect(missing_docs, reason = "TODO")]
71pub mod tick;
72pub use tick::{Atomic, NoTick, Tick};
73
74#[expect(missing_docs, reason = "TODO")]
75#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
76pub enum MembershipEvent {
77 Joined,
78 Left,
79}
80
/// Hint describing how the network endpoint of an external port should be set up.
///
/// The hint is stored unchanged as the `port_hint` field of
/// [`HydroNode::ExternalInput`]; this module does not interpret it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No preference. The bincode and byte-source helpers in this module always use it.
    Auto,
    /// Request a TCP endpoint.
    ///
    /// NOTE(review): `Some(port)` presumably pins the port number and `None` lets the
    /// deployment backend choose one — confirm against the deploy backends.
    TcpPort(Option<u16>),
}
87
88pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
89 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
90}
91
#[stageleft::export(LocationKey)]
new_key_type! {
    /// Slotmap key type used in this module to identify a location
    /// (see its use as `from.key` and [`LocationKey::FIRST`]).
    pub struct LocationKey;
}
97
98impl std::fmt::Display for LocationKey {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "loc{:?}", self.data()) }
102}
103
104impl std::str::FromStr for LocationKey {
107 type Err = Option<ParseIntError>;
108
109 fn from_str(s: &str) -> Result<Self, Self::Err> {
110 let nvn = s.strip_prefix("loc").ok_or(None)?;
111 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
112 let idx: u64 = idx.parse()?;
113 let ver: u64 = ver.parse()?;
114 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
115 }
116}
117
118impl LocationKey {
119 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
125 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); #[cfg(test)]
129 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); }
131
132impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
134 type O = LocationKey;
135
136 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
137 where
138 Self: Sized,
139 {
140 let root = get_this_crate();
141 let n = Key::data(&self).as_ffi();
142 (
143 QuoteTokens {
144 prelude: None,
145 expr: Some(quote! {
146 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
147 }),
148 },
149 (),
150 )
151 }
152}
153
154#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
156pub enum LocationType {
157 Process,
159 Cluster,
161 External,
163}
164
165#[expect(
179 private_bounds,
180 reason = "only internal Hydro code can define location types"
181)]
182pub trait Location<'a>: dynamic::DynLocation {
/// The location type returned by [`Location::root`].
///
/// NOTE(review): presumably the top-level location that (possibly nested) locations
/// such as ticks belong to — confirm against the implementors.
type Root: Location<'a>;

/// Returns the [`Location::Root`] associated with this location.
fn root(&self) -> Self::Root;
194
195 fn try_tick(&self) -> Option<Tick<Self>> {
202 if Self::is_top_level() {
203 let next_id = self.flow_state().borrow_mut().next_clock_id;
204 self.flow_state().borrow_mut().next_clock_id += 1;
205 Some(Tick {
206 id: next_id,
207 l: self.clone(),
208 })
209 } else {
210 None
211 }
212 }
213
214 fn id(&self) -> LocationId {
216 dynamic::DynLocation::id(self)
217 }
218
219 fn tick(&self) -> Tick<Self>
245 where
246 Self: NoTick,
247 {
248 let next_id = self.flow_state().borrow_mut().next_clock_id;
249 self.flow_state().borrow_mut().next_clock_id += 1;
250 Tick {
251 id: next_id,
252 l: self.clone(),
253 }
254 }
255
256 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
281 where
282 Self: Sized + NoTick,
283 {
284 Stream::new(
285 self.clone(),
286 HydroNode::Source {
287 source: HydroSource::Spin(),
288 metadata: self.new_node_metadata(Stream::<
289 (),
290 Self,
291 Unbounded,
292 TotalOrder,
293 ExactlyOnce,
294 >::collection_kind()),
295 },
296 )
297 }
298
299 fn source_stream<T, E>(
320 &self,
321 e: impl QuotedWithContext<'a, E, Self>,
322 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
323 where
324 E: FuturesStream<Item = T> + Unpin,
325 Self: Sized + NoTick,
326 {
327 let e = e.splice_untyped_ctx(self);
328
329 Stream::new(
330 self.clone(),
331 HydroNode::Source {
332 source: HydroSource::Stream(e.into()),
333 metadata: self.new_node_metadata(Stream::<
334 T,
335 Self,
336 Unbounded,
337 TotalOrder,
338 ExactlyOnce,
339 >::collection_kind()),
340 },
341 )
342 }
343
344 fn source_iter<T, E>(
366 &self,
367 e: impl QuotedWithContext<'a, E, Self>,
368 ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
369 where
370 E: IntoIterator<Item = T>,
371 Self: Sized + NoTick,
372 {
373 let e = e.splice_typed_ctx(self);
374
375 Stream::new(
376 self.clone(),
377 HydroNode::Source {
378 source: HydroSource::Iter(e.into()),
379 metadata: self.new_node_metadata(
380 Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
381 ),
382 },
383 )
384 }
385
386 fn source_cluster_members<C: 'a>(
420 &self,
421 cluster: &Cluster<'a, C>,
422 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
423 where
424 Self: Sized + NoTick,
425 {
426 Stream::new(
427 self.clone(),
428 HydroNode::Source {
429 source: HydroSource::ClusterMembers(cluster.id()),
430 metadata: self.new_node_metadata(Stream::<
431 (TaglessMemberId, MembershipEvent),
432 Self,
433 Unbounded,
434 TotalOrder,
435 ExactlyOnce,
436 >::collection_kind()),
437 },
438 )
439 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
440 .into_keyed()
441 }
442
443 fn source_external_bytes<L>(
451 &self,
452 from: &External<L>,
453 ) -> (
454 ExternalBytesPort,
455 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
456 )
457 where
458 Self: Sized + NoTick,
459 {
460 let (port, stream, sink) =
461 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
462
463 sink.complete(self.source_iter(q!([])));
464
465 (port, stream)
466 }
467
468 #[expect(clippy::type_complexity, reason = "stream markers")]
475 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
476 &self,
477 from: &External<L>,
478 ) -> (
479 ExternalBincodeSink<T, NotMany, O, R>,
480 Stream<T, Self, Unbounded, O, R>,
481 )
482 where
483 Self: Sized + NoTick,
484 T: Serialize + DeserializeOwned,
485 {
486 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
487 sink.complete(self.source_iter(q!([])));
488
489 (
490 ExternalBincodeSink {
491 process_key: from.key,
492 port_id: port.port_id,
493 _phantom: PhantomData,
494 },
495 stream.weaken_ordering().weaken_retries(),
496 )
497 }
498
/// Creates an input stream for simulation together with the [`SimSender`] that
/// feeds it.
///
/// An [`External`] location keyed by [`LocationKey::FIRST`] and sharing this
/// location's flow state is built on the fly. The stream is then obtained through
/// [`Location::source_external_bincode`], and the sender wraps the resulting port id.
#[cfg(feature = "sim")]
#[expect(clippy::type_complexity, reason = "stream markers")]
fn sim_input<T, O: Ordering, R: Retries>(
    &self,
) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
where
    Self: Sized + NoTick,
    T: Serialize + DeserializeOwned,
{
    let sim_external: External<'a, ()> = External {
        key: LocationKey::FIRST,
        flow_state: self.flow_state().clone(),
        _phantom: PhantomData,
    };

    let (sink_port, incoming) = self.source_external_bincode(&sim_external);
    let sender = SimSender(sink_port.port_id, PhantomData);

    (sender, incoming)
}
522
523 #[expect(clippy::type_complexity, reason = "stream markers")]
568 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
569 &self,
570 from: &External<L>,
571 port_hint: NetworkHint,
572 ) -> (
573 ExternalBytesPort<NotMany>,
574 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
575 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
576 )
577 where
578 Self: Sized + NoTick,
579 {
580 let next_external_port_id = from
581 .flow_state
582 .borrow_mut()
583 .next_external_port
584 .get_and_increment();
585
586 let (fwd_ref, to_sink) =
587 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
588 let mut flow_state_borrow = self.flow_state().borrow_mut();
589
590 flow_state_borrow.push_root(HydroRoot::SendExternal {
591 to_external_key: from.key,
592 to_port_id: next_external_port_id,
593 to_many: false,
594 unpaired: false,
595 serialize_fn: None,
596 instantiate_fn: DebugInstantiate::Building,
597 input: Box::new(to_sink.ir_node.into_inner()),
598 op_metadata: HydroIrOpMetadata::new(),
599 });
600
601 let raw_stream: Stream<
602 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
603 Self,
604 Unbounded,
605 TotalOrder,
606 ExactlyOnce,
607 > = Stream::new(
608 self.clone(),
609 HydroNode::ExternalInput {
610 from_external_key: from.key,
611 from_port_id: next_external_port_id,
612 from_many: false,
613 codec_type: quote_type::<Codec>().into(),
614 port_hint,
615 instantiate_fn: DebugInstantiate::Building,
616 deserialize_fn: None,
617 metadata: self.new_node_metadata(Stream::<
618 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
619 Self,
620 Unbounded,
621 TotalOrder,
622 ExactlyOnce,
623 >::collection_kind()),
624 },
625 );
626
627 (
628 ExternalBytesPort {
629 process_key: from.key,
630 port_id: next_external_port_id,
631 _phantom: PhantomData,
632 },
633 raw_stream.flatten_ordered(),
634 fwd_ref,
635 )
636 }
637
638 #[expect(clippy::type_complexity, reason = "stream markers")]
648 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
649 &self,
650 from: &External<L>,
651 ) -> (
652 ExternalBincodeBidi<InT, OutT, NotMany>,
653 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
654 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
655 )
656 where
657 Self: Sized + NoTick,
658 {
659 let next_external_port_id = from
660 .flow_state
661 .borrow_mut()
662 .next_external_port
663 .get_and_increment();
664
665 let (fwd_ref, to_sink) =
666 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
667 let mut flow_state_borrow = self.flow_state().borrow_mut();
668
669 let root = get_this_crate();
670
671 let out_t_type = quote_type::<OutT>();
672 let ser_fn: syn::Expr = syn::parse_quote! {
673 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
674 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
675 )
676 };
677
678 flow_state_borrow.push_root(HydroRoot::SendExternal {
679 to_external_key: from.key,
680 to_port_id: next_external_port_id,
681 to_many: false,
682 unpaired: false,
683 serialize_fn: Some(ser_fn.into()),
684 instantiate_fn: DebugInstantiate::Building,
685 input: Box::new(to_sink.ir_node.into_inner()),
686 op_metadata: HydroIrOpMetadata::new(),
687 });
688
689 let in_t_type = quote_type::<InT>();
690
691 let deser_fn: syn::Expr = syn::parse_quote! {
692 |res| {
693 let b = res.unwrap();
694 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
695 }
696 };
697
698 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
699 self.clone(),
700 HydroNode::ExternalInput {
701 from_external_key: from.key,
702 from_port_id: next_external_port_id,
703 from_many: false,
704 codec_type: quote_type::<LengthDelimitedCodec>().into(),
705 port_hint: NetworkHint::Auto,
706 instantiate_fn: DebugInstantiate::Building,
707 deserialize_fn: Some(deser_fn.into()),
708 metadata: self.new_node_metadata(Stream::<
709 InT,
710 Self,
711 Unbounded,
712 TotalOrder,
713 ExactlyOnce,
714 >::collection_kind()),
715 },
716 );
717
718 (
719 ExternalBincodeBidi {
720 process_key: from.key,
721 port_id: next_external_port_id,
722 _phantom: PhantomData,
723 },
724 raw_stream,
725 fwd_ref,
726 )
727 }
728
729 #[expect(clippy::type_complexity, reason = "stream markers")]
741 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
742 &self,
743 from: &External<L>,
744 port_hint: NetworkHint,
745 ) -> (
746 ExternalBytesPort<Many>,
747 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
748 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
749 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
750 )
751 where
752 Self: Sized + NoTick,
753 {
754 let next_external_port_id = from
755 .flow_state
756 .borrow_mut()
757 .next_external_port
758 .get_and_increment();
759
760 let (fwd_ref, to_sink) =
761 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
762 let mut flow_state_borrow = self.flow_state().borrow_mut();
763
764 flow_state_borrow.push_root(HydroRoot::SendExternal {
765 to_external_key: from.key,
766 to_port_id: next_external_port_id,
767 to_many: true,
768 unpaired: false,
769 serialize_fn: None,
770 instantiate_fn: DebugInstantiate::Building,
771 input: Box::new(to_sink.entries().ir_node.into_inner()),
772 op_metadata: HydroIrOpMetadata::new(),
773 });
774
775 let raw_stream: Stream<
776 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
777 Self,
778 Unbounded,
779 TotalOrder,
780 ExactlyOnce,
781 > = Stream::new(
782 self.clone(),
783 HydroNode::ExternalInput {
784 from_external_key: from.key,
785 from_port_id: next_external_port_id,
786 from_many: true,
787 codec_type: quote_type::<Codec>().into(),
788 port_hint,
789 instantiate_fn: DebugInstantiate::Building,
790 deserialize_fn: None,
791 metadata: self.new_node_metadata(Stream::<
792 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
793 Self,
794 Unbounded,
795 TotalOrder,
796 ExactlyOnce,
797 >::collection_kind()),
798 },
799 );
800
801 let membership_stream_ident = syn::Ident::new(
802 &format!(
803 "__hydro_deploy_many_{}_{}_membership",
804 from.key, next_external_port_id
805 ),
806 Span::call_site(),
807 );
808 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
809 let raw_membership_stream: KeyedStream<
810 u64,
811 bool,
812 Self,
813 Unbounded,
814 TotalOrder,
815 ExactlyOnce,
816 > = KeyedStream::new(
817 self.clone(),
818 HydroNode::Source {
819 source: HydroSource::Stream(membership_stream_expr.into()),
820 metadata: self.new_node_metadata(KeyedStream::<
821 u64,
822 bool,
823 Self,
824 Unbounded,
825 TotalOrder,
826 ExactlyOnce,
827 >::collection_kind()),
828 },
829 );
830
831 (
832 ExternalBytesPort {
833 process_key: from.key,
834 port_id: next_external_port_id,
835 _phantom: PhantomData,
836 },
837 raw_stream
838 .flatten_ordered() .into_keyed(),
840 raw_membership_stream.map(q!(|join| {
841 if join {
842 MembershipEvent::Joined
843 } else {
844 MembershipEvent::Left
845 }
846 })),
847 fwd_ref,
848 )
849 }
850
851 #[expect(clippy::type_complexity, reason = "stream markers")]
867 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
868 &self,
869 from: &External<L>,
870 ) -> (
871 ExternalBincodeBidi<InT, OutT, Many>,
872 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
873 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
874 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
875 )
876 where
877 Self: Sized + NoTick,
878 {
879 let next_external_port_id = from
880 .flow_state
881 .borrow_mut()
882 .next_external_port
883 .get_and_increment();
884
885 let (fwd_ref, to_sink) =
886 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
887 let mut flow_state_borrow = self.flow_state().borrow_mut();
888
889 let root = get_this_crate();
890
891 let out_t_type = quote_type::<OutT>();
892 let ser_fn: syn::Expr = syn::parse_quote! {
893 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
894 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
895 )
896 };
897
898 flow_state_borrow.push_root(HydroRoot::SendExternal {
899 to_external_key: from.key,
900 to_port_id: next_external_port_id,
901 to_many: true,
902 unpaired: false,
903 serialize_fn: Some(ser_fn.into()),
904 instantiate_fn: DebugInstantiate::Building,
905 input: Box::new(to_sink.entries().ir_node.into_inner()),
906 op_metadata: HydroIrOpMetadata::new(),
907 });
908
909 let in_t_type = quote_type::<InT>();
910
911 let deser_fn: syn::Expr = syn::parse_quote! {
912 |res| {
913 let (id, b) = res.unwrap();
914 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
915 }
916 };
917
918 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
919 KeyedStream::new(
920 self.clone(),
921 HydroNode::ExternalInput {
922 from_external_key: from.key,
923 from_port_id: next_external_port_id,
924 from_many: true,
925 codec_type: quote_type::<LengthDelimitedCodec>().into(),
926 port_hint: NetworkHint::Auto,
927 instantiate_fn: DebugInstantiate::Building,
928 deserialize_fn: Some(deser_fn.into()),
929 metadata: self.new_node_metadata(KeyedStream::<
930 u64,
931 InT,
932 Self,
933 Unbounded,
934 TotalOrder,
935 ExactlyOnce,
936 >::collection_kind()),
937 },
938 );
939
940 let membership_stream_ident = syn::Ident::new(
941 &format!(
942 "__hydro_deploy_many_{}_{}_membership",
943 from.key, next_external_port_id
944 ),
945 Span::call_site(),
946 );
947 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
948 let raw_membership_stream: KeyedStream<
949 u64,
950 bool,
951 Self,
952 Unbounded,
953 TotalOrder,
954 ExactlyOnce,
955 > = KeyedStream::new(
956 self.clone(),
957 HydroNode::Source {
958 source: HydroSource::Stream(membership_stream_expr.into()),
959 metadata: self.new_node_metadata(KeyedStream::<
960 u64,
961 bool,
962 Self,
963 Unbounded,
964 TotalOrder,
965 ExactlyOnce,
966 >::collection_kind()),
967 },
968 );
969
970 (
971 ExternalBincodeBidi {
972 process_key: from.key,
973 port_id: next_external_port_id,
974 _phantom: PhantomData,
975 },
976 raw_stream,
977 raw_membership_stream.map(q!(|join| {
978 if join {
979 MembershipEvent::Joined
980 } else {
981 MembershipEvent::Left
982 }
983 })),
984 fwd_ref,
985 )
986 }
987
988 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1006 where
1007 T: Clone,
1008 Self: Sized,
1009 {
1010 let e = e.splice_untyped_ctx(self);
1011
1012 Singleton::new(
1013 self.clone(),
1014 HydroNode::SingletonSource {
1015 value: e.into(),
1016 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1017 },
1018 )
1019 }
1020
/// Creates an unbounded stream of [`tokio::time::Instant`]s driven by a Tokio
/// interval timer with the given period.
///
/// * `interval` - quoted expression producing the period passed to
///   `tokio::time::interval`.
/// * `_nondet` - nondeterminism marker required from the caller; it is not used in
///   the body.
fn source_interval(
    &self,
    interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
    _nondet: NonDet,
) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
where
    Self: Sized + NoTick,
{
    // The interval is wrapped in `IntervalStream` so it can be used as a stream source.
    self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
        tokio::time::interval(interval)
    )))
}
1042
/// Creates an unbounded stream of [`tokio::time::Instant`]s driven by a Tokio
/// interval timer that starts after an initial delay.
///
/// * `delay` - quoted expression producing the offset added to
///   `tokio::time::Instant::now()` to obtain the start instant given to
///   `tokio::time::interval_at`.
/// * `interval` - quoted expression producing the period given to
///   `tokio::time::interval_at`.
/// * `_nondet` - nondeterminism marker required from the caller; it is not used in
///   the body.
fn source_interval_delayed(
    &self,
    delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
    interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
    _nondet: NonDet,
) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
where
    Self: Sized + NoTick,
{
    // `Instant::now()` is part of the quoted expression, not evaluated here.
    self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
        tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
    )))
}
1066
1067 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1101 where
1102 S: CycleCollection<'a, ForwardRef, Location = Self>,
1103 {
1104 let next_id = self.flow_state().borrow_mut().next_cycle_id();
1105 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
1106
1107 (
1108 ForwardHandle {
1109 completed: false,
1110 ident: ident.clone(),
1111 expected_location: Location::id(self),
1112 _phantom: PhantomData,
1113 },
1114 S::create_source(ident, self.clone()),
1115 )
1116 }
1117}
1118
1119#[cfg(feature = "deploy")]
1120#[cfg(test)]
1121mod tests {
1122 use std::collections::HashSet;
1123
1124 use futures::{SinkExt, StreamExt};
1125 use hydro_deploy::Deployment;
1126 use stageleft::q;
1127 use tokio_util::codec::LengthDelimitedCodec;
1128
1129 use crate::compile::builder::FlowBuilder;
1130 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1131 use crate::location::{Location, NetworkHint};
1132 use crate::nondet::nondet;
1133
1134 #[tokio::test]
1135 async fn top_level_singleton_replay_cardinality() {
1136 let mut deployment = Deployment::new();
1137
1138 let mut flow = FlowBuilder::new();
1139 let node = flow.process::<()>();
1140 let external = flow.external::<()>();
1141
1142 let (in_port, input) =
1143 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1144 let singleton = node.singleton(q!(123));
1145 let tick = node.tick();
1146 let out = input
1147 .batch(&tick, nondet!())
1148 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1149 .cross_singleton(
1150 singleton
1151 .snapshot(&tick, nondet!())
1152 .into_stream()
1153 .count(),
1154 )
1155 .all_ticks()
1156 .send_bincode_external(&external);
1157
1158 let nodes = flow
1159 .with_process(&node, deployment.Localhost())
1160 .with_external(&external, deployment.Localhost())
1161 .deploy(&mut deployment);
1162
1163 deployment.deploy().await.unwrap();
1164
1165 let mut external_in = nodes.connect(in_port).await;
1166 let mut external_out = nodes.connect(out).await;
1167
1168 deployment.start().await.unwrap();
1169
1170 external_in.send(1).await.unwrap();
1171 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1172
1173 external_in.send(2).await.unwrap();
1174 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1175 }
1176
1177 #[tokio::test]
1178 async fn tick_singleton_replay_cardinality() {
1179 let mut deployment = Deployment::new();
1180
1181 let mut flow = FlowBuilder::new();
1182 let node = flow.process::<()>();
1183 let external = flow.external::<()>();
1184
1185 let (in_port, input) =
1186 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1187 let tick = node.tick();
1188 let singleton = tick.singleton(q!(123));
1189 let out = input
1190 .batch(&tick, nondet!())
1191 .cross_singleton(singleton.clone())
1192 .cross_singleton(singleton.into_stream().count())
1193 .all_ticks()
1194 .send_bincode_external(&external);
1195
1196 let nodes = flow
1197 .with_process(&node, deployment.Localhost())
1198 .with_external(&external, deployment.Localhost())
1199 .deploy(&mut deployment);
1200
1201 deployment.deploy().await.unwrap();
1202
1203 let mut external_in = nodes.connect(in_port).await;
1204 let mut external_out = nodes.connect(out).await;
1205
1206 deployment.start().await.unwrap();
1207
1208 external_in.send(1).await.unwrap();
1209 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1210
1211 external_in.send(2).await.unwrap();
1212 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1213 }
1214
1215 #[tokio::test]
1216 async fn external_bytes() {
1217 let mut deployment = Deployment::new();
1218
1219 let mut flow = FlowBuilder::new();
1220 let first_node = flow.process::<()>();
1221 let external = flow.external::<()>();
1222
1223 let (in_port, input) = first_node.source_external_bytes(&external);
1224 let out = input.send_bincode_external(&external);
1225
1226 let nodes = flow
1227 .with_process(&first_node, deployment.Localhost())
1228 .with_external(&external, deployment.Localhost())
1229 .deploy(&mut deployment);
1230
1231 deployment.deploy().await.unwrap();
1232
1233 let mut external_in = nodes.connect(in_port).await.1;
1234 let mut external_out = nodes.connect(out).await;
1235
1236 deployment.start().await.unwrap();
1237
1238 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1239
1240 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1241 }
1242
1243 #[tokio::test]
1244 async fn multi_external_source() {
1245 let mut deployment = Deployment::new();
1246
1247 let mut flow = FlowBuilder::new();
1248 let first_node = flow.process::<()>();
1249 let external = flow.external::<()>();
1250
1251 let (in_port, input, _membership, complete_sink) =
1252 first_node.bidi_external_many_bincode(&external);
1253 let out = input.entries().send_bincode_external(&external);
1254 complete_sink.complete(
1255 first_node
1256 .source_iter::<(u64, ()), _>(q!([]))
1257 .into_keyed()
1258 .weaken_ordering(),
1259 );
1260
1261 let nodes = flow
1262 .with_process(&first_node, deployment.Localhost())
1263 .with_external(&external, deployment.Localhost())
1264 .deploy(&mut deployment);
1265
1266 deployment.deploy().await.unwrap();
1267
1268 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1269 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1270 let external_out = nodes.connect(out).await;
1271
1272 deployment.start().await.unwrap();
1273
1274 external_in_1.send(123).await.unwrap();
1275 external_in_2.send(456).await.unwrap();
1276
1277 assert_eq!(
1278 external_out.take(2).collect::<HashSet<_>>().await,
1279 vec![(0, 123), (1, 456)].into_iter().collect()
1280 );
1281 }
1282
1283 #[tokio::test]
1284 async fn second_connection_only_multi_source() {
1285 let mut deployment = Deployment::new();
1286
1287 let mut flow = FlowBuilder::new();
1288 let first_node = flow.process::<()>();
1289 let external = flow.external::<()>();
1290
1291 let (in_port, input, _membership, complete_sink) =
1292 first_node.bidi_external_many_bincode(&external);
1293 let out = input.entries().send_bincode_external(&external);
1294 complete_sink.complete(
1295 first_node
1296 .source_iter::<(u64, ()), _>(q!([]))
1297 .into_keyed()
1298 .weaken_ordering(),
1299 );
1300
1301 let nodes = flow
1302 .with_process(&first_node, deployment.Localhost())
1303 .with_external(&external, deployment.Localhost())
1304 .deploy(&mut deployment);
1305
1306 deployment.deploy().await.unwrap();
1307
1308 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1310 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1311 let mut external_out = nodes.connect(out).await;
1312
1313 deployment.start().await.unwrap();
1314
1315 external_in_2.send(456).await.unwrap();
1316
1317 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1318 }
1319
1320 #[tokio::test]
1321 async fn multi_external_bytes() {
1322 let mut deployment = Deployment::new();
1323
1324 let mut flow = FlowBuilder::new();
1325 let first_node = flow.process::<()>();
1326 let external = flow.external::<()>();
1327
1328 let (in_port, input, _membership, complete_sink) = first_node
1329 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1330 let out = input.entries().send_bincode_external(&external);
1331 complete_sink.complete(
1332 first_node
1333 .source_iter(q!([]))
1334 .into_keyed()
1335 .weaken_ordering(),
1336 );
1337
1338 let nodes = flow
1339 .with_process(&first_node, deployment.Localhost())
1340 .with_external(&external, deployment.Localhost())
1341 .deploy(&mut deployment);
1342
1343 deployment.deploy().await.unwrap();
1344
1345 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1346 let mut external_in_2 = nodes.connect(in_port).await.1;
1347 let external_out = nodes.connect(out).await;
1348
1349 deployment.start().await.unwrap();
1350
1351 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1352 external_in_2.send(vec![4, 5].into()).await.unwrap();
1353
1354 assert_eq!(
1355 external_out.take(2).collect::<HashSet<_>>().await,
1356 vec![
1357 (0, (&[1u8, 2, 3] as &[u8]).into()),
1358 (1, (&[4u8, 5] as &[u8]).into())
1359 ]
1360 .into_iter()
1361 .collect()
1362 );
1363 }
1364
1365 #[tokio::test]
1366 async fn single_client_external_bytes() {
1367 let mut deployment = Deployment::new();
1368 let mut flow = FlowBuilder::new();
1369 let first_node = flow.process::<()>();
1370 let external = flow.external::<()>();
1371 let (port, input, complete_sink) = first_node
1372 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1373 complete_sink.complete(input.map(q!(|data| {
1374 let mut resp: Vec<u8> = data.into();
1375 resp.push(42);
1376 resp.into() })));
1378
1379 let nodes = flow
1380 .with_process(&first_node, deployment.Localhost())
1381 .with_external(&external, deployment.Localhost())
1382 .deploy(&mut deployment);
1383
1384 deployment.deploy().await.unwrap();
1385 deployment.start().await.unwrap();
1386
1387 let (mut external_out, mut external_in) = nodes.connect(port).await;
1388
1389 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1390 assert_eq!(
1391 external_out.next().await.unwrap().unwrap(),
1392 vec![1, 2, 3, 42]
1393 );
1394 }
1395
1396 #[tokio::test]
1397 async fn echo_external_bytes() {
1398 let mut deployment = Deployment::new();
1399
1400 let mut flow = FlowBuilder::new();
1401 let first_node = flow.process::<()>();
1402 let external = flow.external::<()>();
1403
1404 let (port, input, _membership, complete_sink) = first_node
1405 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1406 complete_sink
1407 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1408
1409 let nodes = flow
1410 .with_process(&first_node, deployment.Localhost())
1411 .with_external(&external, deployment.Localhost())
1412 .deploy(&mut deployment);
1413
1414 deployment.deploy().await.unwrap();
1415
1416 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1417 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1418
1419 deployment.start().await.unwrap();
1420
1421 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1422 external_in_2.send(vec![4, 5].into()).await.unwrap();
1423
1424 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1425 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1426 }
1427
1428 #[tokio::test]
1429 async fn echo_external_bincode() {
1430 let mut deployment = Deployment::new();
1431
1432 let mut flow = FlowBuilder::new();
1433 let first_node = flow.process::<()>();
1434 let external = flow.external::<()>();
1435
1436 let (port, input, _membership, complete_sink) =
1437 first_node.bidi_external_many_bincode(&external);
1438 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1439
1440 let nodes = flow
1441 .with_process(&first_node, deployment.Localhost())
1442 .with_external(&external, deployment.Localhost())
1443 .deploy(&mut deployment);
1444
1445 deployment.deploy().await.unwrap();
1446
1447 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1448 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1449
1450 deployment.start().await.unwrap();
1451
1452 external_in_1.send("hi".to_owned()).await.unwrap();
1453 external_in_2.send("hello".to_owned()).await.unwrap();
1454
1455 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1456 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1457 }
1458}