Line data Source code
1 : //!
2 : //! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
3 : //! similar to a lock, but it allows readers to "hold on" to an old value of RCU
4 : //! without blocking writers, and allows writing a new value without blocking
5 : //! readers. When you update the value, the new value is immediately visible
6 : //! to new readers, but the update waits until all existing readers have
7 : //! finished, so that on return, no one sees the old value anymore.
8 : //!
9 : //! This implementation isn't wait-free; it uses an RwLock that is held for a
10 : //! short duration when the value is read or updated.
11 : //!
12 : //! # Examples
13 : //!
14 : //! Read a value and do things with it while holding the guard:
15 : //!
16 : //! ```
17 : //! # let rcu = utils::simple_rcu::Rcu::new(1);
18 : //! {
19 : //! let read = rcu.read();
20 : //! println!("the current value is {}", *read);
21 : //! // exiting the scope drops the read-guard, and allows concurrent writers
22 : //! // to finish.
23 : //! }
24 : //! ```
25 : //!
26 : //! Increment the value by one, and wait for old readers to finish:
27 : //!
28 : //! ```
29 : //! # async fn dox() {
30 : //! # let rcu = utils::simple_rcu::Rcu::new(1);
31 : //! let write_guard = rcu.lock_for_write();
32 : //!
33 : //! // NB: holding `write_guard` blocks new readers and writers. Keep this section short!
34 : //! let new_value = *write_guard + 1;
35 : //!
36 : //! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard`
37 : //!
38 : //! // Concurrent reads and writes are now possible again. Wait for all the readers
39 : //! // that still observe the old value to finish.
40 : //! waitlist.wait().await;
41 : //! # }
42 : //! ```
43 : //!
44 : #![warn(missing_docs)]
45 :
46 : use std::ops::Deref;
47 : use std::sync::{Arc, Weak};
48 : use std::sync::{RwLock, RwLockWriteGuard};
49 :
50 : use tokio::sync::watch;
51 :
52 : ///
53 : /// Rcu allows multiple readers to read and hold onto a value without blocking
54 : /// (for very long). Storing to the Rcu updates the value, making new readers
55 : /// immediately see the new value, but it also waits for all current readers to
56 : /// finish.
57 : ///
58 : pub struct Rcu<V> {
59 : inner: RwLock<RcuInner<V>>,
60 : }
61 :
62 : struct RcuInner<V> {
63 : current_cell: Arc<RcuCell<V>>,
64 : old_cells: Vec<Weak<RcuCell<V>>>,
65 : }
66 :
67 : ///
68 : /// RcuCell holds one value. It can be the latest one, or an old one.
69 : ///
70 : struct RcuCell<V> {
71 : value: V,
72 :
73 : /// A dummy channel. We never send anything to this channel. The point is
74 : /// that when the RcuCell is dropped, any subscribed Receivers will be notified
75 : /// that the channel is closed. Updaters can use this to wait out until the
76 : /// RcuCell has been dropped, i.e. until the old value is no longer in use.
77 : ///
78 : /// We never send anything to this, we just need to hold onto it so that the
79 : /// Receivers will be notified when it's dropped.
80 : watch: watch::Sender<()>,
81 : }
82 :
83 : impl<V> RcuCell<V> {
84 2307 : fn new(value: V) -> Self {
85 2307 : let (watch_sender, _) = watch::channel(());
86 2307 : RcuCell {
87 2307 : value,
88 2307 : watch: watch_sender,
89 2307 : }
90 2307 : }
91 : }
92 :
93 : impl<V> Rcu<V> {
94 : /// Create a new `Rcu`, initialized to `starting_val`
95 1592 : pub fn new(starting_val: V) -> Self {
96 1592 : let inner = RcuInner {
97 1592 : current_cell: Arc::new(RcuCell::new(starting_val)),
98 1592 : old_cells: Vec::new(),
99 1592 : };
100 1592 : Self {
101 1592 : inner: RwLock::new(inner),
102 1592 : }
103 1592 : }
104 :
105 : ///
106 : /// Read current value. Any store() calls will block until the returned
107 : /// guard object is dropped.
108 : ///
109 4520174 : pub fn read(&self) -> RcuReadGuard<V> {
110 4520174 : let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
111 4520174 : RcuReadGuard { cell: current_cell }
112 4520174 : }
113 :
114 : ///
115 : /// Lock the current value for updating. Returns a guard object that can be
116 : /// used to read the current value, and to store a new value.
117 : ///
118 : /// Note: holding the write-guard blocks concurrent readers, so you should
119 : /// finish the update and drop the guard quickly! Multiple writers can be
120 : /// waiting on the RcuWriteGuard::store step at the same time, however.
121 : ///
122 715 : pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> {
123 715 : let inner = self.inner.write().unwrap();
124 715 : RcuWriteGuard { inner }
125 715 : }
126 : }
127 :
128 : ///
129 : /// Read guard returned by `read`
130 : ///
131 : pub struct RcuReadGuard<V> {
132 : cell: Arc<RcuCell<V>>,
133 : }
134 :
135 : impl<V> Deref for RcuReadGuard<V> {
136 : type Target = V;
137 :
138 4520234 : fn deref(&self) -> &V {
139 4520234 : &self.cell.value
140 4520234 : }
141 : }
142 :
143 : ///
144 : /// Write guard returned by `write`
145 : ///
146 : /// NB: Holding this guard blocks all concurrent `read` and `write` calls, so it should only be
147 : /// held for a short duration!
148 : ///
149 : /// Calling [`Self::store_and_unlock`] consumes the guard, making new reads and new writes possible
150 : /// again.
151 : ///
152 : pub struct RcuWriteGuard<'a, V> {
153 : inner: RwLockWriteGuard<'a, RcuInner<V>>,
154 : }
155 :
156 : impl<'a, V> Deref for RcuWriteGuard<'a, V> {
157 : type Target = V;
158 :
159 715 : fn deref(&self) -> &V {
160 715 : &self.inner.current_cell.value
161 715 : }
162 : }
163 :
164 : impl<'a, V> RcuWriteGuard<'a, V> {
165 : ///
166 : /// Store a new value. The new value will be written to the Rcu immediately,
167 : /// and will be immediately seen by any `read` calls that start afterwards.
168 : ///
169 : /// Returns a list of readers that can see old values. You can call `wait()`
170 : /// on it to wait for them to finish.
171 : ///
172 715 : pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList {
173 715 : let new_cell = Arc::new(RcuCell::new(new_val));
174 715 :
175 715 : let mut watches = Vec::new();
176 715 : {
177 715 : let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
178 715 : self.inner.old_cells.push(Arc::downgrade(&old));
179 715 :
180 715 : // cleanup old cells that no longer have any readers, and collect
181 715 : // the watches for any that do.
182 1274 : self.inner.old_cells.retain(|weak| {
183 1274 : if let Some(cell) = weak.upgrade() {
184 717 : watches.push(cell.watch.subscribe());
185 717 : true
186 : } else {
187 557 : false
188 : }
189 1274 : });
190 715 : }
191 715 : RcuWaitList(watches)
192 715 : }
193 : }
194 :
195 : ///
196 : /// List of readers who can still see old values.
197 : ///
198 : pub struct RcuWaitList(Vec<watch::Receiver<()>>);
199 :
200 : impl RcuWaitList {
201 : ///
202 : /// Wait for old readers to finish.
203 : ///
204 715 : pub async fn wait(mut self) {
205 : // after all the old_cells are no longer in use, we're done
206 717 : for w in self.0.iter_mut() {
207 : // This will block until the Receiver is closed. That happens when
208 : // the RcuCell is dropped.
209 : #[allow(clippy::single_match)]
210 717 : match w.changed().await {
211 0 : Ok(_) => panic!("changed() unexpectedly succeeded on dummy channel"),
212 717 : Err(_) => {
213 717 : // closed, which means that the cell has been dropped, and
214 717 : // its value is no longer in use
215 717 : }
216 : }
217 : }
218 715 : }
219 : }
220 :
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Two successive writers each wait only for the readers that can still
    /// see a value older than the one they stored.
    #[tokio::test]
    async fn two_writers() {
        let rcu = Rcu::new(1);

        let reader_v1 = rcu.read();
        assert_eq!(*reader_v1, 1);

        // First update: 1 -> 2.
        let writer_v2 = rcu.lock_for_write();
        assert_eq!(*writer_v2, 1);
        let waitlist_v2 = writer_v2.store_and_unlock(2);

        let reader_v2 = rcu.read();
        assert_eq!(*reader_v2, 2);

        // Second update: 2 -> 3.
        let writer_v3 = rcu.lock_for_write();
        assert_eq!(*writer_v3, 2);
        let waitlist_v3 = writer_v3.store_and_unlock(3);

        // A new reader sees the newest value, while the earlier readers keep
        // seeing the values they started with.
        let reader_v3 = rcu.read();
        assert_eq!(*reader_v3, 3);
        assert_eq!(*reader_v2, 2);
        assert_eq!(*reader_v1, 1);

        // Shared event log used to verify the ordering at the end.
        let events = Arc::new(Mutex::new(Vec::new()));

        // Each writer waits for its old readers in a task of its own.
        let waiter_v2 = {
            let events = Arc::clone(&events);
            tokio::spawn(async move {
                waitlist_v2.wait().await;
                events.lock().unwrap().push("wait2 done");
            })
        };
        let waiter_v3 = {
            let events = Arc::clone(&events);
            tokio::spawn(async move {
                waitlist_v3.wait().await;
                events.lock().unwrap().push("wait3 done");
            })
        };

        // Without this sleep the test can pass by accident if the writer is slow.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Releasing the first reader lets the first writer finish, but the
        // second writer must still be blocked.
        events.lock().unwrap().push("dropping read1");
        drop(reader_v1);
        waiter_v2.await.unwrap();

        assert!(!waiter_v3.is_finished());

        tokio::time::sleep(Duration::from_millis(100)).await;

        // Releasing the second reader lets the second writer finish as well.
        events.lock().unwrap().push("dropping read2");
        drop(reader_v2);
        waiter_v3.await.unwrap();

        let recorded = events.lock().unwrap();
        assert_eq!(
            recorded.as_slice(),
            &[
                "dropping read1",
                "wait2 done",
                "dropping read2",
                "wait3 done"
            ]
        );
    }
}
|