//! The list writer is the first stage in the deletion queue. It accumulates
//! layers to delete, and periodically writes out these layers into a persistent
//! DeletionList.
//!
//! The purpose of writing DeletionLists is to decouple the decision to
//! delete an object from the validation required to execute it: even if
//! validation is not possible, e.g. due to a control plane outage, we can
//! still persist our intent to delete an object, in a way that would
//! survive a restart.
//!
//! DeletionLists are passed onwards to the Validator.
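//!
//! On local disk, each DeletionList is written under the deletion prefix with a name
//! derived from its sequence number (sixteen hex digits) plus a two-character version
//! suffix, matching the pattern parsed by `recover()` below; a small header file records
//! the highest sequence number that has already been validated. Partially written files
//! carry a temporary suffix and are cleaned up during recovery.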

use std::collections::HashMap;
use std::fs::create_dir_all;
use std::time::Duration;

use pageserver_api::shard::TenantShardId;
use regex::Regex;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;

use super::{DeletionHeader, DeletionList, FlushOp, ValidatorQueueMessage};
use crate::config::PageServerConf;
use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::{LayerFileMetadata, remote_layer_path};
use crate::tenant::storage_layer::LayerName;
use crate::virtual_file::{MaybeFatalIo, on_fatal_io_error};

// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to produce DeletionList objects
// on the order of 1MB in size when we are under heavy delete load.
const DELETION_LIST_TARGET_SIZE: usize = 16384;
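// (Illustrative arithmetic: assuming each serialized key is on the order of 100 bytes,
// a full list of 16384 keys comes to roughly 1-2MB.)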

// Ordinarily, we only flush to a DeletionList periodically: the flush interval bounds the
// window during which we might leak objects, i.e. objects that are already unlinked from
// timeline metadata but whose deletion intent has not yet been persisted.
const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);

// If someone is waiting for a flush to a DeletionList, only delay a little to accumulate
// more objects before doing the flush.
const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);

#[derive(Debug)]
pub(super) struct DeletionOp {
    pub(super) tenant_shard_id: TenantShardId,
    pub(super) timeline_id: TimelineId,
    // `layers` and `objects` are both just lists of objects. `layers` is used if you do not
    // have a config object handy to project them to remote keys, and need the consuming worker
    // to do that for you.
    pub(super) layers: Vec<(LayerName, LayerFileMetadata)>,
    pub(super) objects: Vec<RemotePath>,

    /// The _current_ generation of the Tenant shard attachment in which we are enqueuing
    /// this deletion.
    pub(super) generation: Generation,
}

#[derive(Debug)]
pub(super) struct RecoverOp {
    pub(super) attached_tenants: HashMap<TenantShardId, Generation>,
}

#[derive(Debug)]
pub(super) enum ListWriterQueueMessage {
    Delete(DeletionOp),
    // Wait until all prior deletions make it into a persistent DeletionList
    Flush(FlushOp),
    // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
    FlushExecute(FlushOp),
    // Call once after re-attaching to control plane, to notify the deletion queue about
    // latest attached generations & load any saved deletion lists from disk.
    Recover(RecoverOp),
}
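
// A typical message sequence over the queue's lifetime, as the worker below expects it
// (an illustrative sketch; only the Recover-before-Delete ordering is actually asserted):
//   1. Recover       -- exactly once, after re-attaching to the control plane
//   2. Delete ...    -- steady-state ingestion of layers/objects to delete
//   3. Flush         -- wait for pending deletions to reach a persistent DeletionList
//   4. FlushExecute  -- wait for previously flushed deletions to actually be executed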

pub(super) struct ListWriter {
    conf: &'static PageServerConf,

    // Incoming frontend requests to delete some keys
    rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,

    // Outbound requests to the backend to execute deletion lists we have composed.
    tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,

    // The list we are currently building: a buffer of keys to delete,
    // plus our next sequence number
    pending: DeletionList,

    // These FlushOps should notify the next time we flush
    pending_flushes: Vec<FlushOp>,

    // Worker loop is torn down when this fires.
    cancel: CancellationToken,

    // Safety guard to do recovery exactly once
    recovered: bool,
}

impl ListWriter {
    // Initially DeletionHeader.validated_sequence is zero. The place we start our
    // sequence numbers must be higher than that.
    const BASE_SEQUENCE: u64 = 1;

    pub(super) fn new(
        conf: &'static PageServerConf,
        rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
        tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            pending: DeletionList::new(Self::BASE_SEQUENCE),
            conf,
            rx,
            tx,
            pending_flushes: Vec::new(),
            cancel,
            recovered: false,
        }
    }

    /// Try to flush the in-memory `pending` list to persistent storage
    ///
    /// This does not return errors, because on failure to flush we do not lose
    /// any state: flushing will be retried implicitly on the next deadline
    async fn flush(&mut self) {
        if self.pending.is_empty() {
            for f in self.pending_flushes.drain(..) {
                f.notify();
            }
            return;
        }

        match self.pending.save(self.conf).await {
            Ok(_) => {
                info!(sequence = self.pending.sequence, "Stored deletion list");

                for f in self.pending_flushes.drain(..) {
                    f.notify();
                }

                // Take the list we've accumulated, replace it with a fresh list for the next sequence
                let next_list = DeletionList::new(self.pending.sequence + 1);
                let list = std::mem::replace(&mut self.pending, next_list);

                if let Err(e) = self.tx.send(ValidatorQueueMessage::Delete(list)).await {
                    // This is allowed to fail: it will only happen if the backend worker is shut down,
                    // so we can just drop this on the floor.
                    info!("Deletion list dropped, this is normal during shutdown ({e:#})");
                }
            }
            Err(e) => {
                metrics::DELETION_QUEUE.unexpected_errors.inc();
                warn!(
                    sequence = self.pending.sequence,
                    "Failed to write deletion list, will retry later ({e:#})"
                );
            }
        }
    }

    /// Load the header, to learn the sequence number up to which deletions
    /// have been validated. We will apply validated=true to DeletionLists
    /// <= this sequence when loading them.
    ///
    /// It is not an error for the header to not exist: we return None, and
    /// the caller should act as if validated_sequence is 0
    async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
        let header_path = self.conf.deletion_header_path();
        match tokio::fs::read(&header_path).await {
            Ok(header_bytes) => {
                match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
                    Ok(h) => Ok(Some(h.validated_sequence)),
                    Err(e) => {
                        warn!(
                            "Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
                        );
                        // This should never happen unless we make a mistake with our serialization.
                        // Ignoring a deletion header is not consequential for correctness because all deletions
                        // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        Ok(None)
                    }
                }
            }
            Err(e) => {
                if e.kind() == std::io::ErrorKind::NotFound {
                    debug!("Deletion header {header_path} not found, first start?");
                    Ok(None)
                } else {
                    on_fatal_io_error(&e, "reading deletion header");
                }
            }
        }
    }

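    /// Recover deletion queue state from local disk after a restart.
    ///
    /// Loads the validated-sequence header, removes leftover temporary files, replays any
    /// persisted DeletionLists into the validator in sequence order, and picks the sequence
    /// number at which new DeletionLists will continue. `attached_tenants` carries the
    /// generations reported by the control plane at re-attach time, so that lists written by
    /// the immediately preceding generation of a still-attached tenant can still be
    /// validated and executed.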
    async fn recover(
        &mut self,
        attached_tenants: HashMap<TenantShardId, Generation>,
    ) -> Result<(), anyhow::Error> {
        debug!(
            "recovering with {} attached tenants",
            attached_tenants.len()
        );

        // Load the header
        let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);

        self.pending.sequence = validated_sequence + 1;

        let deletion_directory = self.conf.deletion_prefix();
        let mut dir = tokio::fs::read_dir(&deletion_directory)
            .await
            .fatal_err("read deletion directory");

        let list_name_pattern =
            Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();

        let temp_extension = format!(".{TEMP_SUFFIX}");
        let header_path = self.conf.deletion_header_path();
        let mut seqs: Vec<u64> = Vec::new();
        while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
            let file_name = dentry.file_name();
            let dentry_str = file_name.to_string_lossy();

            if file_name == header_path.file_name().unwrap_or("") {
                // Don't try and parse the header's name like a list
                continue;
            }

            if dentry_str.ends_with(&temp_extension) {
                info!("Cleaning up temporary file {dentry_str}");
                let absolute_path =
                    deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
                tokio::fs::remove_file(&absolute_path)
                    .await
                    .fatal_err("delete temp file");

                continue;
            }

            let file_name = dentry.file_name().to_owned();
            let basename = file_name.to_string_lossy();
            let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
                m.name("sequence")
                    .expect("Non optional group should be present")
                    .as_str()
            } else {
                warn!("Unexpected key in deletion queue: {basename}");
                metrics::DELETION_QUEUE.unexpected_errors.inc();
                continue;
            };

            let seq: u64 = match u64::from_str_radix(seq_part, 16) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Malformed key '{basename}': {e}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();
                    continue;
                }
            };
            seqs.push(seq);
        }
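        // Replay lists in ascending sequence order, so they reach the validator in the
        // same order they were originally written.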
        seqs.sort();

        // Start our next deletion list from after the last sequence validated by a previous
        // process lifetime, or after the highest sequence found on disk, whichever is
        // greater (the latter is applied just below).
        self.pending.sequence = validated_sequence + 1;
        if let Some(max_list_seq) = seqs.last() {
            self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
        }

        for s in seqs {
            let list_path = self.conf.deletion_list_path(s);

            let list_bytes = tokio::fs::read(&list_path)
                .await
                .fatal_err("read deletion list");

            let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
                Ok(l) => l,
                Err(e) => {
                    // Drop the list on the floor: any objects it referenced will be left behind
                    // for scrubbing to clean up. This should never happen unless we have a serialization bug.
                    warn!(sequence = s, "Failed to deserialize deletion list: {e}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();
                    continue;
                }
            };

            if deletion_list.sequence <= validated_sequence {
                // If the deletion list's sequence is at or below the validated sequence, we may
                // assume that it was already validated the last time this pageserver ran.
                // Otherwise, we still load it, as it may still contain content valid in this generation.
                deletion_list.validated = true;
            } else {
                // Special case optimization: if a tenant is still attached, and no other
                // generation was issued to another node while we were restarting, then we
                // may treat deletion lists from the previous generation as if they belong
                // to our currently attached generation, and proceed to validate & execute.
                for (tenant_shard_id, tenant_list) in &mut deletion_list.tenants {
                    if let Some(attached_gen) = attached_tenants.get(tenant_shard_id) {
                        if attached_gen.previous() == tenant_list.generation {
                            info!(
                                seq=%s, tenant_id=%tenant_shard_id.tenant_id,
                                shard_id=%tenant_shard_id.shard_slug(),
                                old_gen=?tenant_list.generation, new_gen=?attached_gen,
                                "Updating gen on recovered list");
                            tenant_list.generation = *attached_gen;
                        } else {
                            info!(
                                seq=%s, tenant_id=%tenant_shard_id.tenant_id,
                                shard_id=%tenant_shard_id.shard_slug(),
                                old_gen=?tenant_list.generation, new_gen=?attached_gen,
                                "Encountered stale generation on recovered list");
                        }
                    }
                }
            }

            info!(
                validated = deletion_list.validated,
                sequence = deletion_list.sequence,
                "Recovered deletion list"
            );

            // We will drop out of recovery if this fails: it indicates that we are shutting down
            // or the backend has panicked
            metrics::DELETION_QUEUE
                .keys_submitted
                .inc_by(deletion_list.len() as u64);
            self.tx
                .send(ValidatorQueueMessage::Delete(deletion_list))
                .await?;
        }

        info!(next_sequence = self.pending.sequence, "Replay complete");

        Ok(())
    }

    /// This is the front-end ingest, where we bundle up deletion requests into DeletionLists
    /// and write them out, for later validation by the backend and execution by the executor.
    pub(super) async fn background(&mut self) {
        info!("Started deletion frontend worker");

        // Synchronous, but we only do it once per process lifetime so it's tolerable
        if let Err(e) = create_dir_all(self.conf.deletion_prefix()) {
            tracing::error!(
                "Failed to create deletion list directory {}, deletions will not be executed ({e})",
                self.conf.deletion_prefix(),
            );
            metrics::DELETION_QUEUE.unexpected_errors.inc();
            return;
        }

        while !self.cancel.is_cancelled() {
            let timeout = if self.pending_flushes.is_empty() {
                FRONTEND_DEFAULT_TIMEOUT
            } else {
                FRONTEND_FLUSHING_TIMEOUT
            };

            let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
                Ok(Some(msg)) => msg,
                Ok(None) => {
                    // Queue sender destroyed, shutting down
                    break;
                }
                Err(_) => {
                    // Hit deadline, flush.
                    self.flush().await;
                    continue;
                }
            };

            match msg {
                ListWriterQueueMessage::Delete(op) => {
                    assert!(
                        self.recovered,
                        "Cannot process deletions before recovery. This is a bug."
                    );

                    debug!(
                        "Delete: ingesting {} layers, {} other objects",
                        op.layers.len(),
                        op.objects.len()
                    );

                    let mut layer_paths = Vec::new();
                    for (layer, meta) in op.layers {
                        layer_paths.push(remote_layer_path(
                            &op.tenant_shard_id.tenant_id,
                            &op.timeline_id,
                            meta.shard,
                            &layer,
                            meta.generation,
                        ));
                    }
                    layer_paths.extend(op.objects);

                    if !self.pending.push(
                        &op.tenant_shard_id,
                        &op.timeline_id,
                        op.generation,
                        &mut layer_paths,
                    ) {
                        self.flush().await;
                        let retry_succeeded = self.pending.push(
                            &op.tenant_shard_id,
                            &op.timeline_id,
                            op.generation,
                            &mut layer_paths,
                        );
                        if !retry_succeeded {
                            // Unexpected: after we flush, we should have
                            // drained self.pending, so a conflict on
                            // generation numbers should be impossible.
                            tracing::error!(
                                "Failed to enqueue deletions, leaking objects. This is a bug."
                            );
                            metrics::DELETION_QUEUE.unexpected_errors.inc();
                        }
                    }
                }
                ListWriterQueueMessage::Flush(op) => {
                    if self.pending.is_empty() {
                        // Execute immediately
                        debug!("Flush: No pending objects, flushing immediately");
                        op.notify()
                    } else {
                        // Execute next time we flush
                        debug!("Flush: adding to pending flush list for next deadline flush");
                        self.pending_flushes.push(op);
                    }
                }
                ListWriterQueueMessage::FlushExecute(op) => {
                    debug!("FlushExecute: passing through to backend");
                    // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
                    if let Err(e) = self.tx.send(ValidatorQueueMessage::Flush(op)).await {
                        info!("Can't flush, shutting down ({e})");
                        // The caller will get an error when their oneshot's sender is dropped.
                    }
                }
                ListWriterQueueMessage::Recover(op) => {
                    if self.recovered {
                        tracing::error!(
                            "Deletion queue recovery called more than once. This is a bug."
                        );
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        // Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
                        continue;
                    }

                    if let Err(e) = self.recover(op.attached_tenants).await {
                        // This should only happen in truly unrecoverable cases, like the recovery finding that the backend
                        // queue receiver has been dropped, or something is critically broken with
                        // the local filesystem holding deletion lists.
                        info!(
                            "Deletion queue recover aborted, deletion queue will not proceed ({e})"
                        );
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        return;
                    } else {
                        self.recovered = true;
                    }
                }
            }

            if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
                self.flush().await;
            }
        }
        info!("Deletion queue shut down.");
    }
}