LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/write - flush.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 85.3 % 224 191
Test Date: 2025-07-16 12:29:03 Functions: 74.6 % 71 53

            Line data    Source code
       1              : use std::ops::ControlFlow;
       2              : 
       3              : use tokio_util::sync::CancellationToken;
       4              : use tracing::{Instrument, info_span, warn};
       5              : use utils::sync::duplex;
       6              : 
       7              : use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
       8              : use crate::context::RequestContext;
       9              : use crate::virtual_file::MaybeFatalIo;
      10              : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
      11              : use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
      12              : 
/// A handle to the flush task.
///
/// `inner` is `Some` while the handle is usable. It is taken (left as `None`) when the handle
/// waits for the background task to exit, either on shutdown or after an error. After that,
/// any further use of the handle panics.
pub struct FlushHandle<Buf, W> {
    inner: Option<FlushHandleInner<Buf, W>>,
}
      17              : 
/// The live state of a [`FlushHandle`]: its end of the channel and the background task's join handle.
pub struct FlushHandleInner<Buf, W> {
    /// A bi-directional channel that sends `Request`s (flush or shutdown) to the background task
    /// and receives recycled buffers back from it.
    channel: duplex::mpsc::Duplex<Request<Buf>, FullSlice<Buf>>,
    /// Join handle for the background flush task; it resolves to the writer or to the task's error.
    join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
}
      25              : 
/// A request for the background task to write `slice` to the file at `offset`.
struct FlushRequest<Buf> {
    slice: FullSlice<Buf>,
    offset: u64,
    /// Test-only: the background task awaits this before its first write attempt,
    /// letting a test decide when the flush may start.
    #[cfg(test)]
    ready_to_flush_rx: Option<tokio::sync::oneshot::Receiver<()>>,
    /// Test-only: the background task signals on this once the write has completed.
    #[cfg(test)]
    done_flush_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
      34              : 
/// Asks the background flush task to wind down.
pub struct ShutdownRequest {
    /// If `Some`, the background task calls `set_len` on the writer with this value as part of
    /// the shutdown; if `None`, the shutdown performs no I/O.
    pub set_len: Option<u64>,
}
      38              : 
/// A message sent from [`FlushHandle`] to the background flush task.
enum Request<Buf> {
    /// Write a buffer; afterwards the buffer is sent back to the handle for reuse.
    Flush(FlushRequest<Buf>),
    /// Apply the optional `set_len`; no buffer is sent back.
    Shutdown(ShutdownRequest),
}
      43              : 
      44              : impl<Buf> Request<Buf> {
      45        12769 :     fn op_str(&self) -> &'static str {
      46        12769 :         match self {
      47        11817 :             Request::Flush(_) => "flush",
      48          952 :             Request::Shutdown(_) => "shutdown",
      49              :         }
      50        12769 :     }
      51              : }
      52              : 
      53              : /// Constructs a request and a control object for a new flush operation.
      54              : #[cfg(not(test))]
      55            0 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      56            0 :     let request = FlushRequest { slice, offset };
      57            0 :     let control = FlushControl::untracked();
      58              : 
      59            0 :     (request, control)
      60            0 : }
      61              : 
      62              : /// Constructs a request and a control object for a new flush operation.
      63              : #[cfg(test)]
      64        11817 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      65        11817 :     let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
      66        11817 :     let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
      67        11817 :     let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
      68              : 
      69        11817 :     let request = FlushRequest {
      70        11817 :         slice,
      71        11817 :         offset,
      72        11817 :         ready_to_flush_rx: Some(ready_to_flush_rx),
      73        11817 :         done_flush_tx: Some(done_flush_tx),
      74        11817 :     };
      75        11817 :     (request, control)
      76        11817 : }
      77              : 
/// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
///
/// Under `cfg(test)` it holds the control-side ends of the request's ready/done oneshot channels.
#[cfg(test)]
pub(crate) struct FlushControl {
    not_started: FlushNotStarted,
}

