LCOV - code coverage report
Current view: top level - libs/metrics/src - wrappers.rs (source / functions) Coverage Total Hit
Test: 2e3a7638747e564a4f6d1af1cc0c3b3438fbb740.info Lines: 73.9 % 88 65
Test Date: 2024-11-20 01:36:58 Functions: 48.3 % 29 14

            Line data    Source code
       1              : use std::io::{Read, Result, Write};
       2              : 
       3              : /// A wrapper for an object implementing [Read]
       4              : /// which allows a closure to observe the amount of bytes read.
       5              : /// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)).
       6              : ///
       7              : /// Example:
       8              : ///
       9              : /// ```
      10              : /// # use std::io::{Result, Read};
      11              : /// # use metrics::{register_int_counter, IntCounter};
      12              : /// # use metrics::CountedReader;
      13              : /// # use once_cell::sync::Lazy;
      14              : /// #
      15              : /// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
      16              : /// #         "int_counter",
      17              : /// #         "let's count something!"
      18              : /// #     ).unwrap()
      19              : /// # });
      20              : /// #
      21              : /// fn do_some_reads(stream: impl Read, count: usize) -> Result<Vec<u8>> {
      22              : ///     let mut reader = CountedReader::new(stream, |cnt| {
      23              : ///         // bump a counter each time we do a read
      24              : ///         INT_COUNTER.inc_by(cnt as u64);
      25              : ///     });
      26              : ///
      27              : ///     let mut proto_header = [0; 8];
      28              : ///     reader.read_exact(&mut proto_header)?;
      29              : ///     assert!(&proto_header == b"deadbeef");
      30              : ///
      31              : ///     let mut payload = vec![0; count];
      32              : ///     reader.read_exact(&mut payload)?;
      33              : ///     Ok(payload)
      34              : /// }
      35              : /// ```
      36              : ///
      37              : /// NB: rapid concurrent bumping of an atomic counter might incur
      38              : /// a performance penalty. Please make sure to amortize the amount
      39              : /// of atomic operations by either using [BufReader](std::io::BufReader)
      40              : /// or choosing a non-atomic (thread local) counter.
      41              : pub struct CountedReader<'a, T> {
      42              :     reader: T,
      43              :     update_counter: Box<dyn FnMut(usize) + Sync + Send + 'a>,
      44              : }
      45              : 
      46              : impl<'a, T> CountedReader<'a, T> {
      47            2 :     pub fn new(reader: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
      48            2 :         Self {
      49            2 :             reader,
      50            2 :             update_counter: Box::new(update_counter),
      51            2 :         }
      52            2 :     }
      53              : 
      54              :     /// Get an immutable reference to the underlying [Read] implementor
      55            0 :     pub fn inner(&self) -> &T {
      56            0 :         &self.reader
      57            0 :     }
      58              : 
      59              :     /// Get a mutable reference to the underlying [Read] implementor
      60            0 :     pub fn inner_mut(&mut self) -> &mut T {
      61            0 :         &mut self.reader
      62            0 :     }
      63              : 
      64              :     /// Consume the wrapper and return the underlying [Read] implementor
      65            0 :     pub fn into_inner(self) -> T {
      66            0 :         self.reader
      67            0 :     }
      68              : }
      69              : 
      70              : impl<T: Read> Read for CountedReader<'_, T> {
      71            2 :     fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
      72            2 :         let count = self.reader.read(buf)?;
      73            2 :         (self.update_counter)(count);
      74            2 :         Ok(count)
      75            2 :     }
      76              : }
      77              : 
      78              : /// A wrapper for an object implementing [Write]
      79              : /// which allows a closure to observe the amount of bytes written.
      80              : /// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)).
      81              : ///
      82              : /// Example:
      83              : ///
      84              : /// ```
      85              : /// # use std::io::{Result, Write};
      86              : /// # use metrics::{register_int_counter, IntCounter};
      87              : /// # use metrics::CountedWriter;
      88              : /// # use once_cell::sync::Lazy;
      89              : /// #
      90              : /// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
      91              : /// #         "int_counter",
      92              : /// #         "let's count something!"
      93              : /// #     ).unwrap()
      94              : /// # });
      95              : /// #
      96              : /// fn do_some_writes(stream: impl Write, payload: &[u8]) -> Result<()> {
      97              : ///     let mut writer = CountedWriter::new(stream, |cnt| {
      98              : ///         // bump a counter each time we do a write
      99              : ///         INT_COUNTER.inc_by(cnt as u64);
     100              : ///     });
     101              : ///
     102              : ///     let proto_header = b"deadbeef";
     103              : ///     writer.write_all(proto_header)?;
     104              : ///     writer.write_all(payload)
     105              : /// }
     106              : /// ```
     107              : ///
     108              : /// NB: rapid concurrent bumping of an atomic counter might incur
     109              : /// a performance penalty. Please make sure to amortize the amount
     110              : /// of atomic operations by either using [BufWriter](std::io::BufWriter)
     111              : /// or choosing a non-atomic (thread local) counter.
     112              : pub struct CountedWriter<'a, T> {
     113              :     writer: T,
     114              :     update_counter: Box<dyn FnMut(usize) + Sync + Send + 'a>,
     115              : }
     116              : 
     117              : impl<'a, T> CountedWriter<'a, T> {
     118            2 :     pub fn new(writer: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
     119            2 :         Self {
     120            2 :             writer,
     121            2 :             update_counter: Box::new(update_counter),
     122            2 :         }
     123            2 :     }
     124              : 
     125              :     /// Get an immutable reference to the underlying [Write] implementor
     126            0 :     pub fn inner(&self) -> &T {
     127            0 :         &self.writer
     128            0 :     }
     129              : 
     130              :     /// Get a mutable reference to the underlying [Write] implementor
     131            0 :     pub fn inner_mut(&mut self) -> &mut T {
     132            0 :         &mut self.writer
     133            0 :     }
     134              : 
     135              :     /// Consume the wrapper and return the underlying [Write] implementor
     136            0 :     pub fn into_inner(self) -> T {
     137            0 :         self.writer
     138            0 :     }
     139              : }
     140              : 
     141              : impl<T: Write> Write for CountedWriter<'_, T> {
     142            2 :     fn write(&mut self, buf: &[u8]) -> Result<usize> {
     143            2 :         let count = self.writer.write(buf)?;
     144            2 :         (self.update_counter)(count);
     145            2 :         Ok(count)
     146            2 :     }
     147              : 
     148            0 :     fn flush(&mut self) -> Result<()> {
     149            0 :         self.writer.flush()
     150            0 :     }
     151              : }
     152              : 
     153              : #[cfg(test)]
     154              : mod tests {
     155              :     use super::*;
     156              : 
     157              :     #[test]
     158            1 :     fn test_counted_reader() {
     159            1 :         let stream = [0; 16];
     160            1 :         let mut total = 0;
     161            2 :         let mut reader = CountedReader::new(stream.as_ref(), |cnt| {
     162            2 :             total += cnt;
     163            2 :         });
     164            1 : 
     165            1 :         let mut buffer = [0; 8];
     166            1 :         reader.read_exact(&mut buffer).unwrap();
     167            1 :         reader.read_exact(&mut buffer).unwrap();
     168            1 : 
     169            1 :         drop(reader);
     170            1 :         assert_eq!(total, stream.len());
     171            1 :     }
     172              : 
     173              :     #[test]
     174            1 :     fn test_counted_writer() {
     175            1 :         let mut stream = [0; 16];
     176            1 :         let mut total = 0;
     177            2 :         let mut writer = CountedWriter::new(stream.as_mut(), |cnt| {
     178            2 :             total += cnt;
     179            2 :         });
     180            1 : 
     181            1 :         let buffer = [0; 8];
     182            1 :         writer.write_all(&buffer).unwrap();
     183            1 :         writer.write_all(&buffer).unwrap();
     184            1 : 
     185            1 :         drop(writer);
     186            1 :         assert_eq!(total, stream.len());
     187            1 :     }
     188              : 
     189              :     // This mimics the constraints of std::thread::spawn
     190            2 :     fn assert_send_sync(_x: impl Sync + Send + 'static) {}
     191              : 
     192              :     #[test]
     193            1 :     fn test_send_sync_counted_reader() {
     194            1 :         let stream: &[u8] = &[];
     195            1 :         let mut reader = CountedReader::new(stream, |_| {});
     196            1 : 
     197            1 :         assert_send_sync(move || {
     198            0 :             reader.read_exact(&mut []).unwrap();
     199            1 :         });
     200            1 :     }
     201              : 
     202              :     #[test]
     203            1 :     fn test_send_sync_counted_writer() {
     204            1 :         let stream = Vec::<u8>::new();
     205            1 :         let mut writer = CountedWriter::new(stream, |_| {});
     206            1 : 
     207            1 :         assert_send_sync(move || {
     208            0 :             writer.write_all(&[]).unwrap();
     209            1 :         });
     210            1 :     }
     211              : }
        

Generated by: LCOV version 2.1-beta