Line data Source code
1 : mod flush;
2 : use std::sync::Arc;
3 :
4 : pub(crate) use flush::FlushControl;
5 : use flush::FlushHandle;
6 : pub(crate) use flush::FlushTaskError;
7 : use tokio_epoll_uring::IoBuf;
8 : use tokio_util::sync::CancellationToken;
9 :
10 : use super::io_buf_aligned::IoBufAligned;
11 : use super::io_buf_ext::{FullSlice, IoBufExt};
12 : use crate::context::RequestContext;
13 : use crate::virtual_file::{IoBuffer, IoBufferMut};
14 :
15 : pub(crate) trait CheapCloneForRead {
16 : /// Returns a cheap clone of the buffer.
17 : fn cheap_clone(&self) -> Self;
18 : }
19 :
20 : impl CheapCloneForRead for IoBuffer {
21 39978 : fn cheap_clone(&self) -> Self {
22 39978 : // Cheap clone over an `Arc`.
23 39978 : self.clone()
24 39978 : }
25 : }
26 :
27 : /// A trait for doing owned-buffer write IO.
28 : /// Think [`tokio::io::AsyncWrite`] but with owned buffers.
29 : /// The owned buffers need to be aligned due to Direct IO requirements.
30 : pub trait OwnedAsyncWriter {
31 : fn write_all_at<Buf: IoBufAligned + Send>(
32 : &self,
33 : buf: FullSlice<Buf>,
34 : offset: u64,
35 : ctx: &RequestContext,
36 : ) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
37 : }
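
// A minimal sketch of an `OwnedAsyncWriter` implementation, modeled on the
// `RecorderWriter` used in the tests at the bottom of this file. `SinkWriter` is
// hypothetical and not part of this module; it discards the data and hands the
// buffer back, which is the ownership round-trip the trait requires so the caller
// can reuse the buffer once the IO completes.
#[derive(Debug)]
struct SinkWriter;

impl OwnedAsyncWriter for SinkWriter {
    async fn write_all_at<Buf: IoBufAligned + Send>(
        &self,
        buf: FullSlice<Buf>,
        _offset: u64,
        _ctx: &RequestContext,
    ) -> (FullSlice<Buf>, std::io::Result<()>) {
        // Pretend the write succeeded and return the buffer unchanged.
        (buf, Ok(()))
    }
}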
38 :
39 : /// A wrapper around an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
40 : /// small writes into larger writes of size [`Buffer::cap`].
41 : // TODO(yuchen): For large writes, implementing buffer bypass for the aligned parts of the write could be beneficial to throughput,
42 : // since we would avoid copying the majority of the data into the internal buffer.
43 : pub struct BufferedWriter<B: Buffer, W> {
44 : writer: Arc<W>,
45 : /// Clone of the buffer that was last submitted to the flush loop.
46 : /// `None` if no flush request has been submitted yet; `Some` forever after.
47 : pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
48 : /// New writes are accumulated here.
49 : /// `None` only during submission, while we wait for the flush loop to accept
50 : /// the full dirty buffer in exchange for a clean buffer.
51 : /// If that exchange fails with a [`FlushTaskError`], the write path
52 : /// bails and leaves this as `None`.
53 : /// Subsequent writes will panic if attempted.
54 : /// The read path continues to work without error because [`Self::maybe_flushed`]
55 : /// and [`Self::bytes_submitted`] are advanced before the flush loop exchange starts,
56 : /// so readers never need to access [`Self::mutable`] anyway: its contents lie past
57 : /// the [`Self::maybe_flushed`] point.
58 : mutable: Option<B>,
59 : /// A handle to the background flush task for writing data to disk.
60 : flush_handle: FlushHandle<B::IoBuf, W>,
61 : /// The number of bytes submitted to the background task.
62 : bytes_submitted: u64,
63 : }
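
// A condensed usage sketch, following the flow of the test
// `test_write_all_borrowed_always_goes_through_buffer` at the bottom of this file.
// `SinkWriter` is the hypothetical writer sketched above and the 64 KiB capacity is
// an arbitrary choice; everything else uses the API defined in this file.
async fn buffered_writer_usage_sketch(ctx: &RequestContext) -> anyhow::Result<()> {
    let gate = utils::sync::gate::Gate::default();
    let cancel = CancellationToken::new();
    let mut writer = BufferedWriter::<_, SinkWriter>::new(
        Arc::new(SinkWriter),
        || IoBufferMut::with_capacity(64 * 1024),
        gate.enter()?,
        cancel,
        ctx,
        tracing::Span::none(),
    );
    // Small writes accumulate in the mutable buffer; a flush to the background task
    // is only submitted each time the buffer fills up to `Buffer::cap`.
    writer.write_buffered_borrowed(b"hello", ctx).await?;
    writer.write_buffered_borrowed(b"world", ctx).await?;
    // Flush the remaining bytes and wait for the flush task to shut down.
    let (bytes_submitted, _writer) = writer.flush_and_into_inner(ctx).await?;
    assert_eq!(bytes_submitted, 10);
    Ok(())
}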
64 :
65 : impl<B, Buf, W> BufferedWriter<B, W>
66 : where
67 : B: Buffer<IoBuf = Buf> + Send + 'static,
68 : Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
69 : W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
70 : {
71 : /// Creates a new buffered writer.
72 : ///
73 : /// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
74 8010 : pub fn new(
75 8010 : writer: Arc<W>,
76 8010 : buf_new: impl Fn() -> B,
77 8010 : gate_guard: utils::sync::gate::GateGuard,
78 8010 : cancel: CancellationToken,
79 8010 : ctx: &RequestContext,
80 8010 : flush_task_span: tracing::Span,
81 8010 : ) -> Self {
82 8010 : Self {
83 8010 : writer: writer.clone(),
84 8010 : mutable: Some(buf_new()),
85 8010 : maybe_flushed: None,
86 8010 : flush_handle: FlushHandle::spawn_new(
87 8010 : writer,
88 8010 : buf_new(),
89 8010 : gate_guard,
90 8010 : cancel,
91 8010 : ctx.attached_child(),
92 8010 : flush_task_span,
93 8010 : ),
94 8010 : bytes_submitted: 0,
95 8010 : }
96 8010 : }
97 :
98 231111 : pub fn as_inner(&self) -> &W {
99 231111 : &self.writer
100 231111 : }
101 :
102 : /// Returns the number of bytes submitted to the background flush task.
103 3180237 : pub fn bytes_submitted(&self) -> u64 {
104 3180237 : self.bytes_submitted
105 3180237 : }
106 :
107 : /// Panics if used after any of the write paths returned an error
108 3180237 : pub fn inspect_mutable(&self) -> Option<&B> {
109 3180237 : self.mutable.as_ref()
110 3180237 : }
111 :
112 : /// Gets a reference to the maybe flushed read-only buffer.
113 : /// Returns `None` if the writer has not submitted any flush request.
114 3180261 : pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice<Buf>> {
115 3180261 : self.maybe_flushed.as_ref()
116 3180261 : }
117 :
118 : #[cfg_attr(target_os = "macos", allow(dead_code))]
119 54 : pub async fn flush_and_into_inner(
120 54 : mut self,
121 54 : ctx: &RequestContext,
122 54 : ) -> Result<(u64, Arc<W>), FlushTaskError> {
123 54 : self.flush(ctx).await?;
124 :
125 : let Self {
126 54 : mutable: buf,
127 54 : maybe_flushed: _,
128 54 : writer,
129 54 : mut flush_handle,
130 54 : bytes_submitted: bytes_amount,
131 54 : } = self;
132 54 : flush_handle.shutdown().await?;
133 54 : assert!(buf.is_some());
134 54 : Ok((bytes_amount, writer))
135 54 : }
136 :
137 : #[cfg(test)]
138 60 : pub(crate) fn mutable(&self) -> &B {
139 60 : self.mutable.as_ref().expect("must not use after an error")
140 60 : }
141 :
142 : #[cfg_attr(target_os = "macos", allow(dead_code))]
143 348 : pub async fn write_buffered_borrowed(
144 348 : &mut self,
145 348 : chunk: &[u8],
146 348 : ctx: &RequestContext,
147 348 : ) -> Result<usize, FlushTaskError> {
148 348 : let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?;
149 348 : if let Some(control) = control {
150 72 : control.release().await;
151 276 : }
152 348 : Ok(len)
153 348 : }
154 :
155 : /// In addition to the number of bytes written, also returns a handle that can control the flush behavior.
156 28829712 : pub(crate) async fn write_buffered_borrowed_controlled(
157 28829712 : &mut self,
158 28829712 : mut chunk: &[u8],
159 28829712 : ctx: &RequestContext,
160 28829712 : ) -> Result<(usize, Option<FlushControl>), FlushTaskError> {
161 28829712 : let chunk_len = chunk.len();
162 28829712 : let mut control: Option<FlushControl> = None;
163 57699264 : while !chunk.is_empty() {
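// Copy as much of the chunk as fits into the remaining capacity of the mutable buffer.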
164 28869552 : let buf = self.mutable.as_mut().expect("must not use after an error");
165 28869552 : let need = buf.cap() - buf.pending();
166 28869552 : let have = chunk.len();
167 28869552 : let n = std::cmp::min(need, have);
168 28869552 : buf.extend_from_slice(&chunk[..n]);
169 28869552 : chunk = &chunk[n..];
170 28869552 : if buf.pending() >= buf.cap() {
171 39924 : assert_eq!(buf.pending(), buf.cap());
172 39924 : if let Some(control) = control.take() {
173 6612 : control.release().await;
174 33312 : }
175 39924 : control = self.flush(ctx).await?;
176 28829628 : }
177 : }
178 28829712 : Ok((chunk_len, control))
179 28829712 : }
180 :
181 : /// This function can only error if the flush task got cancelled.
182 : /// In that case, we leave [`Self::mutable`] intentionally as `None`.
183 : ///
184 : /// The read path continues to function correctly; it can read up to the
185 : /// point where it could read before, i.e., including what was in [`Self::mutable`]
186 : /// before the call to this function, because that's now stored in [`Self::maybe_flushed`].
187 : ///
188 : /// The write path becomes unavailable and will panic if used.
189 : /// The only correct way to retry writes is to discard the entire [`BufferedWriter`],
190 : /// which the upper layers of the pageserver write path currently do not support.
191 : /// It is in fact quite hard to reason about what exactly happens in today's code.
192 : /// Best case we accumulate junk in the EphemeralFile, worst case is data corruption.
193 : #[must_use = "caller must explcitly check the flush control"]
194 39978 : async fn flush(
195 39978 : &mut self,
196 39978 : _ctx: &RequestContext,
197 39978 : ) -> Result<Option<FlushControl>, FlushTaskError> {
198 39978 : let buf = self.mutable.take().expect("must not use after an error");
199 39978 : let buf_len = buf.pending();
200 39978 : if buf_len == 0 {
201 0 : self.mutable = Some(buf);
202 0 : return Ok(None);
203 39978 : }
204 39978 : // Prepare the buffer for read while flushing.
205 39978 : let slice = buf.flush();
206 39978 : // NB: this assignment also drops the reference to the old buffer, allowing us to re-own & make it mutable below.
207 39978 : self.maybe_flushed = Some(slice.cheap_clone());
208 39978 : let offset = self.bytes_submitted;
209 39978 : self.bytes_submitted += u64::try_from(buf_len).unwrap();
210 :
211 : // If we return/panic here or later, we'll leave mutable = None, breaking further
212 : // writers, but the read path should still work.
213 39978 : let (recycled, flush_control) = self.flush_handle.flush(slice, offset).await?;
214 :
215 : // The only other place that could hold a reference to the recycled buffer
216 : // is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
217 39978 : let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
218 39978 :
219 39978 : // We got back some recycled buffer, can open up for more writes again.
220 39978 : self.mutable = Some(recycled);
221 39978 :
222 39978 : Ok(Some(flush_control))
223 39978 : }
224 : }
225 :
226 : /// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
227 : pub trait Buffer {
228 : type IoBuf: IoBuf;
229 :
230 : /// Capacity of the buffer. Must not change over the lifetime of `self`.
231 : fn cap(&self) -> usize;
232 :
233 : /// Add data to the buffer.
234 : /// Panics if there is not enough room to accommodate `other`'s content, i.e.,
235 : /// panics if `other.len() > self.cap() - self.pending()`.
236 : fn extend_from_slice(&mut self, other: &[u8]);
237 :
238 : /// Number of bytes in the buffer.
239 : fn pending(&self) -> usize;
240 :
241 : /// Turns `self` into a [`FullSlice`] of the pending data
242 : /// so we can use [`tokio_epoll_uring`] to write it to disk.
243 : fn flush(self) -> FullSlice<Self::IoBuf>;
244 :
245 : /// After the write to disk is done and we have gotten back the slice,
246 : /// [`BufferedWriter`] uses this method to re-use the io buffer.
247 : fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
248 : }
249 :
250 : impl Buffer for IoBufferMut {
251 : type IoBuf = IoBuffer;
252 :
253 86648580 : fn cap(&self) -> usize {
254 86648580 : self.capacity()
255 86648580 : }
256 :
257 28869552 : fn extend_from_slice(&mut self, other: &[u8]) {
258 28869552 : if self.len() + other.len() > self.cap() {
259 0 : panic!("Buffer capacity exceeded");
260 28869552 : }
261 28869552 :
262 28869552 : IoBufferMut::extend_from_slice(self, other);
263 28869552 : }
264 :
265 60999243 : fn pending(&self) -> usize {
266 60999243 : self.len()
267 60999243 : }
268 :
269 47988 : fn flush(self) -> FullSlice<Self::IoBuf> {
270 47988 : self.freeze().slice_len()
271 47988 : }
272 :
273 : /// Caller should make sure that `iobuf` only has one strong reference before invoking this method.
274 39978 : fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
275 39978 : let mut recycled = iobuf
276 39978 : .into_mut()
277 39978 : .expect("buffer should only have one strong reference");
278 39978 : recycled.clear();
279 39978 : recycled
280 39978 : }
281 : }
282 :
283 : #[cfg(test)]
284 : mod tests {
285 : use std::sync::Mutex;
286 :
287 : use super::*;
288 : use crate::context::{DownloadBehavior, RequestContext};
289 : use crate::task_mgr::TaskKind;
290 :
291 : #[derive(Default, Debug)]
292 : struct RecorderWriter {
293 : /// record bytes and write offsets.
294 : writes: Mutex<Vec<(Vec<u8>, u64)>>,
295 : }
296 :
297 : impl RecorderWriter {
298 : /// Gets recorded bytes and write offsets.
299 12 : fn get_writes(&self) -> Vec<Vec<u8>> {
300 12 : self.writes
301 12 : .lock()
302 12 : .unwrap()
303 12 : .iter()
304 96 : .map(|(buf, _)| buf.clone())
305 12 : .collect()
306 12 : }
307 : }
308 :
309 : impl OwnedAsyncWriter for RecorderWriter {
310 96 : async fn write_all_at<Buf: IoBufAligned + Send>(
311 96 : &self,
312 96 : buf: FullSlice<Buf>,
313 96 : offset: u64,
314 96 : _: &RequestContext,
315 96 : ) -> (FullSlice<Buf>, std::io::Result<()>) {
316 96 : self.writes
317 96 : .lock()
318 96 : .unwrap()
319 96 : .push((Vec::from(&buf[..]), offset));
320 96 : (buf, Ok(()))
321 96 : }
322 : }
323 :
324 12 : fn test_ctx() -> RequestContext {
325 12 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
326 12 : }
327 :
328 : #[tokio::test]
329 12 : async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
330 12 : let ctx = test_ctx();
331 12 : let ctx = &ctx;
332 12 : let recorder = Arc::new(RecorderWriter::default());
333 12 : let gate = utils::sync::gate::Gate::default();
334 12 : let cancel = CancellationToken::new();
335 12 : let mut writer = BufferedWriter::<_, RecorderWriter>::new(
336 12 : recorder,
337 24 : || IoBufferMut::with_capacity(2),
338 12 : gate.enter()?,
339 12 : cancel,
340 12 : ctx,
341 12 : tracing::Span::none(),
342 12 : );
343 12 :
344 12 : writer.write_buffered_borrowed(b"abc", ctx).await?;
345 12 : writer.write_buffered_borrowed(b"", ctx).await?;
346 12 : writer.write_buffered_borrowed(b"d", ctx).await?;
347 12 : writer.write_buffered_borrowed(b"e", ctx).await?;
348 12 : writer.write_buffered_borrowed(b"fg", ctx).await?;
349 12 : writer.write_buffered_borrowed(b"hi", ctx).await?;
350 12 : writer.write_buffered_borrowed(b"j", ctx).await?;
351 12 : writer.write_buffered_borrowed(b"klmno", ctx).await?;
352 12 :
353 12 : let (_, recorder) = writer.flush_and_into_inner(ctx).await?;
354 12 : assert_eq!(
355 12 : recorder.get_writes(),
356 12 : {
357 12 : let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
358 12 : expect
359 12 : }
360 12 : .iter()
361 96 : .map(|v| v[..].to_vec())
362 12 : .collect::<Vec<_>>()
363 12 : );
364 12 : Ok(())
365 12 : }
366 : }