LCOV - code coverage report
Current view: top level - pageserver/src/deletion_queue - deleter.rs (source / functions) Coverage Total Hit
Test: 6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info Lines: 73.0 % 100 73
Test Date: 2025-03-12 18:28:53 Functions: 83.3 % 12 10

            Line data    Source code
       1              : //! The deleter is the final stage in the deletion queue.  It accumulates remote
       2              : //! paths to delete, and periodically executes them in batches of up to 1000
       3              : //! using the DeleteObjects request.
       4              : //!
       5              : //! Its purpose is to increase efficiency of remote storage I/O by issuing a smaller
       6              : //! number of full-sized DeleteObjects requests, rather than a larger number of
       7              : //! smaller requests.
       8              : 
       9              : use std::time::Duration;
      10              : 
      11              : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
      12              : use tokio_util::sync::CancellationToken;
      13              : use tracing::{info, warn};
      14              : use utils::{backoff, pausable_failpoint};
      15              : 
      16              : use super::{DeletionQueueError, FlushOp};
      17              : use crate::metrics;
      18              : 
      19              : const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
      20              : 
      21              : pub(super) enum DeleterMessage {
      22              :     Delete(Vec<RemotePath>),
      23              :     Flush(FlushOp),
      24              : }
      25              : 
      26              : /// Non-persistent deletion queue, for coalescing multiple object deletes into
      27              : /// larger DeleteObjects requests.
      28              : pub(super) struct Deleter {
      29              :     // Accumulate up to 1000 keys for the next deletion operation
      30              :     accumulator: Vec<RemotePath>,
      31              : 
      32              :     rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
      33              : 
      34              :     cancel: CancellationToken,
      35              :     remote_storage: GenericRemoteStorage,
      36              : }
      37              : 
      38              : impl Deleter {
      39           16 :     pub(super) fn new(
      40           16 :         remote_storage: GenericRemoteStorage,
      41           16 :         rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
      42           16 :         cancel: CancellationToken,
      43           16 :     ) -> Self {
      44           16 :         Self {
      45           16 :             remote_storage,
      46           16 :             rx,
      47           16 :             cancel,
      48           16 :             accumulator: Vec::new(),
      49           16 :         }
      50           16 :     }
      51              : 
      52              :     /// Wrap the remote `delete_objects` with a failpoint
      53           12 :     async fn remote_delete(&self) -> Result<(), anyhow::Error> {
      54           12 :         // A backoff::retry is used here for two reasons:
      55           12 :         // - To provide a backoff rather than busy-polling the API on errors
      56           12 :         // - To absorb transient 429/503 conditions without hitting our error
      57           12 :         //   logging path for issues deleting objects.
      58           12 :         backoff::retry(
      59           12 :             || async {
      60           12 :                 fail::fail_point!("deletion-queue-before-execute", |_| {
      61            0 :                     info!("Skipping execution, failpoint set");
      62              : 
      63            0 :                     metrics::DELETION_QUEUE
      64            0 :                         .remote_errors
      65            0 :                         .with_label_values(&["failpoint"])
      66            0 :                         .inc();
      67            0 :                     Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
      68           12 :                 });
      69              : 
      70           12 :                 self.remote_storage
      71           12 :                     .delete_objects(&self.accumulator, &self.cancel)
      72           12 :                     .await
      73           24 :             },
      74           12 :             TimeoutOrCancel::caused_by_cancel,
      75           12 :             3,
      76           12 :             10,
      77           12 :             "executing deletion batch",
      78           12 :             &self.cancel,
      79           12 :         )
      80           12 :         .await
      81           12 :         .ok_or_else(|| anyhow::anyhow!("Shutting down"))
      82           12 :         .and_then(|x| x)
      83           12 :     }
      84              : 
      85              :     /// Block until everything in accumulator has been executed
      86           44 :     async fn flush(&mut self) -> Result<(), DeletionQueueError> {
      87           56 :         while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
      88           12 :             pausable_failpoint!("deletion-queue-before-execute-pause");
      89           12 :             match self.remote_delete().await {
      90              :                 Ok(()) => {
      91              :                     // Note: we assume that the remote storage layer returns Ok(()) if some
      92              :                     // or all of the deleted objects were already gone.
      93           12 :                     metrics::DELETION_QUEUE
      94           12 :                         .keys_executed
      95           12 :                         .inc_by(self.accumulator.len() as u64);
      96           12 :                     info!(
      97            0 :                         "Executed deletion batch {}..{}",
      98            0 :                         self.accumulator
      99            0 :                             .first()
     100            0 :                             .expect("accumulator should be non-empty"),
     101            0 :                         self.accumulator
     102            0 :                             .last()
     103            0 :                             .expect("accumulator should be non-empty"),
     104              :                     );
     105           12 :                     self.accumulator.clear();
     106              :                 }
     107            0 :                 Err(e) => {
     108            0 :                     if self.cancel.is_cancelled() {
     109            0 :                         return Err(DeletionQueueError::ShuttingDown);
     110            0 :                     }
     111            0 :                     warn!("DeleteObjects request failed: {e:#}, will continue trying");
     112            0 :                     metrics::DELETION_QUEUE
     113            0 :                         .remote_errors
     114            0 :                         .with_label_values(&["execute"])
     115            0 :                         .inc();
     116              :                 }
     117              :             };
     118              :         }
     119           44 :         if self.cancel.is_cancelled() {
     120              :             // Expose an error because we may not have actually flushed everything
     121            4 :             Err(DeletionQueueError::ShuttingDown)
     122              :         } else {
     123           40 :             Ok(())
     124              :         }
     125           44 :     }
     126              : 
     127           16 :     pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
     128           16 :         let max_keys_per_delete = self.remote_storage.max_keys_per_delete();
     129           16 :         self.accumulator.reserve(max_keys_per_delete);
     130              : 
     131              :         loop {
     132           76 :             if self.cancel.is_cancelled() {
     133            0 :                 return Err(DeletionQueueError::ShuttingDown);
     134           76 :             }
     135              : 
     136           76 :             let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
     137           56 :                 Ok(Some(m)) => m,
     138              :                 Ok(None) => {
     139              :                     // All queue senders closed
     140            0 :                     info!("Shutting down");
     141            0 :                     return Err(DeletionQueueError::ShuttingDown);
     142              :                 }
     143              :                 Err(_) => {
     144              :                     // Timeout, we hit deadline to execute whatever we have in hand.  These functions will
     145              :                     // return immediately if no work is pending
     146            8 :                     self.flush().await?;
     147              : 
     148            4 :                     continue;
     149              :                 }
     150              :             };
     151              : 
     152           56 :             match msg {
     153           20 :                 DeleterMessage::Delete(mut list) => {
     154           32 :                     while !list.is_empty() || self.accumulator.len() == max_keys_per_delete {
     155           12 :                         if self.accumulator.len() == max_keys_per_delete {
     156            0 :                             self.flush().await?;
     157              :                             // If we have received this number of keys, proceed with attempting to execute
     158            0 :                             assert_eq!(self.accumulator.len(), 0);
     159           12 :                         }
     160              : 
     161           12 :                         let available_slots = max_keys_per_delete - self.accumulator.len();
     162           12 :                         let take_count = std::cmp::min(available_slots, list.len());
     163           12 :                         for path in list.drain(list.len() - take_count..) {
     164           12 :                             self.accumulator.push(path);
     165           12 :                         }
     166              :                     }
     167              :                 }
     168           36 :                 DeleterMessage::Flush(flush_op) => {
     169           36 :                     // If flush() errors, we drop the flush_op and the caller will get
     170           36 :                     // an error recv()'ing their oneshot channel.
     171           36 :                     self.flush().await?;
     172           36 :                     flush_op.notify();
     173              :                 }
     174              :             }
     175              :         }
     176            4 :     }
     177              : }
        

Generated by: LCOV version 2.1-beta