Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick};
23use crate::location::{Location, Tick, TopLevel, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25use crate::prelude::KeyedSingleton;
26
/// A *nullable* Rust value that can asynchronously change over time.
///
/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
///
/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
///
/// Type Parameters:
/// - `Type`: the type of the value in this optional (when it is not null)
/// - `Loc`: the [`Location`] where the optional is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    // Location handle this optional was created with (returned by `location()`).
    pub(crate) location: Loc,
    // IR node backing this optional. Wrapped in a `RefCell` so that operators (and `Drop`)
    // can take the node out through a shared reference, leaving `HydroNode::Placeholder`.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Flow state cloned from the location at construction; `Drop` pushes roots into it.
    pub(crate) flow_state: FlowState,

    // Zero-sized marker tying the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
47
48impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
49    fn drop(&mut self) {
50        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
51        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
52            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
53                input: Box::new(ir_node),
54                op_metadata: HydroIrOpMetadata::new(),
55            });
56        }
57    }
58}
59
60impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
61where
62    T: Clone,
63    L: Location<'a>,
64{
65    fn from(value: Optional<T, L, Bounded>) -> Self {
66        let tick = value.location().tick();
67        value.clone_into_tick(&tick).latest()
68    }
69}
70
// Implements the `DeferTick` trait for bounded optionals inside a tick by forwarding to
// `Optional::defer_tick` (not defined in the visible part of this file).
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    /// Forwards to `Optional::defer_tick`, consuming `self`.
    fn defer_tick(self) -> Self {
        Optional::defer_tick(self)
    }
}
79
80impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
81where
82    L: Location<'a>,
83{
84    type Location = Tick<L>;
85
86    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
87        Optional::new(
88            location.clone(),
89            HydroNode::CycleSource {
90                cycle_id,
91                metadata: location.new_node_metadata(Self::collection_kind()),
92            },
93        )
94    }
95}
96
97impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
98where
99    L: Location<'a>,
100{
101    type Location = Tick<L>;
102
103    fn location(&self) -> &Self::Location {
104        self.location()
105    }
106
107    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
108        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
109            location.clone(),
110            HydroNode::DeferTick {
111                input: Box::new(HydroNode::CycleSource {
112                    cycle_id,
113                    metadata: location.new_node_metadata(Self::collection_kind()),
114                }),
115                metadata: location
116                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
117            },
118        );
119
120        from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
121    }
122}
123
124impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
125where
126    L: Location<'a>,
127{
128    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
129        assert_eq!(
130            Location::id(&self.location),
131            expected_location,
132            "locations do not match"
133        );
134        self.location
135            .flow_state()
136            .borrow_mut()
137            .push_root(HydroRoot::CycleSink {
138                cycle_id,
139                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
140                op_metadata: HydroIrOpMetadata::new(),
141            });
142    }
143}
144
145impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
146where
147    L: Location<'a>,
148{
149    type Location = L;
150
151    fn create_source(cycle_id: CycleId, location: L) -> Self {
152        Optional::new(
153            location.clone(),
154            HydroNode::CycleSource {
155                cycle_id,
156                metadata: location.new_node_metadata(Self::collection_kind()),
157            },
158        )
159    }
160}
161
162impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
163where
164    L: Location<'a>,
165{
166    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
167        assert_eq!(
168            Location::id(&self.location),
169            expected_location,
170            "locations do not match"
171        );
172        self.location
173            .flow_state()
174            .borrow_mut()
175            .push_root(HydroRoot::CycleSink {
176                cycle_id,
177                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
178                op_metadata: HydroIrOpMetadata::new(),
179            });
180    }
181}
182
183impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
184where
185    L: Location<'a>,
186{
187    fn from(singleton: Singleton<T, L, B>) -> Self {
188        Optional::new(
189            singleton.location.clone(),
190            HydroNode::Cast {
191                inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
192                metadata: singleton
193                    .location
194                    .new_node_metadata(Self::collection_kind()),
195            },
196        )
197    }
198}
199
/// Builds a [`HydroNode::CrossSingleton`] over the IR nodes of `me` (left) and `other`
/// (right), after checking that both live at matching locations. Both inputs are left
/// holding placeholder nodes.
#[cfg(stageleft_runtime)]
pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let location = me.location.clone();
    let left = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let right = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());

    Optional::new(
        location,
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
218
/// Builds a [`HydroNode::ChainFirst`] with `me` as the first input and `other` as the second,
/// after checking that both live at matching locations. Both inputs are left holding
/// placeholder nodes.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let location = me.location.clone();
    let first = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let second = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());

    Optional::new(
        location,
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
237
238impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
239where
240    T: Clone,
241    L: Location<'a>,
242{
243    fn clone(&self) -> Self {
244        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
245            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
246            *self.ir_node.borrow_mut() = HydroNode::Tee {
247                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
248                metadata: self.location.new_node_metadata(Self::collection_kind()),
249            };
250        }
251
252        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
253            Optional {
254                location: self.location.clone(),
255                flow_state: self.flow_state.clone(),
256                ir_node: HydroNode::Tee {
257                    inner: SharedNode(inner.0.clone()),
258                    metadata: metadata.clone(),
259                }
260                .into(),
261                _phantom: PhantomData,
262            }
263        } else {
264            unreachable!()
265        }
266    }
267}
268
269impl<'a, T, L, B: Boundedness> Optional<T, L, B>
270where
271    L: Location<'a>,
272{
273    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276        let flow_state = location.flow_state().clone();
277        Optional {
278            location,
279            flow_state,
280            ir_node: RefCell::new(ir_node),
281            _phantom: PhantomData,
282        }
283    }
284
285    pub(crate) fn collection_kind() -> CollectionKind {
286        CollectionKind::Optional {
287            bound: B::BOUND_KIND,
288            element_type: stageleft::quote_type::<T>().into(),
289        }
290    }
291
    /// Returns the [`Location`] where this optional is being materialized.
    ///
    /// The returned reference borrows from `self`; clone it if an owned location is needed.
    pub fn location(&self) -> &L {
        &self.location
    }
