Line data Source code
1 : use std::io::{Read, Result, Write};
2 :
3 : /// A wrapper for an object implementing [Read]
4 : /// which allows a closure to observe the amount of bytes read.
5 : /// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)).
6 : ///
7 : /// Example:
8 : ///
9 : /// ```
10 : /// # use std::io::{Result, Read};
11 : /// # use metrics::{register_int_counter, IntCounter};
12 : /// # use metrics::CountedReader;
13 : /// # use once_cell::sync::Lazy;
14 : /// #
15 : /// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
16 : /// # "int_counter",
17 : /// # "let's count something!"
18 : /// # ).unwrap()
19 : /// # });
20 : /// #
21 : /// fn do_some_reads(stream: impl Read, count: usize) -> Result<Vec<u8>> {
22 : /// let mut reader = CountedReader::new(stream, |cnt| {
23 : /// // bump a counter each time we do a read
24 : /// INT_COUNTER.inc_by(cnt as u64);
25 : /// });
26 : ///
27 : /// let mut proto_header = [0; 8];
28 : /// reader.read_exact(&mut proto_header)?;
29 : /// assert!(&proto_header == b"deadbeef");
30 : ///
31 : /// let mut payload = vec![0; count];
32 : /// reader.read_exact(&mut payload)?;
33 : /// Ok(payload)
34 : /// }
35 : /// ```
36 : ///
37 : /// NB: rapid concurrent bumping of an atomic counter might incur
38 : /// a performance penalty. Please make sure to amortize the amount
39 : /// of atomic operations by either using [BufReader](std::io::BufReader)
40 : /// or choosing a non-atomic (thread local) counter.
41 : pub struct CountedReader<'a, T> {
42 : reader: T,
43 : update_counter: Box<dyn FnMut(usize) + Sync + Send + 'a>,
44 : }
45 :
46 : impl<'a, T> CountedReader<'a, T> {
47 4 : pub fn new(reader: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
48 4 : Self {
49 4 : reader,
50 4 : update_counter: Box::new(update_counter),
51 4 : }
52 4 : }
53 :
54 : /// Get an immutable reference to the underlying [Read] implementor
55 0 : pub fn inner(&self) -> &T {
56 0 : &self.reader
57 0 : }
58 :
59 : /// Get a mutable reference to the underlying [Read] implementor
60 0 : pub fn inner_mut(&mut self) -> &mut T {
61 0 : &mut self.reader
62 0 : }
63 :
64 : /// Consume the wrapper and return the underlying [Read] implementor
65 0 : pub fn into_inner(self) -> T {
66 0 : self.reader
67 0 : }
68 : }
69 :
70 : impl<T: Read> Read for CountedReader<'_, T> {
71 4 : fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
72 4 : let count = self.reader.read(buf)?;
73 4 : (self.update_counter)(count);
74 4 : Ok(count)
75 4 : }
76 : }
77 :
78 : /// A wrapper for an object implementing [Write]
79 : /// which allows a closure to observe the amount of bytes written.
80 : /// This is useful in conjunction with metrics (e.g. [IntCounter](crate::IntCounter)).
81 : ///
82 : /// Example:
83 : ///
84 : /// ```
85 : /// # use std::io::{Result, Write};
86 : /// # use metrics::{register_int_counter, IntCounter};
87 : /// # use metrics::CountedWriter;
88 : /// # use once_cell::sync::Lazy;
89 : /// #
90 : /// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
91 : /// # "int_counter",
92 : /// # "let's count something!"
93 : /// # ).unwrap()
94 : /// # });
95 : /// #
96 : /// fn do_some_writes(stream: impl Write, payload: &[u8]) -> Result<()> {
97 : /// let mut writer = CountedWriter::new(stream, |cnt| {
98 : /// // bump a counter each time we do a write
99 : /// INT_COUNTER.inc_by(cnt as u64);
100 : /// });
101 : ///
102 : /// let proto_header = b"deadbeef";
103 : /// writer.write_all(proto_header)?;
104 : /// writer.write_all(payload)
105 : /// }
106 : /// ```
107 : ///
108 : /// NB: rapid concurrent bumping of an atomic counter might incur
109 : /// a performance penalty. Please make sure to amortize the amount
110 : /// of atomic operations by either using [BufWriter](std::io::BufWriter)
111 : /// or choosing a non-atomic (thread local) counter.
112 : pub struct CountedWriter<'a, T> {
113 : writer: T,
114 : update_counter: Box<dyn FnMut(usize) + Sync + Send + 'a>,
115 : }
116 :
117 : impl<'a, T> CountedWriter<'a, T> {
118 4 : pub fn new(writer: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
119 4 : Self {
120 4 : writer,
121 4 : update_counter: Box::new(update_counter),
122 4 : }
123 4 : }
124 :
125 : /// Get an immutable reference to the underlying [Write] implementor
126 0 : pub fn inner(&self) -> &T {
127 0 : &self.writer
128 0 : }
129 :
130 : /// Get a mutable reference to the underlying [Write] implementor
131 0 : pub fn inner_mut(&mut self) -> &mut T {
132 0 : &mut self.writer
133 0 : }
134 :
135 : /// Consume the wrapper and return the underlying [Write] implementor
136 0 : pub fn into_inner(self) -> T {
137 0 : self.writer
138 0 : }
139 : }
140 :
141 : impl<T: Write> Write for CountedWriter<'_, T> {
142 4 : fn write(&mut self, buf: &[u8]) -> Result<usize> {
143 4 : let count = self.writer.write(buf)?;
144 4 : (self.update_counter)(count);
145 4 : Ok(count)
146 4 : }
147 :
148 0 : fn flush(&mut self) -> Result<()> {
149 0 : self.writer.flush()
150 0 : }
151 : }
152 :
153 : #[cfg(test)]
154 : mod tests {
155 : use super::*;
156 :
157 2 : #[test]
158 2 : fn test_counted_reader() {
159 2 : let stream = [0; 16];
160 2 : let mut total = 0;
161 4 : let mut reader = CountedReader::new(stream.as_ref(), |cnt| {
162 4 : total += cnt;
163 4 : });
164 2 :
165 2 : let mut buffer = [0; 8];
166 2 : reader.read_exact(&mut buffer).unwrap();
167 2 : reader.read_exact(&mut buffer).unwrap();
168 2 :
169 2 : drop(reader);
170 2 : assert_eq!(total, stream.len());
171 2 : }
172 :
173 2 : #[test]
174 2 : fn test_counted_writer() {
175 2 : let mut stream = [0; 16];
176 2 : let mut total = 0;
177 4 : let mut writer = CountedWriter::new(stream.as_mut(), |cnt| {
178 4 : total += cnt;
179 4 : });
180 2 :
181 2 : let buffer = [0; 8];
182 2 : writer.write_all(&buffer).unwrap();
183 2 : writer.write_all(&buffer).unwrap();
184 2 :
185 2 : drop(writer);
186 2 : assert_eq!(total, stream.len());
187 2 : }
188 :
189 : // This mimics the constraints of std::thread::spawn
190 4 : fn assert_send_sync(_x: impl Sync + Send + 'static) {}
191 :
192 2 : #[test]
193 2 : fn test_send_sync_counted_reader() {
194 2 : let stream: &[u8] = &[];
195 2 : let mut reader = CountedReader::new(stream, |_| {});
196 2 :
197 2 : assert_send_sync(move || {
198 0 : reader.read_exact(&mut []).unwrap();
199 2 : });
200 2 : }
201 :
202 2 : #[test]
203 2 : fn test_send_sync_counted_writer() {
204 2 : let stream = Vec::<u8>::new();
205 2 : let mut writer = CountedWriter::new(stream, |_| {});
206 2 :
207 2 : assert_send_sync(move || {
208 0 : writer.write_all(&[]).unwrap();
209 2 : });
210 2 : }
211 : }
|