//! The deleter is the final stage in the deletion queue. It accumulates remote
//! paths to delete, and periodically executes them in batches of up to the
//! backend's `max_keys_per_delete` limit (1000 keys for S3) using the
//! DeleteObjects request.
//!
//! Its purpose is to increase the efficiency of remote storage I/O by issuing a
//! smaller number of full-sized DeleteObjects requests, rather than a larger
//! number of smaller requests.
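//!
//! The sketch below is illustrative only and is not the production wiring (that
//! lives in the parent `deletion_queue` module): it assumes a `GenericRemoteStorage`
//! handle named `remote_storage` and a `RemotePath` named `path` are already in
//! hand, and shows how a deleter is constructed and fed messages.
//!
//! ```ignore
//! let (tx, rx) = tokio::sync::mpsc::channel::<DeleterMessage>(16);
//! let cancel = CancellationToken::new();
//! let mut deleter = Deleter::new(remote_storage, rx, cancel.clone());
//!
//! // The loop exits with DeletionQueueError::ShuttingDown when `cancel` fires
//! // or all senders are dropped.
//! let worker = tokio::spawn(async move { deleter.background().await });
//!
//! // Paths sent here are coalesced into DeleteObjects-sized batches; a
//! // DeleterMessage::Flush(FlushOp) forces execution of whatever is pending.
//! tx.send(DeleterMessage::Delete(vec![path])).await.ok();
//! ```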

use std::time::Duration;

use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utils::{backoff, pausable_failpoint};

use super::{DeletionQueueError, FlushOp};
use crate::metrics;

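/// How long the deleter waits for more incoming work before flushing whatever
/// it has already accumulated.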
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

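/// Input to the deleter: either more paths to accumulate, or a request to execute
/// everything accumulated so far and notify the waiting caller.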
pub(super) enum DeleterMessage {
    Delete(Vec<RemotePath>),
    Flush(FlushOp),
}

/// Non-persistent deletion queue, for coalescing multiple object deletes into
/// larger DeleteObjects requests.
pub(super) struct Deleter {
    // Accumulate up to `max_keys_per_delete` keys for the next deletion operation
    accumulator: Vec<RemotePath>,

    rx: tokio::sync::mpsc::Receiver<DeleterMessage>,

    cancel: CancellationToken,
    remote_storage: GenericRemoteStorage,
}

impl Deleter {
    pub(super) fn new(
        remote_storage: GenericRemoteStorage,
        rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            remote_storage,
            rx,
            cancel,
            accumulator: Vec::new(),
        }
    }

    /// Wrap the remote `delete_objects` call with a failpoint and retry/backoff handling
    async fn remote_delete(&self) -> Result<(), anyhow::Error> {
        // A backoff::retry is used here for two reasons:
        // - To provide a backoff rather than busy-polling the API on errors
        // - To absorb transient 429/503 conditions without hitting our error
        //   logging path for issues deleting objects.
        backoff::retry(
            || async {
                fail::fail_point!("deletion-queue-before-execute", |_| {
                    info!("Skipping execution, failpoint set");

                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["failpoint"])
                        .inc();
                    Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
                });

                self.remote_storage
                    .delete_objects(&self.accumulator, &self.cancel)
                    .await
            },
            TimeoutOrCancel::caused_by_cancel,
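            // The next two arguments are, in order, the warn threshold and the retry
            // limit accepted by `utils::backoff::retry` (interpretation assumed here,
            // not stated in this file).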
            3,
            10,
            "executing deletion batch",
            &self.cancel,
        )
        .await
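        // `retry` yields None when it gives up because of cancellation; map that to an
        // error here and flatten the inner Result from the delete attempt itself.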
        .ok_or_else(|| anyhow::anyhow!("Shutting down"))
        .and_then(|x| x)
    }

    /// Block until everything in the accumulator has been executed
    async fn flush(&mut self) -> Result<(), DeletionQueueError> {
        while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
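            // Failpoint so tests can pause the deleter just before a batch executes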
            pausable_failpoint!("deletion-queue-before-execute-pause");
            match self.remote_delete().await {
                Ok(()) => {
                    // Note: we assume that the remote storage layer returns Ok(()) if some
                    // or all of the deleted objects were already gone.
                    metrics::DELETION_QUEUE
                        .keys_executed
                        .inc_by(self.accumulator.len() as u64);
                    info!(
                        "Executed deletion batch {}..{}",
                        self.accumulator
                            .first()
                            .expect("accumulator should be non-empty"),
                        self.accumulator
                            .last()
                            .expect("accumulator should be non-empty"),
                    );
                    self.accumulator.clear();
                }
                Err(e) => {
                    if self.cancel.is_cancelled() {
                        return Err(DeletionQueueError::ShuttingDown);
                    }
                    warn!("DeleteObjects request failed: {e:#}, will continue trying");
                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["execute"])
                        .inc();
                }
            };
        }
        if self.cancel.is_cancelled() {
            // Expose an error because we may not have actually flushed everything
            Err(DeletionQueueError::ShuttingDown)
        } else {
            Ok(())
        }
    }

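    /// Main loop: receive messages and accumulate keys, flushing when a caller asks
    /// for it, when a full batch is assembled, or after `AUTOFLUSH_INTERVAL` of
    /// inactivity.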
    pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
        let max_keys_per_delete = self.remote_storage.max_keys_per_delete();
        self.accumulator.reserve(max_keys_per_delete);

        loop {
            if self.cancel.is_cancelled() {
                return Err(DeletionQueueError::ShuttingDown);
            }

            let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
                Ok(Some(m)) => m,
                Ok(None) => {
                    // All queue senders closed
                    info!("Shutting down");
                    return Err(DeletionQueueError::ShuttingDown);
                }
                Err(_) => {
                    // Timeout: we hit the deadline, so execute whatever we have in hand.
                    // flush() returns immediately if no work is pending.
                    self.flush().await?;

                    continue;
                }
            };

            match msg {
                DeleterMessage::Delete(mut list) => {
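                    // Drain the incoming list into the accumulator, flushing whenever a
                    // full batch is assembled. The `|| accumulator.len() == max` clause
                    // keeps the loop alive for one more pass when the final drain exactly
                    // fills a batch, so it is executed now instead of waiting for the
                    // autoflush timer.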
                    while !list.is_empty() || self.accumulator.len() == max_keys_per_delete {
                        if self.accumulator.len() == max_keys_per_delete {
                            // We have a full batch in hand: execute it before taking more keys
                            self.flush().await?;
                            assert_eq!(self.accumulator.len(), 0);
                        }

                        let available_slots = max_keys_per_delete - self.accumulator.len();
                        let take_count = std::cmp::min(available_slots, list.len());
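                        // Take keys from the tail of the list so the remaining elements do
                        // not need to be shifted; batch ordering is not significant for
                        // deletions.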
                        for path in list.drain(list.len() - take_count..) {
                            self.accumulator.push(path);
                        }
                    }
                }
                DeleterMessage::Flush(flush_op) => {
                    // If flush() errors, we drop the flush_op and the caller will get
                    // an error recv()'ing their oneshot channel.
                    self.flush().await?;
                    flush_op.notify();
                }
            }
        }
    }
}