LCOV - code coverage report
Current view: top level - pageserver/src/deletion_queue - deleter.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 85.7 % 98 84
Test Date: 2024-02-07 07:37:29 Functions: 81.2 % 16 13

            Line data    Source code
       1              : //! The deleter is the final stage in the deletion queue.  It accumulates remote
       2              : //! paths to delete, and periodically executes them in batches of up to 1000
       3              : //! using the DeleteObjects request.
       4              : //!
       5              : //! Its purpose is to increase efficiency of remote storage I/O by issuing a smaller
       6              : //! number of full-sized DeleteObjects requests, rather than a larger number of
       7              : //! smaller requests.
       8              : 
       9              : use remote_storage::GenericRemoteStorage;
      10              : use remote_storage::RemotePath;
      11              : use remote_storage::MAX_KEYS_PER_DELETE;
      12              : use std::time::Duration;
      13              : use tokio_util::sync::CancellationToken;
      14              : use tracing::info;
      15              : use tracing::warn;
      16              : use utils::backoff;
      17              : 
      18              : use crate::metrics;
      19              : 
      20              : use super::DeletionQueueError;
      21              : use super::FlushOp;
      22              : 
      23              : const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
      24              : 
      25              : pub(super) enum DeleterMessage {
      26              :     Delete(Vec<RemotePath>),
      27              :     Flush(FlushOp),
      28              : }
      29              : 
      30              : /// Non-persistent deletion queue, for coalescing multiple object deletes into
      31              : /// larger DeleteObjects requests.
      32              : pub(super) struct Deleter {
      33              :     // Accumulate up to 1000 keys for the next deletion operation
      34              :     accumulator: Vec<RemotePath>,
      35              : 
      36              :     rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
      37              : 
      38              :     cancel: CancellationToken,
      39              :     remote_storage: GenericRemoteStorage,
      40              : }
      41              : 
      42              : impl Deleter {
      43          612 :     pub(super) fn new(
      44          612 :         remote_storage: GenericRemoteStorage,
      45          612 :         rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
      46          612 :         cancel: CancellationToken,
      47          612 :     ) -> Self {
      48          612 :         Self {
      49          612 :             remote_storage,
      50          612 :             rx,
      51          612 :             cancel,
      52          612 :             accumulator: Vec::new(),
      53          612 :         }
      54          612 :     }
      55              : 
      56              :     /// Wrap the remote `delete_objects` with a failpoint
      57          423 :     async fn remote_delete(&self) -> Result<(), anyhow::Error> {
      58          423 :         // A backoff::retry is used here for two reasons:
      59          423 :         // - To provide a backoff rather than busy-polling the API on errors
      60          423 :         // - To absorb transient 429/503 conditions without hitting our error
      61          423 :         //   logging path for issues deleting objects.
      62          423 :         backoff::retry(
      63          539 :             || async {
      64          539 :                 fail::fail_point!("deletion-queue-before-execute", |_| {
      65            4 :                     info!("Skipping execution, failpoint set");
      66              : 
      67            4 :                     metrics::DELETION_QUEUE
      68            4 :                         .remote_errors
      69            4 :                         .with_label_values(&["failpoint"])
      70            4 :                         .inc();
      71            4 :                     Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
      72          539 :                 });
      73              : 
      74        10519 :                 self.remote_storage.delete_objects(&self.accumulator).await
      75          539 :             },
      76          423 :             |_| false,
      77          423 :             3,
      78          423 :             10,
      79          423 :             "executing deletion batch",
      80          423 :             &self.cancel,
      81          423 :         )
      82        10519 :         .await
      83          421 :         .ok_or_else(|| anyhow::anyhow!("Shutting down"))
      84          421 :         .and_then(|x| x)
      85          421 :     }
      86              : 
      87              :     /// Block until everything in accumulator has been executed
      88         1072 :     async fn flush(&mut self) -> Result<(), DeletionQueueError> {
      89         1493 :         while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
      90        10519 :             match self.remote_delete().await {
      91              :                 Ok(()) => {
      92              :                     // Note: we assume that the remote storage layer returns Ok(()) if some
      93              :                     // or all of the deleted objects were already gone.
      94          421 :                     metrics::DELETION_QUEUE
      95          421 :                         .keys_executed
      96          421 :                         .inc_by(self.accumulator.len() as u64);
      97          421 :                     info!(
      98          421 :                         "Executed deletion batch {}..{}",
      99          421 :                         self.accumulator
     100          421 :                             .first()
     101          421 :                             .expect("accumulator should be non-empty"),
     102          421 :                         self.accumulator
     103          421 :                             .last()
     104          421 :                             .expect("accumulator should be non-empty"),
     105          421 :                     );
     106          421 :                     self.accumulator.clear();
     107              :                 }
     108            0 :                 Err(e) => {
     109            0 :                     if self.cancel.is_cancelled() {
     110            0 :                         return Err(DeletionQueueError::ShuttingDown);
     111            0 :                     }
     112            0 :                     warn!("DeleteObjects request failed: {e:#}, will continue trying");
     113            0 :                     metrics::DELETION_QUEUE
     114            0 :                         .remote_errors
     115            0 :                         .with_label_values(&["execute"])
     116            0 :                         .inc();
     117              :                 }
     118              :             };
     119              :         }
     120         1070 :         if self.cancel.is_cancelled() {
     121              :             // Expose an error because we may not have actually flushed everything
     122            2 :             Err(DeletionQueueError::ShuttingDown)
     123              :         } else {
     124         1068 :             Ok(())
     125              :         }
     126         1070 :     }
     127              : 
     128          612 :     pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
     129          612 :         self.accumulator.reserve(MAX_KEYS_PER_DELETE);
     130              : 
     131         2447 :         loop {
     132         2447 :             if self.cancel.is_cancelled() {
     133            0 :                 return Err(DeletionQueueError::ShuttingDown);
     134         2447 :             }
     135              : 
     136         2447 :             let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
     137         1263 :                 Ok(Some(m)) => m,
     138              :                 Ok(None) => {
     139              :                     // All queue senders closed
     140            0 :                     info!("Shutting down");
     141            0 :                     return Err(DeletionQueueError::ShuttingDown);
     142              :                 }
     143              :                 Err(_) => {
     144              :                     // Timeout, we hit deadline to execute whatever we have in hand.  These functions will
     145              :                     // return immediately if no work is pending
     146          576 :                     self.flush().await?;
     147              : 
     148          574 :                     continue;
     149              :                 }
     150              :             };
     151              : 
     152         1263 :             match msg {
     153          767 :                 DeleterMessage::Delete(mut list) => {
     154         1504 :                     while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
     155          737 :                         if self.accumulator.len() == MAX_KEYS_PER_DELETE {
     156            0 :                             self.flush().await?;
     157              :                             // If we have received this number of keys, proceed with attempting to execute
     158            0 :                             assert_eq!(self.accumulator.len(), 0);
     159          737 :                         }
     160              : 
     161          737 :                         let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
     162          737 :                         let take_count = std::cmp::min(available_slots, list.len());
     163         8338 :                         for path in list.drain(list.len() - take_count..) {
     164         8338 :                             self.accumulator.push(path);
     165         8338 :                         }
     166              :                     }
     167              :                 }
     168          496 :                 DeleterMessage::Flush(flush_op) => {
     169          496 :                     // If flush() errors, we drop the flush_op and the caller will get
     170          496 :                     // an error recv()'ing their oneshot channel.
     171        10519 :                     self.flush().await?;
     172          494 :                     flush_op.notify();
     173              :                 }
     174              :             }
     175              :         }
     176            2 :     }
     177              : }
        

Generated by: LCOV version 2.1-beta