LCOV - differential code coverage report
Current view: top level - libs/utils/src - measured_stream.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 100.0 % 66 66 66
Current Date: 2024-01-09 02:06:09 Functions: 61.4 % 70 43 27 43
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use pin_project_lite::pin_project;
       2                 : use std::io::Read;
       3                 : use std::pin::Pin;
       4                 : use std::{io, task};
       5                 : use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
       6                 : 
       7                 : pin_project! {
       8                 :     /// This stream tracks all writes and calls user provided
       9                 :     /// callback when the underlying stream is flushed.
      10                 :     pub struct MeasuredStream<S, R, W> {
      11                 :         #[pin]
      12                 :         stream: S,
      13                 :         write_count: usize,
      14                 :         inc_read_count: R,
      15                 :         inc_write_count: W,
      16                 :     }
      17                 : }
      18                 : 
      19                 : impl<S, R, W> MeasuredStream<S, R, W> {
      20 CBC        3201 :     pub fn new(stream: S, inc_read_count: R, inc_write_count: W) -> Self {
      21            3201 :         Self {
      22            3201 :             stream,
      23            3201 :             write_count: 0,
      24            3201 :             inc_read_count,
      25            3201 :             inc_write_count,
      26            3201 :         }
      27            3201 :     }
      28                 : }
      29                 : 
      30                 : impl<S: AsyncRead + Unpin, R: FnMut(usize), W> AsyncRead for MeasuredStream<S, R, W> {
      31         7005564 :     fn poll_read(
      32         7005564 :         self: Pin<&mut Self>,
      33         7005564 :         context: &mut task::Context<'_>,
      34         7005564 :         buf: &mut ReadBuf<'_>,
      35         7005564 :     ) -> task::Poll<io::Result<()>> {
      36         7005564 :         let this = self.project();
      37         7005564 :         let filled = buf.filled().len();
      38         7005564 :         this.stream.poll_read(context, buf).map_ok(|()| {
      39         2177413 :             let cnt = buf.filled().len() - filled;
      40         2177413 :             // Increment the read count.
      41         2177413 :             (this.inc_read_count)(cnt);
      42         7005564 :         })
      43         7005564 :     }
      44                 : }
      45                 : 
      46                 : impl<S: AsyncWrite + Unpin, R, W: FnMut(usize)> AsyncWrite for MeasuredStream<S, R, W> {
      47         1900278 :     fn poll_write(
      48         1900278 :         self: Pin<&mut Self>,
      49         1900278 :         context: &mut task::Context<'_>,
      50         1900278 :         buf: &[u8],
      51         1900278 :     ) -> task::Poll<io::Result<usize>> {
      52         1900278 :         let this = self.project();
      53         1900278 :         this.stream.poll_write(context, buf).map_ok(|cnt| {
      54         1882584 :             // Increment the write count.
      55         1882584 :             *this.write_count += cnt;
      56         1882584 :             cnt
      57         1900278 :         })
      58         1900278 :     }
      59                 : 
      60         1883556 :     fn poll_flush(
      61         1883556 :         self: Pin<&mut Self>,
      62         1883556 :         context: &mut task::Context<'_>,
      63         1883556 :     ) -> task::Poll<io::Result<()>> {
      64         1883556 :         let this = self.project();
      65         1883556 :         this.stream.poll_flush(context).map_ok(|()| {
      66         1883556 :             // Call the user provided callback and reset the write count.
      67         1883556 :             (this.inc_write_count)(*this.write_count);
      68         1883556 :             *this.write_count = 0;
      69         1883556 :         })
      70         1883556 :     }
      71                 : 
      72            2560 :     fn poll_shutdown(
      73            2560 :         self: Pin<&mut Self>,
      74            2560 :         context: &mut task::Context<'_>,
      75            2560 :     ) -> task::Poll<io::Result<()>> {
      76            2560 :         self.project().stream.poll_shutdown(context)
      77            2560 :     }
      78                 : }
      79                 : 
      80                 : /// Wrapper for a reader that counts bytes read.
      81                 : ///
      82                 : /// Similar to MeasuredStream but it's one way and it's sync
      83                 : pub struct MeasuredReader<R: Read> {
      84                 :     inner: R,
      85                 :     byte_count: usize,
      86                 : }
      87                 : 
      88                 : impl<R: Read> MeasuredReader<R> {
      89             539 :     pub fn new(reader: R) -> Self {
      90             539 :         Self {
      91             539 :             inner: reader,
      92             539 :             byte_count: 0,
      93             539 :         }
      94             539 :     }
      95                 : 
      96             537 :     pub fn get_byte_count(&self) -> usize {
      97             537 :         self.byte_count
      98             537 :     }
      99                 : }
     100                 : 
     101                 : impl<R: Read> Read for MeasuredReader<R> {
     102          797906 :     fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
     103          797906 :         let result = self.inner.read(buf);
     104          797906 :         if let Ok(n_bytes) = result {
     105          797904 :             self.byte_count += n_bytes
     106               2 :         }
     107          797906 :         result
     108          797906 :     }
     109                 : }
        

Generated by: LCOV version 2.1-beta