LCOV - differential code coverage report
Current view: top level - proxy/src - waiters.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 94.5 % 55 52 3 52
Current Date: 2023-10-19 02:04:12 Functions: 40.6 % 32 13 19 13
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use hashbrown::HashMap;
       2                 : use parking_lot::Mutex;
       3                 : use pin_project_lite::pin_project;
       4                 : use std::pin::Pin;
       5                 : use std::task;
       6                 : use thiserror::Error;
       7                 : use tokio::sync::oneshot;
       8                 : 
       9 UBC           0 : #[derive(Debug, Error)]
      10                 : pub enum RegisterError {
      11                 :     #[error("Waiter `{0}` already registered")]
      12                 :     Occupied(String),
      13                 : }
      14                 : 
      15               0 : #[derive(Debug, Error)]
      16                 : pub enum NotifyError {
      17                 :     #[error("Notify failed: waiter `{0}` not registered")]
      18                 :     NotFound(String),
      19                 : 
      20                 :     #[error("Notify failed: channel hangup")]
      21                 :     Hangup,
      22                 : }
      23                 : 
      24               0 : #[derive(Debug, Error)]
      25                 : pub enum WaitError {
      26                 :     #[error("Wait failed: channel hangup")]
      27                 :     Hangup,
      28                 : }
      29                 : 
      30                 : pub struct Waiters<T>(pub(self) Mutex<HashMap<String, oneshot::Sender<T>>>);
      31                 : 
      32                 : impl<T> Default for Waiters<T> {
      33 CBC           3 :     fn default() -> Self {
      34               3 :         Waiters(Default::default())
      35               3 :     }
      36                 : }
      37                 : 
      38                 : impl<T> Waiters<T> {
      39               4 :     pub fn register(&self, key: String) -> Result<Waiter<T>, RegisterError> {
      40               4 :         let (tx, rx) = oneshot::channel();
      41               4 : 
      42               4 :         self.0
      43               4 :             .lock()
      44               4 :             .try_insert(key.clone(), tx)
      45               4 :             .map_err(|e| RegisterError::Occupied(e.entry.key().clone()))?;
      46                 : 
      47               4 :         Ok(Waiter {
      48               4 :             receiver: rx,
      49               4 :             guard: DropKey {
      50               4 :                 registry: self,
      51               4 :                 key,
      52               4 :             },
      53               4 :         })
      54               4 :     }
      55                 : 
      56               4 :     pub fn notify(&self, key: &str, value: T) -> Result<(), NotifyError>
      57               4 :     where
      58               4 :         T: Send + Sync,
      59               4 :     {
      60               4 :         let tx = self
      61               4 :             .0
      62               4 :             .lock()
      63               4 :             .remove(key)
      64               4 :             .ok_or_else(|| NotifyError::NotFound(key.to_string()))?;
      65                 : 
      66               4 :         tx.send(value).map_err(|_| NotifyError::Hangup)
      67               4 :     }
      68                 : }
      69                 : 
      70                 : struct DropKey<'a, T> {
      71                 :     key: String,
      72                 :     registry: &'a Waiters<T>,
      73                 : }
      74                 : 
      75                 : impl<'a, T> Drop for DropKey<'a, T> {
      76               4 :     fn drop(&mut self) {
      77               4 :         self.registry.0.lock().remove(&self.key);
      78               4 :     }
      79                 : }
      80                 : 
      81                 : pin_project! {
      82                 :     pub struct Waiter<'a, T> {
      83                 :         #[pin]
      84                 :         receiver: oneshot::Receiver<T>,
      85                 :         guard: DropKey<'a, T>,
      86                 :     }
      87                 : }
      88                 : 
      89                 : impl<T> std::future::Future for Waiter<'_, T> {
      90                 :     type Output = Result<T, WaitError>;
      91                 : 
      92               8 :     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
      93               8 :         self.project()
      94               8 :             .receiver
      95               8 :             .poll(cx)
      96               8 :             .map_err(|_| WaitError::Hangup)
      97               8 :     }
      98                 : }
      99                 : 
     100                 : #[cfg(test)]
     101                 : mod tests {
     102                 :     use super::*;
     103                 :     use std::sync::Arc;
     104                 : 
     105               1 :     #[tokio::test]
     106               1 :     async fn test_waiter() -> anyhow::Result<()> {
     107               1 :         let waiters = Arc::new(Waiters::default());
     108               1 : 
     109               1 :         let key = "Key";
     110               1 :         let waiter = waiters.register(key.to_owned())?;
     111                 : 
     112               1 :         let waiters = Arc::clone(&waiters);
     113               1 :         let notifier = tokio::spawn(async move {
     114               1 :             waiters.notify(key, Default::default())?;
     115               1 :             Ok(())
     116               1 :         });
     117               1 : 
     118               1 :         waiter.await?;
     119               1 :         notifier.await?
     120                 :     }
     121                 : }
        

Generated by: LCOV version 2.1-beta