296
297    /// Weakens the consistency of this live collection to not guarantee any consistency across
298    /// cluster members (if this collection is on a cluster).
299    pub fn weaken_consistency(self) -> Optional<T, L::DropConsistency, B>
300    where
301        L: Location<'a>,
302    {
303        if L::consistency()
304            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
305        {
306            // already no consistency
307            Optional::new(
308                self.location.drop_consistency(),
309                self.ir_node.replace(HydroNode::Placeholder),
310            )
311        } else {
312            Optional::new(
313                self.location.drop_consistency(),
314                HydroNode::Cast {
315                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
316                    metadata: self
317                        .location
318                        .clone()
319                        .drop_consistency()
320                        .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
321                },
322            )
323        }
324    }
325
326    /// Casts this live collection to have the consistency guarantees specified in the given
327    /// location type parameter. The developer must ensure that the strengthened consistency
328    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
329    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
330        self,
331        _proof: impl crate::properties::ConsistencyProof,
332    ) -> Optional<T, L2, B>
333    where
334        L: Location<'a>,
335    {
336        if L::consistency() == L2::consistency() {
337            Optional::new(
338                self.location.with_consistency_of(),
339                self.ir_node.replace(HydroNode::Placeholder),
340            )
341        } else {
342            Optional::new(
343                self.location.with_consistency_of(),
344                HydroNode::AssertIsConsistent {
345                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
346                    trusted: false,
347                    metadata: self
348                        .location
349                        .clone()
350                        .with_consistency_of::<L2>()
351                        .new_node_metadata(Optional::<T, L2, B>::collection_kind()),
352                },
353            )
354        }
355    }
356
357    /// Transforms the optional value by applying a function `f` to it,
358    /// continuously as the input is updated.
359    ///
360    /// Whenever the optional is empty, the output optional is also empty.
361    ///
362    /// # Example
363    /// ```rust
364    /// # #[cfg(feature = "deploy")] {
365    /// # use hydro_lang::prelude::*;
366    /// # use futures::StreamExt;
367    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
368    /// let tick = process.tick();
369    /// let optional = tick.optional_first_tick(q!(1));
370    /// optional.map(q!(|v| v + 1)).all_ticks()
371    /// # }, |mut stream| async move {
372    /// // 2
373    /// # assert_eq!(stream.next().await.unwrap(), 2);
374    /// # }));
375    /// # }
376    /// ```
377    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
378    where
379        F: Fn(T) -> U + 'a,
380    {
381        let f = f.splice_fn1_ctx(&self.location).into();
382        Optional::new(
383            self.location.clone(),
384            HydroNode::Map {
385                f,
386                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
387                metadata: self
388                    .location
389                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
390            },
391        )
392    }
393
394    /// Transforms the optional value by applying a function `f` to it and then flattening
395    /// the result into a stream, preserving the order of elements.
396    ///
397    /// If the optional is empty, the output stream is also empty. If the optional contains
398    /// a value, `f` is applied to produce an iterator, and all items from that iterator
399    /// are emitted in the output stream in deterministic order.
400    ///
401    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
402    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
403    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
404    ///
405    /// # Example
406    /// ```rust
407    /// # #[cfg(feature = "deploy")] {
408    /// # use hydro_lang::prelude::*;
409    /// # use futures::StreamExt;
410    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
411    /// let tick = process.tick();
412    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
413    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
414    /// # }, |mut stream| async move {
415    /// // 1, 2, 3
416    /// # for w in vec![1, 2, 3] {
417    /// #     assert_eq!(stream.next().await.unwrap(), w);
418    /// # }
419    /// # }));
420    /// # }
421    /// ```
422    pub fn flat_map_ordered<U, I, F>(
423        self,
424        f: impl IntoQuotedMut<'a, F, L>,
425    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
426    where
427        B: IsBounded,
428        I: IntoIterator<Item = U>,
429        F: Fn(T) -> I + 'a,
430    {
431        self.into_stream().flat_map_ordered(f)
432    }
433
434    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
435    /// for the output type `I` to produce items in any order.
436    ///
437    /// If the optional is empty, the output stream is also empty. If the optional contains
438    /// a value, `f` is applied to produce an iterator, and all items from that iterator
439    /// are emitted in the output stream in non-deterministic order.
440    ///
441    /// # Example
442    /// ```rust
443    /// # #[cfg(feature = "deploy")] {
444    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
445    /// # use futures::StreamExt;
446    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
447    /// let tick = process.tick();
448    /// let optional = tick.optional_first_tick(q!(
449    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
450    /// ));
451    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
452    /// # }, |mut stream| async move {
453    /// // 1, 2, 3, but in no particular order
454    /// # let mut results = Vec::new();
455    /// # for _ in 0..3 {
456    /// #     results.push(stream.next().await.unwrap());
457    /// # }
458    /// # results.sort();
459    /// # assert_eq!(results, vec![1, 2, 3]);
460    /// # }));
461    /// # }
462    /// ```
463    pub fn flat_map_unordered<U, I, F>(
464        self,
465        f: impl IntoQuotedMut<'a, F, L>,
466    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
467    where
468        B: IsBounded,
469        I: IntoIterator<Item = U>,
470        F: Fn(T) -> I + 'a,
471    {
472        self.into_stream().flat_map_unordered(f)
473    }
474
    /// Flattens the optional value into a stream, preserving the order of elements.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in deterministic order.
    ///
    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
    ///
    /// This is shorthand for [`Optional::flat_map_ordered`] with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
    /// optional.flatten_ordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // The identity closure hands the value itself to the flat-map as the iterable.
        self.flat_map_ordered(q!(|v| v))
    }
509
    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in non-deterministic order.
    ///
    /// This is shorthand for [`Optional::flat_map_unordered`] with the identity function.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(
    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
    /// ));
    /// optional.flatten_unordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, but in no particular order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // The identity closure hands the value itself to the flat-map as the iterable.
        self.flat_map_unordered(q!(|v| v))
    }
