LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/write - flush.rs (source / functions)
Test: 91bf6c8f32e5e69adde6241313e732fdd6d6e277.info
Test Date: 2025-03-04 12:19:20
Coverage:      Hit / Total
  Lines:       135 / 155   (87.1 %)
  Functions:    29 / 36    (80.6 %)

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use utils::sync::duplex;
       4              : 
       5              : use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
       6              : use crate::context::RequestContext;
       7              : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
       8              : use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
       9              : 
      10              : /// A handle to the flush task.
      11              : pub struct FlushHandle<Buf, W> {
      12              :     inner: Option<FlushHandleInner<Buf, W>>,
      13              :     /// Immutable buffer for serving tail reads.
      14              :     /// `None` if no flush request has been submitted.
      15              :     pub(super) maybe_flushed: Option<FullSlice<Buf>>,
      16              : }
      17              : 
      18              : pub struct FlushHandleInner<Buf, W> {
      19              :     /// A bi-directional channel that sends (buffer, offset) for writes,
      20              :     /// and receives a recycled buffer.
      21              :     channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
      22              :     /// Join handle for the background flush task.
      23              :     join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
      24              : }
      25              : 
      26              : struct FlushRequest<Buf> {
      27              :     slice: FullSlice<Buf>,
      28              :     offset: u64,
      29              :     #[cfg(test)]
      30              :     ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
      31              :     #[cfg(test)]
      32              :     done_flush_tx: tokio::sync::oneshot::Sender<()>,
      33              : }
      34              : 
      35              : /// Constructs a request and a control object for a new flush operation.
      36              : #[cfg(not(test))]
      37            0 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      38            0 :     let request = FlushRequest { slice, offset };
      39            0 :     let control = FlushControl::untracked();
      40            0 : 
      41            0 :     (request, control)
      42            0 : }
      43              : 
      44              : /// Constructs a request and a control object for a new flush operation.
      45              : #[cfg(test)]
      46        13246 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      47        13246 :     let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
      48        13246 :     let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
      49        13246 :     let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
      50        13246 : 
      51        13246 :     let request = FlushRequest {
      52        13246 :         slice,
      53        13246 :         offset,
      54        13246 :         ready_to_flush_rx,
      55        13246 :         done_flush_tx,
      56        13246 :     };
      57        13246 :     (request, control)
      58        13246 : }
      59              : 
      60              : /// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
      61              : #[cfg(test)]
      62              : pub(crate) struct FlushControl {
      63              :     not_started: FlushNotStarted,
      64              : }
      65              : 
      66              : #[cfg(not(test))]
      67              : pub(crate) struct FlushControl;
      68              : 
      69              : impl FlushControl {
      70              :     #[cfg(test)]
      71        13246 :     fn not_started(
      72        13246 :         ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
      73        13246 :         done_flush_rx: tokio::sync::oneshot::Receiver<()>,
      74        13246 :     ) -> Self {
      75        13246 :         FlushControl {
      76        13246 :             not_started: FlushNotStarted {
      77        13246 :                 ready_to_flush_tx,
      78        13246 :                 done_flush_rx,
      79        13246 :             },
      80        13246 :         }
      81        13246 :     }
      82              : 
      83              :     #[cfg(not(test))]
      84            0 :     fn untracked() -> Self {
      85            0 :         FlushControl
      86            0 :     }
      87              : 
      88              :     /// In tests, turns the flush control into its not-started state.
      89              :     #[cfg(test)]
      90            4 :     pub(crate) fn into_not_started(self) -> FlushNotStarted {
      91            4 :         self.not_started
      92            4 :     }
      93              : 
      94              :     /// Releases control of the submitted buffer.
      95              :     ///
      96              :     /// In a `cfg(test)` environment, the buffer is guaranteed to be flushed to disk after [`FlushControl::release`] finishes execution.
      97        13224 :     pub async fn release(self) {
      98        13224 :         #[cfg(test)]
      99        13224 :         {
     100        13224 :             self.not_started
     101        13224 :                 .ready_to_flush()
     102        13224 :                 .wait_until_flush_is_done()
     103        13224 :                 .await;
     104            0 :         }
     105        13224 :     }
     106              : }
     107              : 
     108              : impl<Buf, W> FlushHandle<Buf, W>
     109              : where
     110              :     Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
     111              :     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
     112              : {
     113              :     /// Spawns a new background flush task and obtains a handle.
     114              :     ///
     115              :     /// Note: spawning a background task means we do not need to explicitly maintain a queue of buffers.
     116         2650 :     pub fn spawn_new<B>(
     117         2650 :         file: Arc<W>,
     118         2650 :         buf: B,
     119         2650 :         gate_guard: utils::sync::gate::GateGuard,
     120         2650 :         ctx: RequestContext,
     121         2650 :     ) -> Self
     122         2650 :     where
     123         2650 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     124         2650 :     {
     125         2650 :         // It is fine to buffer only up to 1 message: we have at most 1 message in-flight at a time.
     126         2650 :         let (front, back) = duplex::mpsc::channel(1);
     127         2650 : 
     128         2650 :         let join_handle = tokio::spawn(async move {
     129         2627 :             FlushBackgroundTask::new(back, file, gate_guard, ctx)
     130         2627 :                 .run(buf.flush())
     131         2627 :                 .await
     132         2650 :         });
     133         2650 : 
     134         2650 :         FlushHandle {
     135         2650 :             inner: Some(FlushHandleInner {
     136         2650 :                 channel: front,
     137         2650 :                 join_handle,
     138         2650 :             }),
     139         2650 :             maybe_flushed: None,
     140         2650 :         }
     141         2650 :     }
     142              : 
     143              :     /// Submits a buffer to be flushed in the background task.
     144              :     /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
     145              :     /// The buffer is saved in `Self::maybe_flushed` so that tail reads can be served
     146              :     /// while the flush is in progress.
     147        13246 :     pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
     148        13246 :     where
     149        13246 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     150        13246 :     {
     151        13246 :         let slice = buf.flush();
     152        13246 : 
     153        13246 :         // Saves a buffer for read while flushing. This also removes the reference to the old buffer.
     154        13246 :         self.maybe_flushed = Some(slice.cheap_clone());
     155        13246 : 
     156        13246 :         let (request, flush_control) = new_flush_op(slice, offset);
     157              : 
     158              :         // Submits the buffer to the background task.
     159        13246 :         let submit = self.inner_mut().channel.send(request).await;
     160        13246 :         if submit.is_err() {
     161            0 :             return self.handle_error().await;
     162        13246 :         }
     163              : 
     164              :         // Wait for an available buffer from the background flush task.
     165              :         // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
     166              :         // then the write path will eventually wait for it here.
     167        13246 :         let Some(recycled) = self.inner_mut().channel.recv().await else {
     168            0 :             return self.handle_error().await;
     169              :         };
     170              : 
     171              :         // The only other place that could hold a reference to the recycled buffer
     172              :         // is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
     173        13246 :         let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
     174        13246 :         Ok((recycled, flush_control))
     175        13246 :     }
     176              : 
     177            0 :     async fn handle_error<T>(&mut self) -> std::io::Result<T> {
     178            0 :         Err(self
     179            0 :             .shutdown()
     180            0 :             .await
     181            0 :             .expect_err("flush task only disconnects duplex if it exits with an error"))
     182            0 :     }
     183              : 
     184              :     /// Cleans up the channel and joins the flush task.
     185           18 :     pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
     186           18 :         let handle = self
     187           18 :             .inner
     188           18 :             .take()
     189           18 :             .expect("must not use after we returned an error");
     190           18 :         drop(handle.channel.tx);
     191           18 :         handle.join_handle.await.unwrap()
     192           18 :     }
     193              : 
     194              :     /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
     195              :     /// This only happens if the handle is used after an error.
     196        26492 :     fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
     197        26492 :         self.inner
     198        26492 :             .as_mut()
     199        26492 :             .expect("must not use after we returned an error")
     200        26492 :     }
     201              : }
     202              : 
     203              : /// A background task for flushing data to disk.
     204              : pub struct FlushBackgroundTask<Buf, W> {
     205              :     /// A bi-directional channel that receives (buffer, offset) for writes,
     206              :     /// and sends back recycled buffers.
     207              :     channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
     208              :     /// A writer for persisting data to disk.
     209              :     writer: Arc<W>,
     210              :     ctx: RequestContext,
     211              :     /// Prevents the timeline from shutting down until the background flush task finishes flushing all remaining buffers to disk.
     212              :     _gate_guard: utils::sync::gate::GateGuard,
     213              : }
     214              : 
     215              : impl<Buf, W> FlushBackgroundTask<Buf, W>
     216              : where
     217              :     Buf: IoBufAligned + Send + Sync,
     218              :     W: OwnedAsyncWriter + Sync + 'static,
     219              : {
     220              :     /// Creates a new background flush task.
     221         2627 :     fn new(
     222         2627 :         channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
     223         2627 :         file: Arc<W>,
     224         2627 :         gate_guard: utils::sync::gate::GateGuard,
     225         2627 :         ctx: RequestContext,
     226         2627 :     ) -> Self {
     227         2627 :         FlushBackgroundTask {
     228         2627 :             channel,
     229         2627 :             writer: file,
     230         2627 :             _gate_guard: gate_guard,
     231         2627 :             ctx,
     232         2627 :         }
     233         2627 :     }
     234              : 
     235              :     /// Runs the background flush task.
     236              :     /// The passed-in slice is immediately sent back to the flush handle through the duplex channel.
     237         2627 :     async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
     238         2627 :         // Sends the extra buffer back to the handle.
     239         2627 :         self.channel.send(slice).await.map_err(|_| {
     240            0 :             std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
     241         2627 :         })?;
     242              : 
     243              :         // Exit condition: the channel is closed and there are no remaining buffers to be flushed.
     244        15873 :         while let Some(request) = self.channel.recv().await {
     245              :             #[cfg(test)]
     246              :             {
     247              :                 // In test, wait for control to signal that we are ready to flush.
     248        13246 :                 if request.ready_to_flush_rx.await.is_err() {
     249           18 :                     tracing::debug!("control dropped");
     250        13228 :                 }
     251              :             }
     252              : 
     253              :             // Write slice to disk at `offset`.
     254        13246 :             let slice = self
     255        13246 :                 .writer
     256        13246 :                 .write_all_at(request.slice, request.offset, &self.ctx)
     257        13246 :                 .await?;
     258              : 
     259              :             #[cfg(test)]
     260              :             {
     261              :                 // In test, tell control we are done flushing buffer.
     262        13246 :                 if request.done_flush_tx.send(()).is_err() {
     263           18 :                     tracing::debug!("control dropped");
     264        13228 :                 }
     265              :             }
     266              : 
     267              :             // Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
     268        13246 :             if self.channel.send(slice).await.is_err() {
     269              :                 // Although the channel is closed, we still need to finish flushing the remaining buffers.
     270            0 :                 continue;
     271        13246 :             }
     272              :         }
     273              : 
     274         2321 :         Ok(self.writer)
     275         2321 :     }
     276              : }
     277              : 
     278              : #[cfg(test)]
     279              : pub(crate) struct FlushNotStarted {
     280              :     ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
     281              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     282              : }
     283              : 
     284              : #[cfg(test)]
     285              : pub(crate) struct FlushInProgress {
     286              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     287              : }
     288              : 
     289              : #[cfg(test)]
     290              : pub(crate) struct FlushDone;
     291              : 
     292              : #[cfg(test)]
     293              : impl FlushNotStarted {
     294              :     /// Signals to the background task that the buffer is ready to be flushed to disk.
     295        13228 :     pub fn ready_to_flush(self) -> FlushInProgress {
     296        13228 :         self.ready_to_flush_tx
     297        13228 :             .send(())
     298        13228 :             .map(|_| FlushInProgress {
     299        13228 :                 done_flush_rx: self.done_flush_rx,
     300        13228 :             })
     301        13228 :             .unwrap()
     302        13228 :     }
     303              : }
     304              : 
     305              : #[cfg(test)]
     306              : impl FlushInProgress {
     307              :     /// Waits until the background flush is done.
     308        13228 :     pub async fn wait_until_flush_is_done(self) -> FlushDone {
     309        13228 :         self.done_flush_rx.await.unwrap();
     310        13228 :         FlushDone
     311        13228 :     }
     312              : }
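
A note on the backpressure design seen in `spawn_new` and `flush` above: the duplex channel has depth 1 in each direction, so the write path can submit a new flush request only after the background task has handed a recycled buffer back. Below is a minimal, self-contained sketch of that pattern, assuming only tokio; `FlushRequest`, the `Vec<u8>` buffer, and the simulated write are stand-ins for the listing's types, not the pageserver implementation.

    use std::time::Duration;
    use tokio::sync::mpsc;

    // Stand-in for the listing's FlushRequest<Buf> (Vec<u8> replaces FullSlice<Buf>).
    struct FlushRequest {
        buf: Vec<u8>,
        offset: u64,
    }

    #[tokio::main]
    async fn main() {
        // Depth-1 channels in both directions: at most one request in flight.
        let (req_tx, mut req_rx) = mpsc::channel::<FlushRequest>(1);
        let (buf_tx, mut buf_rx) = mpsc::channel::<Vec<u8>>(1);

        // Background "flush" task: receive a request, pretend to write it to
        // disk, then recycle the buffer back to the write path.
        let flush_task = tokio::spawn(async move {
            while let Some(mut req) = req_rx.recv().await {
                // Stand-in for writer.write_all_at(slice, offset, ctx).
                tokio::time::sleep(Duration::from_millis(1)).await;
                println!("flushed {} bytes at offset {}", req.buf.len(), req.offset);
                req.buf.clear(); // length reset to 0, capacity unchanged (reuse_after_flush)
                if buf_tx.send(req.buf).await.is_err() {
                    break; // handle side dropped; nothing left to recycle to
                }
            }
        });

        let mut buf = Vec::with_capacity(8192);
        let mut offset = 0u64;
        for chunk in [b"hello".as_slice(), b"world".as_slice()] {
            buf.extend_from_slice(chunk);
            let len = buf.len() as u64;
            req_tx.send(FlushRequest { buf, offset }).await.unwrap();
            offset += len;
            // BACKPRESSURE: wait here until the flush task returns a buffer.
            buf = buf_rx.recv().await.unwrap();
        }

        drop(req_tx); // closing the channel lets the task drain and exit
        flush_task.await.unwrap();
    }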
        
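The test-only structs `FlushNotStarted`, `FlushInProgress`, and `FlushDone` form a typestate API: each transition consumes the previous handle by value, so a test cannot signal readiness twice or wait for completion before signalling. Here is a self-contained sketch of the same two-oneshot handshake, assuming tokio; the spawned task stands in for `FlushBackgroundTask`'s test-only receive/ack code.

    use tokio::sync::oneshot;

    struct FlushNotStarted {
        ready_to_flush_tx: oneshot::Sender<()>,
        done_flush_rx: oneshot::Receiver<()>,
    }

    struct FlushInProgress {
        done_flush_rx: oneshot::Receiver<()>,
    }

    struct FlushDone;

    impl FlushNotStarted {
        // Consuming `self` makes "signal twice" a compile error.
        fn ready_to_flush(self) -> FlushInProgress {
            self.ready_to_flush_tx.send(()).unwrap();
            FlushInProgress { done_flush_rx: self.done_flush_rx }
        }
    }

    impl FlushInProgress {
        async fn wait_until_flush_is_done(self) -> FlushDone {
            self.done_flush_rx.await.unwrap();
            FlushDone
        }
    }

    #[tokio::main]
    async fn main() {
        let (ready_tx, ready_rx) = oneshot::channel();
        let (done_tx, done_rx) = oneshot::channel();

        // Stand-in for the background flush task: wait for the go-ahead,
        // "flush", then report completion.
        tokio::spawn(async move {
            ready_rx.await.unwrap();
            done_tx.send(()).unwrap();
        });

        let control = FlushNotStarted { ready_to_flush_tx: ready_tx, done_flush_rx: done_rx };
        // Step the flush through its phases, as a test does via FlushControl.
        let _done: FlushDone = control.ready_to_flush().wait_until_flush_is_done().await;
    }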

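Finally, `maybe_flushed` exists so that reads of the file's tail can be served from memory while the most recently submitted buffer is still in flight; `cheap_clone` makes holding that extra reference inexpensive. A small sketch of the idea, using the bytes crate as a stand-in for `FullSlice<Buf>`/`CheapCloneForRead`; the `Writer` type and its field names here are hypothetical.

    // Keep a cheap (refcounted) clone of the submitted buffer so tail reads
    // do not have to wait for the flush to reach disk.
    use bytes::Bytes;

    struct Writer {
        flushed_up_to: u64,           // bytes known durable on disk
        maybe_flushed: Option<Bytes>, // clone of the buffer currently being flushed
    }

    impl Writer {
        /// Serve a read at `offset`: from memory if it falls in the in-flight
        /// tail, otherwise (in the real code) from disk.
        fn read_tail(&self, offset: u64) -> Option<&[u8]> {
            let buf = self.maybe_flushed.as_ref()?;
            let start = offset.checked_sub(self.flushed_up_to)? as usize;
            buf.get(start..)
        }
    }

    fn main() {
        let w = Writer {
            flushed_up_to: 8192,
            maybe_flushed: Some(Bytes::from_static(b"tail data")),
        };
        assert_eq!(w.read_tail(8192 + 5), Some(&b"data"[..]));
    }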
Generated by: LCOV version 2.1-beta