LCOV - code coverage report
Current view: top level - pageserver/src/deletion_queue - deleter.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 72.7 % 99 72
Test Date: 2024-11-25 17:48:16 Functions: 83.3 % 12 10

            Line data    Source code
       1              : //! The deleter is the final stage in the deletion queue.  It accumulates remote
       2              : //! paths to delete, and periodically executes them in batches of up to 1000
       3              : //! using the DeleteObjects request.
       4              : //!
       5              : //! Its purpose is to increase efficiency of remote storage I/O by issuing a smaller
       6              : //! number of full-sized DeleteObjects requests, rather than a larger number of
       7              : //! smaller requests.
       8              : 
       9              : use remote_storage::GenericRemoteStorage;
      10              : use remote_storage::RemotePath;
      11              : use remote_storage::TimeoutOrCancel;
      12              : use remote_storage::MAX_KEYS_PER_DELETE;
      13              : use std::time::Duration;
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::info;
      16              : use tracing::warn;
      17              : use utils::backoff;
      18              : use utils::pausable_failpoint;
      19              : 
      20              : use crate::metrics;
      21              : 
      22              : use super::DeletionQueueError;
      23              : use super::FlushOp;
      24              : 
      25              : const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
      26              : 
      27              : pub(super) enum DeleterMessage { // Messages consumed by `Deleter::background`
      28              :     Delete(Vec<RemotePath>), // Paths to move into the accumulator; executed when a batch fills or on a flush
      29              :     Flush(FlushOp), // Execute everything accumulated so far, then call `FlushOp::notify`
      30              : }
      31              : 
      32              : /// Non-persistent (in-memory) final stage of the deletion queue: coalesces many
      33              : /// object deletes into fewer, larger DeleteObjects requests.
      34              : pub(super) struct Deleter {
      35              :     // Paths waiting for the next DeleteObjects call; `background` flushes once it holds MAX_KEYS_PER_DELETE entries
      36              :     accumulator: Vec<RemotePath>,
      37              : 
      38              :     rx: tokio::sync::mpsc::Receiver<DeleterMessage>, // inbound work; `background` returns once every sender is dropped
      39              : 
      40              :     cancel: CancellationToken, // polled by `flush`/`background` and handed to the remote storage calls
      41              :     remote_storage: GenericRemoteStorage, // backend on which `delete_objects` is invoked
      42              : }
      43              : 
      44              : impl Deleter {
      45            8 :     pub(super) fn new( // Assemble a Deleter from its collaborators; only builds the struct, performs no I/O
      46            8 :         remote_storage: GenericRemoteStorage,
      47            8 :         rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
      48            8 :         cancel: CancellationToken,
      49            8 :     ) -> Self {
      50            8 :         Self {
      51            8 :             remote_storage,
      52            8 :             rx,
      53            8 :             cancel,
      54            8 :             accumulator: Vec::new(), // starts empty; capacity is reserved later in `background`
      55            8 :         }
      56            8 :     }
      57              : 
      58              :     /// Issue one `delete_objects` call covering the whole accumulator, wrapped in retry/backoff and a failpoint.
      59            6 :     async fn remote_delete(&self) -> Result<(), anyhow::Error> { // takes &self: the accumulator is left untouched
      60            6 :         // A backoff::retry is used here for two reasons:
      61            6 :         // - To provide a backoff rather than busy-polling the API on errors
      62            6 :         // - To absorb transient 429/503 conditions without hitting our error
      63            6 :         //   logging path for issues deleting objects.
      64            6 :         backoff::retry(
      65            6 :             || async {
      66            6 :                 fail::fail_point!("deletion-queue-before-execute", |_| { // when armed: log, count a `failpoint` remote error, and fail without calling remote storage
      67            0 :                     info!("Skipping execution, failpoint set");
      68              : 
      69            0 :                     metrics::DELETION_QUEUE
      70            0 :                         .remote_errors
      71            0 :                         .with_label_values(&["failpoint"])
      72            0 :                         .inc();
      73            0 :                     Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
      74            6 :                 });
      75              : 
      76            6 :                 self.remote_storage
      77            6 :                     .delete_objects(&self.accumulator, &self.cancel)
      78            5 :                     .await
      79           12 :             },
      80            6 :             TimeoutOrCancel::caused_by_cancel, // NOTE(review): presumably the permanent-error predicate, so cancellation is not retried -- confirm in utils::backoff
      81            6 :             3, // NOTE(review): presumably the failure count after which retries are logged as warnings -- confirm
      82            6 :             10, // NOTE(review): presumably the maximum number of retries -- confirm
      83            6 :             "executing deletion batch",
      84            6 :             &self.cancel,
      85            6 :         )
      86            5 :         .await
      87            6 :         .ok_or_else(|| anyhow::anyhow!("Shutting down")) // `retry` yielded None (presumably on cancellation): surface it as an error
      88            6 :         .and_then(|x| x) // collapse the nested Result<Result<(), _>, _> into a single Result
      89            6 :     }
      90              : 
      91              :     /// Re-run `remote_delete` until the accumulator is empty or cancellation is observed; Err(ShuttingDown) if cancelled.
      92           22 :     async fn flush(&mut self) -> Result<(), DeletionQueueError> {
      93           28 :         while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
      94            6 :             pausable_failpoint!("deletion-queue-before-execute-pause");
      95            6 :             match self.remote_delete().await {
      96              :                 Ok(()) => {
      97              :                     // Note: we assume that the remote storage layer returns Ok(()) if some
      98              :                     // or all of the deleted objects were already gone.
      99            6 :                     metrics::DELETION_QUEUE
     100            6 :                         .keys_executed
     101            6 :                         .inc_by(self.accumulator.len() as u64);
     102            6 :                     info!(
     103            0 :                         "Executed deletion batch {}..{}",
     104            0 :                         self.accumulator
     105            0 :                             .first()
     106            0 :                             .expect("accumulator should be non-empty"),
     107            0 :                         self.accumulator
     108            0 :                             .last()
     109            0 :                             .expect("accumulator should be non-empty"),
     110              :                     );
     111            6 :                     self.accumulator.clear(); // Vec::clear keeps the allocation, so the reserved capacity is reused
     112              :                 }
     113            0 :                 Err(e) => {
     114            0 :                     if self.cancel.is_cancelled() {
     115            0 :                         return Err(DeletionQueueError::ShuttingDown);
     116            0 :                     }
     117            0 :                     warn!("DeleteObjects request failed: {e:#}, will continue trying"); // no sleep here: the loop retries straight away, backoff happens inside remote_delete
     118            0 :                     metrics::DELETION_QUEUE
     119            0 :                         .remote_errors
     120            0 :                         .with_label_values(&["execute"])
     121            0 :                         .inc();
     122              :                 }
     123              :             };
     124              :         }
     125           22 :         if self.cancel.is_cancelled() {
     126              :             // Expose an error because we may not have actually flushed everything
     127            2 :             Err(DeletionQueueError::ShuttingDown)
     128              :         } else {
     129           20 :             Ok(())
     130              :         }
     131           22 :     }
     132              : 
     133            8 :     pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> { // Message loop; never returns Ok: every exit path yields Err(ShuttingDown)
     134            8 :         self.accumulator.reserve(MAX_KEYS_PER_DELETE); // one up-front reservation sized for a full batch
     135              : 
     136              :         loop {
     137           38 :             if self.cancel.is_cancelled() {
     138            0 :                 return Err(DeletionQueueError::ShuttingDown);
     139           38 :             }
     140              : 
     141           38 :             let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await { // wake up after AUTOFLUSH_INTERVAL even if no message arrives
     142           28 :                 Ok(Some(m)) => m,
     143              :                 Ok(None) => {
     144              :                     // All queue senders closed
     145            0 :                     info!("Shutting down");
     146            0 :                     return Err(DeletionQueueError::ShuttingDown);
     147              :                 }
     148              :                 Err(_) => {
     149              :                     // No message arrived within AUTOFLUSH_INTERVAL: execute whatever has accumulated.
     150              :                     // flush() issues no remote request when the accumulator is empty.
     151            4 :                     self.flush().await?;
     152              : 
     153            2 :                     continue;
     154              :                 }
     155              :             };
     156              : 
     157           28 :             match msg {
     158           10 :                 DeleterMessage::Delete(mut list) => {
     159           16 :                     while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE { // also iterates once more when the accumulator is exactly full, so a full batch is never left waiting
     160            6 :                         if self.accumulator.len() == MAX_KEYS_PER_DELETE {
     161            0 :                             self.flush().await?;
     162              :                             // flush() only returns Ok after the accumulator has been executed and cleared
     163            0 :                             assert_eq!(self.accumulator.len(), 0);
     164            6 :                         }
     165              : 
     166            6 :                         let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
     167            6 :                         let take_count = std::cmp::min(available_slots, list.len());
     168            6 :                         for path in list.drain(list.len() - take_count..) { // takes the paths from the tail of `list`
     169            6 :                             self.accumulator.push(path);
     170            6 :                         }
     171              :                     }
     172              :                 }
     173           18 :                 DeleterMessage::Flush(flush_op) => {
     174           18 :                     // If flush() errors, we drop the flush_op and the caller will get
     175           18 :                     // an error recv()'ing their oneshot channel.
     176           18 :                     self.flush().await?;
     177           18 :                     flush_op.notify();
     178              :                 }
     179              :             }
     180              :         }
     181            2 :     }
     182              : }
        

Generated by: LCOV version 2.1-beta