//! The deleter is the final stage in the deletion queue. It accumulates remote
//! paths to delete, and periodically executes them in batches of up to the
//! storage backend's `max_keys_per_delete` limit using the DeleteObjects request.
//!
//! Its purpose is to increase the efficiency of remote storage I/O by issuing a
//! smaller number of full-sized DeleteObjects requests, rather than a larger
//! number of smaller requests.
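//!
//! A batch is executed when the accumulator reaches the backend's
//! `max_keys_per_delete` limit, when an explicit flush is requested, or when
//! `AUTOFLUSH_INTERVAL` elapses without new input.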

use std::time::Duration;

use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utils::{backoff, pausable_failpoint};

use super::{DeletionQueueError, FlushOp};
use crate::metrics;

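/// If no new deletions or flush requests arrive within this interval, execute
/// whatever has accumulated so far.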
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

pub(super) enum DeleterMessage {
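    /// A batch of remote paths to accumulate for deletion.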
    Delete(Vec<RemotePath>),
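    /// Execute everything accumulated so far, then notify the waiting caller via the `FlushOp`.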
    Flush(FlushOp),
}

/// Non-persistent deletion queue, for coalescing multiple object deletes into
/// larger DeleteObjects requests.
pub(super) struct Deleter {
    // Accumulate up to `max_keys_per_delete` keys for the next deletion operation
    accumulator: Vec<RemotePath>,

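    // Deletions and flush requests arriving from the earlier stages of the deletion queue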
    rx: tokio::sync::mpsc::Receiver<DeleterMessage>,

    cancel: CancellationToken,
    remote_storage: GenericRemoteStorage,
}

impl Deleter {
    pub(super) fn new(
        remote_storage: GenericRemoteStorage,
        rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            remote_storage,
            rx,
            cancel,
            accumulator: Vec::new(),
        }
    }

    /// Wrap the remote `delete_objects` with a failpoint
    async fn remote_delete(&self) -> Result<(), anyhow::Error> {
        // A backoff::retry is used here for two reasons:
        // - To provide a backoff rather than busy-polling the API on errors
        // - To absorb transient 429/503 conditions without hitting our error
        //   logging path for issues deleting objects.
        backoff::retry(
            || async {
                fail::fail_point!("deletion-queue-before-execute", |_| {
                    info!("Skipping execution, failpoint set");

                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["failpoint"])
                        .inc();
                    Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
                });

                self.remote_storage
                    .delete_objects(&self.accumulator, &self.cancel)
                    .await
            },
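            // Cancellation is treated as a permanent failure; any other error is
            // retried with backoff rather than being surfaced immediately.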
            TimeoutOrCancel::caused_by_cancel,
            3,
            10,
            "executing deletion batch",
            &self.cancel,
        )
        .await
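        // retry() returns None if it gave up because of cancellation; surface that as
        // an error here, then flatten the nested deletion result.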
        .ok_or_else(|| anyhow::anyhow!("Shutting down"))
        .and_then(|x| x)
    }

    /// Block until everything in the accumulator has been executed
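    /// Returns `DeletionQueueError::ShuttingDown` if we are cancelled before the
    /// accumulator could be fully executed.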
    async fn flush(&mut self) -> Result<(), DeletionQueueError> {
        while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
            pausable_failpoint!("deletion-queue-before-execute-pause");
            match self.remote_delete().await {
                Ok(()) => {
                    // Note: we assume that the remote storage layer returns Ok(()) if some
                    // or all of the deleted objects were already gone.
                    metrics::DELETION_QUEUE
                        .keys_executed
                        .inc_by(self.accumulator.len() as u64);
                    info!(
                        "Executed deletion batch {}..{}",
                        self.accumulator
                            .first()
                            .expect("accumulator should be non-empty"),
                        self.accumulator
                            .last()
                            .expect("accumulator should be non-empty"),
                    );
                    self.accumulator.clear();
                }
                Err(e) => {
                    if self.cancel.is_cancelled() {
                        return Err(DeletionQueueError::ShuttingDown);
                    }
                    warn!("DeleteObjects request failed: {e:#}, will continue trying");
                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["execute"])
                        .inc();
                }
            };
        }
        if self.cancel.is_cancelled() {
            // Expose an error because we may not have actually flushed everything
            Err(DeletionQueueError::ShuttingDown)
        } else {
            Ok(())
        }
    }

    pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
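        // The batch size is whatever the storage backend permits per request; for
        // example, S3's DeleteObjects API accepts at most 1000 keys per call.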
        let max_keys_per_delete = self.remote_storage.max_keys_per_delete();
        self.accumulator.reserve(max_keys_per_delete);

        loop {
            if self.cancel.is_cancelled() {
                return Err(DeletionQueueError::ShuttingDown);
            }

            let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
                Ok(Some(m)) => m,
                Ok(None) => {
                    // All queue senders closed
                    info!("Shutting down");
                    return Err(DeletionQueueError::ShuttingDown);
                }
                Err(_) => {
                    // Timeout: we hit the deadline to execute whatever we have in hand.
                    // flush() returns immediately if no work is pending.
                    self.flush().await?;

                    continue;
                }
            };

            match msg {
                DeleterMessage::Delete(mut list) => {
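                    // Drain the incoming list into the accumulator, flushing whenever a
                    // full batch is ready. The second loop condition ensures we also
                    // flush when the final keys exactly fill a batch.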
                    while !list.is_empty() || self.accumulator.len() == max_keys_per_delete {
                        if self.accumulator.len() == max_keys_per_delete {
                            // The accumulator holds a full batch: execute it before taking more keys.
                            self.flush().await?;
                            // A successful flush leaves the accumulator empty.
                            assert_eq!(self.accumulator.len(), 0);
                        }

                        let available_slots = max_keys_per_delete - self.accumulator.len();
                        let take_count = std::cmp::min(available_slots, list.len());
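                        // Move up to `take_count` paths from the tail of the incoming list
                        // into the accumulator.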
                        for path in list.drain(list.len() - take_count..) {
                            self.accumulator.push(path);
                        }
                    }
                }
                DeleterMessage::Flush(flush_op) => {
                    // If flush() errors, we drop the flush_op and the caller will get
                    // an error recv()'ing their oneshot channel.
                    self.flush().await?;
                    flush_op.notify();
                }
            }
        }
    }
}