Line data Source code
1 : //!
2 : //! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
3 : //! similar to a lock, but it allows readers to "hold on" to an old value of RCU
4 : //! without blocking writers, and allows writing a new value without blocking
5 : //! readers. When you update the value, the new value is immediately visible
6 : //! to new readers, but the update waits until all existing readers have
7 : //! finished, so that on return, no one sees the old value anymore.
8 : //!
9 : //! This implementation isn't wait-free; it uses an RwLock that is held for a
10 : //! short duration when the value is read or updated.
11 : //!
12 : //! # Examples
13 : //!
14 : //! Read a value and do things with it while holding the guard:
15 : //!
16 : //! ```
17 : //! # let rcu = utils::simple_rcu::Rcu::new(1);
18 : //! {
19 : //! let read = rcu.read();
20 : //! println!("the current value is {}", *read);
21 : //! // exiting the scope drops the read-guard, and allows concurrent writers
22 : //! // to finish.
23 : //! }
24 : //! ```
25 : //!
26 : //! Increment the value by one, and wait for old readers to finish:
27 : //!
28 : //! ```
29 : //! # async fn dox() {
30 : //! # let rcu = utils::simple_rcu::Rcu::new(1);
31 : //! let write_guard = rcu.lock_for_write();
32 : //!
33 : //! // NB: holding `write_guard` blocks new readers and writers. Keep this section short!
34 : //! let new_value = *write_guard + 1;
35 : //!
36 : //! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard`
37 : //!
38 : //! // Concurrent reads and writes are now possible again. Wait for all the readers
39 : //! // that still observe the old value to finish.
40 : //! waitlist.wait().await;
41 : //! # }
42 : //! ```
43 : //!
44 : #![warn(missing_docs)]
45 :
46 : use std::ops::Deref;
47 : use std::sync::{Arc, Weak};
48 : use std::sync::{RwLock, RwLockWriteGuard};
49 :
50 : use tokio::sync::watch;
51 :
52 : /// Rcu allows multiple readers to read and hold onto a value without blocking
53 : /// (for very long).
54 : ///
55 : /// Storing to the Rcu updates the value, making new readers immediately see
56 : /// the new value, but it also waits for all current readers to finish.
57 : pub struct Rcu<V> {
58 : inner: RwLock<RcuInner<V>>,
59 : }
60 :
61 : struct RcuInner<V> {
62 : current_cell: Arc<RcuCell<V>>,
63 : old_cells: Vec<Weak<RcuCell<V>>>,
64 : }
65 :
66 : ///
67 : /// RcuCell holds one value. It can be the latest one, or an old one.
68 : ///
69 : struct RcuCell<V> {
70 : value: V,
71 :
72 : /// A dummy channel. We never send anything to this channel. The point is
73 : /// that when the RcuCell is dropped, any subscribed Receivers will be notified
74 : /// that the channel is closed. Updaters can use this to wait out until the
75 : /// RcuCell has been dropped, i.e. until the old value is no longer in use.
76 : ///
77 : /// We never send anything to this, we just need to hold onto it so that the
78 : /// Receivers will be notified when it's dropped.
79 : watch: watch::Sender<()>,
80 : }
81 :
82 : impl<V> RcuCell<V> {
83 3438 : fn new(value: V) -> Self {
84 3438 : let (watch_sender, _) = watch::channel(());
85 3438 : RcuCell {
86 3438 : value,
87 3438 : watch: watch_sender,
88 3438 : }
89 3438 : }
90 : }
91 :
92 : impl<V> Rcu<V> {
93 : /// Create a new `Rcu`, initialized to `starting_val`
94 1243 : pub fn new(starting_val: V) -> Self {
95 1243 : let inner = RcuInner {
96 1243 : current_cell: Arc::new(RcuCell::new(starting_val)),
97 1243 : old_cells: Vec::new(),
98 1243 : };
99 1243 : Self {
100 1243 : inner: RwLock::new(inner),
101 1243 : }
102 1243 : }
103 :
104 : ///
105 : /// Read current value. Any store() calls will block until the returned
106 : /// guard object is dropped.
107 : ///
108 7110 : pub fn read(&self) -> RcuReadGuard<V> {
109 7110 : let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
110 7110 : RcuReadGuard { cell: current_cell }
111 7110 : }
112 :
113 : ///
114 : /// Lock the current value for updating. Returns a guard object that can be
115 : /// used to read the current value, and to store a new value.
116 : ///
117 : /// Note: holding the write-guard blocks concurrent readers, so you should
118 : /// finish the update and drop the guard quickly! Multiple writers can be
119 : /// waiting on the RcuWriteGuard::store step at the same time, however.
120 : ///
121 2195 : pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> {
122 2195 : let inner = self.inner.write().unwrap();
123 2195 : RcuWriteGuard { inner }
124 2195 : }
125 : }
126 :
127 : ///
128 : /// Read guard returned by `read`
129 : ///
130 : pub struct RcuReadGuard<V> {
131 : cell: Arc<RcuCell<V>>,
132 : }
133 :
134 : impl<V> Deref for RcuReadGuard<V> {
135 : type Target = V;
136 :
137 7820 : fn deref(&self) -> &V {
138 7820 : &self.cell.value
139 7820 : }
140 : }
141 :
142 : ///
143 : /// Write guard returned by `write`
144 : ///
145 : /// NB: Holding this guard blocks all concurrent `read` and `write` calls, so it should only be
146 : /// held for a short duration!
147 : ///
148 : /// Calling [`Self::store_and_unlock`] consumes the guard, making new reads and new writes possible
149 : /// again.
150 : ///
151 : pub struct RcuWriteGuard<'a, V> {
152 : inner: RwLockWriteGuard<'a, RcuInner<V>>,
153 : }
154 :
155 : impl<'a, V> Deref for RcuWriteGuard<'a, V> {
156 : type Target = V;
157 :
158 2195 : fn deref(&self) -> &V {
159 2195 : &self.inner.current_cell.value
160 2195 : }
161 : }
162 :
163 : impl<'a, V> RcuWriteGuard<'a, V> {
164 : ///
165 : /// Store a new value. The new value will be written to the Rcu immediately,
166 : /// and will be immediately seen by any `read` calls that start afterwards.
167 : ///
168 : /// Returns a list of readers that can see old values. You can call `wait()`
169 : /// on it to wait for them to finish.
170 : ///
171 2195 : pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList {
172 2195 : let new_cell = Arc::new(RcuCell::new(new_val));
173 2195 :
174 2195 : let mut watches = Vec::new();
175 2195 : {
176 2195 : let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
177 2195 : self.inner.old_cells.push(Arc::downgrade(&old));
178 2195 :
179 2195 : // cleanup old cells that no longer have any readers, and collect
180 2195 : // the watches for any that do.
181 4020 : self.inner.old_cells.retain(|weak| {
182 4020 : if let Some(cell) = weak.upgrade() {
183 2196 : watches.push(cell.watch.subscribe());
184 2196 : true
185 : } else {
186 1824 : false
187 : }
188 4020 : });
189 2195 : }
190 2195 : RcuWaitList(watches)
191 2195 : }
192 : }
193 :
194 : ///
195 : /// List of readers who can still see old values.
196 : ///
197 : pub struct RcuWaitList(Vec<watch::Receiver<()>>);
198 :
199 : impl RcuWaitList {
200 : ///
201 : /// Wait for old readers to finish.
202 : ///
203 2195 : pub async fn wait(mut self) {
204 : // after all the old_cells are no longer in use, we're done
205 2196 : for w in self.0.iter_mut() {
206 : // This will block until the Receiver is closed. That happens when
207 : // the RcuCell is dropped.
208 : #[allow(clippy::single_match)]
209 2196 : match w.changed().await {
210 0 : Ok(_) => panic!("changed() unexpectedly succeeded on dummy channel"),
211 2196 : Err(_) => {
212 2196 : // closed, which means that the cell has been dropped, and
213 2196 : // its value is no longer in use
214 2196 : }
215 : }
216 : }
217 2195 : }
218 : }
219 :
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::Duration;

    /// Two successive writers each wait only for the readers of the values
    /// that were stored before their own.
    #[tokio::test]
    async fn two_writers() {
        let rcu = Rcu::new(1);

        let read1 = rcu.read();
        assert_eq!(*read1, 1);

        // First update: 1 -> 2.
        let write2 = rcu.lock_for_write();
        assert_eq!(*write2, 1);
        let wait2 = write2.store_and_unlock(2);

        let read2 = rcu.read();
        assert_eq!(*read2, 2);

        // Second update: 2 -> 3.
        let write3 = rcu.lock_for_write();
        assert_eq!(*write3, 2);
        let wait3 = write3.store_and_unlock(3);

        // A new reader sees the newest value, while the older readers keep
        // seeing the values they started with.
        let read3 = rcu.read();
        assert_eq!(*read3, 3);
        assert_eq!(*read2, 2);
        assert_eq!(*read1, 1);

        let log = Arc::new(Mutex::new(Vec::new()));

        // Wait for the old readers to finish in separate tasks, each of which
        // records an event once its wait list has drained.
        let spawn_waiter = |waitlist: RcuWaitList, event: &'static str| {
            let log = Arc::clone(&log);
            tokio::spawn(async move {
                waitlist.wait().await;
                log.lock().unwrap().push(event);
            })
        };
        let task2 = spawn_waiter(wait2, "wait2 done");
        let task3 = spawn_waiter(wait3, "wait3 done");

        // Without this sleep the test can pass by accident if the writer is slow.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Release the first reader. That lets the first writer finish, but
        // the second writer still has to wait for `read2`.
        log.lock().unwrap().push("dropping read1");
        drop(read1);
        task2.await.unwrap();

        assert!(!task3.is_finished());

        tokio::time::sleep(Duration::from_millis(100)).await;

        // Release the second reader, which lets the second writer finish.
        log.lock().unwrap().push("dropping read2");
        drop(read2);
        task3.await.unwrap();

        let expected = [
            "dropping read1",
            "wait2 done",
            "dropping read2",
            "wait3 done",
        ];
        assert_eq!(log.lock().unwrap().as_slice(), &expected);
    }
}
|