Line data Source code
1 : mod flush;
2 : use std::sync::Arc;
3 :
4 : pub(crate) use flush::FlushControl;
5 : use flush::FlushHandle;
6 : pub(crate) use flush::FlushTaskError;
7 : use tokio_epoll_uring::IoBuf;
8 : use tokio_util::sync::CancellationToken;
9 :
10 : use super::io_buf_aligned::IoBufAligned;
11 : use super::io_buf_ext::{FullSlice, IoBufExt};
12 : use crate::context::RequestContext;
13 : use crate::virtual_file::{IoBuffer, IoBufferMut};
14 :
/// Implemented by buffer types that can produce an inexpensive second handle
/// to themselves for use on the read path.
pub(crate) trait CheapCloneForRead {
    /// Produces a clone of this buffer; implementors are expected to make this cheap.
    fn cheap_clone(&self) -> Self;
}
19 :
20 : impl CheapCloneForRead for IoBuffer {
21 13246 : fn cheap_clone(&self) -> Self {
22 13246 : // Cheap clone over an `Arc`.
23 13246 : self.clone()
24 13246 : }
25 : }
26 :
27 : /// A trait for doing owned-buffer write IO.
28 : /// Think [`tokio::io::AsyncWrite`] but with owned buffers.
29 : /// The owned buffers need to be aligned due to Direct IO requirements.
30 : pub trait OwnedAsyncWriter {
31 : fn write_all_at<Buf: IoBufAligned + Send>(
32 : &self,
33 : buf: FullSlice<Buf>,
34 : offset: u64,
35 : ctx: &RequestContext,
36 : ) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
37 : }
38 :
39 : /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
40 : /// small writes into larger writes of size [`Buffer::cap`].
41 : // TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
42 : // since we would avoid copying majority of the data into the internal buffer.
43 : pub struct BufferedWriter<B: Buffer, W> {
44 : writer: Arc<W>,
45 : /// Clone of the buffer that was last submitted to the flush loop.
46 : /// `None` if no flush request has been submitted, Some forever after.
47 : pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
48 : /// New writes are accumulated here.
49 : /// `None` only during submission while we wait for flush loop to accept
50 : /// the full dirty buffer in exchange for a clean buffer.
51 : /// If that exchange fails with an [`FlushTaskError`], the write path
52 : /// bails and leaves this as `None`.
53 : /// Subsequent writes will panic if attempted.
54 : /// The read path continues to work without error because [`Self::maybe_flushed`]
55 : /// and [`Self::bytes_submitted`] are advanced before the flush loop exchange starts,
56 : /// so, they will never try to read from [`Self::mutable`] anyway, because it's past
57 : /// the [`Self::maybe_flushed`] point.
58 : mutable: Option<B>,
59 : /// A handle to the background flush task for writting data to disk.
60 : flush_handle: FlushHandle<B::IoBuf, W>,
61 : /// The number of bytes submitted to the background task.
62 : bytes_submitted: u64,
63 : }
64 :
65 : impl<B, Buf, W> BufferedWriter<B, W>
66 : where
67 : B: Buffer<IoBuf = Buf> + Send + 'static,
68 : Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
69 : W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
70 : {
71 : /// Creates a new buffered writer.
72 : ///
73 : /// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
74 2650 : pub fn new(
75 2650 : writer: Arc<W>,
76 2650 : buf_new: impl Fn() -> B,
77 2650 : gate_guard: utils::sync::gate::GateGuard,
78 2650 : cancel: CancellationToken,
79 2650 : ctx: &RequestContext,
80 2650 : flush_task_span: tracing::Span,
81 2650 : ) -> Self {
82 2650 : Self {
83 2650 : writer: writer.clone(),
84 2650 : mutable: Some(buf_new()),
85 2650 : maybe_flushed: None,
86 2650 : flush_handle: FlushHandle::spawn_new(
87 2650 : writer,
88 2650 : buf_new(),
89 2650 : gate_guard,
90 2650 : cancel,
91 2650 : ctx.attached_child(),
92 2650 : flush_task_span,
93 2650 : ),
94 2650 : bytes_submitted: 0,
95 2650 : }
96 2650 : }
97 :
98 22700 : pub fn as_inner(&self) -> &W {
99 22700 : &self.writer
100 22700 : }
101 :
102 : /// Returns the number of bytes submitted to the background flush task.
103 997507 : pub fn bytes_submitted(&self) -> u64 {
104 997507 : self.bytes_submitted
105 997507 : }
106 :
107 : /// Panics if used after any of the write paths returned an error
108 997507 : pub fn inspect_mutable(&self) -> Option<&B> {
109 997507 : self.mutable.as_ref()
110 997507 : }
111 :
112 : /// Gets a reference to the maybe flushed read-only buffer.
113 : /// Returns `None` if the writer has not submitted any flush request.
114 997515 : pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice<Buf>> {
115 997515 : self.maybe_flushed.as_ref()
116 997515 : }
117 :
118 : #[cfg_attr(target_os = "macos", allow(dead_code))]
119 18 : pub async fn flush_and_into_inner(
120 18 : mut self,
121 18 : ctx: &RequestContext,
122 18 : ) -> Result<(u64, Arc<W>), FlushTaskError> {
123 18 : self.flush(ctx).await?;
124 :
125 : let Self {
126 18 : mutable: buf,
127 18 : maybe_flushed: _,
128 18 : writer,
129 18 : mut flush_handle,
130 18 : bytes_submitted: bytes_amount,
131 18 : } = self;
132 18 : flush_handle.shutdown().await?;
133 18 : assert!(buf.is_some());
134 18 : Ok((bytes_amount, writer))
135 18 : }
136 :
137 : #[cfg(test)]
138 20 : pub(crate) fn mutable(&self) -> &B {
139 20 : self.mutable.as_ref().expect("must not use after an error")
140 20 : }
141 :
142 : #[cfg_attr(target_os = "macos", allow(dead_code))]
143 116 : pub async fn write_buffered_borrowed(
144 116 : &mut self,
145 116 : chunk: &[u8],
146 116 : ctx: &RequestContext,
147 116 : ) -> Result<usize, FlushTaskError> {
148 116 : let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?;
149 116 : if let Some(control) = control {
150 24 : control.release().await;
151 92 : }
152 116 : Ok(len)
153 116 : }
154 :
155 : /// In addition to bytes submitted in this write, also returns a handle that can control the flush behavior.
156 9609884 : pub(crate) async fn write_buffered_borrowed_controlled(
157 9609884 : &mut self,
158 9609884 : mut chunk: &[u8],
159 9609884 : ctx: &RequestContext,
160 9609884 : ) -> Result<(usize, Option<FlushControl>), FlushTaskError> {
161 9609884 : let chunk_len = chunk.len();
162 9609884 : let mut control: Option<FlushControl> = None;
163 19232968 : while !chunk.is_empty() {
164 9623084 : let buf = self.mutable.as_mut().expect("must not use after an error");
165 9623084 : let need = buf.cap() - buf.pending();
166 9623084 : let have = chunk.len();
167 9623084 : let n = std::cmp::min(need, have);
168 9623084 : buf.extend_from_slice(&chunk[..n]);
169 9623084 : chunk = &chunk[n..];
170 9623084 : if buf.pending() >= buf.cap() {
171 13228 : assert_eq!(buf.pending(), buf.cap());
172 13228 : if let Some(control) = control.take() {
173 2132 : control.release().await;
174 11096 : }
175 13228 : control = self.flush(ctx).await?;
176 9609856 : }
177 : }
178 9609884 : Ok((chunk_len, control))
179 9609884 : }
180 :
181 : /// This function can only error if the flush task got cancelled.
182 : /// In that case, we leave [`Self::mutable`] intentionally as `None`.
183 : ///
184 : /// The read path continues to function correctly; it can read up to the
185 : /// point where it could read before, i.e., including what was in [`Self::mutable`]
186 : /// before the call to this function, because that's now stored in [`Self::maybe_flushed`].
187 : ///
188 : /// The write path becomes unavailable and will panic if used.
189 : /// The only correct solution to retry writes is to discard the entire [`BufferedWriter`],
190 : /// which upper layers of pageserver write path currently do not support.
191 : /// It is in fact quite hard to reason about what exactly happens in today's code.
192 : /// Best case we accumulate junk in the EphemeralFile, worst case is data corruption.
193 : #[must_use = "caller must explcitly check the flush control"]
194 13246 : async fn flush(
195 13246 : &mut self,
196 13246 : _ctx: &RequestContext,
197 13246 : ) -> Result<Option<FlushControl>, FlushTaskError> {
198 13246 : let buf = self.mutable.take().expect("must not use after an error");
199 13246 : let buf_len = buf.pending();
200 13246 : if buf_len == 0 {
201 0 : self.mutable = Some(buf);
202 0 : return Ok(None);
203 13246 : }
204 13246 : // Prepare the buffer for read while flushing.
205 13246 : let slice = buf.flush();
206 13246 : // NB: this assignment also drops thereference to the old buffer, allowing us to re-own & make it mutable below.
207 13246 : self.maybe_flushed = Some(slice.cheap_clone());
208 13246 : let offset = self.bytes_submitted;
209 13246 : self.bytes_submitted += u64::try_from(buf_len).unwrap();
210 :
211 : // If we return/panic here or later, we'll leave mutable = None, breaking further
212 : // writers, but the read path should still work.
213 13246 : let (recycled, flush_control) = self.flush_handle.flush(slice, offset).await?;
214 :
215 : // The only other place that could hold a reference to the recycled buffer
216 : // is in `Self::maybe_flushed`, but we have already replace it with the new buffer.
217 13246 : let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
218 13246 :
219 13246 : // We got back some recycled buffer, can open up for more writes again.
220 13246 : self.mutable = Some(recycled);
221 13246 :
222 13246 : Ok(Some(flush_control))
223 13246 : }
224 : }
225 :
226 : /// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
227 : pub trait Buffer {
228 : type IoBuf: IoBuf;
229 :
230 : /// Capacity of the buffer. Must not change over the lifetime `self`.`
231 : fn cap(&self) -> usize;
232 :
233 : /// Add data to the buffer.
234 : /// Panics if there is not enough room to accomodate `other`'s content, i.e.,
235 : /// panics if `other.len() > self.cap() - self.pending()`.
236 : fn extend_from_slice(&mut self, other: &[u8]);
237 :
238 : /// Number of bytes in the buffer.
239 : fn pending(&self) -> usize;
240 :
241 : /// Turns `self` into a [`FullSlice`] of the pending data
242 : /// so we can use [`tokio_epoll_uring`] to write it to disk.
243 : fn flush(self) -> FullSlice<Self::IoBuf>;
244 :
245 : /// After the write to disk is done and we have gotten back the slice,
246 : /// [`BufferedWriter`] uses this method to re-use the io buffer.
247 : fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
248 : }
249 :
250 : impl Buffer for IoBufferMut {
251 : type IoBuf = IoBuffer;
252 :
253 28882480 : fn cap(&self) -> usize {
254 28882480 : self.capacity()
255 28882480 : }
256 :
257 9623084 : fn extend_from_slice(&mut self, other: &[u8]) {
258 9623084 : if self.len() + other.len() > self.cap() {
259 0 : panic!("Buffer capacity exceeded");
260 9623084 : }
261 9623084 :
262 9623084 : IoBufferMut::extend_from_slice(self, other);
263 9623084 : }
264 :
265 20270149 : fn pending(&self) -> usize {
266 20270149 : self.len()
267 20270149 : }
268 :
269 15896 : fn flush(self) -> FullSlice<Self::IoBuf> {
270 15896 : self.freeze().slice_len()
271 15896 : }
272 :
273 : /// Caller should make sure that `iobuf` only have one strong reference before invoking this method.
274 13246 : fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
275 13246 : let mut recycled = iobuf
276 13246 : .into_mut()
277 13246 : .expect("buffer should only have one strong reference");
278 13246 : recycled.clear();
279 13246 : recycled
280 13246 : }
281 : }
282 :
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;
    use crate::context::{DownloadBehavior, RequestContext};
    use crate::task_mgr::TaskKind;

    /// Test writer that keeps every write in memory instead of doing IO.
    #[derive(Default, Debug)]
    struct RecorderWriter {
        /// Each recorded write: its bytes and the offset it was written at.
        writes: Mutex<Vec<(Vec<u8>, u64)>>,
    }

    impl RecorderWriter {
        /// Returns the bytes of every recorded write, in recording order; offsets are dropped.
        fn get_writes(&self) -> Vec<Vec<u8>> {
            let recorded = self.writes.lock().unwrap();
            recorded.iter().map(|(bytes, _)| bytes.clone()).collect()
        }
    }

    impl OwnedAsyncWriter for RecorderWriter {
        /// Records the write and hands the buffer back; always succeeds.
        async fn write_all_at<Buf: IoBufAligned + Send>(
            &self,
            buf: FullSlice<Buf>,
            offset: u64,
            _: &RequestContext,
        ) -> (FullSlice<Buf>, std::io::Result<()>) {
            let mut recorded = self.writes.lock().unwrap();
            recorded.push((buf[..].to_vec(), offset));
            drop(recorded);
            (buf, Ok(()))
        }
    }

    /// Builds the request context used by the test.
    fn test_ctx() -> RequestContext {
        RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
    }

    /// With a 2-byte buffer, borrowed writes of any size reach the writer as 2-byte
    /// writes, plus a final short write for the remainder.
    #[tokio::test]
    async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
        let ctx = test_ctx();
        let ctx = &ctx;
        let recorder = Arc::new(RecorderWriter::default());
        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();
        let mut writer = BufferedWriter::<_, RecorderWriter>::new(
            recorder,
            || IoBufferMut::with_capacity(2),
            gate.enter()?,
            cancel,
            ctx,
            tracing::Span::none(),
        );

        let inputs: [&[u8]; 8] = [b"abc", b"", b"d", b"e", b"fg", b"hi", b"j", b"klmno"];
        for input in inputs {
            writer.write_buffered_borrowed(input, ctx).await?;
        }

        let (_, recorder) = writer.flush_and_into_inner(ctx).await?;

        let expected: [&[u8]; 8] = [b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
        let expected: Vec<Vec<u8>> = expected.iter().map(|write| write.to_vec()).collect();
        assert_eq!(recorder.get_writes(), expected);
        Ok(())
    }
}
|