LCOV - differential code coverage report
Current view: top level - libs/metrics/src - wrappers.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 75.0 % 92 69 23 69
Current Date: 2023-10-19 02:04:12 Functions: 55.2 % 29 16 13 16
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::io::{Read, Result, Write};
       2                 : 
       3                 : /// A wrapper for an object implementing [Read]
       4                 : /// which allows a closure to observe the amount of bytes read.
       5                 : /// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)).
       6                 : ///
       7                 : /// Example:
       8                 : ///
       9                 : /// ```
      10                 : /// # use std::io::{Result, Read};
      11                 : /// # use metrics::{register_int_counter, IntCounter};
      12                 : /// # use metrics::CountedReader;
      13                 : /// # use once_cell::sync::Lazy;
      14                 : /// #
      15                 : /// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
      16                 : /// #         "int_counter",
      17                 : /// #         "let's count something!"
      18                 : /// #     ).unwrap()
      19                 : /// # });
      20                 : /// #
      21                 : /// fn do_some_reads(stream: impl Read, count: usize) -> Result<Vec<u8>> {
      22                 : ///     let mut reader = CountedReader::new(stream, |cnt| {
      23                 : ///         // bump a counter each time we do a read
      24                 : ///         INT_COUNTER.inc_by(cnt as u64);
      25                 : ///     });
      26                 : ///
      27                 : ///     let mut proto_header = [0; 8];
      28                 : ///     reader.read_exact(&mut proto_header)?;
      29                 : ///     assert!(&proto_header == b"deadbeef");
      30                 : ///
      31                 : ///     let mut payload = vec![0; count];
      32                 : ///     reader.read_exact(&mut payload)?;
      33                 : ///     Ok(payload)
      34                 : /// }
      35                 : /// ```
      36                 : ///
      37                 : /// NB: rapid concurrent bumping of an atomic counter might incur
      38                 : /// a performance penalty. Please make sure to amortize the amount
      39                 : /// of atomic operations by either using [BufReader](std::io::BufReader)
      40                 : /// or choosing a non-atomic (thread local) counter.
      41                 : pub struct CountedReader<'a, T> {
      42                 :     reader: T,
      43                 :     update_counter: Box<dyn FnMut(usize) + Sync + Send + 'a>,
      44                 : }
      45                 : 
      46                 : impl<'a, T> CountedReader<'a, T> {
      47 CBC           2 :     pub fn new(reader: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
      48               2 :         Self {
      49               2 :             reader,
      50               2 :             update_counter: Box::new(update_counter),
      51               2 :         }
      52               2 :     }
      53                 : 
      54                 :     /// Get an immutable reference to the underlying [Read] implementor
      55 UBC           0 :     pub fn inner(&self) -> &T {
      56               0 :         &self.reader
      57               0 :     }
      58                 : 
      59                 :     /// Get a mutable reference to the underlying [Read] implementor
      60               0 :     pub fn inner_mut(&mut self) -> &mut T {
      61               0 :         &mut self.reader
      62               0 :     }
      63                 : 
      64                 :     /// Consume the wrapper and return the underlying [Read] implementor
      65               0 :     pub fn into_inner(self) -> T {
      66               0 :         self.reader
      67               0 :     }
      68                 : }
      69                 : 
      70                 : impl<T: Read> Read for CountedReader<'_, T> {
      71 CBC           2 :     fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
      72               2 :         let count = self.reader.read(buf)?;
      73               2 :         (self.update_counter)(count);
      74               2 :         Ok(count)
      75               2 :     }
      76                 : }
      77                 : 
      78                 : /// A wrapper for an object implementing [Write]
      79                 : /// which allows a closure to observe the amount of bytes written.
      80                 : /// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)).
      81                 : ///
      82                 : /// Example:
      83                 : ///
      84                 : /// ```
      85                 : /// # use std::io::{Result, Write};
      86                 : /// # use metrics::{register_int_counter, IntCounter};
      87                 : /// # use metrics::CountedWriter;
      88                 : /// # use once_cell::sync::Lazy;
      89                 : /// #
      90                 : /// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
      91                 : /// #         "int_counter",
      92                 : /// #         "let's count something!"
      93                 : /// #     ).unwrap()
      94                 : /// # });
      95                 : /// #
      96                 : /// fn do_some_writes(stream: impl Write, payload: &[u8]) -> Result<()> {
      97                 : ///     let mut writer = CountedWriter::new(stream, |cnt| {
      98                 : ///         // bump a counter each time we do a write
      99                 : ///         INT_COUNTER.inc_by(cnt as u64);
     100                 : ///     });
     101                 : ///
     102                 : ///     let proto_header = b"deadbeef";
     103                 : ///     writer.write_all(proto_header)?;
     104                 : ///     writer.write_all(payload)
     105                 : /// }
     106                 : /// ```
     107                 : ///
     108                 : /// NB: rapid concurrent bumping of an atomic counter might incur
     109                 : /// a performance penalty. Please make sure to amortize the amount
     110                 : /// of atomic operations by either using [BufWriter](std::io::BufWriter)
     111                 : /// or choosing a non-atomic (thread local) counter.
     112                 : pub struct CountedWriter<'a, T> {
     113                 :     writer: T,
     114                 :     update_counter: Box<dyn FnMut(usize) + Sync + Send + 'a>,
     115                 : }
     116                 : 
     117                 : impl<'a, T> CountedWriter<'a, T> {
     118               2 :     pub fn new(writer: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
     119               2 :         Self {
     120               2 :             writer,
     121               2 :             update_counter: Box::new(update_counter),
     122               2 :         }
     123               2 :     }
     124                 : 
     125                 :     /// Get an immutable reference to the underlying [Write] implementor
     126 UBC           0 :     pub fn inner(&self) -> &T {
     127               0 :         &self.writer
     128               0 :     }
     129                 : 
     130                 :     /// Get a mutable reference to the underlying [Write] implementor
     131               0 :     pub fn inner_mut(&mut self) -> &mut T {
     132               0 :         &mut self.writer
     133               0 :     }
     134                 : 
     135                 :     /// Consume the wrapper and return the underlying [Write] implementor
     136               0 :     pub fn into_inner(self) -> T {
     137               0 :         self.writer
     138               0 :     }
     139                 : }
     140                 : 
     141                 : impl<T: Write> Write for CountedWriter<'_, T> {
     142 CBC           2 :     fn write(&mut self, buf: &[u8]) -> Result<usize> {
     143               2 :         let count = self.writer.write(buf)?;
     144               2 :         (self.update_counter)(count);
     145               2 :         Ok(count)
     146               2 :     }
     147                 : 
     148 UBC           0 :     fn flush(&mut self) -> Result<()> {
     149               0 :         self.writer.flush()
     150               0 :     }
     151                 : }
     152                 : 
     153                 : #[cfg(test)]
     154                 : mod tests {
     155                 :     use super::*;
     156                 : 
     157 CBC           1 :     #[test]
     158               1 :     fn test_counted_reader() {
     159               1 :         let stream = [0; 16];
     160               1 :         let mut total = 0;
     161               2 :         let mut reader = CountedReader::new(stream.as_ref(), |cnt| {
     162               2 :             total += cnt;
     163               2 :         });
     164               1 : 
     165               1 :         let mut buffer = [0; 8];
     166               1 :         reader.read_exact(&mut buffer).unwrap();
     167               1 :         reader.read_exact(&mut buffer).unwrap();
     168               1 : 
     169               1 :         drop(reader);
     170               1 :         assert_eq!(total, stream.len());
     171               1 :     }
     172                 : 
     173               1 :     #[test]
     174               1 :     fn test_counted_writer() {
     175               1 :         let mut stream = [0; 16];
     176               1 :         let mut total = 0;
     177               2 :         let mut writer = CountedWriter::new(stream.as_mut(), |cnt| {
     178               2 :             total += cnt;
     179               2 :         });
     180               1 : 
     181               1 :         let buffer = [0; 8];
     182               1 :         writer.write_all(&buffer).unwrap();
     183               1 :         writer.write_all(&buffer).unwrap();
     184               1 : 
     185               1 :         drop(writer);
     186               1 :         assert_eq!(total, stream.len());
     187               1 :     }
     188                 : 
     189                 :     // This mimics the constraints of std::thread::spawn
     190               2 :     fn assert_send_sync(_x: impl Sync + Send + 'static) {}
     191                 : 
     192               1 :     #[test]
     193               1 :     fn test_send_sync_counted_reader() {
     194               1 :         let stream: &[u8] = &[];
     195               1 :         let mut reader = CountedReader::new(stream, |_| {});
     196               1 : 
     197               1 :         assert_send_sync(move || {
     198 UBC           0 :             reader.read_exact(&mut []).unwrap();
     199 CBC           1 :         });
     200               1 :     }
     201                 : 
     202               1 :     #[test]
     203               1 :     fn test_send_sync_counted_writer() {
     204               1 :         let stream = Vec::<u8>::new();
     205               1 :         let mut writer = CountedWriter::new(stream, |_| {});
     206               1 : 
     207               1 :         assert_send_sync(move || {
     208 UBC           0 :             writer.write_all(&[]).unwrap();
     209 CBC           1 :         });
     210               1 :     }
     211                 : }
        

Generated by: LCOV version 2.1-beta