//! The deleter is the final stage in the deletion queue. It accumulates remote
//! paths to delete, and periodically executes them in batches of up to 1000
//! using the DeleteObjects request.
//!
//! Its purpose is to increase efficiency of remote storage I/O by issuing a smaller
//! number of full-sized DeleteObjects requests, rather than a larger number of
//! smaller requests.
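//!
//! A minimal sketch of driving the deleter (the channel capacity and task
//! wiring below are illustrative assumptions, not part of this module):
//!
//! ```ignore
//! let (tx, rx) = tokio::sync::mpsc::channel(16); // capacity chosen arbitrarily
//! let mut deleter = Deleter::new(remote_storage, rx, cancel.clone());
//! tokio::spawn(async move { deleter.background().await });
//!
//! // Paths sent here are coalesced into full-sized DeleteObjects batches.
//! tx.send(DeleterMessage::Delete(paths)).await?;
//! ```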

use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;

use crate::metrics;

use super::DeletionQueueError;
use super::FlushOp;

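/// If no message arrives within this interval, execute whatever has
/// accumulated, even if the batch is not full.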
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);

pub(super) enum DeleterMessage {
    /// Remote paths to enqueue for deletion.
    Delete(Vec<RemotePath>),
    /// Execute everything accumulated so far, then notify the sender.
    Flush(FlushOp),
}

/// Non-persistent deletion queue, for coalescing multiple object deletes into
/// larger DeleteObjects requests.
pub(super) struct Deleter {
    // Accumulate up to MAX_KEYS_PER_DELETE keys for the next deletion operation
    accumulator: Vec<RemotePath>,

    rx: tokio::sync::mpsc::Receiver<DeleterMessage>,

    cancel: CancellationToken,
    remote_storage: GenericRemoteStorage,
}

impl Deleter {
    pub(super) fn new(
        remote_storage: GenericRemoteStorage,
        rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            remote_storage,
            rx,
            cancel,
            accumulator: Vec::new(),
        }
    }

    /// Wrap the remote `delete_objects` with a failpoint
    async fn remote_delete(&self) -> Result<(), anyhow::Error> {
        // A backoff::retry is used here for two reasons:
        // - To provide a backoff rather than busy-polling the API on errors
        // - To absorb transient 429/503 conditions without hitting our error
        //   logging path for issues deleting objects.
        backoff::retry(
            || async {
                fail::fail_point!("deletion-queue-before-execute", |_| {
                    info!("Skipping execution, failpoint set");

                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["failpoint"])
                        .inc();
                    Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
                });

                self.remote_storage.delete_objects(&self.accumulator).await
            },
            |_| false, // is_permanent: no error is permanent, retry them all
            3,         // warn_threshold
            10,        // max_retries
            "executing deletion batch",
            backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
        )
        .await
    }

    /// Block until everything in the accumulator has been executed
    async fn flush(&mut self) -> Result<(), DeletionQueueError> {
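        // Loop until the accumulator is drained or we are cancelled.
        // remote_delete() retries transient failures internally; if it still
        // fails, this loop retries the whole batch rather than dropping keys.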
        while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
            match self.remote_delete().await {
                Ok(()) => {
                    // Note: we assume that the remote storage layer returns Ok(()) if some
                    // or all of the deleted objects were already gone.
                    metrics::DELETION_QUEUE
                        .keys_executed
                        .inc_by(self.accumulator.len() as u64);
                    info!(
                        "Executed deletion batch {}..{}",
                        self.accumulator
                            .first()
                            .expect("accumulator should be non-empty"),
                        self.accumulator
                            .last()
                            .expect("accumulator should be non-empty"),
                    );
                    self.accumulator.clear();
                }
                Err(e) => {
                    if self.cancel.is_cancelled() {
                        return Err(DeletionQueueError::ShuttingDown);
                    }
                    warn!("DeleteObjects request failed: {e:#}, will continue trying");
                    metrics::DELETION_QUEUE
                        .remote_errors
                        .with_label_values(&["execute"])
                        .inc();
                }
            };
        }
        if self.cancel.is_cancelled() {
            // Expose an error because we may not have actually flushed everything
            Err(DeletionQueueError::ShuttingDown)
        } else {
            Ok(())
        }
    }

    pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
        // Pre-allocate a full batch, so pushes up to MAX_KEYS_PER_DELETE never reallocate
        self.accumulator.reserve(MAX_KEYS_PER_DELETE);

        loop {
            if self.cancel.is_cancelled() {
                return Err(DeletionQueueError::ShuttingDown);
            }

            let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
                Ok(Some(m)) => m,
                Ok(None) => {
                    // All queue senders closed
                    info!("Shutting down");
                    return Err(DeletionQueueError::ShuttingDown);
                }
                Err(_) => {
                    // Timeout: we hit the deadline to execute whatever we have in hand.
                    // flush() returns immediately if no work is pending.
                    self.flush().await?;

                    continue;
                }
            };

            match msg {
                DeleterMessage::Delete(mut list) => {
                    while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
                        if self.accumulator.len() == MAX_KEYS_PER_DELETE {
                            // The accumulator is full: execute it before taking more keys
                            self.flush().await?;
                            assert_eq!(self.accumulator.len(), 0);
                        }

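                        // Fill the free accumulator slots from the tail of `list`:
                        // draining from the end avoids shifting the remaining
                        // elements, and deletion order does not matter.
                        //
                        // Worked example with MAX_KEYS_PER_DELETE = 1000: if the
                        // accumulator holds 900 keys and `list` holds 300, we take
                        // 100 now; the next iteration flushes the full batch, then
                        // takes the remaining 200.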
                        let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
                        let take_count = std::cmp::min(available_slots, list.len());
                        for path in list.drain(list.len() - take_count..) {
                            self.accumulator.push(path);
                        }
                    }
                }
                DeleterMessage::Flush(flush_op) => {
                    // If flush() errors, we drop the flush_op and the caller will get
                    // an error recv()'ing their oneshot channel.
                    self.flush().await?;
                    flush_op.notify();
                }
            }
        }
    }
}