// hydro_lang/compile/consistency_label.rs

1use super::ir::{CollectionKind, HydroRoot, StreamOrder, StreamRetry};
2use crate::location::dynamic::ClusterConsistency;
3
/// Consistency label derived from Hydro's type system.
///
/// These correspond to the labels produced by the external `coord-analysis` tool; see
/// [`ConsistencyLabel::as_str`] for the exact string spellings.
///
/// The enum is fieldless, so it is `Copy` and `Hash`: labels can be passed by value and used
/// as map/set keys (e.g. to tally sinks per label).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ConsistencyLabel {
    /// TotalOrder + ExactlyOnce on a non-keyed stream.
    SeqConsistent,
    /// TotalOrder + ExactlyOnce on a KeyedStream (per-key ordering, cross-key interleaving).
    PerKeySeqConsistent,
    /// NoOrder + ExactlyOnce (elements converge as a multiset).
    ConvergentMultiset,
    /// NoOrder + AtLeastOnce, or TotalOrder + AtLeastOnce (elements converge as a set).
    /// Also the label assigned to `Optional` collections.
    ConvergentSet,
    /// Singleton/KeyedSingleton (lattice convergence).
    ConvergentLattice,
    /// Untrusted nondeterminism upstream of a sensitive operator.
    Inconsistent,
    /// No consistency guarantee from the type system.
    NoGuarantee,
}
24
25impl ConsistencyLabel {
26    /// The string form matching coord-analysis output.
27    pub fn as_str(&self) -> &'static str {
28        match self {
29            Self::SeqConsistent => "SEQ_CONSISTENT",
30            Self::PerKeySeqConsistent => "PER_KEY_SEQ_CONSISTENT",
31            Self::ConvergentMultiset => "CONVERGENT_MULTISET",
32            Self::ConvergentSet => "CONVERGENT_SET",
33            Self::ConvergentLattice => "CONVERGENT_LATTICE",
34            Self::Inconsistent => "INCONSISTENT",
35            Self::NoGuarantee => "NO_GUARANTEE",
36        }
37    }
38}
39
40impl std::fmt::Display for ConsistencyLabel {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.write_str(self.as_str())
43    }
44}
45
46/// A sink's name and its type-derived consistency label.
47#[derive(Clone, Debug)]
48pub struct SinkConsistency {
49    /// The sink name (from the `tag` field, or the root's print representation).
50    pub name: String,
51    /// The consistency label derived from the type system.
52    pub label: ConsistencyLabel,
53    /// Source spans of untrusted nondet nodes that cause INCONSISTENT (if any).
54    pub blame: Vec<String>,
55}
56
57/// Derive the consistency label from a `ClusterConsistency` and `CollectionKind`.
58pub fn derive_label(
59    consistency: Option<&ClusterConsistency>,
60    collection_kind: &CollectionKind,
61) -> ConsistencyLabel {
62    match consistency {
63        Some(ClusterConsistency::EventualConsistency) => match collection_kind {
64            CollectionKind::Stream { order, retry, .. } => stream_label(order, retry, false),
65            CollectionKind::KeyedStream {
66                value_order,
67                value_retry,
68                ..
69            } => stream_label(value_order, value_retry, true),
70            CollectionKind::Singleton { .. } | CollectionKind::KeyedSingleton { .. } => {
71                ConsistencyLabel::ConvergentLattice
72            }
73            CollectionKind::Optional { .. } => ConsistencyLabel::ConvergentSet,
74        },
75        Some(ClusterConsistency::NoConsistency) | None => ConsistencyLabel::NoGuarantee,
76    }
77}
78
79fn stream_label(order: &StreamOrder, retry: &StreamRetry, keyed: bool) -> ConsistencyLabel {
80    match (order, retry) {
81        (StreamOrder::TotalOrder, StreamRetry::ExactlyOnce) => {
82            if keyed {
83                ConsistencyLabel::PerKeySeqConsistent
84            } else {
85                ConsistencyLabel::SeqConsistent
86            }
87        }
88        (StreamOrder::TotalOrder, StreamRetry::AtLeastOnce) => ConsistencyLabel::ConvergentSet,
89        (StreamOrder::NoOrder, StreamRetry::ExactlyOnce) => ConsistencyLabel::ConvergentMultiset,
90        (StreamOrder::NoOrder, StreamRetry::AtLeastOnce) => ConsistencyLabel::ConvergentSet,
91    }
92}
93
94/// Derive the "best possible" consistency label from a sink's collection kind alone,
95/// matching coord-analysis's `sink_type_level`.
96pub fn sink_type_label(collection_kind: &CollectionKind) -> ConsistencyLabel {
97    match collection_kind {
98        CollectionKind::Stream { order, retry, .. } => stream_label(order, retry, false),
99        CollectionKind::KeyedStream {
100            value_order,
101            value_retry,
102            ..
103        } => stream_label(value_order, value_retry, true),
104        CollectionKind::Singleton { .. } | CollectionKind::KeyedSingleton { .. } => {
105            ConsistencyLabel::ConvergentLattice
106        }
107        CollectionKind::Optional { .. } => ConsistencyLabel::ConvergentSet,
108    }
109}
110
111/// Analyze all observable sinks in the IR, producing consistency labels
112/// equivalent to coord-analysis.
113///
114/// For each sink:
115/// 1. Derives the "best possible" label from the sink's output collection kind
116/// 2. Walks backward through the IR checking for untrusted `ObserveNonDet` nodes
117///    that aren't resolved by a commutative+idempotent fold downstream —
118///    if unresolved, downgrades to `Inconsistent`
119pub fn analyze_sink_labels(ir: &[HydroRoot]) -> Vec<SinkConsistency> {
120    ir.iter()
121        .filter(|root| !matches!(root, HydroRoot::CycleSink { .. }))
122        .map(|root| {
123            let meta = root.input_metadata();
124            let name = meta
125                .tag
126                .clone()
127                .or_else(|| find_tag(root.input()))
128                .unwrap_or_else(|| root.print_root());
129
130            let type_label = sink_type_label(&meta.collection_kind);
131
132            let mut blame = Vec::new();
133            let mut visited = std::collections::HashSet::new();
134            collect_unresolved_nondet(root.input(), &mut blame, &mut visited);
135            let mut visited = std::collections::HashSet::new();
136            collect_unresolved_nondet(root.input(), &mut blame, &mut visited);
137            let label = if blame.is_empty() {
138                type_label
139            } else {
140                ConsistencyLabel::Inconsistent
141            };
142
143            SinkConsistency { name, label, blame }
144        })
145        .collect()
146}
147
148/// Collect spans of unresolved untrusted `ObserveNonDet` nodes.
149fn collect_unresolved_nondet(
150    node: &super::ir::HydroNode,
151    blame: &mut Vec<String>,
152    visited: &mut std::collections::HashSet<usize>,
153) {
154    use std::rc::Rc;
155
156    use super::ir::HydroNode;
157    match node {
158        // A fold/reduce whose input is NoOrder+AtLeastOnce is commutative+idempotent
159        // (the type system enforced this at compile time). Such a fold absorbs upstream nondet.
160        HydroNode::Fold { input, .. }
161        | HydroNode::FoldKeyed { input, .. }
162        | HydroNode::Reduce { input, .. }
163        | HydroNode::ReduceKeyed { input, .. }
164        | HydroNode::ReduceKeyedWatermark { input, .. }
165            if is_ci_from_input(input) => {}
166        // Untrusted nondet that hasn't been resolved
167        HydroNode::ObserveNonDet {
168            trusted: false,
169            metadata,
170            ..
171        } => {
172            if let Some(span) = metadata.op.backtrace.format_span() {
173                blame.push(span);
174            } else {
175                blame.push("<unknown location>".to_owned());
176            }
177        }
178        // Tee/Singleton/Partition: follow shared inner, skip if visited
179        HydroNode::Tee { inner, .. }
180        | HydroNode::Singleton { inner, .. }
181        | HydroNode::Partition { inner, .. } => {
182            let ptr = Rc::as_ptr(&inner.0) as usize;
183            if visited.insert(ptr) {
184                collect_unresolved_nondet(&inner.0.borrow(), blame, visited);
185            }
186        }
187        // Recurse into children
188        _ => {
189            for child in node.input() {
190                collect_unresolved_nondet(child, blame, visited);
191            }
192        }
193    }
194}
195
196/// Infer whether a fold/reduce is commutative+idempotent from its input's collection_kind.
197/// If input is NoOrder → commutative; if input is AtLeastOnce → idempotent.
198/// Both together → c+i, which absorbs upstream nondeterminism.
199fn is_ci_from_input(input: &super::ir::HydroNode) -> bool {
200    use super::ir::{CollectionKind, StreamOrder, StreamRetry};
201    let ck = &input.metadata().collection_kind;
202    let is_no_order = matches!(
203        ck,
204        CollectionKind::Stream {
205            order: StreamOrder::NoOrder,
206            ..
207        } | CollectionKind::KeyedStream {
208            value_order: StreamOrder::NoOrder,
209            ..
210        }
211    );
212    let is_at_least_once = matches!(
213        ck,
214        CollectionKind::Stream {
215            retry: StreamRetry::AtLeastOnce,
216            ..
217        } | CollectionKind::KeyedStream {
218            value_retry: StreamRetry::AtLeastOnce,
219            ..
220        }
221    );
222    is_no_order && is_at_least_once
223}
224
225/// Walk backward to find the first tagged node.
226fn find_tag(node: &super::ir::HydroNode) -> Option<String> {
227    use super::ir::HydroNode;
228    if let Some(ref tag) = node.metadata().tag {
229        return Some(tag.clone());
230    }
231    match node {
232        HydroNode::Tee { inner, .. }
233        | HydroNode::Singleton { inner, .. }
234        | HydroNode::Partition { inner, .. } => find_tag(&inner.0.borrow()),
235        _ => node.input().iter().find_map(|child| find_tag(child)),
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::super::ir::{BoundKind, DebugType, StreamOrder, StreamRetry};
    use super::*;

    /// Parse `ty` as a Rust type and wrap it the way `CollectionKind` type fields expect.
    fn debug_type(ty: &str) -> DebugType {
        DebugType(Box::new(syn::parse_str(ty).unwrap()))
    }

    /// An unbounded `Stream<i32>` collection kind with the given order/retry guarantees.
    fn stream_kind(order: StreamOrder, retry: StreamRetry) -> CollectionKind {
        CollectionKind::Stream {
            bound: BoundKind::Unbounded,
            order,
            retry,
            element_type: debug_type("i32"),
        }
    }

    /// An unbounded `KeyedStream<String, i32>` collection kind with the given per-key value
    /// order/retry guarantees.
    fn keyed_stream_kind(value_order: StreamOrder, value_retry: StreamRetry) -> CollectionKind {
        CollectionKind::KeyedStream {
            bound: BoundKind::Unbounded,
            value_order,
            value_retry,
            key_type: debug_type("String"),
            value_type: debug_type("i32"),
        }
    }

    #[test]
    fn test_derive_label_eventual_consistency() {
        let ec = Some(ClusterConsistency::EventualConsistency);

        // Stream + TotalOrder + ExactlyOnce → SEQ_CONSISTENT
        assert_eq!(
            derive_label(
                ec.as_ref(),
                &stream_kind(StreamOrder::TotalOrder, StreamRetry::ExactlyOnce)
            ),
            ConsistencyLabel::SeqConsistent
        );

        // Stream + NoOrder + ExactlyOnce → CONVERGENT_MULTISET
        assert_eq!(
            derive_label(
                ec.as_ref(),
                &stream_kind(StreamOrder::NoOrder, StreamRetry::ExactlyOnce)
            ),
            ConsistencyLabel::ConvergentMultiset
        );

        // Stream + NoOrder + AtLeastOnce → CONVERGENT_SET
        assert_eq!(
            derive_label(
                ec.as_ref(),
                &stream_kind(StreamOrder::NoOrder, StreamRetry::AtLeastOnce)
            ),
            ConsistencyLabel::ConvergentSet
        );

        // KeyedStream + TotalOrder + ExactlyOnce → PER_KEY_SEQ_CONSISTENT
        assert_eq!(
            derive_label(
                ec.as_ref(),
                &keyed_stream_kind(StreamOrder::TotalOrder, StreamRetry::ExactlyOnce)
            ),
            ConsistencyLabel::PerKeySeqConsistent
        );
    }

    #[test]
    fn test_derive_label_no_consistency() {
        // NoConsistency → NO_GUARANTEE regardless of collection kind
        assert_eq!(
            derive_label(
                Some(&ClusterConsistency::NoConsistency),
                &stream_kind(StreamOrder::TotalOrder, StreamRetry::ExactlyOnce)
            ),
            ConsistencyLabel::NoGuarantee
        );

        // None → NO_GUARANTEE
        assert_eq!(
            derive_label(
                None,
                &stream_kind(StreamOrder::TotalOrder, StreamRetry::ExactlyOnce)
            ),
            ConsistencyLabel::NoGuarantee
        );
    }

    /// Under eventual consistency, `derive_label` must agree with the purely type-level
    /// `sink_type_label` for every stream order/retry combination, keyed or not.
    #[test]
    fn test_derive_label_matches_sink_type_label_under_eventual_consistency() {
        let ec = ClusterConsistency::EventualConsistency;
        let kinds = [
            stream_kind(StreamOrder::TotalOrder, StreamRetry::ExactlyOnce),
            stream_kind(StreamOrder::TotalOrder, StreamRetry::AtLeastOnce),
            stream_kind(StreamOrder::NoOrder, StreamRetry::ExactlyOnce),
            stream_kind(StreamOrder::NoOrder, StreamRetry::AtLeastOnce),
            keyed_stream_kind(StreamOrder::TotalOrder, StreamRetry::ExactlyOnce),
            keyed_stream_kind(StreamOrder::TotalOrder, StreamRetry::AtLeastOnce),
            keyed_stream_kind(StreamOrder::NoOrder, StreamRetry::ExactlyOnce),
            keyed_stream_kind(StreamOrder::NoOrder, StreamRetry::AtLeastOnce),
        ];
        for kind in &kinds {
            assert_eq!(derive_label(Some(&ec), kind), sink_type_label(kind));
        }
    }

    /// Stream combinations not pinned above: any AtLeastOnce stream converges only as a set,
    /// and an unordered ExactlyOnce keyed stream converges as a multiset.
    #[test]
    fn test_sink_type_label_remaining_stream_combinations() {
        assert_eq!(
            sink_type_label(&stream_kind(StreamOrder::TotalOrder, StreamRetry::AtLeastOnce)),
            ConsistencyLabel::ConvergentSet
        );
        assert_eq!(
            sink_type_label(&keyed_stream_kind(
                StreamOrder::TotalOrder,
                StreamRetry::AtLeastOnce
            )),
            ConsistencyLabel::ConvergentSet
        );
        assert_eq!(
            sink_type_label(&keyed_stream_kind(
                StreamOrder::NoOrder,
                StreamRetry::ExactlyOnce
            )),
            ConsistencyLabel::ConvergentMultiset
        );
    }

    /// `Display` prints exactly the coord-analysis string form of every label.
    #[test]
    fn test_display_matches_as_str() {
        let all = [
            ConsistencyLabel::SeqConsistent,
            ConsistencyLabel::PerKeySeqConsistent,
            ConsistencyLabel::ConvergentMultiset,
            ConsistencyLabel::ConvergentSet,
            ConsistencyLabel::ConvergentLattice,
            ConsistencyLabel::Inconsistent,
            ConsistencyLabel::NoGuarantee,
        ];
        for label in &all {
            assert_eq!(label.to_string(), label.as_str());
        }
    }

    /// Integration test: builds a flow with `broadcast_closed` and checks the label of the
    /// sink on the receiving cluster.
    #[test]
    #[cfg(feature = "build")]
    fn test_broadcast_closed_produces_eventual_consistency_label() {
        use stageleft::q;

        use crate::prelude::*;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let cluster = flow.cluster::<()>();

        let numbers = process.source_iter(q!(vec![1i32, 2, 3]));
        numbers
            .broadcast_closed(&cluster, TCP.fail_stop().bincode())
            .ir_node_named("broadcast_sink")
            .for_each(q!(|_: i32| {}));

        let built = flow.finalize();
        let labels = built.analyze_consistency();

        let broadcast_sink = labels.iter().find(|s| s.name == "broadcast_sink");
        assert!(
            broadcast_sink.is_some(),
            "broadcast_sink not found; available: {:?}",
            labels.iter().map(|s| &s.name).collect::<Vec<_>>()
        );
        // broadcast_closed with FailStop TCP preserves TotalOrder from source_iter,
        // so: EventualConsistency + TotalOrder + ExactlyOnce = SEQ_CONSISTENT
        assert_eq!(
            broadcast_sink.unwrap().label,
            ConsistencyLabel::SeqConsistent,
            "broadcast_closed on FailStop TCP with TotalOrder source should produce SEQ_CONSISTENT"
        );
    }

    /// A sink fed by a source created directly on a cluster (no `broadcast_closed`) is
    /// labelled from its collection type alone: TotalOrder + ExactlyOnce with no untrusted
    /// nondet gives SEQ_CONSISTENT, even though the cluster itself has no consistency mode.
    #[test]
    #[cfg(feature = "build")]
    fn test_plain_cluster_sink_uses_type_label() {
        use stageleft::q;

        use crate::prelude::*;

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();

        // Source directly on cluster (NoConsistency)
        cluster
            .source_iter(q!(vec![1i32, 2, 3]))
            .ir_node_named("plain_cluster_sink")
            .for_each(q!(|_: i32| {}));

        let built = flow.finalize();
        let labels = built.analyze_consistency();

        let sink = labels
            .iter()
            .find(|s| s.name == "plain_cluster_sink")
            .unwrap();
        // Cluster with TotalOrder+ExactlyOnce → SeqConsistent (no untrusted nondet)
        assert_eq!(sink.label, ConsistencyLabel::SeqConsistent);
    }

    /// A Process-local sink is likewise labelled from its collection type alone:
    /// TotalOrder + ExactlyOnce gives SEQ_CONSISTENT.
    #[test]
    #[cfg(feature = "build")]
    fn test_process_sink_uses_type_label() {
        use stageleft::q;

        use crate::prelude::*;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        process
            .source_iter(q!(vec![1i32, 2, 3]))
            .ir_node_named("local_sink")
            .for_each(q!(|_: i32| {}));

        let built = flow.finalize();
        let labels = built.analyze_consistency();

        let sink = labels.iter().find(|s| s.name == "local_sink").unwrap();
        // Process with TotalOrder+ExactlyOnce → SeqConsistent
        assert_eq!(sink.label, ConsistencyLabel::SeqConsistent);
    }

    // ─── Simulator-validated consistency tests ────────────────────────────────

    /// SEQ_CONSISTENT: a deterministic pipeline always produces the same prefix.
    /// The simulator confirms this by exhaustively checking all schedules produce
    /// identical output.
    #[test]
    #[cfg(feature = "sim")]
    fn sim_validates_seq_consistent() {
        use stageleft::q;

        use crate::prelude::*;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let source = process.source_stream(q!(tokio_stream::iter(vec![1i32, 2, 3])));
        let mapped = source.map(q!(|x| x * 10));
        mapped
            .clone()
            .ir_node_named("seq_sink")
            .for_each(q!(|_: i32| {}));
        let out = mapped.sim_output();

        let built = flow.finalize();
        let labels = analyze_sink_labels(built.ir());
        let sink = labels.iter().find(|s| s.name == "seq_sink").unwrap();
        assert_eq!(sink.label, ConsistencyLabel::SeqConsistent);

        // Simulator confirms: all executions produce the same sequence
        built.sim().exhaustive(async || {
            let results: Vec<i32> = out.collect().await;
            assert_eq!(results, vec![10, 20, 30]);
        });
    }

    /// False commutativity: fold claims commutative but isn't (string concat).
    /// Simulator catches this by exploring all input orderings and observing divergence.
    #[test]
    #[cfg(feature = "sim")]
    fn sim_validates_inconsistent() {
        use std::collections::HashSet;

        use stageleft::q;

        use crate::live_collections::stream::NoOrder;
        use crate::prelude::*;
        use crate::properties::manual_proof;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let (in_send, input) = process.sim_input::<String, NoOrder, _>();

        // Non-commutative fold with manual (false) commutativity proof
        let folded = input.fold(
            q!(|| String::new()),
            q!(
                |acc, v| acc.push_str(&v),
                commutative = manual_proof!(/** WRONG — string concat is not commutative */)
            ),
        );
        let out = crate::live_collections::sliced::sliced! {
            let snapshot = use(folded, crate::nondet::nondet!(/** test */));
            snapshot.into_stream()
        }
        .sim_output();

        // The static analysis trusts the manual_proof annotations, so it will NOT
        // report INCONSISTENT here. This test validates that the simulator catches
        // the false commutativity claim at runtime by exploring all orderings.
        let mut final_values = HashSet::new();
        flow.sim().exhaustive(async || {
            in_send.send_many_unordered(["a".to_owned(), "b".to_owned()]);
            let all: Vec<String> = out.collect().await;
            final_values.insert(all.first().unwrap().clone());
        });

        // Sim proves inconsistency: different executions produce different results
        assert!(
            final_values.len() > 1,
            "Expected multiple distinct results proving inconsistency, got: {:?}",
            final_values
        );
    }

    /// CONVERGENT: a commutative+idempotent fold always converges to the same value.
    /// Simulator confirms all executions produce the same final result.
    #[test]
    #[cfg(feature = "sim")]
    fn sim_validates_convergent_ci_fold() {
        use stageleft::q;

        use crate::live_collections::stream::NoOrder;
        use crate::prelude::*;
        use crate::properties::manual_proof;

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let (in_send, input) = process.sim_input::<i32, NoOrder, _>();

        // Commutative + idempotent fold (max)
        let folded = input.fold(
            q!(|| i32::MIN),
            q!(
                |acc, v| {
                    if v > *acc {
                        *acc = v;
                    }
                },
                commutative = manual_proof!(/** max is commutative */),
                idempotent = manual_proof!(/** max is idempotent */)
            ),
        );
        let out = crate::live_collections::sliced::sliced! {
            let snapshot = use(folded, crate::nondet::nondet!(/** test */));
            snapshot.into_stream()
        }
        .sim_output();

        // Static analysis: c+i fold absorbs any upstream nondet.
        // The sink is Stream(TotalOrder, ExactlyOnce) after into_stream(), so label is SeqConsistent.
        let built = flow.finalize();
        let labels = analyze_sink_labels(built.ir());
        // No INCONSISTENT labels — the c+i fold resolved any nondet
        assert!(
            !labels
                .iter()
                .any(|s| s.label == ConsistencyLabel::Inconsistent),
            "c+i fold should absorb nondet, got: {:?}",
            labels
        );

        // Simulator confirms: all executions converge to the same final value
        built.sim().exhaustive(async || {
            in_send.send_many_unordered([3, 1, 4, 1, 5]);
            let all: Vec<i32> = out.collect().await;
            assert_eq!(*all.last().unwrap(), 5, "max of [3,1,4,1,5] should be 5");
        });
    }
}