|             Line data    Source code 
       1              : //!
       2              : //! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
       3              : //! similar to a lock, but it allows readers to "hold on" to an old value of RCU
       4              : //! without blocking writers, and allows writing a new value without blocking
       5              : //! readers. When you update the value, the new value is immediately visible
       6              : //! to new readers, but the update waits until all existing readers have
       7              : //! finished, so that on return, no one sees the old value anymore.
       8              : //!
       9              : //! This implementation isn't wait-free; it uses an RwLock that is held for a
      10              : //! short duration when the value is read or updated.
      11              : //!
      12              : //! # Examples
      13              : //!
      14              : //! Read a value and do things with it while holding the guard:
      15              : //!
      16              : //! ```
      17              : //! # let rcu = utils::simple_rcu::Rcu::new(1);
      18              : //! {
      19              : //!     let read = rcu.read();
      20              : //!     println!("the current value is {}", *read);
      21              : //!     // exiting the scope drops the read-guard, and allows concurrent writers
      22              : //!     // to finish.
      23              : //! }
      24              : //! ```
      25              : //!
      26              : //! Increment the value by one, and wait for old readers to finish:
      27              : //!
      28              : //! ```
      29              : //! # async fn dox() {
      30              : //! # let rcu = utils::simple_rcu::Rcu::new(1);
      31              : //! let write_guard = rcu.lock_for_write();
      32              : //!
      33              : //! // NB: holding `write_guard` blocks new readers and writers. Keep this section short!
      34              : //! let new_value = *write_guard + 1;
      35              : //!
      36              : //! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard`
      37              : //!
      38              : //! // Concurrent reads and writes are now possible again. Wait for all the readers
      39              : //! // that still observe the old value to finish.
      40              : //! waitlist.wait().await;
      41              : //! # }
      42              : //! ```
      43              : //!
      44              : #![warn(missing_docs)]
      45              : 
      46              : use std::ops::Deref;
      47              : use std::sync::{Arc, RwLock, RwLockWriteGuard, Weak};
      48              : 
      49              : use tokio::sync::watch;
      50              : 
      51              : /// Rcu allows multiple readers to read and hold onto a value without blocking
      52              : /// (for very long).
      53              : ///
      54              : /// Storing to the Rcu updates the value, making new readers immediately see
      55              : /// the new value, but it also waits for all current readers to finish.
      56              : pub struct Rcu<V> {
      57              :     inner: RwLock<RcuInner<V>>,
      58              : }
      59              : 
      60              : struct RcuInner<V> {
      61              :     current_cell: Arc<RcuCell<V>>,
      62              :     old_cells: Vec<Weak<RcuCell<V>>>,
      63              : }
      64              : 
      65              : ///
      66              : /// RcuCell holds one value. It can be the latest one, or an old one.
      67              : ///
      68              : struct RcuCell<V> {
      69              :     value: V,
      70              : 
      71              :     /// A dummy channel. We never send anything to this channel. The point is
      72              :     /// that when the RcuCell is dropped, any subscribed Receivers will be notified
      73              :     /// that the channel is closed. Updaters can use this to wait out until the
      74              :     /// RcuCell has been dropped, i.e. until the old value is no longer in use.
      75              :     ///
      76              :     /// We never send anything to this, we just need to hold onto it so that the
      77              :     /// Receivers will be notified when it's dropped.
      78              :     watch: watch::Sender<()>,
      79              : }
      80              : 
      81              : impl<V> RcuCell<V> {
      82          618 :     fn new(value: V) -> Self {
      83          618 :         let (watch_sender, _) = watch::channel(());
      84          618 :         RcuCell {
      85          618 :             value,
      86          618 :             watch: watch_sender,
      87          618 :         }
      88            3 :     }
      89              : }
      90              : 
      91              : impl<V> Rcu<V> {
      92              :     /// Create a new `Rcu`, initialized to `starting_val`
      93          236 :     pub fn new(starting_val: V) -> Self {
      94          236 :         let inner = RcuInner {
      95          236 :             current_cell: Arc::new(RcuCell::new(starting_val)),
      96          236 :             old_cells: Vec::new(),
      97          236 :         };
      98          236 :         Self {
      99          236 :             inner: RwLock::new(inner),
     100          236 :         }
     101            1 :     }
     102              : 
     103              :     ///
     104              :     /// Read current value. Any store() calls will block until the returned
     105              :     /// guard object is dropped.
     106              :     ///
     107       428319 :     pub fn read(&self) -> RcuReadGuard<V> {
     108       428319 :         let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
     109       428319 :         RcuReadGuard { cell: current_cell }
     110            3 :     }
     111              : 
     112              :     ///
     113              :     /// Lock the current value for updating. Returns a guard object that can be
     114              :     /// used to read the current value, and to store a new value.
     115              :     ///
     116              :     /// Note: holding the write-guard blocks concurrent readers, so you should
     117              :     /// finish the update and drop the guard quickly! Multiple writers can be
     118              :     /// waiting on the RcuWriteGuard::store step at the same time, however.
     119              :     ///
     120          382 :     pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> {
     121          382 :         let inner = self.inner.write().unwrap();
     122          382 :         RcuWriteGuard { inner }
     123            2 :     }
     124              : }
     125              : 
     126              : ///
     127              : /// Read guard returned by `read`
     128              : ///
     129              : pub struct RcuReadGuard<V> {
     130              :     cell: Arc<RcuCell<V>>,
     131              : }
     132              : 
     133              : impl<V> Deref for RcuReadGuard<V> {
     134              :     type Target = V;
     135              : 
     136         1596 :     fn deref(&self) -> &V {
     137         1596 :         &self.cell.value
     138            5 :     }
     139              : }
     140              : 
     141              : ///
     142              : /// Write guard returned by `write`
     143              : ///
     144              : /// NB: Holding this guard blocks all concurrent `read` and `write` calls, so it should only be
     145              : /// held for a short duration!
     146              : ///
     147              : /// Calling [`Self::store_and_unlock`] consumes the guard, making new reads and new writes possible
     148              : /// again.
     149              : ///
     150              : pub struct RcuWriteGuard<'a, V> {
     151              :     inner: RwLockWriteGuard<'a, RcuInner<V>>,
     152              : }
     153              : 
     154              : impl<V> Deref for RcuWriteGuard<'_, V> {
     155              :     type Target = V;
     156              : 
     157          367 :     fn deref(&self) -> &V {
     158          367 :         &self.inner.current_cell.value
     159            2 :     }
     160              : }
     161              : 
     162              : impl<V> RcuWriteGuard<'_, V> {
     163              :     ///
     164              :     /// Store a new value. The new value will be written to the Rcu immediately,
     165              :     /// and will be immediately seen by any `read` calls that start afterwards.
     166              :     ///
     167              :     /// Returns a list of readers that can see old values. You can call `wait()`
     168              :     /// on it to wait for them to finish.
     169              :     ///
     170          382 :     pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList {
     171          382 :         let new_cell = Arc::new(RcuCell::new(new_val));
     172              : 
     173          382 :         let mut watches = Vec::new();
     174              :         {
     175          382 :             let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
     176          382 :             self.inner.old_cells.push(Arc::downgrade(&old));
     177              : 
     178              :             // cleanup old cells that no longer have any readers, and collect
     179              :             // the watches for any that do.
     180          382 :             self.inner.old_cells.retain(|weak| {
     181          691 :                 if let Some(cell) = weak.upgrade() {
     182          383 :                     watches.push(cell.watch.subscribe());
     183          383 :                     true
     184              :                 } else {
     185          308 :                     false
     186              :                 }
     187            3 :             });
     188              :         }
     189          382 :         RcuWaitList(watches)
     190            2 :     }
     191              : }
     192              : 
     193              : ///
     194              : /// List of readers who can still see old values.
     195              : ///
     196              : pub struct RcuWaitList(Vec<watch::Receiver<()>>);
     197              : 
     198              : impl RcuWaitList {
     199              :     ///
     200              :     /// Wait for old readers to finish.
     201              :     ///
     202          382 :     pub async fn wait(mut self) {
     203              :         // after all the old_cells are no longer in use, we're done
     204          383 :         for w in self.0.iter_mut() {
     205              :             // This will block until the Receiver is closed. That happens when
     206              :             // the RcuCell is dropped.
     207              :             #[allow(clippy::single_match)]
     208          383 :             match w.changed().await {
     209            0 :                 Ok(_) => panic!("changed() unexpectedly succeeded on dummy channel"),
     210          383 :                 Err(_) => {
     211          383 :                     // closed, which means that the cell has been dropped, and
     212          383 :                     // its value is no longer in use
     213          383 :                 }
     214              :             }
     215              :         }
     216          382 :     }
     217              : }
     218              : 
     219              : #[cfg(test)]
     220              : mod tests {
     221              :     use std::sync::Mutex;
     222              :     use std::time::Duration;
     223              : 
     224              :     use super::*;
     225              : 
     226              :     #[tokio::test]
     227            1 :     async fn two_writers() {
     228            1 :         let rcu = Rcu::new(1);
     229              : 
     230            1 :         let read1 = rcu.read();
     231            1 :         assert_eq!(*read1, 1);
     232              : 
     233            1 :         let write2 = rcu.lock_for_write();
     234            1 :         assert_eq!(*write2, 1);
     235            1 :         let wait2 = write2.store_and_unlock(2);
     236              : 
     237            1 :         let read2 = rcu.read();
     238            1 :         assert_eq!(*read2, 2);
     239              : 
     240            1 :         let write3 = rcu.lock_for_write();
     241            1 :         assert_eq!(*write3, 2);
     242            1 :         let wait3 = write3.store_and_unlock(3);
     243              : 
     244              :         // new reader can see the new value, and old readers continue to see the old values.
     245            1 :         let read3 = rcu.read();
     246            1 :         assert_eq!(*read3, 3);
     247            1 :         assert_eq!(*read2, 2);
     248            1 :         assert_eq!(*read1, 1);
     249              : 
     250            1 :         let log = Arc::new(Mutex::new(Vec::new()));
     251              :         // Wait for the old readers to finish in separate tasks.
     252            1 :         let log_clone = Arc::clone(&log);
     253            1 :         let task2 = tokio::spawn(async move {
     254            1 :             wait2.wait().await;
     255            1 :             log_clone.lock().unwrap().push("wait2 done");
     256            1 :         });
     257            1 :         let log_clone = Arc::clone(&log);
     258            1 :         let task3 = tokio::spawn(async move {
     259            1 :             wait3.wait().await;
     260            1 :             log_clone.lock().unwrap().push("wait3 done");
     261            1 :         });
     262              : 
     263              :         // without this sleep the test can pass on accident if the writer is slow
     264            1 :         tokio::time::sleep(Duration::from_millis(100)).await;
     265              : 
     266              :         // Release first reader. This allows first write to finish, but calling
     267              :         // wait() on the 'task3' would still block.
     268            1 :         log.lock().unwrap().push("dropping read1");
     269            1 :         drop(read1);
     270            1 :         task2.await.unwrap();
     271              : 
     272            1 :         assert!(!task3.is_finished());
     273              : 
     274            1 :         tokio::time::sleep(Duration::from_millis(100)).await;
     275              : 
     276              :         // Release second reader, and finish second writer.
     277            1 :         log.lock().unwrap().push("dropping read2");
     278            1 :         drop(read2);
     279            1 :         task3.await.unwrap();
     280              : 
     281            1 :         assert_eq!(
     282            1 :             log.lock().unwrap().as_slice(),
     283            1 :             &[
     284            1 :                 "dropping read1",
     285            1 :                 "wait2 done",
     286            1 :                 "dropping read2",
     287            1 :                 "wait3 done"
     288            1 :             ]
     289            1 :         );
     290            1 :     }
     291              : }
         |