LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/write - flush.rs (source / functions)
Test: 5445d246133daeceb0507e6cc0797ab7c1c70cb8.info
Test Date: 2025-03-12 18:05:02

             Coverage    Total    Hit
Lines:       85.0 %      187      159
Functions:   81.6 %       38       31

            Line data    Source code
       1              : use std::ops::ControlFlow;
       2              : use std::sync::Arc;
       3              : 
       4              : use once_cell::sync::Lazy;
       5              : use tokio_util::sync::CancellationToken;
       6              : use tracing::{Instrument, info, info_span, warn};
       7              : use utils::sync::duplex;
       8              : 
       9              : use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
      10              : use crate::context::RequestContext;
      11              : use crate::virtual_file::MaybeFatalIo;
      12              : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
      13              : use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
      14              : 
      15              : /// A handle to the flush task.
      16              : pub struct FlushHandle<Buf, W> {
      17              :     inner: Option<FlushHandleInner<Buf, W>>,
      18              :     /// Immutable buffer for serving tail reads.
      19              :     /// `None` if no flush request has been submitted.
      20              :     pub(super) maybe_flushed: Option<FullSlice<Buf>>,
      21              : }
      22              : 
      23              : pub struct FlushHandleInner<Buf, W> {
      24              :     /// A bi-directional channel that sends (buffer, offset) for writes,
      25              :     /// and receives the recycled buffer.
      26              :     channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
      27              :     /// Join handle for the background flush task.
      28              :     join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
      29              : }
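
A note on the duplex channel: conceptually it is two bounded tokio mpsc channels glued together, so that each endpoint sends on one and receives on the other. Below is a minimal sketch of that shape, with illustrative names (the real implementation lives in `utils::sync::duplex` and may differ in detail):

    use tokio::sync::mpsc;

    /// One endpoint of a bi-directional channel: sends `S`, receives `R`.
    pub struct Duplex<S, R> {
        pub tx: mpsc::Sender<S>,
        pub rx: mpsc::Receiver<R>,
    }

    /// Creates a pair of connected endpoints with bounded capacity,
    /// mirroring the `duplex::mpsc::channel(1)` call in `spawn_new` below.
    pub fn channel<A, B>(cap: usize) -> (Duplex<A, B>, Duplex<B, A>) {
        let (tx_a, rx_a) = mpsc::channel(cap);
        let (tx_b, rx_b) = mpsc::channel(cap);
        (Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a })
    }

    impl<S, R> Duplex<S, R> {
        /// Sends a value to the peer; fails if the peer dropped its receiver.
        pub async fn send(&self, v: S) -> Result<(), mpsc::error::SendError<S>> {
            self.tx.send(v).await
        }

        /// Receives a value from the peer; `None` once the peer's sender is dropped.
        pub async fn recv(&mut self) -> Option<R> {
            self.rx.recv().await
        }
    }

With capacity 1, at most one `FlushRequest` and one recycled buffer are in flight at a time; this is the source of the backpressure described in `flush` below.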
      30              : 
      31              : struct FlushRequest<Buf> {
      32              :     slice: FullSlice<Buf>,
      33              :     offset: u64,
      34              :     #[cfg(test)]
      35              :     ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
      36              :     #[cfg(test)]
      37              :     done_flush_tx: tokio::sync::oneshot::Sender<()>,
      38              : }
      39              : 
      40              : /// Constructs a request and a control object for a new flush operation.
      41              : #[cfg(not(test))]
      42            0 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      43            0 :     let request = FlushRequest { slice, offset };
      44            0 :     let control = FlushControl::untracked();
      45            0 : 
      46            0 :     (request, control)
      47            0 : }
      48              : 
      49              : /// Constructs a request and a control object for a new flush operation.
      50              : #[cfg(test)]
      51        13246 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
      52        13246 :     let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
      53        13246 :     let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
      54        13246 :     let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
      55        13246 : 
      56        13246 :     let request = FlushRequest {
      57        13246 :         slice,
      58        13246 :         offset,
      59        13246 :         ready_to_flush_rx,
      60        13246 :         done_flush_tx,
      61        13246 :     };
      62        13246 :     (request, control)
      63        13246 : }
      64              : 
       65              : /// A handle to a `FlushRequest` that gives unit tests precise control over flush behavior.
      66              : #[cfg(test)]
      67              : pub(crate) struct FlushControl {
      68              :     not_started: FlushNotStarted,
      69              : }
      70              : 
      71              : #[cfg(not(test))]
      72              : pub(crate) struct FlushControl;
      73              : 
      74              : impl FlushControl {
      75              :     #[cfg(test)]
      76        13246 :     fn not_started(
      77        13246 :         ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
      78        13246 :         done_flush_rx: tokio::sync::oneshot::Receiver<()>,
      79        13246 :     ) -> Self {
      80        13246 :         FlushControl {
      81        13246 :             not_started: FlushNotStarted {
      82        13246 :                 ready_to_flush_tx,
      83        13246 :                 done_flush_rx,
      84        13246 :             },
      85        13246 :         }
      86        13246 :     }
      87              : 
      88              :     #[cfg(not(test))]
      89            0 :     fn untracked() -> Self {
      90            0 :         FlushControl
      91            0 :     }
      92              : 
       93              :     /// In tests, turns the flush control into its not-started state.
      94              :     #[cfg(test)]
      95            4 :     pub(crate) fn into_not_started(self) -> FlushNotStarted {
      96            4 :         self.not_started
      97            4 :     }
      98              : 
       99              :     /// Releases control over the submitted buffer.
     100              :     ///
      101              :     /// In a `cfg(test)` environment, the buffer is guaranteed to be flushed to disk after [`FlushControl::release`] finishes execution.
     102        13224 :     pub async fn release(self) {
     103        13224 :         #[cfg(test)]
     104        13224 :         {
     105        13224 :             self.not_started
     106        13224 :                 .ready_to_flush()
     107        13224 :                 .wait_until_flush_is_done()
     108        13224 :                 .await;
     109            0 :         }
     110        13224 :     }
     111              : }
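
A test can either call `release().await` for the common path, or use `into_not_started` to step through the handshake explicitly. Here is a hypothetical test helper sketching the explicit path (the helper name is illustrative; the API calls are the ones defined in this module):

    #[cfg(test)]
    async fn drive_one_flush<B, Buf, W>(
        handle: &mut FlushHandle<Buf, W>,
        buf: B,
        offset: u64,
    ) -> std::io::Result<B>
    where
        B: Buffer<IoBuf = Buf> + Send + 'static,
        Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
        W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
    {
        // Submit the buffer; the background task parks on `ready_to_flush_rx`
        // until this test lets it proceed.
        let (recycled, control) = handle.flush(buf, offset).await?;
        // Step through the handshake instead of `control.release().await`:
        let in_progress = control.into_not_started().ready_to_flush();
        // (a test could assert here that reads are served from
        //  `maybe_flushed` while the flush is still in progress)
        in_progress.wait_until_flush_is_done().await;
        Ok(recycled)
    }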
     112              : 
     113              : impl<Buf, W> FlushHandle<Buf, W>
     114              : where
     115              :     Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
     116              :     W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
     117              : {
     118              :     /// Spawns a new background flush task and obtains a handle.
     119              :     ///
      120              :     /// Note: spawning a background task means we do not need to explicitly maintain a queue of buffers.
     121         2650 :     pub fn spawn_new<B>(
     122         2650 :         file: Arc<W>,
     123         2650 :         buf: B,
     124         2650 :         gate_guard: utils::sync::gate::GateGuard,
     125         2650 :         ctx: RequestContext,
     126         2650 :         span: tracing::Span,
     127         2650 :     ) -> Self
     128         2650 :     where
     129         2650 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     130         2650 :     {
      131         2650 :         // It is fine to buffer up to only 1 message: we have at most 1 message in-flight at a time.
     132         2650 :         let (front, back) = duplex::mpsc::channel(1);
     133         2650 : 
     134         2650 :         let join_handle = tokio::spawn(
     135         2650 :             async move {
     136         2628 :                 FlushBackgroundTask::new(back, file, gate_guard, ctx)
     137         2628 :                     .run(buf.flush())
     138         2628 :                     .await
     139         2650 :             }
     140         2650 :             .instrument(span),
     141         2650 :         );
     142         2650 : 
     143         2650 :         FlushHandle {
     144         2650 :             inner: Some(FlushHandleInner {
     145         2650 :                 channel: front,
     146         2650 :                 join_handle,
     147         2650 :             }),
     148         2650 :             maybe_flushed: None,
     149         2650 :         }
     150         2650 :     }
     151              : 
     152              :     /// Submits a buffer to be flushed in the background task.
      153              :     /// Returns a buffer that completed flushing for re-use: length reset to 0, capacity unchanged.
      154              :     /// The submitted buffer is saved in `Self::maybe_flushed` so that it can serve
      155              :     /// tail reads while the flush is in progress.
     156        13246 :     pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
     157        13246 :     where
     158        13246 :         B: Buffer<IoBuf = Buf> + Send + 'static,
     159        13246 :     {
     160        13246 :         let slice = buf.flush();
     161        13246 : 
      162        13246 :         // Saves a buffer for reads while flushing. This also drops the reference to the old buffer.
     163        13246 :         self.maybe_flushed = Some(slice.cheap_clone());
     164        13246 : 
     165        13246 :         let (request, flush_control) = new_flush_op(slice, offset);
     166              : 
     167              :         // Submits the buffer to the background task.
     168        13246 :         let submit = self.inner_mut().channel.send(request).await;
     169        13246 :         if submit.is_err() {
     170            0 :             return self.handle_error().await;
     171        13246 :         }
     172              : 
     173              :         // Wait for an available buffer from the background flush task.
     174              :         // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
     175              :         // then the write path will eventually wait for it here.
     176        13246 :         let Some(recycled) = self.inner_mut().channel.recv().await else {
     177            0 :             return self.handle_error().await;
     178              :         };
     179              : 
     180              :         // The only other place that could hold a reference to the recycled buffer
      181              :         // is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
     182        13246 :         let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
     183        13246 :         Ok((recycled, flush_control))
     184        13246 :     }
     185              : 
     186            0 :     async fn handle_error<T>(&mut self) -> std::io::Result<T> {
     187            0 :         Err(self
     188            0 :             .shutdown()
     189            0 :             .await
     190            0 :             .expect_err("flush task only disconnects duplex if it exits with an error"))
     191            0 :     }
     192              : 
      193              :     /// Cleans up the channel and joins the flush task.
     194           18 :     pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
     195           18 :         let handle = self
     196           18 :             .inner
     197           18 :             .take()
     198           18 :             .expect("must not use after we returned an error");
     199           18 :         drop(handle.channel.tx);
     200           18 :         handle.join_handle.await.unwrap()
     201           18 :     }
     202              : 
     203              :     /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
     204              :     /// This only happens if the handle is used after an error.
     205        26492 :     fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
     206        26492 :         self.inner
     207        26492 :             .as_mut()
     208        26492 :             .expect("must not use after we returned an error")
     209        26492 :     }
     210              : }
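
End to end, the handle follows a spawn / flush-loop / shutdown lifecycle. Below is a condensed sketch of a caller, assuming the writer, gate guard, and context come from the surrounding write path (the loop body is elided):

    async fn writer_lifecycle<B, Buf, W>(
        file: Arc<W>,
        initial_buf: B,
        gate_guard: utils::sync::gate::GateGuard,
        ctx: RequestContext,
    ) -> std::io::Result<Arc<W>>
    where
        B: Buffer<IoBuf = Buf> + Send + 'static,
        Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
        W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
    {
        let mut handle = FlushHandle::spawn_new(
            file,
            initial_buf,
            gate_guard,
            ctx,
            tracing::info_span!("flush_task"),
        );

        // Write loop (elided): fill a buffer, swap it for a recycled one.
        // `flush` blocks once a flush is already in flight (channel capacity
        // is 1), so a slow disk backpressures the producer:
        //
        //     let (recycled, control) = handle.flush(full_buf, offset).await?;
        //     control.release().await;

        // Dropping the channel sender makes the background task drain and
        // exit; `shutdown` joins it and returns the writer.
        handle.shutdown().await
    }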
     211              : 
     212              : /// A background task for flushing data to disk.
     213              : pub struct FlushBackgroundTask<Buf, W> {
     214              :     /// A bi-directional channel that receives (buffer, offset) for writes,
      215              :     /// and sends back the recycled buffer.
     216              :     channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
      217              :     /// A writer for persisting data to disk.
     218              :     writer: Arc<W>,
     219              :     ctx: RequestContext,
      220              :     /// Prevents the timeline from shutting down until the background flush task finishes flushing all remaining buffers to disk.
     221              :     _gate_guard: utils::sync::gate::GateGuard,
     222              : }
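
The gate guard follows the usual gate pattern: shutdown closes the gate and waits for all outstanding guards to be dropped. A sketch of the shutdown side, assuming the conventional `enter()`/`close()` API of `utils::sync::gate::Gate`:

    async fn timeline_shutdown(gate: &utils::sync::gate::Gate) {
        // Each flush task holds a `GateGuard` (the `_gate_guard` field above)
        // for its entire lifetime. `close()` resolves only once every guard
        // has been dropped, i.e. once each flush task has drained its channel
        // and flushed the remaining buffers to disk.
        gate.close().await;
    }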
     223              : 
     224              : impl<Buf, W> FlushBackgroundTask<Buf, W>
     225              : where
     226              :     Buf: IoBufAligned + Send + Sync,
     227              :     W: OwnedAsyncWriter + Sync + 'static,
     228              : {
     229              :     /// Creates a new background flush task.
     230         2628 :     fn new(
     231         2628 :         channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
     232         2628 :         file: Arc<W>,
     233         2628 :         gate_guard: utils::sync::gate::GateGuard,
     234         2628 :         ctx: RequestContext,
     235         2628 :     ) -> Self {
     236         2628 :         FlushBackgroundTask {
     237         2628 :             channel,
     238         2628 :             writer: file,
     239         2628 :             _gate_guard: gate_guard,
     240         2628 :             ctx,
     241         2628 :         }
     242         2628 :     }
     243              : 
     244              :     /// Runs the background flush task.
      245              :     /// The passed-in slice is immediately sent back to the flush handle through the duplex channel.
     246         2628 :     async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
     247         2628 :         // Sends the extra buffer back to the handle.
      248         2628 :         // TODO: can this ever await and/or fail? I think not.
     249         2628 :         self.channel.send(slice).await.map_err(|_| {
     250            0 :             std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
     251         2628 :         })?;
     252              : 
      253              :         // Exit condition: the channel is closed and there are no remaining buffers to flush.
     254        15874 :         while let Some(request) = self.channel.recv().await {
     255              :             #[cfg(test)]
     256              :             {
     257              :                 // In test, wait for control to signal that we are ready to flush.
     258        13246 :                 if request.ready_to_flush_rx.await.is_err() {
     259           18 :                     tracing::debug!("control dropped");
     260        13228 :                 }
     261              :             }
     262              : 
     263              :             // Write slice to disk at `offset`.
     264              :             //
     265              :             // Error handling happens according to the current policy of crashing
     266              :             // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
     267              :             // (The upper layers of the Pageserver write path are not equipped to retry write errors
      268              :             //  because they often deallocate the buffers that were already written).
     269              :             //
      270              :             // TODO: cancellation sensitivity.
     271              :             // Without it, if we hit a bug where retrying is never successful,
     272              :             // then we can't shut down the timeline/tenant/pageserver cleanly because
     273              :             // layers of the Pageserver write path are holding the gate open for EphemeralFile.
     274              :             //
     275              :             // TODO: use utils::backoff::retry once async closures are actually usable
     276              :             //
     277        13246 :             let mut slice_storage = Some(request.slice);
     278        13246 :             for attempt in 1.. {
     279        13246 :                 let result = async {
     280        13246 :                     if attempt > 1 {
     281            0 :                         info!("retrying flush");
     282        13246 :                     }
     283        13246 :                     let slice = slice_storage.take().expect(
     284        13246 :                         "likely previous invocation of this future didn't get polled to completion",
     285        13246 :                     );
     286        13246 :                     let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
     287        13246 :                     slice_storage = Some(slice);
     288        13246 :                     let res = res.maybe_fatal_err("owned_buffers_io flush");
     289        13246 :                     let Err(err) = res else {
     290        13246 :                         return ControlFlow::Break(());
     291              :                     };
     292            0 :                     warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
     293              :                     static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
     294            0 :                     utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
     295            0 :                     ControlFlow::Continue(())
     296        13246 :                 }
     297        13246 :                 .instrument(info_span!("flush_attempt", %attempt))
     298        13246 :                 .await;
     299        13246 :                 match result {
     300        13246 :                     ControlFlow::Break(()) => break,
     301            0 :                     ControlFlow::Continue(()) => continue,
     302              :                 }
     303              :             }
     304        13246 :             let slice = slice_storage.expect("loop must have run at least once");
     305        13246 : 
     306        13246 :             #[cfg(test)]
     307        13246 :             {
      308        13246 :                 // In test, tell the control we are done flushing the buffer.
     309        13246 :                 if request.done_flush_tx.send(()).is_err() {
     310           18 :                     tracing::debug!("control dropped");
     311        13228 :                 }
     312            0 :             }
     313            0 : 
      314            0 :             // Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
     315        13246 :             if self.channel.send(slice).await.is_err() {
      316              :                 // The channel is closed, but we still need to finish flushing the remaining buffers.
     317            0 :                 continue;
     318        13246 :             }
     319              :         }
     320              : 
     321         2327 :         Ok(self.writer)
     322         2327 :     }
     323              : }
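
The retry loop in `run` threads ownership of the buffer through an `Option` so that each attempt can move the slice into the owned-IO write call and recover it afterwards; `ControlFlow` only carries the break/continue decision out of the instrumented async block. The same ownership pattern, distilled over a generic owned-IO operation (illustrative; logging and backoff elided):

    /// Retries `op` until it succeeds, moving `value` into each attempt and
    /// recovering it from the attempt's return value (owned-IO style).
    async fn retry_owned<T, E, Fut>(value: T, mut op: impl FnMut(T) -> Fut) -> T
    where
        Fut: std::future::Future<Output = (T, Result<(), E>)>,
    {
        let mut storage = Some(value);
        for _attempt in 1.. {
            let v = storage
                .take()
                .expect("previous attempt must have stored the value back");
            // The operation takes ownership of `v` and returns it alongside
            // the result, like `OwnedAsyncWriter::write_all_at`.
            let (v, res) = op(v).await;
            storage = Some(v);
            match res {
                // Success: stop retrying.
                Ok(()) => break,
                // Failure: a real implementation logs the attempt number and
                // sleeps with exponential backoff here, then retries in place.
                Err(_) => continue,
            }
        }
        storage.expect("loop must have run at least once")
    }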
     324              : 
     325              : #[cfg(test)]
     326              : pub(crate) struct FlushNotStarted {
     327              :     ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
     328              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     329              : }
     330              : 
     331              : #[cfg(test)]
     332              : pub(crate) struct FlushInProgress {
     333              :     done_flush_rx: tokio::sync::oneshot::Receiver<()>,
     334              : }
     335              : 
     336              : #[cfg(test)]
     337              : pub(crate) struct FlushDone;
     338              : 
     339              : #[cfg(test)]
     340              : impl FlushNotStarted {
     341              :     /// Signals the background task the buffer is ready to flush to disk.
     342        13228 :     pub fn ready_to_flush(self) -> FlushInProgress {
     343        13228 :         self.ready_to_flush_tx
     344        13228 :             .send(())
     345        13228 :             .map(|_| FlushInProgress {
     346        13228 :                 done_flush_rx: self.done_flush_rx,
     347        13228 :             })
     348        13228 :             .unwrap()
     349        13228 :     }
     350              : }
     351              : 
     352              : #[cfg(test)]
     353              : impl FlushInProgress {
     354              :     /// Waits until background flush is done.
     355        13228 :     pub async fn wait_until_flush_is_done(self) -> FlushDone {
     356        13228 :         self.done_flush_rx.await.unwrap();
     357        13228 :         FlushDone
     358        13228 :     }
     359              : }
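
`FlushNotStarted`, `FlushInProgress`, and `FlushDone` form a typestate chain: each transition consumes `self`, so the compiler enforces that a test signals readiness before it waits for completion. A small illustration:

    #[cfg(test)]
    async fn follows_protocol(control: FlushControl) -> FlushDone {
        let not_started: FlushNotStarted = control.into_not_started();
        let in_progress: FlushInProgress = not_started.ready_to_flush();
        // `not_started` has been moved; calling `ready_to_flush()` again
        // would not compile.
        in_progress.wait_until_flush_is_done().await
    }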
        

Generated by: LCOV version 2.1-beta