//! The deleter is the final stage in the deletion queue. It accumulates remote
//! paths to delete, and periodically executes them in batches of up to 1000
//! keys using DeleteObjects requests.
//!
//! Its purpose is to increase the efficiency of remote storage I/O by issuing a
//! smaller number of full-sized DeleteObjects requests, rather than a larger
//! number of smaller requests.
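//!
//! Illustrative sketch of how this stage is driven (hypothetical wiring; the
//! real setup lives in the parent module):
//!
//! ```ignore
//! let (tx, rx) = tokio::sync::mpsc::channel(16);
//! let cancel = CancellationToken::new();
//! let mut deleter = Deleter::new(remote_storage, rx, cancel.clone());
//! tokio::spawn(async move { deleter.background().await });
//!
//! // Paths accumulate until a batch fills or AUTOFLUSH_INTERVAL elapses.
//! tx.send(DeleterMessage::Delete(paths)).await?;
//! ```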

use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;

use crate::metrics;

use super::DeletionQueueError;
use super::FlushOp;

const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

pub(super) enum DeleterMessage {
    /// A batch of remote paths to accumulate for deletion.
    Delete(Vec<RemotePath>),
    /// Execute everything accumulated so far, then notify the sender.
    Flush(FlushOp),
}
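// Caller-side sketch of the flush protocol (hypothetical names: `make_flush_op`
// stands in for however the parent module constructs a `FlushOp` and its
// completion channel):
//
//     let (flush_op, done_rx) = make_flush_op();
//     tx.send(DeleterMessage::Flush(flush_op)).await?;
//     done_rx.await?; // completes once flush() succeeded and notify() fired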

/// Non-persistent deletion queue, for coalescing multiple object deletes into
/// larger DeleteObjects requests.
pub(super) struct Deleter {
    // Accumulate up to MAX_KEYS_PER_DELETE keys for the next deletion operation
    accumulator: Vec<RemotePath>,

    rx: tokio::sync::mpsc::Receiver<DeleterMessage>,

    cancel: CancellationToken,
    remote_storage: GenericRemoteStorage,
}

impl Deleter {
    pub(super) fn new(
        remote_storage: GenericRemoteStorage,
        rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            remote_storage,
            rx,
            cancel,
            accumulator: Vec::new(),
        }
    }

    /// Wrap the remote `delete_objects` call with a failpoint and retry logic
    async fn remote_delete(&self) -> Result<(), anyhow::Error> {
        // A backoff::retry is used here for two reasons:
        // - To provide a backoff rather than busy-polling the API on errors
        // - To absorb transient 429/503 conditions without hitting our error
        //   logging path for issues deleting objects.
        backoff::retry(
            || async {
                fail::fail_point!("deletion-queue-before-execute", |_| {
                    info!("Skipping execution, failpoint set");

                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["failpoint"])
                        .inc();
                    Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
                });

                self.remote_storage.delete_objects(&self.accumulator).await
            },
            |_| false, // no errors are considered permanent: retry everything
            3,         // warn after this many attempts
            10,        // give up after this many attempts
            "executing deletion batch",
            &self.cancel,
        )
        .await
        // `retry` yields no result when cancelled; map that to a shutdown
        // error, then flatten the inner deletion result.
        .ok_or_else(|| anyhow::anyhow!("Shutting down"))
        .and_then(|x| x)
    }

    /// Block until everything in the accumulator has been executed
    async fn flush(&mut self) -> Result<(), DeletionQueueError> {
        while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
            match self.remote_delete().await {
                Ok(()) => {
                    // Note: we assume that the remote storage layer returns Ok(()) if some
                    // or all of the deleted objects were already gone.
                    metrics::DELETION_QUEUE
                        .keys_executed
                        .inc_by(self.accumulator.len() as u64);
                    info!(
                        "Executed deletion batch {}..{}",
                        self.accumulator
                            .first()
                            .expect("accumulator should be non-empty"),
                        self.accumulator
                            .last()
                            .expect("accumulator should be non-empty"),
                    );
                    self.accumulator.clear();
                }
                Err(e) => {
                    if self.cancel.is_cancelled() {
                        return Err(DeletionQueueError::ShuttingDown);
                    }
                    warn!("DeleteObjects request failed: {e:#}, will continue trying");
                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["execute"])
                        .inc();
                }
            };
        }
        if self.cancel.is_cancelled() {
            // Expose an error because we may not have actually flushed everything
            Err(DeletionQueueError::ShuttingDown)
        } else {
            Ok(())
        }
    }

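    /// Main loop: accumulate deletions, executing a batch whenever it fills to
    /// MAX_KEYS_PER_DELETE, the autoflush deadline passes, or an explicit flush
    /// is requested. Returns when cancelled or when all senders are dropped.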
    pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
        self.accumulator.reserve(MAX_KEYS_PER_DELETE);

        loop {
            if self.cancel.is_cancelled() {
                return Err(DeletionQueueError::ShuttingDown);
            }

            let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
                Ok(Some(m)) => m,
                Ok(None) => {
                    // All queue senders closed
                    info!("Shutting down");
                    return Err(DeletionQueueError::ShuttingDown);
                }
                Err(_) => {
                    // Timeout: we hit the deadline to execute whatever we have in hand.
                    // flush() returns immediately if no work is pending.
                    self.flush().await?;

                    continue;
                }
            };

            match msg {
                DeleterMessage::Delete(mut list) => {
                    while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
                        // If we have accumulated a full batch, execute it before
                        // taking on more keys.
                        if self.accumulator.len() == MAX_KEYS_PER_DELETE {
                            self.flush().await?;
                            assert_eq!(self.accumulator.len(), 0);
                        }

                        // Move as many keys as will fit from the tail of the
                        // list into the accumulator.
                        let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
                        let take_count = std::cmp::min(available_slots, list.len());
                        for path in list.drain(list.len() - take_count..) {
                            self.accumulator.push(path);
                        }
                    }
                }
                DeleterMessage::Flush(flush_op) => {
                    // If flush() errors, we drop the flush_op and the caller will get
                    // an error recv()'ing their oneshot channel.
                    self.flush().await?;
                    flush_op.notify();
                }
            }
        }
    }
}