LCOV - code coverage report
Current view: top level - libs/metrics/src - wrappers.rs (source / functions) Coverage Total Hit
Test: 86c536b7fe84b2afe03c3bb264199e9c319ae0f8.info Lines: 73.9 % 88 65
Test Date: 2024-06-24 16:38:41 Functions: 48.3 % 29 14

            Line data    Source code
       1              : use std::io::{Read, Result, Write};
       2              : 
       3              : /// A wrapper for an object implementing [Read]
       4              : /// which allows a closure to observe the amount of bytes read.
       5              : /// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)).
       6              : ///
       7              : /// Example:
       8              : ///
       9              : /// ```
      10              : /// # use std::io::{Result, Read};
      11              : /// # use metrics::{register_int_counter, IntCounter};
      12              : /// # use metrics::CountedReader;
      13              : /// # use once_cell::sync::Lazy;
      14              : /// #
      15              : /// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
      16              : /// #         "int_counter",
      17              : /// #         "let's count something!"
      18              : /// #     ).unwrap()
      19              : /// # });
      20              : /// #
      21              : /// fn do_some_reads(stream: impl Read, count: usize) -> Result<Vec<u8>> {
      22              : ///     let mut reader = CountedReader::new(stream, |cnt| {
      23              : ///         // bump a counter each time we do a read
      24              : ///         INT_COUNTER.inc_by(cnt as u64);
      25              : ///     });
      26              : ///
      27              : ///     let mut proto_header = [0; 8];
      28              : ///     reader.read_exact(&mut proto_header)?;
      29              : ///     assert!(&proto_header == b"deadbeef");
      30              : ///
      31              : ///     let mut payload = vec![0; count];
      32              : ///     reader.read_exact(&mut payload)?;
      33              : ///     Ok(payload)
      34              : /// }
      35              : /// ```
      36              : ///
      37              : /// NB: rapid concurrent bumping of an atomic counter might incur
      38              : /// a performance penalty. Please make sure to amortize the amount
      39              : /// of atomic operations by either using [BufReader](std::io::BufReader)
      40              : /// or choosing a non-atomic (thread local) counter.
      41              : pub struct CountedReader<'a, T> {
      42              :     reader: T,
      43              :     update_counter: Box<dyn FnMut(usize) + Sync + Send + 'a>,
      44              : }
      45              : 
      46              : impl<'a, T> CountedReader<'a, T> {
      47            4 :     pub fn new(reader: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
      48            4 :         Self {
      49            4 :             reader,
      50            4 :             update_counter: Box::new(update_counter),
      51            4 :         }
      52            4 :     }
      53              : 
      54              :     /// Get an immutable reference to the underlying [Read] implementor
      55            0 :     pub fn inner(&self) -> &T {
      56            0 :         &self.reader
      57            0 :     }
      58              : 
      59              :     /// Get a mutable reference to the underlying [Read] implementor
      60            0 :     pub fn inner_mut(&mut self) -> &mut T {
      61            0 :         &mut self.reader
      62            0 :     }
      63              : 
      64              :     /// Consume the wrapper and return the underlying [Read] implementor
      65            0 :     pub fn into_inner(self) -> T {
      66            0 :         self.reader
      67            0 :     }
      68              : }
      69              : 
      70              : impl<T: Read> Read for CountedReader<'_, T> {
      71            4 :     fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
      72            4 :         let count = self.reader.read(buf)?;
      73            4 :         (self.update_counter)(count);
      74            4 :         Ok(count)
      75            4 :     }
      76              : }
      77              : 
      78              : /// A wrapper for an object implementing [Write]
      79              : /// which allows a closure to observe the amount of bytes written.
      80              : /// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)).
      81              : ///
      82              : /// Example:
      83              : ///
      84              : /// ```
      85              : /// # use std::io::{Result, Write};
      86              : /// # use metrics::{register_int_counter, IntCounter};
      87              : /// # use metrics::CountedWriter;
      88              : /// # use once_cell::sync::Lazy;
      89              : /// #
      90              : /// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
      91              : /// #         "int_counter",
      92              : /// #         "let's count something!"
      93              : /// #     ).unwrap()
      94              : /// # });
      95              : /// #
      96              : /// fn do_some_writes(stream: impl Write, payload: &[u8]) -> Result<()> {
      97              : ///     let mut writer = CountedWriter::new(stream, |cnt| {
      98              : ///         // bump a counter each time we do a write
      99              : ///         INT_COUNTER.inc_by(cnt as u64);
     100              : ///     });
     101              : ///
     102              : ///     let proto_header = b"deadbeef";
     103              : ///     writer.write_all(proto_header)?;
     104              : ///     writer.write_all(payload)
     105              : /// }
     106              : /// ```
     107              : ///
     108              : /// NB: rapid concurrent bumping of an atomic counter might incur
     109              : /// a performance penalty. Please make sure to amortize the amount
     110              : /// of atomic operations by either using [BufWriter](std::io::BufWriter)
     111              : /// or choosing a non-atomic (thread local) counter.
     112              : pub struct CountedWriter<'a, T> {
     113              :     writer: T,
     114              :     update_counter: Box<dyn FnMut(usize) + Sync + Send + 'a>,
     115              : }
     116              : 
     117              : impl<'a, T> CountedWriter<'a, T> {
     118            4 :     pub fn new(writer: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
     119            4 :         Self {
     120            4 :             writer,
     121            4 :             update_counter: Box::new(update_counter),
     122            4 :         }
     123            4 :     }
     124              : 
     125              :     /// Get an immutable reference to the underlying [Write] implementor
     126            0 :     pub fn inner(&self) -> &T {
     127            0 :         &self.writer
     128            0 :     }
     129              : 
     130              :     /// Get a mutable reference to the underlying [Write] implementor
     131            0 :     pub fn inner_mut(&mut self) -> &mut T {
     132            0 :         &mut self.writer
     133            0 :     }
     134              : 
     135              :     /// Consume the wrapper and return the underlying [Write] implementor
     136            0 :     pub fn into_inner(self) -> T {
     137            0 :         self.writer
     138            0 :     }
     139              : }
     140              : 
     141              : impl<T: Write> Write for CountedWriter<'_, T> {
     142            4 :     fn write(&mut self, buf: &[u8]) -> Result<usize> {
     143            4 :         let count = self.writer.write(buf)?;
     144            4 :         (self.update_counter)(count);
     145            4 :         Ok(count)
     146            4 :     }
     147              : 
     148            0 :     fn flush(&mut self) -> Result<()> {
     149            0 :         self.writer.flush()
     150            0 :     }
     151              : }
     152              : 
     153              : #[cfg(test)]
     154              : mod tests {
     155              :     use super::*;
     156              : 
     157              :     #[test]
     158            2 :     fn test_counted_reader() {
     159            2 :         let stream = [0; 16];
     160            2 :         let mut total = 0;
     161            4 :         let mut reader = CountedReader::new(stream.as_ref(), |cnt| {
     162            4 :             total += cnt;
     163            4 :         });
     164            2 : 
     165            2 :         let mut buffer = [0; 8];
     166            2 :         reader.read_exact(&mut buffer).unwrap();
     167            2 :         reader.read_exact(&mut buffer).unwrap();
     168            2 : 
     169            2 :         drop(reader);
     170            2 :         assert_eq!(total, stream.len());
     171            2 :     }
     172              : 
     173              :     #[test]
     174            2 :     fn test_counted_writer() {
     175            2 :         let mut stream = [0; 16];
     176            2 :         let mut total = 0;
     177            4 :         let mut writer = CountedWriter::new(stream.as_mut(), |cnt| {
     178            4 :             total += cnt;
     179            4 :         });
     180            2 : 
     181            2 :         let buffer = [0; 8];
     182            2 :         writer.write_all(&buffer).unwrap();
     183            2 :         writer.write_all(&buffer).unwrap();
     184            2 : 
     185            2 :         drop(writer);
     186            2 :         assert_eq!(total, stream.len());
     187            2 :     }
     188              : 
     189              :     // This mimics the constraints of std::thread::spawn
     190            4 :     fn assert_send_sync(_x: impl Sync + Send + 'static) {}
     191              : 
     192              :     #[test]
     193            2 :     fn test_send_sync_counted_reader() {
     194            2 :         let stream: &[u8] = &[];
     195            2 :         let mut reader = CountedReader::new(stream, |_| {});
     196            2 : 
     197            2 :         assert_send_sync(move || {
     198            0 :             reader.read_exact(&mut []).unwrap();
     199            2 :         });
     200            2 :     }
     201              : 
     202              :     #[test]
     203            2 :     fn test_send_sync_counted_writer() {
     204            2 :         let stream = Vec::<u8>::new();
     205            2 :         let mut writer = CountedWriter::new(stream, |_| {});
     206            2 : 
     207            2 :         assert_send_sync(move || {
     208            0 :             writer.write_all(&[]).unwrap();
     209            2 :         });
     210            2 :     }
     211              : }
        

Generated by: LCOV version 2.1-beta