hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick};
23use crate::location::{Location, Tick, TopLevel, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25use crate::prelude::KeyedSingleton;
26
27/// A *nullable* Rust value that can asynchronously change over time.
28///
29/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
30/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
31/// asynchronously change over time, including becoming present of uninhabited.
32///
33/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
34/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
35///
36/// Type Parameters:
37/// - `Type`: the type of the value in this optional (when it is not null)
38/// - `Loc`: the [`Location`] where the optional is materialized
39/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
40pub struct Optional<Type, Loc, Bound: Boundedness> {
41 pub(crate) location: Loc,
42 pub(crate) ir_node: RefCell<HydroNode>,
43 pub(crate) flow_state: FlowState,
44
45 _phantom: PhantomData<(Type, Loc, Bound)>,
46}
47
48impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
49 fn drop(&mut self) {
50 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
51 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
52 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
53 input: Box::new(ir_node),
54 op_metadata: HydroIrOpMetadata::new(),
55 });
56 }
57 }
58}
59
60impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
61where
62 T: Clone,
63 L: Location<'a>,
64{
65 fn from(value: Optional<T, L, Bounded>) -> Self {
66 let tick = value.location().tick();
67 value.clone_into_tick(&tick).latest()
68 }
69}
70
71impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
72where
73 L: Location<'a>,
74{
75 fn defer_tick(self) -> Self {
76 Optional::defer_tick(self)
77 }
78}
79
80impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
81where
82 L: Location<'a>,
83{
84 type Location = Tick<L>;
85
86 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
87 Optional::new(
88 location.clone(),
89 HydroNode::CycleSource {
90 cycle_id,
91 metadata: location.new_node_metadata(Self::collection_kind()),
92 },
93 )
94 }
95}
96
97impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
98where
99 L: Location<'a>,
100{
101 type Location = Tick<L>;
102
103 fn location(&self) -> &Self::Location {
104 self.location()
105 }
106
107 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
108 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
109 location.clone(),
110 HydroNode::DeferTick {
111 input: Box::new(HydroNode::CycleSource {
112 cycle_id,
113 metadata: location.new_node_metadata(Self::collection_kind()),
114 }),
115 metadata: location
116 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
117 },
118 );
119
120 from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
121 }
122}
123
124impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
125where
126 L: Location<'a>,
127{
128 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
129 assert_eq!(
130 Location::id(&self.location),
131 expected_location,
132 "locations do not match"
133 );
134 self.location
135 .flow_state()
136 .borrow_mut()
137 .push_root(HydroRoot::CycleSink {
138 cycle_id,
139 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
140 op_metadata: HydroIrOpMetadata::new(),
141 });
142 }
143}
144
145impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
146where
147 L: Location<'a>,
148{
149 type Location = L;
150
151 fn create_source(cycle_id: CycleId, location: L) -> Self {
152 Optional::new(
153 location.clone(),
154 HydroNode::CycleSource {
155 cycle_id,
156 metadata: location.new_node_metadata(Self::collection_kind()),
157 },
158 )
159 }
160}
161
162impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
163where
164 L: Location<'a>,
165{
166 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
167 assert_eq!(
168 Location::id(&self.location),
169 expected_location,
170 "locations do not match"
171 );
172 self.location
173 .flow_state()
174 .borrow_mut()
175 .push_root(HydroRoot::CycleSink {
176 cycle_id,
177 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
178 op_metadata: HydroIrOpMetadata::new(),
179 });
180 }
181}
182
183impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
184where
185 L: Location<'a>,
186{
187 fn from(singleton: Singleton<T, L, B>) -> Self {
188 Optional::new(
189 singleton.location.clone(),
190 HydroNode::Cast {
191 inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
192 metadata: singleton
193 .location
194 .new_node_metadata(Self::collection_kind()),
195 },
196 )
197 }
198}
199
200#[cfg(stageleft_runtime)]
201pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
202 me: Optional<T, L, B>,
203 other: Optional<O, L, B>,
204) -> Optional<(T, O), L, B> {
205 check_matching_location(&me.location, &other.location);
206
207 Optional::new(
208 me.location.clone(),
209 HydroNode::CrossSingleton {
210 left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
211 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
212 metadata: me
213 .location
214 .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
215 },
216 )
217}
218
219#[cfg(stageleft_runtime)]
220fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
221 me: Optional<T, L, B>,
222 other: Optional<T, L, B>,
223) -> Optional<T, L, B> {
224 check_matching_location(&me.location, &other.location);
225
226 Optional::new(
227 me.location.clone(),
228 HydroNode::ChainFirst {
229 first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
230 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
231 metadata: me
232 .location
233 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
234 },
235 )
236}
237
238impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
239where
240 T: Clone,
241 L: Location<'a>,
242{
243 fn clone(&self) -> Self {
244 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
245 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
246 *self.ir_node.borrow_mut() = HydroNode::Tee {
247 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
248 metadata: self.location.new_node_metadata(Self::collection_kind()),
249 };
250 }
251
252 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
253 Optional {
254 location: self.location.clone(),
255 flow_state: self.flow_state.clone(),
256 ir_node: HydroNode::Tee {
257 inner: SharedNode(inner.0.clone()),
258 metadata: metadata.clone(),
259 }
260 .into(),
261 _phantom: PhantomData,
262 }
263 } else {
264 unreachable!()
265 }
266 }
267}
268
269impl<'a, T, L, B: Boundedness> Optional<T, L, B>
270where
271 L: Location<'a>,
272{
273 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276 let flow_state = location.flow_state().clone();
277 Optional {
278 location,
279 flow_state,
280 ir_node: RefCell::new(ir_node),
281 _phantom: PhantomData,
282 }
283 }
284
285 pub(crate) fn collection_kind() -> CollectionKind {
286 CollectionKind::Optional {
287 bound: B::BOUND_KIND,
288 element_type: stageleft::quote_type::<T>().into(),
289 }
290 }
291
292 /// Returns the [`Location`] where this optional is being materialized.
293 pub fn location(&self) -> &L {
294 &self.location
295 }
296
297 /// Weakens the consistency of this live collection to not guarantee any consistency across
298 /// cluster members (if this collection is on a cluster).
299 pub fn weaken_consistency(self) -> Optional<T, L::DropConsistency, B>
300 where
301 L: Location<'a>,
302 {
303 if L::consistency()
304 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
305 {
306 // already no consistency
307 Optional::new(
308 self.location.drop_consistency(),
309 self.ir_node.replace(HydroNode::Placeholder),
310 )
311 } else {
312 Optional::new(
313 self.location.drop_consistency(),
314 HydroNode::Cast {
315 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
316 metadata: self
317 .location
318 .clone()
319 .drop_consistency()
320 .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
321 },
322 )
323 }
324 }
325
326 /// Casts this live collection to have the consistency guarantees specified in the given
327 /// location type parameter. The developer must ensure that the strengthened consistency
328 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
329 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
330 self,
331 _proof: impl crate::properties::ConsistencyProof,
332 ) -> Optional<T, L2, B>
333 where
334 L: Location<'a>,
335 {
336 if L::consistency() == L2::consistency() {
337 Optional::new(
338 self.location.with_consistency_of(),
339 self.ir_node.replace(HydroNode::Placeholder),
340 )
341 } else {
342 Optional::new(
343 self.location.with_consistency_of(),
344 HydroNode::AssertIsConsistent {
345 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
346 trusted: false,
347 metadata: self
348 .location
349 .clone()
350 .with_consistency_of::<L2>()
351 .new_node_metadata(Optional::<T, L2, B>::collection_kind()),
352 },
353 )
354 }
355 }
356
357 /// Transforms the optional value by applying a function `f` to it,
358 /// continuously as the input is updated.
359 ///
360 /// Whenever the optional is empty, the output optional is also empty.
361 ///
362 /// # Example
363 /// ```rust
364 /// # #[cfg(feature = "deploy")] {
365 /// # use hydro_lang::prelude::*;
366 /// # use futures::StreamExt;
367 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
368 /// let tick = process.tick();
369 /// let optional = tick.optional_first_tick(q!(1));
370 /// optional.map(q!(|v| v + 1)).all_ticks()
371 /// # }, |mut stream| async move {
372 /// // 2
373 /// # assert_eq!(stream.next().await.unwrap(), 2);
374 /// # }));
375 /// # }
376 /// ```
377 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
378 where
379 F: Fn(T) -> U + 'a,
380 {
381 let f = f.splice_fn1_ctx(&self.location).into();
382 Optional::new(
383 self.location.clone(),
384 HydroNode::Map {
385 f,
386 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
387 metadata: self
388 .location
389 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
390 },
391 )
392 }
393
394 /// Transforms the optional value by applying a function `f` to it and then flattening
395 /// the result into a stream, preserving the order of elements.
396 ///
397 /// If the optional is empty, the output stream is also empty. If the optional contains
398 /// a value, `f` is applied to produce an iterator, and all items from that iterator
399 /// are emitted in the output stream in deterministic order.
400 ///
401 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
402 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
403 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
404 ///
405 /// # Example
406 /// ```rust
407 /// # #[cfg(feature = "deploy")] {
408 /// # use hydro_lang::prelude::*;
409 /// # use futures::StreamExt;
410 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
411 /// let tick = process.tick();
412 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
413 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
414 /// # }, |mut stream| async move {
415 /// // 1, 2, 3
416 /// # for w in vec![1, 2, 3] {
417 /// # assert_eq!(stream.next().await.unwrap(), w);
418 /// # }
419 /// # }));
420 /// # }
421 /// ```
422 pub fn flat_map_ordered<U, I, F>(
423 self,
424 f: impl IntoQuotedMut<'a, F, L>,
425 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
426 where
427 B: IsBounded,
428 I: IntoIterator<Item = U>,
429 F: Fn(T) -> I + 'a,
430 {
431 self.into_stream().flat_map_ordered(f)
432 }
433
434 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
435 /// for the output type `I` to produce items in any order.
436 ///
437 /// If the optional is empty, the output stream is also empty. If the optional contains
438 /// a value, `f` is applied to produce an iterator, and all items from that iterator
439 /// are emitted in the output stream in non-deterministic order.
440 ///
441 /// # Example
442 /// ```rust
443 /// # #[cfg(feature = "deploy")] {
444 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
445 /// # use futures::StreamExt;
446 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
447 /// let tick = process.tick();
448 /// let optional = tick.optional_first_tick(q!(
449 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
450 /// ));
451 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
452 /// # }, |mut stream| async move {
453 /// // 1, 2, 3, but in no particular order
454 /// # let mut results = Vec::new();
455 /// # for _ in 0..3 {
456 /// # results.push(stream.next().await.unwrap());
457 /// # }
458 /// # results.sort();
459 /// # assert_eq!(results, vec![1, 2, 3]);
460 /// # }));
461 /// # }
462 /// ```
463 pub fn flat_map_unordered<U, I, F>(
464 self,
465 f: impl IntoQuotedMut<'a, F, L>,
466 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
467 where
468 B: IsBounded,
469 I: IntoIterator<Item = U>,
470 F: Fn(T) -> I + 'a,
471 {
472 self.into_stream().flat_map_unordered(f)
473 }
474
475 /// Flattens the optional value into a stream, preserving the order of elements.
476 ///
477 /// If the optional is empty, the output stream is also empty. If the optional contains
478 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
479 /// in the output stream in deterministic order.
480 ///
481 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
482 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
483 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
484 ///
485 /// # Example
486 /// ```rust
487 /// # #[cfg(feature = "deploy")] {
488 /// # use hydro_lang::prelude::*;
489 /// # use futures::StreamExt;
490 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
491 /// let tick = process.tick();
492 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
493 /// optional.flatten_ordered().all_ticks()
494 /// # }, |mut stream| async move {
495 /// // 1, 2, 3
496 /// # for w in vec![1, 2, 3] {
497 /// # assert_eq!(stream.next().await.unwrap(), w);
498 /// # }
499 /// # }));
500 /// # }
501 /// ```
502 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
503 where
504 B: IsBounded,
505 T: IntoIterator<Item = U>,
506 {
507 self.flat_map_ordered(q!(|v| v))
508 }
509
510 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
511 /// for the element type `T` to produce items in any order.
512 ///
513 /// If the optional is empty, the output stream is also empty. If the optional contains
514 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
515 /// in the output stream in non-deterministic order.
516 ///
517 /// # Example
518 /// ```rust
519 /// # #[cfg(feature = "deploy")] {
520 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
521 /// # use futures::StreamExt;
522 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
523 /// let tick = process.tick();
524 /// let optional = tick.optional_first_tick(q!(
525 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
526 /// ));
527 /// optional.flatten_unordered().all_ticks()
528 /// # }, |mut stream| async move {
529 /// // 1, 2, 3, but in no particular order
530 /// # let mut results = Vec::new();
531 /// # for _ in 0..3 {
532 /// # results.push(stream.next().await.unwrap());
533 /// # }
534 /// # results.sort();
535 /// # assert_eq!(results, vec![1, 2, 3]);
536 /// # }));
537 /// # }
538 /// ```
539 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
540 where
541 B: IsBounded,
542 T: IntoIterator<Item = U>,
543 {
544 self.flat_map_unordered(q!(|v| v))
545 }
546
547 /// Creates an optional containing only the value if it satisfies a predicate `f`.
548 ///
549 /// If the optional is empty, the output optional is also empty. If the optional contains
550 /// a value and the predicate returns `true`, the output optional contains the same value.
551 /// If the predicate returns `false`, the output optional is empty.
552 ///
553 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
554 /// not modify or take ownership of the value. If you need to modify the value while filtering
555 /// use [`Optional::filter_map`] instead.
556 ///
557 /// # Example
558 /// ```rust
559 /// # #[cfg(feature = "deploy")] {
560 /// # use hydro_lang::prelude::*;
561 /// # use futures::StreamExt;
562 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
563 /// let tick = process.tick();
564 /// let optional = tick.optional_first_tick(q!(5));
565 /// optional.filter(q!(|&x| x > 3)).all_ticks()
566 /// # }, |mut stream| async move {
567 /// // 5
568 /// # assert_eq!(stream.next().await.unwrap(), 5);
569 /// # }));
570 /// # }
571 /// ```
572 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
573 where
574 F: Fn(&T) -> bool + 'a,
575 {
576 let f = f.splice_fn1_borrow_ctx(&self.location).into();
577 Optional::new(
578 self.location.clone(),
579 HydroNode::Filter {
580 f,
581 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
582 metadata: self.location.new_node_metadata(Self::collection_kind()),
583 },
584 )
585 }
586
587 /// An operator that both filters and maps. It yields only the value if the supplied
588 /// closure `f` returns `Some(value)`.
589 ///
590 /// If the optional is empty, the output optional is also empty. If the optional contains
591 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
592 /// If the closure returns `None`, the output optional is empty.
593 ///
594 /// # Example
595 /// ```rust
596 /// # #[cfg(feature = "deploy")] {
597 /// # use hydro_lang::prelude::*;
598 /// # use futures::StreamExt;
599 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
600 /// let tick = process.tick();
601 /// let optional = tick.optional_first_tick(q!("42"));
602 /// optional
603 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
604 /// .all_ticks()
605 /// # }, |mut stream| async move {
606 /// // 42
607 /// # assert_eq!(stream.next().await.unwrap(), 42);
608 /// # }));
609 /// # }
610 /// ```
611 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
612 where
613 F: Fn(T) -> Option<U> + 'a,
614 {
615 let f = f.splice_fn1_ctx(&self.location).into();
616 Optional::new(
617 self.location.clone(),
618 HydroNode::FilterMap {
619 f,
620 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
621 metadata: self
622 .location
623 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
624 },
625 )
626 }
627
628 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
629 ///
630 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
631 /// non-null. This is useful for combining several pieces of state together.
632 ///
633 /// # Example
634 /// ```rust
635 /// # #[cfg(feature = "deploy")] {
636 /// # use hydro_lang::prelude::*;
637 /// # use futures::StreamExt;
638 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
639 /// let tick = process.tick();
640 /// let numbers = process
641 /// .source_iter(q!(vec![123, 456, 789]))
642 /// .batch(&tick, nondet!(/** test */));
643 /// let min = numbers.clone().min(); // Optional
644 /// let max = numbers.max(); // Optional
645 /// min.zip(max).all_ticks()
646 /// # }, |mut stream| async move {
647 /// // [(123, 789)]
648 /// # for w in vec![(123, 789)] {
649 /// # assert_eq!(stream.next().await.unwrap(), w);
650 /// # }
651 /// # }));
652 /// # }
653 /// ```
654 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
655 where
656 B: IsBounded,
657 {
658 let other: Optional<O, L, B> = other.into();
659 check_matching_location(&self.location, &other.location);
660
661 if L::is_top_level()
662 && let Some(tick) = self.location.try_tick()
663 {
664 let self_location = self.location().clone();
665 let out = zip_inside_tick(
666 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
667 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
668 )
669 .latest();
670
671 Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
672 } else {
673 zip_inside_tick(self, other)
674 }
675 }
676
677 /// Passes through `self` when it has a value, otherwise passes through `other`.
678 ///
679 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
680 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
681 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
682 ///
683 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
684 /// of the inputs change (including to/from null states).
685 ///
686 /// # Example
687 /// ```rust
688 /// # #[cfg(feature = "deploy")] {
689 /// # use hydro_lang::prelude::*;
690 /// # use futures::StreamExt;
691 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
692 /// let tick = process.tick();
693 /// // ticks are lazy by default, forces the second tick to run
694 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
695 ///
696 /// let some_first_tick = tick.optional_first_tick(q!(123));
697 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
698 /// some_first_tick.or(some_second_tick).all_ticks()
699 /// # }, |mut stream| async move {
700 /// // [123 /* first tick */, 456 /* second tick */]
701 /// # for w in vec![123, 456] {
702 /// # assert_eq!(stream.next().await.unwrap(), w);
703 /// # }
704 /// # }));
705 /// # }
706 /// ```
707 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
708 check_matching_location(&self.location, &other.location);
709
710 if L::is_top_level()
711 && !B::BOUNDED // only if unbounded we need to use a tick
712 && let Some(tick) = self.location.try_tick()
713 {
714 let self_location = self.location().clone();
715 let out = or_inside_tick(
716 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
717 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
718 )
719 .latest();
720
721 Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
722 } else {
723 Optional::new(
724 self.location.clone(),
725 HydroNode::ChainFirst {
726 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
727 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
728 metadata: self.location.new_node_metadata(Self::collection_kind()),
729 },
730 )
731 }
732 }
733
734 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
735 ///
736 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
737 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
738 ///
739 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
740 /// of the inputs change (including to/from null states).
741 ///
742 /// # Example
743 /// ```rust
744 /// # #[cfg(feature = "deploy")] {
745 /// # use hydro_lang::prelude::*;
746 /// # use futures::StreamExt;
747 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
748 /// let tick = process.tick();
749 /// // ticks are lazy by default, forces the later ticks to run
750 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
751 ///
752 /// let some_first_tick = tick.optional_first_tick(q!(123));
753 /// some_first_tick
754 /// .unwrap_or(tick.singleton(q!(456)))
755 /// .all_ticks()
756 /// # }, |mut stream| async move {
757 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
758 /// # for w in vec![123, 456, 456, 456] {
759 /// # assert_eq!(stream.next().await.unwrap(), w);
760 /// # }
761 /// # }));
762 /// # }
763 /// ```
764 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
765 let res_option = self.or(other.into());
766 Singleton::new(
767 res_option.location.clone(),
768 HydroNode::Cast {
769 inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
770 metadata: res_option
771 .location
772 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
773 },
774 )
775 }
776
777 /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
778 ///
779 /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
780 /// [`Optional`] when the default value of the type is a suitable fallback.
781 ///
782 /// # Example
783 /// ```rust
784 /// # #[cfg(feature = "deploy")] {
785 /// # use hydro_lang::prelude::*;
786 /// # use futures::StreamExt;
787 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
788 /// let tick = process.tick();
789 /// // ticks are lazy by default, forces the later ticks to run
790 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
791 ///
792 /// let some_first_tick = tick.optional_first_tick(q!(123i32));
793 /// some_first_tick.unwrap_or_default().all_ticks()
794 /// # }, |mut stream| async move {
795 /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
796 /// # for w in vec![123, 0, 0, 0] {
797 /// # assert_eq!(stream.next().await.unwrap(), w);
798 /// # }
799 /// # }));
800 /// # }
801 /// ```
802 pub fn unwrap_or_default(self) -> Singleton<T, L, B>
803 where
804 T: Default + Clone,
805 {
806 self.into_singleton().map(q!(|v| v.unwrap_or_default()))
807 }
808
809 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
810 ///
811 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
812 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
813 /// so that Hydro can skip any computation on null values.
814 ///
815 /// # Example
816 /// ```rust
817 /// # #[cfg(feature = "deploy")] {
818 /// # use hydro_lang::prelude::*;
819 /// # use futures::StreamExt;
820 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
821 /// let tick = process.tick();
822 /// // ticks are lazy by default, forces the later ticks to run
823 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
824 ///
825 /// let some_first_tick = tick.optional_first_tick(q!(123));
826 /// some_first_tick.into_singleton().all_ticks()
827 /// # }, |mut stream| async move {
828 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
829 /// # for w in vec![Some(123), None, None, None] {
830 /// # assert_eq!(stream.next().await.unwrap(), w);
831 /// # }
832 /// # }));
833 /// # }
834 /// ```
835 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
836 where
837 T: Clone,
838 {
839 let none: syn::Expr = parse_quote!(::std::option::Option::None);
840
841 let none_singleton = Singleton::new(
842 self.location.clone(),
843 HydroNode::SingletonSource {
844 value: none.into(),
845 first_tick_only: false,
846 metadata: self
847 .location
848 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
849 },
850 );
851
852 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
853 }
854
855 /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
856 ///
857 /// # Example
858 /// ```rust
859 /// # #[cfg(feature = "deploy")] {
860 /// # use hydro_lang::prelude::*;
861 /// # use futures::StreamExt;
862 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
863 /// let tick = process.tick();
864 /// // ticks are lazy by default, forces the second tick to run
865 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
866 ///
867 /// let some_first_tick = tick.optional_first_tick(q!(42));
868 /// some_first_tick.is_some().all_ticks()
869 /// # }, |mut stream| async move {
870 /// // [true /* first tick */, false /* second tick */, ...]
871 /// # for w in vec![true, false] {
872 /// # assert_eq!(stream.next().await.unwrap(), w);
873 /// # }
874 /// # }));
875 /// # }
876 /// ```
877 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
878 pub fn is_some(self) -> Singleton<bool, L, B> {
879 self.map(q!(|_| ()))
880 .into_singleton()
881 .map(q!(|o| o.is_some()))
882 }
883
884 /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
885 ///
886 /// # Example
887 /// ```rust
888 /// # #[cfg(feature = "deploy")] {
889 /// # use hydro_lang::prelude::*;
890 /// # use futures::StreamExt;
891 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
892 /// let tick = process.tick();
893 /// // ticks are lazy by default, forces the second tick to run
894 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
895 ///
896 /// let some_first_tick = tick.optional_first_tick(q!(42));
897 /// some_first_tick.is_none().all_ticks()
898 /// # }, |mut stream| async move {
899 /// // [false /* first tick */, true /* second tick */, ...]
900 /// # for w in vec![false, true] {
901 /// # assert_eq!(stream.next().await.unwrap(), w);
902 /// # }
903 /// # }));
904 /// # }
905 /// ```
906 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
907 pub fn is_none(self) -> Singleton<bool, L, B> {
908 self.map(q!(|_| ()))
909 .into_singleton()
910 .map(q!(|o| o.is_none()))
911 }
912
913 /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
914 /// values are equal, `false` otherwise (including when either is null).
915 ///
916 /// # Example
917 /// ```rust
918 /// # #[cfg(feature = "deploy")] {
919 /// # use hydro_lang::prelude::*;
920 /// # use futures::StreamExt;
921 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
922 /// let tick = process.tick();
923 /// // ticks are lazy by default, forces the second tick to run
924 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
925 ///
926 /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
927 /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
928 /// a.is_some_and_equals(b).all_ticks()
929 /// # }, |mut stream| async move {
930 /// // [true, false]
931 /// # for w in vec![true, false] {
932 /// # assert_eq!(stream.next().await.unwrap(), w);
933 /// # }
934 /// # }));
935 /// # }
936 /// ```
937 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
938 pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
939 where
940 T: PartialEq + Clone,
941 B: IsBounded,
942 {
943 self.into_singleton()
944 .zip(other.into_singleton())
945 .map(q!(|(a, b)| a.is_some() && a == b))
946 }
947
948 /// An operator which allows you to "name" a `HydroNode`.
949 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
950 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
951 {
952 let mut node = self.ir_node.borrow_mut();
953 let metadata = node.metadata_mut();
954 metadata.tag = Some(name.to_owned());
955 }
956 self
957 }
958
959 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
960 /// implies that `B == Bounded`.
961 pub fn make_bounded(self) -> Optional<T, L, Bounded>
962 where
963 B: IsBounded,
964 {
965 Optional::new(
966 self.location.clone(),
967 self.ir_node.replace(HydroNode::Placeholder),
968 )
969 }
970
971 /// Clones this bounded optional into a tick, returning a optional that has the
972 /// same value as the outer optional. Because the outer optional is bounded, this
973 /// is deterministic because there is only a single immutable version.
974 pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
975 where
976 B: IsBounded,
977 T: Clone,
978 {
979 // TODO(shadaj): avoid printing simulator logs for this snapshot
980 let inner = self.snapshot(
981 tick,
982 nondet!(/** bounded top-level optional so deterministic */),
983 );
984 Optional::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
985 }
986
987 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
988 /// non-null. Otherwise, the stream is empty.
989 ///
990 /// # Example
991 /// ```rust
992 /// # #[cfg(feature = "deploy")] {
993 /// # use hydro_lang::prelude::*;
994 /// # use futures::StreamExt;
995 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
996 /// # let tick = process.tick();
997 /// # // ticks are lazy by default, forces the second tick to run
998 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
999 /// # let batch_first_tick = process
1000 /// # .source_iter(q!(vec![]))
1001 /// # .batch(&tick, nondet!(/** test */));
1002 /// # let batch_second_tick = process
1003 /// # .source_iter(q!(vec![123, 456]))
1004 /// # .batch(&tick, nondet!(/** test */))
1005 /// # .defer_tick(); // appears on the second tick
1006 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1007 /// input_batch // first tick: [], second tick: [123, 456]
1008 /// .clone()
1009 /// .max()
1010 /// .into_stream()
1011 /// .chain(input_batch)
1012 /// .all_ticks()
1013 /// # }, |mut stream| async move {
1014 /// // [456, 123, 456]
1015 /// # for w in vec![456, 123, 456] {
1016 /// # assert_eq!(stream.next().await.unwrap(), w);
1017 /// # }
1018 /// # }));
1019 /// # }
1020 /// ```
1021 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1022 where
1023 B: IsBounded,
1024 {
1025 Stream::new(
1026 self.location.clone(),
1027 HydroNode::Cast {
1028 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1029 metadata: self.location.new_node_metadata(Stream::<
1030 T,
1031 Tick<L>,
1032 Bounded,
1033 TotalOrder,
1034 ExactlyOnce,
1035 >::collection_kind()),
1036 },
1037 )
1038 }
1039
1040 /// Filters this optional, passing through the value if the boolean signal is `true`,
1041 /// otherwise the output is null.
1042 ///
1043 /// # Example
1044 /// ```rust
1045 /// # #[cfg(feature = "deploy")] {
1046 /// # use hydro_lang::prelude::*;
1047 /// # use futures::StreamExt;
1048 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1049 /// let tick = process.tick();
1050 /// // ticks are lazy by default, forces the second tick to run
1051 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1052 ///
1053 /// let some_first_tick = tick.optional_first_tick(q!(()));
1054 /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1055 /// let batch_first_tick = process
1056 /// .source_iter(q!(vec![456]))
1057 /// .batch(&tick, nondet!(/** test */));
1058 /// let batch_second_tick = process
1059 /// .source_iter(q!(vec![789]))
1060 /// .batch(&tick, nondet!(/** test */))
1061 /// .defer_tick();
1062 /// batch_first_tick.chain(batch_second_tick).first()
1063 /// .filter_if(signal)
1064 /// .unwrap_or(tick.singleton(q!(0)))
1065 /// .all_ticks()
1066 /// # }, |mut stream| async move {
1067 /// // [456, 0]
1068 /// # for w in vec![456, 0] {
1069 /// # assert_eq!(stream.next().await.unwrap(), w);
1070 /// # }
1071 /// # }));
1072 /// # }
1073 /// ```
1074 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1075 where
1076 B: IsBounded,
1077 {
1078 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1079 }
1080
1081 /// Filters this optional, passing through the optional value if it is non-null **and** the
1082 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1083 ///
1084 /// Useful for conditionally processing, such as only emitting an optional's value outside
1085 /// a tick if some other condition is satisfied.
1086 ///
1087 /// # Example
1088 /// ```rust
1089 /// # #[cfg(feature = "deploy")] {
1090 /// # use hydro_lang::prelude::*;
1091 /// # use futures::StreamExt;
1092 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1093 /// let tick = process.tick();
1094 /// // ticks are lazy by default, forces the second tick to run
1095 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1096 ///
1097 /// let batch_first_tick = process
1098 /// .source_iter(q!(vec![]))
1099 /// .batch(&tick, nondet!(/** test */));
1100 /// let batch_second_tick = process
1101 /// .source_iter(q!(vec![456]))
1102 /// .batch(&tick, nondet!(/** test */))
1103 /// .defer_tick(); // appears on the second tick
1104 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1105 /// batch_first_tick.chain(batch_second_tick).first()
1106 /// .filter_if_some(some_on_first_tick)
1107 /// .unwrap_or(tick.singleton(q!(789)))
1108 /// .all_ticks()
1109 /// # }, |mut stream| async move {
1110 /// // [789, 789]
1111 /// # for w in vec![789, 789] {
1112 /// # assert_eq!(stream.next().await.unwrap(), w);
1113 /// # }
1114 /// # }));
1115 /// # }
1116 /// ```
1117 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1118 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1119 where
1120 B: IsBounded,
1121 {
1122 self.filter_if(signal.is_some())
1123 }
1124
1125 /// Filters this optional, passing through the optional value if it is non-null **and** the
1126 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1127 ///
1128 /// Useful for conditionally processing, such as only emitting an optional's value outside
1129 /// a tick if some other condition is satisfied.
1130 ///
1131 /// # Example
1132 /// ```rust
1133 /// # #[cfg(feature = "deploy")] {
1134 /// # use hydro_lang::prelude::*;
1135 /// # use futures::StreamExt;
1136 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1137 /// let tick = process.tick();
1138 /// // ticks are lazy by default, forces the second tick to run
1139 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1140 ///
1141 /// let batch_first_tick = process
1142 /// .source_iter(q!(vec![]))
1143 /// .batch(&tick, nondet!(/** test */));
1144 /// let batch_second_tick = process
1145 /// .source_iter(q!(vec![456]))
1146 /// .batch(&tick, nondet!(/** test */))
1147 /// .defer_tick(); // appears on the second tick
1148 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1149 /// batch_first_tick.chain(batch_second_tick).first()
1150 /// .filter_if_none(some_on_first_tick)
1151 /// .unwrap_or(tick.singleton(q!(789)))
1152 /// .all_ticks()
1153 /// # }, |mut stream| async move {
1154 /// // [789, 789]
1155 /// # for w in vec![789, 456] {
1156 /// # assert_eq!(stream.next().await.unwrap(), w);
1157 /// # }
1158 /// # }));
1159 /// # }
1160 /// ```
1161 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1162 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1163 where
1164 B: IsBounded,
1165 {
1166 self.filter_if(other.is_none())
1167 }
1168
1169 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1170 ///
1171 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1172 /// having a value, such as only releasing a piece of state if the node is the leader.
1173 ///
1174 /// # Example
1175 /// ```rust
1176 /// # #[cfg(feature = "deploy")] {
1177 /// # use hydro_lang::prelude::*;
1178 /// # use futures::StreamExt;
1179 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1180 /// let tick = process.tick();
1181 /// // ticks are lazy by default, forces the second tick to run
1182 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1183 ///
1184 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1185 /// some_on_first_tick
1186 /// .if_some_then(tick.singleton(q!(456)))
1187 /// .unwrap_or(tick.singleton(q!(123)))
1188 /// # .all_ticks()
1189 /// # }, |mut stream| async move {
1190 /// // 456 (first tick) ~> 123 (second tick onwards)
1191 /// # for w in vec![456, 123, 123] {
1192 /// # assert_eq!(stream.next().await.unwrap(), w);
1193 /// # }
1194 /// # }));
1195 /// # }
1196 /// ```
1197 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1198 pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1199 where
1200 B: IsBounded,
1201 {
1202 value.filter_if(self.is_some())
1203 }
1204}
1205
1206impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1207where
1208 L: Location<'a>,
1209{
1210 /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1211 /// key-value pair of this [`Optional`].
1212 ///
1213 /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1214 /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1215 /// the entry will be updated and appear / disappear according to the state of the
1216 /// [`Optional`].
1217 pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1218 KeyedSingleton::new(
1219 self.location.clone(),
1220 HydroNode::Cast {
1221 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1222 metadata: self
1223 .location
1224 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1225 },
1226 )
1227 }
1228}
1229
1230impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1231where
1232 L: Location<'a>,
1233{
1234 /// Returns an optional value corresponding to the latest snapshot of the optional
1235 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1236 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1237 /// all snapshots of this optional into the atomic-associated tick will observe the
1238 /// same value each tick.
1239 ///
1240 /// # Non-Determinism
1241 /// Because this picks a snapshot of a optional whose value is continuously changing,
1242 /// the output optional has a non-deterministic value since the snapshot can be at an
1243 /// arbitrary point in time.
1244 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1245 self,
1246 tick: &Tick<L2>,
1247 _nondet: NonDet,
1248 ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1249 Optional::new(
1250 tick.drop_consistency(),
1251 HydroNode::Batch {
1252 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1253 metadata: tick
1254 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1255 },
1256 )
1257 }
1258
1259 /// Returns this optional back into a top-level, asynchronous execution context where updates
1260 /// to the value will be asynchronously propagated.
1261 pub fn end_atomic(self) -> Optional<T, L, B> {
1262 Optional::new(
1263 self.location.tick.l.clone(),
1264 HydroNode::EndAtomic {
1265 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1266 metadata: self
1267 .location
1268 .tick
1269 .l
1270 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1271 },
1272 )
1273 }
1274}
1275
1276impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1277where
1278 L: Location<'a>,
1279{
1280 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1281 /// will observe the same version of the value and will be executed synchronously before any
1282 /// outputs are yielded (in [`Optional::end_atomic`]).
1283 ///
1284 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1285 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1286 /// a different version).
1287 pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1288 let id = self.location.flow_state().borrow_mut().next_clock_id();
1289 let out_location = Atomic {
1290 tick: Tick {
1291 id,
1292 l: self.location.clone(),
1293 },
1294 };
1295 Optional::new(
1296 out_location.clone(),
1297 HydroNode::BeginAtomic {
1298 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1299 metadata: out_location
1300 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1301 },
1302 )
1303 }
1304
1305 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1306 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1307 /// relevant data that contributed to the snapshot at tick `t`.
1308 ///
1309 /// # Non-Determinism
1310 /// Because this picks a snapshot of a optional whose value is continuously changing,
1311 /// the output optional has a non-deterministic value since the snapshot can be at an
1312 /// arbitrary point in time.
1313 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1314 self,
1315 tick: &Tick<L2>,
1316 _nondet: NonDet,
1317 ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1318 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1319 Optional::new(
1320 tick.drop_consistency(),
1321 HydroNode::Batch {
1322 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1323 metadata: tick
1324 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1325 },
1326 )
1327 }
1328
1329 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1330 /// with order corresponding to increasing prefixes of data contributing to the optional.
1331 ///
1332 /// # Non-Determinism
1333 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1334 /// to non-deterministic batching and arrival of inputs, the output stream is
1335 /// non-deterministic.
1336 pub fn sample_eager(
1337 self,
1338 nondet: NonDet,
1339 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1340 let tick = self.location.tick();
1341 self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1342 }
1343
1344 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1345 /// value taken at various points in time. Because the input optional may be
1346 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1347 /// represent the value of the optional given some prefix of the streams leading up to
1348 /// it.
1349 ///
1350 /// # Non-Determinism
1351 /// The output stream is non-deterministic in which elements are sampled, since this
1352 /// is controlled by a clock.
1353 pub fn sample_every(
1354 self,
1355 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1356 nondet: NonDet,
1357 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1358 where
1359 L: TopLevel<'a>,
1360 {
1361 let samples = self.location.source_interval(interval);
1362 let tick = self.location.tick();
1363
1364 self.snapshot(&tick, nondet)
1365 .filter_if(samples.batch(&tick, nondet).first().is_some())
1366 .all_ticks()
1367 .weaken_retries()
1368 }
1369}
1370
1371impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1372where
1373 L: Location<'a>,
1374{
1375 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1376 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1377 /// null values).
1378 ///
1379 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1380 /// producing one element in the output for each (non-null) tick. This is useful for batched
1381 /// computations, where the results from each tick must be combined together.
1382 ///
1383 /// # Example
1384 /// ```rust
1385 /// # #[cfg(feature = "deploy")] {
1386 /// # use hydro_lang::prelude::*;
1387 /// # use futures::StreamExt;
1388 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1389 /// # let tick = process.tick();
1390 /// # // ticks are lazy by default, forces the second tick to run
1391 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1392 /// # let batch_first_tick = process
1393 /// # .source_iter(q!(vec![]))
1394 /// # .batch(&tick, nondet!(/** test */));
1395 /// # let batch_second_tick = process
1396 /// # .source_iter(q!(vec![1, 2, 3]))
1397 /// # .batch(&tick, nondet!(/** test */))
1398 /// # .defer_tick(); // appears on the second tick
1399 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1400 /// input_batch // first tick: [], second tick: [1, 2, 3]
1401 /// .max()
1402 /// .all_ticks()
1403 /// # }, |mut stream| async move {
1404 /// // [3]
1405 /// # for w in vec![3] {
1406 /// # assert_eq!(stream.next().await.unwrap(), w);
1407 /// # }
1408 /// # }));
1409 /// # }
1410 /// ```
1411 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1412 self.into_stream().all_ticks()
1413 }
1414
1415 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1416 /// which will stream the value computed in _each_ tick as a separate stream element.
1417 ///
1418 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1419 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1420 /// optional's [`Tick`] context.
1421 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1422 self.into_stream().all_ticks_atomic()
1423 }
1424
1425 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1426 /// be asynchronously updated with the latest value of the optional inside the tick, including
1427 /// whether the optional is null or not.
1428 ///
1429 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1430 /// tick that tracks the inner value. This is useful for getting the value as of the
1431 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1432 ///
1433 /// # Example
1434 /// ```rust
1435 /// # #[cfg(feature = "deploy")] {
1436 /// # use hydro_lang::prelude::*;
1437 /// # use futures::StreamExt;
1438 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1439 /// # let tick = process.tick();
1440 /// # // ticks are lazy by default, forces the second tick to run
1441 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1442 /// # let batch_first_tick = process
1443 /// # .source_iter(q!(vec![]))
1444 /// # .batch(&tick, nondet!(/** test */));
1445 /// # let batch_second_tick = process
1446 /// # .source_iter(q!(vec![1, 2, 3]))
1447 /// # .batch(&tick, nondet!(/** test */))
1448 /// # .defer_tick(); // appears on the second tick
1449 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1450 /// input_batch // first tick: [], second tick: [1, 2, 3]
1451 /// .max()
1452 /// .latest()
1453 /// # .into_singleton()
1454 /// # .sample_eager(nondet!(/** test */))
1455 /// # }, |mut stream| async move {
1456 /// // asynchronously changes from None ~> 3
1457 /// # for w in vec![None, Some(3)] {
1458 /// # assert_eq!(stream.next().await.unwrap(), w);
1459 /// # }
1460 /// # }));
1461 /// # }
1462 /// ```
1463 pub fn latest(self) -> Optional<T, L, Unbounded> {
1464 Optional::new(
1465 self.location.outer().clone(),
1466 HydroNode::YieldConcat {
1467 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1468 metadata: self
1469 .location
1470 .outer()
1471 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1472 },
1473 )
1474 }
1475
1476 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1477 /// be updated with the latest value of the optional inside the tick.
1478 ///
1479 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1480 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1481 /// optional's [`Tick`] context.
1482 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1483 let out_location = Atomic {
1484 tick: self.location.clone(),
1485 };
1486
1487 Optional::new(
1488 out_location.clone(),
1489 HydroNode::YieldConcat {
1490 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1491 metadata: out_location
1492 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1493 },
1494 )
1495 }
1496
1497 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1498 /// always has the state of `self` at tick `T - 1`.
1499 ///
1500 /// At tick `0`, the output optional is null, since there is no previous tick.
1501 ///
1502 /// This operator enables stateful iterative processing with ticks, by sending data from one
1503 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1504 ///
1505 /// # Example
1506 /// ```rust
1507 /// # #[cfg(feature = "deploy")] {
1508 /// # use hydro_lang::prelude::*;
1509 /// # use futures::StreamExt;
1510 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1511 /// let tick = process.tick();
1512 /// // ticks are lazy by default, forces the second tick to run
1513 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1514 ///
1515 /// let batch_first_tick = process
1516 /// .source_iter(q!(vec![1, 2]))
1517 /// .batch(&tick, nondet!(/** test */));
1518 /// let batch_second_tick = process
1519 /// .source_iter(q!(vec![3, 4]))
1520 /// .batch(&tick, nondet!(/** test */))
1521 /// .defer_tick(); // appears on the second tick
1522 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1523 /// .reduce(q!(|state, v| *state += v));
1524 ///
1525 /// current_tick_sum.clone().into_singleton().zip(
1526 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1527 /// ).all_ticks()
1528 /// # }, |mut stream| async move {
1529 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1530 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1531 /// # assert_eq!(stream.next().await.unwrap(), w);
1532 /// # }
1533 /// # }));
1534 /// # }
1535 /// ```
1536 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1537 Optional::new(
1538 self.location.clone(),
1539 HydroNode::DeferTick {
1540 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1541 metadata: self.location.new_node_metadata(Self::collection_kind()),
1542 },
1543 )
1544 }
1545}
1546
1547#[cfg(test)]
1548mod tests {
1549 #[cfg(feature = "deploy")]
1550 use futures::StreamExt;
1551 #[cfg(feature = "deploy")]
1552 use hydro_deploy::Deployment;
1553 #[cfg(any(feature = "deploy", feature = "sim"))]
1554 use stageleft::q;
1555
1556 #[cfg(feature = "deploy")]
1557 use super::Optional;
1558 #[cfg(any(feature = "deploy", feature = "sim"))]
1559 use crate::compile::builder::FlowBuilder;
1560 #[cfg(any(feature = "deploy", feature = "sim"))]
1561 use crate::location::Location;
1562 #[cfg(feature = "deploy")]
1563 use crate::nondet::nondet;
1564
1565 #[cfg(feature = "deploy")]
1566 #[tokio::test]
1567 async fn optional_or_cardinality() {
1568 let mut deployment = Deployment::new();
1569
1570 let mut flow = FlowBuilder::new();
1571 let node = flow.process::<()>();
1572 let external = flow.external::<()>();
1573
1574 let node_tick = node.tick();
1575 let tick_singleton = node_tick.singleton(q!(123));
1576 let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1577 let counts = tick_optional_inhabited
1578 .clone()
1579 .or(tick_optional_inhabited)
1580 .into_stream()
1581 .count()
1582 .all_ticks()
1583 .send_bincode_external(&external);
1584
1585 let nodes = flow
1586 .with_process(&node, deployment.Localhost())
1587 .with_external(&external, deployment.Localhost())
1588 .deploy(&mut deployment);
1589
1590 deployment.deploy().await.unwrap();
1591
1592 let mut external_out = nodes.connect(counts).await;
1593
1594 deployment.start().await.unwrap();
1595
1596 assert_eq!(external_out.next().await.unwrap(), 1);
1597 }
1598
1599 #[cfg(feature = "deploy")]
1600 #[tokio::test]
1601 async fn into_singleton_top_level_none_cardinality() {
1602 let mut deployment = Deployment::new();
1603
1604 let mut flow = FlowBuilder::new();
1605 let node = flow.process::<()>();
1606 let external = flow.external::<()>();
1607
1608 let node_tick = node.tick();
1609 let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1610 let into_singleton = top_level_none.into_singleton();
1611
1612 let tick_driver = node.spin();
1613
1614 let counts = into_singleton
1615 .snapshot(&node_tick, nondet!(/** test */))
1616 .into_stream()
1617 .count()
1618 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1619 .map(q!(|(c, _)| c))
1620 .all_ticks()
1621 .send_bincode_external(&external);
1622
1623 let nodes = flow
1624 .with_process(&node, deployment.Localhost())
1625 .with_external(&external, deployment.Localhost())
1626 .deploy(&mut deployment);
1627
1628 deployment.deploy().await.unwrap();
1629
1630 let mut external_out = nodes.connect(counts).await;
1631
1632 deployment.start().await.unwrap();
1633
1634 assert_eq!(external_out.next().await.unwrap(), 1);
1635 assert_eq!(external_out.next().await.unwrap(), 1);
1636 assert_eq!(external_out.next().await.unwrap(), 1);
1637 }
1638
1639 #[cfg(feature = "deploy")]
1640 #[tokio::test]
1641 async fn into_singleton_unbounded_top_level_none_cardinality() {
1642 let mut deployment = Deployment::new();
1643
1644 let mut flow = FlowBuilder::new();
1645 let node = flow.process::<()>();
1646 let external = flow.external::<()>();
1647
1648 let node_tick = node.tick();
1649 let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1650 let into_singleton = top_level_none.into_singleton();
1651
1652 let tick_driver = node.spin();
1653
1654 let counts = into_singleton
1655 .snapshot(&node_tick, nondet!(/** test */))
1656 .into_stream()
1657 .count()
1658 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1659 .map(q!(|(c, _)| c))
1660 .all_ticks()
1661 .send_bincode_external(&external);
1662
1663 let nodes = flow
1664 .with_process(&node, deployment.Localhost())
1665 .with_external(&external, deployment.Localhost())
1666 .deploy(&mut deployment);
1667
1668 deployment.deploy().await.unwrap();
1669
1670 let mut external_out = nodes.connect(counts).await;
1671
1672 deployment.start().await.unwrap();
1673
1674 assert_eq!(external_out.next().await.unwrap(), 1);
1675 assert_eq!(external_out.next().await.unwrap(), 1);
1676 assert_eq!(external_out.next().await.unwrap(), 1);
1677 }
1678
1679 #[cfg(feature = "sim")]
1680 #[test]
1681 fn top_level_optional_some_into_stream_no_replay() {
1682 let mut flow = FlowBuilder::new();
1683 let node = flow.process::<()>();
1684
1685 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1686 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1687 let filtered_some = folded.filter(q!(|_| true));
1688
1689 let out_recv = filtered_some.into_stream().sim_output();
1690
1691 flow.sim().exhaustive(async || {
1692 out_recv.assert_yields_only([10]).await;
1693 });
1694 }
1695
1696 #[cfg(feature = "sim")]
1697 #[test]
1698 fn top_level_optional_none_into_stream_no_replay() {
1699 let mut flow = FlowBuilder::new();
1700 let node = flow.process::<()>();
1701
1702 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1703 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1704 let filtered_none = folded.filter(q!(|_| false));
1705
1706 let out_recv = filtered_none.into_stream().sim_output();
1707
1708 flow.sim().exhaustive(async || {
1709 out_recv.assert_yields_only([] as [i32; 0]).await;
1710 });
1711 }
1712}