//! The list writer is the first stage in the deletion queue. It accumulates
//! layers to delete, and periodically writes out these layers into a persistent
//! DeletionList.
//!
//! The purpose of writing DeletionLists is to decouple the decision to
//! delete an object from the validation required to execute it: even if
//! validation is not possible, e.g. due to a control plane outage, we can
//! still persist our intent to delete an object, in a way that would
//! survive a restart.
//!
//! DeletionLists are passed onwards to the Validator.
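//!
//! DeletionLists (and the DeletionHeader that records the validated sequence) are
//! persisted as files under the local directory given by `conf.deletion_prefix()`,
//! which is where `recover` reloads them from after a restart.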

use super::DeletionHeader;
use super::DeletionList;
use super::FlushOp;
use super::ValidatorQueueMessage;

use std::collections::HashMap;
use std::fs::create_dir_all;
use std::time::Duration;

use regex::Regex;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
use tracing::warn;
use utils::generation::Generation;
use utils::id::TenantId;
use utils::id::TimelineId;

use crate::config::PageServerConf;
use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::storage_layer::LayerFileName;

// The number of keys in a DeletionList before we will proactively persist it
// (without waiting for a flush deadline). This aims to produce DeletionList files
// on the order of 1MB in size when we are under heavy delete load.
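// (As a rough worked example: at an assumed ~64 bytes per serialized key path, 16384 keys
// comes to about 1MiB; the per-key size is an estimate, not a measured figure.)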
const DELETION_LIST_TARGET_SIZE: usize = 16384;

// Ordinarily, we only flush to DeletionList periodically: this bounds the window during
// which we might leak objects, i.e. where objects have already been unlinked from timeline
// metadata but our intent to delete them has not yet been flushed to a DeletionList.
const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);

// If someone is waiting for a flush to DeletionList, only delay a little to accumulate
// more objects before doing the flush.
const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);

#[derive(Debug)]
pub(super) struct DeletionOp {
    pub(super) tenant_id: TenantId,
    pub(super) timeline_id: TimelineId,
    // `layers` and `objects` are both just lists of objects. `layers` is used if you do not
    // have a config object handy to project it to a remote key, and need the consuming worker
    // to do it for you.
    pub(super) layers: Vec<(LayerFileName, Generation)>,
    pub(super) objects: Vec<RemotePath>,

    /// The _current_ generation of the Tenant attachment in which we are enqueuing
    /// this deletion.
    pub(super) generation: Generation,
}

#[derive(Debug)]
pub(super) struct RecoverOp {
    pub(super) attached_tenants: HashMap<TenantId, Generation>,
}

#[derive(Debug)]
pub(super) enum ListWriterQueueMessage {
    Delete(DeletionOp),
    // Wait until all prior deletions make it into a persistent DeletionList
    Flush(FlushOp),
    // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
    FlushExecute(FlushOp),
    // Call once after re-attaching to control plane, to notify the deletion queue about
    // latest attached generations & load any saved deletion lists from disk.
    Recover(RecoverOp),
}

pub(super) struct ListWriter {
    conf: &'static PageServerConf,

    // Incoming frontend requests to delete some keys
    rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,

    // Outbound requests to the backend to execute deletion lists we have composed.
    tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,

    // The list we are currently building: a buffer of keys to delete, plus our next
    // sequence number.
    pending: DeletionList,

    // These FlushOps should notify the next time we flush
    pending_flushes: Vec<FlushOp>,

    // Worker loop is torn down when this fires.
    cancel: CancellationToken,

    // Safety guard to do recovery exactly once
    recovered: bool,
}

impl ListWriter {
    // Initially DeletionHeader.validated_sequence is zero, so the sequence numbers
    // we hand out must start higher than that.
    const BASE_SEQUENCE: u64 = 1;

    pub(super) fn new(
        conf: &'static PageServerConf,
        rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
        tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            pending: DeletionList::new(Self::BASE_SEQUENCE),
            conf,
            rx,
            tx,
            pending_flushes: Vec::new(),
            cancel,
            recovered: false,
        }
    }

    /// Try to flush the pending DeletionList to persistent storage.
    ///
    /// This does not return errors, because on failure to flush we do not lose
    /// any state: flushing will be retried implicitly on the next deadline.
    async fn flush(&mut self) {
        if self.pending.is_empty() {
            for f in self.pending_flushes.drain(..) {
                f.notify();
            }
            return;
        }

        match self.pending.save(self.conf).await {
            Ok(_) => {
                info!(sequence = self.pending.sequence, "Stored deletion list");

                for f in self.pending_flushes.drain(..) {
                    f.notify();
                }

                // Take the list we've accumulated, and replace it with a fresh list for the next sequence
                let next_list = DeletionList::new(self.pending.sequence + 1);
                let list = std::mem::replace(&mut self.pending, next_list);

                if let Err(e) = self.tx.send(ValidatorQueueMessage::Delete(list)).await {
                    // This is allowed to fail: it will only happen if the backend worker is shut down,
                    // so we can just drop this on the floor.
                    info!("Deletion list dropped, this is normal during shutdown ({e:#})");
                }
            }
            Err(e) => {
                metrics::DELETION_QUEUE.unexpected_errors.inc();
                warn!(
                    sequence = self.pending.sequence,
                    "Failed to write deletion list, will retry later ({e:#})"
                );
            }
        }
    }

    /// Load the header, to learn the sequence number up to which deletions
    /// have been validated. We will apply validated=true to DeletionLists
    /// <= this sequence when loading them.
    ///
    /// It is not an error for the header to not exist: we return None, and
    /// the caller should act as if validated_sequence is 0.
    async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
        let header_path = self.conf.deletion_header_path();
        match tokio::fs::read(&header_path).await {
            Ok(header_bytes) => {
                match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
                    Ok(h) => Ok(Some(h.validated_sequence)),
                    Err(e) => {
                        warn!(
                            "Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
                        );
                        // This should never happen unless we make a mistake with our serialization.
                        // Ignoring a deletion header is not consequential for correctness, because all deletions
                        // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        Ok(None)
                    }
                }
            }
            Err(e) => {
                if e.kind() == std::io::ErrorKind::NotFound {
                    debug!("Deletion header {header_path} not found, first start?");
                    Ok(None)
                } else {
                    Err(anyhow::anyhow!(e))
                }
            }
        }
    }

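    /// Recover deletion queue state from local disk after a restart: learn the validated
    /// sequence from the DeletionHeader, then re-submit any DeletionLists found on disk to
    /// the validator, treating lists at or below that sequence as already validated.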
    async fn recover(
        &mut self,
        attached_tenants: HashMap<TenantId, Generation>,
    ) -> Result<(), anyhow::Error> {
        debug!(
            "recovering with {} attached tenants",
            attached_tenants.len()
        );

        // Load the header
        let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);

        self.pending.sequence = validated_sequence + 1;

        let deletion_directory = self.conf.deletion_prefix();
        let mut dir = match tokio::fs::read_dir(&deletion_directory).await {
            Ok(d) => d,
            Err(e) => {
                warn!("Failed to open deletion list directory {deletion_directory}: {e:#}");

                // Give up: if we can't read the deletion list directory, we probably can't
                // write lists into it later, so the queue won't work.
                return Err(e.into());
            }
        };

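        // List filenames look like e.g. `0000000000000001-01.list`: a 16-digit hex sequence
        // number followed by a two-character version (the example name is illustrative; the
        // sequence part is parsed as hex below).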
        let list_name_pattern =
            Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();

        let temp_extension = format!(".{TEMP_SUFFIX}");
        let header_path = self.conf.deletion_header_path();
        let mut seqs: Vec<u64> = Vec::new();
        while let Some(dentry) = dir.next_entry().await? {
            let file_name = dentry.file_name();
            let dentry_str = file_name.to_string_lossy();

            if file_name == header_path.file_name().unwrap_or("") {
                // Don't try and parse the header's name like a list
                continue;
            }

            if dentry_str.ends_with(&temp_extension) {
                info!("Cleaning up temporary file {dentry_str}");
                let absolute_path =
                    deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
                if let Err(e) = tokio::fs::remove_file(&absolute_path).await {
                    // Non-fatal error: we will just leave the file behind but not
                    // try and load it.
                    warn!("Failed to clean up temporary file {absolute_path}: {e:#}");
                }

                continue;
            }

            let file_name = dentry.file_name().to_owned();
            let basename = file_name.to_string_lossy();
            let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
                m.name("sequence")
                    .expect("Non optional group should be present")
                    .as_str()
            } else {
                warn!("Unexpected key in deletion queue: {basename}");
                metrics::DELETION_QUEUE.unexpected_errors.inc();
                continue;
            };

            let seq: u64 = match u64::from_str_radix(seq_part, 16) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Malformed key '{basename}': {e}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();
                    continue;
                }
            };
            seqs.push(seq);
        }
        seqs.sort();

        // Start our next deletion list from after the last location validated by the
        // previous process lifetime, or after the last location found on disk,
        // whichever is greater.
        self.pending.sequence = validated_sequence + 1;
        if let Some(max_list_seq) = seqs.last() {
            self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
        }

        for s in seqs {
            let list_path = self.conf.deletion_list_path(s);

            let list_bytes = tokio::fs::read(&list_path).await?;

            let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
                Ok(l) => l,
                Err(e) => {
                    // Drop the list on the floor: any objects it referenced will be left behind
                    // for scrubbing to clean up. This should never happen unless we have a serialization bug.
                    warn!(sequence = s, "Failed to deserialize deletion list: {e}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();
                    continue;
                }
            };

            if deletion_list.sequence <= validated_sequence {
                // If the deletion list falls at or below the validated sequence, we may assume that
                // it was already validated the last time this pageserver ran. Otherwise, we still
                // load it, as it may still contain content valid in this generation.
                deletion_list.validated = true;
            } else {
                // Special case optimization: if a tenant is still attached, and no other
                // generation was issued to another node in the interval while we restarted,
                // then we may treat deletion lists from the previous generation as if they
                // belong to our currently attached generation, and proceed to validate & execute.
                for (tenant_id, tenant_list) in &mut deletion_list.tenants {
                    if let Some(attached_gen) = attached_tenants.get(tenant_id) {
                        if attached_gen.previous() == tenant_list.generation {
                            tenant_list.generation = *attached_gen;
                        }
                    }
                }
            }

            info!(
                validated = deletion_list.validated,
                sequence = deletion_list.sequence,
                "Recovered deletion list"
            );

            // We will drop out of recovery if this fails: it indicates that we are shutting down
            // or the backend has panicked
            metrics::DELETION_QUEUE
                .keys_submitted
                .inc_by(deletion_list.len() as u64);
            self.tx
                .send(ValidatorQueueMessage::Delete(deletion_list))
                .await?;
        }

        info!(next_sequence = self.pending.sequence, "Replay complete");

        Ok(())
    }

    /// This is the front-end ingest, where we bundle up deletion requests into DeletionLists
    /// and write them out, for later validation by the backend and execution by the executor.
    pub(super) async fn background(&mut self) {
        info!("Started deletion frontend worker");

        // Synchronous, but we only do it once per process lifetime so it's tolerable
        if let Err(e) = create_dir_all(&self.conf.deletion_prefix()) {
            tracing::error!(
                "Failed to create deletion list directory {}, deletions will not be executed ({e})",
                self.conf.deletion_prefix(),
            );
            metrics::DELETION_QUEUE.unexpected_errors.inc();
            return;
        }

        while !self.cancel.is_cancelled() {
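            // Wait for the next message, but only up to a deadline: when nobody is waiting
            // on a flush we use the default interval, otherwise the short flushing timeout.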
            let timeout = if self.pending_flushes.is_empty() {
                FRONTEND_DEFAULT_TIMEOUT
            } else {
                FRONTEND_FLUSHING_TIMEOUT
            };

            let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
                Ok(Some(msg)) => msg,
                Ok(None) => {
                    // Queue sender destroyed, shutting down
                    break;
                }
                Err(_) => {
                    // Hit deadline, flush.
                    self.flush().await;
                    continue;
                }
            };

            match msg {
                ListWriterQueueMessage::Delete(op) => {
                    assert!(
                        self.recovered,
                        "Cannot process deletions before recovery. This is a bug."
                    );

                    debug!(
                        "Delete: ingesting {} layers, {} other objects",
                        op.layers.len(),
                        op.objects.len()
                    );

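                    // Project layer names into full remote paths here: the enqueuer may not
                    // have had a conf available to do this itself (see `DeletionOp::layers`).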
                    let mut layer_paths = Vec::new();
                    for (layer, generation) in op.layers {
                        layer_paths.push(remote_layer_path(
                            &op.tenant_id,
                            &op.timeline_id,
                            &layer,
                            generation,
                        ));
                    }
                    layer_paths.extend(op.objects);

                    if !self.pending.push(
                        &op.tenant_id,
                        &op.timeline_id,
                        op.generation,
                        &mut layer_paths,
                    ) {
                        self.flush().await;
                        let retry_succeeded = self.pending.push(
                            &op.tenant_id,
                            &op.timeline_id,
                            op.generation,
                            &mut layer_paths,
                        );
                        if !retry_succeeded {
                            // Unexpected: after we flush, we should have
                            // drained self.pending, so a conflict on
                            // generation numbers should be impossible.
                            tracing::error!(
                                "Failed to enqueue deletions, leaking objects. This is a bug."
                            );
                            metrics::DELETION_QUEUE.unexpected_errors.inc();
                        }
                    }
                }
                ListWriterQueueMessage::Flush(op) => {
                    if self.pending.is_empty() {
                        // Execute immediately
                        debug!("Flush: No pending objects, flushing immediately");
                        op.notify()
                    } else {
                        // Execute next time we flush
                        debug!("Flush: adding to pending flush list for next deadline flush");
                        self.pending_flushes.push(op);
                    }
                }
                ListWriterQueueMessage::FlushExecute(op) => {
                    debug!("FlushExecute: passing through to backend");
                    // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
                    if let Err(e) = self.tx.send(ValidatorQueueMessage::Flush(op)).await {
                        info!("Can't flush, shutting down ({e})");
                        // Caller will get an error when their oneshot sender is dropped.
                    }
                }
                ListWriterQueueMessage::Recover(op) => {
                    if self.recovered {
                        tracing::error!(
                            "Deletion queue recovery called more than once. This is a bug."
                        );
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        // Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
                        continue;
                    }

                    if let Err(e) = self.recover(op.attached_tenants).await {
                        // This should only happen in truly unrecoverable cases, like the recovery finding that the backend
                        // queue receiver has been dropped, or something is critically broken with
                        // the local filesystem holding deletion lists.
                        info!(
                            "Deletion queue recover aborted, deletion queue will not proceed ({e})"
                        );
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        return;
                    } else {
                        self.recovered = true;
                    }
                }
            }

            if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
                self.flush().await;
            }
        }
        info!("Deletion queue shut down.");
    }
}