hydro_lang/
singleton_ref.rs1use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::rc::Rc;
6
7use proc_macro2::Span;
8use quote::quote;
9use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
10
11use crate::compile::ir::{HydroNode, SharedNode};
12use crate::location::Location;
13
14pub struct SingletonRef<'a, 'slf, T, L> {
22 pub(crate) ir_node: &'slf RefCell<HydroNode>,
24 _phantom: PhantomData<(&'a T, L)>,
25}
26impl<'slf, T, L> SingletonRef<'_, 'slf, T, L> {
27 pub(crate) fn new(ir_node: &'slf RefCell<HydroNode>) -> Self {
29 Self {
30 ir_node,
31 _phantom: PhantomData,
32 }
33 }
34}
35
36impl<T, L> Copy for SingletonRef<'_, '_, T, L> {}
37impl<T, L> Clone for SingletonRef<'_, '_, T, L> {
38 fn clone(&self) -> Self {
39 *self
40 }
41}
42
43thread_local! {
46 static SINGLETON_REFS: RefCell<Option<Vec<(syn::Ident, HydroNode)>>> = const { RefCell::new(None) };
47}
48
49pub fn with_singleton_capture(
53 f: impl FnOnce() -> crate::compile::ir::DebugExpr,
54) -> crate::compile::ir::ClosureExpr {
55 SINGLETON_REFS.with(|cell| {
56 let prev = cell.borrow_mut().replace(Vec::new());
57 assert!(
58 prev.is_none(),
59 "nested singleton capture scopes are not supported"
60 );
61 });
62 let expr = f();
63 let singleton_refs = SINGLETON_REFS.with(|cell| cell.borrow_mut().take().unwrap());
64 crate::compile::ir::ClosureExpr::new(expr, singleton_refs)
65}
66
/// Process-wide counter that supplies the numeric suffix of each generated
/// `__hydro_singleton_ref_<n>` identifier. Starts at zero.
static SINGLETON_REF_COUNTER: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
69
70impl<'a, 'slf, T: 'a, L> FreeVariableWithContextWithProps<L, ()> for SingletonRef<'a, 'slf, T, L>
71where
72 L: Location<'a>,
73{
74 type O = &'a T;
75
76 fn to_tokens(self, _ctx: &L) -> (QuoteTokens, ()) {
77 let id = SINGLETON_REF_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
78 let ident = syn::Ident::new(&format!("__hydro_singleton_ref_{}", id), Span::call_site());
79
80 SINGLETON_REFS.with(|cell| {
81 let mut guard = cell.borrow_mut();
82 let refs = guard.as_mut().expect(
83 "SingletonRef used inside q!() but no singleton capture scope is active. \
84 This is a bug — singleton capture should be set up by the operator that uses q!().",
85 );
86
87 let metadata = self.ir_node.borrow().metadata().clone();
88
89 if !matches!(&*self.ir_node.borrow(), HydroNode::Singleton { .. }) {
92 let orig = self.ir_node.replace(HydroNode::Placeholder);
93 *self.ir_node.borrow_mut() = HydroNode::Singleton {
94 inner: SharedNode(Rc::new(RefCell::new(orig))),
95 metadata: metadata.clone(),
96 };
97 }
98
99 let borrow: std::cell::Ref<'_, HydroNode> = self.ir_node.borrow();
100 let HydroNode::Singleton { inner, .. } = &*borrow else {
101 unreachable!()
102 };
103
104 refs.push((
105 ident.clone(),
106 HydroNode::Singleton {
107 inner: SharedNode(Rc::clone(&inner.0)),
108 metadata,
109 },
110 ));
111 });
112
113 (
114 QuoteTokens {
115 prelude: None,
116 expr: Some(quote!(#ident)),
117 },
118 (),
119 )
120 }
121}
122
#[cfg(all(test, feature = "build"))]
mod tests {
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;

    /// Marker type for the single process used by every test below.
    struct P1 {}

    /// A folded `i32` can be dereferenced through its reference inside `map`.
    #[test]
    fn singleton_by_ref_compiles() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let total = process
            .source_iter(q!(0..5i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
        let total_ref = total.by_ref();

        process
            .source_iter(q!(1..=3i32))
            .map(q!(|x| x + *total_ref))
            .for_each(q!(|_| {}));

        total.into_stream().for_each(q!(|_| {}));

        let _built = builder.finalize();
    }

    /// The referenced value does not have to be `Copy` (here a `Vec<i32>`).
    #[test]
    fn singleton_by_ref_non_copy() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let collected = process.source_iter(q!(0..5i32)).fold(
            q!(|| Vec::<i32>::new()),
            q!(|acc: &mut Vec<i32>, x| acc.push(x)),
        );
        let collected_ref = collected.by_ref();

        process
            .source_iter(q!(1..=3i32))
            .map(q!(|x| x + collected_ref.len() as i32))
            .for_each(q!(|_| {}));

        collected.into_stream().for_each(q!(|_| {}));

        let _built = builder.finalize();
    }

    /// A singleton reference can be used inside a `filter` predicate.
    #[test]
    fn singleton_by_ref_filter() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let limit = process
            .source_iter(q!(0..5i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
        let limit_ref = limit.by_ref();

        process
            .source_iter(q!(1..=10i32))
            .filter(q!(|x| *x > *limit_ref))
            .for_each(q!(|_| {}));

        limit.into_stream().for_each(q!(|_| {}));
        let _built = builder.finalize();
    }

    /// A singleton reference can be used inside a `flat_map_ordered` closure.
    #[test]
    fn singleton_by_ref_flat_map() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let tally = process
            .source_iter(q!(0..3i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, _| *acc += 1));
        let tally_ref = tally.by_ref();

        process
            .source_iter(q!(1..=2i32))
            .flat_map_ordered(q!(|x| (0..*tally_ref).map(move |i| x + i)))
            .for_each(q!(|_| {}));

        tally.into_stream().for_each(q!(|_| {}));
        let _built = builder.finalize();
    }

    /// A singleton reference can be used inside an `inspect` closure.
    #[test]
    fn singleton_by_ref_inspect() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let tally = process
            .source_iter(q!(0..5i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, _| *acc += 1));
        let tally_ref = tally.by_ref();

        process
            .source_iter(q!(1..=3i32))
            .inspect(q!(|x| println!("count={}, x={}", *tally_ref, x)))
            .for_each(q!(|_| {}));

        tally.into_stream().for_each(q!(|_| {}));
        let _built = builder.finalize();
    }

    /// A singleton reference can be used inside a `partition` predicate.
    #[test]
    fn singleton_by_ref_partition() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let limit = process
            .source_iter(q!(0..5i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
        let limit_ref = limit.by_ref();

        let (over, under) = process
            .source_iter(q!(1..=10i32))
            .partition(q!(|x| *x > *limit_ref));

        over.for_each(q!(|_| {}));
        under.for_each(q!(|_| {}));
        limit.into_stream().for_each(q!(|_| {}));
        let _built = builder.finalize();
    }

    /// Both outputs of a `partition` that uses a singleton reference can feed
    /// further operators.
    #[test]
    fn singleton_by_ref_partition_with_downstream_ops() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<P1>();

        let limit = process
            .source_iter(q!(0..5i32))
            .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
        let limit_ref = limit.by_ref();

        let (over, under) = process
            .source_iter(q!(1..=10i32))
            .partition(q!(|x| *x > *limit_ref));

        over.map(q!(|x| x * 2)).for_each(q!(|_| {}));
        under.map(q!(|x| x + 100)).for_each(q!(|_| {}));
        limit.into_stream().for_each(q!(|_| {}));
        let _built = builder.finalize();
    }
}