LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/write - flush.rs (source / functions)
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info
Test Date: 2025-01-07 20:58:07
Coverage: Lines: 87.1 % (135 of 155 hit) | Functions: 80.6 % (29 of 36 hit)

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use utils::sync::duplex;
       4              : 
       5              : use crate::{
       6              :     context::RequestContext,
       7              :     virtual_file::owned_buffers_io::{io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice},
       8              : };
       9              : 
      10              : use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
      11              : 
      12              : /// A handle to the flush task.
      13              : pub struct FlushHandle<Buf, W> {
      14              :     inner: Option<FlushHandleInner<Buf, W>>,
      15              :     /// Immutable buffer for serving tail reads.
      16              :     /// `None` if no flush request has been submitted.
      17              :     pub(super) maybe_flushed: Option<FullSlice<Buf>>,
      18              : }
      19              : 
      20              : pub struct FlushHandleInner<Buf, W> {
      21              :     /// A bi-directional channel that sends (buffer, offset) for writes,
      22              :     /// and receives the recycled buffer.
      23              :     channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
      24              :     /// Join handle for the background flush task.
      25              :     join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
      26              : }
      27              : 
      28              : struct FlushRequest<Buf> {
      29              :     slice: FullSlice<Buf>,
      30              :     offset: u64,
      31              :     #[cfg(test)]
      32              :     ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
      33              :     #[cfg(test)]
      34              :     done_flush_tx: tokio::sync::oneshot::Sender<()>,
      35              : }
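                      : 
                      : // Editor's sketch (not part of the measured source): the two test-only
                      : // oneshot channels above form a two-phase handshake between the test and
                      : // the background flush task. The same pattern in isolation, using only
                      : // plain tokio primitives; the function name is hypothetical.
                      : #[cfg(test)]
                      : #[allow(dead_code)]
                      : async fn two_phase_handshake_demo() {
                      :     let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
                      :     let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
                      :     let worker = tokio::spawn(async move {
                      :         // Block until the test signals that the work (the flush) may start.
                      :         ready_rx.await.unwrap();
                      :         // ... the actual work would happen here ...
                      :         // Then tell the test that the work is done.
                      :         done_tx.send(()).unwrap();
                      :     });
                      :     ready_tx.send(()).unwrap(); // test side: allow the work to start
                      :     done_rx.await.unwrap(); // test side: wait until it is done
                      :     worker.await.unwrap();
                      : }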
      36              : 
      37              : /// Constructs a request and a control object for a new flush operation.
      38              : #[cfg(not(test))]
      39            0 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      40            0 :     let request = FlushRequest { slice, offset };
      41            0 :     let control = FlushControl::untracked();
      42            0 : 
      43            0 :     (request, control)
      44            0 : }
      45              : 
      46              : /// Constructs a request and a control object for a new flush operation.
      47              : #[cfg(test)]
      48         6619 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      49         6619 :     let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
      50         6619 :     let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
      51         6619 :     let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
      52         6619 : 
      53         6619 :     let request = FlushRequest {
      54         6619 :         slice,
      55         6619 :         offset,
      56         6619 :         ready_to_flush_rx,
      57         6619 :         done_flush_tx,
      58         6619 :     };
      59         6619 :     (request, control)
      60         6619 : }
      61              : 
      62              : /// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
      63              : #[cfg(test)]
      64              : pub(crate) struct FlushControl {
      65              :     not_started: FlushNotStarted,
      66              : }
      67              : 
      68              : #[cfg(not(test))]
      69              : pub(crate) struct FlushControl;
      70              : 
      71              : impl FlushControl {
      72              :     #[cfg(test)]
      73         6619 :     fn not_started(
      74         6619 :         ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
      75         6619 :         done_flush_rx: tokio::sync::oneshot::Receiver<()>,
      76         6619 :     ) -> Self {
      77         6619 :         FlushControl {
      78         6619 :             not_started: FlushNotStarted {
      79         6619 :                 ready_to_flush_tx,
      80         6619 :                 done_flush_rx,
      81         6619 :             },
      82         6619 :         }
      83         6619 :     }
      84              : 
      85              :     #[cfg(not(test))]
      86            0 :     fn untracked() -> Self {
      87            0 :         FlushControl
      88            0 :     }
      89              : 
      90              :     /// In tests, turns the flush control into its not-started state.
      91              :     #[cfg(test)]
      92            2 :     pub(crate) fn into_not_started(self) -> FlushNotStarted {
      93            2 :         self.not_started
      94            2 :     }
      95              : 
      96              :     /// Releases control over the submitted buffer.
      97              :     ///
      98              :     /// In a `cfg(test)` environment, the buffer is guaranteed to be flushed to disk after [`FlushControl::release`] finishes execution.
      99         6612 :     pub async fn release(self) {
     100         6612 :         #[cfg(test)]
     101         6612 :         {
     102         6612 :             self.not_started
     103         6612 :                 .ready_to_flush()
     104         6612 :                 .wait_until_flush_is_done()
     105         6612 :                 .await;
     106            0 :         }
     107         6612 :     }
     108              : }
     109              : 
     110              : impl<Buf, W> FlushHandle<Buf, W>
     111              : where
     112              :     Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
     113              :     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
     114              : {
     115              :     /// Spawns a new background flush task and obtains a handle.
     116              :     ///
     117              :     /// Note: spawning a background task means we do not need to explicitly maintain a queue of buffers.
     118         1285 :     pub fn spawn_new<B>(
     119         1285 :         file: Arc<W>,
     120         1285 :         buf: B,
     121         1285 :         gate_guard: utils::sync::gate::GateGuard,
     122         1285 :         ctx: RequestContext,
     123         1285 :     ) -> Self
     124         1285 :     where
     125         1285 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     126         1285 :     {
     127         1285 :         // It is fine to buffer only up to 1 message: we have at most 1 message in flight at a time.
     128         1285 :         let (front, back) = duplex::mpsc::channel(1);
     129         1285 : 
     130         1285 :         let join_handle = tokio::spawn(async move {
     131         1277 :             FlushBackgroundTask::new(back, file, gate_guard, ctx)
     132         1277 :                 .run(buf.flush())
     133         1277 :                 .await
     134         1285 :         });
     135         1285 : 
     136         1285 :         FlushHandle {
     137         1285 :             inner: Some(FlushHandleInner {
     138         1285 :                 channel: front,
     139         1285 :                 join_handle,
     140         1285 :             }),
     141         1285 :             maybe_flushed: None,
     142         1285 :         }
     143         1285 :     }
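                      : 
                      : // Editor's note (hedged usage sketch, not part of the measured source;
                      : // `writer`, `buffer`, `gate_guard`, and `ctx` are hypothetical values of
                      : // the appropriate types):
                      : //
                      : //     let mut handle = FlushHandle::spawn_new(Arc::new(writer), buffer, gate_guard, ctx);
                      : //     // ... call `handle.flush(...)` as buffers fill up, and
                      : //     // `handle.shutdown().await` once the write path is done ...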
     144              : 
     145              :     /// Submits a buffer to be flushed in the background task.
     146              :     /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
     147              :     /// The buffer is saved in `Self::maybe_flushed` while the flush is in
     148              :     /// flight, so that it can keep serving tail reads.
     149         6619 :     pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
     150         6619 :     where
     151         6619 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     152         6619 :     {
     153         6619 :         let slice = buf.flush();
     154         6619 : 
     155         6619 :         // Saves the buffer for reads while flushing. This also drops the reference to the old buffer.
     156         6619 :         self.maybe_flushed = Some(slice.cheap_clone());
     157         6619 : 
     158         6619 :         let (request, flush_control) = new_flush_op(slice, offset);
     159              : 
     160              :         // Submits the buffer to the background task.
     161         6619 :         let submit = self.inner_mut().channel.send(request).await;
     162         6619 :         if submit.is_err() {
     163            0 :             return self.handle_error().await;
     164         6619 :         }
     165              : 
     166              :         // Wait for an available buffer from the background flush task.
     167              :         // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
     168              :         // then the write path will eventually wait for it here.
     169         6619 :         let Some(recycled) = self.inner_mut().channel.recv().await else {
     170            0 :             return self.handle_error().await;
     171              :         };
     172              : 
     173              :         // The only other place that could hold a reference to the recycled buffer
     174              :         // is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
     175         6619 :         let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
     176         6619 :         Ok((recycled, flush_control))
     177         6619 :     }
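                      : 
                      :     // Editor's sketch (hypothetical helper, not part of the measured source):
                      :     // a typical call site submits a full buffer, awaits the recycled one (the
                      :     // backpressure point described above), and keeps writing into it.
                      :     #[cfg(test)]
                      :     #[allow(dead_code)]
                      :     async fn flush_and_recycle<B>(&mut self, buf: B, offset: u64) -> std::io::Result<B>
                      :     where
                      :         B: Buffer<IoBuf = Buf> + Send + 'static,
                      :     {
                      :         // `flush` returns only once the background task has handed a buffer
                      :         // back, so at most one flush is in flight at any time.
                      :         let (recycled, control) = self.flush(buf, offset).await?;
                      :         // In tests, `release` drives the flush to completion before we
                      :         // continue; outside tests it is a no-op.
                      :         control.release().await;
                      :         Ok(recycled)
                      :     }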
     178              : 
     179            0 :     async fn handle_error<T>(&mut self) -> std::io::Result<T> {
     180            0 :         Err(self
     181            0 :             .shutdown()
     182            0 :             .await
     183            0 :             .expect_err("flush task only disconnects duplex if it exits with an error"))
     184            0 :     }
     185              : 
     186              :     /// Cleans up the channel and joins the flush task.
     187            5 :     pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
     188            5 :         let handle = self
     189            5 :             .inner
     190            5 :             .take()
     191            5 :             .expect("must not use after we returned an error");
     192            5 :         drop(handle.channel.tx);
     193            5 :         handle.join_handle.await.unwrap()
     194            5 :     }
     195              : 
     196              :     /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
     197              :     /// This only happens if the handle is used after an error.
     198        13238 :     fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
     199        13238 :         self.inner
     200        13238 :             .as_mut()
     201        13238 :             .expect("must not use after we returned an error")
     202        13238 :     }
     203              : }
     204              : 
     205              : /// A background task for flushing data to disk.
     206              : pub struct FlushBackgroundTask<Buf, W> {
     207              :     /// A bi-directional channel that receives (buffer, offset) for writes,
     208              :     /// and sends back the recycled buffer.
     209              :     channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
     210              :     /// A writer for persisting data to disk.
     211              :     writer: Arc<W>,
     212              :     ctx: RequestContext,
     213              :     /// Prevents the timeline from shutting down until the flush background task finishes flushing all remaining buffers to disk.
     214              :     _gate_guard: utils::sync::gate::GateGuard,
     215              : }
     216              : 
     217              : impl<Buf, W> FlushBackgroundTask<Buf, W>
     218              : where
     219              :     Buf: IoBufAligned + Send + Sync,
     220              :     W: OwnedAsyncWriter + Sync + 'static,
     221              : {
     222              :     /// Creates a new background flush task.
     223         1277 :     fn new(
     224         1277 :         channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
     225         1277 :         file: Arc<W>,
     226         1277 :         gate_guard: utils::sync::gate::GateGuard,
     227         1277 :         ctx: RequestContext,
     228         1277 :     ) -> Self {
     229         1277 :         FlushBackgroundTask {
     230         1277 :             channel,
     231         1277 :             writer: file,
     232         1277 :             _gate_guard: gate_guard,
     233         1277 :             ctx,
     234         1277 :         }
     235         1277 :     }
     236              : 
     237              :     /// Runs the background flush task.
     238              :     /// The passed-in slice is immediately sent back to the flush handle through the duplex channel.
     239         1277 :     async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
     240         1277 :         // Sends the extra buffer back to the handle.
     241         1277 :         self.channel.send(slice).await.map_err(|_| {
     242            0 :             std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
     243         1277 :         })?;
     244              : 
     245              :         // Exit condition: the channel is closed and there are no remaining buffers to be flushed.
     246         7896 :         while let Some(request) = self.channel.recv().await {
     247              :             #[cfg(test)]
     248              :             {
     249              :                 // In test, wait for control to signal that we are ready to flush.
     250         6619 :                 if request.ready_to_flush_rx.await.is_err() {
     251            5 :                     tracing::debug!("control dropped");
     252         6614 :                 }
     253              :             }
     254              : 
     255              :             // Write slice to disk at `offset`.
     256         6619 :             let slice = self
     257         6619 :                 .writer
     258         6619 :                 .write_all_at(request.slice, request.offset, &self.ctx)
     259         6619 :                 .await?;
     260              : 
     261              :             #[cfg(test)]
     262              :             {
     263              :                 // In test, tell control we are done flushing buffer.
     264         6619 :                 if request.done_flush_tx.send(()).is_err() {
     265            5 :                     tracing::debug!("control dropped");
     266         6614 :                 }
     267              :             }
     268              : 
     269              :             // Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
     270         6619 :             if self.channel.send(slice).await.is_err() {
     271              :                 // Although the channel is closed, we still need to finish flushing the remaining buffers.
     272            0 :                 continue;
     273         6619 :             }
     274              :         }
     275              : 
     276         1140 :         Ok(self.writer)
     277         1140 :     }
     278              : }
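                      : 
                      : // Editor's sketch (not part of the measured source): the handle/task pair
                      : // above is a "ping-pong" buffer-recycling loop. The same shape with two
                      : // plain tokio mpsc channels standing in for `utils::sync::duplex`; all
                      : // names are hypothetical.
                      : #[cfg(test)]
                      : #[allow(dead_code)]
                      : async fn ping_pong_recycling_demo() {
                      :     let (req_tx, mut req_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1);
                      :     let (buf_tx, mut buf_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1);
                      :     let task = tokio::spawn(async move {
                      :         // Exit condition mirrors `run`: the loop ends once the handle side
                      :         // closes the request channel and all requests have been drained.
                      :         while let Some(mut buf) = req_rx.recv().await {
                      :             // ... write `buf` to disk here ...
                      :             buf.clear(); // length reset to 0, capacity unchanged
                      :             if buf_tx.send(buf).await.is_err() {
                      :                 // The handle is gone; keep draining, as `run` does above.
                      :                 continue;
                      :             }
                      :         }
                      :     });
                      :     req_tx.send(vec![1, 2, 3]).await.unwrap();
                      :     let recycled = buf_rx.recv().await.unwrap(); // the backpressure point
                      :     assert!(recycled.is_empty());
                      :     drop(req_tx); // closing the request side lets the task exit
                      :     task.await.unwrap();
                      : }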
     279              : 
     280              : #[cfg(test)]
     281              : pub(crate) struct FlushNotStarted {
     282              :     ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
     283              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     284              : }
     285              : 
     286              : #[cfg(test)]
     287              : pub(crate) struct FlushInProgress {
     288              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     289              : }
     290              : 
     291              : #[cfg(test)]
     292              : pub(crate) struct FlushDone;
     293              : 
     294              : #[cfg(test)]
     295              : impl FlushNotStarted {
     296              :     /// Signals to the background task that the buffer is ready to be flushed to disk.
     297         6614 :     pub fn ready_to_flush(self) -> FlushInProgress {
     298         6614 :         self.ready_to_flush_tx
     299         6614 :             .send(())
     300         6614 :             .map(|_| FlushInProgress {
     301         6614 :                 done_flush_rx: self.done_flush_rx,
     302         6614 :             })
     303         6614 :             .unwrap()
     304         6614 :     }
     305              : }
     306              : 
     307              : #[cfg(test)]
     308              : impl FlushInProgress {
     309              :     /// Waits until background flush is done.
     310         6614 :     pub async fn wait_until_flush_is_done(self) -> FlushDone {
     311         6614 :         self.done_flush_rx.await.unwrap();
     312         6614 :         FlushDone
     313         6614 :     }
     314              : }
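                      : 
                      : // Editor's sketch (hypothetical helper, not part of the measured source):
                      : // the three `#[cfg(test)]` types above form a typestate chain. Stepping
                      : // through it explicitly is equivalent to `FlushControl::release`, but lets
                      : // a test make assertions between the two phases.
                      : #[cfg(test)]
                      : #[allow(dead_code)]
                      : async fn step_through_flush_phases(control: FlushControl) -> FlushDone {
                      :     let not_started: FlushNotStarted = control.into_not_started();
                      :     // Phase 1: allow the background task to start writing to disk.
                      :     let in_progress: FlushInProgress = not_started.ready_to_flush();
                      :     // ... a test could observe the in-progress flush here ...
                      :     // Phase 2: wait until the background task reports completion.
                      :     in_progress.wait_until_flush_is_done().await
                      : }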
        

Generated by: LCOV version 2.1-beta