Line data Source code
1 : use std::io::{Read, Result, Write};
2 :
/// Wraps a value implementing [Read] and hands the number of bytes
/// produced by each successful read to a caller-supplied closure.
/// A typical use is feeding a metric (e.g. [IntCounter](crate::IntCounter)).
///
/// Example:
///
/// ```
/// # use std::io::{Result, Read};
/// # use metrics::{register_int_counter, IntCounter};
/// # use metrics::CountedReader;
/// # use once_cell::sync::Lazy;
/// #
/// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
/// #         "int_counter",
/// #         "let's count something!"
/// #     ).unwrap()
/// # });
/// #
/// fn do_some_reads(stream: impl Read, count: usize) -> Result<Vec<u8>> {
///     let mut reader = CountedReader::new(stream, |cnt| {
///         // bump a counter each time we do a read
///         INT_COUNTER.inc_by(cnt as u64);
///     });
///
///     let mut proto_header = [0; 8];
///     reader.read_exact(&mut proto_header)?;
///     assert!(&proto_header == b"deadbeef");
///
///     let mut payload = vec![0; count];
///     reader.read_exact(&mut payload)?;
///     Ok(payload)
/// }
/// ```
///
/// NB: bumping an atomic counter rapidly and concurrently might incur
/// a performance penalty. Keep the number of atomic operations low,
/// either by reading through a [BufReader](std::io::BufReader)
/// or by choosing a non-atomic (thread local) counter.
pub struct CountedReader<'a, T> {
    /// The wrapped value that reads are delegated to.
    reader: T,
    /// Closure that receives the byte count of each successful read.
    update_counter: Box<dyn FnMut(usize) + Send + Sync + 'a>,
}
45 :
46 : impl<'a, T> CountedReader<'a, T> {
47 2 : pub fn new(reader: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
48 2 : Self {
49 2 : reader,
50 2 : update_counter: Box::new(update_counter),
51 2 : }
52 2 : }
53 :
54 : /// Get an immutable reference to the underlying [Read] implementor
55 0 : pub fn inner(&self) -> &T {
56 0 : &self.reader
57 0 : }
58 :
59 : /// Get a mutable reference to the underlying [Read] implementor
60 0 : pub fn inner_mut(&mut self) -> &mut T {
61 0 : &mut self.reader
62 0 : }
63 :
64 : /// Consume the wrapper and return the underlying [Read] implementor
65 0 : pub fn into_inner(self) -> T {
66 0 : self.reader
67 0 : }
68 : }
69 :
70 : impl<T: Read> Read for CountedReader<'_, T> {
71 2 : fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
72 2 : let count = self.reader.read(buf)?;
73 2 : (self.update_counter)(count);
74 2 : Ok(count)
75 2 : }
76 : }
77 :
/// Wraps a value implementing [Write] and hands the number of bytes
/// accepted by each successful write to a caller-supplied closure.
/// A typical use is feeding a metric (e.g. [IntCounter](crate::IntCounter)).
///
/// Example:
///
/// ```
/// # use std::io::{Result, Write};
/// # use metrics::{register_int_counter, IntCounter};
/// # use metrics::CountedWriter;
/// # use once_cell::sync::Lazy;
/// #
/// # static INT_COUNTER: Lazy<IntCounter> = Lazy::new( || { register_int_counter!(
/// #         "int_counter",
/// #         "let's count something!"
/// #     ).unwrap()
/// # });
/// #
/// fn do_some_writes(stream: impl Write, payload: &[u8]) -> Result<()> {
///     let mut writer = CountedWriter::new(stream, |cnt| {
///         // bump a counter each time we do a write
///         INT_COUNTER.inc_by(cnt as u64);
///     });
///
///     let proto_header = b"deadbeef";
///     writer.write_all(proto_header)?;
///     writer.write_all(payload)
/// }
/// ```
///
/// NB: bumping an atomic counter rapidly and concurrently might incur
/// a performance penalty. Keep the number of atomic operations low,
/// either by writing through a [BufWriter](std::io::BufWriter)
/// or by choosing a non-atomic (thread local) counter.
pub struct CountedWriter<'a, T> {
    /// The wrapped value that writes are delegated to.
    writer: T,
    /// Closure that receives the byte count of each successful write.
    update_counter: Box<dyn FnMut(usize) + Send + Sync + 'a>,
}
116 :
117 : impl<'a, T> CountedWriter<'a, T> {
118 2 : pub fn new(writer: T, update_counter: impl FnMut(usize) + Sync + Send + 'a) -> Self {
119 2 : Self {
120 2 : writer,
121 2 : update_counter: Box::new(update_counter),
122 2 : }
123 2 : }
124 :
125 : /// Get an immutable reference to the underlying [Write] implementor
126 0 : pub fn inner(&self) -> &T {
127 0 : &self.writer
128 0 : }
129 :
130 : /// Get a mutable reference to the underlying [Write] implementor
131 0 : pub fn inner_mut(&mut self) -> &mut T {
132 0 : &mut self.writer
133 0 : }
134 :
135 : /// Consume the wrapper and return the underlying [Write] implementor
136 0 : pub fn into_inner(self) -> T {
137 0 : self.writer
138 0 : }
139 : }
140 :
141 : impl<T: Write> Write for CountedWriter<'_, T> {
142 2 : fn write(&mut self, buf: &[u8]) -> Result<usize> {
143 2 : let count = self.writer.write(buf)?;
144 2 : (self.update_counter)(count);
145 2 : Ok(count)
146 2 : }
147 :
148 0 : fn flush(&mut self) -> Result<()> {
149 0 : self.writer.flush()
150 0 : }
151 : }
152 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Two 8-byte reads over a 16-byte source must report 16 bytes in total.
    #[test]
    fn test_counted_reader() {
        let source = [0u8; 16];
        let mut observed = 0;
        let mut counted = CountedReader::new(&source[..], |n| observed += n);

        let mut chunk = [0u8; 8];
        for _ in 0..2 {
            counted.read_exact(&mut chunk).unwrap();
        }

        // Release the closure's borrow of `observed` before inspecting it.
        drop(counted);
        assert_eq!(observed, source.len());
    }

    /// Two 8-byte writes into a 16-byte sink must report 16 bytes in total.
    #[test]
    fn test_counted_writer() {
        let mut sink = [0u8; 16];
        let mut observed = 0;
        let mut counted = CountedWriter::new(&mut sink[..], |n| observed += n);

        let chunk = [0u8; 8];
        for _ in 0..2 {
            counted.write_all(&chunk).unwrap();
        }

        // Release the borrows of `observed` and `sink` before inspecting them.
        drop(counted);
        assert_eq!(observed, sink.len());
    }

    // Compile-time check only. std::thread::spawn requires `Send + 'static`
    // of its closure; this helper is stricter and demands `Sync` as well.
    fn assert_send_sync<F: Send + Sync + 'static>(_x: F) {}

    /// A closure owning a `CountedReader` must satisfy `Send + Sync + 'static`.
    #[test]
    fn test_send_sync_counted_reader() {
        let empty: &[u8] = &[];
        let mut counted = CountedReader::new(empty, |_| {});

        // The closure is only type-checked here; it is never invoked.
        assert_send_sync(move || {
            counted.read_exact(&mut []).unwrap();
        });
    }

    /// A closure owning a `CountedWriter` must satisfy `Send + Sync + 'static`.
    #[test]
    fn test_send_sync_counted_writer() {
        let sink = Vec::<u8>::new();
        let mut counted = CountedWriter::new(sink, |_| {});

        // The closure is only type-checked here; it is never invoked.
        assert_send_sync(move || {
            counted.write_all(&[]).unwrap();
        });
    }
}
|