1use super::ir::{CollectionKind, HydroRoot, StreamOrder, StreamRetry};
2use crate::location::dynamic::ClusterConsistency;
3
/// Consistency guarantee attached to a sink by the analysis in this module.
///
/// Each variant documents where this file assigns it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsistencyLabel {
    /// Unkeyed stream that is totally ordered and delivered exactly once.
    SeqConsistent,
    /// Keyed stream whose values are totally ordered and delivered exactly once.
    PerKeySeqConsistent,
    /// Unordered stream delivered exactly once.
    ConvergentMultiset,
    /// At-least-once stream (regardless of order), or an optional.
    ConvergentSet,
    /// Singleton or keyed singleton.
    ConvergentLattice,
    /// At least one untrusted `ObserveNonDet` node was found upstream of the sink.
    Inconsistent,
    /// The cluster consistency was `NoConsistency` or was not provided at all.
    NoGuarantee,
}
24
25impl ConsistencyLabel {
26 pub fn as_str(&self) -> &'static str {
28 match self {
29 Self::SeqConsistent => "SEQ_CONSISTENT",
30 Self::PerKeySeqConsistent => "PER_KEY_SEQ_CONSISTENT",
31 Self::ConvergentMultiset => "CONVERGENT_MULTISET",
32 Self::ConvergentSet => "CONVERGENT_SET",
33 Self::ConvergentLattice => "CONVERGENT_LATTICE",
34 Self::Inconsistent => "INCONSISTENT",
35 Self::NoGuarantee => "NO_GUARANTEE",
36 }
37 }
38}
39
40impl std::fmt::Display for ConsistencyLabel {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.write_str(self.as_str())
43 }
44}
45
46#[derive(Clone, Debug)]
48pub struct SinkConsistency {
49 pub name: String,
51 pub label: ConsistencyLabel,
53 pub blame: Vec<String>,
55}
56
57pub fn derive_label(
59 consistency: Option<&ClusterConsistency>,
60 collection_kind: &CollectionKind,
61) -> ConsistencyLabel {
62 match consistency {
63 Some(ClusterConsistency::EventualConsistency) => match collection_kind {
64 CollectionKind::Stream { order, retry, .. } => stream_label(order, retry, false),
65 CollectionKind::KeyedStream {
66 value_order,
67 value_retry,
68 ..
69 } => stream_label(value_order, value_retry, true),
70 CollectionKind::Singleton { .. } | CollectionKind::KeyedSingleton { .. } => {
71 ConsistencyLabel::ConvergentLattice
72 }
73 CollectionKind::Optional { .. } => ConsistencyLabel::ConvergentSet,
74 },
75 Some(ClusterConsistency::NoConsistency) | None => ConsistencyLabel::NoGuarantee,
76 }
77}
78
79fn stream_label(order: &StreamOrder, retry: &StreamRetry, keyed: bool) -> ConsistencyLabel {
80 match (order, retry) {
81 (StreamOrder::TotalOrder, StreamRetry::ExactlyOnce) => {
82 if keyed {
83 ConsistencyLabel::PerKeySeqConsistent
84 } else {
85 ConsistencyLabel::SeqConsistent
86 }
87 }
88 (StreamOrder::TotalOrder, StreamRetry::AtLeastOnce) => ConsistencyLabel::ConvergentSet,
89 (StreamOrder::NoOrder, StreamRetry::ExactlyOnce) => ConsistencyLabel::ConvergentMultiset,
90 (StreamOrder::NoOrder, StreamRetry::AtLeastOnce) => ConsistencyLabel::ConvergentSet,
91 }
92}
93
94pub fn sink_type_label(collection_kind: &CollectionKind) -> ConsistencyLabel {
97 match collection_kind {
98 CollectionKind::Stream { order, retry, .. } => stream_label(order, retry, false),
99 CollectionKind::KeyedStream {
100 value_order,
101 value_retry,
102 ..
103 } => stream_label(value_order, value_retry, true),
104 CollectionKind::Singleton { .. } | CollectionKind::KeyedSingleton { .. } => {
105 ConsistencyLabel::ConvergentLattice
106 }
107 CollectionKind::Optional { .. } => ConsistencyLabel::ConvergentSet,
108 }
109}
110
111pub fn analyze_sink_labels(ir: &[HydroRoot]) -> Vec<SinkConsistency> {
120 ir.iter()
121 .filter(|root| !matches!(root, HydroRoot::CycleSink { .. }))
122 .map(|root| {
123 let meta = root.input_metadata();
124 let name = meta
125 .tag
126 .clone()
127 .or_else(|| find_tag(root.input()))
128 .unwrap_or_else(|| root.print_root());
129
130 let type_label = sink_type_label(&meta.collection_kind);
131
132 let mut blame = Vec::new();
133 let mut visited = std::collections::HashSet::new();
134 collect_unresolved_nondet(root.input(), &mut blame, &mut visited);
135 let mut visited = std::collections::HashSet::new();
136 collect_unresolved_nondet(root.input(), &mut blame, &mut visited);
137 let label = if blame.is_empty() {
138 type_label
139 } else {
140 ConsistencyLabel::Inconsistent
141 };
142
143 SinkConsistency { name, label, blame }
144 })
145 .collect()
146}
147
148fn collect_unresolved_nondet(
150 node: &super::ir::HydroNode,
151 blame: &mut Vec<String>,
152 visited: &mut std::collections::HashSet<usize>,
153) {
154 use std::rc::Rc;
155
156 use super::ir::HydroNode;
157 match node {
158 HydroNode::Fold { input, .. }
161 | HydroNode::FoldKeyed { input, .. }
162 | HydroNode::Reduce { input, .. }
163 | HydroNode::ReduceKeyed { input, .. }
164 | HydroNode::ReduceKeyedWatermark { input, .. }
165 if is_ci_from_input(input) => {}
166 HydroNode::ObserveNonDet {
168 trusted: false,
169 metadata,
170 ..
171 } => {
172 if let Some(span) = metadata.op.backtrace.format_span() {
173 blame.push(span);
174 } else {
175 blame.push("<unknown location>".to_owned());
176 }
177 }
178 HydroNode::Tee { inner, .. }
180 | HydroNode::Singleton { inner, .. }
181 | HydroNode::Partition { inner, .. } => {
182 let ptr = Rc::as_ptr(&inner.0) as usize;
183 if visited.insert(ptr) {
184 collect_unresolved_nondet(&inner.0.borrow(), blame, visited);
185 }
186 }
187 _ => {
189 for child in node.input() {
190 collect_unresolved_nondet(child, blame, visited);
191 }
192 }
193 }
194}
195
196fn is_ci_from_input(input: &super::ir::HydroNode) -> bool {
200 use super::ir::{CollectionKind, StreamOrder, StreamRetry};
201 let ck = &input.metadata().collection_kind;
202 let is_no_order = matches!(
203 ck,
204 CollectionKind::Stream {
205 order: StreamOrder::NoOrder,
206 ..
207 } | CollectionKind::KeyedStream {
208 value_order: StreamOrder::NoOrder,
209 ..
210 }
211 );
212 let is_at_least_once = matches!(
213 ck,
214 CollectionKind::Stream {
215 retry: StreamRetry::AtLeastOnce,
216 ..
217 } | CollectionKind::KeyedStream {
218 value_retry: StreamRetry::AtLeastOnce,
219 ..
220 }
221 );
222 is_no_order && is_at_least_once
223}
224
225fn find_tag(node: &super::ir::HydroNode) -> Option<String> {
227 use super::ir::HydroNode;
228 if let Some(ref tag) = node.metadata().tag {
229 return Some(tag.clone());
230 }
231 match node {
232 HydroNode::Tee { inner, .. }
233 | HydroNode::Singleton { inner, .. }
234 | HydroNode::Partition { inner, .. } => find_tag(&inner.0.borrow()),
235 _ => node.input().iter().find_map(|child| find_tag(child)),
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps the Rust type spelled in `ty` in the IR's `DebugType`.
    fn debug_type(ty: &str) -> super::super::ir::DebugType {
        super::super::ir::DebugType(Box::new(syn::parse_str(ty).unwrap()))
    }

    /// Unbounded `i32` stream kind with the given order and retry guarantees.
    fn i32_stream(order: StreamOrder, retry: StreamRetry) -> CollectionKind {
        CollectionKind::Stream {
            bound: super::super::ir::BoundKind::Unbounded,
            order,
            retry,
            element_type: debug_type("i32"),
        }
    }

    /// Under eventual consistency the label follows the collection kind.
    #[test]
    fn test_derive_label_eventual_consistency() {
        let eventual = ClusterConsistency::EventualConsistency;
        let ec = Some(&eventual);

        // Total order + exactly once.
        assert_eq!(
            derive_label(
                ec,
                &i32_stream(StreamOrder::TotalOrder, StreamRetry::ExactlyOnce)
            ),
            ConsistencyLabel::SeqConsistent
        );

        // No order + exactly once.
        assert_eq!(
            derive_label(
                ec,
                &i32_stream(StreamOrder::NoOrder, StreamRetry::ExactlyOnce)
            ),
            ConsistencyLabel::ConvergentMultiset
        );

        // No order + at least once.
        assert_eq!(
            derive_label(
                ec,
                &i32_stream(StreamOrder::NoOrder, StreamRetry::AtLeastOnce)
            ),
            ConsistencyLabel::ConvergentSet
        );

        // Keyed stream whose values are totally ordered and exactly once.
        let keyed = CollectionKind::KeyedStream {
            bound: super::super::ir::BoundKind::Unbounded,
            value_order: StreamOrder::TotalOrder,
            value_retry: StreamRetry::ExactlyOnce,
            key_type: debug_type("String"),
            value_type: debug_type("i32"),
        };
        assert_eq!(
            derive_label(ec, &keyed),
            ConsistencyLabel::PerKeySeqConsistent
        );
    }

    /// Without eventual consistency the label is always `NoGuarantee`, even
    /// for the strongest stream kind.
    #[test]
    fn test_derive_label_no_consistency() {
        let strongest = i32_stream(StreamOrder::TotalOrder, StreamRetry::ExactlyOnce);

        // Explicit `NoConsistency`.
        assert_eq!(
            derive_label(Some(&ClusterConsistency::NoConsistency), &strongest),
            ConsistencyLabel::NoGuarantee
        );

        // No consistency information at all.
        assert_eq!(
            derive_label(None, &strongest),
            ConsistencyLabel::NoGuarantee
        );
    }

    /// A stream broadcast to a cluster and tagged `broadcast_sink` is labelled
    /// `SeqConsistent` by `analyze_consistency`.
    #[test]
    #[cfg(feature = "build")]
    fn test_broadcast_closed_produces_eventual_consistency_label() {
        use stageleft::q;

        use crate::prelude::*;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let cluster = flow.cluster::<()>();

        process
            .source_iter(q!(vec![1i32, 2, 3]))
            .broadcast_closed(&cluster, TCP.fail_stop().bincode())
            .ir_node_named("broadcast_sink")
            .for_each(q!(|_: i32| {}));

        let built = flow.finalize();
        let sinks = built.analyze_consistency();

        let Some(broadcast_sink) = sinks.iter().find(|entry| entry.name == "broadcast_sink")
        else {
            panic!(
                "broadcast_sink not found; available: {:?}",
                sinks.iter().map(|entry| &entry.name).collect::<Vec<_>>()
            );
        };
        assert_eq!(
            broadcast_sink.label,
            ConsistencyLabel::SeqConsistent,
            "broadcast_closed on FailStop TCP with TotalOrder source should produce SEQ_CONSISTENT"
        );
    }

    /// A source created directly on a cluster and tagged `plain_cluster_sink`.
    ///
    /// NOTE(review): the test name says "no guarantee" but the assertion
    /// expects `SeqConsistent`; presumably the name predates type-derived sink
    /// labels — confirm and rename if so.
    #[test]
    #[cfg(feature = "build")]
    fn test_plain_cluster_has_no_guarantee() {
        use stageleft::q;

        use crate::prelude::*;

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();

        cluster
            .source_iter(q!(vec![1i32, 2, 3]))
            .ir_node_named("plain_cluster_sink")
            .for_each(q!(|_: i32| {}));

        let built = flow.finalize();
        let sinks = built.analyze_consistency();

        let sink = sinks
            .iter()
            .find(|entry| entry.name == "plain_cluster_sink")
            .unwrap();
        assert_eq!(sink.label, ConsistencyLabel::SeqConsistent);
    }

    /// A source created on a single process and tagged `local_sink`.
    ///
    /// NOTE(review): as with the cluster test, the name says "no guarantee"
    /// while the assertion expects `SeqConsistent` — confirm which is intended.
    #[test]
    #[cfg(feature = "build")]
    fn test_process_sink_has_no_guarantee() {
        use stageleft::q;

        use crate::prelude::*;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        process
            .source_iter(q!(vec![1i32, 2, 3]))
            .ir_node_named("local_sink")
            .for_each(q!(|_: i32| {}));

        let built = flow.finalize();
        let sinks = built.analyze_consistency();

        let sink = sinks
            .iter()
            .find(|entry| entry.name == "local_sink")
            .unwrap();
        assert_eq!(sink.label, ConsistencyLabel::SeqConsistent);
    }

    /// Checks the `SeqConsistent` label on a mapped stream and then confirms in
    /// the simulator that every explored execution yields the same ordered
    /// output.
    #[test]
    #[cfg(feature = "sim")]
    fn sim_validates_seq_consistent() {
        use stageleft::q;

        use crate::prelude::*;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let scaled = process
            .source_stream(q!(tokio_stream::iter(vec![1i32, 2, 3])))
            .map(q!(|x| x * 10));
        scaled
            .clone()
            .ir_node_named("seq_sink")
            .for_each(q!(|_: i32| {}));
        let out = scaled.sim_output();

        let built = flow.finalize();
        let sinks = analyze_sink_labels(built.ir());
        let sink = sinks
            .iter()
            .find(|entry| entry.name == "seq_sink")
            .unwrap();
        assert_eq!(sink.label, ConsistencyLabel::SeqConsistent);

        built.sim().exhaustive(async || {
            let results: Vec<i32> = out.collect().await;
            assert_eq!(results, vec![10, 20, 30]);
        });
    }

    /// Folds an unordered input with a non-commutative string concatenation
    /// and checks that the simulator observes more than one distinct first
    /// snapshot across executions.
    #[test]
    #[cfg(feature = "sim")]
    fn sim_validates_inconsistent() {
        use std::collections::HashSet;

        use stageleft::q;

        use crate::live_collections::stream::NoOrder;
        use crate::prelude::*;
        use crate::properties::manual_proof;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let (in_send, input) = process.sim_input::<String, NoOrder, _>();

        let folded = input.fold(
            q!(|| String::new()),
            q!(
                |acc, v| acc.push_str(&v),
                commutative = manual_proof!()
            ),
        );
        let out = crate::live_collections::sliced::sliced! {
            let snapshot = use(folded, crate::nondet::nondet!());
            snapshot.into_stream()
        }
        .sim_output();

        // Collects the first observed value of every explored execution.
        let mut final_values = HashSet::new();
        flow.sim().exhaustive(async || {
            in_send.send_many_unordered(["a".to_owned(), "b".to_owned()]);
            let all: Vec<String> = out.collect().await;
            final_values.insert(all.first().unwrap().clone());
        });

        assert!(
            final_values.len() > 1,
            "Expected multiple distinct results proving inconsistency, got: {:?}",
            final_values
        );
    }

    /// Folds an unordered input with a max aggregation declared commutative
    /// and idempotent; no sink may be labelled `Inconsistent`, and every
    /// explored execution must end on the maximum.
    #[test]
    #[cfg(feature = "sim")]
    fn sim_validates_convergent_ci_fold() {
        use stageleft::q;

        use crate::live_collections::stream::NoOrder;
        use crate::prelude::*;
        use crate::properties::manual_proof;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let (in_send, input) = process.sim_input::<i32, NoOrder, _>();

        let folded = input.fold(
            q!(|| i32::MIN),
            q!(
                |acc, v| {
                    if v > *acc {
                        *acc = v;
                    }
                },
                commutative = manual_proof!(),
                idempotent = manual_proof!()
            ),
        );
        let out = crate::live_collections::sliced::sliced! {
            let snapshot = use(folded, crate::nondet::nondet!());
            snapshot.into_stream()
        }
        .sim_output();

        let built = flow.finalize();
        let sinks = analyze_sink_labels(built.ir());
        assert!(
            !sinks
                .iter()
                .any(|entry| entry.label == ConsistencyLabel::Inconsistent),
            "c+i fold should absorb nondet, got: {:?}",
            sinks
        );

        built.sim().exhaustive(async || {
            in_send.send_many_unordered([3, 1, 4, 1, 5]);
            let all: Vec<i32> = out.collect().await;
            assert_eq!(*all.last().unwrap(), 5, "max of [3,1,4,1,5] should be 5");
        });
    }
}