/// Non-test counterpart of the test-only `FlushControl`: it carries no state, and
/// releasing it does nothing.
#[cfg(not(test))]
pub(crate) struct FlushControl;
      86              : 
      87              : impl FlushControl {
      88              :     #[cfg(test)]
      89        11817 :     fn not_started(
      90        11817 :         ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
      91        11817 :         done_flush_rx: tokio::sync::oneshot::Receiver<()>,
      92        11817 :     ) -> Self {
      93        11817 :         FlushControl {
      94        11817 :             not_started: FlushNotStarted {
      95        11817 :                 ready_to_flush_tx,
      96        11817 :                 done_flush_rx,
      97        11817 :             },
      98        11817 :         }
      99        11817 :     }
     100              : 
     101              :     #[cfg(not(test))]
     102            0 :     fn untracked() -> Self {
     103            0 :         FlushControl
     104            0 :     }
     105              : 
     106              :     /// In tests, turn flush control into a not started state.
     107              :     #[cfg(test)]
     108            1 :     pub(crate) fn into_not_started(self) -> FlushNotStarted {
     109            1 :         self.not_started
     110            1 :     }
     111              : 
     112              :     /// Release control to the submitted buffer.
     113              :     ///
     114              :     /// In `cfg(test)` environment, the buffer is guranteed to be flushed to disk after [`FlushControl::release`] is finishes execution.
     115        10866 :     pub async fn release(self) {
     116              :         #[cfg(test)]
     117              :         {
     118        10866 :             self.not_started
     119        10866 :                 .ready_to_flush()
     120        10866 :                 .wait_until_flush_is_done()
     121        10866 :                 .await;
     122              :         }
     123        10866 :     }
     124              : }
     125              : 
     126              : impl<Buf, W> FlushHandle<Buf, W>
     127              : where
     128              :     Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
     129              :     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
     130              : {
     131              :     /// Spawns a new background flush task and obtains a handle.
     132              :     ///
     133              :     /// Handle and background task are connected through a duplex channel.
     134              :     /// Dirty buffers are sent to the background task for flushing.
     135              :     /// Clean buffers are sent back to the handle for reuse.
     136              :     ///
     137              :     /// The queue depth is 1, and the passed-in `buf` seeds the queue depth.
     138              :     /// I.e., the passed-in buf is immediately available to the handle as a recycled buffer.
     139         1809 :     pub fn spawn_new<B>(
     140         1809 :         file: W,
     141         1809 :         buf: B,
     142         1809 :         gate_guard: utils::sync::gate::GateGuard,
     143         1809 :         cancel: CancellationToken,
     144         1809 :         ctx: RequestContext,
     145         1809 :         span: tracing::Span,
     146         1809 :     ) -> Self
     147         1809 :     where
     148         1809 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     149              :     {
     150         1809 :         let (front, back) = duplex::mpsc::channel(1);
     151         1809 :         back.try_send(buf.flush())
     152         1809 :             .expect("we just created it with capacity 1");
     153              : 
     154         1809 :         let join_handle = tokio::spawn(
     155         1809 :             FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
     156         1809 :                 .run()
     157         1809 :                 .instrument(span),
     158              :         );
     159              : 
     160         1809 :         FlushHandle {
     161         1809 :             inner: Some(FlushHandleInner {
     162         1809 :                 channel: front,
     163         1809 :                 join_handle,
     164         1809 :             }),
     165         1809 :         }
     166         1809 :     }
     167              : 
     168              :     /// Submits a buffer to be flushed in the background task.
     169              :     /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
     170              :     /// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise
     171              :     /// clear `maybe_flushed`.
     172        11817 :     pub async fn flush(
     173        11817 :         &mut self,
     174        11817 :         slice: FullSlice<Buf>,
     175        11817 :         offset: u64,
     176        11817 :     ) -> Result<(FullSlice<Buf>, FlushControl), FlushTaskError> {
     177        11817 :         let (request, flush_control) = new_flush_op(slice, offset);
     178              : 
     179              :         // Submits the buffer to the background task.
     180        11817 :         self.send(Request::Flush(request)).await?;
     181              : 
     182              :         // Wait for an available buffer from the background flush task.
     183              :         // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
     184              :         // then the write path will eventually wait for it here.
     185        11817 :         let Some(recycled) = self.inner_mut().channel.recv().await else {
     186            0 :             return self.handle_error().await;
     187              :         };
     188              : 
     189        11817 :         Ok((recycled, flush_control))
     190        11817 :     }
     191              : 
     192              :     /// Sends poison pill to flush task and waits for it to exit.
     193          952 :     pub async fn shutdown(&mut self, req: ShutdownRequest) -> Result<W, FlushTaskError> {
     194          952 :         self.send(Request::Shutdown(req)).await?;
     195          952 :         self.wait().await
     196          952 :     }
     197              : 
     198        12769 :     async fn send(&mut self, request: Request<Buf>) -> Result<(), FlushTaskError> {
     199        12769 :         let submit = self.inner_mut().channel.send(request).await;
     200        12769 :         if submit.is_err() {
     201            0 :             return self.handle_error().await;
     202        12769 :         }
     203        12769 :         Ok(())
     204        12769 :     }
     205              : 
     206            0 :     async fn handle_error<T>(&mut self) -> Result<T, FlushTaskError> {
     207            0 :         Err(self
     208            0 :             .wait()
     209            0 :             .await
     210            0 :             .expect_err("flush task only disconnects duplex if it exits with an error"))
     211            0 :     }
     212              : 
     213          952 :     async fn wait(&mut self) -> Result<W, FlushTaskError> {
     214          952 :         let handle = self
     215          952 :             .inner
     216          952 :             .take()
     217          952 :             .expect("must not use after we returned an error");
     218          952 :         drop(handle.channel.tx);
     219          952 :         handle.join_handle.await.unwrap()
     220          952 :     }
     221              : 
     222              :     /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
     223              :     /// This only happens if the handle is used after an error.
     224        24586 :     fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
     225        24586 :         self.inner
     226        24586 :             .as_mut()
     227        24586 :             .expect("must not use after we returned an error")
     228        24586 :     }
     229              : }
     230              : 
