LCOV - code coverage report
Current view: top level - libs/utils/src - simple_rcu.rs (source / functions) Coverage Total Hit
Test: 960803fca14b2e843c565dddf575f7017d250bc3.info Lines: 99.2 % 122 121
Test Date: 2024-06-22 23:41:44 Functions: 73.3 % 30 22

            Line data    Source code
       1              : //!
       2              : //! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
       3              : //! similar to a lock, but it allows readers to "hold on" to an old value of RCU
       4              : //! without blocking writers, and allows writing a new value without blocking
       5              : //! readers. When you update the value, the new value is immediately visible
       6              : //! to new readers, but the update waits until all existing readers have
       7              : //! finished, so that on return, no one sees the old value anymore.
       8              : //!
       9              : //! This implementation isn't wait-free; it uses an RwLock that is held for a
      10              : //! short duration when the value is read or updated.
      11              : //!
      12              : //! # Examples
      13              : //!
      14              : //! Read a value and do things with it while holding the guard:
      15              : //!
      16              : //! ```
      17              : //! # let rcu = utils::simple_rcu::Rcu::new(1);
      18              : //! {
      19              : //!     let read = rcu.read();
      20              : //!     println!("the current value is {}", *read);
      21              : //!     // exiting the scope drops the read-guard, and allows concurrent writers
      22              : //!     // to finish.
      23              : //! }
      24              : //! ```
      25              : //!
      26              : //! Increment the value by one, and wait for old readers to finish:
      27              : //!
      28              : //! ```
      29              : //! # async fn dox() {
      30              : //! # let rcu = utils::simple_rcu::Rcu::new(1);
      31              : //! let write_guard = rcu.lock_for_write();
      32              : //!
      33              : //! // NB: holding `write_guard` blocks new readers and writers. Keep this section short!
      34              : //! let new_value = *write_guard + 1;
      35              : //!
      36              : //! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard`
      37              : //!
      38              : //! // Concurrent reads and writes are now possible again. Wait for all the readers
      39              : //! // that still observe the old value to finish.
      40              : //! waitlist.wait().await;
      41              : //! # }
      42              : //! ```
      43              : //!
      44              : #![warn(missing_docs)]
      45              : 
      46              : use std::ops::Deref;
      47              : use std::sync::{Arc, Weak};
      48              : use std::sync::{RwLock, RwLockWriteGuard};
      49              : 
      50              : use tokio::sync::watch;
      51              : 
      52              : ///
      53              : /// Rcu allows multiple readers to read and hold onto a value without blocking
      54              : /// (for very long).  Storing to the Rcu updates the value, making new readers
      55              : /// immediately see the new value, but it also waits for all current readers to
      56              : /// finish.
      57              : ///
      58              : pub struct Rcu<V> {
      59              :     inner: RwLock<RcuInner<V>>,
      60              : }
      61              : 
      62              : struct RcuInner<V> {
      63              :     current_cell: Arc<RcuCell<V>>,
      64              :     old_cells: Vec<Weak<RcuCell<V>>>,
      65              : }
      66              : 
      67              : ///
      68              : /// RcuCell holds one value. It can be the latest one, or an old one.
      69              : ///
      70              : struct RcuCell<V> {
      71              :     value: V,
      72              : 
      73              :     /// A dummy channel. We never send anything to this channel. The point is
      74              :     /// that when the RcuCell is dropped, any subscribed Receivers will be notified
      75              :     /// that the channel is closed. Updaters can use this to wait out until the
      76              :     /// RcuCell has been dropped, i.e. until the old value is no longer in use.
      77              :     ///
      78              :     /// We never send anything to this, we just need to hold onto it so that the
      79              :     /// Receivers will be notified when it's dropped.
      80              :     watch: watch::Sender<()>,
      81              : }
      82              : 
      83              : impl<V> RcuCell<V> {
      84         1115 :     fn new(value: V) -> Self {
      85         1115 :         let (watch_sender, _) = watch::channel(());
      86         1115 :         RcuCell {
      87         1115 :             value,
      88         1115 :             watch: watch_sender,
      89         1115 :         }
      90         1115 :     }
      91              : }
      92              : 
      93              : impl<V> Rcu<V> {
      94              :     /// Create a new `Rcu`, initialized to `starting_val`
      95          381 :     pub fn new(starting_val: V) -> Self {
      96          381 :         let inner = RcuInner {
      97          381 :             current_cell: Arc::new(RcuCell::new(starting_val)),
      98          381 :             old_cells: Vec::new(),
      99          381 :         };
     100          381 :         Self {
     101          381 :             inner: RwLock::new(inner),
     102          381 :         }
     103          381 :     }
     104              : 
     105              :     ///
     106              :     /// Read current value. Waiting on the `RcuWaitList` returned by any later
     107              :     /// `store_and_unlock` call will not complete until the returned guard object is dropped.
     108              :     ///
     109         2337 :     pub fn read(&self) -> RcuReadGuard<V> {
     110         2337 :         let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
     111         2337 :         RcuReadGuard { cell: current_cell }
     112         2337 :     }
     113              : 
     114              :     ///
     115              :     /// Lock the current value for updating. Returns a guard object that can be
     116              :     /// used to read the current value, and to store a new value.
     117              :     ///
     118              :     /// Note: holding the write-guard blocks concurrent readers, so you should
     119              :     /// finish the update and drop the guard quickly! Multiple writers can be
     120              :     /// waiting on the RcuWaitList::wait step at the same time, however.
     121              :     ///
     122          734 :     pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> {
     123          734 :         let inner = self.inner.write().unwrap();
     124          734 :         RcuWriteGuard { inner }
     125          734 :     }
     126              : }
     127              : 
     128              : ///
     129              : /// Read guard returned by `read`
     130              : ///
     131              : pub struct RcuReadGuard<V> {
     132              :     cell: Arc<RcuCell<V>>,
     133              : }
     134              : 
     135              : impl<V> Deref for RcuReadGuard<V> {
     136              :     type Target = V;
     137              : 
     138         2575 :     fn deref(&self) -> &V {
     139         2575 :         &self.cell.value
     140         2575 :     }
     141              : }
     142              : 
     143              : ///
     144              : /// Write guard returned by `lock_for_write`
     145              : ///
     146              : /// NB: Holding this guard blocks all concurrent `read` and `write` calls, so it should only be
     147              : /// held for a short duration!
     148              : ///
     149              : /// Calling [`Self::store_and_unlock`] consumes the guard, making new reads and new writes possible
     150              : /// again.
     151              : ///
     152              : pub struct RcuWriteGuard<'a, V> {
     153              :     inner: RwLockWriteGuard<'a, RcuInner<V>>,
     154              : }
     155              : 
     156              : impl<'a, V> Deref for RcuWriteGuard<'a, V> {
     157              :     type Target = V;
     158              : 
     159          734 :     fn deref(&self) -> &V {
     160          734 :         &self.inner.current_cell.value
     161          734 :     }
     162              : }
     163              : 
     164              : impl<'a, V> RcuWriteGuard<'a, V> {
     165              :     ///
     166              :     /// Store a new value. The new value will be written to the Rcu immediately,
     167              :     /// and will be immediately seen by any `read` calls that start afterwards.
     168              :     ///
     169              :     /// Returns a list of readers that can see old values. You can call `wait()`
     170              :     /// on it to wait for them to finish.
     171              :     ///
     172          734 :     pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList {
     173          734 :         let new_cell = Arc::new(RcuCell::new(new_val));
     174          734 : 
     175          734 :         let mut watches = Vec::new();
     176          734 :         {
     177          734 :             let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
     178          734 :             self.inner.old_cells.push(Arc::downgrade(&old));
     179          734 : 
     180          734 :             // cleanup old cells that no longer have any readers, and collect
     181          734 :             // the watches for any that do.
     182         1344 :             self.inner.old_cells.retain(|weak| {
     183         1344 :                 if let Some(cell) = weak.upgrade() {
     184          736 :                     watches.push(cell.watch.subscribe());
     185          736 :                     true
     186              :                 } else {
     187          608 :                     false
     188              :                 }
     189         1344 :             });
     190          734 :         }
     191          734 :         RcuWaitList(watches)
     192          734 :     }
     193              : }
     194              : 
     195              : ///
     196              : /// List of readers who can still see old values.
     197              : ///
     198              : pub struct RcuWaitList(Vec<watch::Receiver<()>>);
     199              : 
     200              : impl RcuWaitList {
     201              :     ///
     202              :     /// Wait for old readers to finish.
     203              :     ///
     204          734 :     pub async fn wait(mut self) {
     205              :         // after all the old_cells are no longer in use, we're done
     206          736 :         for w in self.0.iter_mut() {
     207              :             // This will block until the Receiver is closed. That happens when
     208              :             // the RcuCell is dropped.
     209              :             #[allow(clippy::single_match)]
     210          736 :             match w.changed().await {
     211            0 :                 Ok(_) => panic!("changed() unexpectedly succeeded on dummy channel"),
     212          736 :                 Err(_) => {
     213          736 :                     // closed, which means that the cell has been dropped, and
     214          736 :                     // its value is no longer in use
     215          736 :                 }
     216              :             }
     217              :         }
     218          734 :     }
     219              : }
     220              : 
     221              : #[cfg(test)]
     222              : mod tests {
     223              :     use super::*;
     224              :     use std::sync::Mutex;
     225              :     use std::time::Duration;
     226              : 
     227              :     #[tokio::test]
     228            2 :     async fn two_writers() {
     229            2 :         let rcu = Rcu::new(1);
     230            2 : 
     231            2 :         let read1 = rcu.read();
     232            2 :         assert_eq!(*read1, 1);
     233            2 : 
     234            2 :         let write2 = rcu.lock_for_write();
     235            2 :         assert_eq!(*write2, 1);
     236            2 :         let wait2 = write2.store_and_unlock(2);
     237            2 : 
     238            2 :         let read2 = rcu.read();
     239            2 :         assert_eq!(*read2, 2);
     240            2 : 
     241            2 :         let write3 = rcu.lock_for_write();
     242            2 :         assert_eq!(*write3, 2);
     243            2 :         let wait3 = write3.store_and_unlock(3);
     244            2 : 
     245            2 :         // new reader can see the new value, and old readers continue to see the old values.
     246            2 :         let read3 = rcu.read();
     247            2 :         assert_eq!(*read3, 3);
     248            2 :         assert_eq!(*read2, 2);
     249            2 :         assert_eq!(*read1, 1);
     250            2 : 
     251            2 :         let log = Arc::new(Mutex::new(Vec::new()));
     252            2 :         // Wait for the old readers to finish in separate tasks.
     253            2 :         let log_clone = Arc::clone(&log);
     254            2 :         let task2 = tokio::spawn(async move {
     255            2 :             wait2.wait().await;
     256            2 :             log_clone.lock().unwrap().push("wait2 done");
     257            2 :         });
     258            2 :         let log_clone = Arc::clone(&log);
     259            2 :         let task3 = tokio::spawn(async move {
     260            4 :             wait3.wait().await;
     261            2 :             log_clone.lock().unwrap().push("wait3 done");
     262            2 :         });
     263            2 : 
     264            2 :         // without this sleep the test can pass by accident if the writer is slow
     265            2 :         tokio::time::sleep(Duration::from_millis(100)).await;
     266            2 : 
     267            2 :         // Release first reader. This allows first write to finish, but calling
     268            2 :         // wait() on the 'task3' would still block.
     269            2 :         log.lock().unwrap().push("dropping read1");
     270            2 :         drop(read1);
     271            2 :         task2.await.unwrap();
     272            2 : 
     273            2 :         assert!(!task3.is_finished());
     274            2 : 
     275            2 :         tokio::time::sleep(Duration::from_millis(100)).await;
     276            2 : 
     277            2 :         // Release second reader, and finish second writer.
     278            2 :         log.lock().unwrap().push("dropping read2");
     279            2 :         drop(read2);
     280            2 :         task3.await.unwrap();
     281            2 : 
     282            2 :         assert_eq!(
     283            2 :             log.lock().unwrap().as_slice(),
     284            2 :             &[
     285            2 :                 "dropping read1",
     286            2 :                 "wait2 done",
     287            2 :                 "dropping read2",
     288            2 :                 "wait3 done"
     289            2 :             ]
     290            2 :         );
     291            2 :     }
     292              : }
        

Generated by: LCOV version 2.1-beta