//!
//! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
//! similar to a lock, but it allows readers to "hold on" to an old value of the
//! RCU without blocking writers, and allows writing a new value without
//! blocking readers. When you store a new value, it is immediately visible to
//! new readers, but the update waits until all existing readers have finished,
//! so that no one sees the old value anymore.
//!
//! This implementation isn't wait-free; it uses an RwLock that is held for a
//! short duration when the value is read or updated.
//!
//! # Examples
//!
//! Read a value and do things with it while holding the guard:
//!
//! ```
//! # let rcu = utils::simple_rcu::Rcu::new(1);
//! {
//!     let read = rcu.read();
//!     println!("the current value is {}", *read);
//!     // exiting the scope drops the read-guard, and allows concurrent writers
//!     // to finish.
//! }
//! ```
//!
//! Increment the value by one, and wait for old readers to finish:
//!
//! ```
//! # let rcu = utils::simple_rcu::Rcu::new(1);
//! let write_guard = rcu.lock_for_write();
//!
//! // NB: holding `write_guard` blocks new readers and writers. Keep this section short!
//! let new_value = *write_guard + 1;
//!
//! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard`
//!
//! // Concurrent reads and writes are now possible again. Wait for all the readers
//! // that still observe the old value to finish.
//! waitlist.wait();
//! ```
//!
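//! An `Rcu` can be shared between threads by wrapping it in an `Arc`. (A
//! minimal sketch of such sharing; it works because `Rcu<V>` is `Send + Sync`
//! whenever `V` is.)
//!
//! ```
//! # use std::sync::Arc;
//! # let rcu = Arc::new(utils::simple_rcu::Rcu::new(1));
//! let rcu_clone = Arc::clone(&rcu);
//! let reader = std::thread::spawn(move || {
//!     // the spawned thread sees the value that is current when `read` runs
//!     assert_eq!(*rcu_clone.read(), 1);
//! });
//! reader.join().unwrap();
//! ```
//!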
#![warn(missing_docs)]

use std::ops::Deref;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Weak};
use std::sync::{Mutex, RwLock, RwLockWriteGuard};

///
/// Rcu allows multiple readers to read and hold onto a value without blocking
/// (for very long). Storing to the Rcu updates the value, making new readers
/// immediately see the new value, and also lets the writer wait for all
/// current readers to finish.
///
pub struct Rcu<V> {
    inner: RwLock<RcuInner<V>>,
}

struct RcuInner<V> {
    current_cell: Arc<RcuCell<V>>,
    old_cells: Vec<Weak<RcuCell<V>>>,
}

///
/// RcuCell holds one value. It can be the latest one, or an old one.
///
struct RcuCell<V> {
    value: V,

    /// A dummy channel. We never send anything to this channel. The point is
    /// that when the RcuCell is dropped, any cloned Senders will be notified
    /// that the channel is closed. Updaters can use this to wait until the
    /// RcuCell has been dropped, i.e. until the old value is no longer in use.
    ///
    /// We never do anything with the receiver; we just need to hold onto it so
    /// that the Senders will be notified when it's dropped. But because it's
    /// not Sync, we need a Mutex on it.
    watch: (SyncSender<()>, Mutex<Receiver<()>>),
}

impl<V> RcuCell<V> {
    fn new(value: V) -> Self {
        let (watch_sender, watch_receiver) = sync_channel(0);
        RcuCell {
            value,
            watch: (watch_sender, Mutex::new(watch_receiver)),
        }
    }
}

impl<V> Rcu<V> {
    /// Create a new `Rcu`, initialized to `starting_val`
    pub fn new(starting_val: V) -> Self {
        let inner = RcuInner {
            current_cell: Arc::new(RcuCell::new(starting_val)),
            old_cells: Vec::new(),
        };
        Self {
            inner: RwLock::new(inner),
        }
    }

    ///
    /// Read the current value. The returned guard keeps the old value alive;
    /// a writer's `RcuWaitList::wait()` call will block until the guard
    /// object is dropped.
    ///
    pub fn read(&self) -> RcuReadGuard<V> {
        let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
        RcuReadGuard { cell: current_cell }
    }

    ///
    /// Lock the current value for updating. Returns a guard object that can be
    /// used to read the current value, and to store a new value.
    ///
    /// Note: holding the write-guard blocks concurrent readers, so you should
    /// finish the update and drop the guard quickly! Multiple writers can be
    /// waiting on the `RcuWaitList::wait()` step at the same time, however.
    ///
    pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> {
        let inner = self.inner.write().unwrap();
        RcuWriteGuard { inner }
    }
}

///
/// Read guard returned by `read`
///
pub struct RcuReadGuard<V> {
    cell: Arc<RcuCell<V>>,
}

impl<V> Deref for RcuReadGuard<V> {
    type Target = V;

    fn deref(&self) -> &V {
        &self.cell.value
    }
}

///
/// Write guard returned by `lock_for_write`
///
/// NB: Holding this guard blocks all concurrent `read` and `lock_for_write`
/// calls, so it should only be held for a short duration!
///
/// Calling `store_and_unlock` consumes the guard, making new reads and new
/// writes possible again.
///
pub struct RcuWriteGuard<'a, V> {
    inner: RwLockWriteGuard<'a, RcuInner<V>>,
}

impl<'a, V> Deref for RcuWriteGuard<'a, V> {
    type Target = V;

    fn deref(&self) -> &V {
        &self.inner.current_cell.value
    }
}

impl<'a, V> RcuWriteGuard<'a, V> {
    ///
    /// Store a new value. The new value is written to the Rcu immediately,
    /// and will be seen by any `read` calls that start afterwards.
    ///
    /// Returns a list of readers that can still see old values. You can call
    /// `wait()` on it to wait for them to finish.
    ///
    pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList {
        let new_cell = Arc::new(RcuCell::new(new_val));

        let mut watches = Vec::new();
        {
            let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
            self.inner.old_cells.push(Arc::downgrade(&old));

            // cleanup old cells that no longer have any readers, and collect
            // the watches for any that do.
            self.inner.old_cells.retain(|weak| {
                if let Some(cell) = weak.upgrade() {
                    watches.push(cell.watch.0.clone());
                    true
                } else {
                    false
                }
            });
        }
        RcuWaitList(watches)
    }
}

///
/// List of readers who can still see old values.
///
pub struct RcuWaitList(Vec<SyncSender<()>>);

impl RcuWaitList {
    ///
    /// Wait for old readers to finish.
    ///
    pub fn wait(self) {
        // after all the old_cells are no longer in use, we're done
        for w in self.0.iter() {
            // This will block until the Receiver is closed. That happens when
            // the RcuCell is dropped.
            #[allow(clippy::single_match)]
            match w.send(()) {
                Ok(_) => panic!("send() unexpectedly succeeded on dummy channel"),
                Err(_) => {
                    // closed, which means that the cell has been dropped, and
                    // its value is no longer in use
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use std::thread::{sleep, spawn};
    use std::time::Duration;

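    /// Illustrative sketch (an added sanity check, not exercising concurrency):
    /// when no reader is holding the old value, the old cell is dropped inside
    /// `store_and_unlock` itself, so `wait()` returns immediately.
    #[test]
    fn write_without_old_readers() {
        let rcu = Rcu::new(1);

        let write = rcu.lock_for_write();
        assert_eq!(*write, 1);
        let wait = write.store_and_unlock(2);

        // No read guards are outstanding, so this does not block.
        wait.wait();
        assert_eq!(*rcu.read(), 2);
    }
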
    #[test]
    fn two_writers() {
        let rcu = Rcu::new(1);

        let read1 = rcu.read();
        assert_eq!(*read1, 1);

        let write2 = rcu.lock_for_write();
        assert_eq!(*write2, 1);
        let wait2 = write2.store_and_unlock(2);

        let read2 = rcu.read();
        assert_eq!(*read2, 2);

        let write3 = rcu.lock_for_write();
        assert_eq!(*write3, 2);
        let wait3 = write3.store_and_unlock(3);

        // A new reader sees the new value, and old readers continue to see
        // the old values.
        let read3 = rcu.read();
        assert_eq!(*read3, 3);
        assert_eq!(*read2, 2);
        assert_eq!(*read1, 1);

        let log = Arc::new(Mutex::new(Vec::new()));
        // Wait for the old readers to finish in separate threads.
        let log_clone = Arc::clone(&log);
        let thread2 = spawn(move || {
            wait2.wait();
            log_clone.lock().unwrap().push("wait2 done");
        });
        let log_clone = Arc::clone(&log);
        let thread3 = spawn(move || {
            wait3.wait();
            log_clone.lock().unwrap().push("wait3 done");
        });

        // Without this sleep the test could pass by accident, simply because
        // the waiting threads were slow to start.
        sleep(Duration::from_millis(500));

        // Release the first reader. This allows the first writer's wait to
        // finish, but calling wait() on the second one would still block.
        log.lock().unwrap().push("dropping read1");
        drop(read1);
        thread2.join().unwrap();

        sleep(Duration::from_millis(500));

        // Release the second reader, and finish the second writer.
        log.lock().unwrap().push("dropping read2");
        drop(read2);
        thread3.join().unwrap();

        assert_eq!(
            log.lock().unwrap().as_slice(),
            &[
                "dropping read1",
                "wait2 done",
                "dropping read2",
                "wait3 done"
            ]
        );
    }
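
    /// Illustrative sketch (an added test) of the dummy-channel trick that
    /// `RcuCell::watch` relies on: a zero-capacity `sync_channel` never
    /// transfers a message, but `send()` starts failing as soon as the
    /// `Receiver` is dropped.
    #[test]
    fn watch_channel_disconnect() {
        let (sender, receiver) = std::sync::mpsc::sync_channel::<()>(0);

        // While the receiver is alive (but not currently receiving), a
        // non-blocking send on a zero-capacity channel reports "full".
        assert!(matches!(
            sender.try_send(()),
            Err(std::sync::mpsc::TrySendError::Full(()))
        ));

        // Dropping the receiver closes the channel; send() now errors out
        // instead of blocking. This is what RcuWaitList::wait() relies on.
        drop(receiver);
        assert!(sender.send(()).is_err());
    }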
}