LCOV - differential code coverage report
Current view: top level - libs/utils/src - simple_rcu.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 99.2 % 118 117 1 117
Current Date: 2023-10-19 02:04:12 Functions: 71.4 % 28 20 8 20
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
       3                 : //! similar to a lock, but it allows readers to "hold on" to an old value of RCU
       4                 : //! without blocking writers, and allows writing a new values without blocking
       5                 : //! readers. When you update the new value, the new value is immediately visible
       6                 : //! to new readers, but the update waits until all existing readers have
       7                 : //! finishe, so that no one sees the old value anymore.
       8                 : //!
       9                 : //! This implementation isn't wait-free; it uses an RwLock that is held for a
      10                 : //! short duration when the value is read or updated.
      11                 : //!
      12                 : //! # Examples
      13                 : //!
      14                 : //! Read a value and do things with it while holding the guard:
      15                 : //!
      16                 : //! ```
      17                 : //! # let rcu = utils::simple_rcu::Rcu::new(1);
      18                 : //! {
      19                 : //!     let read = rcu.read();
      20                 : //!     println!("the current value is {}", *read);
      21                 : //!     // exiting the scope drops the read-guard, and allows concurrent writers
      22                 : //!     // to finish.
      23                 : //! }
      24                 : //! ```
      25                 : //!
      26                 : //! Increment the value by one, and wait for old readers to finish:
      27                 : //!
      28                 : //! ```
      29                 : //! # let rcu = utils::simple_rcu::Rcu::new(1);
      30                 : //! let write_guard = rcu.lock_for_write();
      31                 : //!
      32                 : //! // NB: holding `write_guard` blocks new readers and writers. Keep this section short!
      33                 : //! let new_value = *write_guard + 1;
      34                 : //!
      35                 : //! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard`
      36                 : //!
      37                 : //! // Concurrent reads and writes are now possible again. Wait for all the readers
      38                 : //! // that still observe the old value to finish.
      39                 : //! waitlist.wait();
      40                 : //! ```
      41                 : //!
      42                 : #![warn(missing_docs)]
      43                 : 
      44                 : use std::ops::Deref;
      45                 : use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
      46                 : use std::sync::{Arc, Weak};
      47                 : use std::sync::{Mutex, RwLock, RwLockWriteGuard};
      48                 : 
      49                 : ///
      50                 : /// Rcu allows multiple readers to read and hold onto a value without blocking
      51                 : /// (for very long).  Storing to the Rcu updates the value, making new readers
      52                 : /// immediately see the new value, but it also waits for all current readers to
      53                 : /// finish.
      54                 : ///
      55                 : pub struct Rcu<V> {
      56                 :     inner: RwLock<RcuInner<V>>,
      57                 : }
      58                 : 
      59                 : struct RcuInner<V> {
      60                 :     current_cell: Arc<RcuCell<V>>,
      61                 :     old_cells: Vec<Weak<RcuCell<V>>>,
      62                 : }
      63                 : 
      64                 : ///
      65                 : /// RcuCell holds one value. It can be the latest one, or an old one.
      66                 : ///
      67                 : struct RcuCell<V> {
      68                 :     value: V,
      69                 : 
      70                 :     /// A dummy channel. We never send anything to this channel. The point is
      71                 :     /// that when the RcuCell is dropped, any cloned Senders will be notified
      72                 :     /// that the channel is closed. Updaters can use this to wait out until the
      73                 :     /// RcuCell has been dropped, i.e. until the old value is no longer in use.
      74                 :     ///
      75                 :     /// We never do anything with the receiver, we just need to hold onto it so
      76                 :     /// that the Senders will be notified when it's dropped. But because it's
      77                 :     /// not Sync, we need a Mutex on it.
      78                 :     watch: (SyncSender<()>, Mutex<Receiver<()>>),
      79                 : }
      80                 : 
      81                 : impl<V> RcuCell<V> {
      82 CBC        1892 :     fn new(value: V) -> Self {
      83            1892 :         let (watch_sender, watch_receiver) = sync_channel(0);
      84            1892 :         RcuCell {
      85            1892 :             value,
      86            1892 :             watch: (watch_sender, Mutex::new(watch_receiver)),
      87            1892 :         }
      88            1892 :     }
      89                 : }
      90                 : 
      91                 : impl<V> Rcu<V> {
      92                 :     /// Create a new `Rcu`, initialized to `starting_val`
      93            1303 :     pub fn new(starting_val: V) -> Self {
      94            1303 :         let inner = RcuInner {
      95            1303 :             current_cell: Arc::new(RcuCell::new(starting_val)),
      96            1303 :             old_cells: Vec::new(),
      97            1303 :         };
      98            1303 :         Self {
      99            1303 :             inner: RwLock::new(inner),
     100            1303 :         }
     101            1303 :     }
     102                 : 
     103                 :     ///
     104                 :     /// Read current value. Any store() calls will block until the returned
     105                 :     /// guard object is dropped.
     106                 :     ///
     107         3789519 :     pub fn read(&self) -> RcuReadGuard<V> {
     108         3789519 :         let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
     109         3789519 :         RcuReadGuard { cell: current_cell }
     110         3789519 :     }
     111                 : 
     112                 :     ///
     113                 :     /// Lock the current value for updating. Returns a guard object that can be
     114                 :     /// used to read the current value, and to store a new value.
     115                 :     ///
     116                 :     /// Note: holding the write-guard blocks concurrent readers, so you should
     117                 :     /// finish the update and drop the guard quickly! Multiple writers can be
     118                 :     /// waiting on the RcuWriteGuard::store step at the same time, however.
     119                 :     ///
     120             589 :     pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> {
     121             589 :         let inner = self.inner.write().unwrap();
     122             589 :         RcuWriteGuard { inner }
     123             589 :     }
     124                 : }
     125                 : 
     126                 : ///
     127                 : /// Read guard returned by `read`
     128                 : ///
     129                 : pub struct RcuReadGuard<V> {
     130                 :     cell: Arc<RcuCell<V>>,
     131                 : }
     132                 : 
     133                 : impl<V> Deref for RcuReadGuard<V> {
     134                 :     type Target = V;
     135                 : 
     136         3789474 :     fn deref(&self) -> &V {
     137         3789474 :         &self.cell.value
     138         3789474 :     }
     139                 : }
     140                 : 
     141                 : ///
     142                 : /// Write guard returned by `write`
     143                 : ///
     144                 : /// NB: Holding this guard blocks all concurrent `read` and `write` calls, so
     145                 : /// it should only be held for a short duration!
     146                 : ///
     147                 : /// Calling `store` consumes the guard, making new reads and new writes possible
     148                 : /// again.
     149                 : ///
     150                 : pub struct RcuWriteGuard<'a, V> {
     151                 :     inner: RwLockWriteGuard<'a, RcuInner<V>>,
     152                 : }
     153                 : 
     154                 : impl<'a, V> Deref for RcuWriteGuard<'a, V> {
     155                 :     type Target = V;
     156                 : 
     157             589 :     fn deref(&self) -> &V {
     158             589 :         &self.inner.current_cell.value
     159             589 :     }
     160                 : }
     161                 : 
     162                 : impl<'a, V> RcuWriteGuard<'a, V> {
     163                 :     ///
     164                 :     /// Store a new value. The new value will be written to the Rcu immediately,
     165                 :     /// and will be immediately seen by any `read` calls that start afterwards.
     166                 :     ///
     167                 :     /// Returns a list of readers that can see old values. You can call `wait()`
     168                 :     /// on it to wait for them to finish.
     169                 :     ///
     170             589 :     pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList {
     171             589 :         let new_cell = Arc::new(RcuCell::new(new_val));
     172             589 : 
     173             589 :         let mut watches = Vec::new();
     174             589 :         {
     175             589 :             let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
     176             589 :             self.inner.old_cells.push(Arc::downgrade(&old));
     177             589 : 
     178             589 :             // cleanup old cells that no longer have any readers, and collect
     179             589 :             // the watches for any that do.
     180             589 :             self.inner.old_cells.retain(|weak| {
     181            1073 :                 if let Some(cell) = weak.upgrade() {
     182             590 :                     watches.push(cell.watch.0.clone());
     183             590 :                     true
     184                 :                 } else {
     185             483 :                     false
     186                 :                 }
     187            1073 :             });
     188             589 :         }
     189             589 :         RcuWaitList(watches)
     190             589 :     }
     191                 : }
     192                 : 
     193                 : ///
     194                 : /// List of readers who can still see old values.
     195                 : ///
     196                 : pub struct RcuWaitList(Vec<SyncSender<()>>);
     197                 : 
     198                 : impl RcuWaitList {
     199                 :     ///
     200                 :     /// Wait for old readers to finish.
     201                 :     ///
     202             589 :     pub fn wait(mut self) {
     203                 :         // after all the old_cells are no longer in use, we're done
     204             590 :         for w in self.0.iter_mut() {
     205                 :             // This will block until the Receiver is closed. That happens when
     206                 :             // the RcuCell is dropped.
     207                 :             #[allow(clippy::single_match)]
     208             590 :             match w.send(()) {
     209 UBC           0 :                 Ok(_) => panic!("send() unexpectedly succeeded on dummy channel"),
     210 CBC         590 :                 Err(_) => {
     211             590 :                     // closed, which means that the cell has been dropped, and
     212             590 :                     // its value is no longer in use
     213             590 :                 }
     214                 :             }
     215                 :         }
     216             589 :     }
     217                 : }
     218                 : 
     219                 : #[cfg(test)]
     220                 : mod tests {
     221                 :     use super::*;
     222                 :     use std::sync::{Arc, Mutex};
     223                 :     use std::thread::{sleep, spawn};
     224                 :     use std::time::Duration;
     225                 : 
     226               1 :     #[test]
     227               1 :     fn two_writers() {
     228               1 :         let rcu = Rcu::new(1);
     229               1 : 
     230               1 :         let read1 = rcu.read();
     231               1 :         assert_eq!(*read1, 1);
     232                 : 
     233               1 :         let write2 = rcu.lock_for_write();
     234               1 :         assert_eq!(*write2, 1);
     235               1 :         let wait2 = write2.store_and_unlock(2);
     236               1 : 
     237               1 :         let read2 = rcu.read();
     238               1 :         assert_eq!(*read2, 2);
     239                 : 
     240               1 :         let write3 = rcu.lock_for_write();
     241               1 :         assert_eq!(*write3, 2);
     242               1 :         let wait3 = write3.store_and_unlock(3);
     243               1 : 
     244               1 :         // new reader can see the new value, and old readers continue to see the old values.
     245               1 :         let read3 = rcu.read();
     246               1 :         assert_eq!(*read3, 3);
     247               1 :         assert_eq!(*read2, 2);
     248               1 :         assert_eq!(*read1, 1);
     249                 : 
     250               1 :         let log = Arc::new(Mutex::new(Vec::new()));
     251               1 :         // Wait for the old readers to finish in separate threads.
     252               1 :         let log_clone = Arc::clone(&log);
     253               1 :         let thread2 = spawn(move || {
     254               1 :             wait2.wait();
     255               1 :             log_clone.lock().unwrap().push("wait2 done");
     256               1 :         });
     257               1 :         let log_clone = Arc::clone(&log);
     258               1 :         let thread3 = spawn(move || {
     259               1 :             wait3.wait();
     260               1 :             log_clone.lock().unwrap().push("wait3 done");
     261               1 :         });
     262               1 : 
     263               1 :         // without this sleep the test can pass on accident if the writer is slow
     264               1 :         sleep(Duration::from_millis(500));
     265               1 : 
     266               1 :         // Release first reader. This allows first write to finish, but calling
     267               1 :         // wait() on the second one would still block.
     268               1 :         log.lock().unwrap().push("dropping read1");
     269               1 :         drop(read1);
     270               1 :         thread2.join().unwrap();
     271               1 : 
     272               1 :         sleep(Duration::from_millis(500));
     273               1 : 
     274               1 :         // Release second reader, and finish second writer.
     275               1 :         log.lock().unwrap().push("dropping read2");
     276               1 :         drop(read2);
     277               1 :         thread3.join().unwrap();
     278               1 : 
     279               1 :         assert_eq!(
     280               1 :             log.lock().unwrap().as_slice(),
     281               1 :             &[
     282               1 :                 "dropping read1",
     283               1 :                 "wait2 done",
     284               1 :                 "dropping read2",
     285               1 :                 "wait3 done"
     286               1 :             ]
     287               1 :         );
     288               1 :     }
     289                 : }
        

Generated by: LCOV version 2.1-beta