LCOV - code coverage report
Current view: top level - libs/utils/src - simple_rcu.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 98.4 % 122 120
Test Date: 2024-11-25 17:48:16 Functions: 73.3 % 30 22

            Line data    Source code
       1              : //!
       2              : //! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
       3              : //! similar to a lock, but it allows readers to "hold on" to an old value of RCU
       4              : //! without blocking writers, and allows writing a new value without blocking
       5              : //! readers. When you update the value, the new value is immediately visible
       6              : //! to new readers, but the update waits until all existing readers have
       7              : //! finished, so that on return, no one sees the old value anymore.
       8              : //!
       9              : //! This implementation isn't wait-free; it uses an RwLock that is held for a
      10              : //! short duration when the value is read or updated.
      11              : //!
      12              : //! # Examples
      13              : //!
      14              : //! Read a value and do things with it while holding the guard:
      15              : //!
      16              : //! ```
      17              : //! # let rcu = utils::simple_rcu::Rcu::new(1);
      18              : //! {
      19              : //!     let read = rcu.read();
      20              : //!     println!("the current value is {}", *read);
      21              : //!     // exiting the scope drops the read-guard, and allows concurrent writers
      22              : //!     // to finish.
      23              : //! }
      24              : //! ```
      25              : //!
      26              : //! Increment the value by one, and wait for old readers to finish:
      27              : //!
      28              : //! ```
      29              : //! # async fn dox() {
      30              : //! # let rcu = utils::simple_rcu::Rcu::new(1);
      31              : //! let write_guard = rcu.lock_for_write();
      32              : //!
      33              : //! // NB: holding `write_guard` blocks new readers and writers. Keep this section short!
      34              : //! let new_value = *write_guard + 1;
      35              : //!
      36              : //! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard`
      37              : //!
      38              : //! // Concurrent reads and writes are now possible again. Wait for all the readers
      39              : //! // that still observe the old value to finish.
      40              : //! waitlist.wait().await;
      41              : //! # }
      42              : //! ```
      43              : //!
      44              : #![warn(missing_docs)]
      45              : 
      46              : use std::ops::Deref;
      47              : use std::sync::{Arc, Weak};
      48              : use std::sync::{RwLock, RwLockWriteGuard};
      49              : 
      50              : use tokio::sync::watch;
      51              : 
      52              : /// Rcu allows multiple readers to read and hold onto a value without blocking
      53              : /// (for very long).
      54              : ///
      55              : /// Storing to the Rcu updates the value, making new readers immediately see
      56              : /// the new value, but it also waits for all current readers to finish.
      57              : pub struct Rcu<V> {
      58              :     inner: RwLock<RcuInner<V>>,
      59              : }
      60              : 
      61              : struct RcuInner<V> {
      62              :     current_cell: Arc<RcuCell<V>>,
      63              :     old_cells: Vec<Weak<RcuCell<V>>>,
      64              : }
      65              : 
      66              : ///
      67              : /// RcuCell holds one value. It can be the latest one, or an old one.
      68              : ///
      69              : struct RcuCell<V> {
      70              :     value: V,
      71              : 
      72              :     /// A dummy channel. We never send anything to this channel. The point is
      73              :     /// that when the RcuCell is dropped, any subscribed Receivers will be notified
      74              :     /// that the channel is closed. Updaters can use this to wait out until the
      75              :     /// RcuCell has been dropped, i.e. until the old value is no longer in use.
      76              :     ///
      77              :     /// We never send anything to this, we just need to hold onto it so that the
      78              :     /// Receivers will be notified when it's dropped.
      79              :     watch: watch::Sender<()>,
      80              : }
      81              : 
      82              : impl<V> RcuCell<V> {
      83          425 :     fn new(value: V) -> Self {
      84          425 :         let (watch_sender, _) = watch::channel(());
      85          425 :         RcuCell {
      86          425 :             value,
      87          425 :             watch: watch_sender,
      88          425 :         }
      89          425 :     }
      90              : }
      91              : 
      92              : impl<V> Rcu<V> {
      93              :     /// Create a new `Rcu`, initialized to `starting_val`
      94          419 :     pub fn new(starting_val: V) -> Self {
      95          419 :         let inner = RcuInner {
      96          419 :             current_cell: Arc::new(RcuCell::new(starting_val)),
      97          419 :             old_cells: Vec::new(),
      98          419 :         };
      99          419 :         Self {
     100          419 :             inner: RwLock::new(inner),
     101          419 :         }
     102          419 :     }
     103              : 
     104              :     ///
     105              :     /// Read current value. If the value is replaced later, waiting on the
     106              :     /// resulting `RcuWaitList` will not complete until the returned guard object is dropped.
     107              :     ///
     108         1625 :     pub fn read(&self) -> RcuReadGuard<V> {
     109         1625 :         let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
     110         1625 :         RcuReadGuard { cell: current_cell }
     111         1625 :     }
     112              : 
     113              :     ///
     114              :     /// Lock the current value for updating. Returns a guard object that can be
     115              :     /// used to read the current value, and to store a new value.
     116              :     ///
     117              :     /// Note: holding the write-guard blocks concurrent readers and writers, so you
     118              :     /// should finish the update and drop the guard quickly! Multiple writers can be
     119              :     /// waiting on `RcuWaitList::wait` at the same time, however.
     120              :     ///
     121            6 :     pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> {
     122            6 :         let inner = self.inner.write().unwrap();
     123            6 :         RcuWriteGuard { inner }
     124            6 :     }
     125              : }
     126              : 
     127              : ///
     128              : /// Read guard returned by `read`
     129              : ///
     130              : pub struct RcuReadGuard<V> {
     131              :     cell: Arc<RcuCell<V>>,
     132              : }
     133              : 
     134              : impl<V> Deref for RcuReadGuard<V> {
     135              :     type Target = V;
     136              : 
     137         1865 :     fn deref(&self) -> &V {
     138         1865 :         &self.cell.value
     139         1865 :     }
     140              : }
     141              : 
     142              : ///
     143              : /// Write guard returned by `lock_for_write`
     144              : ///
     145              : /// NB: Holding this guard blocks all concurrent `read` and `lock_for_write` calls, so it should only be
     146              : /// held for a short duration!
     147              : ///
     148              : /// Calling [`Self::store_and_unlock`] consumes the guard, making new reads and new writes possible
     149              : /// again.
     150              : ///
     151              : pub struct RcuWriteGuard<'a, V> {
     152              :     inner: RwLockWriteGuard<'a, RcuInner<V>>,
     153              : }
     154              : 
     155              : impl<V> Deref for RcuWriteGuard<'_, V> {
     156              :     type Target = V;
     157              : 
     158            6 :     fn deref(&self) -> &V {
     159            6 :         &self.inner.current_cell.value
     160            6 :     }
     161              : }
     162              : 
     163              : impl<V> RcuWriteGuard<'_, V> {
     164              :     ///
     165              :     /// Store a new value. The new value will be written to the Rcu immediately,
     166              :     /// and will be immediately seen by any `read` calls that start afterwards.
     167              :     ///
     168              :     /// Returns a list of readers that can see old values. You can call `wait()`
     169              :     /// on it to wait for them to finish.
     170              :     ///
     171            6 :     pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList {
     172            6 :         let new_cell = Arc::new(RcuCell::new(new_val));
     173            6 : 
     174            6 :         let mut watches = Vec::new();
     175            6 :         {
     176            6 :             let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
     177            6 :             self.inner.old_cells.push(Arc::downgrade(&old));
     178            6 : 
     179            6 :             // cleanup old cells that no longer have any readers, and collect
     180            6 :             // the watches for any that do.
     181            7 :             self.inner.old_cells.retain(|weak| {
     182            7 :                 if let Some(cell) = weak.upgrade() {
     183            7 :                     watches.push(cell.watch.subscribe());
     184            7 :                     true
     185              :                 } else {
     186            0 :                     false
     187              :                 }
     188            7 :             });
     189            6 :         }
     190            6 :         RcuWaitList(watches)
     191            6 :     }
     192              : }
     193              : 
     194              : ///
     195              : /// List of readers who can still see old values.
     196              : ///
     197              : pub struct RcuWaitList(Vec<watch::Receiver<()>>);
     198              : 
     199              : impl RcuWaitList {
     200              :     ///
     201              :     /// Wait for old readers to finish.
     202              :     ///
     203            6 :     pub async fn wait(mut self) {
     204              :         // after all the old_cells are no longer in use, we're done
     205            7 :         for w in self.0.iter_mut() {
     206              :             // This will block until the Receiver is closed. That happens when
     207              :             // the RcuCell is dropped.
     208              :             #[allow(clippy::single_match)]
     209            7 :             match w.changed().await {
     210            0 :                 Ok(_) => panic!("changed() unexpectedly succeeded on dummy channel"),
     211            7 :                 Err(_) => {
     212            7 :                     // closed, which means that the cell has been dropped, and
     213            7 :                     // its value is no longer in use
     214            7 :                 }
     215              :             }
     216              :         }
     217            6 :     }
     218              : }
     219              : 
     220              : #[cfg(test)]
     221              : mod tests {
     222              :     use super::*;
     223              :     use std::sync::Mutex;
     224              :     use std::time::Duration;
     225              : 
     226              :     #[tokio::test]
     227            1 :     async fn two_writers() {
     228            1 :         let rcu = Rcu::new(1);
     229            1 : 
     230            1 :         let read1 = rcu.read();
     231            1 :         assert_eq!(*read1, 1);
     232            1 : 
     233            1 :         let write2 = rcu.lock_for_write();
     234            1 :         assert_eq!(*write2, 1);
     235            1 :         let wait2 = write2.store_and_unlock(2);
     236            1 : 
     237            1 :         let read2 = rcu.read();
     238            1 :         assert_eq!(*read2, 2);
     239            1 : 
     240            1 :         let write3 = rcu.lock_for_write();
     241            1 :         assert_eq!(*write3, 2);
     242            1 :         let wait3 = write3.store_and_unlock(3);
     243            1 : 
     244            1 :         // new reader can see the new value, and old readers continue to see the old values.
     245            1 :         let read3 = rcu.read();
     246            1 :         assert_eq!(*read3, 3);
     247            1 :         assert_eq!(*read2, 2);
     248            1 :         assert_eq!(*read1, 1);
     249            1 : 
     250            1 :         let log = Arc::new(Mutex::new(Vec::new()));
     251            1 :         // Wait for the old readers to finish in separate tasks.
     252            1 :         let log_clone = Arc::clone(&log);
     253            1 :         let task2 = tokio::spawn(async move {
     254            1 :             wait2.wait().await;
     255            1 :             log_clone.lock().unwrap().push("wait2 done");
     256            1 :         });
     257            1 :         let log_clone = Arc::clone(&log);
     258            1 :         let task3 = tokio::spawn(async move {
     259            2 :             wait3.wait().await;
     260            1 :             log_clone.lock().unwrap().push("wait3 done");
     261            1 :         });
     262            1 : 
     263            1 :         // without this sleep the test can pass by accident if the writer is slow
     264            1 :         tokio::time::sleep(Duration::from_millis(100)).await;
     265            1 : 
     266            1 :         // Release the first reader. This allows the first writer's wait to finish,
     267            1 :         // but awaiting 'task3' would still block.
     268            1 :         log.lock().unwrap().push("dropping read1");
     269            1 :         drop(read1);
     270            1 :         task2.await.unwrap();
     271            1 : 
     272            1 :         assert!(!task3.is_finished());
     273            1 : 
     274            1 :         tokio::time::sleep(Duration::from_millis(100)).await;
     275            1 : 
     276            1 :         // Release second reader, and finish second writer.
     277            1 :         log.lock().unwrap().push("dropping read2");
     278            1 :         drop(read2);
     279            1 :         task3.await.unwrap();
     280            1 : 
     281            1 :         assert_eq!(
     282            1 :             log.lock().unwrap().as_slice(),
     283            1 :             &[
     284            1 :                 "dropping read1",
     285            1 :                 "wait2 done",
     286            1 :                 "dropping read2",
     287            1 :                 "wait3 done"
     288            1 :             ]
     289            1 :         );
     290            1 :     }
     291              : }
        

Generated by: LCOV version 2.1-beta