LCOV - code coverage report
Current view: top level - pageserver/src/deletion_queue - validator.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 71.2 % 212 151
Test Date: 2025-03-12 00:01:28 Functions: 25.5 % 51 13

            Line data    Source code
//! The validator is responsible for validating DeletionLists for execution,
//! based on whether the generation in the DeletionList is still the latest
//! generation for a tenant.
//!
//! The purpose of validation is to ensure split-brain safety in the cluster
//! of pageservers: a deletion may only be executed if the tenant generation
//! that originated it is still current.  See docs/rfcs/025-generation-numbers.md.
//!
//! The purpose of accumulating lists before validating them is to reduce load
//! on the control plane API by issuing fewer, larger requests.
//!
//! In addition to validating DeletionLists, the validator validates updates to remote_consistent_lsn
//! for timelines: these are logically deletions because the safekeepers use remote_consistent_lsn
//! to decide when old WAL may be deleted.
//!
//! Deletions are passed onward to the Deleter.
      16              : 
      17              : use std::collections::HashMap;
      18              : use std::sync::Arc;
      19              : use std::time::Duration;
      20              : 
      21              : use camino::Utf8PathBuf;
      22              : use tokio_util::sync::CancellationToken;
      23              : use tracing::{debug, info, warn};
      24              : 
      25              : use super::deleter::DeleterMessage;
      26              : use super::{DeletionHeader, DeletionList, DeletionQueueError, FlushOp, VisibleLsnUpdates};
      27              : use crate::config::PageServerConf;
      28              : use crate::controller_upcall_client::{ControlPlaneGenerationsApi, RetryForeverError};
      29              : use crate::metrics;
      30              : use crate::virtual_file::MaybeFatalIo;
      31              : 
      32              : // After this length of time, do any validation work that is pending,
      33              : // even if we haven't accumulated many keys to delete.
      34              : //
      35              : // This also causes updates to remote_consistent_lsn to be validated, even
      36              : // if there were no deletions enqueued.
      37              : const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
      38              : 
      39              : // If we have received this number of keys, proceed with attempting to execute
      40              : const AUTOFLUSH_KEY_COUNT: usize = 16384;
      41              : 
      42              : #[derive(Debug)]
      43              : pub(super) enum ValidatorQueueMessage {
      44              :     Delete(DeletionList),
      45              :     Flush(FlushOp),
      46              : }
      47              : pub(super) struct Validator<C>
      48              : where
      49              :     C: ControlPlaneGenerationsApi,
      50              : {
      51              :     conf: &'static PageServerConf,
      52              :     rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
      53              :     tx: tokio::sync::mpsc::Sender<DeleterMessage>,
      54              : 
      55              :     // Client for calling into control plane API for validation of deletes
      56              :     controller_upcall_client: Option<C>,
      57              : 
      58              :     // DeletionLists which are waiting generation validation.  Not safe to
      59              :     // execute until [`validate`] has processed them.
      60              :     pending_lists: Vec<DeletionList>,
      61              : 
      62              :     // DeletionLists which have passed validation and are ready to execute.
      63              :     validated_lists: Vec<DeletionList>,
      64              : 
      65              :     // Sum of all the lengths of lists in pending_lists
      66              :     pending_key_count: usize,
      67              : 
      68              :     // Lsn validation state: we read projected LSNs and write back visible LSNs
      69              :     // after validation.  This is the LSN equivalent of `pending_validation_lists`:
      70              :     // it is drained in [`validate`]
      71              :     lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
      72              : 
      73              :     // If we failed to rewrite a deletion list due to local filesystem I/O failure,
      74              :     // we must remember that and refuse to advance our persistent validated sequence
      75              :     // number past the failure.
      76              :     list_write_failed: Option<u64>,
      77              : 
      78              :     cancel: CancellationToken,
      79              : }
      80              : 
      81              : impl<C> Validator<C>
      82              : where
      83              :     C: ControlPlaneGenerationsApi,
      84              : {
      85           16 :     pub(super) fn new(
      86           16 :         conf: &'static PageServerConf,
      87           16 :         rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
      88           16 :         tx: tokio::sync::mpsc::Sender<DeleterMessage>,
      89           16 :         controller_upcall_client: Option<C>,
      90           16 :         lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
      91           16 :         cancel: CancellationToken,
      92           16 :     ) -> Self {
      93           16 :         Self {
      94           16 :             conf,
      95           16 :             rx,
      96           16 :             tx,
      97           16 :             controller_upcall_client,
      98           16 :             lsn_table,
      99           16 :             pending_lists: Vec::new(),
     100           16 :             validated_lists: Vec::new(),
     101           16 :             pending_key_count: 0,
     102           16 :             list_write_failed: None,
     103           16 :             cancel,
     104           16 :         }
     105           16 :     }
     106              :     /// Process any outstanding validations of generations of pending LSN updates or pending
     107              :     /// DeletionLists.
     108              :     ///
     109              :     /// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
     110              :     /// go into the queue of ready-to-execute lists.
     111           24 :     async fn validate(&mut self) -> Result<(), DeletionQueueError> {
     112           24 :         let mut tenant_generations = HashMap::new();
     113           44 :         for list in &self.pending_lists {
     114           40 :             for (tenant_id, tenant_list) in &list.tenants {
     115           20 :                 // Note: DeletionLists are in logical time order, so generation always
     116           20 :                 // goes up.  By doing a simple insert() we will always end up with
     117           20 :                 // the latest generation seen for a tenant.
     118           20 :                 tenant_generations.insert(*tenant_id, tenant_list.generation);
     119           20 :             }
     120              :         }
     121              : 
     122           24 :         let pending_lsn_updates = {
     123           24 :             let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned");
     124           24 :             std::mem::take(&mut *lsn_table)
     125              :         };
     126           24 :         for (tenant_id, update) in &pending_lsn_updates.tenants {
     127            0 :             let entry = tenant_generations
     128            0 :                 .entry(*tenant_id)
     129            0 :                 .or_insert(update.generation);
     130            0 :             if update.generation > *entry {
     131            0 :                 *entry = update.generation;
     132            0 :             }
     133              :         }
     134              : 
     135           24 :         if tenant_generations.is_empty() {
     136              :             // No work to do
     137            8 :             return Ok(());
     138           16 :         }
     139              : 
     140           16 :         let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client {
     141           16 :             match controller_upcall_client
     142           16 :                 .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
     143           16 :                 .await
     144              :             {
     145           16 :                 Ok(tenants) => tenants,
     146              :                 Err(RetryForeverError::ShuttingDown) => {
     147              :                     // The only way a validation call returns an error is when the cancellation token fires
     148            0 :                     return Err(DeletionQueueError::ShuttingDown);
     149              :                 }
     150              :             }
     151              :         } else {
     152              :             // Control plane API disabled.  In legacy mode we consider everything valid.
     153            0 :             tenant_generations.keys().map(|k| (*k, true)).collect()
     154              :         };
     155              : 
     156           16 :         let mut validated_sequence: Option<u64> = None;
     157              : 
     158              :         // Apply the validation results to the pending LSN updates
     159           16 :         for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
     160            0 :             let validated_generation = tenant_generations
     161            0 :                 .get(&tenant_id)
     162            0 :                 .expect("Map was built from the same keys we're reading");
     163            0 : 
     164            0 :             let valid = tenants_valid
     165            0 :                 .get(&tenant_id)
     166            0 :                 .copied()
     167            0 :                 // If the tenant was missing from the validation response, it has been deleted.
     168            0 :                 // The Timeline that requested the LSN update is probably already torn down,
     169            0 :                 // or will be torn down soon.  In this case, drop the update by setting valid=false.
     170            0 :                 .unwrap_or(false);
     171            0 : 
     172            0 :             if valid && *validated_generation == tenant_lsn_state.generation {
     173            0 :                 for (timeline_id, pending_lsn) in tenant_lsn_state.timelines {
     174            0 :                     tracing::debug!(
     175              :                         %tenant_id,
     176              :                         %timeline_id,
     177            0 :                         current = %pending_lsn.result_slot.load(),
     178            0 :                         projected = %pending_lsn.projected,
     179            0 :                         "advancing validated remote_consistent_lsn",
     180              :                     );
     181            0 :                     pending_lsn.result_slot.store(pending_lsn.projected);
     182              :                 }
     183              :             } else {
     184              :                 // If we failed validation, then do not apply any of the projected updates
     185            0 :                 info!(
     186            0 :                     "Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {:?}",
     187              :                     tenant_lsn_state.generation
     188              :                 );
     189            0 :                 metrics::DELETION_QUEUE.dropped_lsn_updates.inc();
     190              :             }
     191              :         }
     192              : 
     193              :         // Apply the validation results to the pending deletion lists
     194           36 :         for list in &mut self.pending_lists {
     195              :             // Filter the list based on whether the server responded valid: true.
     196              :             // If a tenant is omitted in the response, it has been deleted, and we should
     197              :             // proceed with deletion.
     198           20 :             let mut mutated = false;
     199           20 :             list.tenants.retain(|tenant_id, tenant| {
     200           20 :                 let validated_generation = tenant_generations
     201           20 :                     .get(tenant_id)
     202           20 :                     .expect("Map was built from the same keys we're reading");
     203           20 : 
     204           20 :                 // If the tenant was missing from the validation response, it has been deleted.
     205           20 :                 // This means that a deletion is valid, but also redundant since the tenant's
     206           20 :                 // objects should have already been deleted.  Treat it as invalid to drop the
     207           20 :                 // redundant deletion.
     208           20 :                 let valid = tenants_valid.get(tenant_id).copied().unwrap_or(false);
     209              : 
     210              :                 // A list is valid if it comes from the current _or previous_ generation.
     211              :                 // - The previous generation case is permitted due to how we store deletion lists locally:
     212              :                 // if we see the immediately previous generation in a locally stored deletion list,
     213              :                 // it proves that this node's disk was used for both current & previous generations,
     214              :                 // and therefore no other node was involved in between: the two generations may be
     215              :                 // logically treated as the same.
     216              :                 // - In that previous generation case, we rewrote it to the current generation
     217              :                 // in recover(), so the comparison here is simply an equality.
     218              : 
     219           20 :                 let this_list_valid = valid
     220           16 :                     && (tenant.generation == *validated_generation);
     221              : 
     222           20 :                 if !this_list_valid {
     223            8 :                     info!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
     224            8 :                     metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
     225            8 :                     mutated = true;
     226           12 :                 } else {
     227           12 :                     metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
     228           12 :                 }
     229           20 :                 this_list_valid
     230           20 :             });
     231           20 :             list.validated = true;
     232           20 : 
     233           20 :             if mutated {
     234              :                 // Save the deletion list if we had to make changes due to stale generations.  The
     235              :                 // saved list is valid for execution.
     236            8 :                 if let Err(e) = list.save(self.conf).await {
     237              :                     // Highly unexpected.  Could happen if e.g. disk full.
     238              :                     // If we didn't save the trimmed list, it is _not_ valid to execute.
     239            0 :                     warn!("Failed to save modified deletion list {list}: {e:#}");
     240            0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     241            0 : 
     242            0 :                     // Rather than have a complex retry process, just drop it and leak the objects,
     243            0 :                     // scrubber will clean up eventually.
     244            0 :                     list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution.
     245            0 : 
     246            0 :                     // We must remember this failure, to prevent later writing out a header that
     247            0 :                     // would imply the unwritable list was valid on disk.
     248            0 :                     if self.list_write_failed.is_none() {
     249            0 :                         self.list_write_failed = Some(list.sequence);
     250            0 :                     }
     251            8 :                 }
     252           12 :             }
     253              : 
     254           20 :             validated_sequence = Some(list.sequence);
     255              :         }
     256              : 
     257           16 :         if let Some(validated_sequence) = validated_sequence {
     258           16 :             if let Some(list_write_failed) = self.list_write_failed {
     259              :                 // Rare error case: we failed to write out a deletion list to excise invalid
     260              :                 // entries, so we cannot advance the header's valid sequence number past that point.
     261              :                 //
     262              :                 // In this state we will continue to validate, execute and delete deletion lists,
     263              :                 // we just cannot update the header.  It should be noticed and fixed by a human due to
     264              :                 // the nonzero value of our unexpected_errors metric.
     265            0 :                 warn!(
     266              :                     sequence_number = list_write_failed,
     267            0 :                     "Cannot write header because writing a deletion list failed earlier",
     268              :                 );
     269              :             } else {
     270              :                 // Write the queue header to record how far validation progressed.  This avoids having
     271              :                 // to rewrite each DeletionList to set validated=true in it.
     272           16 :                 let header = DeletionHeader::new(validated_sequence);
     273              : 
     274              :                 // Drop result because the validated_sequence is an optimization.  If we fail to save it,
     275              :                 // then restart, we will drop some deletion lists, creating work for scrubber.
     276              :                 // The save() function logs a warning on error.
     277           16 :                 if let Err(e) = header.save(self.conf).await {
     278            0 :                     warn!("Failed to write deletion queue header: {e:#}");
     279            0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     280           16 :                 }
     281              :             }
     282            0 :         }
     283              : 
     284              :         // Transfer the validated lists to the validated queue, for eventual execution
     285           16 :         self.validated_lists.append(&mut self.pending_lists);
     286           16 : 
     287           16 :         Ok(())
     288           24 :     }
     289              : 
     290           16 :     async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
     291           36 :         for list_path in list_paths {
     292           20 :             debug!("Removing deletion list {list_path}");
     293           20 :             tokio::fs::remove_file(&list_path)
     294           20 :                 .await
     295           20 :                 .fatal_err("remove deletion list");
     296              :         }
     297           16 :     }
     298              : 
     299           24 :     async fn flush(&mut self) -> Result<(), DeletionQueueError> {
     300           24 :         tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());
     301              : 
     302              :         // Issue any required generation validation calls to the control plane
     303           24 :         self.validate().await?;
     304              : 
     305              :         // After successful validation, nothing is pending: any lists that
     306              :         // made it through validation will be in validated_lists.
     307           24 :         assert!(self.pending_lists.is_empty());
     308           24 :         self.pending_key_count = 0;
     309           24 : 
     310           24 :         tracing::debug!(
     311            0 :             "Validation complete, have {} validated lists",
     312            0 :             self.validated_lists.len()
     313              :         );
     314              : 
     315              :         // Return quickly if we have no validated lists to execute.  This avoids flushing the
     316              :         // executor when an idle backend hits its autoflush interval
     317           24 :         if self.validated_lists.is_empty() {
     318            8 :             return Ok(());
     319           16 :         }
     320           16 : 
     321           16 :         // Drain `validated_lists` into the executor
     322           16 :         let mut executing_lists = Vec::new();
     323           20 :         for list in self.validated_lists.drain(..) {
     324           20 :             let list_path = self.conf.deletion_list_path(list.sequence);
     325           20 :             let objects = list.into_remote_paths();
     326           20 :             self.tx
     327           20 :                 .send(DeleterMessage::Delete(objects))
     328           20 :                 .await
     329           20 :                 .map_err(|_| DeletionQueueError::ShuttingDown)?;
     330           20 :             executing_lists.push(list_path);
     331              :         }
     332              : 
     333           16 :         self.flush_executor().await?;
     334              : 
     335              :         // Erase the deletion lists whose keys have all be deleted from remote storage
     336           16 :         self.cleanup_lists(executing_lists).await;
     337              : 
     338           16 :         Ok(())
     339           24 :     }
     340              : 
     341           16 :     async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
     342           16 :         // Flush the executor, so that all the keys referenced by these deletion lists
     343           16 :         // are actually removed from remote storage.  This is a precondition to deleting
     344           16 :         // the deletion lists themselves.
     345           16 :         let (flush_op, rx) = FlushOp::new();
     346           16 :         self.tx
     347           16 :             .send(DeleterMessage::Flush(flush_op))
     348           16 :             .await
     349           16 :             .map_err(|_| DeletionQueueError::ShuttingDown)?;
     350              : 
     351           16 :         rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
     352           16 :     }
     353              : 
     354           16 :     pub(super) async fn background(&mut self) {
     355           16 :         tracing::info!("Started deletion backend worker");
     356              : 
     357           68 :         while !self.cancel.is_cancelled() {
     358           68 :             let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
     359           48 :                 Ok(Some(m)) => m,
     360              :                 Ok(None) => {
     361              :                     // All queue senders closed
     362            4 :                     info!("Shutting down");
     363            4 :                     break;
     364              :                 }
     365              :                 Err(_) => {
     366              :                     // Timeout, we hit deadline to execute whatever we have in hand.  These functions will
     367              :                     // return immediately if no work is pending.
     368            4 :                     match self.flush().await {
     369            4 :                         Ok(()) => {}
     370            0 :                         Err(DeletionQueueError::ShuttingDown) => {
     371            0 :                             // If we are shutting down, then auto-flush can safely be skipped
     372            0 :                         }
     373              :                     }
     374              : 
     375            4 :                     continue;
     376              :                 }
     377              :             };
     378              : 
     379           48 :             match msg {
     380           28 :                 ValidatorQueueMessage::Delete(list) => {
     381           28 :                     if list.validated {
     382              :                         // A pre-validated list may only be seen during recovery, if we are recovering
     383              :                         // a DeletionList whose on-disk state has validated=true
     384            0 :                         self.validated_lists.push(list)
     385           28 :                     } else {
     386           28 :                         self.pending_key_count += list.len();
     387           28 :                         self.pending_lists.push(list);
     388           28 :                     }
     389              : 
     390           28 :                     if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
     391            0 :                         match self.flush().await {
     392            0 :                             Ok(()) => {}
     393            0 :                             Err(DeletionQueueError::ShuttingDown) => {
     394            0 :                                 // If we are shutting down, then auto-flush can safely be skipped
     395            0 :                             }
     396              :                         }
     397           28 :                     }
     398              :                 }
     399           20 :                 ValidatorQueueMessage::Flush(op) => {
     400           20 :                     match self.flush().await {
     401           20 :                         Ok(()) => {
     402           20 :                             op.notify();
     403           20 :                         }
     404            0 :                         Err(DeletionQueueError::ShuttingDown) => {
     405            0 :                             // If we fail due to shutting down, we will just drop `op` to propagate that status.
     406            0 :                         }
     407              :                     }
     408              :                 }
     409              :             }
     410              :         }
     411            4 :     }
     412              : }
        

Generated by: LCOV version 2.1-beta