Line data Source code
1 : //! The list writer is the first stage in the deletion queue. It accumulates
2 : //! layers to delete, and periodically writes out these layers into a persistent
3 : //! DeletionList.
4 : //!
5 : //! The purpose of writing DeletionLists is to decouple the decision to
6 : //! delete an object from the validation required to execute it: even if
7 : //! validation is not possible, e.g. due to a control plane outage, we can
8 : //! still persist our intent to delete an object, in a way that would
9 : //! survive a restart.
10 : //!
11 : //! DeletionLists are passed onwards to the Validator.
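//!
//! The sketch below shows how the pieces in this file are wired together and driven.
//! It is a minimal, illustrative example rather than the real pageserver wiring:
//! `conf`, `tenant_shard_id`, `timeline_id`, `generation` and `some_remote_path` are
//! assumed to be obtained elsewhere, and the backend channel capacity is arbitrary.
//!
//! ```ignore
//! // Channels connecting a client to the ListWriter, and the ListWriter to the Validator.
//! let (client_tx, client_rx) = tokio::sync::mpsc::unbounded_channel::<ListWriterQueueMessage>();
//! let (backend_tx, _backend_rx) = tokio::sync::mpsc::channel::<ValidatorQueueMessage>(16);
//! let cancel = tokio_util::sync::CancellationToken::new();
//!
//! let mut writer = ListWriter::new(conf, client_rx, backend_tx, cancel.clone());
//!
//! // Recovery must run before any deletions are accepted.
//! client_tx.send(ListWriterQueueMessage::Recover(RecoverOp {
//!     attached_tenants: std::collections::HashMap::new(),
//! }))?;
//!
//! // Enqueue a deletion: `layers` is empty here, `objects` carries pre-resolved remote paths.
//! client_tx.send(ListWriterQueueMessage::Delete(DeletionOp {
//!     tenant_shard_id,
//!     timeline_id,
//!     layers: Vec::new(),
//!     objects: vec![some_remote_path],
//!     generation,
//! }))?;
//!
//! // Run the ingest loop; it exits when `cancel` fires.
//! writer.background().await;
//! ```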
12 :
13 : use std::collections::HashMap;
14 : use std::fs::create_dir_all;
15 : use std::time::Duration;
16 :
17 : use pageserver_api::shard::TenantShardId;
18 : use regex::Regex;
19 : use remote_storage::RemotePath;
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::{debug, info, warn};
22 : use utils::generation::Generation;
23 : use utils::id::TimelineId;
24 :
25 : use super::{DeletionHeader, DeletionList, FlushOp, ValidatorQueueMessage};
26 : use crate::config::PageServerConf;
27 : use crate::deletion_queue::TEMP_SUFFIX;
28 : use crate::metrics;
29 : use crate::tenant::remote_timeline_client::{LayerFileMetadata, remote_layer_path};
30 : use crate::tenant::storage_layer::LayerName;
31 : use crate::virtual_file::{MaybeFatalIo, on_fatal_io_error};
32 :
33 : // The number of keys in a DeletionList at which we will proactively persist it
34 : // (without waiting for a flush deadline). This aims to produce DeletionList objects
35 : // on the order of 1MB in size when we are under heavy delete load.
36 : const DELETION_LIST_TARGET_SIZE: usize = 16384;
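// (Back-of-the-envelope arithmetic behind the target, assuming an average serialized
// key of roughly 64 bytes, which is an illustrative figure rather than a measurement:
// 16384 keys * ~64 bytes ≈ 1 MiB per DeletionList.)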
37 :
38 : // Ordinarily, we only flush to a DeletionList periodically: this bounds the window during
39 : // which we might leak objects by failing to flush a DeletionList after
40 : // the objects have already been unlinked from timeline metadata.
41 : const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
42 :
43 : // If someone is waiting for a flush to a DeletionList, only delay a little to accumulate
44 : // more objects before doing the flush.
45 : const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
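// `background()` picks between these two timeouts on each iteration: the default timeout
// while `pending_flushes` is empty, and the shorter flushing timeout once at least one
// FlushOp is waiting.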
46 :
47 : #[derive(Debug)]
48 : pub(super) struct DeletionOp {
49 : pub(super) tenant_shard_id: TenantShardId,
50 : pub(super) timeline_id: TimelineId,
51 : // `layers` and `objects` are both just lists of objects. `layers` is used if you do not
52 : // have a config object handy to project a layer name to a remote key, and need the
53 : // consuming worker to do that for you.
54 : pub(super) layers: Vec<(LayerName, LayerFileMetadata)>,
55 : pub(super) objects: Vec<RemotePath>,
56 :
57 : /// The _current_ generation of the Tenant shard attachment in which we are enqueuing
58 : /// this deletion.
59 : pub(super) generation: Generation,
60 : }
61 :
62 : #[derive(Debug)]
63 : pub(super) struct RecoverOp {
64 : pub(super) attached_tenants: HashMap<TenantShardId, Generation>,
65 : }
66 :
67 : #[derive(Debug)]
68 : pub(super) enum ListWriterQueueMessage {
69 : Delete(DeletionOp),
70 : // Wait until all prior deletions make it into a persistent DeletionList
71 : Flush(FlushOp),
72 : // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
73 : FlushExecute(FlushOp),
74 : // Call once after re-attaching to the control plane, to notify the deletion queue about
75 : // the latest attached generations & load any saved deletion lists from disk.
76 : Recover(RecoverOp),
77 : }
78 :
79 : pub(super) struct ListWriter {
80 : conf: &'static PageServerConf,
81 :
82 : // Incoming frontend requests to delete some keys
83 : rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
84 :
85 : // Outbound requests to the backend to execute deletion lists we have composed.
86 : tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
87 :
88 : // The list we are currently building; it contains a buffer of keys to delete
89 : // and our next sequence number.
90 : pending: DeletionList,
91 :
92 : // These FlushOps should notify the next time we flush
93 : pending_flushes: Vec<FlushOp>,
94 :
95 : // Worker loop is torn down when this fires.
96 : cancel: CancellationToken,
97 :
98 : // Safety guard to do recovery exactly once
99 : recovered: bool,
100 : }
101 :
102 : impl ListWriter {
103 : // Initially DeletionHeader.validated_sequence is zero, so the sequence numbers
104 : // we start from must be higher than that.
105 : const BASE_SEQUENCE: u64 = 1;
106 :
107 16 : pub(super) fn new(
108 16 : conf: &'static PageServerConf,
109 16 : rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
110 16 : tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
111 16 : cancel: CancellationToken,
112 16 : ) -> Self {
113 16 : Self {
114 16 : pending: DeletionList::new(Self::BASE_SEQUENCE),
115 16 : conf,
116 16 : rx,
117 16 : tx,
118 16 : pending_flushes: Vec::new(),
119 16 : cancel,
120 16 : recovered: false,
121 16 : }
122 16 : }
123 :
124 : /// Try to flush `self.pending` to persistent storage
125 : ///
126 : /// This does not return errors, because on failure to flush we do not lose
127 : /// any state: flushing will be retried implicitly on the next deadline
128 25 : async fn flush(&mut self) {
129 25 : if self.pending.is_empty() {
130 5 : for f in self.pending_flushes.drain(..) {
131 0 : f.notify();
132 0 : }
133 5 : return;
134 20 : }
135 20 :
136 20 : match self.pending.save(self.conf).await {
137 : Ok(_) => {
138 20 : info!(sequence = self.pending.sequence, "Stored deletion list");
139 :
140 20 : for f in self.pending_flushes.drain(..) {
141 16 : f.notify();
142 16 : }
143 :
144 : // Take the list we've accumulated, replace it with a fresh list for the next sequence
145 20 : let next_list = DeletionList::new(self.pending.sequence + 1);
146 20 : let list = std::mem::replace(&mut self.pending, next_list);
147 :
148 20 : if let Err(e) = self.tx.send(ValidatorQueueMessage::Delete(list)).await {
149 : // This is allowed to fail: it will only happen if the backend worker is shut down,
150 : // so we can just drop this on the floor.
151 0 : info!("Deletion list dropped, this is normal during shutdown ({e:#})");
152 20 : }
153 : }
154 0 : Err(e) => {
155 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
156 0 : warn!(
157 : sequence = self.pending.sequence,
158 0 : "Failed to write deletion list, will retry later ({e:#})"
159 : );
160 : }
161 : }
162 25 : }
163 :
164 : /// Load the header, to learn the sequence number up to which deletions
165 : /// have been validated. We will apply validated=true to DeletionLists
166 : /// <= this sequence when loading them.
167 : ///
168 : /// It is not an error for the header to not exist: we return None, and
169 : /// the caller should act as if validated_sequence is 0
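///
/// A sketch of the header's on-disk shape (assumed for illustration; the real file
/// may carry additional fields): a small JSON document such as
/// `{"validated_sequence": 7}`.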
170 16 : async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
171 16 : let header_path = self.conf.deletion_header_path();
172 16 : match tokio::fs::read(&header_path).await {
173 0 : Ok(header_bytes) => {
174 0 : match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
175 0 : Ok(h) => Ok(Some(h.validated_sequence)),
176 0 : Err(e) => {
177 0 : warn!(
178 0 : "Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
179 : );
180 : // This should never happen unless we make a mistake with our serialization.
181 : // Ignoring a deletion header is not consequential for correctness because all deletions
182 : // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
183 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
184 0 : Ok(None)
185 : }
186 : }
187 : }
188 16 : Err(e) => {
189 16 : if e.kind() == std::io::ErrorKind::NotFound {
190 16 : debug!("Deletion header {header_path} not found, first start?");
191 16 : Ok(None)
192 : } else {
193 0 : on_fatal_io_error(&e, "reading deletion header");
194 : }
195 : }
196 : }
197 16 : }
198 :
199 16 : async fn recover(
200 16 : &mut self,
201 16 : attached_tenants: HashMap<TenantShardId, Generation>,
202 16 : ) -> Result<(), anyhow::Error> {
203 16 : debug!(
204 0 : "recovering with {} attached tenants",
205 0 : attached_tenants.len()
206 : );
207 :
208 : // Load the header
209 16 : let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);
210 16 :
211 16 : self.pending.sequence = validated_sequence + 1;
212 16 :
213 16 : let deletion_directory = self.conf.deletion_prefix();
214 16 : let mut dir = tokio::fs::read_dir(&deletion_directory)
215 16 : .await
216 16 : .fatal_err("read deletion directory");
217 16 :
218 16 : let list_name_pattern =
219 16 : Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
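// For example, a file named "0000000000000005-01.list" parses as sequence
// "0000000000000005" (hex, i.e. 5) and version "01". (The exact version string is
// whatever DeletionList::save wrote; "01" here is illustrative.)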
220 16 :
221 16 : let temp_extension = format!(".{TEMP_SUFFIX}");
222 16 : let header_path = self.conf.deletion_header_path();
223 16 : let mut seqs: Vec<u64> = Vec::new();
224 24 : while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
225 8 : let file_name = dentry.file_name();
226 8 : let dentry_str = file_name.to_string_lossy();
227 8 :
228 8 : if file_name == header_path.file_name().unwrap_or("") {
229 : // Don't try to parse the header's name like a list
230 0 : continue;
231 8 : }
232 8 :
233 8 : if dentry_str.ends_with(&temp_extension) {
234 0 : info!("Cleaning up temporary file {dentry_str}");
235 0 : let absolute_path =
236 0 : deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
237 0 : tokio::fs::remove_file(&absolute_path)
238 0 : .await
239 0 : .fatal_err("delete temp file");
240 0 :
241 0 : continue;
242 8 : }
243 8 :
244 8 : let file_name = dentry.file_name().to_owned();
245 8 : let basename = file_name.to_string_lossy();
246 8 : let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
247 8 : m.name("sequence")
248 8 : .expect("Non optional group should be present")
249 8 : .as_str()
250 : } else {
251 0 : warn!("Unexpected key in deletion queue: {basename}");
252 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
253 0 : continue;
254 : };
255 :
256 8 : let seq: u64 = match u64::from_str_radix(seq_part, 16) {
257 8 : Ok(s) => s,
258 0 : Err(e) => {
259 0 : warn!("Malformed key '{basename}': {e}");
260 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
261 0 : continue;
262 : }
263 : };
264 8 : seqs.push(seq);
265 : }
266 16 : seqs.sort();
267 16 :
268 16 : // Start our next deletion list after the last sequence validated in a
269 16 : // previous process lifetime, or after the last deletion list found on disk,
270 16 : // whichever is higher.
271 16 : self.pending.sequence = validated_sequence + 1;
272 16 : if let Some(max_list_seq) = seqs.last() {
273 4 : self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
274 12 : }
275 :
276 24 : for s in seqs {
277 8 : let list_path = self.conf.deletion_list_path(s);
278 :
279 8 : let list_bytes = tokio::fs::read(&list_path)
280 8 : .await
281 8 : .fatal_err("read deletion list");
282 :
283 8 : let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
284 8 : Ok(l) => l,
285 0 : Err(e) => {
286 0 : // Drop the list on the floor: any objects it referenced will be left behind
287 0 : // for scrubbing to clean up. This should never happen unless we have a serialization bug.
288 0 : warn!(sequence = s, "Failed to deserialize deletion list: {e}");
289 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
290 0 : continue;
291 : }
292 : };
293 :
294 8 : if deletion_list.sequence <= validated_sequence {
295 0 : // If the deletion list's sequence is at or below validated_sequence, we may assume
296 0 : // that it was already validated the last time this pageserver ran. Otherwise, we
297 0 : // still load it, as it may contain content that is valid in this generation.
298 0 : deletion_list.validated = true;
299 0 : } else {
300 : // Special case optimization: if a tenant is still attached, and no other
301 : // generation was issued to another node while we were restarting,
302 : // then we may treat deletion lists from the previous generation as if they
303 : // belong to our currently attached generation, and proceed to validate & execute.
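// For example, a tenant's entry recorded under generation 5 by the previous process
// lifetime is adopted into generation 6 here if this node re-attached with generation 6
// (i.e. the attached generation's predecessor matches the recorded one).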
304 16 : for (tenant_shard_id, tenant_list) in &mut deletion_list.tenants {
305 8 : if let Some(attached_gen) = attached_tenants.get(tenant_shard_id) {
306 8 : if attached_gen.previous() == tenant_list.generation {
307 4 : info!(
308 : seq=%s, tenant_id=%tenant_shard_id.tenant_id,
309 0 : shard_id=%tenant_shard_id.shard_slug(),
310 0 : old_gen=?tenant_list.generation, new_gen=?attached_gen,
311 0 : "Updating gen on recovered list");
312 4 : tenant_list.generation = *attached_gen;
313 : } else {
314 4 : info!(
315 : seq=%s, tenant_id=%tenant_shard_id.tenant_id,
316 0 : shard_id=%tenant_shard_id.shard_slug(),
317 0 : old_gen=?tenant_list.generation, new_gen=?attached_gen,
318 0 : "Encountered stale generation on recovered list");
319 : }
320 0 : }
321 : }
322 : }
323 :
324 8 : info!(
325 : validated = deletion_list.validated,
326 : sequence = deletion_list.sequence,
327 0 : "Recovered deletion list"
328 : );
329 :
330 : // We will drop out of recovery if this fails: it indicates that we are shutting down
331 : // or the backend has panicked
332 8 : metrics::DELETION_QUEUE
333 8 : .keys_submitted
334 8 : .inc_by(deletion_list.len() as u64);
335 8 : self.tx
336 8 : .send(ValidatorQueueMessage::Delete(deletion_list))
337 8 : .await?;
338 : }
339 :
340 16 : info!(next_sequence = self.pending.sequence, "Replay complete");
341 :
342 16 : Ok(())
343 16 : }
344 :
345 : /// This is the front-end ingest, where we bundle up deletion requests into DeletionLists
346 : /// and write them out, for later validation by the backend and execution by the executor.
347 16 : pub(super) async fn background(&mut self) {
348 16 : info!("Started deletion frontend worker");
349 :
350 : // Synchronous, but we only do it once per process lifetime so it's tolerable
351 16 : if let Err(e) = create_dir_all(self.conf.deletion_prefix()) {
352 0 : tracing::error!(
353 0 : "Failed to create deletion list directory {}, deletions will not be executed ({e})",
354 0 : self.conf.deletion_prefix(),
355 : );
356 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
357 0 : return;
358 16 : }
359 :
360 105 : while !self.cancel.is_cancelled() {
361 101 : let timeout = if self.pending_flushes.is_empty() {
362 101 : FRONTEND_DEFAULT_TIMEOUT
363 : } else {
364 0 : FRONTEND_FLUSHING_TIMEOUT
365 : };
366 :
367 101 : let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
368 84 : Ok(Some(msg)) => msg,
369 : Ok(None) => {
370 : // Queue sender destroyed, shutting down
371 0 : break;
372 : }
373 : Err(_) => {
374 : // Hit deadline, flush.
375 5 : self.flush().await;
376 5 : continue;
377 : }
378 : };
379 :
380 84 : match msg {
381 20 : ListWriterQueueMessage::Delete(op) => {
382 20 : assert!(
383 20 : self.recovered,
384 0 : "Cannot process deletions before recovery. This is a bug."
385 : );
386 :
387 20 : debug!(
388 0 : "Delete: ingesting {} layers, {} other objects",
389 0 : op.layers.len(),
390 0 : op.objects.len()
391 : );
392 :
393 20 : let mut layer_paths = Vec::new();
394 40 : for (layer, meta) in op.layers {
395 20 : layer_paths.push(remote_layer_path(
396 20 : &op.tenant_shard_id.tenant_id,
397 20 : &op.timeline_id,
398 20 : meta.shard,
399 20 : &layer,
400 20 : meta.generation,
401 20 : ));
402 20 : }
403 20 : layer_paths.extend(op.objects);
404 20 :
405 20 : if !self.pending.push(
406 20 : &op.tenant_shard_id,
407 20 : &op.timeline_id,
408 20 : op.generation,
409 20 : &mut layer_paths,
410 20 : ) {
411 4 : self.flush().await;
412 4 : let retry_succeeded = self.pending.push(
413 4 : &op.tenant_shard_id,
414 4 : &op.timeline_id,
415 4 : op.generation,
416 4 : &mut layer_paths,
417 4 : );
418 4 : if !retry_succeeded {
419 : // Unexpected: after we flush, we should have
420 : // drained self.pending, so a conflict on
421 : // generation numbers should be impossible.
422 0 : tracing::error!(
423 0 : "Failed to enqueue deletions, leaking objects. This is a bug."
424 : );
425 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
426 4 : }
427 16 : }
428 : }
429 28 : ListWriterQueueMessage::Flush(op) => {
430 28 : if self.pending.is_empty() {
431 : // Execute immediately
432 12 : debug!("Flush: No pending objects, flushing immediately");
433 12 : op.notify()
434 : } else {
435 : // Execute next time we flush
436 16 : debug!("Flush: adding to pending flush list for next deadline flush");
437 16 : self.pending_flushes.push(op);
438 : }
439 : }
440 20 : ListWriterQueueMessage::FlushExecute(op) => {
441 20 : debug!("FlushExecute: passing through to backend");
442 : // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
443 20 : if let Err(e) = self.tx.send(ValidatorQueueMessage::Flush(op)).await {
444 0 : info!("Can't flush, shutting down ({e})");
445 : // The caller will get an error when their oneshot sender is dropped.
446 20 : }
447 : }
448 16 : ListWriterQueueMessage::Recover(op) => {
449 16 : if self.recovered {
450 0 : tracing::error!(
451 0 : "Deletion queue recovery called more than once. This is a bug."
452 : );
453 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
454 0 : // Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
455 0 : continue;
456 16 : }
457 :
458 16 : if let Err(e) = self.recover(op.attached_tenants).await {
459 : // This should only happen in truly unrecoverable cases, like the recovery finding that the backend
460 : // queue receiver has been dropped, or something is critically broken with
461 : // the local filesystem holding deletion lists.
462 0 : info!(
463 0 : "Deletion queue recover aborted, deletion queue will not proceed ({e})"
464 : );
465 0 : metrics::DELETION_QUEUE.unexpected_errors.inc();
466 0 : return;
467 16 : } else {
468 16 : self.recovered = true;
469 16 : }
470 : }
471 : }
472 :
473 84 : if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
474 16 : self.flush().await;
475 68 : }
476 : }
477 4 : info!("Deletion queue shut down.");
478 4 : }
479 : }