546
547    /// Creates an optional containing only the value if it satisfies a predicate `f`.
548    ///
549    /// If the optional is empty, the output optional is also empty. If the optional contains
550    /// a value and the predicate returns `true`, the output optional contains the same value.
551    /// If the predicate returns `false`, the output optional is empty.
552    ///
553    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
554    /// not modify or take ownership of the value. If you need to modify the value while filtering
555    /// use [`Optional::filter_map`] instead.
556    ///
557    /// # Example
558    /// ```rust
559    /// # #[cfg(feature = "deploy")] {
560    /// # use hydro_lang::prelude::*;
561    /// # use futures::StreamExt;
562    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
563    /// let tick = process.tick();
564    /// let optional = tick.optional_first_tick(q!(5));
565    /// optional.filter(q!(|&x| x > 3)).all_ticks()
566    /// # }, |mut stream| async move {
567    /// // 5
568    /// # assert_eq!(stream.next().await.unwrap(), 5);
569    /// # }));
570    /// # }
571    /// ```
572    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
573    where
574        F: Fn(&T) -> bool + 'a,
575    {
576        let f = f.splice_fn1_borrow_ctx(&self.location).into();
577        Optional::new(
578            self.location.clone(),
579            HydroNode::Filter {
580                f,
581                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
582                metadata: self.location.new_node_metadata(Self::collection_kind()),
583            },
584        )
585    }
586
587    /// An operator that both filters and maps. It yields only the value if the supplied
588    /// closure `f` returns `Some(value)`.
589    ///
590    /// If the optional is empty, the output optional is also empty. If the optional contains
591    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
592    /// If the closure returns `None`, the output optional is empty.
593    ///
594    /// # Example
595    /// ```rust
596    /// # #[cfg(feature = "deploy")] {
597    /// # use hydro_lang::prelude::*;
598    /// # use futures::StreamExt;
599    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
600    /// let tick = process.tick();
601    /// let optional = tick.optional_first_tick(q!("42"));
602    /// optional
603    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
604    ///     .all_ticks()
605    /// # }, |mut stream| async move {
606    /// // 42
607    /// # assert_eq!(stream.next().await.unwrap(), 42);
608    /// # }));
609    /// # }
610    /// ```
611    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
612    where
613        F: Fn(T) -> Option<U> + 'a,
614    {
615        let f = f.splice_fn1_ctx(&self.location).into();
616        Optional::new(
617            self.location.clone(),
618            HydroNode::FilterMap {
619                f,
620                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
621                metadata: self
622                    .location
623                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
624            },
625        )
626    }
627
628    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
629    ///
630    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
631    /// non-null. This is useful for combining several pieces of state together.
632    ///
633    /// # Example
634    /// ```rust
635    /// # #[cfg(feature = "deploy")] {
636    /// # use hydro_lang::prelude::*;
637    /// # use futures::StreamExt;
638    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
639    /// let tick = process.tick();
640    /// let numbers = process
641    ///   .source_iter(q!(vec![123, 456, 789]))
642    ///   .batch(&tick, nondet!(/** test */));
643    /// let min = numbers.clone().min(); // Optional
644    /// let max = numbers.max(); // Optional
645    /// min.zip(max).all_ticks()
646    /// # }, |mut stream| async move {
647    /// // [(123, 789)]
648    /// # for w in vec![(123, 789)] {
649    /// #     assert_eq!(stream.next().await.unwrap(), w);
650    /// # }
651    /// # }));
652    /// # }
653    /// ```
654    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
655    where
656        B: IsBounded,
657    {
658        let other: Optional<O, L, B> = other.into();
659        check_matching_location(&self.location, &other.location);
660
661        if L::is_top_level()
662            && let Some(tick) = self.location.try_tick()
663        {
664            let self_location = self.location().clone();
665            let out = zip_inside_tick(
666                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
667                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
668            )
669            .latest();
670
671            Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
672        } else {
673            zip_inside_tick(self, other)
674        }
675    }
676
677    /// Passes through `self` when it has a value, otherwise passes through `other`.
678    ///
679    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
680    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
681    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
682    ///
683    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
684    /// of the inputs change (including to/from null states).
685    ///
686    /// # Example
687    /// ```rust
688    /// # #[cfg(feature = "deploy")] {
689    /// # use hydro_lang::prelude::*;
690    /// # use futures::StreamExt;
691    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
692    /// let tick = process.tick();
693    /// // ticks are lazy by default, forces the second tick to run
694    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
695    ///
696    /// let some_first_tick = tick.optional_first_tick(q!(123));
697    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
698    /// some_first_tick.or(some_second_tick).all_ticks()
699    /// # }, |mut stream| async move {
700    /// // [123 /* first tick */, 456 /* second tick */]
701    /// # for w in vec![123, 456] {
702    /// #     assert_eq!(stream.next().await.unwrap(), w);
703    /// # }
704    /// # }));
705    /// # }
706    /// ```
707    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
708        check_matching_location(&self.location, &other.location);
709
710        if L::is_top_level()
711            && !B::BOUNDED // only if unbounded we need to use a tick
712            && let Some(tick) = self.location.try_tick()
713        {
714            let self_location = self.location().clone();
715            let out = or_inside_tick(
716                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
717                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
718            )
719            .latest();
720
721            Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
722        } else {
723            Optional::new(
724                self.location.clone(),
725                HydroNode::ChainFirst {
726                    first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
727                    second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
728                    metadata: self.location.new_node_metadata(Self::collection_kind()),
729                },
730            )
731        }
732    }
733
734    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
735    ///
736    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
737    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
738    ///
739    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
740    /// of the inputs change (including to/from null states).
741    ///
742    /// # Example
743    /// ```rust
744    /// # #[cfg(feature = "deploy")] {
745    /// # use hydro_lang::prelude::*;
746    /// # use futures::StreamExt;
747    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
748    /// let tick = process.tick();
749    /// // ticks are lazy by default, forces the later ticks to run
750    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
751    ///
752    /// let some_first_tick = tick.optional_first_tick(q!(123));
753    /// some_first_tick
754    ///     .unwrap_or(tick.singleton(q!(456)))
755    ///     .all_ticks()
756    /// # }, |mut stream| async move {
757    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
758    /// # for w in vec![123, 456, 456, 456] {
759    /// #     assert_eq!(stream.next().await.unwrap(), w);
760    /// # }
761    /// # }));
762    /// # }
763    /// ```
764    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
765        let res_option = self.or(other.into());
766        Singleton::new(
767            res_option.location.clone(),
768            HydroNode::Cast {
769                inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
770                metadata: res_option
771                    .location
772                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
773            },
774        )
775    }
776
777    /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
778    ///
779    /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
780    /// [`Optional`] when the default value of the type is a suitable fallback.
781    ///
782    /// # Example
783    /// ```rust
784    /// # #[cfg(feature = "deploy")] {
785    /// # use hydro_lang::prelude::*;
786    /// # use futures::StreamExt;
787    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
788    /// let tick = process.tick();
789    /// // ticks are lazy by default, forces the later ticks to run
790    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
791    ///
792    /// let some_first_tick = tick.optional_first_tick(q!(123i32));
793    /// some_first_tick.unwrap_or_default().all_ticks()
794    /// # }, |mut stream| async move {
795    /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
796    /// # for w in vec![123, 0, 0, 0] {
797    /// #     assert_eq!(stream.next().await.unwrap(), w);
798    /// # }
799    /// # }));
800    /// # }
801    /// ```
802    pub fn unwrap_or_default(self) -> Singleton<T, L, B>
803    where
804        T: Default + Clone,
805    {
806        self.into_singleton().map(q!(|v| v.unwrap_or_default()))
807    }
808
809    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
810    ///
811    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
812    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
813    /// so that Hydro can skip any computation on null values.
814    ///
815    /// # Example
816    /// ```rust
817    /// # #[cfg(feature = "deploy")] {
818    /// # use hydro_lang::prelude::*;
819    /// # use futures::StreamExt;
820    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
821    /// let tick = process.tick();
822    /// // ticks are lazy by default, forces the later ticks to run
823    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
824    ///
825    /// let some_first_tick = tick.optional_first_tick(q!(123));
826    /// some_first_tick.into_singleton().all_ticks()
827    /// # }, |mut stream| async move {
828    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
829    /// # for w in vec![Some(123), None, None, None] {
830    /// #     assert_eq!(stream.next().await.unwrap(), w);
831    /// # }
832    /// # }));
833    /// # }
834    /// ```
835    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
836    where
837        T: Clone,
838    {
839        let none: syn::Expr = parse_quote!(::std::option::Option::None);
840
841        let none_singleton = Singleton::new(
842            self.location.clone(),
843            HydroNode::SingletonSource {
844                value: none.into(),
845                first_tick_only: false,
846                metadata: self
847                    .location
848                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
849            },
850        );
851
852        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
853    }
854
855    /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
856    ///
857    /// # Example
858    /// ```rust
859    /// # #[cfg(feature = "deploy")] {
860    /// # use hydro_lang::prelude::*;
861    /// # use futures::StreamExt;
862    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
863    /// let tick = process.tick();
864    /// // ticks are lazy by default, forces the second tick to run
865    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
866    ///
867    /// let some_first_tick = tick.optional_first_tick(q!(42));
868    /// some_first_tick.is_some().all_ticks()
869    /// # }, |mut stream| async move {
870    /// // [true /* first tick */, false /* second tick */, ...]
871    /// # for w in vec![true, false] {
872    /// #     assert_eq!(stream.next().await.unwrap(), w);
873    /// # }
874    /// # }));
875    /// # }
876    /// ```
877    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
878    pub fn is_some(self) -> Singleton<bool, L, B> {
879        self.map(q!(|_| ()))
880            .into_singleton()
881            .map(q!(|o| o.is_some()))
882    }
883
884    /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
885    ///
886    /// # Example
887    /// ```rust
888    /// # #[cfg(feature = "deploy")] {
889    /// # use hydro_lang::prelude::*;
890    /// # use futures::StreamExt;
891    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
892    /// let tick = process.tick();
893    /// // ticks are lazy by default, forces the second tick to run
894    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
895    ///
896    /// let some_first_tick = tick.optional_first_tick(q!(42));
897    /// some_first_tick.is_none().all_ticks()
898    /// # }, |mut stream| async move {
899    /// // [false /* first tick */, true /* second tick */, ...]
900    /// # for w in vec![false, true] {
901    /// #     assert_eq!(stream.next().await.unwrap(), w);
902    /// # }
903    /// # }));
904    /// # }
905    /// ```
906    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
907    pub fn is_none(self) -> Singleton<bool, L, B> {
908        self.map(q!(|_| ()))
909            .into_singleton()
910            .map(q!(|o| o.is_none()))
911    }
912
913    /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
914    /// values are equal, `false` otherwise (including when either is null).
915    ///
916    /// # Example
917    /// ```rust
918    /// # #[cfg(feature = "deploy")] {
919    /// # use hydro_lang::prelude::*;
920    /// # use futures::StreamExt;
921    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
922    /// let tick = process.tick();
923    /// // ticks are lazy by default, forces the second tick to run
924    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
925    ///
926    /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
927    /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
928    /// a.is_some_and_equals(b).all_ticks()
929    /// # }, |mut stream| async move {
930    /// // [true, false]
931    /// # for w in vec![true, false] {
932    /// #     assert_eq!(stream.next().await.unwrap(), w);
933    /// # }
934    /// # }));
935    /// # }
936    /// ```
937    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
938    pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
939    where
940        T: PartialEq + Clone,
941        B: IsBounded,
942    {
943        self.into_singleton()
944            .zip(other.into_singleton())
945            .map(q!(|(a, b)| a.is_some() && a == b))
946    }
947
948    /// An operator which allows you to "name" a `HydroNode`.
949    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
950    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
951        {
952            let mut node = self.ir_node.borrow_mut();
953            let metadata = node.metadata_mut();
954            metadata.tag = Some(name.to_owned());
955        }
956        self
957    }
958
959    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
960    /// implies that `B == Bounded`.
961    pub fn make_bounded(self) -> Optional<T, L, Bounded>
962    where
963        B: IsBounded,
964    {
965        Optional::new(
966            self.location.clone(),
967            self.ir_node.replace(HydroNode::Placeholder),
968        )
969    }
970
971    /// Clones this bounded optional into a tick, returning an optional that has the
972    /// same value as the outer optional. Because the outer optional is bounded, this
973    /// is deterministic because there is only a single immutable version.
974    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
975    where
976        B: IsBounded,
977        T: Clone,
978    {
979        // TODO(shadaj): avoid printing simulator logs for this snapshot
980        let inner = self.snapshot(
981            tick,
982            nondet!(/** bounded top-level optional so deterministic */),
983        );
984        Optional::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
985    }
986
987    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
988    /// non-null. Otherwise, the stream is empty.
989    ///
990    /// # Example
991    /// ```rust
992    /// # #[cfg(feature = "deploy")] {
993    /// # use hydro_lang::prelude::*;
994    /// # use futures::StreamExt;
995    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
996    /// # let tick = process.tick();
997    /// # // ticks are lazy by default, forces the second tick to run
998    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
999    /// # let batch_first_tick = process
1000    /// #   .source_iter(q!(vec![]))
1001    /// #   .batch(&tick, nondet!(/** test */));
1002    /// # let batch_second_tick = process
1003    /// #   .source_iter(q!(vec![123, 456]))
1004    /// #   .batch(&tick, nondet!(/** test */))
1005    /// #   .defer_tick(); // appears on the second tick
1006    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1007    /// input_batch // first tick: [], second tick: [123, 456]
1008    ///     .clone()
1009    ///     .max()
1010    ///     .into_stream()
1011    ///     .chain(input_batch)
1012    ///     .all_ticks()
1013    /// # }, |mut stream| async move {
1014    /// // [456, 123, 456]
1015    /// # for w in vec![456, 123, 456] {
1016    /// #     assert_eq!(stream.next().await.unwrap(), w);
1017    /// # }
1018    /// # }));
1019    /// # }
1020    /// ```
1021    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1022    where
1023        B: IsBounded,
1024    {
1025        Stream::new(
1026            self.location.clone(),
1027            HydroNode::Cast {
1028                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1029                metadata: self.location.new_node_metadata(Stream::<
1030                    T,
1031                    Tick<L>,
1032                    Bounded,
1033                    TotalOrder,
1034                    ExactlyOnce,
1035                >::collection_kind()),
1036            },
1037        )
1038    }
1039
1040    /// Filters this optional, passing through the value if the boolean signal is `true`,
1041    /// otherwise the output is null.
1042    ///
1043    /// # Example
1044    /// ```rust
1045    /// # #[cfg(feature = "deploy")] {
1046    /// # use hydro_lang::prelude::*;
1047    /// # use futures::StreamExt;
1048    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1049    /// let tick = process.tick();
1050    /// // ticks are lazy by default, forces the second tick to run
1051    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1052    ///
1053    /// let some_first_tick = tick.optional_first_tick(q!(()));
1054    /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1055    /// let batch_first_tick = process
1056    ///   .source_iter(q!(vec![456]))
1057    ///   .batch(&tick, nondet!(/** test */));
1058    /// let batch_second_tick = process
1059    ///   .source_iter(q!(vec![789]))
1060    ///   .batch(&tick, nondet!(/** test */))
1061    ///   .defer_tick();
1062    /// batch_first_tick.chain(batch_second_tick).first()
1063    ///   .filter_if(signal)
1064    ///   .unwrap_or(tick.singleton(q!(0)))
1065    ///   .all_ticks()
1066    /// # }, |mut stream| async move {
1067    /// // [456, 0]
1068    /// # for w in vec![456, 0] {
1069    /// #     assert_eq!(stream.next().await.unwrap(), w);
1070    /// # }
1071    /// # }));
1072    /// # }
1073    /// ```
1074    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1075    where
1076        B: IsBounded,
1077    {
1078        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1079    }
1080
1081    /// Filters this optional, passing through the optional value if it is non-null **and** the
1082    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
1083    ///
1084    /// Useful for conditionally processing, such as only emitting an optional's value outside
1085    /// a tick if some other condition is satisfied.
1086    ///
1087    /// # Example
1088    /// ```rust
1089    /// # #[cfg(feature = "deploy")] {
1090    /// # use hydro_lang::prelude::*;
1091    /// # use futures::StreamExt;
1092    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1093    /// let tick = process.tick();
1094    /// // ticks are lazy by default, forces the second tick to run
1095    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1096    ///
1097    /// let batch_first_tick = process
1098    ///   .source_iter(q!(vec![]))
1099    ///   .batch(&tick, nondet!(/** test */));
1100    /// let batch_second_tick = process
1101    ///   .source_iter(q!(vec![456]))
1102    ///   .batch(&tick, nondet!(/** test */))
1103    ///   .defer_tick(); // appears on the second tick
1104    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1105    /// batch_first_tick.chain(batch_second_tick).first()
1106    ///   .filter_if_some(some_on_first_tick)
1107    ///   .unwrap_or(tick.singleton(q!(789)))
1108    ///   .all_ticks()
1109    /// # }, |mut stream| async move {
1110    /// // [789, 789]
1111    /// # for w in vec![789, 789] {
1112    /// #     assert_eq!(stream.next().await.unwrap(), w);
1113    /// # }
1114    /// # }));
1115    /// # }
1116    /// ```
1117    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1118    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1119    where
1120        B: IsBounded,
1121    {
1122        self.filter_if(signal.is_some())
1123    }
1124
1125    /// Filters this optional, passing through the optional value if it is non-null **and** the
1126    /// argument (a [`Bounded`] [`Optional`]) is _null_, otherwise the output is null.
1127    ///
1128    /// Useful for conditionally processing, such as only emitting an optional's value outside
1129    /// a tick if some other condition is satisfied.
1130    ///
1131    /// # Example
1132    /// ```rust
1133    /// # #[cfg(feature = "deploy")] {
1134    /// # use hydro_lang::prelude::*;
1135    /// # use futures::StreamExt;
1136    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1137    /// let tick = process.tick();
1138    /// // ticks are lazy by default, forces the second tick to run
1139    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1140    ///
1141    /// let batch_first_tick = process
1142    ///   .source_iter(q!(vec![]))
1143    ///   .batch(&tick, nondet!(/** test */));
1144    /// let batch_second_tick = process
1145    ///   .source_iter(q!(vec![456]))
1146    ///   .batch(&tick, nondet!(/** test */))
1147    ///   .defer_tick(); // appears on the second tick
1148    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1149    /// batch_first_tick.chain(batch_second_tick).first()
1150    ///   .filter_if_none(some_on_first_tick)
1151    ///   .unwrap_or(tick.singleton(q!(789)))
1152    ///   .all_ticks()
1153    /// # }, |mut stream| async move {
1154    /// // [789, 456]
1155    /// # for w in vec![789, 456] {
1156    /// #     assert_eq!(stream.next().await.unwrap(), w);
1157    /// # }
1158    /// # }));
1159    /// # }
1160    /// ```
1161    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1162    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1163    where
1164        B: IsBounded,
1165    {
1166        self.filter_if(other.is_none())
1167    }
1168
1169    /// If `self` is null, emits a null optional, but if it is non-null, emits `value`.
1170    ///
1171    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1172    /// having a value, such as only releasing a piece of state if the node is the leader.
1173    ///
1174    /// # Example
1175    /// ```rust
1176    /// # #[cfg(feature = "deploy")] {
1177    /// # use hydro_lang::prelude::*;
1178    /// # use futures::StreamExt;
1179    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1180    /// let tick = process.tick();
1181    /// // ticks are lazy by default, forces the second tick to run
1182    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1183    ///
1184    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1185    /// some_on_first_tick
1186    ///     .if_some_then(tick.singleton(q!(456)))
1187    ///     .unwrap_or(tick.singleton(q!(123)))
1188    /// # .all_ticks()
1189    /// # }, |mut stream| async move {
1190    /// // 456 (first tick) ~> 123 (second tick onwards)
1191    /// # for w in vec![456, 123, 123] {
1192    /// #     assert_eq!(stream.next().await.unwrap(), w);
1193    /// # }
1194    /// # }));
1195    /// # }
1196    /// ```
1197    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1198    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1199    where
1200        B: IsBounded,
1201    {
1202        value.filter_if(self.is_some())
1203    }
1204}
1205
1206impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1207where
1208    L: Location<'a>,
1209{
1210    /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1211    /// key-value pair of this [`Optional`].
1212    ///
1213    /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1214    /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1215    /// the entry will be updated and appear / disappear according to the state of the
1216    /// [`Optional`].
1217    pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1218        KeyedSingleton::new(
1219            self.location.clone(),
1220            HydroNode::Cast {
1221                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1222                metadata: self
1223                    .location
1224                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1225            },
1226        )
1227    }
1228}
1229
1230impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1231where
1232    L: Location<'a>,
1233{
1234    /// Returns an optional value corresponding to the latest snapshot of the optional
1235    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1236    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1237    /// all snapshots of this optional into the atomic-associated tick will observe the
1238    /// same value each tick.
1239    ///
1240    /// # Non-Determinism
1241    /// Because this picks a snapshot of a optional whose value is continuously changing,
1242    /// the output optional has a non-deterministic value since the snapshot can be at an
1243    /// arbitrary point in time.
1244    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1245        self,
1246        tick: &Tick<L2>,
1247        _nondet: NonDet,
1248    ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1249        Optional::new(
1250            tick.drop_consistency(),
1251            HydroNode::Batch {
1252                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1253                metadata: tick
1254                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1255            },
1256        )
1257    }
1258
1259    /// Returns this optional back into a top-level, asynchronous execution context where updates
1260    /// to the value will be asynchronously propagated.
1261    pub fn end_atomic(self) -> Optional<T, L, B> {
1262        Optional::new(
1263            self.location.tick.l.clone(),
1264            HydroNode::EndAtomic {
1265                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1266                metadata: self
1267                    .location
1268                    .tick
1269                    .l
1270                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1271            },
1272        )
1273    }
1274}
1275
1276impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1277where
1278    L: Location<'a>,
1279{
1280    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1281    /// will observe the same version of the value and will be executed synchronously before any
1282    /// outputs are yielded (in [`Optional::end_atomic`]).
1283    ///
1284    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1285    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1286    /// a different version).
1287    pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1288        let id = self.location.flow_state().borrow_mut().next_clock_id();
1289        let out_location = Atomic {
1290            tick: Tick {
1291                id,
1292                l: self.location.clone(),
1293            },
1294        };
1295        Optional::new(
1296            out_location.clone(),
1297            HydroNode::BeginAtomic {
1298                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1299                metadata: out_location
1300                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1301            },
1302        )
1303    }
1304
1305    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1306    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1307    /// relevant data that contributed to the snapshot at tick `t`.
1308    ///
1309    /// # Non-Determinism
1310    /// Because this picks a snapshot of a optional whose value is continuously changing,
1311    /// the output optional has a non-deterministic value since the snapshot can be at an
1312    /// arbitrary point in time.
1313    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1314        self,
1315        tick: &Tick<L2>,
1316        _nondet: NonDet,
1317    ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1318        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1319        Optional::new(
1320            tick.drop_consistency(),
1321            HydroNode::Batch {
1322                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1323                metadata: tick
1324                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1325            },
1326        )
1327    }
1328
1329    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1330    /// with order corresponding to increasing prefixes of data contributing to the optional.
1331    ///
1332    /// # Non-Determinism
1333    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1334    /// to non-deterministic batching and arrival of inputs, the output stream is
1335    /// non-deterministic.
1336    pub fn sample_eager(
1337        self,
1338        nondet: NonDet,
1339    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1340        let tick = self.location.tick();
1341        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1342    }
1343
1344    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1345    /// value taken at various points in time. Because the input optional may be
1346    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1347    /// represent the value of the optional given some prefix of the streams leading up to
1348    /// it.
1349    ///
1350    /// # Non-Determinism
1351    /// The output stream is non-deterministic in which elements are sampled, since this
1352    /// is controlled by a clock.
1353    pub fn sample_every(
1354        self,
1355        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1356        nondet: NonDet,
1357    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1358    where
1359        L: TopLevel<'a>,
1360    {
1361        let samples = self.location.source_interval(interval);
1362        let tick = self.location.tick();
1363
1364        self.snapshot(&tick, nondet)
1365            .filter_if(samples.batch(&tick, nondet).first().is_some())
1366            .all_ticks()
1367            .weaken_retries()
1368    }
1369}
1370
1371impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1372where
1373    L: Location<'a>,
1374{
1375    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1376    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1377    /// null values).
1378    ///
1379    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1380    /// producing one element in the output for each (non-null) tick. This is useful for batched
1381    /// computations, where the results from each tick must be combined together.
1382    ///
1383    /// # Example
1384    /// ```rust
1385    /// # #[cfg(feature = "deploy")] {
1386    /// # use hydro_lang::prelude::*;
1387    /// # use futures::StreamExt;
1388    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1389    /// # let tick = process.tick();
1390    /// # // ticks are lazy by default, forces the second tick to run
1391    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1392    /// # let batch_first_tick = process
1393    /// #   .source_iter(q!(vec![]))
1394    /// #   .batch(&tick, nondet!(/** test */));
1395    /// # let batch_second_tick = process
1396    /// #   .source_iter(q!(vec![1, 2, 3]))
1397    /// #   .batch(&tick, nondet!(/** test */))
1398    /// #   .defer_tick(); // appears on the second tick
1399    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1400    /// input_batch // first tick: [], second tick: [1, 2, 3]
1401    ///     .max()
1402    ///     .all_ticks()
1403    /// # }, |mut stream| async move {
1404    /// // [3]
1405    /// # for w in vec![3] {
1406    /// #     assert_eq!(stream.next().await.unwrap(), w);
1407    /// # }
1408    /// # }));
1409    /// # }
1410    /// ```
1411    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1412        self.into_stream().all_ticks()
1413    }
1414
1415    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1416    /// which will stream the value computed in _each_ tick as a separate stream element.
1417    ///
1418    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1419    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1420    /// optional's [`Tick`] context.
1421    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1422        self.into_stream().all_ticks_atomic()
1423    }
1424
1425    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1426    /// be asynchronously updated with the latest value of the optional inside the tick, including
1427    /// whether the optional is null or not.
1428    ///
1429    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1430    /// tick that tracks the inner value. This is useful for getting the value as of the
1431    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1432    ///
1433    /// # Example
1434    /// ```rust
1435    /// # #[cfg(feature = "deploy")] {
1436    /// # use hydro_lang::prelude::*;
1437    /// # use futures::StreamExt;
1438    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1439    /// # let tick = process.tick();
1440    /// # // ticks are lazy by default, forces the second tick to run
1441    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1442    /// # let batch_first_tick = process
1443    /// #   .source_iter(q!(vec![]))
1444    /// #   .batch(&tick, nondet!(/** test */));
1445    /// # let batch_second_tick = process
1446    /// #   .source_iter(q!(vec![1, 2, 3]))
1447    /// #   .batch(&tick, nondet!(/** test */))
1448    /// #   .defer_tick(); // appears on the second tick
1449    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1450    /// input_batch // first tick: [], second tick: [1, 2, 3]
1451    ///     .max()
1452    ///     .latest()
1453    /// # .into_singleton()
1454    /// # .sample_eager(nondet!(/** test */))
1455    /// # }, |mut stream| async move {
1456    /// // asynchronously changes from None ~> 3
1457    /// # for w in vec![None, Some(3)] {
1458    /// #     assert_eq!(stream.next().await.unwrap(), w);
1459    /// # }
1460    /// # }));
1461    /// # }
1462    /// ```
1463    pub fn latest(self) -> Optional<T, L, Unbounded> {
1464        Optional::new(
1465            self.location.outer().clone(),
1466            HydroNode::YieldConcat {
1467                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1468                metadata: self
1469                    .location
1470                    .outer()
1471                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1472            },
1473        )
1474    }
1475
1476    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1477    /// be updated with the latest value of the optional inside the tick.
1478    ///
1479    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1480    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1481    /// optional's [`Tick`] context.
1482    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1483        let out_location = Atomic {
1484            tick: self.location.clone(),
1485        };
1486
1487        Optional::new(
1488            out_location.clone(),
1489            HydroNode::YieldConcat {
1490                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1491                metadata: out_location
1492                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1493            },
1494        )
1495    }
1496
1497    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1498    /// always has the state of `self` at tick `T - 1`.
1499    ///
1500    /// At tick `0`, the output optional is null, since there is no previous tick.
1501    ///
1502    /// This operator enables stateful iterative processing with ticks, by sending data from one
1503    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1504    ///
1505    /// # Example
1506    /// ```rust
1507    /// # #[cfg(feature = "deploy")] {
1508    /// # use hydro_lang::prelude::*;
1509    /// # use futures::StreamExt;
1510    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1511    /// let tick = process.tick();
1512    /// // ticks are lazy by default, forces the second tick to run
1513    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1514    ///
1515    /// let batch_first_tick = process
1516    ///   .source_iter(q!(vec![1, 2]))
1517    ///   .batch(&tick, nondet!(/** test */));
1518    /// let batch_second_tick = process
1519    ///   .source_iter(q!(vec![3, 4]))
1520    ///   .batch(&tick, nondet!(/** test */))
1521    ///   .defer_tick(); // appears on the second tick
1522    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1523    ///   .reduce(q!(|state, v| *state += v));
1524    ///
1525    /// current_tick_sum.clone().into_singleton().zip(
1526    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1527    /// ).all_ticks()
1528    /// # }, |mut stream| async move {
1529    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1530    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1531    /// #     assert_eq!(stream.next().await.unwrap(), w);
1532    /// # }
1533    /// # }));
1534    /// # }
1535    /// ```
1536    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1537        Optional::new(
1538            self.location.clone(),
1539            HydroNode::DeferTick {
1540                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1541                metadata: self.location.new_node_metadata(Self::collection_kind()),
1542            },
1543        )
1544    }
1545}
1546
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::StreamExt;
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(feature = "deploy")]
    use super::Optional;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(feature = "deploy")]
    use crate::nondet::nondet;

    /// `or`-ing an inhabited tick optional with a clone of itself is expected to still hold a
    /// single value: the stream derived from the result must count exactly one element.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let inhabited: Optional<_, _, _> = node_tick.singleton(q!(123)).into();
        let duplicate = inhabited.clone();
        let merged = duplicate.or(inhabited);
        let element_count = merged.into_stream().count();
        let counts = element_count
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        let observed = external_out.next().await.unwrap();
        assert_eq!(observed, 1);
    }

    /// Converts a top-level optional that is filtered with an always-false predicate via
    /// `into_singleton`, then checks that every snapshot taken inside a tick streams exactly
    /// one element for the first three outputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let always_empty = node.singleton(q!(123)).filter(q!(|_| false));
        let wrapped = always_empty.into_singleton();

        let tick_driver = node.spin();

        let snapshot_count = wrapped
            .snapshot(&node_tick, nondet!(/** test */))
            .into_stream()
            .count();
        // The driver count is zipped in and then discarded; going by its name, the spin
        // source is presumably there to keep ticks running.
        let driver_count = tick_driver.batch(&node_tick, nondet!(/** test */)).count();
        let counts = snapshot_count
            .zip(driver_count)
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        for _ in 0..3 {
            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }

    /// Same check as `into_singleton_top_level_none_cardinality`, but the empty optional is
    /// built from a tick singleton passed through `latest()` before being filtered.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let always_empty = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
        let wrapped = always_empty.into_singleton();

        let tick_driver = node.spin();

        let snapshot_count = wrapped
            .snapshot(&node_tick, nondet!(/** test */))
            .into_stream()
            .count();
        // The driver count is zipped in and then discarded; going by its name, the spin
        // source is presumably there to keep ticks running.
        let driver_count = tick_driver.batch(&node_tick, nondet!(/** test */)).count();
        let counts = snapshot_count
            .zip(driver_count)
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        for _ in 0..3 {
            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }

    /// Folds `[1, 2, 3, 4]` into a sum, keeps it with an always-true filter, and expects the
    /// resulting top-level stream to yield only `10`.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_some_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let total = node
            .source_iter(q!(vec![1, 2, 3, 4]))
            .fold(q!(|| 0), q!(|a, b| *a += b));
        let kept = total.filter(q!(|_| true));

        let out_recv = kept.into_stream().sim_output();

        flow.sim().exhaustive(async || {
            out_recv.assert_yields_only([10]).await;
        });
    }

    /// Folds `[1, 2, 3, 4]` into a sum, drops it with an always-false filter, and expects the
    /// resulting top-level stream to yield nothing.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_none_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let total = node
            .source_iter(q!(vec![1, 2, 3, 4]))
            .fold(q!(|| 0), q!(|a, b| *a += b));
        let dropped = total.filter(q!(|_| false));

        let out_recv = dropped.into_stream().sim_output();

        flow.sim().exhaustive(async || {
            let nothing: [i32; 0] = [];
            out_recv.assert_yields_only(nothing).await;
        });
    }
}