LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/write - flush.rs (source / functions)
Test:      07bee600374ccd486c69370d0972d9035964fe68.info
Test Date: 2025-02-20 13:11:02
Coverage:  Lines: 87.1 % (135 of 155)    Functions: 80.6 % (29 of 36)

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use utils::sync::duplex;
       4              : 
       5              : use crate::{
       6              :     context::RequestContext,
       7              :     virtual_file::owned_buffers_io::{io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice},
       8              : };
       9              : 
      10              : use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
      11              : 
      12              : /// A handle to the flush task.
      13              : pub struct FlushHandle<Buf, W> {
      14              :     inner: Option<FlushHandleInner<Buf, W>>,
      15              :     /// Immutable buffer for serving tail reads.
      16              :     /// `None` if no flush request has been submitted.
      17              :     pub(super) maybe_flushed: Option<FullSlice<Buf>>,
      18              : }
      19              : 
      20              : pub struct FlushHandleInner<Buf, W> {
      21              :     /// A bi-directional channel that sends (buffer, offset) for writes,
       22              :     /// and receives recycled buffers.
      23              :     channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
      24              :     /// Join handle for the background flush task.
      25              :     join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
      26              : }
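
The duplex channel is the backbone of this module: every flush request sent in one direction is eventually answered by a recycled buffer coming back the other way. A minimal sketch of what such a duplex provides, assuming it is essentially two tokio mpsc channels glued together (the real `utils::sync::duplex::mpsc::Duplex` may differ in detail):

    use tokio::sync::mpsc;

    /// One endpoint: sends `S`, receives `R`. The peer endpoint is a `Duplex<R, S>`.
    pub struct Duplex<S, R> {
        pub tx: mpsc::Sender<S>,
        pub rx: mpsc::Receiver<R>,
    }

    /// Creates a connected pair of endpoints, each direction buffering up to `cap` messages.
    pub fn channel<A, B>(cap: usize) -> (Duplex<A, B>, Duplex<B, A>) {
        let (tx_a, rx_a) = mpsc::channel::<A>(cap);
        let (tx_b, rx_b) = mpsc::channel::<B>(cap);
        (Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a })
    }

    impl<S, R> Duplex<S, R> {
        pub async fn send(&self, v: S) -> Result<(), mpsc::error::SendError<S>> {
            self.tx.send(v).await
        }
        pub async fn recv(&mut self) -> Option<R> {
            self.rx.recv().await
        }
    }
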
      27              : 
      28              : struct FlushRequest<Buf> {
      29              :     slice: FullSlice<Buf>,
      30              :     offset: u64,
      31              :     #[cfg(test)]
      32              :     ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
      33              :     #[cfg(test)]
      34              :     done_flush_tx: tokio::sync::oneshot::Sender<()>,
      35              : }
      36              : 
      37              : /// Constructs a request and a control object for a new flush operation.
      38              : #[cfg(not(test))]
      39            0 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      40            0 :     let request = FlushRequest { slice, offset };
      41            0 :     let control = FlushControl::untracked();
      42            0 : 
      43            0 :     (request, control)
      44            0 : }
      45              : 
      46              : /// Constructs a request and a control object for a new flush operation.
      47              : #[cfg(test)]
      48        13246 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      49        13246 :     let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
      50        13246 :     let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
      51        13246 :     let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
      52        13246 : 
      53        13246 :     let request = FlushRequest {
      54        13246 :         slice,
      55        13246 :         offset,
      56        13246 :         ready_to_flush_rx,
      57        13246 :         done_flush_tx,
      58        13246 :     };
      59        13246 :     (request, control)
      60        13246 : }
      61              : 
       62              : /// A handle to a `FlushRequest` that gives unit tests precise control over flush behavior.
      63              : #[cfg(test)]
      64              : pub(crate) struct FlushControl {
      65              :     not_started: FlushNotStarted,
      66              : }
      67              : 
      68              : #[cfg(not(test))]
      69              : pub(crate) struct FlushControl;
      70              : 
      71              : impl FlushControl {
      72              :     #[cfg(test)]
      73        13246 :     fn not_started(
      74        13246 :         ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
      75        13246 :         done_flush_rx: tokio::sync::oneshot::Receiver<()>,
      76        13246 :     ) -> Self {
      77        13246 :         FlushControl {
      78        13246 :             not_started: FlushNotStarted {
      79        13246 :                 ready_to_flush_tx,
      80        13246 :                 done_flush_rx,
      81        13246 :             },
      82        13246 :         }
      83        13246 :     }
      84              : 
      85              :     #[cfg(not(test))]
      86            0 :     fn untracked() -> Self {
      87            0 :         FlushControl
      88            0 :     }
      89              : 
       90              :     /// In tests, turns the flush control into its not-started state.
      91              :     #[cfg(test)]
      92            4 :     pub(crate) fn into_not_started(self) -> FlushNotStarted {
      93            4 :         self.not_started
      94            4 :     }
      95              : 
       96              :     /// Releases control over the submitted buffer.
       97              :     ///
       98              :     /// In a `cfg(test)` environment, the buffer is guaranteed to be flushed to disk after [`FlushControl::release`] finishes execution.
      99        13224 :     pub async fn release(self) {
     100        13224 :         #[cfg(test)]
     101        13224 :         {
     102        13224 :             self.not_started
     103        13224 :                 .ready_to_flush()
     104        13224 :                 .wait_until_flush_is_done()
     105        13224 :                 .await;
     106            0 :         }
     107        13224 :     }
     108              : }
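
The two oneshot channels inside a test-mode `FlushRequest` give tests a two-phase handshake: first signal the flush task that it may proceed, then wait for it to confirm the write completed. A standalone sketch of that handshake, with hypothetical names rather than this crate's API:

    use tokio::sync::oneshot;

    #[tokio::main]
    async fn main() {
        let (ready_tx, ready_rx) = oneshot::channel::<()>();
        let (done_tx, done_rx) = oneshot::channel::<()>();

        // Stand-in for the background flush task.
        let task = tokio::spawn(async move {
            // Phase 1: wait until the controller says we may flush.
            ready_rx.await.unwrap();
            // ... write the buffer to disk here ...
            // Phase 2: tell the controller the flush is done.
            done_tx.send(()).unwrap();
        });

        // Controller side, the moral equivalent of `FlushControl::release`:
        ready_tx.send(()).unwrap(); // ready_to_flush()
        done_rx.await.unwrap();     // wait_until_flush_is_done()
        task.await.unwrap();
        // At this point the flush is guaranteed to have completed.
    }
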
     109              : 
     110              : impl<Buf, W> FlushHandle<Buf, W>
     111              : where
     112              :     Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
     113              :     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
     114              : {
     115              :     /// Spawns a new background flush task and obtains a handle.
     116              :     ///
      117              :     /// Note: spawning a background task means we do not need to explicitly maintain a queue of buffers.
     118         2630 :     pub fn spawn_new<B>(
     119         2630 :         file: Arc<W>,
     120         2630 :         buf: B,
     121         2630 :         gate_guard: utils::sync::gate::GateGuard,
     122         2630 :         ctx: RequestContext,
     123         2630 :     ) -> Self
     124         2630 :     where
     125         2630 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     126         2630 :     {
      127         2630 :         // It is fine to buffer up to only 1 message: we have only 1 message in-flight at a time.
     128         2630 :         let (front, back) = duplex::mpsc::channel(1);
     129         2630 : 
     130         2630 :         let join_handle = tokio::spawn(async move {
     131         2615 :             FlushBackgroundTask::new(back, file, gate_guard, ctx)
     132         2615 :                 .run(buf.flush())
     133         2615 :                 .await
     134         2630 :         });
     135         2630 : 
     136         2630 :         FlushHandle {
     137         2630 :             inner: Some(FlushHandleInner {
     138         2630 :                 channel: front,
     139         2630 :                 join_handle,
     140         2630 :             }),
     141         2630 :             maybe_flushed: None,
     142         2630 :         }
     143         2630 :     }
     144              : 
     145              :     /// Submits a buffer to be flushed in the background task.
      146              :     /// Returns a buffer that has completed flushing, for re-use: length reset to 0, capacity unchanged.
      147              :     /// The submitted buffer is saved in `Self::maybe_flushed` so that it can keep serving
      148              :     /// tail reads while the flush is in progress.
     149        13246 :     pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
     150        13246 :     where
     151        13246 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     152        13246 :     {
     153        13246 :         let slice = buf.flush();
     154        13246 : 
      155        13246 :         // Saves the buffer for reads while flushing. This also drops the reference to the old buffer.
     156        13246 :         self.maybe_flushed = Some(slice.cheap_clone());
     157        13246 : 
     158        13246 :         let (request, flush_control) = new_flush_op(slice, offset);
     159              : 
     160              :         // Submits the buffer to the background task.
     161        13246 :         let submit = self.inner_mut().channel.send(request).await;
     162        13246 :         if submit.is_err() {
     163            0 :             return self.handle_error().await;
     164        13246 :         }
     165              : 
     166              :         // Wait for an available buffer from the background flush task.
     167              :         // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
     168              :         // then the write path will eventually wait for it here.
     169        13246 :         let Some(recycled) = self.inner_mut().channel.recv().await else {
     170            0 :             return self.handle_error().await;
     171              :         };
     172              : 
     173              :         // The only other place that could hold a reference to the recycled buffer
      174              :         // is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
     175        13246 :         let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
     176        13246 :         Ok((recycled, flush_control))
     177        13246 :     }
     178              : 
     179            0 :     async fn handle_error<T>(&mut self) -> std::io::Result<T> {
     180            0 :         Err(self
     181            0 :             .shutdown()
     182            0 :             .await
     183            0 :             .expect_err("flush task only disconnects duplex if it exits with an error"))
     184            0 :     }
     185              : 
      186              :     /// Cleans up the channel and joins the flush task.
     187           18 :     pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
     188           18 :         let handle = self
     189           18 :             .inner
     190           18 :             .take()
     191           18 :             .expect("must not use after we returned an error");
     192           18 :         drop(handle.channel.tx);
     193           18 :         handle.join_handle.await.unwrap()
     194           18 :     }
     195              : 
     196              :     /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
     197              :     /// This only happens if the handle is used after an error.
     198        26492 :     fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
     199        26492 :         self.inner
     200        26492 :             .as_mut()
     201        26492 :             .expect("must not use after we returned an error")
     202        26492 :     }
     203              : }
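
Taken together, `flush` and the background task implement double buffering with backpressure: the caller owns one buffer while the flush task owns the other, and because the caller awaits the recycled buffer before filling the next one, the write path can never run more than one buffer ahead of the disk. A self-contained sketch of that loop using plain `Vec<u8>` buffers (illustrative names, not this crate's API):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (req_tx, mut req_rx) = mpsc::channel::<Vec<u8>>(1);
        let (recycle_tx, mut recycle_rx) = mpsc::channel::<Vec<u8>>(1);

        // Background "flush" task: drain each buffer, then hand it back empty.
        let flusher = tokio::spawn(async move {
            while let Some(mut buf) = req_rx.recv().await {
                // ... write `buf` to disk here ...
                buf.clear(); // length reset to 0, capacity unchanged
                if recycle_tx.send(buf).await.is_err() {
                    break;
                }
            }
        });

        // Write path: fill, submit, then WAIT for a recycled buffer (backpressure).
        let mut buf = Vec::with_capacity(8192);
        for chunk in [&b"foo"[..], &b"bar"[..], &b"baz"[..]] {
            buf.extend_from_slice(chunk);
            req_tx.send(buf).await.unwrap();
            buf = recycle_rx.recv().await.unwrap(); // blocks if the flusher lags
        }
        drop(req_tx); // closing the channel lets the flusher exit
        flusher.await.unwrap();
    }
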
     204              : 
     205              : /// A background task for flushing data to disk.
     206              : pub struct FlushBackgroundTask<Buf, W> {
     207              :     /// A bi-directional channel that receives (buffer, offset) for writes,
      208              :     /// and sends back recycled buffers.
     209              :     channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
      210              :     /// A writer for persisting data to disk.
     211              :     writer: Arc<W>,
     212              :     ctx: RequestContext,
      213              :     /// Prevents the timeline from shutting down until the background flush task finishes flushing all remaining buffers to disk.
     214              :     _gate_guard: utils::sync::gate::GateGuard,
     215              : }
     216              : 
     217              : impl<Buf, W> FlushBackgroundTask<Buf, W>
     218              : where
     219              :     Buf: IoBufAligned + Send + Sync,
     220              :     W: OwnedAsyncWriter + Sync + 'static,
     221              : {
     222              :     /// Creates a new background flush task.
     223         2615 :     fn new(
     224         2615 :         channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
     225         2615 :         file: Arc<W>,
     226         2615 :         gate_guard: utils::sync::gate::GateGuard,
     227         2615 :         ctx: RequestContext,
     228         2615 :     ) -> Self {
     229         2615 :         FlushBackgroundTask {
     230         2615 :             channel,
     231         2615 :             writer: file,
     232         2615 :             _gate_guard: gate_guard,
     233         2615 :             ctx,
     234         2615 :         }
     235         2615 :     }
     236              : 
     237              :     /// Runs the background flush task.
      238              :     /// The passed-in slice is immediately sent back to the flush handle through the duplex channel.
     239         2615 :     async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
     240         2615 :         // Sends the extra buffer back to the handle.
     241         2615 :         self.channel.send(slice).await.map_err(|_| {
     242            0 :             std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
     243         2615 :         })?;
     244              : 
      245              :         // Exit condition: the channel is closed and there are no remaining buffers to be flushed.
     246        15861 :         while let Some(request) = self.channel.recv().await {
     247              :             #[cfg(test)]
     248              :             {
     249              :                 // In test, wait for control to signal that we are ready to flush.
     250        13246 :                 if request.ready_to_flush_rx.await.is_err() {
     251           18 :                     tracing::debug!("control dropped");
     252        13228 :                 }
     253              :             }
     254              : 
     255              :             // Write slice to disk at `offset`.
     256        13246 :             let slice = self
     257        13246 :                 .writer
     258        13246 :                 .write_all_at(request.slice, request.offset, &self.ctx)
     259        13246 :                 .await?;
     260              : 
     261              :             #[cfg(test)]
     262              :             {
      263              :                 // In tests, tell the control that we are done flushing the buffer.
     264        13246 :                 if request.done_flush_tx.send(()).is_err() {
     265           18 :                     tracing::debug!("control dropped");
     266        13228 :                 }
     267              :             }
     268              : 
      269              :             // Sends the buffer back to the handle for reuse. The handle is in charge of clearing the buffer.
     270        13246 :             if self.channel.send(slice).await.is_err() {
      271              :                 // Although the channel is closed, we still need to finish flushing the remaining buffers.
     272            0 :                 continue;
     273        13246 :             }
     274              :         }
     275              : 
     276         2337 :         Ok(self.writer)
     277         2337 :     }
     278              : }
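
The exit condition of `run` leans on tokio mpsc semantics: after all senders are dropped, `recv().await` keeps yielding any buffered messages and only then returns `None`, so the loop drains every pending flush before the task exits. A small demonstration of that guarantee:

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<u32>(4);
        tx.send(1).await.unwrap();
        tx.send(2).await.unwrap();
        drop(tx); // sender gone, but the queue still holds two messages

        assert_eq!(rx.recv().await, Some(1)); // buffered messages are still delivered
        assert_eq!(rx.recv().await, Some(2));
        assert_eq!(rx.recv().await, None);    // only now does recv signal closure
    }
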
     279              : 
     280              : #[cfg(test)]
     281              : pub(crate) struct FlushNotStarted {
     282              :     ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
     283              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     284              : }
     285              : 
     286              : #[cfg(test)]
     287              : pub(crate) struct FlushInProgress {
     288              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     289              : }
     290              : 
     291              : #[cfg(test)]
     292              : pub(crate) struct FlushDone;
     293              : 
     294              : #[cfg(test)]
     295              : impl FlushNotStarted {
      296              :     /// Signals the background task that the buffer is ready to be flushed to disk.
     297        13228 :     pub fn ready_to_flush(self) -> FlushInProgress {
     298        13228 :         self.ready_to_flush_tx
     299        13228 :             .send(())
     300        13228 :             .map(|_| FlushInProgress {
     301        13228 :                 done_flush_rx: self.done_flush_rx,
     302        13228 :             })
     303        13228 :             .unwrap()
     304        13228 :     }
     305              : }
     306              : 
     307              : #[cfg(test)]
     308              : impl FlushInProgress {
      309              :     /// Waits until the background flush is done.
     310        13228 :     pub async fn wait_until_flush_is_done(self) -> FlushDone {
     311        13228 :         self.done_flush_rx.await.unwrap();
     312        13228 :         FlushDone
     313        13228 :     }
     314              : }
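
`FlushNotStarted`, `FlushInProgress`, and `FlushDone` form a typestate chain: each transition consumes `self` and returns the next state, so the compiler rejects out-of-order use such as waiting for completion before signalling readiness. The same pattern in miniature, with hypothetical states:

    struct Locked;
    struct Unlocked;

    impl Locked {
        /// Consuming `self` makes `unlock` callable at most once.
        fn unlock(self) -> Unlocked {
            Unlocked
        }
    }

    impl Unlocked {
        fn open(self) {
            // ... only reachable after `unlock` ...
        }
    }

    fn main() {
        let door = Locked;
        door.unlock().open();
        // door.open();   // ERROR: no method `open` on `Locked`
        // door.unlock(); // ERROR: `door` was moved by the first `unlock`
    }
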
        

Generated by: LCOV version 2.1-beta