//! The deleter is the final stage in the deletion queue. It accumulates remote
//! paths to delete, and periodically executes them in batches of up to 1000
//! keys per DeleteObjects request.
//!
//! Its purpose is to increase the efficiency of remote storage I/O by issuing a
//! smaller number of full-sized DeleteObjects requests, rather than a larger
//! number of smaller requests.
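//!
//! Partially filled batches are executed after `AUTOFLUSH_INTERVAL` elapses with
//! no new work arriving, or when an explicit flush is requested.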

use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use remote_storage::TimeoutOrCancel;
use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;

use crate::metrics;

use super::DeletionQueueError;
use super::FlushOp;

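// How long to wait for more work before executing whatever is pending, even if
// the batch is not full (see `background`).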
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

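/// Commands accepted by the deleter task: either append more paths to the pending
/// batch, or execute whatever is pending now and acknowledge via the [`FlushOp`].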
pub(super) enum DeleterMessage {
    Delete(Vec<RemotePath>),
    Flush(FlushOp),
}

/// Non-persistent deletion queue, for coalescing multiple object deletes into
/// larger DeleteObjects requests.
pub(super) struct Deleter {
    // Accumulate up to 1000 keys for the next deletion operation
    accumulator: Vec<RemotePath>,

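    // Incoming deletion lists and flush requests from the rest of the deletion queue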
    rx: tokio::sync::mpsc::Receiver<DeleterMessage>,

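    // Shutdown signal: checked between batches and passed into remote storage calls
    // so that in-flight DeleteObjects requests can be interrupted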
    cancel: CancellationToken,
    remote_storage: GenericRemoteStorage,
}

impl Deleter {
    pub(super) fn new(
        remote_storage: GenericRemoteStorage,
        rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            remote_storage,
            rx,
            cancel,
            accumulator: Vec::new(),
        }
    }

    /// Wrap the remote `delete_objects` with a failpoint
    async fn remote_delete(&self) -> Result<(), anyhow::Error> {
        // A backoff::retry is used here for two reasons:
        // - To provide a backoff rather than busy-polling the API on errors
        // - To absorb transient 429/503 conditions without hitting our error
        //   logging path for issues deleting objects.
        backoff::retry(
            || async {
                fail::fail_point!("deletion-queue-before-execute", |_| {
                    info!("Skipping execution, failpoint set");

                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["failpoint"])
                        .inc();
                    Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
                });

                self.remote_storage
                    .delete_objects(&self.accumulator, &self.cancel)
                    .await
            },
            TimeoutOrCancel::caused_by_cancel,
            3,
            10,
            "executing deletion batch",
            &self.cancel,
        )
        .await
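        // retry() yields None only if we were cancelled before succeeding; surface
        // that as an error, then flatten the per-attempt result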
        .ok_or_else(|| anyhow::anyhow!("Shutting down"))
        .and_then(|x| x)
    }

    /// Block until everything in the accumulator has been executed
    async fn flush(&mut self) -> Result<(), DeletionQueueError> {
        while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
            match self.remote_delete().await {
                Ok(()) => {
                    // Note: we assume that the remote storage layer returns Ok(()) if some
                    // or all of the deleted objects were already gone.
                    metrics::DELETION_QUEUE
                        .keys_executed
                        .inc_by(self.accumulator.len() as u64);
                    info!(
                        "Executed deletion batch {}..{}",
                        self.accumulator
                            .first()
                            .expect("accumulator should be non-empty"),
                        self.accumulator
                            .last()
                            .expect("accumulator should be non-empty"),
                    );
                    self.accumulator.clear();
                }
                Err(e) => {
                    if self.cancel.is_cancelled() {
                        return Err(DeletionQueueError::ShuttingDown);
                    }
                    warn!("DeleteObjects request failed: {e:#}, will continue trying");
                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["execute"])
                        .inc();
                }
            };
        }
        if self.cancel.is_cancelled() {
            // Expose an error because we may not have actually flushed everything
            Err(DeletionQueueError::ShuttingDown)
        } else {
            Ok(())
        }
    }

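    /// Main loop of the deleter task: accumulate incoming paths and execute them
    /// when a batch fills up, when `AUTOFLUSH_INTERVAL` elapses with no new work,
    /// or when an explicit flush is requested.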
    pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
        self.accumulator.reserve(MAX_KEYS_PER_DELETE);

        loop {
            if self.cancel.is_cancelled() {
                return Err(DeletionQueueError::ShuttingDown);
            }

            let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
                Ok(Some(m)) => m,
                Ok(None) => {
                    // All queue senders closed
                    info!("Shutting down");
                    return Err(DeletionQueueError::ShuttingDown);
                }
                Err(_) => {
                    // Timeout: we hit the deadline to execute whatever we have in hand.
                    // flush() returns immediately if no work is pending.
                    self.flush().await?;

                    continue;
                }
            };

            match msg {
                DeleterMessage::Delete(mut list) => {
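                    // Keep going while there are paths left to absorb; the extra
                    // `== MAX_KEYS_PER_DELETE` condition makes one more pass to flush
                    // a batch that filled up exactly as the list ran out.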
                    while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
                        if self.accumulator.len() == MAX_KEYS_PER_DELETE {
                            // We have accumulated a full batch: execute it before taking more keys
                            self.flush().await?;
                            assert_eq!(self.accumulator.len(), 0);
                        }

                        let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
                        let take_count = std::cmp::min(available_slots, list.len());
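                        // Drain from the tail of the list so the remaining elements
                        // do not have to be shifted down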
                        for path in list.drain(list.len() - take_count..) {
                            self.accumulator.push(path);
                        }
                    }
                }
                DeleterMessage::Flush(flush_op) => {
                    // If flush() errors, we drop the flush_op and the caller will get
                    // an error recv()'ing their oneshot channel.
                    self.flush().await?;
                    flush_op.notify();
                }
            }
        }
    }
}