use bytes::BytesMut;
use tokio_epoll_uring::IoBuf;

use crate::context::RequestContext;

use super::io_buf_ext::{FullSlice, IoBufExt};

/// A trait for doing owned-buffer write IO.
/// Think [`tokio::io::AsyncWrite`] but with owned buffers.
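///
/// On success, implementations return the number of bytes written together
/// with the buffer, so the caller can reuse the allocation for the next write.
/// A minimal sketch of an implementor (the `SinkWriter` type is hypothetical,
/// for illustration only; see the `Vec<u8>` impl at the bottom of this file
/// for a real one):
///
/// ```ignore
/// struct SinkWriter {
///     nbytes: usize,
/// }
///
/// impl OwnedAsyncWriter for SinkWriter {
///     async fn write_all<Buf: IoBuf + Send>(
///         &mut self,
///         buf: FullSlice<Buf>,
///         _ctx: &RequestContext,
///     ) -> std::io::Result<(usize, FullSlice<Buf>)> {
///         // Discard the bytes, but hand the buffer back to the caller
///         // so its allocation can be reused.
///         self.nbytes += buf.len();
///         Ok((buf.len(), buf))
///     }
/// }
/// ```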
pub trait OwnedAsyncWriter {
    async fn write_all<Buf: IoBuf + Send>(
        &mut self,
        buf: FullSlice<Buf>,
        ctx: &RequestContext,
    ) -> std::io::Result<(usize, FullSlice<Buf>)>;
}

/// A wrapper around an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
/// small writes into larger writes of size [`Buffer::cap`].
///
/// # Passthrough Of Large Writes
///
/// Calls to [`BufferedWriter::write_buffered`] that are larger than [`Buffer::cap`]
/// cause the internal buffer to be flushed prematurely so that the large
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
///
/// This passthrough is generally beneficial for throughput, but if
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
/// unlimited large writes may cause latency or fairness issues.
///
/// In such cases, a different implementation that always buffers in memory
/// may be preferable.
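///
/// A minimal usage sketch (`MyWriter` stands in for any [`OwnedAsyncWriter`]
/// implementation and is hypothetical; error handling elided):
///
/// ```ignore
/// let mut writer = BufferedWriter::new(MyWriter::new(), BytesMut::with_capacity(64 * 1024));
/// // Small writes accumulate in the in-memory buffer ...
/// writer.write_buffered_borrowed(b"hello", ctx).await?;
/// // ... and only reach `MyWriter` once the buffer fills up or we flush.
/// let my_writer = writer.flush_and_into_inner(ctx).await?;
/// ```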
pub struct BufferedWriter<B, W> {
    writer: W,
    /// Invariant: this is always `Some(buf)`, except
    /// - while an IO is in flight => goes back to `Some(..)` once the IO has completed successfully
    /// - after an IO error => stays `None` forever
    buf: Option<B>,
}

42 :
43 : impl<B, Buf, W> BufferedWriter<B, W>
44 : where
45 : B: Buffer<IoBuf = Buf> + Send,
46 : Buf: IoBuf + Send,
47 : W: OwnedAsyncWriter,
48 : {
49 1279 : pub fn new(writer: W, buf: B) -> Self {
50 1279 : Self {
51 1279 : writer,
52 1279 : buf: Some(buf),
53 1279 : }
54 1279 : }
55 :
56 10933747 : pub fn as_inner(&self) -> &W {
57 10933747 : &self.writer
58 10933747 : }
59 :
60 : /// Panics if used after any of the write paths returned an error
61 10730339 : pub fn inspect_buffer(&self) -> &B {
62 10730339 : self.buf()
63 10730339 : }
64 :
65 : #[cfg_attr(target_os = "macos", allow(dead_code))]
66 11 : pub async fn flush_and_into_inner(mut self, ctx: &RequestContext) -> std::io::Result<W> {
67 11 : self.flush(ctx).await?;
68 :
69 11 : let Self { buf, writer } = self;
70 11 : assert!(buf.is_some());
71 11 : Ok(writer)
72 11 : }
73 :
74 : #[inline(always)]
75 10730419 : fn buf(&self) -> &B {
76 10730419 : self.buf
77 10730419 : .as_ref()
78 10730419 : .expect("must not use after we returned an error")
79 10730419 : }
80 :
    #[cfg_attr(target_os = "macos", allow(dead_code))]
    pub async fn write_buffered<S: IoBuf + Send>(
        &mut self,
        chunk: FullSlice<S>,
        ctx: &RequestContext,
    ) -> std::io::Result<(usize, FullSlice<S>)> {
        let chunk = chunk.into_raw_slice();

        let chunk_len = chunk.len();
        // avoid memcpy for the middle of the chunk
        if chunk.len() >= self.buf().cap() {
            self.flush(ctx).await?;
            // do a big write, bypassing `buf`
            assert_eq!(
                self.buf
                    .as_ref()
                    .expect("must not use after an error")
                    .pending(),
                0
            );
            let (nwritten, chunk) = self
                .writer
                .write_all(FullSlice::must_new(chunk), ctx)
                .await?;
            assert_eq!(nwritten, chunk_len);
            return Ok((nwritten, chunk));
        }
        // in-memory copy the tail of the chunk, which is smaller than the buffer capacity
        assert!(chunk.len() < self.buf().cap());
        let mut slice = &chunk[..];
        while !slice.is_empty() {
            let buf = self.buf.as_mut().expect("must not use after an error");
            let need = buf.cap() - buf.pending();
            let have = slice.len();
            let n = std::cmp::min(need, have);
            buf.extend_from_slice(&slice[..n]);
            slice = &slice[n..];
            if buf.pending() >= buf.cap() {
                assert_eq!(buf.pending(), buf.cap());
                self.flush(ctx).await?;
            }
        }
        assert!(slice.is_empty(), "by now we should have drained the chunk");
        Ok((chunk_len, FullSlice::must_new(chunk)))
    }

    /// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data.
    ///
    /// It is less performant because we always have to copy the borrowed data into the internal buffer
    /// before we can do the IO. [`Self::write_buffered`] can avoid this copy, which is more performant
    /// for large writes.
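    ///
    /// A short sketch of the difference (hypothetical `writer` and `ctx` as above):
    ///
    /// ```ignore
    /// // Always copies the three bytes into the internal buffer first.
    /// writer.write_buffered_borrowed(b"abc", ctx).await?;
    /// // Owned variant: a chunk >= cap() is passed through without the copy.
    /// writer.write_buffered(Bytes::from_static(b"abc").slice_len(), ctx).await?;
    /// ```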
    pub async fn write_buffered_borrowed(
        &mut self,
        mut chunk: &[u8],
        ctx: &RequestContext,
    ) -> std::io::Result<usize> {
        let chunk_len = chunk.len();
        while !chunk.is_empty() {
            let buf = self.buf.as_mut().expect("must not use after an error");
            let need = buf.cap() - buf.pending();
            let have = chunk.len();
            let n = std::cmp::min(need, have);
            buf.extend_from_slice(&chunk[..n]);
            chunk = &chunk[n..];
            if buf.pending() >= buf.cap() {
                assert_eq!(buf.pending(), buf.cap());
                self.flush(ctx).await?;
            }
        }
        Ok(chunk_len)
    }

    async fn flush(&mut self, ctx: &RequestContext) -> std::io::Result<()> {
        let buf = self.buf.take().expect("must not use after an error");
        let buf_len = buf.pending();
        if buf_len == 0 {
            self.buf = Some(buf);
            return Ok(());
        }
        let slice = buf.flush();
        let (nwritten, slice) = self.writer.write_all(slice, ctx).await?;
        assert_eq!(nwritten, buf_len);
        self.buf = Some(Buffer::reuse_after_flush(
            slice.into_raw_slice().into_inner(),
        ));
        Ok(())
    }
}

/// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
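///
/// A sketch of the intended lifecycle, using the [`BytesMut`] impl below
/// (the actual write to disk is elided):
///
/// ```ignore
/// let mut buf = BytesMut::with_capacity(2);
/// buf.extend_from_slice(b"ab");
/// assert_eq!(buf.pending(), buf.cap());
/// let slice = Buffer::flush(buf); // hand ownership of the bytes to the IO engine
/// // ... write `slice` to disk, getting the slice back on completion ...
/// let buf = BytesMut::reuse_after_flush(slice.into_raw_slice().into_inner());
/// assert_eq!(buf.pending(), 0); // same allocation, contents cleared
/// ```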
pub trait Buffer {
    type IoBuf: IoBuf;

    /// Capacity of the buffer. Must not change over the lifetime of `self`.
    fn cap(&self) -> usize;

    /// Add data to the buffer.
    /// Panics if there is not enough room to accommodate `other`'s content, i.e.,
    /// panics if `other.len() > self.cap() - self.pending()`.
    fn extend_from_slice(&mut self, other: &[u8]);

    /// Number of bytes in the buffer.
    fn pending(&self) -> usize;

    /// Turns `self` into a [`FullSlice`] of the pending data
    /// so we can use [`tokio_epoll_uring`] to write it to disk.
    fn flush(self) -> FullSlice<Self::IoBuf>;

    /// After the write to disk is done and we have gotten back the slice,
    /// [`BufferedWriter`] uses this method to re-use the io buffer.
    fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
}

impl Buffer for BytesMut {
    type IoBuf = BytesMut;

    #[inline(always)]
    fn cap(&self) -> usize {
        self.capacity()
    }

    fn extend_from_slice(&mut self, other: &[u8]) {
        BytesMut::extend_from_slice(self, other)
    }

    #[inline(always)]
    fn pending(&self) -> usize {
        self.len()
    }

    fn flush(self) -> FullSlice<BytesMut> {
        self.slice_len()
    }

    fn reuse_after_flush(mut iobuf: BytesMut) -> Self {
        iobuf.clear();
        iobuf
    }
}

impl OwnedAsyncWriter for Vec<u8> {
    async fn write_all<Buf: IoBuf + Send>(
        &mut self,
        buf: FullSlice<Buf>,
        _: &RequestContext,
    ) -> std::io::Result<(usize, FullSlice<Buf>)> {
        self.extend_from_slice(&buf[..]);
        Ok((buf.len(), buf))
    }
}

#[cfg(test)]
mod tests {
    use bytes::BytesMut;

    use super::*;
    use crate::context::{DownloadBehavior, RequestContext};
    use crate::task_mgr::TaskKind;

    #[derive(Default)]
    struct RecorderWriter {
        writes: Vec<Vec<u8>>,
    }
    impl OwnedAsyncWriter for RecorderWriter {
        async fn write_all<Buf: IoBuf + Send>(
            &mut self,
            buf: FullSlice<Buf>,
            _: &RequestContext,
        ) -> std::io::Result<(usize, FullSlice<Buf>)> {
            self.writes.push(Vec::from(&buf[..]));
            Ok((buf.len(), buf))
        }
    }

    fn test_ctx() -> RequestContext {
        RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
    }

    macro_rules! write {
        ($writer:ident, $data:literal) => {{
            $writer
                .write_buffered(::bytes::Bytes::from_static($data).slice_len(), &test_ctx())
                .await?;
        }};
    }

    #[tokio::test]
    async fn test_buffered_writes_only() -> std::io::Result<()> {
        let recorder = RecorderWriter::default();
        let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
        write!(writer, b"a");
        write!(writer, b"b");
        write!(writer, b"c");
        write!(writer, b"d");
        write!(writer, b"e");
        let recorder = writer.flush_and_into_inner(&test_ctx()).await?;
        assert_eq!(
            recorder.writes,
            vec![Vec::from(b"ab"), Vec::from(b"cd"), Vec::from(b"e")]
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_passthrough_writes_only() -> std::io::Result<()> {
        let recorder = RecorderWriter::default();
        let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
        write!(writer, b"abc");
        write!(writer, b"de");
        write!(writer, b"");
        write!(writer, b"fghijk");
        let recorder = writer.flush_and_into_inner(&test_ctx()).await?;
        assert_eq!(
            recorder.writes,
            vec![Vec::from(b"abc"), Vec::from(b"de"), Vec::from(b"fghijk")]
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
        let recorder = RecorderWriter::default();
        let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
        write!(writer, b"a");
        write!(writer, b"bc");
        write!(writer, b"d");
        write!(writer, b"e");
        let recorder = writer.flush_and_into_inner(&test_ctx()).await?;
        assert_eq!(
            recorder.writes,
            vec![Vec::from(b"a"), Vec::from(b"bc"), Vec::from(b"de")]
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> {
        let ctx = test_ctx();
        let ctx = &ctx;
        let recorder = RecorderWriter::default();
        let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));

        writer.write_buffered_borrowed(b"abc", ctx).await?;
        writer.write_buffered_borrowed(b"d", ctx).await?;
        writer.write_buffered_borrowed(b"e", ctx).await?;
        writer.write_buffered_borrowed(b"fg", ctx).await?;
        writer.write_buffered_borrowed(b"hi", ctx).await?;
        writer.write_buffered_borrowed(b"j", ctx).await?;
        writer.write_buffered_borrowed(b"klmno", ctx).await?;

        let recorder = writer.flush_and_into_inner(ctx).await?;
        assert_eq!(
            recorder.writes,
            {
                let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
                expect
            }
            .iter()
            .map(|v| v[..].to_vec())
            .collect::<Vec<_>>()
        );
        Ok(())
    }
}