//! The deleter is the final stage in the deletion queue. It accumulates remote
//! paths to delete, and periodically executes them in batches of up to 1000
//! keys using the DeleteObjects request.
//!
//! Its purpose is to increase the efficiency of remote storage I/O by issuing
//! a smaller number of full-sized DeleteObjects requests, rather than a larger
//! number of smaller requests.
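//!
//! A minimal sketch of how this stage might be driven (hypothetical wiring;
//! the channel capacity, the `remote_storage` handle and `paths` are
//! illustrative, not the values used by the real deletion queue):
//!
//! ```ignore
//! let (tx, rx) = tokio::sync::mpsc::channel(100);
//! let cancel = CancellationToken::new();
//! let mut deleter = Deleter::new(remote_storage, rx, cancel.clone());
//! tokio::spawn(async move { deleter.background().await });
//! // Paths sent here are coalesced into full-sized DeleteObjects batches.
//! tx.send(DeleterMessage::Delete(paths)).await.ok();
//! ```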

use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;

use crate::metrics;

use super::DeletionQueueError;
use super::FlushOp;
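
/// How long the background loop waits for more work before executing whatever
/// keys have accumulated, even if a full batch has not been assembled.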
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

pub(super) enum DeleterMessage {
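    /// A batch of paths to enqueue for deletion; appended to the accumulator.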
    Delete(Vec<RemotePath>),
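    /// Execute everything accumulated so far, then acknowledge the sender's FlushOp.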
    Flush(FlushOp),
}

/// Non-persistent deletion queue, for coalescing multiple object deletes into
/// larger DeleteObjects requests.
pub(super) struct Deleter {
    // Accumulate up to MAX_KEYS_PER_DELETE keys for the next deletion operation
    accumulator: Vec<RemotePath>,

    rx: tokio::sync::mpsc::Receiver<DeleterMessage>,

    cancel: CancellationToken,
    remote_storage: GenericRemoteStorage,
}

impl Deleter {
    pub(super) fn new(
        remote_storage: GenericRemoteStorage,
        rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            remote_storage,
            rx,
            cancel,
            accumulator: Vec::new(),
        }
    }

    /// Wrap the remote `delete_objects` with a failpoint
    async fn remote_delete(&self) -> Result<(), anyhow::Error> {
        fail::fail_point!("deletion-queue-before-execute", |_| {
            info!("Skipping execution, failpoint set");
            metrics::DELETION_QUEUE
                .remote_errors
                .with_label_values(&["failpoint"])
                .inc();
            Err(anyhow::anyhow!("failpoint hit"))
        });

        // A backoff::retry is used here for two reasons:
        // - To provide a backoff rather than busy-polling the API on errors
        // - To absorb transient 429/503 conditions without hitting our error
        //   logging path for issues deleting objects.
        backoff::retry(
            || async { self.remote_storage.delete_objects(&self.accumulator).await },
            |_| false,
            3,
            10,
            "executing deletion batch",
            backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
        )
        .await
    }

    /// Block until everything in the accumulator has been executed
    async fn flush(&mut self) -> Result<(), DeletionQueueError> {
        while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
            match self.remote_delete().await {
                Ok(()) => {
                    // Note: we assume that the remote storage layer returns Ok(()) if some
                    // or all of the deleted objects were already gone.
                    metrics::DELETION_QUEUE
                        .keys_executed
                        .inc_by(self.accumulator.len() as u64);
                    info!(
                        "Executed deletion batch {}..{}",
                        self.accumulator
                            .first()
                            .expect("accumulator should be non-empty"),
                        self.accumulator
                            .last()
                            .expect("accumulator should be non-empty"),
                    );
                    self.accumulator.clear();
                }
                Err(e) => {
                    if self.cancel.is_cancelled() {
                        return Err(DeletionQueueError::ShuttingDown);
                    }
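                    // remote_delete() has already retried with backoff, so an error
                    // here is persistent: warn and keep looping until it succeeds or
                    // we are cancelled.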
                    warn!("DeleteObjects request failed: {e:#}, will continue trying");
                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["execute"])
                        .inc();
                }
            };
        }
        if self.cancel.is_cancelled() {
            // Expose an error because we may not have actually flushed everything
            Err(DeletionQueueError::ShuttingDown)
        } else {
            Ok(())
        }
    }

    pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
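        // Reserve a full batch up front so pushing keys never reallocates.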
        self.accumulator.reserve(MAX_KEYS_PER_DELETE);

        loop {
            if self.cancel.is_cancelled() {
                return Err(DeletionQueueError::ShuttingDown);
            }

            let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
                Ok(Some(m)) => m,
                Ok(None) => {
                    // All queue senders closed
                    info!("Shutting down");
                    return Err(DeletionQueueError::ShuttingDown);
                }
                Err(_) => {
                    // Timeout: we hit the deadline to execute whatever we have in hand.
                    // flush() returns immediately if no work is pending.
                    self.flush().await?;

                    continue;
                }
            };

            match msg {
                DeleterMessage::Delete(mut list) => {
                    while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
                        if self.accumulator.len() == MAX_KEYS_PER_DELETE {
                            self.flush().await?;
                            // flush() returns Ok only once everything has been executed,
                            // so the accumulator is empty and we can take more keys.
                            assert_eq!(self.accumulator.len(), 0);
                        }

                        let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
                        let take_count = std::cmp::min(available_slots, list.len());
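                        // Drain from the tail of `list`: removing a suffix means the
                        // elements that remain never have to be shifted.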
                        for path in list.drain(list.len() - take_count..) {
                            self.accumulator.push(path);
                        }
                    }
                }
                DeleterMessage::Flush(flush_op) => {
                    // If flush() errors, we drop the flush_op and the caller will get
                    // an error recv()'ing their oneshot channel.
                    self.flush().await?;
                    flush_op.notify();
                }
            }
        }
    }
}