LCOV - differential code coverage report
Current view: top level - libs/utils/src - simple_rcu.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 99.1 % 112 111 1 111
Current Date: 2024-01-09 02:06:09 Functions: 73.3 % 30 22 8 22
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //!
       2                 : //! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
       3                 : //! similar to a lock, but it allows readers to "hold on" to an old value of RCU
       4                 : //! without blocking writers, and allows writing a new value without blocking
       5                 : //! readers. When you update the value, the new value is immediately visible
       6                 : //! to new readers, but the update waits until all existing readers have
       7                 : //! finished, so that on return, no one sees the old value anymore.
       8                 : //!
       9                 : //! This implementation isn't wait-free; it uses an RwLock that is held for a
      10                 : //! short duration when the value is read or updated.
      11                 : //!
      12                 : //! # Examples
      13                 : //!
      14                 : //! Read a value and do things with it while holding the guard:
      15                 : //!
      16                 : //! ```
      17                 : //! # let rcu = utils::simple_rcu::Rcu::new(1);
      18                 : //! {
      19                 : //!     let read = rcu.read();
      20                 : //!     println!("the current value is {}", *read);
      21                 : //!     // exiting the scope drops the read-guard, and allows concurrent writers
      22                 : //!     // to finish.
      23                 : //! }
      24                 : //! ```
      25                 : //!
      26                 : //! Increment the value by one, and wait for old readers to finish:
      27                 : //!
      28                 : //! ```
      29                 : //! # async fn dox() {
      30                 : //! # let rcu = utils::simple_rcu::Rcu::new(1);
      31                 : //! let write_guard = rcu.lock_for_write();
      32                 : //!
      33                 : //! // NB: holding `write_guard` blocks new readers and writers. Keep this section short!
      34                 : //! let new_value = *write_guard + 1;
      35                 : //!
      36                 : //! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard`
      37                 : //!
      38                 : //! // Concurrent reads and writes are now possible again. Wait for all the readers
      39                 : //! // that still observe the old value to finish.
      40                 : //! waitlist.wait().await;
      41                 : //! # }
      42                 : //! ```
      43                 : //!
      44                 : #![warn(missing_docs)]
      45                 : 
      46                 : use std::ops::Deref;
      47                 : use std::sync::{Arc, Weak};
      48                 : use std::sync::{RwLock, RwLockWriteGuard};
      49                 : 
      50                 : use tokio::sync::watch;
      51                 : 
      52                 : ///
      53                 : /// Rcu allows multiple readers to read and hold onto a value without blocking
      54                 : /// (for very long).  Storing to the Rcu updates the value, making new readers
      55                 : /// immediately see the new value, but it also waits for all current readers to
      56                 : /// finish.
      57                 : ///
      58                 : pub struct Rcu<V> {
      59                 :     inner: RwLock<RcuInner<V>>,
      60                 : }
      61                 : 
      62                 : struct RcuInner<V> {
      63                 :     current_cell: Arc<RcuCell<V>>,
      64                 :     old_cells: Vec<Weak<RcuCell<V>>>,
      65                 : }
      66                 : 
      67                 : ///
      68                 : /// RcuCell holds one value. It can be the latest one, or an old one.
      69                 : ///
      70                 : struct RcuCell<V> {
      71                 :     value: V,
      72                 : 
      73                 :     /// A dummy channel. We never send anything to this channel. The point is
      74                 :     /// that when the RcuCell is dropped, any subscribed Receivers will be notified
      75                 :     /// that the channel is closed. Updaters can use this to wait out until the
      76                 :     /// RcuCell has been dropped, i.e. until the old value is no longer in use.
      77                 :     ///
      78                 :     /// We never send anything to this, we just need to hold onto it so that the
      79                 :     /// Receivers will be notified when it's dropped.
      80                 :     watch: watch::Sender<()>,
      81                 : }
      82                 : 
      83                 : impl<V> RcuCell<V> {
      84 CBC        1835 :     fn new(value: V) -> Self {
      85            1835 :         let (watch_sender, _) = watch::channel(());
      86            1835 :         RcuCell {
      87            1835 :             value,
      88            1835 :             watch: watch_sender,
      89            1835 :         }
      90            1835 :     }
      91                 : }
      92                 : 
      93                 : impl<V> Rcu<V> {
      94                 :     /// Create a new `Rcu`, initialized to `starting_val`
      95            1291 :     pub fn new(starting_val: V) -> Self {
      96            1291 :         let inner = RcuInner {
      97            1291 :             current_cell: Arc::new(RcuCell::new(starting_val)),
      98            1291 :             old_cells: Vec::new(),
      99            1291 :         };
     100            1291 :         Self {
     101            1291 :             inner: RwLock::new(inner),
     102            1291 :         }
     103            1291 :     }
     104                 : 
     105                 :     ///
     106                 :     /// Read current value. Any store() calls will block until the returned
     107                 :     /// guard object is dropped.
     108                 :     ///
     109         3651282 :     pub fn read(&self) -> RcuReadGuard<V> {
     110         3651282 :         let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
     111         3651282 :         RcuReadGuard { cell: current_cell }
     112         3651282 :     }
     113                 : 
     114                 :     ///
     115                 :     /// Lock the current value for updating. Returns a guard object that can be
     116                 :     /// used to read the current value, and to store a new value.
     117                 :     ///
     118                 :     /// Note: holding the write-guard blocks concurrent readers, so you should
     119                 :     /// finish the update and drop the guard quickly! Multiple writers can be
     120                 :     /// waiting on the RcuWriteGuard::store step at the same time, however.
     121                 :     ///
     122             544 :     pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> {
     123             544 :         let inner = self.inner.write().unwrap();
     124             544 :         RcuWriteGuard { inner }
     125             544 :     }
     126                 : }
     127                 : 
     128                 : ///
     129                 : /// Read guard returned by `read`
     130                 : ///
     131                 : pub struct RcuReadGuard<V> {
     132                 :     cell: Arc<RcuCell<V>>,
     133                 : }
     134                 : 
     135                 : impl<V> Deref for RcuReadGuard<V> {
     136                 :     type Target = V;
     137                 : 
     138         3651266 :     fn deref(&self) -> &V {
     139         3651266 :         &self.cell.value
     140         3651266 :     }
     141                 : }
     142                 : 
     143                 : ///
     144                 : /// Write guard returned by `write`
     145                 : ///
     146                 : /// NB: Holding this guard blocks all concurrent `read` and `write` calls, so it should only be
     147                 : /// held for a short duration!
     148                 : ///
     149                 : /// Calling [`Self::store_and_unlock`] consumes the guard, making new reads and new writes possible
     150                 : /// again.
     151                 : ///
     152                 : pub struct RcuWriteGuard<'a, V> {
     153                 :     inner: RwLockWriteGuard<'a, RcuInner<V>>,
     154                 : }
     155                 : 
     156                 : impl<'a, V> Deref for RcuWriteGuard<'a, V> {
     157                 :     type Target = V;
     158                 : 
     159             544 :     fn deref(&self) -> &V {
     160             544 :         &self.inner.current_cell.value
     161             544 :     }
     162                 : }
     163                 : 
     164                 : impl<'a, V> RcuWriteGuard<'a, V> {
     165                 :     ///
     166                 :     /// Store a new value. The new value will be written to the Rcu immediately,
     167                 :     /// and will be immediately seen by any `read` calls that start afterwards.
     168                 :     ///
     169                 :     /// Returns a list of readers that can see old values. You can call `wait()`
     170                 :     /// on it to wait for them to finish.
     171                 :     ///
     172             544 :     pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList {
     173             544 :         let new_cell = Arc::new(RcuCell::new(new_val));
     174             544 : 
     175             544 :         let mut watches = Vec::new();
     176             544 :         {
     177             544 :             let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
     178             544 :             self.inner.old_cells.push(Arc::downgrade(&old));
     179             544 : 
     180             544 :             // cleanup old cells that no longer have any readers, and collect
     181             544 :             // the watches for any that do.
     182             984 :             self.inner.old_cells.retain(|weak| {
     183             984 :                 if let Some(cell) = weak.upgrade() {
     184             545 :                     watches.push(cell.watch.subscribe());
     185             545 :                     true
     186                 :                 } else {
     187             439 :                     false
     188                 :                 }
     189             984 :             });
     190             544 :         }
     191             544 :         RcuWaitList(watches)
     192             544 :     }
     193                 : }
     194                 : 
     195                 : ///
     196                 : /// List of readers who can still see old values.
     197                 : ///
     198                 : pub struct RcuWaitList(Vec<watch::Receiver<()>>);
     199                 : 
     200                 : impl RcuWaitList {
     201                 :     ///
     202                 :     /// Wait for old readers to finish.
     203                 :     ///
     204             544 :     pub async fn wait(mut self) {
     205                 :         // after all the old_cells are no longer in use, we're done
     206             545 :         for w in self.0.iter_mut() {
     207                 :             // This will block until the Receiver is closed. That happens when
     208                 :             // the RcuCell is dropped.
     209                 :             #[allow(clippy::single_match)]
     210             545 :             match w.changed().await {
     211 UBC           0 :                 Ok(_) => panic!("changed() unexpectedly succeeded on dummy channel"),
     212 CBC         545 :                 Err(_) => {
     213             545 :                     // closed, which means that the cell has been dropped, and
     214             545 :                     // its value is no longer in use
     215             545 :                 }
     216                 :             }
     217                 :         }
     218             544 :     }
     219                 : }
     220                 : 
     221                 : #[cfg(test)]
     222                 : mod tests {
     223                 :     use super::*;
     224                 :     use std::sync::{Arc, Mutex};
     225                 :     use std::time::Duration;
     226                 : 
     227               1 :     #[tokio::test]
     228               1 :     async fn two_writers() {
     229               1 :         let rcu = Rcu::new(1);
     230               1 : 
     231               1 :         let read1 = rcu.read();
     232               1 :         assert_eq!(*read1, 1);
     233                 : 
     234               1 :         let write2 = rcu.lock_for_write();
     235               1 :         assert_eq!(*write2, 1);
     236               1 :         let wait2 = write2.store_and_unlock(2);
     237               1 : 
     238               1 :         let read2 = rcu.read();
     239               1 :         assert_eq!(*read2, 2);
     240                 : 
     241               1 :         let write3 = rcu.lock_for_write();
     242               1 :         assert_eq!(*write3, 2);
     243               1 :         let wait3 = write3.store_and_unlock(3);
     244               1 : 
     245               1 :         // new reader can see the new value, and old readers continue to see the old values.
     246               1 :         let read3 = rcu.read();
     247               1 :         assert_eq!(*read3, 3);
     248               1 :         assert_eq!(*read2, 2);
     249               1 :         assert_eq!(*read1, 1);
     250                 : 
     251               1 :         let log = Arc::new(Mutex::new(Vec::new()));
     252               1 :         // Wait for the old readers to finish in separate tasks.
     253               1 :         let log_clone = Arc::clone(&log);
     254               1 :         let task2 = tokio::spawn(async move {
     255               1 :             wait2.wait().await;
     256               1 :             log_clone.lock().unwrap().push("wait2 done");
     257               1 :         });
     258               1 :         let log_clone = Arc::clone(&log);
     259               1 :         let task3 = tokio::spawn(async move {
     260               2 :             wait3.wait().await;
     261               1 :             log_clone.lock().unwrap().push("wait3 done");
     262               1 :         });
     263               1 : 
     264               1 :         // without this sleep the test can pass on accident if the writer is slow
     265               1 :         tokio::time::sleep(Duration::from_millis(100)).await;
     266                 : 
     267                 :         // Release first reader. This allows first write to finish, but calling
     268                 :         // wait() on the 'task3' would still block.
     269               1 :         log.lock().unwrap().push("dropping read1");
     270               1 :         drop(read1);
     271               1 :         task2.await.unwrap();
     272                 : 
     273               1 :         assert!(!task3.is_finished());
     274                 : 
     275               1 :         tokio::time::sleep(Duration::from_millis(100)).await;
     276                 : 
     277                 :         // Release second reader, and finish second writer.
     278               1 :         log.lock().unwrap().push("dropping read2");
     279               1 :         drop(read2);
     280               1 :         task3.await.unwrap();
     281               1 : 
     282               1 :         assert_eq!(
     283               1 :             log.lock().unwrap().as_slice(),
     284               1 :             &[
     285               1 :                 "dropping read1",
     286               1 :                 "wait2 done",
     287               1 :                 "dropping read2",
     288               1 :                 "wait3 done"
     289               1 :             ]
     290               1 :         );
     291                 :     }
     292                 : }
        

Generated by: LCOV version 2.1-beta