Skip to main content

hydro_lang/
singleton_ref.rs

1//! Singleton reference handle for capturing singletons in `q!()` closures.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::rc::Rc;
6
7use proc_macro2::Span;
8use quote::quote;
9use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
10
11use crate::compile::ir::{HydroNode, SharedNode};
12use crate::location::Location;
13
14/// A lightweight handle to a singleton that can be captured inside `q!()` closures.
15///
16/// Created via [`Singleton::by_ref()`](crate::live_collections::Singleton::by_ref). When used
17/// inside a `q!()` closure, resolves to a reference to the singleton's value (`&T`) at runtime.
18///
19/// This type is `Copy` (required by `q!()` macro internals).
20/// TODO(mingwei): <https://github.com/hydro-project/stageleft/issues/73>
21pub struct SingletonRef<'a, 'slf, T, L> {
22    /// Will be updated to `HydroNode::Singleton` when used, if not already.
23    pub(crate) ir_node: &'slf RefCell<HydroNode>,
24    _phantom: PhantomData<(&'a T, L)>,
25}
26impl<'slf, T, L> SingletonRef<'_, 'slf, T, L> {
27    /// Creates a `SingletonRef` from a shared node.
28    pub(crate) fn new(ir_node: &'slf RefCell<HydroNode>) -> Self {
29        Self {
30            ir_node,
31            _phantom: PhantomData,
32        }
33    }
34}
35
36impl<T, L> Copy for SingletonRef<'_, '_, T, L> {}
37impl<T, L> Clone for SingletonRef<'_, '_, T, L> {
38    fn clone(&self) -> Self {
39        *self
40    }
41}
42
43// Thread-local storage for singleton references captured during `q!()` expansion.
44// Maps local ident name -> SharedNode for each singleton captured in the current closure.
45thread_local! {
46    static SINGLETON_REFS: RefCell<Option<Vec<(syn::Ident, HydroNode)>>> = const { RefCell::new(None) };
47}
48
49/// Activate the singleton reference capture context. Must be called before `q!()` expansion
50/// that may capture singletons. Returns a `ClosureExpr` bundling the expression with any
51/// captured singleton references.
52pub fn with_singleton_capture(
53    f: impl FnOnce() -> crate::compile::ir::DebugExpr,
54) -> crate::compile::ir::ClosureExpr {
55    SINGLETON_REFS.with(|cell| {
56        let prev = cell.borrow_mut().replace(Vec::new());
57        assert!(
58            prev.is_none(),
59            "nested singleton capture scopes are not supported"
60        );
61    });
62    let expr = f();
63    let singleton_refs = SINGLETON_REFS.with(|cell| cell.borrow_mut().take().unwrap());
64    crate::compile::ir::ClosureExpr::new(expr, singleton_refs)
65}
66
/// Process-wide counter used to number the identifiers generated for captured singleton
/// references; starts at zero.
static SINGLETON_REF_COUNTER: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
69
70impl<'a, 'slf, T: 'a, L> FreeVariableWithContextWithProps<L, ()> for SingletonRef<'a, 'slf, T, L>
71where
72    L: Location<'a>,
73{
74    type O = &'a T;
75
76    fn to_tokens(self, _ctx: &L) -> (QuoteTokens, ()) {
77        let id = SINGLETON_REF_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
78        let ident = syn::Ident::new(&format!("__hydro_singleton_ref_{}", id), Span::call_site());
79
80        SINGLETON_REFS.with(|cell| {
81            let mut guard = cell.borrow_mut();
82            let refs = guard.as_mut().expect(
83                "SingletonRef used inside q!() but no singleton capture scope is active. \
84                 This is a bug — singleton capture should be set up by the operator that uses q!().",
85            );
86
87            let metadata = self.ir_node.borrow().metadata().clone();
88
89            // Wrap in HydroNode::Singleton for materialization + identity tracking. If already a Singleton node,
90            // reuse it.
91            if !matches!(&*self.ir_node.borrow(), HydroNode::Singleton { .. }) {
92                let orig = self.ir_node.replace(HydroNode::Placeholder);
93                *self.ir_node.borrow_mut() = HydroNode::Singleton {
94                    inner: SharedNode(Rc::new(RefCell::new(orig))),
95                    metadata: metadata.clone(),
96                };
97            }
98
99            let borrow: std::cell::Ref<'_, HydroNode> = self.ir_node.borrow();
100            let HydroNode::Singleton { inner, .. } = &*borrow else {
101                unreachable!()
102            };
103
104            refs.push((
105                ident.clone(),
106                HydroNode::Singleton {
107                    inner: SharedNode(Rc::clone(&inner.0)),
108                    metadata,
109                },
110            ));
111        });
112
113        (
114            QuoteTokens {
115                prelude: None,
116                expr: Some(quote!(#ident)),
117            },
118            (),
119        )
120    }
121}
122
#[cfg(all(test, feature = "build"))]
mod tests {
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    /// Type tag handed to `FlowBuilder::process` in every test below.
    struct P1 {}

    /// Compile-only: a `by_ref()` handle used inside `q!()` yields IR that `finalize()`
    /// accepts without panicking.
    #[test]
    fn singleton_by_ref_compiles() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let total = process
            .source_iter(q!(0..5i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
        let count_ref = total.by_ref();

        process
            .source_iter(q!(1..=3i32))
            .map(q!(|x| x + *count_ref))
            .for_each(q!(|_| {}));

        // The singleton is additionally consumed through a pipe (tests Tee works correctly).
        total.into_stream().for_each(q!(|_| {}));

        // Reaching the end without a panic means the IR with singleton refs was built.
        let _built = builder.finalize();
    }

    /// Uses a non-`Copy` value (`Vec`) so the closure must borrow rather than copy it.
    #[test]
    fn singleton_by_ref_non_copy() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let collected = process.source_iter(q!(0..5i32)).fold(
            q!(|| Vec::<i32>::new()),
            q!(|acc: &mut Vec<i32>, x| acc.push(x)),
        );
        let vec_ref = collected.by_ref();

        process
            .source_iter(q!(1..=3i32))
            .map(q!(|x| x + vec_ref.len() as i32))
            .for_each(q!(|_| {}));

        // The singleton is additionally consumed through a pipe.
        collected.into_stream().for_each(q!(|_| {}));

        let _built = builder.finalize();
    }

    /// Compile-only: the singleton ref appears in a `filter` closure.
    #[test]
    fn singleton_by_ref_filter() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let limit = process
            .source_iter(q!(0..5i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
        let threshold_ref = limit.by_ref();

        process
            .source_iter(q!(1..=10i32))
            .filter(q!(|x| *x > *threshold_ref))
            .for_each(q!(|_| {}));

        limit.into_stream().for_each(q!(|_| {}));
        let _built = builder.finalize();
    }

    /// Compile-only: the singleton ref appears in a `flat_map_ordered` closure.
    #[test]
    fn singleton_by_ref_flat_map() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let tally = process
            .source_iter(q!(0..3i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, _| *acc += 1));
        let count_ref = tally.by_ref();

        process
            .source_iter(q!(1..=2i32))
            .flat_map_ordered(q!(|x| (0..*count_ref).map(move |i| x + i)))
            .for_each(q!(|_| {}));

        tally.into_stream().for_each(q!(|_| {}));
        let _built = builder.finalize();
    }

    /// Compile-only: the singleton ref appears in an `inspect` closure.
    #[test]
    fn singleton_by_ref_inspect() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let tally = process
            .source_iter(q!(0..5i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, _| *acc += 1));
        let count_ref = tally.by_ref();

        process
            .source_iter(q!(1..=3i32))
            .inspect(q!(|x| println!("count={}, x={}", *count_ref, x)))
            .for_each(q!(|_| {}));

        tally.into_stream().for_each(q!(|_| {}));
        let _built = builder.finalize();
    }

    /// Compile-only: the singleton ref appears in a `partition` predicate.
    #[test]
    fn singleton_by_ref_partition() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let limit = process
            .source_iter(q!(0..5i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
        let threshold_ref = limit.by_ref();

        let (matching, rest) = process
            .source_iter(q!(1..=10i32))
            .partition(q!(|x| *x > *threshold_ref));

        matching.for_each(q!(|_| {}));
        rest.for_each(q!(|_| {}));
        limit.into_stream().for_each(q!(|_| {}));
        let _built = builder.finalize();
    }

    /// Compile-only: `partition` with a singleton ref, followed by further operators on both
    /// output branches.
    ///
    /// This exercises the ident_stack pop logic in the "already built" path of Partition
    /// code generation. When the second branch is processed, singleton ref idents pushed by
    /// transform_children must be popped to keep the stack consistent for downstream ops.
    #[test]
    fn singleton_by_ref_partition_with_downstream_ops() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let limit = process
            .source_iter(q!(0..5i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
        let threshold_ref = limit.by_ref();

        let (matching, rest) = process
            .source_iter(q!(1..=10i32))
            .partition(q!(|x| *x > *threshold_ref));

        // Both branches get a downstream operator; these fail if the pop is missing.
        matching.map(q!(|x| x * 2)).for_each(q!(|_| {}));
        rest.map(q!(|x| x + 100)).for_each(q!(|_| {}));
        limit.into_stream().for_each(q!(|_| {}));
        let _built = builder.finalize();
    }
}