LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/write - flush.rs (source / functions) Coverage Total Hit
Test: 309d74c7615cee485c3506897fba3db69780454f.info Lines: 84.4 % 180 152
Test Date: 2025-03-19 16:14:43 Functions: 85.3 % 34 29

            Line data    Source code
       1              : use std::ops::ControlFlow;
       2              : use std::sync::Arc;
       3              : 
       4              : use tokio_util::sync::CancellationToken;
       5              : use tracing::{Instrument, info, info_span, warn};
       6              : use utils::sync::duplex;
       7              : 
       8              : use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
       9              : use crate::context::RequestContext;
      10              : use crate::virtual_file::MaybeFatalIo;
      11              : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
      12              : use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
      13              : 
      14              : /// A handle to the flush task.
      15              : pub struct FlushHandle<Buf, W> {
      16              :     inner: Option<FlushHandleInner<Buf, W>>,
      17              : }
      18              : 
      19              : pub struct FlushHandleInner<Buf, W> {
      20              :     /// A bi-directional channel that sends (buffer, offset) for writes,
      21              :     /// and receives recycled buffers.
      22              :     channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
      23              :     /// Join handle for the background flush task.
      24              :     join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
      25              : }
      26              : 
      27              : struct FlushRequest<Buf> {
      28              :     slice: FullSlice<Buf>,
      29              :     offset: u64,
      30              :     #[cfg(test)]
      31              :     ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
      32              :     #[cfg(test)]
      33              :     done_flush_tx: tokio::sync::oneshot::Sender<()>,
      34              : }
      35              : 
      36              : /// Constructs a request and a control object for a new flush operation.
      37              : #[cfg(not(test))]
      38            0 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      39            0 :     let request = FlushRequest { slice, offset };
      40            0 :     let control = FlushControl::untracked();
      41            0 : 
      42            0 :     (request, control)
      43            0 : }
      44              : 
      45              : /// Constructs a request and a control object for a new flush operation.
      46              : #[cfg(test)]
      47        13246 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      48        13246 :     let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
      49        13246 :     let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
      50        13246 :     let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
      51        13246 : 
      52        13246 :     let request = FlushRequest {
      53        13246 :         slice,
      54        13246 :         offset,
      55        13246 :         ready_to_flush_rx,
      56        13246 :         done_flush_tx,
      57        13246 :     };
      58        13246 :     (request, control)
      59        13246 : }
      60              : 
      61              : /// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
      62              : #[cfg(test)]
      63              : pub(crate) struct FlushControl {
      64              :     not_started: FlushNotStarted,
      65              : }
      66              : 
      67              : #[cfg(not(test))]
      68              : pub(crate) struct FlushControl;
      69              : 
      70              : impl FlushControl {
      71              :     #[cfg(test)]
      72        13246 :     fn not_started(
      73        13246 :         ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
      74        13246 :         done_flush_rx: tokio::sync::oneshot::Receiver<()>,
      75        13246 :     ) -> Self {
      76        13246 :         FlushControl {
      77        13246 :             not_started: FlushNotStarted {
      78        13246 :                 ready_to_flush_tx,
      79        13246 :                 done_flush_rx,
      80        13246 :             },
      81        13246 :         }
      82        13246 :     }
      83              : 
      84              :     #[cfg(not(test))]
      85            0 :     fn untracked() -> Self {
      86            0 :         FlushControl
      87            0 :     }
      88              : 
      89              :     /// In tests, turn flush control into a not started state.
      90              :     #[cfg(test)]
      91            4 :     pub(crate) fn into_not_started(self) -> FlushNotStarted {
      92            4 :         self.not_started
      93            4 :     }
      94              : 
      95              :     /// Release control to the submitted buffer.
      96              :     ///
      97              :     /// In `cfg(test)` environment, the buffer is guaranteed to be flushed to disk after [`FlushControl::release`] finishes execution.
      98        13224 :     pub async fn release(self) {
      99        13224 :         #[cfg(test)]
     100        13224 :         {
     101        13224 :             self.not_started
     102        13224 :                 .ready_to_flush()
     103        13224 :                 .wait_until_flush_is_done()
     104        13224 :                 .await;
     105            0 :         }
     106        13224 :     }
     107              : }
     108              : 
     109              : impl<Buf, W> FlushHandle<Buf, W>
     110              : where
     111              :     Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
     112              :     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
     113              : {
     114              :     /// Spawns a new background flush task and obtains a handle.
     115              :     ///
     116              :     /// Handle and background task are connected through a duplex channel.
     117              :     /// Dirty buffers are sent to the background task for flushing.
     118              :     /// Clean buffers are sent back to the handle for reuse.
     119              :     ///
     120              :     /// The queue depth is 1, and the passed-in `buf` seeds the queue depth.
     121              :     /// I.e., the passed-in buf is immediately available to the handle as a recycled buffer.
     122         2650 :     pub fn spawn_new<B>(
     123         2650 :         file: Arc<W>,
     124         2650 :         buf: B,
     125         2650 :         gate_guard: utils::sync::gate::GateGuard,
     126         2650 :         cancel: CancellationToken,
     127         2650 :         ctx: RequestContext,
     128         2650 :         span: tracing::Span,
     129         2650 :     ) -> Self
     130         2650 :     where
     131         2650 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     132         2650 :     {
     133         2650 :         let (front, back) = duplex::mpsc::channel(1);
     134         2650 :         back.try_send(buf.flush())
     135         2650 :             .expect("we just created it with capacity 1");
     136         2650 : 
     137         2650 :         let join_handle = tokio::spawn(
     138         2650 :             FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
     139         2650 :                 .run()
     140         2650 :                 .instrument(span),
     141         2650 :         );
     142         2650 : 
     143         2650 :         FlushHandle {
     144         2650 :             inner: Some(FlushHandleInner {
     145         2650 :                 channel: front,
     146         2650 :                 join_handle,
     147         2650 :             }),
     148         2650 :         }
     149         2650 :     }
     150              : 
     151              :     /// Submits a buffer to be flushed in the background task.
     152              :     /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
     153              :     /// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise
     154              :     /// clear `maybe_flushed`.
     155        13246 :     pub async fn flush(
     156        13246 :         &mut self,
     157        13246 :         slice: FullSlice<Buf>,
     158        13246 :         offset: u64,
     159        13246 :     ) -> Result<(FullSlice<Buf>, FlushControl), FlushTaskError> {
     160        13246 :         let (request, flush_control) = new_flush_op(slice, offset);
     161              : 
     162              :         // Submits the buffer to the background task.
     163        13246 :         let submit = self.inner_mut().channel.send(request).await;
     164        13246 :         if submit.is_err() {
     165            0 :             return self.handle_error().await;
     166        13246 :         }
     167              : 
     168              :         // Wait for an available buffer from the background flush task.
     169              :         // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
     170              :         // then the write path will eventually wait for it here.
     171        13246 :         let Some(recycled) = self.inner_mut().channel.recv().await else {
     172            0 :             return self.handle_error().await;
     173              :         };
     174              : 
     175        13246 :         Ok((recycled, flush_control))
     176        13246 :     }
     177              : 
     178            0 :     async fn handle_error<T>(&mut self) -> Result<T, FlushTaskError> {
     179            0 :         Err(self
     180            0 :             .shutdown()
     181            0 :             .await
     182            0 :             .expect_err("flush task only disconnects duplex if it exits with an error"))
     183            0 :     }
     184              : 
     185              :     /// Cleans up the channel and joins the flush task.
     186           18 :     pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> {
     187           18 :         let handle = self
     188           18 :             .inner
     189           18 :             .take()
     190           18 :             .expect("must not use after we returned an error");
     191           18 :         drop(handle.channel.tx);
     192           18 :         handle.join_handle.await.unwrap()
     193           18 :     }
     194              : 
     195              :     /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
     196              :     /// This only happens if the handle is used after an error.
     197        26492 :     fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
     198        26492 :         self.inner
     199        26492 :             .as_mut()
     200        26492 :             .expect("must not use after we returned an error")
     201        26492 :     }
     202              : }
     203              : 
     204              : /// A background task for flushing data to disk.
     205              : pub struct FlushBackgroundTask<Buf, W> {
     206              :     /// A bi-directional channel that receives (buffer, offset) for writes,
     207              :     /// and sends back recycled buffers.
     208              :     channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
     209              :     /// A writer for persisting data to disk.
     210              :     writer: Arc<W>,
     211              :     ctx: RequestContext,
     212              :     cancel: CancellationToken,
     213              :     /// Prevents the timeline from shutting down until the flush background task finishes flushing all remaining buffers to disk.
     214              :     _gate_guard: utils::sync::gate::GateGuard,
     215              : }
     216              : 
     217              : #[derive(Debug, thiserror::Error)]
     218              : pub enum FlushTaskError {
     219              :     #[error("flush task cancelled")]
     220              :     Cancelled,
     221              : }
     222              : 
     223              : impl<Buf, W> FlushBackgroundTask<Buf, W>
     224              : where
     225              :     Buf: IoBufAligned + Send + Sync,
     226              :     W: OwnedAsyncWriter + Sync + 'static,
     227              : {
     228              :     /// Creates a new background flush task.
     229         2650 :     fn new(
     230         2650 :         channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
     231         2650 :         file: Arc<W>,
     232         2650 :         gate_guard: utils::sync::gate::GateGuard,
     233         2650 :         cancel: CancellationToken,
     234         2650 :         ctx: RequestContext,
     235         2650 :     ) -> Self {
     236         2650 :         FlushBackgroundTask {
     237         2650 :             channel,
     238         2650 :             writer: file,
     239         2650 :             _gate_guard: gate_guard,
     240         2650 :             cancel,
     241         2650 :             ctx,
     242         2650 :         }
     243         2650 :     }
     244              : 
     245              :     /// Runs the background flush task.
     246         2650 :     async fn run(mut self) -> Result<Arc<W>, FlushTaskError> {
     247              :         //  Exit condition: channel is closed and there is no remaining buffer to be flushed
     248        15874 :         while let Some(request) = self.channel.recv().await {
     249              :             #[cfg(test)]
     250              :             {
     251              :                 // In test, wait for control to signal that we are ready to flush.
     252        13246 :                 if request.ready_to_flush_rx.await.is_err() {
     253           18 :                     tracing::debug!("control dropped");
     254        13228 :                 }
     255              :             }
     256              : 
     257              :             // Write slice to disk at `offset`.
     258              :             //
     259              :             // Error handling happens according to the current policy of crashing
     260              :             // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
     261              :             // (The upper layers of the Pageserver write path are not equipped to retry write errors
     262              :             //  because they often deallocate the buffers that were already written).
     263              :             //
     264              :             // TODO: use utils::backoff::retry once async closures are actually usable
     265              :             //
     266        13246 :             let mut slice_storage = Some(request.slice);
     267        13246 :             for attempt in 1.. {
     268        13246 :                 if self.cancel.is_cancelled() {
     269            0 :                     return Err(FlushTaskError::Cancelled);
     270        13246 :                 }
     271        13246 :                 let result = async {
     272        13246 :                     if attempt > 1 {
     273            0 :                         info!("retrying flush");
     274        13246 :                     }
     275        13246 :                     let slice = slice_storage.take().expect(
     276        13246 :                         "likely previous invocation of this future didn't get polled to completion",
     277        13246 :                     );
     278              :                     // Don't cancel this write by doing tokio::select with self.cancel.cancelled().
     279              :                     // The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
     280              :                     // If we retry indefinitely, we'll deplete those resources.
     281              :                     // Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
     282              :                     // wait for cancelled ops to complete and discard their error.
     283        13246 :                     let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
     284        13246 :                     slice_storage = Some(slice);
     285        13246 :                     let res = res.maybe_fatal_err("owned_buffers_io flush");
     286        13246 :                     let Err(err) = res else {
     287        13246 :                         return ControlFlow::Break(());
     288              :                     };
     289            0 :                     warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
     290            0 :                     utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
     291            0 :                     ControlFlow::Continue(())
     292        13246 :                 }
     293        13246 :                 .instrument(info_span!("flush_attempt", %attempt))
     294        13246 :                 .await;
     295        13246 :                 match result {
     296        13246 :                     ControlFlow::Break(()) => break,
     297            0 :                     ControlFlow::Continue(()) => continue,
     298              :                 }
     299              :             }
     300        13246 :             let slice = slice_storage.expect("loop must have run at least once");
     301        13246 : 
     302        13246 :             #[cfg(test)]
     303        13246 :             {
     304        13246 :                 // In test, tell control we are done flushing buffer.
     305        13246 :                 if request.done_flush_tx.send(()).is_err() {
     306           18 :                     tracing::debug!("control dropped");
     307        13228 :                 }
     308            0 :             }
     309            0 : 
     310            0 :             // Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
     311        13246 :             if self.channel.send(slice).await.is_err() {
     312              :                 // Although the channel is closed, we still need to finish flushing the remaining buffers.
     313            0 :                 continue;
     314        13246 :             }
     315              :         }
     316              : 
     317         2369 :         Ok(self.writer)
     318         2369 :     }
     319              : }
     320              : 
     321              : #[cfg(test)]
     322              : pub(crate) struct FlushNotStarted {
     323              :     ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
     324              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     325              : }
     326              : 
     327              : #[cfg(test)]
     328              : pub(crate) struct FlushInProgress {
     329              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     330              : }
     331              : 
     332              : #[cfg(test)]
     333              : pub(crate) struct FlushDone;
     334              : 
     335              : #[cfg(test)]
     336              : impl FlushNotStarted {
     337              :     /// Signals the background task the buffer is ready to flush to disk.
     338        13228 :     pub fn ready_to_flush(self) -> FlushInProgress {
     339        13228 :         self.ready_to_flush_tx
     340        13228 :             .send(())
     341        13228 :             .map(|_| FlushInProgress {
     342        13228 :                 done_flush_rx: self.done_flush_rx,
     343        13228 :             })
     344        13228 :             .unwrap()
     345        13228 :     }
     346              : }
     347              : 
     348              : #[cfg(test)]
     349              : impl FlushInProgress {
     350              :     /// Waits until background flush is done.
     351        13228 :     pub async fn wait_until_flush_is_done(self) -> FlushDone {
     352        13228 :         self.done_flush_rx.await.unwrap();
     353        13228 :         FlushDone
     354        13228 :     }
     355              : }
        

Generated by: LCOV version 2.1-beta