//!
//! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
//! similar to a lock, but it allows readers to "hold on" to an old value of the
//! Rcu without blocking writers, and allows writing a new value without blocking
//! readers. When you update the value, the new value is immediately visible
//! to new readers, but the update waits until all existing readers have
//! finished, so that on return, no one sees the old value anymore.
//!
//! This implementation isn't wait-free; it uses an RwLock that is held for a
//! short duration when the value is read or updated.
//!
//! # Examples
//!
//! Read a value and do things with it while holding the guard:
//!
//! ```
//! # let rcu = utils::simple_rcu::Rcu::new(1);
//! {
//!     let read = rcu.read();
//!     println!("the current value is {}", *read);
//!     // exiting the scope drops the read-guard, and allows concurrent writers
//!     // to finish.
//! }
//! ```
//!
//! Increment the value by one, and wait for old readers to finish:
//!
//! ```
//! # async fn dox() {
//! # let rcu = utils::simple_rcu::Rcu::new(1);
//! let write_guard = rcu.lock_for_write();
//!
//! // NB: holding `write_guard` blocks new readers and writers. Keep this section short!
//! let new_value = *write_guard + 1;
//!
//! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard`
//!
//! // Concurrent reads and writes are now possible again. Wait for all the readers
//! // that still observe the old value to finish.
//! waitlist.wait().await;
//! # }
//! ```
//!
#![warn(missing_docs)]

use std::ops::Deref;
use std::sync::{Arc, Weak};
use std::sync::{RwLock, RwLockWriteGuard};

use tokio::sync::watch;

///
/// Rcu allows multiple readers to read and hold onto a value without blocking
/// (for very long). Storing to the Rcu updates the value, making new readers
/// immediately see the new value, but it also waits for all current readers to
/// finish.
///
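/// For example, a sketch of the full lifecycle (the `dox` wrapper is just
/// scaffolding so the doc test can use `await`):
///
/// ```
/// # async fn dox() {
/// let rcu = utils::simple_rcu::Rcu::new(1);
/// let reader = rcu.read();
/// let waitlist = rcu.lock_for_write().store_and_unlock(2);
/// assert_eq!(*rcu.read(), 2); // new readers see the new value immediately
/// drop(reader); // the last reader of the old value goes away...
/// waitlist.wait().await; // ...so this returns promptly
/// # }
/// ```
///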
pub struct Rcu<V> {
    inner: RwLock<RcuInner<V>>,
}

struct RcuInner<V> {
    current_cell: Arc<RcuCell<V>>,
    old_cells: Vec<Weak<RcuCell<V>>>,
}

///
/// RcuCell holds one value. It can be the latest one, or an old one.
///
struct RcuCell<V> {
    value: V,

    /// A dummy channel. We never send anything to this channel; we just hold
    /// onto the Sender so that when the RcuCell is dropped, any subscribed
    /// Receivers will be notified that the channel is closed. Updaters can use
    /// this to wait until the RcuCell has been dropped, i.e. until the old
    /// value is no longer in use.
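    ///
    /// A minimal sketch of that technique in isolation (a standalone snippet,
    /// not part of this module's API): `changed()` on a subscribed Receiver
    /// fails once the Sender has been dropped.
    ///
    /// ```
    /// # async fn demo() {
    /// use tokio::sync::watch;
    ///
    /// let (sender, _) = watch::channel(());
    /// let mut receiver = sender.subscribe();
    /// drop(sender); // dropping the Sender closes the channel
    /// // changed() returns Err because the channel is closed
    /// assert!(receiver.changed().await.is_err());
    /// # }
    /// ```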
    watch: watch::Sender<()>,
}

impl<V> RcuCell<V> {
    fn new(value: V) -> Self {
        let (watch_sender, _) = watch::channel(());
        RcuCell {
            value,
            watch: watch_sender,
        }
    }
}

impl<V> Rcu<V> {
    /// Create a new `Rcu`, initialized to `starting_val`
    pub fn new(starting_val: V) -> Self {
        let inner = RcuInner {
            current_cell: Arc::new(RcuCell::new(starting_val)),
            old_cells: Vec::new(),
        };
        Self {
            inner: RwLock::new(inner),
        }
    }

    ///
    /// Read the current value. Concurrent updates are not blocked by the
    /// returned guard, but the wait list returned by `store_and_unlock` will
    /// not complete until the guard object is dropped.
    ///
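    /// For example, a guard taken before an update keeps seeing the old value
    /// until it is dropped:
    ///
    /// ```
    /// # let rcu = utils::simple_rcu::Rcu::new(1);
    /// let read = rcu.read();
    /// let _waitlist = rcu.lock_for_write().store_and_unlock(2);
    /// assert_eq!(*read, 1); // the old guard still sees the old value
    /// assert_eq!(*rcu.read(), 2); // new readers see the new value
    /// ```
    ///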
    pub fn read(&self) -> RcuReadGuard<V> {
        let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
        RcuReadGuard { cell: current_cell }
    }

    ///
    /// Lock the current value for updating. Returns a guard object that can be
    /// used to read the current value, and to store a new value.
    ///
    /// Note: holding the write-guard blocks concurrent readers, so you should
    /// finish the update and drop the guard quickly! Multiple writers can be
    /// waiting on the `RcuWriteGuard::store_and_unlock` step at the same time,
    /// however.
    ///
    pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> {
        let inner = self.inner.write().unwrap();
        RcuWriteGuard { inner }
    }
}

///
/// Read guard returned by `read`
///
pub struct RcuReadGuard<V> {
    cell: Arc<RcuCell<V>>,
}

impl<V> Deref for RcuReadGuard<V> {
    type Target = V;

    fn deref(&self) -> &V {
        &self.cell.value
    }
}

///
/// Write guard returned by `lock_for_write`
///
/// NB: Holding this guard blocks all concurrent `read` and `lock_for_write` calls, so it
/// should only be held for a short duration!
///
/// Calling [`Self::store_and_unlock`] consumes the guard, making new reads and new writes possible
/// again.
///
pub struct RcuWriteGuard<'a, V> {
    inner: RwLockWriteGuard<'a, RcuInner<V>>,
}

impl<'a, V> Deref for RcuWriteGuard<'a, V> {
    type Target = V;

    fn deref(&self) -> &V {
        &self.inner.current_cell.value
    }
}

impl<'a, V> RcuWriteGuard<'a, V> {
    ///
    /// Store a new value. The new value will be written to the Rcu immediately,
    /// and will be immediately seen by any `read` calls that start afterwards.
    ///
    /// Returns a list of readers that can see old values. You can call `wait()`
    /// on it to wait for them to finish.
    ///
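    /// For example, a sketch showing that the store itself does not wait; only
    /// `wait()` blocks, and only until every old reader is gone:
    ///
    /// ```
    /// # async fn dox() {
    /// # let rcu = utils::simple_rcu::Rcu::new(1);
    /// let old_reader = rcu.read();
    /// let waitlist = rcu.lock_for_write().store_and_unlock(2); // returns immediately
    /// drop(old_reader); // without this, wait() below would never complete
    /// waitlist.wait().await;
    /// # }
    /// ```
    ///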
    pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList {
        let new_cell = Arc::new(RcuCell::new(new_val));

        let mut watches = Vec::new();
        {
            let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
            self.inner.old_cells.push(Arc::downgrade(&old));

            // cleanup old cells that no longer have any readers, and collect
            // the watches for any that do.
            self.inner.old_cells.retain(|weak| {
                if let Some(cell) = weak.upgrade() {
                    watches.push(cell.watch.subscribe());
                    true
                } else {
                    false
                }
            });
        }
        RcuWaitList(watches)
    }
}

///
/// List of readers who can still see old values.
///
pub struct RcuWaitList(Vec<watch::Receiver<()>>);

impl RcuWaitList {
    ///
    /// Wait for old readers to finish.
    ///
    pub async fn wait(mut self) {
        // after all the old_cells are no longer in use, we're done
        for w in self.0.iter_mut() {
            // This will block until the Receiver is closed. That happens when
            // the RcuCell is dropped.
            #[allow(clippy::single_match)]
            match w.changed().await {
                Ok(_) => panic!("changed() unexpectedly succeeded on dummy channel"),
                Err(_) => {
                    // closed, which means that the cell has been dropped, and
                    // its value is no longer in use
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::Duration;

    #[tokio::test]
    async fn two_writers() {
        let rcu = Rcu::new(1);

        let read1 = rcu.read();
        assert_eq!(*read1, 1);

        let write2 = rcu.lock_for_write();
        assert_eq!(*write2, 1);
        let wait2 = write2.store_and_unlock(2);

        let read2 = rcu.read();
        assert_eq!(*read2, 2);

        let write3 = rcu.lock_for_write();
        assert_eq!(*write3, 2);
        let wait3 = write3.store_and_unlock(3);

        // A new reader sees the new value, and old readers continue to see the old values.
        let read3 = rcu.read();
        assert_eq!(*read3, 3);
        assert_eq!(*read2, 2);
        assert_eq!(*read1, 1);

        let log = Arc::new(Mutex::new(Vec::new()));
        // Wait for the old readers to finish in separate tasks.
        let log_clone = Arc::clone(&log);
        let task2 = tokio::spawn(async move {
            wait2.wait().await;
            log_clone.lock().unwrap().push("wait2 done");
        });
        let log_clone = Arc::clone(&log);
        let task3 = tokio::spawn(async move {
            wait3.wait().await;
            log_clone.lock().unwrap().push("wait3 done");
        });

        // Without this sleep the test can pass by accident if the writer is slow.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Release the first reader. This allows the first writer's wait to
        // finish, but 'task3' would still block.
        log.lock().unwrap().push("dropping read1");
        drop(read1);
        task2.await.unwrap();

        assert!(!task3.is_finished());

        tokio::time::sleep(Duration::from_millis(100)).await;

        // Release the second reader, and finish the second writer.
        log.lock().unwrap().push("dropping read2");
        drop(read2);
        task3.await.unwrap();

        assert_eq!(
            log.lock().unwrap().as_slice(),
            &[
                "dropping read1",
                "wait2 done",
                "dropping read2",
                "wait3 done"
            ]
        );
    }
}