LCOV - code coverage report
Current view: top level - pageserver/src/deletion_queue - list_writer.rs (source / functions) Coverage Total Hit
Test: 6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info Lines: 69.8 % 255 178
Test Date: 2025-03-12 18:28:53 Functions: 100.0 % 9 9

            Line data    Source code
       1              : //! The list writer is the first stage in the deletion queue.  It accumulates
       2              : //! layers to delete, and periodically writes out these layers into a persistent
       3              : //! DeletionList.
       4              : //!
       5              : //! The purpose of writing DeletionLists is to decouple the decision to
       6              : //! delete an object from the validation required to execute it: even if
       7              : //! validation is not possible, e.g. due to a control plane outage, we can
       8              : //! still persist our intent to delete an object, in a way that would
       9              : //! survive a restart.
      10              : //!
      11              : //! DeletionLists are passed onwards to the Validator.
      12              : 
      13              : use std::collections::HashMap;
      14              : use std::fs::create_dir_all;
      15              : use std::time::Duration;
      16              : 
      17              : use pageserver_api::shard::TenantShardId;
      18              : use regex::Regex;
      19              : use remote_storage::RemotePath;
      20              : use tokio_util::sync::CancellationToken;
      21              : use tracing::{debug, info, warn};
      22              : use utils::generation::Generation;
      23              : use utils::id::TimelineId;
      24              : 
      25              : use super::{DeletionHeader, DeletionList, FlushOp, ValidatorQueueMessage};
      26              : use crate::config::PageServerConf;
      27              : use crate::deletion_queue::TEMP_SUFFIX;
      28              : use crate::metrics;
      29              : use crate::tenant::remote_timeline_client::{LayerFileMetadata, remote_layer_path};
      30              : use crate::tenant::storage_layer::LayerName;
      31              : use crate::virtual_file::{MaybeFatalIo, on_fatal_io_error};
      32              : 
      33              : // The number of keys in a DeletionList before we will proactively persist it
      34              : // (without reaching a flush deadline).  This aims to deliver objects of the order
      35              : // of magnitude 1MB when we are under heavy delete load.
      36              : const DELETION_LIST_TARGET_SIZE: usize = 16384;
      37              : 
      38              : // Ordinarily, we only flush to DeletionList periodically, to bound the window during
      39              : // which we might leak objects from not flushing a DeletionList after
      40              : // the objects are already unlinked from timeline metadata.
      41              : const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
      42              : 
      43              : // If someone is waiting for a flush to DeletionList, only delay a little to accumulate
      44              : // more objects before doing the flush.
      45              : const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
      46              : 
      47              : #[derive(Debug)]
      48              : pub(super) struct DeletionOp {
      49              :     pub(super) tenant_shard_id: TenantShardId,
      50              :     pub(super) timeline_id: TimelineId,
      51              :     // `layers` and `objects` are both just lists of objects.  `layers` is used if you do not
      52              :     // have a config object handy to project it to a remote key, and need the consuming worker
      53              :     // to do it for you.
      54              :     pub(super) layers: Vec<(LayerName, LayerFileMetadata)>,
      55              :     pub(super) objects: Vec<RemotePath>,
      56              : 
      57              :     /// The _current_ generation of the Tenant shard attachment in which we are enqueuing
      58              :     /// this deletion.
      59              :     pub(super) generation: Generation,
      60              : }
      61              : 
      62              : #[derive(Debug)]
      63              : pub(super) struct RecoverOp {
      64              :     pub(super) attached_tenants: HashMap<TenantShardId, Generation>,
      65              : }
      66              : 
      67              : #[derive(Debug)]
      68              : pub(super) enum ListWriterQueueMessage {
      69              :     Delete(DeletionOp),
      70              :     // Wait until all prior deletions make it into a persistent DeletionList
      71              :     Flush(FlushOp),
      72              :     // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
      73              :     FlushExecute(FlushOp),
      74              :     // Call once after re-attaching to control plane, to notify the deletion queue about
      75              :     // latest attached generations & load any saved deletion lists from disk.
      76              :     Recover(RecoverOp),
      77              : }
      78              : 
      79              : pub(super) struct ListWriter {
      80              :     conf: &'static PageServerConf,
      81              : 
      82              :     // Incoming frontend requests to delete some keys
      83              :     rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
      84              : 
      85              :     // Outbound requests to the backend to execute deletion lists we have composed.
      86              :     tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
      87              : 
      88              :     // The list we are currently building, contains a buffer of keys to delete
      89              :     // and our next sequence number
      90              :     pending: DeletionList,
      91              : 
      92              :     // These FlushOps should notify the next time we flush
      93              :     pending_flushes: Vec<FlushOp>,
      94              : 
      95              :     // Worker loop is torn down when this fires.
      96              :     cancel: CancellationToken,
      97              : 
      98              :     // Safety guard to do recovery exactly once
      99              :     recovered: bool,
     100              : }
     101              : 
     102              : impl ListWriter {
     103              :     // Initially DeletionHeader.validated_sequence is zero.  The place we start our
     104              :     // sequence numbers must be higher than that.
     105              :     const BASE_SEQUENCE: u64 = 1;
     106              : 
     107           16 :     pub(super) fn new(
     108           16 :         conf: &'static PageServerConf,
     109           16 :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
     110           16 :         tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
     111           16 :         cancel: CancellationToken,
     112           16 :     ) -> Self {
     113           16 :         Self {
     114           16 :             pending: DeletionList::new(Self::BASE_SEQUENCE),
     115           16 :             conf,
     116           16 :             rx,
     117           16 :             tx,
     118           16 :             pending_flushes: Vec::new(),
     119           16 :             cancel,
     120           16 :             recovered: false,
     121           16 :         }
     122           16 :     }
     123              : 
     124              :     /// Try to flush `list` to persistent storage
     125              :     ///
     126              :     /// This does not return errors, because on failure to flush we do not lose
     127              :     /// any state: flushing will be retried implicitly on the next deadline
     128           25 :     async fn flush(&mut self) {
     129           25 :         if self.pending.is_empty() {
     130            5 :             for f in self.pending_flushes.drain(..) {
     131            0 :                 f.notify();
     132            0 :             }
     133            5 :             return;
     134           20 :         }
     135           20 : 
     136           20 :         match self.pending.save(self.conf).await {
     137              :             Ok(_) => {
     138           20 :                 info!(sequence = self.pending.sequence, "Stored deletion list");
     139              : 
     140           20 :                 for f in self.pending_flushes.drain(..) {
     141           16 :                     f.notify();
     142           16 :                 }
     143              : 
     144              :                 // Take the list we've accumulated, replace it with a fresh list for the next sequence
     145           20 :                 let next_list = DeletionList::new(self.pending.sequence + 1);
     146           20 :                 let list = std::mem::replace(&mut self.pending, next_list);
     147              : 
     148           20 :                 if let Err(e) = self.tx.send(ValidatorQueueMessage::Delete(list)).await {
     149              :                     // This is allowed to fail: it will only happen if the backend worker is shut down,
     150              :                     // so we can just drop this on the floor.
     151            0 :                     info!("Deletion list dropped, this is normal during shutdown ({e:#})");
     152           20 :                 }
     153              :             }
     154            0 :             Err(e) => {
     155            0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     156            0 :                 warn!(
     157              :                     sequence = self.pending.sequence,
     158            0 :                     "Failed to write deletion list, will retry later ({e:#})"
     159              :                 );
     160              :             }
     161              :         }
     162           25 :     }
     163              : 
     164              :     /// Load the header, to learn the sequence number up to which deletions
     165              :     /// have been validated.  We will apply validated=true to DeletionLists
     166              :     /// <= this sequence when loading them.
     167              :     ///
     168              :     /// It is not an error for the header to not exist: we return None, and
     169              :     /// the caller should act as if validated_sequence is 0
     170           16 :     async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
     171           16 :         let header_path = self.conf.deletion_header_path();
     172           16 :         match tokio::fs::read(&header_path).await {
     173            0 :             Ok(header_bytes) => {
     174            0 :                 match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
     175            0 :                     Ok(h) => Ok(Some(h.validated_sequence)),
     176            0 :                     Err(e) => {
     177            0 :                         warn!(
     178            0 :                             "Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
     179              :                         );
     180              :                         // This should never happen unless we make a mistake with our serialization.
     181              :                         // Ignoring a deletion header is not consequential for correctnes because all deletions
     182              :                         // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
     183            0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     184            0 :                         Ok(None)
     185              :                     }
     186              :                 }
     187              :             }
     188           16 :             Err(e) => {
     189           16 :                 if e.kind() == std::io::ErrorKind::NotFound {
     190           16 :                     debug!("Deletion header {header_path} not found, first start?");
     191           16 :                     Ok(None)
     192              :                 } else {
     193            0 :                     on_fatal_io_error(&e, "reading deletion header");
     194              :                 }
     195              :             }
     196              :         }
     197           16 :     }
     198              : 
     199           16 :     async fn recover(
     200           16 :         &mut self,
     201           16 :         attached_tenants: HashMap<TenantShardId, Generation>,
     202           16 :     ) -> Result<(), anyhow::Error> {
     203           16 :         debug!(
     204            0 :             "recovering with {} attached tenants",
     205            0 :             attached_tenants.len()
     206              :         );
     207              : 
     208              :         // Load the header
     209           16 :         let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);
     210           16 : 
     211           16 :         self.pending.sequence = validated_sequence + 1;
     212           16 : 
     213           16 :         let deletion_directory = self.conf.deletion_prefix();
     214           16 :         let mut dir = tokio::fs::read_dir(&deletion_directory)
     215           16 :             .await
     216           16 :             .fatal_err("read deletion directory");
     217           16 : 
     218           16 :         let list_name_pattern =
     219           16 :             Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
     220           16 : 
     221           16 :         let temp_extension = format!(".{TEMP_SUFFIX}");
     222           16 :         let header_path = self.conf.deletion_header_path();
     223           16 :         let mut seqs: Vec<u64> = Vec::new();
     224           24 :         while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
     225            8 :             let file_name = dentry.file_name();
     226            8 :             let dentry_str = file_name.to_string_lossy();
     227            8 : 
     228            8 :             if file_name == header_path.file_name().unwrap_or("") {
     229              :                 // Don't try and parse the header's name like a list
     230            0 :                 continue;
     231            8 :             }
     232            8 : 
     233            8 :             if dentry_str.ends_with(&temp_extension) {
     234            0 :                 info!("Cleaning up temporary file {dentry_str}");
     235            0 :                 let absolute_path =
     236            0 :                     deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
     237            0 :                 tokio::fs::remove_file(&absolute_path)
     238            0 :                     .await
     239            0 :                     .fatal_err("delete temp file");
     240            0 : 
     241            0 :                 continue;
     242            8 :             }
     243            8 : 
     244            8 :             let file_name = dentry.file_name().to_owned();
     245            8 :             let basename = file_name.to_string_lossy();
     246            8 :             let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
     247            8 :                 m.name("sequence")
     248            8 :                     .expect("Non optional group should be present")
     249            8 :                     .as_str()
     250              :             } else {
     251            0 :                 warn!("Unexpected key in deletion queue: {basename}");
     252            0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     253            0 :                 continue;
     254              :             };
     255              : 
     256            8 :             let seq: u64 = match u64::from_str_radix(seq_part, 16) {
     257            8 :                 Ok(s) => s,
     258            0 :                 Err(e) => {
     259            0 :                     warn!("Malformed key '{basename}': {e}");
     260            0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     261            0 :                     continue;
     262              :                 }
     263              :             };
     264            8 :             seqs.push(seq);
     265              :         }
     266           16 :         seqs.sort();
     267           16 : 
     268           16 :         // Start our next deletion list from after the last location validated by
     269           16 :         // previous process lifetime, or after the last location found (it is updated
     270           16 :         // below after enumerating the deletion lists)
     271           16 :         self.pending.sequence = validated_sequence + 1;
     272           16 :         if let Some(max_list_seq) = seqs.last() {
     273            4 :             self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
     274           12 :         }
     275              : 
     276           24 :         for s in seqs {
     277            8 :             let list_path = self.conf.deletion_list_path(s);
     278              : 
     279            8 :             let list_bytes = tokio::fs::read(&list_path)
     280            8 :                 .await
     281            8 :                 .fatal_err("read deletion list");
     282              : 
     283            8 :             let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
     284            8 :                 Ok(l) => l,
     285            0 :                 Err(e) => {
     286            0 :                     // Drop the list on the floor: any objects it referenced will be left behind
     287            0 :                     // for scrubbing to clean up.  This should never happen unless we have a serialization bug.
     288            0 :                     warn!(sequence = s, "Failed to deserialize deletion list: {e}");
     289            0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     290            0 :                     continue;
     291              :                 }
     292              :             };
     293              : 
     294            8 :             if deletion_list.sequence <= validated_sequence {
     295            0 :                 // If the deletion list falls below valid_seq, we may assume that it was
     296            0 :                 // already validated the last time this pageserver ran.  Otherwise, we still
     297            0 :                 // load it, as it may still contain content valid in this generation.
     298            0 :                 deletion_list.validated = true;
     299            0 :             } else {
     300              :                 // Special case optimization: if a tenant is still attached, and no other
     301              :                 // generation was issued to another node in the interval while we restarted,
     302              :                 // then we may treat deletion lists from the previous generation as if they
     303              :                 // belong to our currently attached generation, and proceed to validate & execute.
     304           16 :                 for (tenant_shard_id, tenant_list) in &mut deletion_list.tenants {
     305            8 :                     if let Some(attached_gen) = attached_tenants.get(tenant_shard_id) {
     306            8 :                         if attached_gen.previous() == tenant_list.generation {
     307            4 :                             info!(
     308              :                                 seq=%s, tenant_id=%tenant_shard_id.tenant_id,
     309            0 :                                 shard_id=%tenant_shard_id.shard_slug(),
     310            0 :                                 old_gen=?tenant_list.generation, new_gen=?attached_gen,
     311            0 :                                 "Updating gen on recovered list");
     312            4 :                             tenant_list.generation = *attached_gen;
     313              :                         } else {
     314            4 :                             info!(
     315              :                                 seq=%s, tenant_id=%tenant_shard_id.tenant_id,
     316            0 :                                 shard_id=%tenant_shard_id.shard_slug(),
     317            0 :                                 old_gen=?tenant_list.generation, new_gen=?attached_gen,
     318            0 :                                 "Encountered stale generation on recovered list");
     319              :                         }
     320            0 :                     }
     321              :                 }
     322              :             }
     323              : 
     324            8 :             info!(
     325              :                 validated = deletion_list.validated,
     326              :                 sequence = deletion_list.sequence,
     327            0 :                 "Recovered deletion list"
     328              :             );
     329              : 
     330              :             // We will drop out of recovery if this fails: it indicates that we are shutting down
     331              :             // or the backend has panicked
     332            8 :             metrics::DELETION_QUEUE
     333            8 :                 .keys_submitted
     334            8 :                 .inc_by(deletion_list.len() as u64);
     335            8 :             self.tx
     336            8 :                 .send(ValidatorQueueMessage::Delete(deletion_list))
     337            8 :                 .await?;
     338              :         }
     339              : 
     340           16 :         info!(next_sequence = self.pending.sequence, "Replay complete");
     341              : 
     342           16 :         Ok(())
     343           16 :     }
     344              : 
     345              :     /// This is the front-end ingest, where we bundle up deletion requests into DeletionList
     346              :     /// and write them out, for later validation by the backend and execution by the executor.
     347           16 :     pub(super) async fn background(&mut self) {
     348           16 :         info!("Started deletion frontend worker");
     349              : 
     350              :         // Synchronous, but we only do it once per process lifetime so it's tolerable
     351           16 :         if let Err(e) = create_dir_all(self.conf.deletion_prefix()) {
     352            0 :             tracing::error!(
     353            0 :                 "Failed to create deletion list directory {}, deletions will not be executed ({e})",
     354            0 :                 self.conf.deletion_prefix(),
     355              :             );
     356            0 :             metrics::DELETION_QUEUE.unexpected_errors.inc();
     357            0 :             return;
     358           16 :         }
     359              : 
     360          105 :         while !self.cancel.is_cancelled() {
     361          101 :             let timeout = if self.pending_flushes.is_empty() {
     362          101 :                 FRONTEND_DEFAULT_TIMEOUT
     363              :             } else {
     364            0 :                 FRONTEND_FLUSHING_TIMEOUT
     365              :             };
     366              : 
     367          101 :             let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
     368           84 :                 Ok(Some(msg)) => msg,
     369              :                 Ok(None) => {
     370              :                     // Queue sender destroyed, shutting down
     371            0 :                     break;
     372              :                 }
     373              :                 Err(_) => {
     374              :                     // Hit deadline, flush.
     375            5 :                     self.flush().await;
     376            5 :                     continue;
     377              :                 }
     378              :             };
     379              : 
     380           84 :             match msg {
     381           20 :                 ListWriterQueueMessage::Delete(op) => {
     382           20 :                     assert!(
     383           20 :                         self.recovered,
     384            0 :                         "Cannot process deletions before recovery.  This is a bug."
     385              :                     );
     386              : 
     387           20 :                     debug!(
     388            0 :                         "Delete: ingesting {} layers, {} other objects",
     389            0 :                         op.layers.len(),
     390            0 :                         op.objects.len()
     391              :                     );
     392              : 
     393           20 :                     let mut layer_paths = Vec::new();
     394           40 :                     for (layer, meta) in op.layers {
     395           20 :                         layer_paths.push(remote_layer_path(
     396           20 :                             &op.tenant_shard_id.tenant_id,
     397           20 :                             &op.timeline_id,
     398           20 :                             meta.shard,
     399           20 :                             &layer,
     400           20 :                             meta.generation,
     401           20 :                         ));
     402           20 :                     }
     403           20 :                     layer_paths.extend(op.objects);
     404           20 : 
     405           20 :                     if !self.pending.push(
     406           20 :                         &op.tenant_shard_id,
     407           20 :                         &op.timeline_id,
     408           20 :                         op.generation,
     409           20 :                         &mut layer_paths,
     410           20 :                     ) {
     411            4 :                         self.flush().await;
     412            4 :                         let retry_succeeded = self.pending.push(
     413            4 :                             &op.tenant_shard_id,
     414            4 :                             &op.timeline_id,
     415            4 :                             op.generation,
     416            4 :                             &mut layer_paths,
     417            4 :                         );
     418            4 :                         if !retry_succeeded {
     419              :                             // Unexpected: after we flush, we should have
     420              :                             // drained self.pending, so a conflict on
     421              :                             // generation numbers should be impossible.
     422            0 :                             tracing::error!(
     423            0 :                                 "Failed to enqueue deletions, leaking objects.  This is a bug."
     424              :                             );
     425            0 :                             metrics::DELETION_QUEUE.unexpected_errors.inc();
     426            4 :                         }
     427           16 :                     }
     428              :                 }
     429           28 :                 ListWriterQueueMessage::Flush(op) => {
     430           28 :                     if self.pending.is_empty() {
     431              :                         // Execute immediately
     432           12 :                         debug!("Flush: No pending objects, flushing immediately");
     433           12 :                         op.notify()
     434              :                     } else {
     435              :                         // Execute next time we flush
     436           16 :                         debug!("Flush: adding to pending flush list for next deadline flush");
     437           16 :                         self.pending_flushes.push(op);
     438              :                     }
     439              :                 }
     440           20 :                 ListWriterQueueMessage::FlushExecute(op) => {
     441           20 :                     debug!("FlushExecute: passing through to backend");
     442              :                     // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
     443           20 :                     if let Err(e) = self.tx.send(ValidatorQueueMessage::Flush(op)).await {
     444            0 :                         info!("Can't flush, shutting down ({e})");
     445              :                         // Caller will get error when their oneshot sender was dropped.
     446           20 :                     }
     447              :                 }
     448           16 :                 ListWriterQueueMessage::Recover(op) => {
     449           16 :                     if self.recovered {
     450            0 :                         tracing::error!(
     451            0 :                             "Deletion queue recovery called more than once.  This is a bug."
     452              :                         );
     453            0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     454            0 :                         // Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
     455            0 :                         continue;
     456           16 :                     }
     457              : 
     458           16 :                     if let Err(e) = self.recover(op.attached_tenants).await {
     459              :                         // This should only happen in truly unrecoverable cases, like the recovery finding that the backend
     460              :                         // queue receiver has been dropped, or something is critically broken with
     461              :                         // the local filesystem holding deletion lists.
     462            0 :                         info!(
     463            0 :                             "Deletion queue recover aborted, deletion queue will not proceed ({e})"
     464              :                         );
     465            0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     466            0 :                         return;
     467           16 :                     } else {
     468           16 :                         self.recovered = true;
     469           16 :                     }
     470              :                 }
     471              :             }
     472              : 
     473           84 :             if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
     474           16 :                 self.flush().await;
     475           68 :             }
     476              :         }
     477            4 :         info!("Deletion queue shut down.");
     478            4 :     }
     479              : }
        

Generated by: LCOV version 2.1-beta