/// A background task for flushing data to disk.
///
/// Built by `FlushHandle::spawn_new` and driven by its `run` method.
pub struct FlushBackgroundTask<Buf, W> {
    /// A bi-directional channel that receives requests (flush or shutdown) from the handle
    /// and sends recycled buffers back to it.
    channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
    /// A writer for persisting data to disk.
    writer: W,
    /// Request context passed to every I/O call made on `writer`.
    ctx: RequestContext,
    /// Checked before every I/O attempt (the task then exits with `FlushTaskError::Cancelled`)
    /// and passed to the retry backoff.
    cancel: CancellationToken,
    /// Prevents the timeline from shutting down until the flush background task finishes
    /// flushing all remaining buffers to disk.
    ///
    /// NOTE(review): as the last field it is dropped after the others when the struct is
    /// dropped; consider keeping it last.
    _gate_guard: utils::sync::gate::GateGuard,
}
     243              : 
/// Error with which the background flush task can exit; surfaced to callers through [`FlushHandle`].
#[derive(Debug, thiserror::Error)]
pub enum FlushTaskError {
    /// The task observed its cancellation token being cancelled before an I/O attempt.
    #[error("flush task cancelled")]
    Cancelled,
}
     249              : 
     250              : impl FlushTaskError {
     251            0 :     pub fn is_cancel(&self) -> bool {
     252            0 :         match self {
     253            0 :             FlushTaskError::Cancelled => true,
     254              :         }
     255            0 :     }
     256            0 :     pub fn into_anyhow(self) -> anyhow::Error {
     257            0 :         match self {
     258            0 :             FlushTaskError::Cancelled => anyhow::anyhow!(self),
     259              :         }
     260            0 :     }
     261              : }
     262              : 
