//! The deleter is the final stage in the deletion queue. It accumulates remote
//! paths to delete and periodically executes them in batches of up to 1000 keys
//! (`MAX_KEYS_PER_DELETE`) using the DeleteObjects request.
//!
//! Its purpose is to make remote storage I/O more efficient by issuing a small
//! number of full-sized DeleteObjects requests rather than a larger number of
//! smaller ones.
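//!
//! A rough sketch of how this stage is driven (illustrative only; the real
//! channel wiring and task spawning live in the parent deletion queue module):
//!
//! ```ignore
//! let (tx, rx) = tokio::sync::mpsc::channel::<DeleterMessage>(16);
//! let mut deleter = Deleter::new(remote_storage, rx, cancel.clone());
//! tokio::spawn(async move { deleter.background().await });
//!
//! // Upstream stages submit batches of paths; the deleter coalesces them and
//! // flushes on the autoflush timer or whenever a full batch accumulates.
//! tx.send(DeleterMessage::Delete(paths)).await?;
//! ```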

use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;

use crate::metrics;

use super::DeletionQueueError;
use super::FlushOp;

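/// If no new messages arrive within this interval, flush whatever has been
/// accumulated rather than waiting for a full batch.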
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

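/// Input to the deleter: either a batch of remote paths to accumulate for
/// deletion, or a flush request that completes once everything accumulated so
/// far has been executed.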
pub(super) enum DeleterMessage {
    Delete(Vec<RemotePath>),
    Flush(FlushOp),
}

/// Non-persistent deletion queue, for coalescing multiple object deletes into
/// larger DeleteObjects requests.
pub(super) struct Deleter {
    // Accumulate up to MAX_KEYS_PER_DELETE keys for the next deletion operation
    accumulator: Vec<RemotePath>,

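    // Incoming deletion batches and flush requests from the upstream stage of the queue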
    rx: tokio::sync::mpsc::Receiver<DeleterMessage>,

    cancel: CancellationToken,
    remote_storage: GenericRemoteStorage,
}

impl Deleter {
    pub(super) fn new(
        remote_storage: GenericRemoteStorage,
        rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            remote_storage,
            rx,
            cancel,
            accumulator: Vec::new(),
        }
    }

    /// Wrap the remote `delete_objects` with a failpoint
    async fn remote_delete(&self) -> Result<(), anyhow::Error> {
        // A backoff::retry is used here for two reasons:
        // - To provide a backoff rather than busy-polling the API on errors
        // - To absorb transient 429/503 conditions without hitting our error
        //   logging path for issues deleting objects.
        backoff::retry(
            || async {
                fail::fail_point!("deletion-queue-before-execute", |_| {
                    info!("Skipping execution, failpoint set");

                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["failpoint"])
                        .inc();
                    Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
                });

                self.remote_storage.delete_objects(&self.accumulator).await
            },
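            // `|_| false` treats no error as permanent, so failures are retried
            // until the retry helper's attempt limit is reached or we are cancelled.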
            |_| false,
            3,
            10,
            "executing deletion batch",
            &self.cancel,
        )
        .await
        .ok_or_else(|| anyhow::anyhow!("Shutting down"))
        .and_then(|x| x)
    }

    /// Block until everything in the accumulator has been executed
    async fn flush(&mut self) -> Result<(), DeletionQueueError> {
        while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
            match self.remote_delete().await {
                Ok(()) => {
                    // Note: we assume that the remote storage layer returns Ok(()) if some
                    // or all of the deleted objects were already gone.
                    metrics::DELETION_QUEUE
                        .keys_executed
                        .inc_by(self.accumulator.len() as u64);
                    info!(
                        "Executed deletion batch {}..{}",
                        self.accumulator
                            .first()
                            .expect("accumulator should be non-empty"),
                        self.accumulator
                            .last()
                            .expect("accumulator should be non-empty"),
                    );
                    self.accumulator.clear();
                }
                Err(e) => {
                    if self.cancel.is_cancelled() {
                        return Err(DeletionQueueError::ShuttingDown);
                    }
                    warn!("DeleteObjects request failed: {e:#}, will continue trying");
                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["execute"])
                        .inc();
                }
            };
        }
        if self.cancel.is_cancelled() {
            // Expose an error because we may not have actually flushed everything
            Err(DeletionQueueError::ShuttingDown)
        } else {
            Ok(())
        }
    }

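    /// Run the deleter's main loop: accumulate incoming batches, and flush when
    /// a full batch accumulates, when the autoflush timer fires, or when an
    /// explicit flush is requested.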
    pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
        self.accumulator.reserve(MAX_KEYS_PER_DELETE);

        loop {
            if self.cancel.is_cancelled() {
                return Err(DeletionQueueError::ShuttingDown);
            }

            let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
                Ok(Some(m)) => m,
                Ok(None) => {
                    // All queue senders closed
                    info!("Shutting down");
                    return Err(DeletionQueueError::ShuttingDown);
                }
                Err(_) => {
                    // Timeout: we hit the deadline to execute whatever we have in
                    // hand. flush() returns immediately if no work is pending.
                    self.flush().await?;

                    continue;
                }
            };

            match msg {
                DeleterMessage::Delete(mut list) => {
                    while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
                        if self.accumulator.len() == MAX_KEYS_PER_DELETE {
                            // A full batch has accumulated: execute it before taking on more keys
                            self.flush().await?;
                            assert_eq!(self.accumulator.len(), 0);
                        }

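                        // Move as many keys as will fit from the tail of the incoming
                        // list into the accumulator.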
                        let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
                        let take_count = std::cmp::min(available_slots, list.len());
                        for path in list.drain(list.len() - take_count..) {
                            self.accumulator.push(path);
                        }
                    }
                }
                DeleterMessage::Flush(flush_op) => {
                    // If flush() errors, we drop the flush_op and the caller will get
                    // an error recv()'ing their oneshot channel.
                    self.flush().await?;
                    flush_op.notify();
                }
            }
        }
    }
}