Line data Source code
1 : //! The deleter is the final stage in the deletion queue. It accumulates remote
2 : //! paths to delete, and periodically executes them in batches of up to 1000
3 : //! using the DeleteObjects request.
4 : //!
5 : //! Its purpose is to increase efficiency of remote storage I/O by issuing a smaller
6 : //! number of full-sized DeleteObjects requests, rather than a larger number of
7 : //! smaller requests.
8 :
9 : use remote_storage::GenericRemoteStorage;
10 : use remote_storage::RemotePath;
11 : use remote_storage::TimeoutOrCancel;
12 : use remote_storage::MAX_KEYS_PER_DELETE;
13 : use std::time::Duration;
14 : use tokio_util::sync::CancellationToken;
15 : use tracing::info;
16 : use tracing::warn;
17 : use utils::backoff;
18 : use utils::pausable_failpoint;
19 :
20 : use crate::metrics;
21 :
22 : use super::DeletionQueueError;
23 : use super::FlushOp;
24 :
/// Longest time the background loop waits for a new message before it
/// flushes whatever keys are currently accumulated.
const AUTOFLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
26 :
27 : pub(super) enum DeleterMessage {
28 : Delete(Vec<RemotePath>),
29 : Flush(FlushOp),
30 : }
31 :
32 : /// Non-persistent deletion queue, for coalescing multiple object deletes into
33 : /// larger DeleteObjects requests.
34 : pub(super) struct Deleter {
35 : // Accumulate up to 1000 keys for the next deletion operation
36 : accumulator: Vec<RemotePath>,
37 :
38 : rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
39 :
40 : cancel: CancellationToken,
41 : remote_storage: GenericRemoteStorage,
42 : }
43 :
44 : impl Deleter {
45 8 : pub(super) fn new(
46 8 : remote_storage: GenericRemoteStorage,
47 8 : rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
48 8 : cancel: CancellationToken,
49 8 : ) -> Self {
50 8 : Self {
51 8 : remote_storage,
52 8 : rx,
53 8 : cancel,
54 8 : accumulator: Vec::new(),
55 8 : }
56 8 : }
57 :
58 : /// Wrap the remote `delete_objects` with a failpoint
59 6 : async fn remote_delete(&self) -> Result<(), anyhow::Error> {
60 6 : // A backoff::retry is used here for two reasons:
61 6 : // - To provide a backoff rather than busy-polling the API on errors
62 6 : // - To absorb transient 429/503 conditions without hitting our error
63 6 : // logging path for issues deleting objects.
64 6 : backoff::retry(
65 6 : || async {
66 6 : fail::fail_point!("deletion-queue-before-execute", |_| {
67 0 : info!("Skipping execution, failpoint set");
68 :
69 0 : metrics::DELETION_QUEUE
70 0 : .remote_errors
71 0 : .with_label_values(&["failpoint"])
72 0 : .inc();
73 0 : Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
74 6 : });
75 :
76 6 : self.remote_storage
77 6 : .delete_objects(&self.accumulator, &self.cancel)
78 5 : .await
79 12 : },
80 6 : TimeoutOrCancel::caused_by_cancel,
81 6 : 3,
82 6 : 10,
83 6 : "executing deletion batch",
84 6 : &self.cancel,
85 6 : )
86 5 : .await
87 6 : .ok_or_else(|| anyhow::anyhow!("Shutting down"))
88 6 : .and_then(|x| x)
89 6 : }
90 :
91 : /// Block until everything in accumulator has been executed
92 22 : async fn flush(&mut self) -> Result<(), DeletionQueueError> {
93 28 : while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
94 6 : pausable_failpoint!("deletion-queue-before-execute-pause");
95 6 : match self.remote_delete().await {
96 : Ok(()) => {
97 : // Note: we assume that the remote storage layer returns Ok(()) if some
98 : // or all of the deleted objects were already gone.
99 6 : metrics::DELETION_QUEUE
100 6 : .keys_executed
101 6 : .inc_by(self.accumulator.len() as u64);
102 6 : info!(
103 0 : "Executed deletion batch {}..{}",
104 0 : self.accumulator
105 0 : .first()
106 0 : .expect("accumulator should be non-empty"),
107 0 : self.accumulator
108 0 : .last()
109 0 : .expect("accumulator should be non-empty"),
110 : );
111 6 : self.accumulator.clear();
112 : }
113 0 : Err(e) => {
114 0 : if self.cancel.is_cancelled() {
115 0 : return Err(DeletionQueueError::ShuttingDown);
116 0 : }
117 0 : warn!("DeleteObjects request failed: {e:#}, will continue trying");
118 0 : metrics::DELETION_QUEUE
119 0 : .remote_errors
120 0 : .with_label_values(&["execute"])
121 0 : .inc();
122 : }
123 : };
124 : }
125 22 : if self.cancel.is_cancelled() {
126 : // Expose an error because we may not have actually flushed everything
127 2 : Err(DeletionQueueError::ShuttingDown)
128 : } else {
129 20 : Ok(())
130 : }
131 22 : }
132 :
133 8 : pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
134 8 : self.accumulator.reserve(MAX_KEYS_PER_DELETE);
135 :
136 : loop {
137 38 : if self.cancel.is_cancelled() {
138 0 : return Err(DeletionQueueError::ShuttingDown);
139 38 : }
140 :
141 38 : let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
142 28 : Ok(Some(m)) => m,
143 : Ok(None) => {
144 : // All queue senders closed
145 0 : info!("Shutting down");
146 0 : return Err(DeletionQueueError::ShuttingDown);
147 : }
148 : Err(_) => {
149 : // Timeout, we hit deadline to execute whatever we have in hand. These functions will
150 : // return immediately if no work is pending
151 4 : self.flush().await?;
152 :
153 2 : continue;
154 : }
155 : };
156 :
157 28 : match msg {
158 10 : DeleterMessage::Delete(mut list) => {
159 16 : while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
160 6 : if self.accumulator.len() == MAX_KEYS_PER_DELETE {
161 0 : self.flush().await?;
162 : // If we have received this number of keys, proceed with attempting to execute
163 0 : assert_eq!(self.accumulator.len(), 0);
164 6 : }
165 :
166 6 : let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
167 6 : let take_count = std::cmp::min(available_slots, list.len());
168 6 : for path in list.drain(list.len() - take_count..) {
169 6 : self.accumulator.push(path);
170 6 : }
171 : }
172 : }
173 18 : DeleterMessage::Flush(flush_op) => {
174 18 : // If flush() errors, we drop the flush_op and the caller will get
175 18 : // an error recv()'ing their oneshot channel.
176 18 : self.flush().await?;
177 18 : flush_op.notify();
178 : }
179 : }
180 : }
181 2 : }
182 : }
|