impl<Buf, W> FlushBackgroundTask<Buf, W>
where
    Buf: IoBufAligned + Send + Sync,
    W: OwnedAsyncWriter + Sync + 'static,
{
    /// Creates a new background flush task.
    ///
    /// This only assembles the state; no work happens until the future from [`Self::run`] is polled.
    fn new(
        channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
        file: W,
        gate_guard: utils::sync::gate::GateGuard,
        cancel: CancellationToken,
        ctx: RequestContext,
    ) -> Self {
        FlushBackgroundTask {
            channel,
            writer: file,
            _gate_guard: gate_guard,
            cancel,
            ctx,
        }
    }

    /// Runs the background flush task.
    ///
    /// Handles requests one at a time, in the order they arrive:
    /// - `Request::Flush`: writes the slice with `write_all_at`, then sends the slice back to the
    ///   handle for reuse.
    /// - `Request::Shutdown`: calls `set_len` on the writer if `set_len` is `Some`; nothing is
    ///   sent back.
    ///
    /// An I/O result that is still an error after `maybe_fatal_err` is retried in place, with
    /// exponential backoff, until it succeeds or cancellation is observed.
    ///
    /// Returns the writer once the handle has dropped its sender and all queued requests have
    /// been processed. Returns `FlushTaskError::Cancelled` if `cancel` is observed before an attempt.
    async fn run(mut self) -> Result<W, FlushTaskError> {
        //  Exit condition: channel is closed and there is no remaining buffer to be flushed
        while let Some(request) = self.channel.recv().await {
            let op_kind = request.op_str();

            // Perform the requested operation.
            //
            // Error handling happens according to the current policy of crashing
            // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
            // (The upper layers of the Pageserver write path are not equipped to retry write errors
            //  because they often deallocate the buffers that were already written).
            //
            // TODO: use utils::backoff::retry once async closures are actually usable
            //
            // The request is parked in `request_storage` between attempts. Each attempt takes it
            // out, and the I/O future puts it back once the I/O completes, carrying the buffer
            // returned by the writer. That way a retry re-submits the same buffer, and the request
            // is available again after the loop.
            let mut request_storage = Some(request);
            for attempt in 1.. {
                if self.cancel.is_cancelled() {
                    return Err(FlushTaskError::Cancelled);
                }
                let result = async {
                    // `None` here would mean a previous attempt future was dropped mid-I/O
                    // without restoring the request.
                    let request: Request<Buf> = request_storage .take().expect(
                        "likely previous invocation of this future didn't get polled to completion",
                    );
                    match &request {
                        // A shutdown without `set_len` needs no I/O: restore the request and finish.
                        Request::Shutdown(ShutdownRequest { set_len: None }) => {
                            request_storage = Some(request);
                            return ControlFlow::Break(());
                        },
                        Request::Flush(_) | Request::Shutdown(ShutdownRequest { set_len: Some(_) }) => {
                        },
                    }
                    if attempt > 1 {
                        warn!(op=%request.op_str(), "retrying");
                    }
                    // borrows so we can async move the requests into async block while not moving these borrows here
                    let writer = &self.writer;
                    let request_storage = &mut request_storage;
                    let ctx = &self.ctx;
                    // Both arms are wrapped in `Either` so the match yields a single future type.
                    let io_fut = match request {
                        Request::Flush(FlushRequest { slice, offset, #[cfg(test)] ready_to_flush_rx, #[cfg(test)] done_flush_tx }) => futures::future::Either::Left(async move {
                            #[cfg(test)]
                            if let Some(ready_to_flush_rx) = ready_to_flush_rx {
                                {
                                    // In test, wait for control to signal that we are ready to flush.
                                    if ready_to_flush_rx.await.is_err() {
                                        tracing::debug!("control dropped");
                                    }
                                }
                            }
                            let (slice, res) = writer.write_all_at(slice, offset, ctx).await;
                            *request_storage = Some(Request::Flush(FlushRequest {
                                slice,
                                offset,
                                #[cfg(test)]
                                ready_to_flush_rx: None, // the contract is that we notify before first attempt
                                #[cfg(test)]
                                done_flush_tx
                            }));
                            res
                        }),
                        Request::Shutdown(ShutdownRequest { set_len }) => futures::future::Either::Right(async move {
                            let set_len = set_len.expect("we filter out the None case above");
                            let res = writer.set_len(set_len, ctx).await;
                            *request_storage = Some(Request::Shutdown(ShutdownRequest {
                                set_len: Some(set_len),
                            }));
                            res
                        }),
                    };
                    // Don't cancel the io_fut by doing tokio::select with self.cancel.cancelled().
                    // The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
                    // If we retry indefinitely, we'll deplete those resources.
                    // Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
                    // wait for cancelled ops to complete and discard their error.
                    let res = io_fut.await;
                    let res = res.maybe_fatal_err("owned_buffers_io flush");
                    let Err(err) = res else {
                        if attempt > 1 {
                            warn!(op=%op_kind, "retry succeeded");
                        }
                        return ControlFlow::Break(());
                    };
                    warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
                    utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
                    ControlFlow::Continue(())
                }
                .instrument(info_span!("attempt", %attempt, %op_kind))
                .await;
                match result {
                    ControlFlow::Break(()) => break,
                    ControlFlow::Continue(()) => continue,
                }
            }
            let request = request_storage.expect("loop must have run at least once");

            let slice = match request {
                Request::Flush(FlushRequest {
                    slice,
                    #[cfg(test)]
                    mut done_flush_tx,
                    ..
                }) => {
                    #[cfg(test)]
                    {
                        // In test, tell control we are done flushing buffer.
                        if done_flush_tx.take().expect("always Some").send(()).is_err() {
                            tracing::debug!("control dropped");
                        }
                    }
                    slice
                }
                Request::Shutdown(_) => {
                    // next iteration will observe recv() returning None
                    continue;
                }
            };

            // Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
            let send_res = self.channel.send(slice).await;
            if send_res.is_err() {
                // Even though the channel is closed, we still need to finish flushing the remaining buffers.
                continue;
            }
        }

        Ok(self.writer)
    }
}
     414              : 
