TLA Line data Source code
1 : use pin_project_lite::pin_project;
2 : use std::io::Read;
3 : use std::pin::Pin;
4 : use std::{io, task};
5 : use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
6 :
pin_project! {
    /// A stream wrapper that measures traffic in both directions and reports
    /// it through user provided callbacks.
    ///
    /// Bytes read are reported through `inc_read_count` each time a read on
    /// the underlying stream completes successfully. Bytes written are first
    /// accumulated in `write_count` and reported through `inc_write_count`
    /// only once the underlying stream has been flushed successfully.
    pub struct MeasuredStream<S, R, W> {
        // The wrapped stream; structurally pinned so its poll methods can be
        // called through the projection.
        #[pin]
        stream: S,
        // Bytes accepted by the wrapped stream since the last successful flush.
        write_count: usize,
        // Callback invoked with the number of bytes produced by a read.
        inc_read_count: R,
        // Callback invoked with the accumulated `write_count` on flush.
        inc_write_count: W,
    }
}
18 :
19 : impl<S, R, W> MeasuredStream<S, R, W> {
20 CBC 3201 : pub fn new(stream: S, inc_read_count: R, inc_write_count: W) -> Self {
21 3201 : Self {
22 3201 : stream,
23 3201 : write_count: 0,
24 3201 : inc_read_count,
25 3201 : inc_write_count,
26 3201 : }
27 3201 : }
28 : }
29 :
30 : impl<S: AsyncRead + Unpin, R: FnMut(usize), W> AsyncRead for MeasuredStream<S, R, W> {
31 7005564 : fn poll_read(
32 7005564 : self: Pin<&mut Self>,
33 7005564 : context: &mut task::Context<'_>,
34 7005564 : buf: &mut ReadBuf<'_>,
35 7005564 : ) -> task::Poll<io::Result<()>> {
36 7005564 : let this = self.project();
37 7005564 : let filled = buf.filled().len();
38 7005564 : this.stream.poll_read(context, buf).map_ok(|()| {
39 2177413 : let cnt = buf.filled().len() - filled;
40 2177413 : // Increment the read count.
41 2177413 : (this.inc_read_count)(cnt);
42 7005564 : })
43 7005564 : }
44 : }
45 :
46 : impl<S: AsyncWrite + Unpin, R, W: FnMut(usize)> AsyncWrite for MeasuredStream<S, R, W> {
47 1900278 : fn poll_write(
48 1900278 : self: Pin<&mut Self>,
49 1900278 : context: &mut task::Context<'_>,
50 1900278 : buf: &[u8],
51 1900278 : ) -> task::Poll<io::Result<usize>> {
52 1900278 : let this = self.project();
53 1900278 : this.stream.poll_write(context, buf).map_ok(|cnt| {
54 1882584 : // Increment the write count.
55 1882584 : *this.write_count += cnt;
56 1882584 : cnt
57 1900278 : })
58 1900278 : }
59 :
60 1883556 : fn poll_flush(
61 1883556 : self: Pin<&mut Self>,
62 1883556 : context: &mut task::Context<'_>,
63 1883556 : ) -> task::Poll<io::Result<()>> {
64 1883556 : let this = self.project();
65 1883556 : this.stream.poll_flush(context).map_ok(|()| {
66 1883556 : // Call the user provided callback and reset the write count.
67 1883556 : (this.inc_write_count)(*this.write_count);
68 1883556 : *this.write_count = 0;
69 1883556 : })
70 1883556 : }
71 :
72 2560 : fn poll_shutdown(
73 2560 : self: Pin<&mut Self>,
74 2560 : context: &mut task::Context<'_>,
75 2560 : ) -> task::Poll<io::Result<()>> {
76 2560 : self.project().stream.poll_shutdown(context)
77 2560 : }
78 : }
79 :
/// Wrapper for a reader that counts bytes read.
///
/// Similar to `MeasuredStream`, but it is one way (read only) and synchronous.
///
/// The type itself carries no trait bound on `R`; the `Read` requirement is
/// placed on the impl blocks that actually need it, so code that merely names
/// or stores a `MeasuredReader<R>` does not have to repeat the bound.
pub struct MeasuredReader<R> {
    /// The wrapped reader every `read` call is forwarded to.
    inner: R,
    /// Total number of bytes returned by successful reads so far.
    byte_count: usize,
}

impl<R: Read> MeasuredReader<R> {
    /// Wraps `reader`, starting with a byte count of zero.
    pub fn new(reader: R) -> Self {
        Self {
            inner: reader,
            byte_count: 0,
        }
    }

    /// Returns the total number of bytes read through this wrapper so far.
    pub fn get_byte_count(&self) -> usize {
        self.byte_count
    }
}

impl<R: Read> Read for MeasuredReader<R> {
    /// Reads from the wrapped reader into `buf` and adds the number of bytes
    /// read to the running total.
    ///
    /// # Errors
    ///
    /// Any error from the wrapped reader is returned unchanged; a failed read
    /// does not affect the byte count.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let result = self.inner.read(buf);
        if let Ok(n_bytes) = result {
            self.byte_count += n_bytes;
        }
        result
    }
}
|