//! The list writer is the first stage in the deletion queue. It accumulates
//! layers to delete, and periodically writes out these layers into a persistent
//! DeletionList.
//!
//! The purpose of writing DeletionLists is to decouple the decision to
//! delete an object from the validation required to execute it: even if
//! validation is not possible, e.g. due to a control plane outage, we can
//! still persist our intent to delete an object, in a way that would
//! survive a restart.
//!
//! DeletionLists are passed onwards to the Validator.
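//!
//! A rough usage sketch (not the real wiring, which lives elsewhere in the
//! `deletion_queue` module; the channel names, the backend channel capacity,
//! and the `conf` binding below are illustrative only). The worker is spawned
//! with its two channel ends, and a `Recover` message must arrive before any
//! `Delete` is enqueued:
//!
//! ```ignore
//! let (frontend_tx, frontend_rx) = tokio::sync::mpsc::unbounded_channel();
//! let (backend_tx, _backend_rx) = tokio::sync::mpsc::channel(16);
//! let cancel = CancellationToken::new();
//!
//! let mut writer = ListWriter::new(conf, frontend_rx, backend_tx, cancel.clone());
//! tokio::spawn(async move { writer.background().await });
//!
//! // Recovery must come first: it loads the header and any saved lists from disk.
//! frontend_tx.send(ListWriterQueueMessage::Recover(RecoverOp {
//!     attached_tenants: HashMap::new(),
//! }))?;
//! ```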

use super::DeletionHeader;
use super::DeletionList;
use super::FlushOp;
use super::ValidatorQueueMessage;

use std::collections::HashMap;
use std::fs::create_dir_all;
use std::time::Duration;

use pageserver_api::shard::TenantShardId;
use regex::Regex;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
use tracing::warn;
use utils::generation::Generation;
use utils::id::TimelineId;

use crate::config::PageServerConf;
use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use crate::tenant::storage_layer::LayerFileName;
use crate::virtual_file::on_fatal_io_error;
use crate::virtual_file::MaybeFatalIo;

// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to produce lists on the order
// of 1MB in size when we are under heavy delete load.
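// (Rough arithmetic, assuming remote key paths of roughly 64-128 bytes each:
// 16384 keys comes to about 1-2MiB of list content.)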
const DELETION_LIST_TARGET_SIZE: usize = 16384;

// Ordinarily, we only flush to a DeletionList periodically: this bounds the window
// during which we could leak objects that were already unlinked from timeline
// metadata but whose deletion had not yet been persisted in a DeletionList.
const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);

// If someone is waiting for a flush to DeletionList, only delay a little to accumulate
// more objects before doing the flush.
const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);

#[derive(Debug)]
pub(super) struct DeletionOp {
    pub(super) tenant_shard_id: TenantShardId,
    pub(super) timeline_id: TimelineId,
    // `layers` and `objects` are both just lists of objects. `layers` is used if you do not
    // have a config object handy to project it to a remote key, and need the consuming worker
    // to do it for you.
    pub(super) layers: Vec<(LayerFileName, LayerFileMetadata)>,
    pub(super) objects: Vec<RemotePath>,

    /// The _current_ generation of the Tenant shard attachment in which we are enqueuing
    /// this deletion.
    pub(super) generation: Generation,
}
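// A sketch of how a producer might fill this in (all bindings below are
// illustrative; real callers live in the deletion queue's client-facing code):
//
//     let op = DeletionOp {
//         tenant_shard_id,
//         timeline_id,
//         // Layer names + metadata: resolved to remote keys by the worker.
//         layers: vec![(layer_file_name, layer_metadata)],
//         // Already-resolved remote paths, if the caller has them.
//         objects: Vec::new(),
//         // Generation of the attachment in which this deletion is enqueued.
//         generation: current_generation,
//     };
//     queue_tx.send(ListWriterQueueMessage::Delete(op))?;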

#[derive(Debug)]
pub(super) struct RecoverOp {
    pub(super) attached_tenants: HashMap<TenantShardId, Generation>,
}

#[derive(Debug)]
pub(super) enum ListWriterQueueMessage {
    Delete(DeletionOp),
    // Wait until all prior deletions make it into a persistent DeletionList
    Flush(FlushOp),
    // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
    FlushExecute(FlushOp),
    // Call once after re-attaching to control plane, to notify the deletion queue about
    // latest attached generations & load any saved deletion lists from disk.
    Recover(RecoverOp),
}

pub(super) struct ListWriter {
    conf: &'static PageServerConf,

    // Incoming frontend requests to delete some keys
    rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,

    // Outbound requests to the backend to execute deletion lists we have composed.
    tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,

    // The list we are currently building: a buffer of keys to delete, plus
    // our next sequence number
    pending: DeletionList,

    // These FlushOps should notify the next time we flush
    pending_flushes: Vec<FlushOp>,

    // Worker loop is torn down when this fires.
    cancel: CancellationToken,

    // Safety guard to do recovery exactly once
    recovered: bool,
}

impl ListWriter {
    // Initially DeletionHeader.validated_sequence is zero. The place we start our
    // sequence numbers must be higher than that.
    const BASE_SEQUENCE: u64 = 1;

    pub(super) fn new(
        conf: &'static PageServerConf,
        rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
        tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
        cancel: CancellationToken,
    ) -> Self {
        Self {
            pending: DeletionList::new(Self::BASE_SEQUENCE),
            conf,
            rx,
            tx,
            pending_flushes: Vec::new(),
            cancel,
            recovered: false,
        }
    }

    /// Try to flush `self.pending` to persistent storage.
    ///
    /// This does not return errors, because on failure to flush we do not lose
    /// any state: flushing will be retried implicitly on the next deadline.
    async fn flush(&mut self) {
        if self.pending.is_empty() {
            for f in self.pending_flushes.drain(..) {
                f.notify();
            }
            return;
        }

        match self.pending.save(self.conf).await {
            Ok(_) => {
                info!(sequence = self.pending.sequence, "Stored deletion list");

                for f in self.pending_flushes.drain(..) {
                    f.notify();
                }

                // Take the list we've accumulated, replace it with a fresh list for the next sequence
                let next_list = DeletionList::new(self.pending.sequence + 1);
                let list = std::mem::replace(&mut self.pending, next_list);

                if let Err(e) = self.tx.send(ValidatorQueueMessage::Delete(list)).await {
                    // This is allowed to fail: it will only happen if the backend worker is shut down,
                    // so we can just drop this on the floor.
                    info!("Deletion list dropped, this is normal during shutdown ({e:#})");
                }
            }
            Err(e) => {
                metrics::DELETION_QUEUE.unexpected_errors.inc();
                warn!(
                    sequence = self.pending.sequence,
                    "Failed to write deletion list, will retry later ({e:#})"
                );
            }
        }
    }

    /// Load the header, to learn the sequence number up to which deletions
    /// have been validated. We will apply validated=true to DeletionLists
    /// <= this sequence when loading them.
    ///
    /// It is not an error for the header to not exist: we return None, and
    /// the caller should act as if validated_sequence is 0.
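    ///
    /// (The header is a small JSON document; conceptually something like
    /// `{"validated_sequence": 42}`, though the exact on-disk schema is whatever
    /// `DeletionHeader` serializes to.)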
    async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
        let header_path = self.conf.deletion_header_path();
        match tokio::fs::read(&header_path).await {
            Ok(header_bytes) => {
                match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
                    Ok(h) => Ok(Some(h.validated_sequence)),
                    Err(e) => {
                        warn!(
                            "Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
                        );
                        // This should never happen unless we make a mistake with our serialization.
                        // Ignoring a deletion header is not consequential for correctness because all deletions
                        // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        Ok(None)
                    }
                }
            }
            Err(e) => {
                if e.kind() == std::io::ErrorKind::NotFound {
                    debug!("Deletion header {header_path} not found, first start?");
                    Ok(None)
                } else {
                    on_fatal_io_error(&e, "reading deletion header");
                }
            }
        }
    }

    async fn recover(
        &mut self,
        attached_tenants: HashMap<TenantShardId, Generation>,
    ) -> Result<(), anyhow::Error> {
        debug!(
            "recovering with {} attached tenants",
            attached_tenants.len()
        );

        // Load the header
        let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);

        self.pending.sequence = validated_sequence + 1;

        let deletion_directory = self.conf.deletion_prefix();
        let mut dir = tokio::fs::read_dir(&deletion_directory)
            .await
            .fatal_err("read deletion directory");

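        // List files are named with a 16-character sequence number plus a
        // two-character version, e.g. something like `00000000000000a1-01.list`
        // (the example name is illustrative; the sequence part is parsed as
        // hexadecimal further below).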
        let list_name_pattern =
            Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();

        let temp_extension = format!(".{TEMP_SUFFIX}");
        let header_path = self.conf.deletion_header_path();
        let mut seqs: Vec<u64> = Vec::new();
        while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
            let file_name = dentry.file_name();
            let dentry_str = file_name.to_string_lossy();

            if file_name == header_path.file_name().unwrap_or("") {
                // Don't try and parse the header's name like a list
                continue;
            }

            if dentry_str.ends_with(&temp_extension) {
                info!("Cleaning up temporary file {dentry_str}");
                let absolute_path =
                    deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
                tokio::fs::remove_file(&absolute_path)
                    .await
                    .fatal_err("delete temp file");

                continue;
            }

            let file_name = dentry.file_name().to_owned();
            let basename = file_name.to_string_lossy();
            let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
                m.name("sequence")
                    .expect("Non optional group should be present")
                    .as_str()
            } else {
                warn!("Unexpected key in deletion queue: {basename}");
                metrics::DELETION_QUEUE.unexpected_errors.inc();
                continue;
            };

            let seq: u64 = match u64::from_str_radix(seq_part, 16) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Malformed key '{basename}': {e}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();
                    continue;
                }
            };
            seqs.push(seq);
        }
        seqs.sort();

        // Start our next deletion list from after the last sequence validated in a
        // previous process lifetime, or after the highest-numbered list found on
        // disk, whichever is greater (applied just below).
        self.pending.sequence = validated_sequence + 1;
        if let Some(max_list_seq) = seqs.last() {
            self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
        }

        for s in seqs {
            let list_path = self.conf.deletion_list_path(s);

            let list_bytes = tokio::fs::read(&list_path)
                .await
                .fatal_err("read deletion list");

            let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
                Ok(l) => l,
                Err(e) => {
                    // Drop the list on the floor: any objects it referenced will be left behind
                    // for scrubbing to clean up. This should never happen unless we have a serialization bug.
                    warn!(sequence = s, "Failed to deserialize deletion list: {e}");
                    metrics::DELETION_QUEUE.unexpected_errors.inc();
                    continue;
                }
            };

            if deletion_list.sequence <= validated_sequence {
                // If the deletion list's sequence is at or below validated_sequence, we may
                // assume that it was already validated the last time this pageserver ran.
                // Otherwise, we still load it, as it may still contain content valid in this generation.
                deletion_list.validated = true;
            } else {
                // Special case optimization: if a tenant is still attached, and no other
                // generation was issued to another node in the interval while we restarted,
                // then we may treat deletion lists from the previous generation as if they
                // belong to our currently attached generation, and proceed to validate & execute.
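                // (Concretely: if a list was recorded under generation N and the tenant is
                // now attached to us in generation N+1 with nothing attached in between,
                // it is re-stamped with generation N+1 below.)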
                for (tenant_shard_id, tenant_list) in &mut deletion_list.tenants {
                    if let Some(attached_gen) = attached_tenants.get(tenant_shard_id) {
                        if attached_gen.previous() == tenant_list.generation {
                            info!(
                                seq=%s, tenant_id=%tenant_shard_id.tenant_id,
                                shard_id=%tenant_shard_id.shard_slug(),
                                old_gen=?tenant_list.generation, new_gen=?attached_gen,
                                "Updating gen on recovered list");
                            tenant_list.generation = *attached_gen;
                        } else {
                            info!(
                                seq=%s, tenant_id=%tenant_shard_id.tenant_id,
                                shard_id=%tenant_shard_id.shard_slug(),
                                old_gen=?tenant_list.generation, new_gen=?attached_gen,
                                "Encountered stale generation on recovered list");
                        }
                    }
                }
            }

            info!(
                validated = deletion_list.validated,
                sequence = deletion_list.sequence,
                "Recovered deletion list"
            );

            // We will drop out of recovery if this fails: it indicates that we are shutting down
            // or the backend has panicked
            metrics::DELETION_QUEUE
                .keys_submitted
                .inc_by(deletion_list.len() as u64);
            self.tx
                .send(ValidatorQueueMessage::Delete(deletion_list))
                .await?;
        }

        info!(next_sequence = self.pending.sequence, "Replay complete");

        Ok(())
    }

    /// This is the front-end ingest, where we bundle up deletion requests into DeletionList
    /// and write them out, for later validation by the backend and execution by the executor.
    pub(super) async fn background(&mut self) {
        info!("Started deletion frontend worker");

        // Synchronous, but we only do it once per process lifetime so it's tolerable
        if let Err(e) = create_dir_all(self.conf.deletion_prefix()) {
            tracing::error!(
                "Failed to create deletion list directory {}, deletions will not be executed ({e})",
                self.conf.deletion_prefix(),
            );
            metrics::DELETION_QUEUE.unexpected_errors.inc();
            return;
        }

        while !self.cancel.is_cancelled() {
            let timeout = if self.pending_flushes.is_empty() {
                FRONTEND_DEFAULT_TIMEOUT
            } else {
                FRONTEND_FLUSHING_TIMEOUT
            };

            let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
                Ok(Some(msg)) => msg,
                Ok(None) => {
                    // Queue sender destroyed, shutting down
                    break;
                }
                Err(_) => {
                    // Hit deadline, flush.
                    self.flush().await;
                    continue;
                }
            };

            match msg {
                ListWriterQueueMessage::Delete(op) => {
                    assert!(
                        self.recovered,
                        "Cannot process deletions before recovery. This is a bug."
                    );

                    debug!(
                        "Delete: ingesting {} layers, {} other objects",
                        op.layers.len(),
                        op.objects.len()
                    );

                    let mut layer_paths = Vec::new();
                    for (layer, meta) in op.layers {
                        layer_paths.push(remote_layer_path(
                            &op.tenant_shard_id.tenant_id,
                            &op.timeline_id,
                            meta.shard,
                            &layer,
                            meta.generation,
                        ));
                    }
                    layer_paths.extend(op.objects);

                    if !self.pending.push(
                        &op.tenant_shard_id,
                        &op.timeline_id,
                        op.generation,
                        &mut layer_paths,
                    ) {
                        self.flush().await;
                        let retry_succeeded = self.pending.push(
                            &op.tenant_shard_id,
                            &op.timeline_id,
                            op.generation,
                            &mut layer_paths,
                        );
                        if !retry_succeeded {
                            // Unexpected: after we flush, we should have
                            // drained self.pending, so a conflict on
                            // generation numbers should be impossible.
                            tracing::error!(
                                "Failed to enqueue deletions, leaking objects. This is a bug."
                            );
                            metrics::DELETION_QUEUE.unexpected_errors.inc();
                        }
                    }
                }
                ListWriterQueueMessage::Flush(op) => {
                    if self.pending.is_empty() {
                        // Execute immediately
                        debug!("Flush: No pending objects, flushing immediately");
                        op.notify()
                    } else {
                        // Execute next time we flush
                        debug!("Flush: adding to pending flush list for next deadline flush");
                        self.pending_flushes.push(op);
                    }
                }
                ListWriterQueueMessage::FlushExecute(op) => {
                    debug!("FlushExecute: passing through to backend");
                    // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
                    if let Err(e) = self.tx.send(ValidatorQueueMessage::Flush(op)).await {
                        info!("Can't flush, shutting down ({e})");
                        // The caller will get an error when their oneshot sender is dropped.
                    }
                }
                ListWriterQueueMessage::Recover(op) => {
                    if self.recovered {
                        tracing::error!(
                            "Deletion queue recovery called more than once. This is a bug."
                        );
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        // Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
                        continue;
                    }

                    if let Err(e) = self.recover(op.attached_tenants).await {
                        // This should only happen in truly unrecoverable cases, like the recovery finding that the backend
                        // queue receiver has been dropped, or something is critically broken with
                        // the local filesystem holding deletion lists.
                        info!(
                            "Deletion queue recover aborted, deletion queue will not proceed ({e})"
                        );
                        metrics::DELETION_QUEUE.unexpected_errors.inc();
                        return;
                    } else {
                        self.recovered = true;
                    }
                }
            }

            if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
                self.flush().await;
            }
        }
        info!("Deletion queue shut down.");
    }
}