LCOV - code coverage report
Current view: top level - pageserver/src/deletion_queue - list_writer.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 71.5 % 270 193
Test Date: 2024-02-07 07:37:29 Functions: 43.2 % 37 16

            Line data    Source code
       1              : //! The list writer is the first stage in the deletion queue.  It accumulates
       2              : //! layers to delete, and periodically writes out these layers into a persistent
       3              : //! DeletionList.
       4              : //!
       5              : //! The purpose of writing DeletionLists is to decouple the decision to
       6              : //! delete an object from the validation required to execute it: even if
       7              : //! validation is not possible, e.g. due to a control plane outage, we can
       8              : //! still persist our intent to delete an object, in a way that would
       9              : //! survive a restart.
      10              : //!
      11              : //! DeletionLists are passed onwards to the Validator.
      12              : 
      13              : use super::DeletionHeader;
      14              : use super::DeletionList;
      15              : use super::FlushOp;
      16              : use super::ValidatorQueueMessage;
      17              : 
      18              : use std::collections::HashMap;
      19              : use std::fs::create_dir_all;
      20              : use std::time::Duration;
      21              : 
      22              : use pageserver_api::shard::TenantShardId;
      23              : use regex::Regex;
      24              : use remote_storage::RemotePath;
      25              : use tokio_util::sync::CancellationToken;
      26              : use tracing::debug;
      27              : use tracing::info;
      28              : use tracing::warn;
      29              : use utils::generation::Generation;
      30              : use utils::id::TimelineId;
      31              : 
      32              : use crate::config::PageServerConf;
      33              : use crate::deletion_queue::TEMP_SUFFIX;
      34              : use crate::metrics;
      35              : use crate::tenant::remote_timeline_client::remote_layer_path;
      36              : use crate::tenant::remote_timeline_client::LayerFileMetadata;
      37              : use crate::tenant::storage_layer::LayerFileName;
      38              : use crate::virtual_file::on_fatal_io_error;
      39              : use crate::virtual_file::MaybeFatalIo;
      40              : 
      41              : // The number of keys in a DeletionList before we will proactively persist it
      42              : // (without reaching a flush deadline).  This aims to deliver objects of the order
      43              : // of magnitude 1MB when we are under heavy delete load.
      44              : const DELETION_LIST_TARGET_SIZE: usize = 16384;
      45              : 
      46              : // Ordinarily, we only flush to DeletionList periodically, to bound the window during
      47              : // which we might leak objects from not flushing a DeletionList after
      48              : // the objects are already unlinked from timeline metadata.
      49              : const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
      50              : 
      51              : // If someone is waiting for a flush to DeletionList, only delay a little to accumulate
      52              : // more objects before doing the flush.
      53              : const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
      54              : 
      55            0 : #[derive(Debug)]
      56              : pub(super) struct DeletionOp {
      57              :     pub(super) tenant_shard_id: TenantShardId,
      58              :     pub(super) timeline_id: TimelineId,
      59              :     // `layers` and `objects` are both just lists of objects.  `layers` is used if you do not
      60              :     // have a config object handy to project it to a remote key, and need the consuming worker
      61              :     // to do it for you.
      62              :     pub(super) layers: Vec<(LayerFileName, LayerFileMetadata)>,
      63              :     pub(super) objects: Vec<RemotePath>,
      64              : 
      65              :     /// The _current_ generation of the Tenant shard attachment in which we are enqueuing
      66              :     /// this deletion.
      67              :     pub(super) generation: Generation,
      68              : }
      69              : 
      70            0 : #[derive(Debug)]
      71              : pub(super) struct RecoverOp {
      72              :     pub(super) attached_tenants: HashMap<TenantShardId, Generation>,
      73              : }
      74              : 
      75            0 : #[derive(Debug)]
      76              : pub(super) enum ListWriterQueueMessage {
      77              :     Delete(DeletionOp),
      78              :     // Wait until all prior deletions make it into a persistent DeletionList
      79              :     Flush(FlushOp),
      80              :     // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
      81              :     FlushExecute(FlushOp),
      82              :     // Call once after re-attaching to control plane, to notify the deletion queue about
      83              :     // latest attached generations & load any saved deletion lists from disk.
      84              :     Recover(RecoverOp),
      85              : }
      86              : 
      87              : pub(super) struct ListWriter {
      88              :     conf: &'static PageServerConf,
      89              : 
      90              :     // Incoming frontend requests to delete some keys
      91              :     rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
      92              : 
      93              :     // Outbound requests to the backend to execute deletion lists we have composed.
      94              :     tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
      95              : 
      96              :     // The list we are currently building, contains a buffer of keys to delete
      97              :     // and our next sequence number
      98              :     pending: DeletionList,
      99              : 
     100              :     // These FlushOps should notify the next time we flush
     101              :     pending_flushes: Vec<FlushOp>,
     102              : 
     103              :     // Worker loop is torn down when this fires.
     104              :     cancel: CancellationToken,
     105              : 
     106              :     // Safety guard to do recovery exactly once
     107              :     recovered: bool,
     108              : }
     109              : 
     110              : impl ListWriter {
     111              :     // Initially DeletionHeader.validated_sequence is zero.  The place we start our
     112              :     // sequence numbers must be higher than that.
     113              :     const BASE_SEQUENCE: u64 = 1;
     114              : 
     115          612 :     pub(super) fn new(
     116          612 :         conf: &'static PageServerConf,
     117          612 :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
     118          612 :         tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
     119          612 :         cancel: CancellationToken,
     120          612 :     ) -> Self {
     121          612 :         Self {
     122          612 :             pending: DeletionList::new(Self::BASE_SEQUENCE),
     123          612 :             conf,
     124          612 :             rx,
     125          612 :             tx,
     126          612 :             pending_flushes: Vec::new(),
     127          612 :             cancel,
     128          612 :             recovered: false,
     129          612 :         }
     130          612 :     }
     131              : 
     132              :     /// Try to flush `list` to persistent storage
     133              :     ///
     134              :     /// This does not return errors, because on failure to flush we do not lose
     135              :     /// any state: flushing will be retried implicitly on the next deadline
     136          558 :     async fn flush(&mut self) {
     137          558 :         if self.pending.is_empty() {
     138          503 :             for f in self.pending_flushes.drain(..) {
     139            0 :                 f.notify();
     140            0 :             }
     141          503 :             return;
     142           55 :         }
     143           55 : 
     144           55 :         match self.pending.save(self.conf).await {
     145              :             Ok(_) => {
     146           55 :                 info!(sequence = self.pending.sequence, "Stored deletion list");
     147              : 
     148           55 :                 for f in self.pending_flushes.drain(..) {
     149           36 :                     f.notify();
     150           36 :                 }
     151              : 
     152              :                 // Take the list we've accumulated, replace it with a fresh list for the next sequence
     153           55 :                 let next_list = DeletionList::new(self.pending.sequence + 1);
     154           55 :                 let list = std::mem::replace(&mut self.pending, next_list);
     155              : 
     156           55 :                 if let Err(e) = self.tx.send(ValidatorQueueMessage::Delete(list)).await {
     157              :                     // This is allowed to fail: it will only happen if the backend worker is shut down,
     158              :                     // so we can just drop this on the floor.
     159            0 :                     info!("Deletion list dropped, this is normal during shutdown ({e:#})");
     160           55 :                 }
     161              :             }
     162            0 :             Err(e) => {
     163            0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     164            0 :                 warn!(
     165            0 :                     sequence = self.pending.sequence,
     166            0 :                     "Failed to write deletion list, will retry later ({e:#})"
     167            0 :                 );
     168              :             }
     169              :         }
     170          558 :     }
     171              : 
     172              :     /// Load the header, to learn the sequence number up to which deletions
     173              :     /// have been validated.  We will apply validated=true to DeletionLists
     174              :     /// <= this sequence when loading them.
     175              :     ///
     176              :     /// It is not an error for the header to not exist: we return None, and
     177              :     /// the caller should act as if validated_sequence is 0
     178          611 :     async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
     179          611 :         let header_path = self.conf.deletion_header_path();
     180          611 :         match tokio::fs::read(&header_path).await {
     181           18 :             Ok(header_bytes) => {
     182           18 :                 match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
     183           18 :                     Ok(h) => Ok(Some(h.validated_sequence)),
     184            0 :                     Err(e) => {
     185            0 :                         warn!(
     186            0 :                             "Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
     187            0 :                         );
     188              :                         // This should never happen unless we make a mistake with our serialization.
     189              :                         // Ignoring a deletion header is not consequential for correctnes because all deletions
     190              :                         // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
     191            0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     192            0 :                         Ok(None)
     193              :                     }
     194              :                 }
     195              :             }
     196          593 :             Err(e) => {
     197          593 :                 if e.kind() == std::io::ErrorKind::NotFound {
     198            0 :                     debug!("Deletion header {header_path} not found, first start?");
     199          593 :                     Ok(None)
     200              :                 } else {
     201            0 :                     on_fatal_io_error(&e, "reading deletion header");
     202              :                 }
     203              :             }
     204              :         }
     205          611 :     }
     206              : 
     207          611 :     async fn recover(
     208          611 :         &mut self,
     209          611 :         attached_tenants: HashMap<TenantShardId, Generation>,
     210          611 :     ) -> Result<(), anyhow::Error> {
     211            0 :         debug!(
     212            0 :             "recovering with {} attached tenants",
     213            0 :             attached_tenants.len()
     214            0 :         );
     215              : 
     216              :         // Load the header
     217          611 :         let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);
     218          611 : 
     219          611 :         self.pending.sequence = validated_sequence + 1;
     220          611 : 
     221          611 :         let deletion_directory = self.conf.deletion_prefix();
     222          611 :         let mut dir = tokio::fs::read_dir(&deletion_directory)
     223          603 :             .await
     224          611 :             .fatal_err("read deletion directory");
     225          611 : 
     226          611 :         let list_name_pattern =
     227          611 :             Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
     228          611 : 
     229          611 :         let temp_extension = format!(".{TEMP_SUFFIX}");
     230          611 :         let header_path = self.conf.deletion_header_path();
     231          611 :         let mut seqs: Vec<u64> = Vec::new();
     232          657 :         while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
     233           46 :             let file_name = dentry.file_name();
     234           46 :             let dentry_str = file_name.to_string_lossy();
     235           46 : 
     236           46 :             if file_name == header_path.file_name().unwrap_or("") {
     237              :                 // Don't try and parse the header's name like a list
     238           18 :                 continue;
     239           28 :             }
     240           28 : 
     241           28 :             if dentry_str.ends_with(&temp_extension) {
     242            0 :                 info!("Cleaning up temporary file {dentry_str}");
     243            0 :                 let absolute_path =
     244            0 :                     deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
     245            0 :                 tokio::fs::remove_file(&absolute_path)
     246            0 :                     .await
     247            0 :                     .fatal_err("delete temp file");
     248            0 : 
     249            0 :                 continue;
     250           28 :             }
     251           28 : 
     252           28 :             let file_name = dentry.file_name().to_owned();
     253           28 :             let basename = file_name.to_string_lossy();
     254           28 :             let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
     255           28 :                 m.name("sequence")
     256           28 :                     .expect("Non optional group should be present")
     257           28 :                     .as_str()
     258              :             } else {
     259            0 :                 warn!("Unexpected key in deletion queue: {basename}");
     260            0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     261            0 :                 continue;
     262              :             };
     263              : 
     264           28 :             let seq: u64 = match u64::from_str_radix(seq_part, 16) {
     265           28 :                 Ok(s) => s,
     266            0 :                 Err(e) => {
     267            0 :                     warn!("Malformed key '{basename}': {e}");
     268            0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     269            0 :                     continue;
     270              :                 }
     271              :             };
     272           28 :             seqs.push(seq);
     273              :         }
     274          611 :         seqs.sort();
     275          611 : 
     276          611 :         // Start our next deletion list from after the last location validated by
     277          611 :         // previous process lifetime, or after the last location found (it is updated
     278          611 :         // below after enumerating the deletion lists)
     279          611 :         self.pending.sequence = validated_sequence + 1;
     280          611 :         if let Some(max_list_seq) = seqs.last() {
     281           26 :             self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
     282          585 :         }
     283              : 
     284          639 :         for s in seqs {
     285           28 :             let list_path = self.conf.deletion_list_path(s);
     286              : 
     287           28 :             let list_bytes = tokio::fs::read(&list_path)
     288           28 :                 .await
     289           28 :                 .fatal_err("read deletion list");
     290              : 
     291           28 :             let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
     292           28 :                 Ok(l) => l,
     293            0 :                 Err(e) => {
     294              :                     // Drop the list on the floor: any objects it referenced will be left behind
     295              :                     // for scrubbing to clean up.  This should never happen unless we have a serialization bug.
     296            0 :                     warn!(sequence = s, "Failed to deserialize deletion list: {e}");
     297            0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     298            0 :                     continue;
     299              :                 }
     300              :             };
     301              : 
     302           28 :             if deletion_list.sequence <= validated_sequence {
     303            2 :                 // If the deletion list falls below valid_seq, we may assume that it was
     304            2 :                 // already validated the last time this pageserver ran.  Otherwise, we still
     305            2 :                 // load it, as it may still contain content valid in this generation.
     306            2 :                 deletion_list.validated = true;
     307            2 :             } else {
     308              :                 // Special case optimization: if a tenant is still attached, and no other
     309              :                 // generation was issued to another node in the interval while we restarted,
     310              :                 // then we may treat deletion lists from the previous generation as if they
     311              :                 // belong to our currently attached generation, and proceed to validate & execute.
     312           55 :                 for (tenant_shard_id, tenant_list) in &mut deletion_list.tenants {
     313           29 :                     if let Some(attached_gen) = attached_tenants.get(tenant_shard_id) {
     314           19 :                         if attached_gen.previous() == tenant_list.generation {
     315           15 :                             info!(
     316           15 :                                 seq=%s, tenant_id=%tenant_shard_id.tenant_id,
     317           15 :                                 shard_id=%tenant_shard_id.shard_slug(),
     318           15 :                                 old_gen=?tenant_list.generation, new_gen=?attached_gen,
     319           15 :                                 "Updating gen on recovered list");
     320           15 :                             tenant_list.generation = *attached_gen;
     321              :                         } else {
     322            4 :                             info!(
     323            4 :                                 seq=%s, tenant_id=%tenant_shard_id.tenant_id,
     324            4 :                                 shard_id=%tenant_shard_id.shard_slug(),
     325            4 :                                 old_gen=?tenant_list.generation, new_gen=?attached_gen,
     326            4 :                                 "Encountered stale generation on recovered list");
     327              :                         }
     328           10 :                     }
     329              :                 }
     330              :             }
     331              : 
     332           28 :             info!(
     333           28 :                 validated = deletion_list.validated,
     334           28 :                 sequence = deletion_list.sequence,
     335           28 :                 "Recovered deletion list"
     336           28 :             );
     337              : 
     338              :             // We will drop out of recovery if this fails: it indicates that we are shutting down
     339              :             // or the backend has panicked
     340           28 :             metrics::DELETION_QUEUE
     341           28 :                 .keys_submitted
     342           28 :                 .inc_by(deletion_list.len() as u64);
     343           28 :             self.tx
     344           28 :                 .send(ValidatorQueueMessage::Delete(deletion_list))
     345            0 :                 .await?;
     346              :         }
     347              : 
     348          611 :         info!(next_sequence = self.pending.sequence, "Replay complete");
     349              : 
     350          611 :         Ok(())
     351          611 :     }
     352              : 
     353              :     /// This is the front-end ingest, where we bundle up deletion requests into DeletionList
     354              :     /// and write them out, for later validation by the backend and execution by the executor.
     355          612 :     pub(super) async fn background(&mut self) {
     356          612 :         info!("Started deletion frontend worker");
     357              : 
     358              :         // Synchronous, but we only do it once per process lifetime so it's tolerable
     359          612 :         if let Err(e) = create_dir_all(self.conf.deletion_prefix()) {
     360            0 :             tracing::error!(
     361            0 :                 "Failed to create deletion list directory {}, deletions will not be executed ({e})",
     362            0 :                 self.conf.deletion_prefix(),
     363            0 :             );
     364            0 :             metrics::DELETION_QUEUE.unexpected_errors.inc();
     365            0 :             return;
     366          612 :         }
     367              : 
     368         6229 :         while !self.cancel.is_cancelled() {
     369         6226 :             let timeout = if self.pending_flushes.is_empty() {
     370         6226 :                 FRONTEND_DEFAULT_TIMEOUT
     371              :             } else {
     372            0 :                 FRONTEND_FLUSHING_TIMEOUT
     373              :             };
     374              : 
     375         6226 :             let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
     376         5098 :                 Ok(Some(msg)) => msg,
     377              :                 Ok(None) => {
     378              :                     // Queue sender destroyed, shutting down
     379            0 :                     break;
     380              :                 }
     381              :                 Err(_) => {
     382              :                     // Hit deadline, flush.
     383          519 :                     self.flush().await;
     384          519 :                     continue;
     385              :                 }
     386              :             };
     387              : 
     388         5098 :             match msg {
     389         4099 :                 ListWriterQueueMessage::Delete(op) => {
     390         4099 :                     assert!(
     391         4099 :                         self.recovered,
     392            0 :                         "Cannot process deletions before recovery.  This is a bug."
     393              :                     );
     394              : 
     395            0 :                     debug!(
     396            0 :                         "Delete: ingesting {} layers, {} other objects",
     397            0 :                         op.layers.len(),
     398            0 :                         op.objects.len()
     399            0 :                     );
     400              : 
     401         4099 :                     let mut layer_paths = Vec::new();
     402         7793 :                     for (layer, meta) in op.layers {
     403         3694 :                         layer_paths.push(remote_layer_path(
     404         3694 :                             &op.tenant_shard_id.tenant_id,
     405         3694 :                             &op.timeline_id,
     406         3694 :                             meta.shard,
     407         3694 :                             &layer,
     408         3694 :                             meta.generation,
     409         3694 :                         ));
     410         3694 :                     }
     411         4099 :                     layer_paths.extend(op.objects);
     412         4099 : 
     413         4099 :                     if !self.pending.push(
     414         4099 :                         &op.tenant_shard_id,
     415         4099 :                         &op.timeline_id,
     416         4099 :                         op.generation,
     417         4099 :                         &mut layer_paths,
     418         4099 :                     ) {
     419            3 :                         self.flush().await;
     420            3 :                         let retry_succeeded = self.pending.push(
     421            3 :                             &op.tenant_shard_id,
     422            3 :                             &op.timeline_id,
     423            3 :                             op.generation,
     424            3 :                             &mut layer_paths,
     425            3 :                         );
     426            3 :                         if !retry_succeeded {
     427              :                             // Unexpected: after we flush, we should have
     428              :                             // drained self.pending, so a conflict on
     429              :                             // generation numbers should be impossible.
     430            0 :                             tracing::error!(
     431            0 :                                 "Failed to enqueue deletions, leaking objects.  This is a bug."
     432            0 :                             );
     433            0 :                             metrics::DELETION_QUEUE.unexpected_errors.inc();
     434            3 :                         }
     435         4096 :                     }
     436              :                 }
     437          207 :                 ListWriterQueueMessage::Flush(op) => {
     438          207 :                     if self.pending.is_empty() {
     439              :                         // Execute immediately
     440            0 :                         debug!("Flush: No pending objects, flushing immediately");
     441          171 :                         op.notify()
     442              :                     } else {
     443              :                         // Execute next time we flush
     444            0 :                         debug!("Flush: adding to pending flush list for next deadline flush");
     445           36 :                         self.pending_flushes.push(op);
     446              :                     }
     447              :                 }
     448          181 :                 ListWriterQueueMessage::FlushExecute(op) => {
     449            0 :                     debug!("FlushExecute: passing through to backend");
     450              :                     // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
     451          181 :                     if let Err(e) = self.tx.send(ValidatorQueueMessage::Flush(op)).await {
     452            0 :                         info!("Can't flush, shutting down ({e})");
     453              :                         // Caller will get error when their oneshot sender was dropped.
     454          181 :                     }
     455              :                 }
     456          611 :                 ListWriterQueueMessage::Recover(op) => {
     457          611 :                     if self.recovered {
     458            0 :                         tracing::error!(
     459            0 :                             "Deletion queue recovery called more than once.  This is a bug."
     460            0 :                         );
     461            0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     462            0 :                         // Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
     463            0 :                         continue;
     464          611 :                     }
     465              : 
     466         1238 :                     if let Err(e) = self.recover(op.attached_tenants).await {
     467              :                         // This should only happen in truly unrecoverable cases, like the recovery finding that the backend
     468              :                         // queue receiver has been dropped, or something is critically broken with
     469              :                         // the local filesystem holding deletion lists.
     470            0 :                         info!(
     471            0 :                             "Deletion queue recover aborted, deletion queue will not proceed ({e})"
     472            0 :                         );
     473            0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     474            0 :                         return;
     475          611 :                     } else {
     476          611 :                         self.recovered = true;
     477          611 :                     }
     478              :                 }
     479              :             }
     480              : 
     481         5098 :             if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
     482           36 :                 self.flush().await;
     483         5062 :             }
     484              :         }
     485            3 :         info!("Deletion queue shut down.");
     486            3 :     }
     487              : }
        

Generated by: LCOV version 2.1-beta