LCOV - differential code coverage report
Current view: top level - pageserver/src/deletion_queue - deleter.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 85.4 % 96 82 14 82
Current Date: 2024-01-09 02:06:09 Functions: 80.0 % 15 12 3 12
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! The deleter is the final stage in the deletion queue.  It accumulates remote
       2                 : //! paths to delete, and periodically executes them in batches of up to 1000
       3                 : //! using the DeleteObjects request.
       4                 : //!
       5                 : //! Its purpose is to increase efficiency of remote storage I/O by issuing a smaller
       6                 : //! number of full-sized DeleteObjects requests, rather than a larger number of
       7                 : //! smaller requests.
       8                 : 
       9                 : use remote_storage::GenericRemoteStorage;
      10                 : use remote_storage::RemotePath;
      11                 : use remote_storage::MAX_KEYS_PER_DELETE;
      12                 : use std::time::Duration;
      13                 : use tokio_util::sync::CancellationToken;
      14                 : use tracing::info;
      15                 : use tracing::warn;
      16                 : use utils::backoff;
      17                 : 
      18                 : use crate::metrics;
      19                 : 
      20                 : use super::DeletionQueueError;
      21                 : use super::FlushOp;
      22                 : 
      23                 : const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
      24                 : 
      25                 : pub(super) enum DeleterMessage {
      26                 :     Delete(Vec<RemotePath>),
      27                 :     Flush(FlushOp),
      28                 : }
      29                 : 
      30                 : /// Non-persistent deletion queue, for coalescing multiple object deletes into
      31                 : /// larger DeleteObjects requests.
      32                 : pub(super) struct Deleter {
      33                 :     // Accumulate up to 1000 keys for the next deletion operation
      34                 :     accumulator: Vec<RemotePath>,
      35                 : 
      36                 :     rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
      37                 : 
      38                 :     cancel: CancellationToken,
      39                 :     remote_storage: GenericRemoteStorage,
      40                 : }
      41                 : 
      42                 : impl Deleter {
      43 CBC         561 :     pub(super) fn new(
      44             561 :         remote_storage: GenericRemoteStorage,
      45             561 :         rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
      46             561 :         cancel: CancellationToken,
      47             561 :     ) -> Self {
      48             561 :         Self {
      49             561 :             remote_storage,
      50             561 :             rx,
      51             561 :             cancel,
      52             561 :             accumulator: Vec::new(),
      53             561 :         }
      54             561 :     }
      55                 : 
      56                 :     /// Wrap the remote `delete_objects` with a failpoint
      57             362 :     async fn remote_delete(&self) -> Result<(), anyhow::Error> {
      58             362 :         // A backoff::retry is used here for two reasons:
      59             362 :         // - To provide a backoff rather than busy-polling the API on errors
      60             362 :         // - To absorb transient 429/503 conditions without hitting our error
      61             362 :         //   logging path for issues deleting objects.
      62             362 :         backoff::retry(
      63             477 :             || async {
      64             477 :                 fail::fail_point!("deletion-queue-before-execute", |_| {
      65               4 :                     info!("Skipping execution, failpoint set");
      66                 : 
      67               4 :                     metrics::DELETION_QUEUE
      68               4 :                         .remote_errors
      69               4 :                         .with_label_values(&["failpoint"])
      70               4 :                         .inc();
      71               4 :                     Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
      72             477 :                 });
      73                 : 
      74            9613 :                 self.remote_storage.delete_objects(&self.accumulator).await
      75             477 :             },
      76             362 :             |_| false,
      77             362 :             3,
      78             362 :             10,
      79             362 :             "executing deletion batch",
      80             362 :             backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
      81             362 :         )
      82            9613 :         .await
      83             360 :     }
      84                 : 
      85                 :     /// Block until everything in accumulator has been executed
      86             942 :     async fn flush(&mut self) -> Result<(), DeletionQueueError> {
      87            1302 :         while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
      88            9613 :             match self.remote_delete().await {
      89                 :                 Ok(()) => {
      90                 :                     // Note: we assume that the remote storage layer returns Ok(()) if some
      91                 :                     // or all of the deleted objects were already gone.
      92             360 :                     metrics::DELETION_QUEUE
      93             360 :                         .keys_executed
      94             360 :                         .inc_by(self.accumulator.len() as u64);
      95             360 :                     info!(
      96             360 :                         "Executed deletion batch {}..{}",
      97             360 :                         self.accumulator
      98             360 :                             .first()
      99             360 :                             .expect("accumulator should be non-empty"),
     100             360 :                         self.accumulator
     101             360 :                             .last()
     102             360 :                             .expect("accumulator should be non-empty"),
     103             360 :                     );
     104             360 :                     self.accumulator.clear();
     105                 :                 }
     106 UBC           0 :                 Err(e) => {
     107               0 :                     if self.cancel.is_cancelled() {
     108               0 :                         return Err(DeletionQueueError::ShuttingDown);
     109               0 :                     }
     110               0 :                     warn!("DeleteObjects request failed: {e:#}, will continue trying");
     111               0 :                     metrics::DELETION_QUEUE
     112               0 :                         .remote_errors
     113               0 :                         .with_label_values(&["execute"])
     114               0 :                         .inc();
     115                 :                 }
     116                 :             };
     117                 :         }
     118 CBC         940 :         if self.cancel.is_cancelled() {
     119                 :             // Expose an error because we may not have actually flushed everything
     120               1 :             Err(DeletionQueueError::ShuttingDown)
     121                 :         } else {
     122             939 :             Ok(())
     123                 :         }
     124             940 :     }
     125                 : 
     126             561 :     pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
     127             561 :         self.accumulator.reserve(MAX_KEYS_PER_DELETE);
     128                 : 
     129            1996 :         loop {
     130            1996 :             if self.cancel.is_cancelled() {
     131 UBC           0 :                 return Err(DeletionQueueError::ShuttingDown);
     132 CBC        1996 :             }
     133                 : 
     134            1996 :             let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
     135             910 :                 Ok(Some(m)) => m,
     136                 :                 Ok(None) => {
     137                 :                     // All queue senders closed
     138 UBC           0 :                     info!("Shutting down");
     139               0 :                     return Err(DeletionQueueError::ShuttingDown);
     140                 :                 }
     141                 :                 Err(_) => {
     142                 :                     // Timeout, we hit deadline to execute whatever we have in hand.  These functions will
     143                 :                     // return immediately if no work is pending
     144 CBC         528 :                     self.flush().await?;
     145                 : 
     146             527 :                     continue;
     147                 :                 }
     148                 :             };
     149                 : 
     150             910 :             match msg {
     151             496 :                 DeleterMessage::Delete(mut list) => {
     152             963 :                     while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
     153             467 :                         if self.accumulator.len() == MAX_KEYS_PER_DELETE {
     154 UBC           0 :                             self.flush().await?;
     155                 :                             // If we have received this number of keys, proceed with attempting to execute
     156               0 :                             assert_eq!(self.accumulator.len(), 0);
     157 CBC         467 :                         }
     158                 : 
     159             467 :                         let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
     160             467 :                         let take_count = std::cmp::min(available_slots, list.len());
     161            7324 :                         for path in list.drain(list.len() - take_count..) {
     162            7324 :                             self.accumulator.push(path);
     163            7324 :                         }
     164                 :                     }
     165                 :                 }
     166             414 :                 DeleterMessage::Flush(flush_op) => {
     167             414 :                     // If flush() errors, we drop the flush_op and the caller will get
     168             414 :                     // an error recv()'ing their oneshot channel.
     169            9613 :                     self.flush().await?;
     170             412 :                     flush_op.notify();
     171                 :                 }
     172                 :             }
     173                 :         }
     174               1 :     }
     175                 : }
        

Generated by: LCOV version 2.1-beta