LCOV - differential code coverage report
Current view: top level - pageserver/src/deletion_queue - deleter.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 93.3 % 90 84 6 84
Current Date: 2023-10-19 02:04:12 Functions: 87.5 % 16 14 2 14
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! The deleter is the final stage in the deletion queue.  It accumulates remote
       2                 : //! paths to delete, and periodically executes them in batches of up to 1000
       3                 : //! using the DeleteObjects request.
       4                 : //!
       5                 : //! Its purpose is to increase efficiency of remote storage I/O by issuing a smaller
       6                 : //! number of full-sized DeleteObjects requests, rather than a larger number of
       7                 : //! smaller requests.
       8                 : 
       9                 : use remote_storage::GenericRemoteStorage;
      10                 : use remote_storage::RemotePath;
      11                 : use remote_storage::MAX_KEYS_PER_DELETE;
      12                 : use std::time::Duration;
      13                 : use tokio_util::sync::CancellationToken;
      14                 : use tracing::info;
      15                 : use tracing::warn;
      16                 : use utils::backoff;
      17                 : 
      18                 : use crate::metrics;
      19                 : 
      20                 : use super::DeletionQueueError;
      21                 : use super::FlushOp;
      22                 : 
      23                 : const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
      24                 : 
      25                 : pub(super) enum DeleterMessage {
      26                 :     Delete(Vec<RemotePath>),
      27                 :     Flush(FlushOp),
      28                 : }
      29                 : 
      30                 : /// Non-persistent deletion queue, for coalescing multiple object deletes into
      31                 : /// larger DeleteObjects requests.
      32                 : pub(super) struct Deleter {
      33                 :     // Accumulate up to MAX_KEYS_PER_DELETE keys for the next deletion operation
      34                 :     accumulator: Vec<RemotePath>,
      35                 : 
      36                 :     rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
      37                 : 
      38                 :     cancel: CancellationToken,
      39                 :     remote_storage: GenericRemoteStorage,
      40                 : }
      41                 : 
      42                 : impl Deleter {
      43 CBC         564 :     pub(super) fn new(
      44             564 :         remote_storage: GenericRemoteStorage,
      45             564 :         rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
      46             564 :         cancel: CancellationToken,
      47             564 :     ) -> Self {
      48             564 :         Self {
      49             564 :             remote_storage,
      50             564 :             rx,
      51             564 :             cancel,
      52             564 :             accumulator: Vec::new(),
      53             564 :         }
      54             564 :     }
      55                 : 
      56                 :     /// Wrap the remote `delete_objects` with a failpoint
      57           18953 :     async fn remote_delete(&self) -> Result<(), anyhow::Error> {
      58           18937 :         fail::fail_point!("deletion-queue-before-execute", |_| {
      59           18314 :             info!("Skipping execution, failpoint set");
      60           18314 :             metrics::DELETION_QUEUE
      61           18314 :                 .remote_errors
      62           18314 :                 .with_label_values(&["failpoint"])
      63           18314 :                 .inc();
      64           18314 :             Err(anyhow::anyhow!("failpoint hit"))
      65           18937 :         });
      66                 : 
      67                 :         // A backoff::retry is used here for two reasons:
      68                 :         // - To provide a backoff rather than busy-polling the API on errors
      69                 :         // - To absorb transient 429/503 conditions without hitting our error
      70                 :         //   logging path for issues deleting objects.
      71             623 :         backoff::retry(
      72           11524 :             || async { self.remote_storage.delete_objects(&self.accumulator).await },
      73             623 :             |_| false,
      74             623 :             3,
      75             623 :             10,
      76             623 :             "executing deletion batch",
      77             623 :             backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
      78             623 :         )
      79           11524 :         .await
      80           18937 :     }
      81                 : 
      82                 :     /// Block until everything in the accumulator has been executed, or return `ShuttingDown` if cancelled first
      83            1242 :     async fn flush(&mut self) -> Result<(), DeletionQueueError> {
      84           20177 :         while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
      85           18937 :             match self.remote_delete().await {
      86                 :                 Ok(()) => {
      87                 :                     // Note: we assume that the remote storage layer returns Ok(()) if some
      88                 :                     // or all of the deleted objects were already gone.
      89             623 :                     metrics::DELETION_QUEUE
      90             623 :                         .keys_executed
      91             623 :                         .inc_by(self.accumulator.len() as u64);
      92             623 :                     info!(
      93             623 :                         "Executed deletion batch {}..{}",
      94             623 :                         self.accumulator
      95             623 :                             .first()
      96             623 :                             .expect("accumulator should be non-empty"),
      97             623 :                         self.accumulator
      98             623 :                             .last()
      99             623 :                             .expect("accumulator should be non-empty"),
     100             623 :                     );
     101             623 :                     self.accumulator.clear();
     102                 :                 }
     103           18312 :                 Err(e) => {
     104           18312 :                     if self.cancel.is_cancelled() {
     105 UBC           0 :                         return Err(DeletionQueueError::ShuttingDown);
     106 CBC       18312 :                     }
     107           18312 :                     warn!("DeleteObjects request failed: {e:#}, will continue trying");
     108           18312 :                     metrics::DELETION_QUEUE
     109           18312 :                         .remote_errors
     110           18312 :                         .with_label_values(&["execute"])
     111           18312 :                         .inc();
     112                 :                 }
     113                 :             };
     114                 :         }
     115            1240 :         if self.cancel.is_cancelled() {
     116                 :             // Expose an error because we may not have actually flushed everything
     117               1 :             Err(DeletionQueueError::ShuttingDown)
     118                 :         } else {
     119            1239 :             Ok(())
     120                 :         }
     121            1240 :     }
     122                 : 
     123             564 :     pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
     124             564 :         self.accumulator.reserve(MAX_KEYS_PER_DELETE);
     125                 : 
     126            2805 :         loop {
     127            2805 :             if self.cancel.is_cancelled() {
     128 UBC           0 :                 return Err(DeletionQueueError::ShuttingDown);
     129 CBC        2805 :             }
     130                 : 
     131            2805 :             let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
     132            1967 :                 Ok(Some(m)) => m,
     133                 :                 Ok(None) => {
     134                 :                     // All queue senders closed
     135 UBC           0 :                     info!("Shutting down");
     136               0 :                     return Err(DeletionQueueError::ShuttingDown);
     137                 :                 }
     138                 :                 Err(_) => {
     139                 :                     // Timeout: we hit the deadline to execute whatever we have in hand.  flush() will
     140                 :                     // return immediately if no work is pending
     141 CBC         277 :                     self.flush().await?;
     142                 : 
     143             276 :                     continue;
     144                 :                 }
     145                 :             };
     146                 : 
     147            1967 :             match msg {
     148            1002 :                 DeleterMessage::Delete(mut list) => {
     149            1675 :                     while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
     150             673 :                         if self.accumulator.len() == MAX_KEYS_PER_DELETE {
     151 UBC           0 :                             self.flush().await?;
     152                 :                             // We had accumulated a full batch of keys, so it was executed above; flush() only returns Ok once the accumulator is empty
     153               0 :                             assert_eq!(self.accumulator.len(), 0);
     154 CBC         673 :                         }
     155                 : 
     156             673 :                         let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
     157             673 :                         let take_count = std::cmp::min(available_slots, list.len());
     158           11429 :                         for path in list.drain(list.len() - take_count..) {
     159           11429 :                             self.accumulator.push(path);
     160           11429 :                         }
     161                 :                     }
     162                 :                 }
     163             965 :                 DeleterMessage::Flush(flush_op) => {
     164             965 :                     // If flush() errors, we drop the flush_op and the caller will get
     165             965 :                     // an error recv()'ing their oneshot channel.
     166           11524 :                     self.flush().await?;
     167             963 :                     flush_op.notify();
     168                 :                 }
     169                 :             }
     170                 :         }
     171               1 :     }
     172                 : }
        

Generated by: LCOV version 2.1-beta