//! The deleter is the final stage in the deletion queue. It accumulates remote
//! paths to delete, and periodically executes them in batches of up to
//! `MAX_KEYS_PER_DELETE` paths using the DeleteObjects request.
//!
//! Its purpose is to increase the efficiency of remote storage I/O by issuing a
//! smaller number of full-sized DeleteObjects requests, rather than a larger
//! number of smaller requests.
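//!
//! A minimal sketch of how this stage is driven (illustrative wiring only; in
//! the real code the channel, task, and cancellation token are set up by the
//! parent deletion queue module):
//!
//! ```ignore
//! let (tx, rx) = tokio::sync::mpsc::channel(16);
//! let cancel = CancellationToken::new();
//! let mut deleter = Deleter::new(remote_storage, rx, cancel.clone());
//! tokio::spawn(async move { deleter.background().await });
//! tx.send(DeleterMessage::Delete(paths)).await?;
//! ```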

use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;

use crate::metrics;

use super::DeletionQueueError;
use super::FlushOp;

const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

pub(super) enum DeleterMessage {
    Delete(Vec<RemotePath>),
    Flush(FlushOp),
}

/// Non-persistent deletion queue, for coalescing multiple object deletes into
/// larger DeleteObjects requests.
pub(super) struct Deleter {
    // Accumulate up to MAX_KEYS_PER_DELETE keys for the next deletion operation
    accumulator: Vec<RemotePath>,

    rx: tokio::sync::mpsc::Receiver<DeleterMessage>,

    cancel: CancellationToken,
    remote_storage: GenericRemoteStorage,
}

impl Deleter {
    pub(super) fn new(
        remote_storage: GenericRemoteStorage,
        rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            remote_storage,
            rx,
            cancel,
            accumulator: Vec::new(),
        }
    }

    /// Wrap the remote `delete_objects` call with a failpoint and a retry loop
    async fn remote_delete(&self) -> Result<(), anyhow::Error> {
        // A backoff::retry is used here for two reasons:
        // - To provide a backoff rather than busy-polling the API on errors
        // - To absorb transient 429/503 conditions without hitting our error
        //   logging path for issues deleting objects.
        backoff::retry(
            || async {
                fail::fail_point!("deletion-queue-before-execute", |_| {
                    info!("Skipping execution, failpoint set");

                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["failpoint"])
                        .inc();
                    Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
                });

                self.remote_storage.delete_objects(&self.accumulator).await
            },
            |_| false,
            3,
            10,
            "executing deletion batch",
            &self.cancel,
        )
        .await
        // retry() yields None only if the cancellation token fired
        .ok_or_else(|| anyhow::anyhow!("Shutting down"))
        // Flatten the Result produced by the final attempt
        .and_then(|x| x)
    }
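    // Note on testing: failpoints such as "deletion-queue-before-execute" are
    // typically activated through the fail crate's standard API, e.g.
    // `fail::cfg("deletion-queue-before-execute", "return")`. This note is
    // illustrative; activation happens outside this module.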

    /// Block until everything in the accumulator has been executed
    async fn flush(&mut self) -> Result<(), DeletionQueueError> {
        while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
            match self.remote_delete().await {
                Ok(()) => {
                    // Note: we assume that the remote storage layer returns Ok(()) if some
                    // or all of the deleted objects were already gone.
                    metrics::DELETION_QUEUE
                        .keys_executed
                        .inc_by(self.accumulator.len() as u64);
                    info!(
                        "Executed deletion batch {}..{}",
                        self.accumulator
                            .first()
                            .expect("accumulator should be non-empty"),
                        self.accumulator
                            .last()
                            .expect("accumulator should be non-empty"),
                    );
                    self.accumulator.clear();
                }
                Err(e) => {
                    if self.cancel.is_cancelled() {
                        return Err(DeletionQueueError::ShuttingDown);
                    }
                    warn!("DeleteObjects request failed: {e:#}, will continue trying");
                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["execute"])
                        .inc();
                }
            };
        }
        if self.cancel.is_cancelled() {
            // Expose an error because we may not have actually flushed everything
            Err(DeletionQueueError::ShuttingDown)
        } else {
            Ok(())
        }
    }

    pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
        self.accumulator.reserve(MAX_KEYS_PER_DELETE);

        loop {
            if self.cancel.is_cancelled() {
                return Err(DeletionQueueError::ShuttingDown);
            }

            let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
                Ok(Some(m)) => m,
                Ok(None) => {
                    // All queue senders closed
                    info!("Shutting down");
                    return Err(DeletionQueueError::ShuttingDown);
                }
                Err(_) => {
                    // Timeout: we hit the deadline for executing whatever we have in
                    // hand. flush() returns immediately if no work is pending.
                    self.flush().await?;

                    continue;
                }
            };

            match msg {
                DeleterMessage::Delete(mut list) => {
                    while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
                        if self.accumulator.len() == MAX_KEYS_PER_DELETE {
                            // The accumulator is full: execute a batch before
                            // taking any more keys.
                            self.flush().await?;
                            assert_eq!(self.accumulator.len(), 0);
                        }

                        let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
                        let take_count = std::cmp::min(available_slots, list.len());
                        // Drain from the tail of `list` so that the remaining elements
                        // do not need to be shifted; deletion order does not matter.
                        for path in list.drain(list.len() - take_count..) {
                            self.accumulator.push(path);
                        }
                    }
                }
                DeleterMessage::Flush(flush_op) => {
                    // If flush() errors, we drop the flush_op and the caller will get
                    // an error recv()'ing their oneshot channel.
                    self.flush().await?;
                    flush_op.notify();
                }
            }
        }
    }
}
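
// Caller-side sketch of the flush protocol handled by `background` above
// (illustrative only: the real sender lives in the parent deletion queue
// module, and `FlushOp::new()` is an assumed helper that pairs a FlushOp
// with its oneshot receiver):
//
//     let (flush_op, done_rx) = FlushOp::new();
//     tx.send(DeleterMessage::Flush(flush_op)).await?;
//     // If the deleter dropped the FlushOp on error, this recv fails.
//     done_rx.await.map_err(|_| DeletionQueueError::ShuttingDown)?;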