/// Test-only control state: the flush was submitted, but the go-ahead has not been sent yet.
#[cfg(test)]
pub(crate) struct FlushNotStarted {
    ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
    done_flush_rx: tokio::sync::oneshot::Receiver<()>,
}

/// Test-only control state: the go-ahead was sent; awaiting the background task's completion signal.
#[cfg(test)]
pub(crate) struct FlushInProgress {
    done_flush_rx: tokio::sync::oneshot::Receiver<()>,
}

/// Test-only control state: the background task reported the flush as done.
#[cfg(test)]
pub(crate) struct FlushDone;
     428              : 
     429              : #[cfg(test)]
     430              : impl FlushNotStarted {
     431              :     /// Signals the background task the buffer is ready to flush to disk.
     432        10867 :     pub fn ready_to_flush(self) -> FlushInProgress {
     433        10867 :         self.ready_to_flush_tx
     434        10867 :             .send(())
     435        10867 :             .map(|_| FlushInProgress {
     436        10867 :                 done_flush_rx: self.done_flush_rx,
     437        10867 :             })
     438        10867 :             .unwrap()
     439        10867 :     }
     440              : }
     441              : 
     442              : #[cfg(test)]
     443              : impl FlushInProgress {
     444              :     /// Waits until background flush is done.
     445        10867 :     pub async fn wait_until_flush_is_done(self) -> FlushDone {
     446        10867 :         self.done_flush_rx.await.unwrap();
     447        10867 :         FlushDone
     448        10867 :     }
     449              : }
        

Generated by: LCOV version 2.1-beta