LCOV - differential code coverage report
Current view: top level - pageserver/src/deletion_queue - validator.rs (source / functions) Coverage Total Hit LBC UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 80.8 % 213 172 1 40 172
Current Date: 2023-10-19 02:04:12 Functions: 34.1 % 85 29 56 29
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! The validator is responsible for validating DeletionLists for execution,
       2                 : //! based on whether the generation in the DeletionList is still the latest
       3                 : //! generation for a tenant.
       4                 : //!
       5                 : //! The purpose of validation is to ensure split-brain safety in the cluster
       6                 : //! of pageservers: a deletion may only be executed if the tenant generation
       7                 : //! that originated it is still current.  See docs/rfcs/025-generation-numbers.md
       8                 : //! The purpose of accumulating lists before validating them is to reduce load
       9                 : //! on the control plane API by issuing fewer, larger requests.
      10                 : //!
      11                 : //! In addition to validating DeletionLists, the validator validates updates to remote_consistent_lsn
      12                 : //! for timelines: these are logically deletions because the safekeepers use remote_consistent_lsn
      13                 : //! to decide when old WAL can be removed.
      14                 : //!
      15                 : //! Deletions are passed onward to the Deleter.
      16                 : 
      17                 : use std::collections::HashMap;
      18                 : use std::sync::Arc;
      19                 : use std::time::Duration;
      20                 : 
      21                 : use camino::Utf8PathBuf;
      22                 : use tokio_util::sync::CancellationToken;
      23                 : use tracing::debug;
      24                 : use tracing::info;
      25                 : use tracing::warn;
      26                 : 
      27                 : use crate::config::PageServerConf;
      28                 : use crate::control_plane_client::ControlPlaneGenerationsApi;
      29                 : use crate::control_plane_client::RetryForeverError;
      30                 : use crate::metrics;
      31                 : 
      32                 : use super::deleter::DeleterMessage;
      33                 : use super::DeletionHeader;
      34                 : use super::DeletionList;
      35                 : use super::DeletionQueueError;
      36                 : use super::FlushOp;
      37                 : use super::VisibleLsnUpdates;
      38                 : 
      39                 : // After this length of time, do any validation work that is pending,
      40                 : // even if we haven't accumulated many keys to delete.
      41                 : //
      42                 : // This also causes updates to remote_consistent_lsn to be validated, even
      43                 : // if there were no deletions enqueued.
      44                 : const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
      45                 : 
      46                 : // Once more than this number of keys are pending validation, flush without waiting for the autoflush interval.
      47                 : const AUTOFLUSH_KEY_COUNT: usize = 16384;
      48                 : 
      49 UBC           0 : #[derive(Debug)]
      50                 : pub(super) enum ValidatorQueueMessage {
      51                 :     Delete(DeletionList),
      52                 :     Flush(FlushOp),
      53                 : }
      54                 : pub(super) struct Validator<C>
      55                 : where
      56                 :     C: ControlPlaneGenerationsApi,
      57                 : {
      58                 :     conf: &'static PageServerConf,
      59                 :     rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
      60                 :     tx: tokio::sync::mpsc::Sender<DeleterMessage>,
      61                 : 
      62                 :     // Client for calling into control plane API for validation of deletes
      63                 :     control_plane_client: Option<C>,
      64                 : 
      65                 :     // DeletionLists which are awaiting generation validation.  Not safe to
      66                 :     // execute until [`validate`] has processed them.
      67                 :     pending_lists: Vec<DeletionList>,
      68                 : 
      69                 :     // DeletionLists which have passed validation and are ready to execute.
      70                 :     validated_lists: Vec<DeletionList>,
      71                 : 
      72                 :     // Sum of all the lengths of lists in pending_lists
      73                 :     pending_key_count: usize,
      74                 : 
      75                 :     // Lsn validation state: we read projected LSNs and write back visible LSNs
      76                 :     // after validation.  This is the LSN equivalent of `pending_lists`:
      77                 :     // it is drained in [`validate`]
      78                 :     lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
      79                 : 
      80                 :     // If we failed to rewrite a deletion list due to local filesystem I/O failure,
      81                 :     // we must remember that and refuse to advance our persistent validated sequence
      82                 :     // number past the failure.
      83                 :     list_write_failed: Option<u64>,
      84                 : 
      85                 :     cancel: CancellationToken,
      86                 : }
      87                 : 
      88                 : impl<C> Validator<C>
      89                 : where
      90                 :     C: ControlPlaneGenerationsApi,
      91                 : {
      92 CBC         564 :     pub(super) fn new(
      93             564 :         conf: &'static PageServerConf,
      94             564 :         rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
      95             564 :         tx: tokio::sync::mpsc::Sender<DeleterMessage>,
      96             564 :         control_plane_client: Option<C>,
      97             564 :         lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
      98             564 :         cancel: CancellationToken,
      99             564 :     ) -> Self {
     100             564 :         Self {
     101             564 :             conf,
     102             564 :             rx,
     103             564 :             tx,
     104             564 :             control_plane_client,
     105             564 :             lsn_table,
     106             564 :             pending_lists: Vec::new(),
     107             564 :             validated_lists: Vec::new(),
     108             564 :             pending_key_count: 0,
     109             564 :             list_write_failed: None,
     110             564 :             cancel,
     111             564 :         }
     112             564 :     }
     113                 :     /// Process any outstanding validations of generations of pending LSN updates or pending
     114                 :     /// DeletionLists.
     115                 :     ///
     116                 :     /// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
     117                 :     /// go into the queue of ready-to-execute lists.
     118             381 :     async fn validate(&mut self) -> Result<(), DeletionQueueError> {
     119             381 :         let mut tenant_generations = HashMap::new();
     120             395 :         for list in &self.pending_lists {
     121              28 :             for (tenant_id, tenant_list) in &list.tenants {
     122              14 :                 // Note: DeletionLists are in logical time order, so generation always
     123              14 :                 // goes up.  By doing a simple insert() we will always end up with
     124              14 :                 // the latest generation seen for a tenant.
     125              14 :                 tenant_generations.insert(*tenant_id, tenant_list.generation);
     126              14 :             }
     127                 :         }
     128                 : 
     129             381 :         let pending_lsn_updates = {
     130             381 :             let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned");
     131             381 :             std::mem::take(&mut *lsn_table)
     132                 :         };
     133             388 :         for (tenant_id, update) in &pending_lsn_updates.tenants {
     134               7 :             let entry = tenant_generations
     135               7 :                 .entry(*tenant_id)
     136               7 :                 .or_insert(update.generation);
     137               7 :             if update.generation > *entry {
     138 UBC           0 :                 *entry = update.generation;
     139 CBC           7 :             }
     140                 :         }
     141                 : 
     142             381 :         if tenant_generations.is_empty() {
     143                 :             // No work to do
     144             367 :             return Ok(());
     145              14 :         }
     146                 : 
     147              14 :         let tenants_valid = if let Some(control_plane_client) = &self.control_plane_client {
     148              14 :             match control_plane_client
     149              14 :                 .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
     150              41 :                 .await
     151                 :             {
     152              13 :                 Ok(tenants) => tenants,
     153                 :                 Err(RetryForeverError::ShuttingDown) => {
     154                 :                     // The only way a validation call returns an error is when the cancellation token fires
     155               1 :                     return Err(DeletionQueueError::ShuttingDown);
     156                 :                 }
     157                 :             }
     158                 :         } else {
     159                 :             // Control plane API disabled.  In legacy mode we consider everything valid.
     160 UBC           0 :             tenant_generations.keys().map(|k| (*k, true)).collect()
     161                 :         };
     162                 : 
     163 CBC          13 :         let mut validated_sequence: Option<u64> = None;
     164                 : 
     165                 :         // Apply the validation results to the pending LSN updates
     166              19 :         for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
     167               6 :             let validated_generation = tenant_generations
     168               6 :                 .get(&tenant_id)
     169               6 :                 .expect("Map was built from the same keys we're reading");
     170               6 : 
     171               6 :             let valid = tenants_valid
     172               6 :                 .get(&tenant_id)
     173               6 :                 .copied()
     174               6 :                 // If the tenant was missing from the validation response, it has been deleted.
     175               6 :                 // The Timeline that requested the LSN update is probably already torn down,
     176               6 :                 // or will be torn down soon.  In this case, drop the update by setting valid=false.
     177               6 :                 .unwrap_or(false);
     178               6 : 
     179               6 :             if valid && *validated_generation == tenant_lsn_state.generation {
     180              10 :                 for (_timeline_id, pending_lsn) in tenant_lsn_state.timelines {
     181               5 :                     pending_lsn.result_slot.store(pending_lsn.projected);
     182               5 :                 }
     183                 :             } else {
     184                 :                 // If we failed validation, then do not apply any of the projected updates
     185               1 :                 warn!("Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}", tenant_lsn_state.generation);
     186               1 :                 metrics::DELETION_QUEUE.dropped_lsn_updates.inc();
     187                 :             }
     188                 :         }
     189                 : 
     190                 :         // Apply the validation results to the pending deletion lists
     191              27 :         for list in &mut self.pending_lists {
     192                 :             // Filter the list based on whether the server responded valid: true.
     193                 :             // If a tenant is omitted in the response, it has been deleted: its objects should
     194                 :             // already be gone, so the redundant deletion is dropped (treated as invalid).
     195              14 :             let mut mutated = false;
     196              14 :             list.tenants.retain(|tenant_id, tenant| {
     197              14 :                 let validated_generation = tenant_generations
     198              14 :                     .get(tenant_id)
     199              14 :                     .expect("Map was built from the same keys we're reading");
     200              14 : 
     201              14 :                 // If the tenant was missing from the validation response, it has been deleted.
     202              14 :                 // This means that a deletion is valid, but also redundant since the tenant's
     203              14 :                 // objects should have already been deleted.  Treat it as invalid to drop the
     204              14 :                 // redundant deletion.
     205              14 :                 let valid = tenants_valid.get(tenant_id).copied().unwrap_or(false);
     206                 : 
     207                 :                 // A list is valid if it comes from the current _or previous_ generation.
     208                 :                 // - The previous generation case is permitted due to how we store deletion lists locally:
     209                 :                 // if we see the immediately previous generation in a locally stored deletion list,
     210                 :                 // it proves that this node's disk was used for both current & previous generations,
     211                 :                 // and therefore no other node was involved in between: the two generations may be
     212                 :                 // logically treated as the same.
     213                 :                 // - In that previous generation case, we rewrote it to the current generation
     214                 :                 // in recover(), so the comparison here is simply an equality.
     215                 : 
     216              14 :                 let this_list_valid = valid
     217              11 :                     && (tenant.generation == *validated_generation);
     218                 : 
     219              14 :                 if !this_list_valid {
     220               4 :                     warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
     221               4 :                     metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
     222               4 :                     mutated = true;
     223              10 :                 } else {
     224              10 :                     metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
     225              10 :                 }
     226              14 :                 this_list_valid
     227              14 :             });
     228              14 :             list.validated = true;
     229              14 : 
     230              14 :             if mutated {
     231                 :                 // Save the deletion list if we had to make changes due to stale generations.  The
     232                 :                 // saved list is valid for execution.
     233               4 :                 if let Err(e) = list.save(self.conf).await {
     234                 :                     // Highly unexpected.  Could happen if e.g. disk full.
     235                 :                     // If we didn't save the trimmed list, it is _not_ valid to execute.
     236 UBC           0 :                     warn!("Failed to save modified deletion list {list}: {e:#}");
     237               0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     238               0 : 
     239               0 :                     // Rather than have a complex retry process, just drop it and leak the objects,
     240               0 :                     // scrubber will clean up eventually.
     241               0 :                     list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution.
     242               0 : 
     243               0 :                     // We must remember this failure, to prevent later writing out a header that
     244               0 :                     // would imply the unwritable list was valid on disk.
     245               0 :                     if self.list_write_failed.is_none() {
     246               0 :                         self.list_write_failed = Some(list.sequence);
     247               0 :                     }
     248 CBC           4 :                 }
     249              10 :             }
     250                 : 
     251              14 :             validated_sequence = Some(list.sequence);
     252                 :         }
     253                 : 
     254              13 :         if let Some(validated_sequence) = validated_sequence {
     255              13 :             if let Some(list_write_failed) = self.list_write_failed {
     256                 :                 // Rare error case: we failed to write out a deletion list to excise invalid
     257                 :                 // entries, so we cannot advance the header's valid sequence number past that point.
     258                 :                 //
     259                 :                 // In this state we will continue to validate, execute and delete deletion lists,
     260                 :                 // we just cannot update the header.  It should be noticed and fixed by a human due to
     261                 :                 // the nonzero value of our unexpected_errors metric.
     262 UBC           0 :                 warn!(
     263               0 :                     sequence_number = list_write_failed,
     264               0 :                     "Cannot write header because writing a deletion list failed earlier",
     265               0 :                 );
     266                 :             } else {
     267                 :                 // Write the queue header to record how far validation progressed.  This avoids having
     268                 :                 // to rewrite each DeletionList to set validated=true in it.
     269 CBC          13 :                 let header = DeletionHeader::new(validated_sequence);
     270                 : 
     271                 :                 // Drop result because the validated_sequence is an optimization.  If we fail to save it,
     272                 :                 // then restart, we will drop some deletion lists, creating work for scrubber.
     273                 :                 // The save() function logs a warning on error.
     274              13 :                 if let Err(e) = header.save(self.conf).await {
     275 UBC           0 :                     warn!("Failed to write deletion queue header: {e:#}");
     276               0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     277 CBC          13 :                 }
     278                 :             }
     279 LBC         (1) :         }
     280                 : 
     281                 :         // Transfer the validated lists to the validated queue, for eventual execution
     282 CBC          13 :         self.validated_lists.append(&mut self.pending_lists);
     283              13 : 
     284              13 :         Ok(())
     285             381 :     }
     286                 : 
     287              13 :     async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
     288              27 :         for list_path in list_paths {
     289 UBC           0 :             debug!("Removing deletion list {list_path}");
     290                 : 
     291 CBC          14 :             if let Err(e) = tokio::fs::remove_file(&list_path).await {
     292                 :                 // Unexpected: we should have permissions and nothing else should
     293                 :                 // be touching these files.  We will leave the file behind.  Subsequent
     294                 :                 // pageservers will try and load it again: hopefully whatever storage
     295                 :                 // issue (probably permissions) has been fixed by then.
     296 UBC           0 :                 tracing::error!("Failed to delete {list_path}: {e:#}");
     297               0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     298               0 :                 break;
     299 CBC          14 :             }
     300                 :         }
     301              13 :     }
     302                 : 
     303             381 :     async fn flush(&mut self) -> Result<(), DeletionQueueError> {
     304 UBC           0 :         tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());
     305                 : 
     306                 :         // Issue any required generation validation calls to the control plane
     307 CBC         381 :         self.validate().await?;
     308                 : 
     309                 :         // After successful validation, nothing is pending: any lists that
     310                 :         // made it through validation will be in validated_lists.
     311             380 :         assert!(self.pending_lists.is_empty());
     312             380 :         self.pending_key_count = 0;
     313                 : 
     314 UBC           0 :         tracing::debug!(
     315               0 :             "Validation complete, have {} validated lists",
     316               0 :             self.validated_lists.len()
     317               0 :         );
     318                 : 
     319                 :         // Return quickly if we have no validated lists to execute.  This avoids flushing the
     320                 :         // executor when an idle backend hits its autoflush interval
     321 CBC         380 :         if self.validated_lists.is_empty() {
     322             365 :             return Ok(());
     323              15 :         }
     324              15 : 
     325              15 :         // Drain `validated_lists` into the executor
     326              15 :         let mut executing_lists = Vec::new();
     327              16 :         for list in self.validated_lists.drain(..) {
     328              16 :             let list_path = self.conf.deletion_list_path(list.sequence);
     329              16 :             let objects = list.into_remote_paths();
     330              16 :             self.tx
     331              16 :                 .send(DeleterMessage::Delete(objects))
     332 UBC           0 :                 .await
     333 CBC          16 :                 .map_err(|_| DeletionQueueError::ShuttingDown)?;
     334              16 :             executing_lists.push(list_path);
     335                 :         }
     336                 : 
     337              15 :         self.flush_executor().await?;
     338                 : 
     339                 :         // Erase the deletion lists whose keys have all been deleted from remote storage
     340              14 :         self.cleanup_lists(executing_lists).await;
     341                 : 
     342              13 :         Ok(())
     343             379 :     }
     344                 : 
     345              15 :     async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
     346              15 :         // Flush the executor, so that all the keys referenced by these deletion lists
     347              15 :         // are actually removed from remote storage.  This is a precondition to deleting
     348              15 :         // the deletion lists themselves.
     349              15 :         let (flush_op, rx) = FlushOp::new();
     350              15 :         self.tx
     351              15 :             .send(DeleterMessage::Flush(flush_op))
     352 UBC           0 :             .await
     353 CBC          15 :             .map_err(|_| DeletionQueueError::ShuttingDown)?;
     354                 : 
     355              15 :         rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
     356              13 :     }
     357                 : 
     358             564 :     pub(super) async fn background(&mut self) {
     359             564 :         tracing::info!("Started deletion backend worker");
     360                 : 
     361             966 :         while !self.cancel.is_cancelled() {
     362             965 :             let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
     363              77 :                 Ok(Some(m)) => m,
     364                 :                 Ok(None) => {
     365                 :                     // All queue senders closed
     366             144 :                     info!("Shutting down");
     367             144 :                     break;
     368                 :                 }
     369                 :                 Err(_) => {
     370                 :                     // Timeout, we hit deadline to execute whatever we have in hand.  These functions will
     371                 :                     // return immediately if no work is pending.
     372             327 :                     match self.flush().await {
     373             324 :                         Ok(()) => {}
     374               1 :                         Err(DeletionQueueError::ShuttingDown) => {
     375               1 :                             // If we are shutting down, then auto-flush can safely be skipped
     376               1 :                         }
     377                 :                     }
     378                 : 
     379             325 :                     continue;
     380                 :                 }
     381                 :             };
     382                 : 
     383              77 :             match msg {
     384              23 :                 ValidatorQueueMessage::Delete(list) => {
     385              23 :                     if list.validated {
     386                 :                         // A pre-validated list may only be seen during recovery, if we are recovering
     387                 :                         // a DeletionList whose on-disk state has validated=true
     388               2 :                         self.validated_lists.push(list)
     389              21 :                     } else {
     390              21 :                         self.pending_key_count += list.len();
     391              21 :                         self.pending_lists.push(list);
     392              21 :                     }
     393                 : 
     394              23 :                     if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
     395 UBC           0 :                         match self.flush().await {
     396               0 :                             Ok(()) => {}
     397               0 :                             Err(DeletionQueueError::ShuttingDown) => {
     398               0 :                                 // If we are shutting down, then auto-flush can safely be skipped
     399               0 :                             }
     400                 :                         }
     401 CBC          23 :                     }
     402                 :                 }
     403              54 :                 ValidatorQueueMessage::Flush(op) => {
     404              54 :                     match self.flush().await {
     405              54 :                         Ok(()) => {
     406              54 :                             op.notify();
     407              54 :                         }
     408 UBC           0 :                         Err(DeletionQueueError::ShuttingDown) => {
     409               0 :                             // If we fail due to shutting down, we will just drop `op` to propagate that status.
     410               0 :                         }
     411                 :                     }
     412                 :                 }
     413                 :             }
     414                 :         }
     415 CBC         145 :     }
     416                 : }
        

Generated by: LCOV version 2.1-beta