LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/write - flush.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 88.6 % 246 218
Test Date: 2025-04-24 20:31:15 Functions: 76.8 % 69 53

            Line data    Source code
       1              : use std::ops::ControlFlow;
       2              : 
       3              : use tokio_util::sync::CancellationToken;
       4              : use tracing::{Instrument, info_span, warn};
       5              : use utils::sync::duplex;
       6              : 
       7              : use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
       8              : use crate::context::RequestContext;
       9              : use crate::virtual_file::MaybeFatalIo;
      10              : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
      11              : use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
      12              : 
      13              : /// A handle to the flush task.
      14              : pub struct FlushHandle<Buf, W> {
      15              :     inner: Option<FlushHandleInner<Buf, W>>,
      16              : }
      17              : 
      18              : pub struct FlushHandleInner<Buf, W> {
      19              :     /// A bi-directional channel that sends (buffer, offset) for writes,
      20              :     /// and receives recycled buffers.
      21              :     channel: duplex::mpsc::Duplex<Request<Buf>, FullSlice<Buf>>,
      22              :     /// Join handle for the background flush task.
      23              :     join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
      24              : }
      25              : 
      26              : struct FlushRequest<Buf> {
      27              :     slice: FullSlice<Buf>,
      28              :     offset: u64,
      29              :     #[cfg(test)]
      30              :     ready_to_flush_rx: Option<tokio::sync::oneshot::Receiver<()>>,
      31              :     #[cfg(test)]
      32              :     done_flush_tx: Option<tokio::sync::oneshot::Sender<()>>,
      33              : }
      34              : 
      35              : pub struct ShutdownRequest {
      36              :     pub set_len: Option<u64>,
      37              : }
      38              : 
      39              : enum Request<Buf> {
      40              :     Flush(FlushRequest<Buf>),
      41              :     Shutdown(ShutdownRequest),
      42              : }
      43              : 
      44              : impl<Buf> Request<Buf> {
      45       152904 :     fn op_str(&self) -> &'static str {
      46       152904 :         match self {
      47       141648 :             Request::Flush(_) => "flush",
      48        11256 :             Request::Shutdown(_) => "shutdown",
      49              :         }
      50       152904 :     }
      51              : }
      52              : 
      53              : /// Constructs a request and a control object for a new flush operation.
      54              : #[cfg(not(test))]
      55            0 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      56            0 :     let request = FlushRequest { slice, offset };
      57            0 :     let control = FlushControl::untracked();
      58            0 : 
      59            0 :     (request, control)
      60            0 : }
      61              : 
      62              : /// Constructs a request and a control object for a new flush operation.
      63              : #[cfg(test)]
      64       141648 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      65       141648 :     let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
      66       141648 :     let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
      67       141648 :     let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
      68       141648 : 
      69       141648 :     let request = FlushRequest {
      70       141648 :         slice,
      71       141648 :         offset,
      72       141648 :         ready_to_flush_rx: Some(ready_to_flush_rx),
      73       141648 :         done_flush_tx: Some(done_flush_tx),
      74       141648 :     };
      75       141648 :     (request, control)
      76       141648 : }
      77              : 
      78              : /// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
      79              : #[cfg(test)]
      80              : pub(crate) struct FlushControl {
      81              :     not_started: FlushNotStarted,
      82              : }
      83              : 
      84              : #[cfg(not(test))]
      85              : pub(crate) struct FlushControl;
      86              : 
      87              : impl FlushControl {
      88              :     #[cfg(test)]
      89       141648 :     fn not_started(
      90       141648 :         ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
      91       141648 :         done_flush_rx: tokio::sync::oneshot::Receiver<()>,
      92       141648 :     ) -> Self {
      93       141648 :         FlushControl {
      94       141648 :             not_started: FlushNotStarted {
      95       141648 :                 ready_to_flush_tx,
      96       141648 :                 done_flush_rx,
      97       141648 :             },
      98       141648 :         }
      99       141648 :     }
     100              : 
     101              :     #[cfg(not(test))]
     102            0 :     fn untracked() -> Self {
     103            0 :         FlushControl
     104            0 :     }
     105              : 
     106              :     /// In tests, turn flush control into a not started state.
     107              :     #[cfg(test)]
     108           12 :     pub(crate) fn into_not_started(self) -> FlushNotStarted {
     109           12 :         self.not_started
     110           12 :     }
     111              : 
     112              :     /// Release control to the submitted buffer.
     113              :     ///
     114              :     /// In a `cfg(test)` environment, the buffer is guaranteed to be flushed to disk after [`FlushControl::release`] finishes execution.
     115       130404 :     pub async fn release(self) {
     116       130404 :         #[cfg(test)]
     117       130404 :         {
     118       130404 :             self.not_started
     119       130404 :                 .ready_to_flush()
     120       130404 :                 .wait_until_flush_is_done()
     121       130404 :                 .await;
     122            0 :         }
     123       130404 :     }
     124              : }
     125              : 
     126              : impl<Buf, W> FlushHandle<Buf, W>
     127              : where
     128              :     Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
     129              :     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
     130              : {
     131              :     /// Spawns a new background flush task and obtains a handle.
     132              :     ///
     133              :     /// Handle and background task are connected through a duplex channel.
     134              :     /// Dirty buffers are sent to the background task for flushing.
     135              :     /// Clean buffers are sent back to the handle for reuse.
     136              :     ///
     137              :     /// The queue depth is 1, and the passed-in `buf` seeds the queue depth.
     138              :     /// I.e., the passed-in buf is immediately available to the handle as a recycled buffer.
     139        20832 :     pub fn spawn_new<B>(
     140        20832 :         file: W,
     141        20832 :         buf: B,
     142        20832 :         gate_guard: utils::sync::gate::GateGuard,
     143        20832 :         cancel: CancellationToken,
     144        20832 :         ctx: RequestContext,
     145        20832 :         span: tracing::Span,
     146        20832 :     ) -> Self
     147        20832 :     where
     148        20832 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     149        20832 :     {
     150        20832 :         let (front, back) = duplex::mpsc::channel(1);
     151        20832 :         back.try_send(buf.flush())
     152        20832 :             .expect("we just created it with capacity 1");
     153        20832 : 
     154        20832 :         let join_handle = tokio::spawn(
     155        20832 :             FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
     156        20832 :                 .run()
     157        20832 :                 .instrument(span),
     158        20832 :         );
     159        20832 : 
     160        20832 :         FlushHandle {
     161        20832 :             inner: Some(FlushHandleInner {
     162        20832 :                 channel: front,
     163        20832 :                 join_handle,
     164        20832 :             }),
     165        20832 :         }
     166        20832 :     }
     167              : 
     168              :     /// Submits a buffer to be flushed in the background task.
     169              :     /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
     170              :     /// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise
     171              :     /// clear `maybe_flushed`.
     172       141648 :     pub async fn flush(
     173       141648 :         &mut self,
     174       141648 :         slice: FullSlice<Buf>,
     175       141648 :         offset: u64,
     176       141648 :     ) -> Result<(FullSlice<Buf>, FlushControl), FlushTaskError> {
     177       141648 :         let (request, flush_control) = new_flush_op(slice, offset);
     178       141648 : 
     179       141648 :         // Submits the buffer to the background task.
     180       141648 :         self.send(Request::Flush(request)).await?;
     181              : 
     182              :         // Wait for an available buffer from the background flush task.
     183              :         // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
     184              :         // then the write path will eventually wait for it here.
     185       141648 :         let Some(recycled) = self.inner_mut().channel.recv().await else {
     186            0 :             return self.handle_error().await;
     187              :         };
     188              : 
     189       141648 :         Ok((recycled, flush_control))
     190       141648 :     }
     191              : 
     192              :     /// Sends poison pill to flush task and waits for it to exit.
     193        11256 :     pub async fn shutdown(&mut self, req: ShutdownRequest) -> Result<W, FlushTaskError> {
     194        11256 :         self.send(Request::Shutdown(req)).await?;
     195        11256 :         self.wait().await
     196        11256 :     }
     197              : 
     198       152904 :     async fn send(&mut self, request: Request<Buf>) -> Result<(), FlushTaskError> {
     199       152904 :         let submit = self.inner_mut().channel.send(request).await;
     200       152904 :         if submit.is_err() {
     201            0 :             return self.handle_error().await;
     202       152904 :         }
     203       152904 :         Ok(())
     204       152904 :     }
     205              : 
     206            0 :     async fn handle_error<T>(&mut self) -> Result<T, FlushTaskError> {
     207            0 :         Err(self
     208            0 :             .wait()
     209            0 :             .await
     210            0 :             .expect_err("flush task only disconnects duplex if it exits with an error"))
     211            0 :     }
     212              : 
     213        11256 :     async fn wait(&mut self) -> Result<W, FlushTaskError> {
     214        11256 :         let handle = self
     215        11256 :             .inner
     216        11256 :             .take()
     217        11256 :             .expect("must not use after we returned an error");
     218        11256 :         drop(handle.channel.tx);
     219        11256 :         handle.join_handle.await.unwrap()
     220        11256 :     }
     221              : 
     222              :     /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
     223              :     /// This only happens if the handle is used after an error.
     224       294552 :     fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
     225       294552 :         self.inner
     226       294552 :             .as_mut()
     227       294552 :             .expect("must not use after we returned an error")
     228       294552 :     }
     229              : }
     230              : 
     231              : /// A background task for flushing data to disk.
     232              : pub struct FlushBackgroundTask<Buf, W> {
     233              :     /// A bi-directional channel that receives (buffer, offset) for writes,
     234              :     /// and sends back recycled buffers.
     235              :     channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
     236              :     /// A writer for persisting data to disk.
     237              :     writer: W,
     238              :     ctx: RequestContext,
     239              :     cancel: CancellationToken,
     240              :     /// Prevents the timeline from shutting down until the flush background task finishes flushing all remaining buffers to disk.
     241              :     _gate_guard: utils::sync::gate::GateGuard,
     242              : }
     243              : 
     244              : #[derive(Debug, thiserror::Error)]
     245              : pub enum FlushTaskError {
     246              :     #[error("flush task cancelled")]
     247              :     Cancelled,
     248              : }
     249              : 
     250              : impl<Buf, W> FlushBackgroundTask<Buf, W>
     251              : where
     252              :     Buf: IoBufAligned + Send + Sync,
     253              :     W: OwnedAsyncWriter + Sync + 'static,
     254              : {
     255              :     /// Creates a new background flush task.
     256        20832 :     fn new(
     257        20832 :         channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
     258        20832 :         file: W,
     259        20832 :         gate_guard: utils::sync::gate::GateGuard,
     260        20832 :         cancel: CancellationToken,
     261        20832 :         ctx: RequestContext,
     262        20832 :     ) -> Self {
     263        20832 :         FlushBackgroundTask {
     264        20832 :             channel,
     265        20832 :             writer: file,
     266        20832 :             _gate_guard: gate_guard,
     267        20832 :             cancel,
     268        20832 :             ctx,
     269        20832 :         }
     270        20832 :     }
     271              : 
     272              :     /// Runs the background flush task.
     273        20832 :     async fn run(mut self) -> Result<W, FlushTaskError> {
     274              :         //  Exit condition: channel is closed and there is no remaining buffer to be flushed
     275       173658 :         while let Some(request) = self.channel.recv().await {
     276       152904 :             let op_kind = request.op_str();
     277       152904 : 
     278       152904 :             // Perform the requested operation.
     279       152904 :             //
     280       152904 :             // Error handling happens according to the current policy of crashing
     281       152904 :             // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
     282       152904 :             // (The upper layers of the Pageserver write path are not equipped to retry write errors
     283       152904 :             //  because they often deallocate the buffers that were already written).
     284       152904 :             //
     285       152904 :             // TODO: use utils::backoff::retry once async closures are actually usable
     286       152904 :             //
     287       152904 :             let mut request_storage = Some(request);
     288       152904 :             for attempt in 1.. {
     289       152904 :                 if self.cancel.is_cancelled() {
     290            0 :                     return Err(FlushTaskError::Cancelled);
     291       152904 :                 }
     292       152904 :                 let result = async {
     293       152904 :                     let request: Request<Buf> = request_storage .take().expect(
     294       152904 :                         "likely previous invocation of this future didn't get polled to completion",
     295       152904 :                     );
     296        11256 :                     match &request {
     297              :                         Request::Shutdown(ShutdownRequest { set_len: None }) => {
     298        11160 :                             request_storage = Some(request);
     299        11160 :                             return ControlFlow::Break(());
     300              :                         },
     301       141744 :                         Request::Flush(_) | Request::Shutdown(ShutdownRequest { set_len: Some(_) }) => {
     302       141744 :                         },
     303       141744 :                     }
     304       141744 :                     if attempt > 1 {
     305            0 :                         warn!(op=%request.op_str(), "retrying");
     306       141744 :                     }
     307              :                     // borrows so we can async move the requests into async block while not moving these borrows here
     308       141744 :                     let writer = &self.writer;
     309       141744 :                     let request_storage = &mut request_storage;
     310       141744 :                     let ctx = &self.ctx;
     311       141744 :                     let io_fut = match request {
     312       141648 :                         Request::Flush(FlushRequest { slice, offset, #[cfg(test)] ready_to_flush_rx, #[cfg(test)] done_flush_tx }) => futures::future::Either::Left(async move {
     313              :                             #[cfg(test)]
     314       141648 :                             if let Some(ready_to_flush_rx) = ready_to_flush_rx {
     315              :                                 {
     316              :                                     // In test, wait for control to signal that we are ready to flush.
     317       141648 :                                     if ready_to_flush_rx.await.is_err() {
     318        11232 :                                         tracing::debug!("control dropped");
     319       130416 :                                     }
     320              :                                 }
     321            0 :                             }
     322       141648 :                             let (slice, res) = writer.write_all_at(slice, offset, ctx).await;
     323       141648 :                             *request_storage = Some(Request::Flush(FlushRequest {
     324       141648 :                                 slice,
     325       141648 :                                 offset,
     326       141648 :                                 #[cfg(test)]
     327       141648 :                                 ready_to_flush_rx: None, // the contract is that we notify before first attempt
     328       141648 :                                 #[cfg(test)]
     329       141648 :                                 done_flush_tx
     330       141648 :                             }));
     331       141648 :                             res
     332       141648 :                         }),
     333           96 :                         Request::Shutdown(ShutdownRequest { set_len }) => futures::future::Either::Right(async move {
     334           96 :                             let set_len = set_len.expect("we filter out the None case above");
     335           96 :                             let res = writer.set_len(set_len, ctx).await;
     336           96 :                             *request_storage = Some(Request::Shutdown(ShutdownRequest {
     337           96 :                                 set_len: Some(set_len),
     338           96 :                             }));
     339           96 :                             res
     340           96 :                         }),
     341              :                     };
     342              :                     // Don't cancel the io_fut by doing tokio::select with self.cancel.cancelled().
     343              :                     // The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
     344              :                     // If we retry indefinitely, we'll deplete those resources.
     345              :                     // Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
     346              :                     // wait for cancelled ops to complete and discard their error.
     347       141744 :                     let res = io_fut.await;
     348       141744 :                     let res = res.maybe_fatal_err("owned_buffers_io flush");
     349       141744 :                     let Err(err) = res else {
     350       141744 :                         if attempt > 1 {
     351            0 :                             warn!(op=%op_kind, "retry succeeded");
     352       141744 :                         }
     353       141744 :                         return ControlFlow::Break(());
     354              :                     };
     355            0 :                     warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
     356            0 :                     utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
     357            0 :                     ControlFlow::Continue(())
     358       152904 :                 }
     359       152904 :                 .instrument(info_span!("attempt", %attempt, %op_kind))
     360       152904 :                 .await;
     361       152904 :                 match result {
     362       152904 :                     ControlFlow::Break(()) => break,
     363            0 :                     ControlFlow::Continue(()) => continue,
     364              :                 }
     365              :             }
     366       152904 :             let request = request_storage.expect("loop must have run at least once");
     367              : 
     368       152904 :             let slice = match request {
     369              :                 Request::Flush(FlushRequest {
     370       141648 :                     slice,
     371       141648 :                     #[cfg(test)]
     372       141648 :                     mut done_flush_tx,
     373       141648 :                     ..
     374       141648 :                 }) => {
     375       141648 :                     #[cfg(test)]
     376       141648 :                     {
     377       141648 :                         // In test, tell control we are done flushing buffer.
     378       141648 :                         if done_flush_tx.take().expect("always Some").send(()).is_err() {
     379        11162 :                             tracing::debug!("control dropped");
     380       130486 :                         }
     381            0 :                     }
     382       141648 :                     slice
     383              :                 }
     384              :                 Request::Shutdown(_) => {
     385              :                     // next iteration will observe recv() returning None
     386        11256 :                     continue;
     387              :                 }
     388              :             };
     389              : 
     390              :             // Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
     391       141648 :             let send_res = self.channel.send(slice).await;
     392       141648 :             if send_res.is_err() {
     393              :                 // Although the channel is closed, we still need to finish flushing the remaining buffers.
     394            0 :                 continue;
     395       141648 :             }
     396              :         }
     397              : 
     398        19859 :         Ok(self.writer)
     399        19859 :     }
     400              : }
     401              : 
     402              : #[cfg(test)]
     403              : pub(crate) struct FlushNotStarted {
     404              :     ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
     405              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     406              : }
     407              : 
     408              : #[cfg(test)]
     409              : pub(crate) struct FlushInProgress {
     410              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     411              : }
     412              : 
     413              : #[cfg(test)]
     414              : pub(crate) struct FlushDone;
     415              : 
     416              : #[cfg(test)]
     417              : impl FlushNotStarted {
     418              :     /// Signals the background task the buffer is ready to flush to disk.
     419       130416 :     pub fn ready_to_flush(self) -> FlushInProgress {
     420       130416 :         self.ready_to_flush_tx
     421       130416 :             .send(())
     422       130416 :             .map(|_| FlushInProgress {
     423       130416 :                 done_flush_rx: self.done_flush_rx,
     424       130416 :             })
     425       130416 :             .unwrap()
     426       130416 :     }
     427              : }
     428              : 
     429              : #[cfg(test)]
     430              : impl FlushInProgress {
     431              :     /// Waits until background flush is done.
     432       130416 :     pub async fn wait_until_flush_is_done(self) -> FlushDone {
     433       130416 :         self.done_flush_rx.await.unwrap();
     434       130416 :         FlushDone
     435       130416 :     }
     436              : }
        

Generated by: LCOV version 2.1-beta