LCOV - code coverage report
Current view: top level - pageserver/src/deletion_queue - list_writer.rs (source / functions) Coverage Total Hit
Test: 86c536b7fe84b2afe03c3bb264199e9c319ae0f8.info Lines: 69.4 % 255 177
Test Date: 2024-06-24 16:38:41 Functions: 100.0 % 9 9

            Line data    Source code
       1              : //! The list writer is the first stage in the deletion queue.  It accumulates
       2              : //! layers to delete, and periodically writes out these layers into a persistent
       3              : //! DeletionList.
       4              : //!
       5              : //! The purpose of writing DeletionLists is to decouple the decision to
       6              : //! delete an object from the validation required to execute it: even if
       7              : //! validation is not possible, e.g. due to a control plane outage, we can
       8              : //! still persist our intent to delete an object, in a way that would
       9              : //! survive a restart.
      10              : //!
      11              : //! DeletionLists are passed onwards to the Validator.
      12              : 
      13              : use super::DeletionHeader;
      14              : use super::DeletionList;
      15              : use super::FlushOp;
      16              : use super::ValidatorQueueMessage;
      17              : 
      18              : use std::collections::HashMap;
      19              : use std::fs::create_dir_all;
      20              : use std::time::Duration;
      21              : 
      22              : use pageserver_api::shard::TenantShardId;
      23              : use regex::Regex;
      24              : use remote_storage::RemotePath;
      25              : use tokio_util::sync::CancellationToken;
      26              : use tracing::debug;
      27              : use tracing::info;
      28              : use tracing::warn;
      29              : use utils::generation::Generation;
      30              : use utils::id::TimelineId;
      31              : 
      32              : use crate::config::PageServerConf;
      33              : use crate::deletion_queue::TEMP_SUFFIX;
      34              : use crate::metrics;
      35              : use crate::tenant::remote_timeline_client::remote_layer_path;
      36              : use crate::tenant::remote_timeline_client::LayerFileMetadata;
      37              : use crate::tenant::storage_layer::LayerName;
      38              : use crate::virtual_file::on_fatal_io_error;
      39              : use crate::virtual_file::MaybeFatalIo;
      40              : 
      41              : // The number of keys in a DeletionList before we will proactively persist it
      42              : // (without reaching a flush deadline).  This aims to deliver objects of the order
      43              : // of magnitude 1MB when we are under heavy delete load.
      44              : const DELETION_LIST_TARGET_SIZE: usize = 16384;
      45              : 
      46              : // Ordinarily, we only flush to DeletionList periodically, to bound the window during
      47              : // which we might leak objects from not flushing a DeletionList after
      48              : // the objects are already unlinked from timeline metadata.
      49              : const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
      50              : 
      51              : // If someone is waiting for a flush to DeletionList, only delay a little to accumulate
      52              : // more objects before doing the flush.
      53              : const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
      54              : 
      55              : #[derive(Debug)]
      56              : pub(super) struct DeletionOp {
      57              :     pub(super) tenant_shard_id: TenantShardId,
      58              :     pub(super) timeline_id: TimelineId,
      59              :     // `layers` and `objects` are both just lists of objects.  `layers` is used if you do not
      60              :     // have a config object handy to project it to a remote key, and need the consuming worker
      61              :     // to do it for you.
      62              :     pub(super) layers: Vec<(LayerName, LayerFileMetadata)>,
      63              :     pub(super) objects: Vec<RemotePath>,
      64              : 
      65              :     /// The _current_ generation of the Tenant shard attachment in which we are enqueuing
      66              :     /// this deletion.
      67              :     pub(super) generation: Generation,
      68              : }
      69              : 
      70              : #[derive(Debug)]
      71              : pub(super) struct RecoverOp {
      72              :     pub(super) attached_tenants: HashMap<TenantShardId, Generation>,
      73              : }
      74              : 
      75              : #[derive(Debug)]
      76              : pub(super) enum ListWriterQueueMessage {
      77              :     Delete(DeletionOp),
      78              :     // Wait until all prior deletions make it into a persistent DeletionList
      79              :     Flush(FlushOp),
      80              :     // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
      81              :     FlushExecute(FlushOp),
      82              :     // Call once after re-attaching to control plane, to notify the deletion queue about
      83              :     // latest attached generations & load any saved deletion lists from disk.
      84              :     Recover(RecoverOp),
      85              : }
      86              : 
      87              : pub(super) struct ListWriter {
      88              :     conf: &'static PageServerConf,
      89              : 
      90              :     // Incoming frontend requests to delete some keys
      91              :     rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
      92              : 
      93              :     // Outbound requests to the backend to execute deletion lists we have composed.
      94              :     tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
      95              : 
      96              :     // The list we are currently building, contains a buffer of keys to delete
      97              :     // and our next sequence number
      98              :     pending: DeletionList,
      99              : 
     100              :     // These FlushOps should notify the next time we flush
     101              :     pending_flushes: Vec<FlushOp>,
     102              : 
     103              :     // Worker loop is torn down when this fires.
     104              :     cancel: CancellationToken,
     105              : 
     106              :     // Safety guard to do recovery exactly once
     107              :     recovered: bool,
     108              : }
     109              : 
     110              : impl ListWriter {
     111              :     // Initially DeletionHeader.validated_sequence is zero.  The place we start our
     112              :     // sequence numbers must be higher than that.
     113              :     const BASE_SEQUENCE: u64 = 1;
     114              : 
     115            8 :     pub(super) fn new(
     116            8 :         conf: &'static PageServerConf,
     117            8 :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
     118            8 :         tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
     119            8 :         cancel: CancellationToken,
     120            8 :     ) -> Self {
     121            8 :         Self {
     122            8 :             pending: DeletionList::new(Self::BASE_SEQUENCE),
     123            8 :             conf,
     124            8 :             rx,
     125            8 :             tx,
     126            8 :             pending_flushes: Vec::new(),
     127            8 :             cancel,
     128            8 :             recovered: false,
     129            8 :         }
     130            8 :     }
     131              : 
     132              :     /// Try to flush the pending deletion list (`self.pending`) to persistent storage.
     133              :     ///
     134              :     /// This does not return errors, because on failure to flush we do not lose
     135              :     /// any state: flushing will be retried implicitly on the next deadline.
     136           13 :     async fn flush(&mut self) {
     137           13 :         if self.pending.is_empty() {
     138            3 :             for f in self.pending_flushes.drain(..) {
     139            0 :                 f.notify();
     140            0 :             }
     141            3 :             return;
     142           10 :         }
     143           10 : 
     144           10 :         match self.pending.save(self.conf).await {
     145              :             Ok(_) => {
     146           10 :                 info!(sequence = self.pending.sequence, "Stored deletion list");
     147              : 
     148           10 :                 for f in self.pending_flushes.drain(..) {
     149            8 :                     f.notify();
     150            8 :                 }
     151              : 
     152              :                 // Take the list we've accumulated, replace it with a fresh list for the next sequence
     153           10 :                 let next_list = DeletionList::new(self.pending.sequence + 1);
     154           10 :                 let list = std::mem::replace(&mut self.pending, next_list);
     155              : 
     156           10 :                 if let Err(e) = self.tx.send(ValidatorQueueMessage::Delete(list)).await {
     157              :                     // This is allowed to fail: it will only happen if the backend worker is shut down,
     158              :                     // so we can just drop this on the floor.
     159            0 :                     info!("Deletion list dropped, this is normal during shutdown ({e:#})");
     160           10 :                 }
     161              :             }
     162            0 :             Err(e) => {
     163            0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     164            0 :                 warn!(
     165              :                     sequence = self.pending.sequence,
     166            0 :                     "Failed to write deletion list, will retry later ({e:#})"
     167              :                 );
     168              :             }
     169              :         }
     170           13 :     }
     171              : 
     172              :     /// Load the header, to learn the sequence number up to which deletions
     173              :     /// have been validated.  We will apply validated=true to DeletionLists
     174              :     /// <= this sequence when loading them.
     175              :     ///
     176              :     /// It is not an error for the header to not exist: we return None, and
     177              :     /// the caller should act as if validated_sequence is 0
     178            8 :     async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
     179            8 :         let header_path = self.conf.deletion_header_path();
     180            8 :         match tokio::fs::read(&header_path).await {
     181            0 :             Ok(header_bytes) => {
     182            0 :                 match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
     183            0 :                     Ok(h) => Ok(Some(h.validated_sequence)),
     184            0 :                     Err(e) => {
     185            0 :                         warn!(
     186            0 :                             "Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
     187              :                         );
     188              :                         // This should never happen unless we make a mistake with our serialization.
     189              :                         // Ignoring a deletion header is not consequential for correctness because all deletions
     190              :                         // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
     191            0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     192            0 :                         Ok(None)
     193              :                     }
     194              :                 }
     195              :             }
     196            8 :             Err(e) => {
     197            8 :                 if e.kind() == std::io::ErrorKind::NotFound {
     198            8 :                     debug!("Deletion header {header_path} not found, first start?");
     199            8 :                     Ok(None)
     200              :                 } else {
     201            0 :                     on_fatal_io_error(&e, "reading deletion header");
     202              :                 }
     203              :             }
     204              :         }
     205            8 :     }
     206              : 
     207            8 :     async fn recover(
     208            8 :         &mut self,
     209            8 :         attached_tenants: HashMap<TenantShardId, Generation>,
     210            8 :     ) -> Result<(), anyhow::Error> {
     211            8 :         debug!(
     212            0 :             "recovering with {} attached tenants",
     213            0 :             attached_tenants.len()
     214              :         );
     215              : 
     216              :         // Load the header
     217            8 :         let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);
     218            8 : 
     219            8 :         self.pending.sequence = validated_sequence + 1;
     220            8 : 
     221            8 :         let deletion_directory = self.conf.deletion_prefix();
     222            8 :         let mut dir = tokio::fs::read_dir(&deletion_directory)
     223            8 :             .await
     224            8 :             .fatal_err("read deletion directory");
     225            8 : 
     226            8 :         let list_name_pattern =
     227            8 :             Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
     228            8 : 
     229            8 :         let temp_extension = format!(".{TEMP_SUFFIX}");
     230            8 :         let header_path = self.conf.deletion_header_path();
     231            8 :         let mut seqs: Vec<u64> = Vec::new();
     232           12 :         while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
     233            4 :             let file_name = dentry.file_name();
     234            4 :             let dentry_str = file_name.to_string_lossy();
     235            4 : 
     236            4 :             if file_name == header_path.file_name().unwrap_or("") {
     237              :                 // Don't try to parse the header's name like a list
     238            0 :                 continue;
     239            4 :             }
     240            4 : 
     241            4 :             if dentry_str.ends_with(&temp_extension) {
     242            0 :                 info!("Cleaning up temporary file {dentry_str}");
     243            0 :                 let absolute_path =
     244            0 :                     deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
     245            0 :                 tokio::fs::remove_file(&absolute_path)
     246            0 :                     .await
     247            0 :                     .fatal_err("delete temp file");
     248            0 : 
     249            0 :                 continue;
     250            4 :             }
     251            4 : 
     252            4 :             let file_name = dentry.file_name().to_owned();
     253            4 :             let basename = file_name.to_string_lossy();
     254            4 :             let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
     255            4 :                 m.name("sequence")
     256            4 :                     .expect("Non optional group should be present")
     257            4 :                     .as_str()
     258              :             } else {
     259            0 :                 warn!("Unexpected key in deletion queue: {basename}");
     260            0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     261            0 :                 continue;
     262              :             };
     263              : 
     264            4 :             let seq: u64 = match u64::from_str_radix(seq_part, 16) {
     265            4 :                 Ok(s) => s,
     266            0 :                 Err(e) => {
     267            0 :                     warn!("Malformed key '{basename}': {e}");
     268            0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     269            0 :                     continue;
     270              :                 }
     271              :             };
     272            4 :             seqs.push(seq);
     273              :         }
     274            8 :         seqs.sort();
     275            8 : 
     276            8 :         // Start our next deletion list after the last sequence validated by the
     277            8 :         // previous process lifetime, or after the highest sequence found among the
     278            8 :         // deletion lists enumerated above, whichever is greater.
     279            8 :         self.pending.sequence = validated_sequence + 1;
     280            8 :         if let Some(max_list_seq) = seqs.last() {
     281            2 :             self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
     282            6 :         }
     283              : 
     284           12 :         for s in seqs {
     285            4 :             let list_path = self.conf.deletion_list_path(s);
     286              : 
     287            4 :             let list_bytes = tokio::fs::read(&list_path)
     288            4 :                 .await
     289            4 :                 .fatal_err("read deletion list");
     290              : 
     291            4 :             let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
     292            4 :                 Ok(l) => l,
     293            0 :                 Err(e) => {
     294            0 :                     // Drop the list on the floor: any objects it referenced will be left behind
     295            0 :                     // for scrubbing to clean up.  This should never happen unless we have a serialization bug.
     296            0 :                     warn!(sequence = s, "Failed to deserialize deletion list: {e}");
     297            0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     298            0 :                     continue;
     299              :                 }
     300              :             };
     301              : 
     302            4 :             if deletion_list.sequence <= validated_sequence {
     303            0 :                 // If the deletion list's sequence is at or below validated_sequence, we may assume
     304            0 :                 // that it was already validated the last time this pageserver ran.  Otherwise, we still
     305            0 :                 // load it, as it may still contain content valid in this generation.
     306            0 :                 deletion_list.validated = true;
     307            0 :             } else {
     308              :                 // Special case optimization: if a tenant is still attached, and no other
     309              :                 // generation was issued to another node in the interval while we restarted,
     310              :                 // then we may treat deletion lists from the previous generation as if they
     311              :                 // belong to our currently attached generation, and proceed to validate & execute.
     312            8 :                 for (tenant_shard_id, tenant_list) in &mut deletion_list.tenants {
     313            4 :                     if let Some(attached_gen) = attached_tenants.get(tenant_shard_id) {
     314            4 :                         if attached_gen.previous() == tenant_list.generation {
     315            2 :                             info!(
     316              :                                 seq=%s, tenant_id=%tenant_shard_id.tenant_id,
     317            0 :                                 shard_id=%tenant_shard_id.shard_slug(),
     318            0 :                                 old_gen=?tenant_list.generation, new_gen=?attached_gen,
     319            0 :                                 "Updating gen on recovered list");
     320            2 :                             tenant_list.generation = *attached_gen;
     321              :                         } else {
     322            2 :                             info!(
     323              :                                 seq=%s, tenant_id=%tenant_shard_id.tenant_id,
     324            0 :                                 shard_id=%tenant_shard_id.shard_slug(),
     325            0 :                                 old_gen=?tenant_list.generation, new_gen=?attached_gen,
     326            0 :                                 "Encountered stale generation on recovered list");
     327              :                         }
     328            0 :                     }
     329              :                 }
     330              :             }
     331              : 
     332            4 :             info!(
     333              :                 validated = deletion_list.validated,
     334              :                 sequence = deletion_list.sequence,
     335            0 :                 "Recovered deletion list"
     336              :             );
     337              : 
     338              :             // We will drop out of recovery if this fails: it indicates that we are shutting down
     339              :             // or the backend has panicked
     340            4 :             metrics::DELETION_QUEUE
     341            4 :                 .keys_submitted
     342            4 :                 .inc_by(deletion_list.len() as u64);
     343            4 :             self.tx
     344            4 :                 .send(ValidatorQueueMessage::Delete(deletion_list))
     345            0 :                 .await?;
     346              :         }
     347              : 
     348            8 :         info!(next_sequence = self.pending.sequence, "Replay complete");
     349              : 
     350            8 :         Ok(())
     351            8 :     }
     352              : 
     353              :     /// This is the front-end ingest, where we bundle up deletion requests into DeletionList
     354              :     /// and write them out, for later validation by the backend and execution by the executor.
     355            8 :     pub(super) async fn background(&mut self) {
     356            8 :         info!("Started deletion frontend worker");
     357              : 
     358              :         // Synchronous, but we only do it once per process lifetime so it's tolerable
     359            8 :         if let Err(e) = create_dir_all(self.conf.deletion_prefix()) {
     360            0 :             tracing::error!(
     361            0 :                 "Failed to create deletion list directory {}, deletions will not be executed ({e})",
     362            0 :                 self.conf.deletion_prefix(),
     363              :             );
     364            0 :             metrics::DELETION_QUEUE.unexpected_errors.inc();
     365            0 :             return;
     366            8 :         }
     367              : 
     368           53 :         while !self.cancel.is_cancelled() {
     369           51 :             let timeout = if self.pending_flushes.is_empty() {
     370           51 :                 FRONTEND_DEFAULT_TIMEOUT
     371              :             } else {
     372            0 :                 FRONTEND_FLUSHING_TIMEOUT
     373              :             };
     374              : 
     375           51 :             let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
     376           42 :                 Ok(Some(msg)) => msg,
     377              :                 Ok(None) => {
     378              :                     // Queue sender destroyed, shutting down
     379            0 :                     break;
     380              :                 }
     381              :                 Err(_) => {
     382              :                     // Hit deadline, flush.
     383            3 :                     self.flush().await;
     384            3 :                     continue;
     385              :                 }
     386              :             };
     387              : 
     388           42 :             match msg {
     389           10 :                 ListWriterQueueMessage::Delete(op) => {
     390           10 :                     assert!(
     391           10 :                         self.recovered,
     392            0 :                         "Cannot process deletions before recovery.  This is a bug."
     393              :                     );
     394              : 
     395           10 :                     debug!(
     396            0 :                         "Delete: ingesting {} layers, {} other objects",
     397            0 :                         op.layers.len(),
     398            0 :                         op.objects.len()
     399              :                     );
     400              : 
     401           10 :                     let mut layer_paths = Vec::new();
     402           20 :                     for (layer, meta) in op.layers {
     403           10 :                         layer_paths.push(remote_layer_path(
     404           10 :                             &op.tenant_shard_id.tenant_id,
     405           10 :                             &op.timeline_id,
     406           10 :                             meta.shard,
     407           10 :                             &layer,
     408           10 :                             meta.generation,
     409           10 :                         ));
     410           10 :                     }
     411           10 :                     layer_paths.extend(op.objects);
     412           10 : 
     413           10 :                     if !self.pending.push(
     414           10 :                         &op.tenant_shard_id,
     415           10 :                         &op.timeline_id,
     416           10 :                         op.generation,
     417           10 :                         &mut layer_paths,
     418           10 :                     ) {
     419            2 :                         self.flush().await;
     420            2 :                         let retry_succeeded = self.pending.push(
     421            2 :                             &op.tenant_shard_id,
     422            2 :                             &op.timeline_id,
     423            2 :                             op.generation,
     424            2 :                             &mut layer_paths,
     425            2 :                         );
     426            2 :                         if !retry_succeeded {
     427              :                             // Unexpected: after we flush, we should have
     428              :                             // drained self.pending, so a conflict on
     429              :                             // generation numbers should be impossible.
     430            0 :                             tracing::error!(
     431            0 :                                 "Failed to enqueue deletions, leaking objects.  This is a bug."
     432              :                             );
     433            0 :                             metrics::DELETION_QUEUE.unexpected_errors.inc();
     434            2 :                         }
     435            8 :                     }
     436              :                 }
     437           14 :                 ListWriterQueueMessage::Flush(op) => {
     438           14 :                     if self.pending.is_empty() {
     439              :                         // Execute immediately
     440            6 :                         debug!("Flush: No pending objects, flushing immediately");
     441            6 :                         op.notify()
     442              :                     } else {
     443              :                         // Execute next time we flush
     444            8 :                         debug!("Flush: adding to pending flush list for next deadline flush");
     445            8 :                         self.pending_flushes.push(op);
     446              :                     }
     447              :                 }
     448           10 :                 ListWriterQueueMessage::FlushExecute(op) => {
     449           10 :                     debug!("FlushExecute: passing through to backend");
     450              :                     // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
     451           10 :                     if let Err(e) = self.tx.send(ValidatorQueueMessage::Flush(op)).await {
     452            0 :                         info!("Can't flush, shutting down ({e})");
     453              :                         // Caller will get an error when their oneshot sender is dropped.
     454           10 :                     }
     455              :                 }
     456            8 :                 ListWriterQueueMessage::Recover(op) => {
     457            8 :                     if self.recovered {
     458            0 :                         tracing::error!(
     459            0 :                             "Deletion queue recovery called more than once.  This is a bug."
     460              :                         );
     461            0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     462            0 :                         // Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
     463            0 :                         continue;
     464            8 :                     }
     465              : 
     466           20 :                     if let Err(e) = self.recover(op.attached_tenants).await {
     467              :                         // This should only happen in truly unrecoverable cases, like the recovery finding that the backend
     468              :                         // queue receiver has been dropped, or something is critically broken with
     469              :                         // the local filesystem holding deletion lists.
     470            0 :                         info!(
     471            0 :                             "Deletion queue recover aborted, deletion queue will not proceed ({e})"
     472              :                         );
     473            0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     474            0 :                         return;
     475            8 :                     } else {
     476            8 :                         self.recovered = true;
     477            8 :                     }
     478              :                 }
     479              :             }
     480              : 
     481           42 :             if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
     482            8 :                 self.flush().await;
     483           34 :             }
     484              :         }
     485            2 :         info!("Deletion queue shut down.");
     486            2 :     }
     487              : }
        

Generated by: LCOV version 2.1-beta