LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io - write.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 98.0 % 256 251
Test Date: 2025-04-24 20:31:15 Functions: 100.0 % 52 52

            Line data    Source code
       1              : mod flush;
       2              : 
       3              : use bytes::BufMut;
       4              : pub(crate) use flush::FlushControl;
       5              : use flush::FlushHandle;
       6              : pub(crate) use flush::FlushTaskError;
       7              : use flush::ShutdownRequest;
       8              : use tokio_epoll_uring::IoBuf;
       9              : use tokio_util::sync::CancellationToken;
      10              : use tracing::trace;
      11              : 
      12              : use super::io_buf_aligned::IoBufAligned;
      13              : use super::io_buf_aligned::IoBufAlignedMut;
      14              : use super::io_buf_ext::{FullSlice, IoBufExt};
      15              : use crate::context::RequestContext;
      16              : use crate::virtual_file::UsizeIsU64;
      17              : use crate::virtual_file::{IoBuffer, IoBufferMut};
      18              : 
      19              : pub(crate) trait CheapCloneForRead {
      20              :     /// Returns a cheap clone of the buffer.
      21              :     fn cheap_clone(&self) -> Self;
      22              : }
      23              : 
      24              : impl CheapCloneForRead for IoBuffer {
      25       141648 :     fn cheap_clone(&self) -> Self {
      26       141648 :         // Cheap clone over an `Arc`.
      27       141648 :         self.clone()
      28       141648 :     }
      29              : }
      30              : 
      31              : /// A trait for doing owned-buffer write IO.
      32              : /// Think [`tokio::io::AsyncWrite`] but with owned buffers.
      33              : /// The owned buffers need to be aligned due to Direct IO requirements.
      34              : pub trait OwnedAsyncWriter {
      35              :     fn write_all_at<Buf: IoBufAligned + Send>(
      36              :         &self,
      37              :         buf: FullSlice<Buf>,
      38              :         offset: u64,
      39              :         ctx: &RequestContext,
      40              :     ) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
      41              :     fn set_len(
      42              :         &self,
      43              :         len: u64,
      44              :         ctx: &RequestContext,
      45              :     ) -> impl Future<Output = std::io::Result<()>> + Send;
      46              : }
      47              : 
      48              : /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
      49              : /// small writes into larger writes of size [`Buffer::cap`].
      50              : ///
      51              : /// The buffer is flushed if and only if it is full ([`Buffer::pending`] == [`Buffer::cap`]).
      52              : /// This guarantees that writes to the filesystem happen
      53              : /// - at offsets that are multiples of [`Buffer::cap`]
      54              : /// - in lengths that are multiples of [`Buffer::cap`]
      55              : ///
      56              : /// Above property is useful for Direct IO, where whatever the
      57              : /// effectively dominating disk-sector/filesystem-block/memory-page size
      58              : /// determines the requirements on
      59              : /// - the alignment of the pointer passed to the read/write operation
      60              : /// - the value of `count` (i.e., the length of the read/write operation)
      61              : ///   which must be a multiple of the dominating sector/block/page size.
      62              : ///
      63              : /// See [`BufferedWriter::shutdown`] / [`BufferedWriterShutdownMode`] for different
      64              : /// ways of dealing with the special case that the buffer is not full by the time
      65              : /// we are done writing.
      66              : ///
      67              : /// The first flush to the underlying `W` happens at offset `start_offset` (arg of [`BufferedWriter::new`]).
      68              : /// The next flush is to offset `start_offset + Buffer::cap`. The one after at `start_offset + 2 * Buffer::cap` and so on.
      69              : ///
      70              : /// TODO: decouple buffer capacity from alignment requirement.
      71              : /// Right now we assume [`Buffer::cap`] is the alignment requirement,
      72              : /// but actually [`Buffer::cap`] should only determine how often we flush
      73              : /// while writing, while a separate alignment requirement argument should
      74              : /// be passed to determine alignment requirement. This could be used by
      75              : /// [`BufferedWriterShutdownMode::PadThenTruncate`] to avoid excessive
      76              : /// padding of zeroes. For example, today, with a capacity of 64KiB, we
      77              : /// would pad up to 64KiB-1 bytes of zeroes, then truncate off 64KiB-1.
      78              : /// This is wasteful, e.g., if the alignment requirement is 4KiB, we only
      79              : /// need to pad & truncate up to 4KiB-1 bytes of zeroes
      80              : ///
      81              : // TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
      82              : // since we would avoid copying majority of the data into the internal buffer.
      83              : // https://github.com/neondatabase/neon/issues/10101
      84              : pub struct BufferedWriter<B: Buffer, W> {
      85              :     /// Clone of the buffer that was last submitted to the flush loop.
      86              :     /// `None` if no flush request has been submitted, Some forever after.
      87              :     pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
      88              :     /// New writes are accumulated here.
      89              :     /// `None` only during submission while we wait for flush loop to accept
      90              :     /// the full dirty buffer in exchange for a clean buffer.
      91              :     /// If that exchange fails with an [`FlushTaskError`], the write path
      92              :     /// bails and leaves this as `None`.
      93              :     /// Subsequent writes will panic if attempted.
      94              :     /// The read path continues to work without error because [`Self::maybe_flushed`]
      95              :     /// and [`Self::bytes_submitted`] are advanced before the flush loop exchange starts,
      96              :     /// so, they will never try to read from [`Self::mutable`] anyway, because it's past
      97              :     /// the [`Self::maybe_flushed`] point.
      98              :     mutable: Option<B>,
      99              :     /// A handle to the background flush task for writting data to disk.
     100              :     flush_handle: FlushHandle<B::IoBuf, W>,
     101              :     /// The number of bytes submitted to the background task.
     102              :     bytes_submitted: u64,
     103              : }
     104              : 
     105              : /// How [`BufferedWriter::shutdown`] should deal with pending (=not-yet-flushed) data.
     106              : ///
     107              : /// Cf the [`BufferedWriter`] comment's paragraph for context on why we need to think about this.
     108              : pub enum BufferedWriterShutdownMode {
     109              :     /// Drop pending data, don't write back to file.
     110              :     DropTail,
     111              :     /// Pad the pending data with zeroes (cf [`usize::next_multiple_of`]).
     112              :     ZeroPadToNextMultiple(usize),
     113              :     /// Fill the IO buffer with zeroes, flush to disk, the `ftruncate` the
     114              :     /// file to the exact number of bytes written to [`Self`].
     115              :     ///
     116              :     /// TODO: see in [`BufferedWriter`] comment about decoupling buffer capacity from alignment requirement.
     117              :     PadThenTruncate,
     118              : }
     119              : 
     120              : impl<B, Buf, W> BufferedWriter<B, W>
     121              : where
     122              :     B: IoBufAlignedMut + Buffer<IoBuf = Buf> + Send + 'static,
     123              :     Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
     124              :     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
     125              : {
     126              :     /// Creates a new buffered writer.
     127              :     ///
     128              :     /// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
     129        20832 :     pub fn new(
     130        20832 :         writer: W,
     131        20832 :         start_offset: u64,
     132        20832 :         buf_new: impl Fn() -> B,
     133        20832 :         gate_guard: utils::sync::gate::GateGuard,
     134        20832 :         cancel: CancellationToken,
     135        20832 :         ctx: &RequestContext,
     136        20832 :         flush_task_span: tracing::Span,
     137        20832 :     ) -> Self {
     138        20832 :         Self {
     139        20832 :             mutable: Some(buf_new()),
     140        20832 :             maybe_flushed: None,
     141        20832 :             flush_handle: FlushHandle::spawn_new(
     142        20832 :                 writer,
     143        20832 :                 buf_new(),
     144        20832 :                 gate_guard,
     145        20832 :                 cancel,
     146        20832 :                 ctx.attached_child(),
     147        20832 :                 flush_task_span,
     148        20832 :             ),
     149        20832 :             bytes_submitted: start_offset,
     150        20832 :         }
     151        20832 :     }
     152              : 
     153              :     /// Returns the number of bytes submitted to the background flush task.
     154      3179205 :     pub fn bytes_submitted(&self) -> u64 {
     155      3179205 :         self.bytes_submitted
     156      3179205 :     }
     157              : 
     158              :     /// Panics if used after any of the write paths returned an error
     159      3179205 :     pub fn inspect_mutable(&self) -> Option<&B> {
     160      3179205 :         self.mutable.as_ref()
     161      3179205 :     }
     162              : 
     163              :     /// Gets a reference to the maybe flushed read-only buffer.
     164              :     /// Returns `None` if the writer has not submitted any flush request.
     165      3179229 :     pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice<Buf>> {
     166      3179229 :         self.maybe_flushed.as_ref()
     167      3179229 :     }
     168              : 
     169              :     #[cfg_attr(target_os = "macos", allow(dead_code))]
     170        11256 :     pub async fn shutdown(
     171        11256 :         mut self,
     172        11256 :         mode: BufferedWriterShutdownMode,
     173        11256 :         ctx: &RequestContext,
     174        11256 :     ) -> Result<(u64, W), FlushTaskError> {
     175        11256 :         let mut mutable = self.mutable.take().expect("must not use after an error");
     176        11256 :         let unpadded_pending = mutable.pending();
     177        11256 :         let final_len: u64;
     178        11256 :         let shutdown_req;
     179        11256 :         match mode {
     180              :             BufferedWriterShutdownMode::DropTail => {
     181           12 :                 trace!(pending=%mutable.pending(), "dropping pending data");
     182           12 :                 drop(mutable);
     183           12 : 
     184           12 :                 final_len = self.bytes_submitted;
     185           12 :                 shutdown_req = ShutdownRequest { set_len: None };
     186              :             }
     187        11136 :             BufferedWriterShutdownMode::ZeroPadToNextMultiple(next_multiple) => {
     188        11136 :                 let len = mutable.pending();
     189        11136 :                 let cap = mutable.cap();
     190        11136 :                 assert!(
     191        11136 :                     len <= cap,
     192            0 :                     "buffer impl ensures this, but let's check because the extend_with below would panic if we go beyond"
     193              :                 );
     194        11136 :                 let padded_len = len.next_multiple_of(next_multiple);
     195        11136 :                 assert!(
     196        11136 :                     padded_len <= cap,
     197            0 :                     "caller specified a multiple that is larger than the buffer capacity"
     198              :                 );
     199        11136 :                 let count = padded_len - len;
     200        11136 :                 mutable.extend_with(0, count);
     201        11136 :                 trace!(count, "padding with zeros");
     202        11136 :                 self.mutable = Some(mutable);
     203        11136 : 
     204        11136 :                 final_len = self.bytes_submitted + padded_len.into_u64();
     205        11136 :                 shutdown_req = ShutdownRequest { set_len: None };
     206              :             }
     207              :             BufferedWriterShutdownMode::PadThenTruncate => {
     208          108 :                 let len = mutable.pending();
     209          108 :                 let cap = mutable.cap();
     210          108 :                 // TODO: see struct comment TODO on decoupling buffer capacity from alignment requirement.
     211          108 :                 let alignment_requirement = cap;
     212          108 :                 assert!(len <= cap, "buffer impl should ensure this");
     213          108 :                 let padding_end_offset = len.next_multiple_of(alignment_requirement);
     214          108 :                 assert!(
     215          108 :                     padding_end_offset <= cap,
     216            0 :                     "{padding_end_offset} <= {cap}  ({alignment_requirement})"
     217              :                 );
     218          108 :                 let count = padding_end_offset - len;
     219          108 :                 mutable.extend_with(0, count);
     220          108 :                 trace!(count, "padding with zeros");
     221          108 :                 self.mutable = Some(mutable);
     222          108 : 
     223          108 :                 final_len = self.bytes_submitted + len.into_u64();
     224          108 :                 shutdown_req = ShutdownRequest {
     225          108 :                     // Avoid set_len call if we didn't need to pad anything.
     226          108 :                     set_len: if count > 0 { Some(final_len) } else { None },
     227              :                 };
     228              :             }
     229              :         };
     230        11256 :         let padded_pending = self.mutable.as_ref().map(|b| b.pending());
     231        11256 :         trace!(unpadded_pending, padded_pending, "padding done");
     232        11256 :         if self.mutable.is_some() {
     233        11244 :             self.flush(ctx).await?;
     234           12 :         }
     235              :         let Self {
     236              :             mutable: _,
     237              :             maybe_flushed: _,
     238        11256 :             mut flush_handle,
     239        11256 :             bytes_submitted: _,
     240        11256 :         } = self;
     241        11256 :         let writer = flush_handle.shutdown(shutdown_req).await?;
     242              : 
     243        11256 :         Ok((final_len, writer))
     244        11256 :     }
     245              : 
     246              :     #[cfg(test)]
     247           60 :     pub(crate) fn mutable(&self) -> &B {
     248           60 :         self.mutable.as_ref().expect("must not use after an error")
     249           60 :     }
     250              : 
     251              :     #[cfg_attr(target_os = "macos", allow(dead_code))]
     252     78571572 :     pub async fn write_buffered_borrowed(
     253     78571572 :         &mut self,
     254     78571572 :         chunk: &[u8],
     255     78571572 :         ctx: &RequestContext,
     256     78571572 :     ) -> Result<usize, FlushTaskError> {
     257     78571572 :         let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?;
     258     78571572 :         if let Some(control) = control {
     259        71028 :             control.release().await;
     260     78500544 :         }
     261     78571572 :         Ok(len)
     262     78571572 :     }
     263              : 
     264              :     /// In addition to bytes submitted in this write, also returns a handle that can control the flush behavior.
     265    107400936 :     pub(crate) async fn write_buffered_borrowed_controlled(
     266    107400936 :         &mut self,
     267    107400936 :         mut chunk: &[u8],
     268    107400936 :         ctx: &RequestContext,
     269    107400936 :     ) -> Result<(usize, Option<FlushControl>), FlushTaskError> {
     270    107400936 :         let chunk_len = chunk.len();
     271    107400936 :         let mut control: Option<FlushControl> = None;
     272    214931760 :         while !chunk.is_empty() {
     273    107530824 :             let buf = self.mutable.as_mut().expect("must not use after an error");
     274    107530824 :             let need = buf.cap() - buf.pending();
     275    107530824 :             let have = chunk.len();
     276    107530824 :             let n = std::cmp::min(need, have);
     277    107530824 :             buf.extend_from_slice(&chunk[..n]);
     278    107530824 :             chunk = &chunk[n..];
     279    107530824 :             if buf.pending() >= buf.cap() {
     280       130416 :                 assert_eq!(buf.pending(), buf.cap());
     281       130416 :                 if let Some(control) = control.take() {
     282        26148 :                     control.release().await;
     283       104268 :                 }
     284       130416 :                 control = self.flush(ctx).await?;
     285    107400408 :             }
     286              :         }
     287    107400936 :         Ok((chunk_len, control))
     288    107400936 :     }
     289              : 
     290              :     /// This function can only error if the flush task got cancelled.
     291              :     /// In that case, we leave [`Self::mutable`] intentionally as `None`.
     292              :     ///
     293              :     /// The read path continues to function correctly; it can read up to the
     294              :     /// point where it could read before, i.e., including what was in [`Self::mutable`]
     295              :     /// before the call to this function, because that's now stored in [`Self::maybe_flushed`].
     296              :     ///
     297              :     /// The write path becomes unavailable and will panic if used.
     298              :     /// The only correct solution to retry writes is to discard the entire [`BufferedWriter`],
     299              :     /// which upper layers of pageserver write path currently do not support.
     300              :     /// It is in fact quite hard to reason about what exactly happens in today's code.
     301              :     /// Best case we accumulate junk in the EphemeralFile, worst case is data corruption.
     302              :     #[must_use = "caller must explcitly check the flush control"]
     303       141660 :     async fn flush(
     304       141660 :         &mut self,
     305       141660 :         _ctx: &RequestContext,
     306       141660 :     ) -> Result<Option<FlushControl>, FlushTaskError> {
     307       141660 :         let buf = self.mutable.take().expect("must not use after an error");
     308       141660 :         let buf_len = buf.pending();
     309       141660 :         if buf_len == 0 {
     310           12 :             self.mutable = Some(buf);
     311           12 :             return Ok(None);
     312       141648 :         }
     313       141648 :         // Prepare the buffer for read while flushing.
     314       141648 :         let slice = buf.flush();
     315       141648 :         // NB: this assignment also drops thereference to the old buffer, allowing us to re-own & make it mutable below.
     316       141648 :         self.maybe_flushed = Some(slice.cheap_clone());
     317       141648 :         let offset = self.bytes_submitted;
     318       141648 :         self.bytes_submitted += u64::try_from(buf_len).unwrap();
     319              : 
     320              :         // If we return/panic here or later, we'll leave mutable = None, breaking further
     321              :         // writers, but the read path should still work.
     322       141648 :         let (recycled, flush_control) = self.flush_handle.flush(slice, offset).await?;
     323              : 
     324              :         // The only other place that could hold a reference to the recycled buffer
     325              :         // is in `Self::maybe_flushed`, but we have already replace it with the new buffer.
     326       141648 :         let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
     327       141648 : 
     328       141648 :         // We got back some recycled buffer, can open up for more writes again.
     329       141648 :         self.mutable = Some(recycled);
     330       141648 : 
     331       141648 :         Ok(Some(flush_control))
     332       141660 :     }
     333              : }
     334              : 
     335              : /// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
     336              : pub trait Buffer {
     337              :     type IoBuf: IoBuf;
     338              : 
     339              :     /// Capacity of the buffer. Must not change over the lifetime `self`.`
     340              :     fn cap(&self) -> usize;
     341              : 
     342              :     /// Add data to the buffer.
     343              :     /// Panics if there is not enough room to accomodate `other`'s content, i.e.,
     344              :     /// panics if `other.len() > self.cap() - self.pending()`.
     345              :     fn extend_from_slice(&mut self, other: &[u8]);
     346              : 
     347              :     /// Add `count` bytes `val` into `self`.
     348              :     /// Panics if `count > self.cap() - self.pending()`.
     349              :     fn extend_with(&mut self, val: u8, count: usize);
     350              : 
     351              :     /// Number of bytes in the buffer.
     352              :     fn pending(&self) -> usize;
     353              : 
     354              :     /// Turns `self` into a [`FullSlice`] of the pending data
     355              :     /// so we can use [`tokio_epoll_uring`] to write it to disk.
     356              :     fn flush(self) -> FullSlice<Self::IoBuf>;
     357              : 
     358              :     /// After the write to disk is done and we have gotten back the slice,
     359              :     /// [`BufferedWriter`] uses this method to re-use the io buffer.
     360              :     fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
     361              : }
     362              : 
     363              : impl Buffer for IoBufferMut {
     364              :     type IoBuf = IoBuffer;
     365              : 
     366    322842332 :     fn cap(&self) -> usize {
     367    322842332 :         self.capacity()
     368    322842332 :     }
     369              : 
     370    107530824 :     fn extend_from_slice(&mut self, other: &[u8]) {
     371    107530824 :         if self.len() + other.len() > self.cap() {
     372            0 :             panic!("Buffer capacity exceeded");
     373    107530824 :         }
     374    107530824 : 
     375    107530824 :         IoBufferMut::extend_from_slice(self, other);
     376    107530824 :     }
     377              : 
     378       108200 :     fn extend_with(&mut self, val: u8, count: usize) {
     379       108200 :         if self.len() + count > self.cap() {
     380            0 :             panic!("Buffer capacity exceeded");
     381       108200 :         }
     382       108200 : 
     383       108200 :         IoBufferMut::put_bytes(self, val, count);
     384       108200 :     }
     385              : 
     386    218546673 :     fn pending(&self) -> usize {
     387    218546673 :         self.len()
     388    218546673 :     }
     389              : 
     390       162480 :     fn flush(self) -> FullSlice<Self::IoBuf> {
     391       162480 :         self.freeze().slice_len()
     392       162480 :     }
     393              : 
     394              :     /// Caller should make sure that `iobuf` only have one strong reference before invoking this method.
     395       141648 :     fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
     396       141648 :         let mut recycled = iobuf
     397       141648 :             .into_mut()
     398       141648 :             .expect("buffer should only have one strong reference");
     399       141648 :         recycled.clear();
     400       141648 :         recycled
     401       141648 :     }
     402              : }
     403              : 
     404              : #[cfg(test)]
     405              : mod tests {
     406              :     use std::sync::Mutex;
     407              : 
     408              :     use rstest::rstest;
     409              : 
     410              :     use super::*;
     411              :     use crate::context::{DownloadBehavior, RequestContext};
     412              :     use crate::task_mgr::TaskKind;
     413              : 
     414              :     #[derive(Debug, PartialEq, Eq)]
     415              :     enum Op {
     416              :         Write { buf: Vec<u8>, offset: u64 },
     417              :         SetLen { len: u64 },
     418              :     }
     419              : 
     420              :     #[derive(Default, Debug)]
     421              :     struct RecorderWriter {
     422              :         /// record bytes and write offsets.
     423              :         recording: Mutex<Vec<Op>>,
     424              :     }
     425              : 
     426              :     impl OwnedAsyncWriter for RecorderWriter {
     427          156 :         async fn write_all_at<Buf: IoBufAligned + Send>(
     428          156 :             &self,
     429          156 :             buf: FullSlice<Buf>,
     430          156 :             offset: u64,
     431          156 :             _: &RequestContext,
     432          156 :         ) -> (FullSlice<Buf>, std::io::Result<()>) {
     433          156 :             self.recording.lock().unwrap().push(Op::Write {
     434          156 :                 buf: Vec::from(&buf[..]),
     435          156 :                 offset,
     436          156 :             });
     437          156 :             (buf, Ok(()))
     438          156 :         }
     439           12 :         async fn set_len(&self, len: u64, _ctx: &RequestContext) -> std::io::Result<()> {
     440           12 :             self.recording.lock().unwrap().push(Op::SetLen { len });
     441           12 :             Ok(())
     442           12 :         }
     443              :     }
     444              : 
     445           48 :     fn test_ctx() -> RequestContext {
     446           48 :         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
     447           48 :     }
     448              : 
     449           36 :     #[rstest]
     450              :     #[tokio::test]
     451              :     async fn test_write_all_borrowed_always_goes_through_buffer(
     452              :         #[values(
     453              :             BufferedWriterShutdownMode::DropTail,
     454              :             BufferedWriterShutdownMode::ZeroPadToNextMultiple(2),
     455              :             BufferedWriterShutdownMode::PadThenTruncate
     456              :         )]
     457              :         mode: BufferedWriterShutdownMode,
     458              :     ) -> anyhow::Result<()> {
     459              :         let ctx = test_ctx();
     460              :         let ctx = &ctx;
     461              :         let recorder = RecorderWriter::default();
     462              :         let gate = utils::sync::gate::Gate::default();
     463              :         let cancel = CancellationToken::new();
     464              :         let cap = 4;
     465              :         let mut writer = BufferedWriter::<_, RecorderWriter>::new(
     466              :             recorder,
     467              :             0,
     468           72 :             || IoBufferMut::with_capacity(cap),
     469              :             gate.enter()?,
     470              :             cancel,
     471              :             ctx,
     472              :             tracing::Span::none(),
     473              :         );
     474              : 
     475              :         writer.write_buffered_borrowed(b"abc", ctx).await?;
     476              :         writer.write_buffered_borrowed(b"", ctx).await?;
     477              :         writer.write_buffered_borrowed(b"d", ctx).await?;
     478              :         writer.write_buffered_borrowed(b"efg", ctx).await?;
     479              :         writer.write_buffered_borrowed(b"hijklm", ctx).await?;
     480              : 
     481              :         let mut expect = {
     482              :             [(0, b"abcd"), (4, b"efgh"), (8, b"ijkl")]
     483              :                 .into_iter()
     484          108 :                 .map(|(offset, v)| Op::Write {
     485          108 :                     offset,
     486          108 :                     buf: v[..].to_vec(),
     487          108 :                 })
     488              :                 .collect::<Vec<_>>()
     489              :         };
     490              :         let expect_next_offset = 12;
     491              : 
     492              :         match &mode {
     493              :             BufferedWriterShutdownMode::DropTail => (),
     494              :             // We test the case with padding to next multiple of 2 so that it's different
     495              :             // from the alignment requirement of 4 inferred from buffer capacity.
     496              :             // See TODOs in the `BufferedWriter` struct comment on decoupling buffer capacity from alignment requirement.
     497              :             BufferedWriterShutdownMode::ZeroPadToNextMultiple(2) => {
     498              :                 expect.push(Op::Write {
     499              :                     offset: expect_next_offset,
     500              :                     // it's legitimate for pad-to-next multiple 2 to be < alignment requirement 4 inferred from buffer capacity
     501              :                     buf: b"m\0".to_vec(),
     502              :                 });
     503              :             }
     504              :             BufferedWriterShutdownMode::ZeroPadToNextMultiple(_) => unimplemented!(),
     505              :             BufferedWriterShutdownMode::PadThenTruncate => {
     506              :                 expect.push(Op::Write {
     507              :                     offset: expect_next_offset,
     508              :                     buf: b"m\0\0\0".to_vec(),
     509              :                 });
     510              :                 expect.push(Op::SetLen { len: 13 });
     511              :             }
     512              :         }
     513              : 
     514              :         let (_, recorder) = writer.shutdown(mode, ctx).await?;
     515              :         assert_eq!(&*recorder.recording.lock().unwrap(), &expect);
     516              :         Ok(())
     517              :     }
     518              : 
     519              :     #[tokio::test]
     520           12 :     async fn test_set_len_is_skipped_if_not_needed() -> anyhow::Result<()> {
     521           12 :         let ctx = test_ctx();
     522           12 :         let ctx = &ctx;
     523           12 :         let recorder = RecorderWriter::default();
     524           12 :         let gate = utils::sync::gate::Gate::default();
     525           12 :         let cancel = CancellationToken::new();
     526           12 :         let cap = 4;
     527           12 :         let mut writer = BufferedWriter::<_, RecorderWriter>::new(
     528           12 :             recorder,
     529           12 :             0,
     530           24 :             || IoBufferMut::with_capacity(cap),
     531           12 :             gate.enter()?,
     532           12 :             cancel,
     533           12 :             ctx,
     534           12 :             tracing::Span::none(),
     535           12 :         );
     536           12 : 
     537           12 :         // write a multiple of `cap`
     538           12 :         writer.write_buffered_borrowed(b"abc", ctx).await?;
     539           12 :         writer.write_buffered_borrowed(b"defgh", ctx).await?;
     540           12 : 
     541           12 :         let (_, recorder) = writer
     542           12 :             .shutdown(BufferedWriterShutdownMode::PadThenTruncate, ctx)
     543           12 :             .await?;
     544           12 : 
     545           12 :         let expect = {
     546           12 :             [(0, b"abcd"), (4, b"efgh")]
     547           12 :                 .into_iter()
     548           24 :                 .map(|(offset, v)| Op::Write {
     549           24 :                     offset,
     550           24 :                     buf: v[..].to_vec(),
     551           24 :                 })
     552           12 :                 .collect::<Vec<_>>()
     553           12 :         };
     554           12 : 
     555           12 :         assert_eq!(
     556           12 :             &*recorder.recording.lock().unwrap(),
     557           12 :             &expect,
     558           12 :             "set_len should not be called if the buffer is already aligned"
     559           12 :         );
     560           12 : 
     561           12 :         Ok(())
     562           12 :     }
     563              : }
        

Generated by: LCOV version 2.1-beta