//! The list writer is the first stage in the deletion queue. It accumulates
//! layers to delete, and periodically writes out these layers into a persistent
//! DeletionList.
//!
//! The purpose of writing DeletionLists is to decouple the decision to
//! delete an object from the validation required to execute it: even if
//! validation is not possible, e.g. due to a control plane outage, we can
//! still persist our intent to delete an object, in a way that would
//! survive a restart.
//!
//! DeletionLists are passed onwards to the Validator.
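//!
//! In outline (as implemented in this file): incoming `DeletionOp`s accumulate in an
//! in-memory `DeletionList`; that list is persisted to disk when it grows past
//! `DELETION_LIST_TARGET_SIZE`, when a flush is requested, or on a periodic deadline,
//! and each persisted list is then sent onwards to the Validator over a channel.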

use super::DeletionHeader;
use super::DeletionList;
use super::FlushOp;
use super::ValidatorQueueMessage;

use std::collections::HashMap;
use std::fs::create_dir_all;
use std::time::Duration;

use pageserver_api::shard::TenantShardId;
use regex::Regex;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
use tracing::warn;
use utils::generation::Generation;
use utils::id::TimelineId;

use crate::config::PageServerConf;
use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use crate::tenant::storage_layer::LayerName;
use crate::virtual_file::on_fatal_io_error;
use crate::virtual_file::MaybeFatalIo;

// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to produce DeletionLists on the
// order of 1MB in size when we are under heavy delete load.
const DELETION_LIST_TARGET_SIZE: usize = 16384;
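// (For intuition: at an assumed ~64 bytes per serialized key path, 16384 keys comes to
// 16384 * 64 bytes = 1 MiB; real remote paths may be longer, so treat this as an
// order-of-magnitude target rather than an exact bound.)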

// Ordinarily, we only flush to a DeletionList periodically: this bounds the window
// during which we could leak objects that are already unlinked from timeline metadata
// but whose intended deletion has not yet been persisted to a DeletionList.
const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);

// If someone is waiting for a flush to DeletionList, only delay a little to accumulate
// more objects before doing the flush.
const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
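
/// A request to delete some objects belonging to one timeline, enqueued under the tenant
/// shard's current attachment generation.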
#[derive(Debug)]
pub(super) struct DeletionOp {
    pub(super) tenant_shard_id: TenantShardId,
    pub(super) timeline_id: TimelineId,
    // `layers` and `objects` are both just lists of objects to delete. `layers` is used
    // if you do not have a config object handy to project a layer to its remote key, and
    // need the consuming worker to do it for you.
    pub(super) layers: Vec<(LayerName, LayerFileMetadata)>,
    pub(super) objects: Vec<RemotePath>,

    /// The _current_ generation of the Tenant shard attachment in which we are enqueuing
    /// this deletion.
    pub(super) generation: Generation,
}

#[derive(Debug)]
pub(super) struct RecoverOp {
    pub(super) attached_tenants: HashMap<TenantShardId, Generation>,
}
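
/// Messages accepted by the [`ListWriter`] worker.
///
/// `Recover` must be sent exactly once, before any `Delete` (the worker asserts this).
/// Producer-side usage, in outline (an illustrative sketch; the real senders live in the
/// parent `deletion_queue` module and the variable names here are assumptions):
///
/// ```ignore
/// tx.send(ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }))?;
/// tx.send(ListWriterQueueMessage::Delete(deletion_op))?;
/// tx.send(ListWriterQueueMessage::Flush(flush_op))?; // resolves once the list is persisted
/// ```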
#[derive(Debug)]
pub(super) enum ListWriterQueueMessage {
    Delete(DeletionOp),
    // Wait until all prior deletions make it into a persistent DeletionList
    Flush(FlushOp),
    // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
    FlushExecute(FlushOp),
    // Call once after re-attaching to control plane, to notify the deletion queue about
    // latest attached generations & load any saved deletion lists from disk.
    Recover(RecoverOp),
}
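
/// Accumulates deletion requests into an in-memory [`DeletionList`] (`pending`) and
/// periodically persists it to disk before handing it onwards to the Validator stage.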
pub(super) struct ListWriter {
    conf: &'static PageServerConf,

    // Incoming frontend requests to delete some keys
    rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,

    // Outbound requests to the backend to execute deletion lists we have composed.
    tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,

    // The list we are currently building: it contains a buffer of keys to delete
    // and our next sequence number.
    pending: DeletionList,

    // These FlushOps should notify the next time we flush
    pending_flushes: Vec<FlushOp>,

    // Worker loop is torn down when this fires.
    cancel: CancellationToken,

    // Safety guard to do recovery exactly once
    recovered: bool,
}

impl ListWriter {
    // Initially DeletionHeader.validated_sequence is zero. The place we start our
    // sequence numbers must be higher than that.
    const BASE_SEQUENCE: u64 = 1;

    pub(super) fn new(
        conf: &'static PageServerConf,
        rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
        tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            pending: DeletionList::new(Self::BASE_SEQUENCE),
            conf,
            rx,
            tx,
            pending_flushes: Vec::new(),
            cancel,
            recovered: false,
        }
    }
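
    // How the parent deletion_queue module is expected to wire this worker up (a sketch
    // only: channel names, the buffer size, and the spawning site here are assumptions,
    // not the actual construction code):
    //
    //   let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    //   let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
    //   let mut writer = ListWriter::new(conf, rx, backend_tx, cancel.clone());
    //   tokio::spawn(async move { writer.background().await });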

    /// Try to flush `self.pending` to persistent storage.
    ///
    /// This does not return errors, because on failure to flush we do not lose
    /// any state: flushing will be retried implicitly on the next deadline.
    async fn flush(&mut self) {
        if self.pending.is_empty() {
            for f in self.pending_flushes.drain(..) {
                f.notify();
            }
            return;
        }

        match self.pending.save(self.conf).await {
            Ok(_) => {
                info!(sequence = self.pending.sequence, "Stored deletion list");

                for f in self.pending_flushes.drain(..) {
                    f.notify();
                }

                // Take the list we've accumulated, replace it with a fresh list for the next sequence
                let next_list = DeletionList::new(self.pending.sequence + 1);
                let list = std::mem::replace(&mut self.pending, next_list);

                if let Err(e) = self.tx.send(ValidatorQueueMessage::Delete(list)).await {
                    // This is allowed to fail: it will only happen if the backend worker is shut down,
                    // so we can just drop this on the floor.
                    info!("Deletion list dropped, this is normal during shutdown ({e:#})");
                }
            }
            Err(e) => {
                metrics::DELETION_QUEUE.unexpected_errors.inc();
                warn!(
                    sequence = self.pending.sequence,
                    "Failed to write deletion list, will retry later ({e:#})"
                );
            }
        }
    }

    /// Load the header, to learn the sequence number up to which deletions
    /// have been validated. We will apply validated=true to DeletionLists
    /// <= this sequence when loading them.
    ///
    /// It is not an error for the header to not exist: we return None, and
    /// the caller should act as if validated_sequence is 0.
    async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
        let header_path = self.conf.deletion_header_path();
        match tokio::fs::read(&header_path).await {
            Ok(header_bytes) => {
                match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
                    Ok(h) => Ok(Some(h.validated_sequence)),
                    Err(e) => {
                        warn!(
                            "Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
                        );
                        // This should never happen unless we make a mistake with our serialization.
                        // Ignoring a deletion header is not consequential for correctness because all deletions
                        // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        Ok(None)
                    }
                }
            }
            Err(e) => {
                if e.kind() == std::io::ErrorKind::NotFound {
                    debug!("Deletion header {header_path} not found, first start?");
                    Ok(None)
                } else {
                    on_fatal_io_error(&e, "reading deletion header");
                }
            }
        }
    }
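
    /// Recover deletion lists written by a previous process lifetime.
    ///
    /// Loads the header to find the already-validated sequence, removes leftover
    /// temporary files, re-submits each surviving list to the Validator, and picks
    /// the sequence number that `self.pending` should continue from.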
    async fn recover(
        &mut self,
        attached_tenants: HashMap<TenantShardId, Generation>,
    ) -> Result<(), anyhow::Error> {
        debug!(
            "recovering with {} attached tenants",
            attached_tenants.len()
        );

        // Load the header
        let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);

        self.pending.sequence = validated_sequence + 1;

        let deletion_directory = self.conf.deletion_prefix();
        let mut dir = tokio::fs::read_dir(&deletion_directory)
            .await
            .fatal_err("read deletion directory");
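
        // Deletion list files are named `<sequence>-<version>.list`, where the sequence is
        // 16 hex characters and the version is two characters (for example a hypothetical
        // "0000000000000002-01.list"; the exact version string is defined where lists are
        // saved, not in this file). The hex sequence is parsed back out below.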
        let list_name_pattern =
            Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();

        let temp_extension = format!(".{TEMP_SUFFIX}");
        let header_path = self.conf.deletion_header_path();
        let mut seqs: Vec<u64> = Vec::new();
        while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
            let file_name = dentry.file_name();
            let dentry_str = file_name.to_string_lossy();

            if file_name == header_path.file_name().unwrap_or("") {
                // Don't try to parse the header's name like a list
                continue;
            }

            if dentry_str.ends_with(&temp_extension) {
                info!("Cleaning up temporary file {dentry_str}");
                let absolute_path =
                    deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
                tokio::fs::remove_file(&absolute_path)
                    .await
                    .fatal_err("delete temp file");

                continue;
            }

            let file_name = dentry.file_name().to_owned();
            let basename = file_name.to_string_lossy();
            let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
                m.name("sequence")
                    .expect("Non optional group should be present")
                    .as_str()
            } else {
                warn!("Unexpected key in deletion queue: {basename}");
                metrics::DELETION_QUEUE.unexpected_errors.inc();
                continue;
            };

            let seq: u64 = match u64::from_str_radix(seq_part, 16) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Malformed key '{basename}': {e}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();
                    continue;
                }
            };
            seqs.push(seq);
        }
        seqs.sort();

        // Start our next deletion list from after the last location validated by
        // the previous process lifetime, or after the last list found on disk,
        // whichever is higher (computed just below from the enumerated sequences).
        self.pending.sequence = validated_sequence + 1;
        if let Some(max_list_seq) = seqs.last() {
            self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
        }

        for s in seqs {
            let list_path = self.conf.deletion_list_path(s);

            let list_bytes = tokio::fs::read(&list_path)
                .await
                .fatal_err("read deletion list");

            let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
                Ok(l) => l,
                Err(e) => {
                    // Drop the list on the floor: any objects it referenced will be left behind
                    // for scrubbing to clean up. This should never happen unless we have a serialization bug.
                    warn!(sequence = s, "Failed to deserialize deletion list: {e}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();
                    continue;
                }
            };

            if deletion_list.sequence <= validated_sequence {
                // If the list's sequence is at or below validated_sequence, we may assume that it
                // was already validated the last time this pageserver ran. Otherwise, we still
                // load it, as it may still contain content valid in this generation.
                deletion_list.validated = true;
            } else {
                // Special case optimization: if a tenant is still attached, and no other
                // generation was issued to another node in the interval while we restarted,
                // then we may treat deletion lists from the previous generation as if they
                // belong to our currently attached generation, and proceed to validate & execute.
                for (tenant_shard_id, tenant_list) in &mut deletion_list.tenants {
                    if let Some(attached_gen) = attached_tenants.get(tenant_shard_id) {
                        if attached_gen.previous() == tenant_list.generation {
                            info!(
                                seq=%s, tenant_id=%tenant_shard_id.tenant_id,
                                shard_id=%tenant_shard_id.shard_slug(),
                                old_gen=?tenant_list.generation, new_gen=?attached_gen,
                                "Updating gen on recovered list");
                            tenant_list.generation = *attached_gen;
                        } else {
                            info!(
                                seq=%s, tenant_id=%tenant_shard_id.tenant_id,
                                shard_id=%tenant_shard_id.shard_slug(),
                                old_gen=?tenant_list.generation, new_gen=?attached_gen,
                                "Encountered stale generation on recovered list");
                        }
                    }
                }
            }

            info!(
                validated = deletion_list.validated,
                sequence = deletion_list.sequence,
                "Recovered deletion list"
            );

            // We will drop out of recovery if this fails: it indicates that we are shutting down
            // or the backend has panicked
            metrics::DELETION_QUEUE
                .keys_submitted
                .inc_by(deletion_list.len() as u64);
            self.tx
                .send(ValidatorQueueMessage::Delete(deletion_list))
                .await?;
        }

        info!(next_sequence = self.pending.sequence, "Replay complete");

        Ok(())
    }

    /// This is the front-end ingest, where we bundle up deletion requests into DeletionLists
    /// and write them out, for later validation by the backend and execution by the executor.
    pub(super) async fn background(&mut self) {
        info!("Started deletion frontend worker");

        // Synchronous, but we only do it once per process lifetime so it's tolerable
        if let Err(e) = create_dir_all(self.conf.deletion_prefix()) {
            tracing::error!(
                "Failed to create deletion list directory {}, deletions will not be executed ({e})",
                self.conf.deletion_prefix(),
            );
            metrics::DELETION_QUEUE.unexpected_errors.inc();
            return;
        }

        while !self.cancel.is_cancelled() {
            let timeout = if self.pending_flushes.is_empty() {
                FRONTEND_DEFAULT_TIMEOUT
            } else {
                FRONTEND_FLUSHING_TIMEOUT
            };

            let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
                Ok(Some(msg)) => msg,
                Ok(None) => {
                    // Queue sender destroyed, shutting down
                    break;
                }
                Err(_) => {
                    // Hit deadline, flush.
                    self.flush().await;
                    continue;
                }
            };

            match msg {
                ListWriterQueueMessage::Delete(op) => {
                    assert!(
                        self.recovered,
                        "Cannot process deletions before recovery. This is a bug."
                    );

                    debug!(
                        "Delete: ingesting {} layers, {} other objects",
                        op.layers.len(),
                        op.objects.len()
                    );

                    let mut layer_paths = Vec::new();
                    for (layer, meta) in op.layers {
                        layer_paths.push(remote_layer_path(
                            &op.tenant_shard_id.tenant_id,
                            &op.timeline_id,
                            meta.shard,
                            &layer,
                            meta.generation,
                        ));
                    }
                    layer_paths.extend(op.objects);
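
                    // `DeletionList::push` returns false when the entry cannot be combined with
                    // what is already pending (per the comment on the retry below, a conflict on
                    // generation numbers); in that case we flush the current list and retry once.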
                    if !self.pending.push(
                        &op.tenant_shard_id,
                        &op.timeline_id,
                        op.generation,
                        &mut layer_paths,
                    ) {
                        self.flush().await;
                        let retry_succeeded = self.pending.push(
                            &op.tenant_shard_id,
                            &op.timeline_id,
                            op.generation,
                            &mut layer_paths,
                        );
                        if !retry_succeeded {
                            // Unexpected: after we flush, we should have
                            // drained self.pending, so a conflict on
                            // generation numbers should be impossible.
                            tracing::error!(
                                "Failed to enqueue deletions, leaking objects. This is a bug."
                            );
                            metrics::DELETION_QUEUE.unexpected_errors.inc();
                        }
                    }
                }
                ListWriterQueueMessage::Flush(op) => {
                    if self.pending.is_empty() {
                        // Execute immediately
                        debug!("Flush: No pending objects, flushing immediately");
                        op.notify()
                    } else {
                        // Execute next time we flush
                        debug!("Flush: adding to pending flush list for next deadline flush");
                        self.pending_flushes.push(op);
                    }
                }
                ListWriterQueueMessage::FlushExecute(op) => {
                    debug!("FlushExecute: passing through to backend");
                    // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
                    if let Err(e) = self.tx.send(ValidatorQueueMessage::Flush(op)).await {
                        info!("Can't flush, shutting down ({e})");
                        // The caller will get an error when their oneshot sender is dropped.
                    }
                }
                ListWriterQueueMessage::Recover(op) => {
                    if self.recovered {
                        tracing::error!(
                            "Deletion queue recovery called more than once. This is a bug."
                        );
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        // Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
                        continue;
                    }

                    if let Err(e) = self.recover(op.attached_tenants).await {
                        // This should only happen in truly unrecoverable cases, like the recovery finding that the backend
                        // queue receiver has been dropped, or something is critically broken with
                        // the local filesystem holding deletion lists.
                        info!(
                            "Deletion queue recover aborted, deletion queue will not proceed ({e})"
                        );
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        return;
                    } else {
                        self.recovered = true;
                    }
                }
            }

            if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
                self.flush().await;
            }
        }
        info!("Deletion queue shut down.");
    }
}