LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io - write.rs (source / functions)
Test: 13fa4b48c3603751d5b1568465c493b8925758a2.info
Test Date: 2025-03-19 18:46:26
Coverage: Lines: 98.5 % (195 of 198 hit)   Functions: 100.0 % (39 of 39 hit)

            Line data    Source code
       1              : mod flush;
       2              : use std::sync::Arc;
       3              : 
       4              : pub(crate) use flush::FlushControl;
       5              : use flush::FlushHandle;
       6              : pub(crate) use flush::FlushTaskError;
       7              : use tokio_epoll_uring::IoBuf;
       8              : use tokio_util::sync::CancellationToken;
       9              : 
      10              : use super::io_buf_aligned::IoBufAligned;
      11              : use super::io_buf_ext::{FullSlice, IoBufExt};
      12              : use crate::context::RequestContext;
      13              : use crate::virtual_file::{IoBuffer, IoBufferMut};
      14              : 
      15              : pub(crate) trait CheapCloneForRead {
      16              :     /// Returns a cheap clone of the buffer.
      17              :     fn cheap_clone(&self) -> Self;
      18              : }
      19              : 
      20              : impl CheapCloneForRead for IoBuffer {
      21        13246 :     fn cheap_clone(&self) -> Self {
      22        13246 :         // Cheap clone over an `Arc`.
      23        13246 :         self.clone()
      24        13246 :     }
      25              : }
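                      :
                      : // Illustrative sketch (hypothetical `FrozenBuf` type, not part of this
                      : // module): any buffer that keeps its bytes behind an `Arc` can satisfy the
                      : // trait the same way, since cloning an `Arc` only bumps a reference count
                      : // and never copies the underlying bytes.
                      : struct FrozenBuf(Arc<[u8]>);
                      :
                      : impl CheapCloneForRead for FrozenBuf {
                      :     fn cheap_clone(&self) -> Self {
                      :         // O(1): reference count increment, no byte copy.
                      :         FrozenBuf(Arc::clone(&self.0))
                      :     }
                      : }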
      26              : 
      27              : /// A trait for doing owned-buffer write IO.
      28              : /// Think [`tokio::io::AsyncWrite`] but with owned buffers.
      29              : /// The owned buffers need to be aligned due to Direct IO requirements.
      30              : pub trait OwnedAsyncWriter {
      31              :     fn write_all_at<Buf: IoBufAligned + Send>(
      32              :         &self,
      33              :         buf: FullSlice<Buf>,
      34              :         offset: u64,
      35              :         ctx: &RequestContext,
      36              :     ) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
      37              : }
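                      :
                      : // Sketch of the calling convention (hypothetical no-op writer, mirroring the
                      : // `RecorderWriter` in the test module below): ownership of the buffer moves
                      : // into the call and is handed back with the result, which lets an io_uring
                      : // backend keep the allocation alive while the kernel still references it.
                      : #[derive(Debug)]
                      : struct NullWriter;
                      :
                      : impl OwnedAsyncWriter for NullWriter {
                      :     async fn write_all_at<Buf: IoBufAligned + Send>(
                      :         &self,
                      :         buf: FullSlice<Buf>,
                      :         _offset: u64,
                      :         _ctx: &RequestContext,
                      :     ) -> (FullSlice<Buf>, std::io::Result<()>) {
                      :         // Discard the bytes; return buffer ownership to the caller.
                      :         (buf, Ok(()))
                      :     }
                      : }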
      38              : 
       39              : /// A wrapper around an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
      40              : /// small writes into larger writes of size [`Buffer::cap`].
       41              : // TODO(yuchen): For large writes, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
       42              : // since we would avoid copying the majority of the data into the internal buffer.
      43              : pub struct BufferedWriter<B: Buffer, W> {
      44              :     writer: Arc<W>,
      45              :     /// Clone of the buffer that was last submitted to the flush loop.
       46              :     /// `None` if no flush request has been submitted, `Some` forever after.
      47              :     pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
      48              :     /// New writes are accumulated here.
       49              :     /// `None` only during submission while we wait for the flush loop to accept
      50              :     /// the full dirty buffer in exchange for a clean buffer.
       51              :     /// If that exchange fails with a [`FlushTaskError`], the write path
      52              :     /// bails and leaves this as `None`.
      53              :     /// Subsequent writes will panic if attempted.
      54              :     /// The read path continues to work without error because [`Self::maybe_flushed`]
      55              :     /// and [`Self::bytes_submitted`] are advanced before the flush loop exchange starts,
       56              :     /// so they will never try to read from [`Self::mutable`] anyway, because it's past
      57              :     /// the [`Self::maybe_flushed`] point.
      58              :     mutable: Option<B>,
       59              :     /// A handle to the background flush task for writing data to disk.
      60              :     flush_handle: FlushHandle<B::IoBuf, W>,
      61              :     /// The number of bytes submitted to the background task.
      62              :     bytes_submitted: u64,
      63              : }
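                      :
                      : // Minimal usage sketch (assumes the caller already holds a gate guard, a
                      : // cancellation token, and a `RequestContext`; the 64 KiB capacity is an
                      : // arbitrary example value). Small writes coalesce in the mutable buffer and
                      : // are handed to the flush task one `cap`-sized chunk at a time; see the test
                      : // module below for a complete, runnable variant.
                      : async fn usage_sketch<W>(
                      :     writer: Arc<W>,
                      :     gate_guard: utils::sync::gate::GateGuard,
                      :     cancel: CancellationToken,
                      :     ctx: &RequestContext,
                      : ) -> Result<u64, FlushTaskError>
                      : where
                      :     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
                      : {
                      :     let mut writer = BufferedWriter::<IoBufferMut, W>::new(
                      :         writer,
                      :         || IoBufferMut::with_capacity(64 * 1024),
                      :         gate_guard,
                      :         cancel,
                      :         ctx,
                      :         tracing::Span::none(),
                      :     );
                      :     writer.write_buffered_borrowed(b"coalesced in the buffer", ctx).await?;
                      :     // Flush the tail and wait for the background task to finish.
                      :     let (bytes_submitted, _writer) = writer.flush_and_into_inner(ctx).await?;
                      :     Ok(bytes_submitted)
                      : }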
      64              : 
      65              : impl<B, Buf, W> BufferedWriter<B, W>
      66              : where
      67              :     B: Buffer<IoBuf = Buf> + Send + 'static,
      68              :     Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
      69              :     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
      70              : {
      71              :     /// Creates a new buffered writer.
      72              :     ///
      73              :     /// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
      74         2650 :     pub fn new(
      75         2650 :         writer: Arc<W>,
      76         2650 :         buf_new: impl Fn() -> B,
      77         2650 :         gate_guard: utils::sync::gate::GateGuard,
      78         2650 :         cancel: CancellationToken,
      79         2650 :         ctx: &RequestContext,
      80         2650 :         flush_task_span: tracing::Span,
      81         2650 :     ) -> Self {
      82         2650 :         Self {
      83         2650 :             writer: writer.clone(),
      84         2650 :             mutable: Some(buf_new()),
      85         2650 :             maybe_flushed: None,
      86         2650 :             flush_handle: FlushHandle::spawn_new(
      87         2650 :                 writer,
      88         2650 :                 buf_new(),
      89         2650 :                 gate_guard,
      90         2650 :                 cancel,
      91         2650 :                 ctx.attached_child(),
      92         2650 :                 flush_task_span,
      93         2650 :             ),
      94         2650 :             bytes_submitted: 0,
      95         2650 :         }
      96         2650 :     }
      97              : 
      98        22700 :     pub fn as_inner(&self) -> &W {
      99        22700 :         &self.writer
     100        22700 :     }
     101              : 
     102              :     /// Returns the number of bytes submitted to the background flush task.
     103       997507 :     pub fn bytes_submitted(&self) -> u64 {
     104       997507 :         self.bytes_submitted
     105       997507 :     }
     106              : 
      107              :     /// Panics if used after any of the write paths returned an error.
     108       997507 :     pub fn inspect_mutable(&self) -> Option<&B> {
     109       997507 :         self.mutable.as_ref()
     110       997507 :     }
     111              : 
     112              :     /// Gets a reference to the maybe flushed read-only buffer.
     113              :     /// Returns `None` if the writer has not submitted any flush request.
     114       997515 :     pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice<Buf>> {
     115       997515 :         self.maybe_flushed.as_ref()
     116       997515 :     }
     117              : 
     118              :     #[cfg_attr(target_os = "macos", allow(dead_code))]
     119           18 :     pub async fn flush_and_into_inner(
     120           18 :         mut self,
     121           18 :         ctx: &RequestContext,
     122           18 :     ) -> Result<(u64, Arc<W>), FlushTaskError> {
     123           18 :         self.flush(ctx).await?;
     124              : 
     125              :         let Self {
     126           18 :             mutable: buf,
     127           18 :             maybe_flushed: _,
     128           18 :             writer,
     129           18 :             mut flush_handle,
     130           18 :             bytes_submitted: bytes_amount,
     131           18 :         } = self;
     132           18 :         flush_handle.shutdown().await?;
     133           18 :         assert!(buf.is_some());
     134           18 :         Ok((bytes_amount, writer))
     135           18 :     }
     136              : 
     137              :     #[cfg(test)]
     138           20 :     pub(crate) fn mutable(&self) -> &B {
     139           20 :         self.mutable.as_ref().expect("must not use after an error")
     140           20 :     }
     141              : 
     142              :     #[cfg_attr(target_os = "macos", allow(dead_code))]
     143          116 :     pub async fn write_buffered_borrowed(
     144          116 :         &mut self,
     145          116 :         chunk: &[u8],
     146          116 :         ctx: &RequestContext,
     147          116 :     ) -> Result<usize, FlushTaskError> {
     148          116 :         let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?;
     149          116 :         if let Some(control) = control {
     150           24 :             control.release().await;
     151           92 :         }
     152          116 :         Ok(len)
     153          116 :     }
     154              : 
      155              :     /// In addition to the number of bytes submitted by this write, also returns a handle that can control the flush behavior.
     156      9609884 :     pub(crate) async fn write_buffered_borrowed_controlled(
     157      9609884 :         &mut self,
     158      9609884 :         mut chunk: &[u8],
     159      9609884 :         ctx: &RequestContext,
     160      9609884 :     ) -> Result<(usize, Option<FlushControl>), FlushTaskError> {
     161      9609884 :         let chunk_len = chunk.len();
     162      9609884 :         let mut control: Option<FlushControl> = None;
     163     19232968 :         while !chunk.is_empty() {
     164      9623084 :             let buf = self.mutable.as_mut().expect("must not use after an error");
     165      9623084 :             let need = buf.cap() - buf.pending();
     166      9623084 :             let have = chunk.len();
     167      9623084 :             let n = std::cmp::min(need, have);
     168      9623084 :             buf.extend_from_slice(&chunk[..n]);
     169      9623084 :             chunk = &chunk[n..];
     170      9623084 :             if buf.pending() >= buf.cap() {
     171        13228 :                 assert_eq!(buf.pending(), buf.cap());
     172        13228 :                 if let Some(control) = control.take() {
     173         2132 :                     control.release().await;
     174        11096 :                 }
     175        13228 :                 control = self.flush(ctx).await?;
     176      9609856 :             }
     177              :         }
     178      9609884 :         Ok((chunk_len, control))
     179      9609884 :     }
     180              : 
     181              :     /// This function can only error if the flush task got cancelled.
     182              :     /// In that case, we leave [`Self::mutable`] intentionally as `None`.
     183              :     ///
     184              :     /// The read path continues to function correctly; it can read up to the
     185              :     /// point where it could read before, i.e., including what was in [`Self::mutable`]
     186              :     /// before the call to this function, because that's now stored in [`Self::maybe_flushed`].
     187              :     ///
     188              :     /// The write path becomes unavailable and will panic if used.
     189              :     /// The only correct solution to retry writes is to discard the entire [`BufferedWriter`],
      190              :     /// which upper layers of the pageserver write path currently do not support.
      191              :     /// It is in fact quite hard to reason about what exactly happens in today's code.
      192              :     /// Best case, we accumulate junk in the EphemeralFile; worst case, data corruption.
      193              :     #[must_use = "caller must explicitly check the flush control"]
     194        13246 :     async fn flush(
     195        13246 :         &mut self,
     196        13246 :         _ctx: &RequestContext,
     197        13246 :     ) -> Result<Option<FlushControl>, FlushTaskError> {
     198        13246 :         let buf = self.mutable.take().expect("must not use after an error");
     199        13246 :         let buf_len = buf.pending();
     200        13246 :         if buf_len == 0 {
     201            0 :             self.mutable = Some(buf);
     202            0 :             return Ok(None);
     203        13246 :         }
     204        13246 :         // Prepare the buffer for read while flushing.
     205        13246 :         let slice = buf.flush();
      206        13246 :         // NB: this assignment also drops the reference to the old buffer, allowing us to re-own & make it mutable below.
     207        13246 :         self.maybe_flushed = Some(slice.cheap_clone());
     208        13246 :         let offset = self.bytes_submitted;
     209        13246 :         self.bytes_submitted += u64::try_from(buf_len).unwrap();
     210              : 
     211              :         // If we return/panic here or later, we'll leave mutable = None, breaking further
     212              :         // writers, but the read path should still work.
     213        13246 :         let (recycled, flush_control) = self.flush_handle.flush(slice, offset).await?;
     214              : 
     215              :         // The only other place that could hold a reference to the recycled buffer
      216              :         // is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
     217        13246 :         let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
     218        13246 : 
      219        13246 :         // We got back the recycled buffer; we can open up for more writes again.
     220        13246 :         self.mutable = Some(recycled);
     221        13246 : 
     222        13246 :         Ok(Some(flush_control))
     223        13246 :     }
     224              : }
     225              : 
     226              : /// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
     227              : pub trait Buffer {
     228              :     type IoBuf: IoBuf;
     229              : 
      230              :     /// Capacity of the buffer. Must not change over the lifetime of `self`.
     231              :     fn cap(&self) -> usize;
     232              : 
     233              :     /// Add data to the buffer.
      234              :     /// Panics if there is not enough room to accommodate `other`'s content, i.e.,
     235              :     /// panics if `other.len() > self.cap() - self.pending()`.
     236              :     fn extend_from_slice(&mut self, other: &[u8]);
     237              : 
     238              :     /// Number of bytes in the buffer.
     239              :     fn pending(&self) -> usize;
     240              : 
     241              :     /// Turns `self` into a [`FullSlice`] of the pending data
     242              :     /// so we can use [`tokio_epoll_uring`] to write it to disk.
     243              :     fn flush(self) -> FullSlice<Self::IoBuf>;
     244              : 
     245              :     /// After the write to disk is done and we have gotten back the slice,
     246              :     /// [`BufferedWriter`] uses this method to re-use the io buffer.
     247              :     fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
     248              : }
     249              : 
     250              : impl Buffer for IoBufferMut {
     251              :     type IoBuf = IoBuffer;
     252              : 
     253     28882480 :     fn cap(&self) -> usize {
     254     28882480 :         self.capacity()
     255     28882480 :     }
     256              : 
     257      9623084 :     fn extend_from_slice(&mut self, other: &[u8]) {
     258      9623084 :         if self.len() + other.len() > self.cap() {
     259            0 :             panic!("Buffer capacity exceeded");
     260      9623084 :         }
     261      9623084 : 
     262      9623084 :         IoBufferMut::extend_from_slice(self, other);
     263      9623084 :     }
     264              : 
     265     20270149 :     fn pending(&self) -> usize {
     266     20270149 :         self.len()
     267     20270149 :     }
     268              : 
     269        15896 :     fn flush(self) -> FullSlice<Self::IoBuf> {
     270        15896 :         self.freeze().slice_len()
     271        15896 :     }
     272              : 
      273              :     /// Caller should make sure that `iobuf` only has one strong reference before invoking this method.
     274        13246 :     fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
     275        13246 :         let mut recycled = iobuf
     276        13246 :             .into_mut()
     277        13246 :             .expect("buffer should only have one strong reference");
     278        13246 :         recycled.clear();
     279        13246 :         recycled
     280        13246 :     }
     281              : }
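                      :
                      : // Sketch of the freeze/recycle round trip driven by `BufferedWriter::flush`
                      : // (assumption: like `bytes::BytesMut`, `into_mut` succeeds only while the
                      : // frozen buffer holds a single strong reference, which is why the writer
                      : // replaces `maybe_flushed` before recycling the returned slice):
                      : fn recycle_round_trip_sketch() {
                      :     let mut buf = IoBufferMut::with_capacity(16);
                      :     buf.extend_from_slice(b"data");
                      :     let slice = Buffer::flush(buf);      // freeze for IO submission
                      :     let read_copy = slice.cheap_clone(); // what `maybe_flushed` would hold
                      :     drop(read_copy);                     // last extra reference goes away
                      :     let iobuf = slice.into_raw_slice().into_inner();
                      :     let recycled = <IoBufferMut as Buffer>::reuse_after_flush(iobuf);
                      :     assert_eq!(recycled.pending(), 0);   // cleared and ready for reuse
                      : }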
     282              : 
     283              : #[cfg(test)]
     284              : mod tests {
     285              :     use std::sync::Mutex;
     286              : 
     287              :     use super::*;
     288              :     use crate::context::{DownloadBehavior, RequestContext};
     289              :     use crate::task_mgr::TaskKind;
     290              : 
     291              :     #[derive(Default, Debug)]
     292              :     struct RecorderWriter {
      293              :         /// Recorded bytes and write offsets.
     294              :         writes: Mutex<Vec<(Vec<u8>, u64)>>,
     295              :     }
     296              : 
     297              :     impl RecorderWriter {
     298              :         /// Gets recorded bytes and write offsets.
     299            4 :         fn get_writes(&self) -> Vec<Vec<u8>> {
     300            4 :             self.writes
     301            4 :                 .lock()
     302            4 :                 .unwrap()
     303            4 :                 .iter()
     304           32 :                 .map(|(buf, _)| buf.clone())
     305            4 :                 .collect()
     306            4 :         }
     307              :     }
     308              : 
     309              :     impl OwnedAsyncWriter for RecorderWriter {
     310           32 :         async fn write_all_at<Buf: IoBufAligned + Send>(
     311           32 :             &self,
     312           32 :             buf: FullSlice<Buf>,
     313           32 :             offset: u64,
     314           32 :             _: &RequestContext,
     315           32 :         ) -> (FullSlice<Buf>, std::io::Result<()>) {
     316           32 :             self.writes
     317           32 :                 .lock()
     318           32 :                 .unwrap()
     319           32 :                 .push((Vec::from(&buf[..]), offset));
     320           32 :             (buf, Ok(()))
     321           32 :         }
     322              :     }
     323              : 
     324            4 :     fn test_ctx() -> RequestContext {
     325            4 :         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
     326            4 :     }
     327              : 
     328              :     #[tokio::test]
     329            4 :     async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
     330            4 :         let ctx = test_ctx();
     331            4 :         let ctx = &ctx;
     332            4 :         let recorder = Arc::new(RecorderWriter::default());
     333            4 :         let gate = utils::sync::gate::Gate::default();
     334            4 :         let cancel = CancellationToken::new();
     335            4 :         let mut writer = BufferedWriter::<_, RecorderWriter>::new(
     336            4 :             recorder,
     337            8 :             || IoBufferMut::with_capacity(2),
     338            4 :             gate.enter()?,
     339            4 :             cancel,
     340            4 :             ctx,
     341            4 :             tracing::Span::none(),
     342            4 :         );
     343            4 : 
     344            4 :         writer.write_buffered_borrowed(b"abc", ctx).await?;
     345            4 :         writer.write_buffered_borrowed(b"", ctx).await?;
     346            4 :         writer.write_buffered_borrowed(b"d", ctx).await?;
     347            4 :         writer.write_buffered_borrowed(b"e", ctx).await?;
     348            4 :         writer.write_buffered_borrowed(b"fg", ctx).await?;
     349            4 :         writer.write_buffered_borrowed(b"hi", ctx).await?;
     350            4 :         writer.write_buffered_borrowed(b"j", ctx).await?;
     351            4 :         writer.write_buffered_borrowed(b"klmno", ctx).await?;
     352            4 : 
     353            4 :         let (_, recorder) = writer.flush_and_into_inner(ctx).await?;
     354            4 :         assert_eq!(
     355            4 :             recorder.get_writes(),
     356            4 :             {
     357            4 :                 let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
     358            4 :                 expect
     359            4 :             }
     360            4 :             .iter()
     361           32 :             .map(|v| v[..].to_vec())
     362            4 :             .collect::<Vec<_>>()
     363            4 :         );
     364            4 :         Ok(())
     365            4 :     }
     366              : }
        

Generated by: LCOV version 2.1-beta