LCOV - differential code coverage report
Current view:   top level - pageserver/src/deletion_queue - list_writer.rs (source / functions)
Current:        cd44433dd675caa99df17a61b18949c8387e2242.info
Current Date:   2024-01-09 02:06:09
Baseline:       66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date:  2024-01-08 15:34:46

                Coverage    Total    Hit    UBC    CBC
Lines:          71.5 %      270      193    77     193
Functions:      43.2 %      37       16     21     16

           TLA  Line data    Source code
       1                 : //! The list writer is the first stage in the deletion queue.  It accumulates
       2                 : //! layers to delete, and periodically writes out these layers into a persistent
       3                 : //! DeletionList.
       4                 : //!
       5                 : //! The purpose of writing DeletionLists is to decouple the decision to
       6                 : //! delete an object from the validation required to execute it: even if
       7                 : //! validation is not possible, e.g. due to a control plane outage, we can
       8                 : //! still persist our intent to delete an object, in a way that would
       9                 : //! survive a restart.
      10                 : //!
      11                 : //! DeletionLists are passed onwards to the Validator.
      12                 : 
      13                 : use super::DeletionHeader;
      14                 : use super::DeletionList;
      15                 : use super::FlushOp;
      16                 : use super::ValidatorQueueMessage;
      17                 : 
      18                 : use std::collections::HashMap;
      19                 : use std::fs::create_dir_all;
      20                 : use std::time::Duration;
      21                 : 
      22                 : use pageserver_api::shard::TenantShardId;
      23                 : use regex::Regex;
      24                 : use remote_storage::RemotePath;
      25                 : use tokio_util::sync::CancellationToken;
      26                 : use tracing::debug;
      27                 : use tracing::info;
      28                 : use tracing::warn;
      29                 : use utils::generation::Generation;
      30                 : use utils::id::TimelineId;
      31                 : 
      32                 : use crate::config::PageServerConf;
      33                 : use crate::deletion_queue::TEMP_SUFFIX;
      34                 : use crate::metrics;
      35                 : use crate::tenant::remote_timeline_client::remote_layer_path;
      36                 : use crate::tenant::remote_timeline_client::LayerFileMetadata;
      37                 : use crate::tenant::storage_layer::LayerFileName;
      38                 : use crate::virtual_file::on_fatal_io_error;
      39                 : use crate::virtual_file::MaybeFatalIo;
      40                 : 
      41                 : // The number of keys in a DeletionList before we will proactively persist it
       42                 : // (without reaching a flush deadline).  This aims to produce DeletionList objects
       43                 : // on the order of 1MB in size when we are under heavy delete load.
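// (Back-of-envelope only, assuming serialized object keys of very roughly 100 bytes
// each: 16384 keys puts a full DeletionList in the low-megabyte range.)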
      44                 : const DELETION_LIST_TARGET_SIZE: usize = 16384;
      45                 : 
       46                 : // Ordinarily, we only flush to a DeletionList periodically, to bound the window during
       47                 : // which we might leak objects by failing to flush a DeletionList after
       48                 : // the objects have already been unlinked from timeline metadata.
      49                 : const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
      50                 : 
       51                 : // If someone is waiting for a flush to a DeletionList, only delay a little to accumulate
      52                 : // more objects before doing the flush.
      53                 : const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
      54                 : 
      55 UBC           0 : #[derive(Debug)]
      56                 : pub(super) struct DeletionOp {
      57                 :     pub(super) tenant_shard_id: TenantShardId,
      58                 :     pub(super) timeline_id: TimelineId,
      59                 :     // `layers` and `objects` are both just lists of objects.  `layers` is used if you do not
       60                 :     // have a config object handy to project layer names to remote keys, and need the consuming worker
      61                 :     // to do it for you.
      62                 :     pub(super) layers: Vec<(LayerFileName, LayerFileMetadata)>,
      63                 :     pub(super) objects: Vec<RemotePath>,
      64                 : 
      65                 :     /// The _current_ generation of the Tenant shard attachment in which we are enqueuing
      66                 :     /// this deletion.
      67                 :     pub(super) generation: Generation,
      68                 : }
      69                 : 
      70               0 : #[derive(Debug)]
      71                 : pub(super) struct RecoverOp {
      72                 :     pub(super) attached_tenants: HashMap<TenantShardId, Generation>,
      73                 : }
      74                 : 
      75               0 : #[derive(Debug)]
      76                 : pub(super) enum ListWriterQueueMessage {
      77                 :     Delete(DeletionOp),
      78                 :     // Wait until all prior deletions make it into a persistent DeletionList
      79                 :     Flush(FlushOp),
      80                 :     // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
      81                 :     FlushExecute(FlushOp),
       82                 :     // Call once after re-attaching to the control plane, to notify the deletion queue about
      83                 :     // latest attached generations & load any saved deletion lists from disk.
      84                 :     Recover(RecoverOp),
      85                 : }
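// A hypothetical producer-side sketch (illustration only, not part of this file),
// assuming `queue_tx` is the UnboundedSender feeding this worker's `rx`, and that the
// tenant/timeline identifiers, paths, generations and FlushOps are already in scope
// under the names used below:
//
//     // Exactly once, after re-attaching to the control plane:
//     queue_tx.send(ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }))?;
//
//     // Enqueue deletions; `layers` and `objects` are two ways of naming keys.
//     queue_tx.send(ListWriterQueueMessage::Delete(DeletionOp {
//         tenant_shard_id,
//         timeline_id,
//         layers: Vec::new(),
//         objects: vec![some_remote_path],
//         generation: current_generation,
//     }))?;
//
//     // Flush resolves once the above is persisted into a DeletionList;
//     // FlushExecute resolves only after the deletions have been executed downstream.
//     queue_tx.send(ListWriterQueueMessage::Flush(flush_op))?;
//     queue_tx.send(ListWriterQueueMessage::FlushExecute(flush_execute_op))?;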
      86                 : 
      87                 : pub(super) struct ListWriter {
      88                 :     conf: &'static PageServerConf,
      89                 : 
      90                 :     // Incoming frontend requests to delete some keys
      91                 :     rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
      92                 : 
      93                 :     // Outbound requests to the backend to execute deletion lists we have composed.
      94                 :     tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
      95                 : 
       96                 :     // The list we are currently building; it contains a buffer of keys to delete
      97                 :     // and our next sequence number
      98                 :     pending: DeletionList,
      99                 : 
     100                 :     // These FlushOps should notify the next time we flush
     101                 :     pending_flushes: Vec<FlushOp>,
     102                 : 
     103                 :     // Worker loop is torn down when this fires.
     104                 :     cancel: CancellationToken,
     105                 : 
     106                 :     // Safety guard to do recovery exactly once
     107                 :     recovered: bool,
     108                 : }
     109                 : 
     110                 : impl ListWriter {
     111                 :     // Initially DeletionHeader.validated_sequence is zero.  The place we start our
     112                 :     // sequence numbers must be higher than that.
     113                 :     const BASE_SEQUENCE: u64 = 1;
     114                 : 
     115 CBC         561 :     pub(super) fn new(
     116             561 :         conf: &'static PageServerConf,
     117             561 :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
     118             561 :         tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
     119             561 :         cancel: CancellationToken,
     120             561 :     ) -> Self {
     121             561 :         Self {
     122             561 :             pending: DeletionList::new(Self::BASE_SEQUENCE),
     123             561 :             conf,
     124             561 :             rx,
     125             561 :             tx,
     126             561 :             pending_flushes: Vec::new(),
     127             561 :             cancel,
     128             561 :             recovered: false,
     129             561 :         }
     130             561 :     }
     131                 : 
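    // A minimal wiring sketch (hypothetical, not part of this file): how a caller might
    // construct and run the list writer.  `conf` is assumed to be a `&'static PageServerConf`
    // obtained elsewhere, and the backend channel capacity of 16 is an arbitrary choice
    // made purely for illustration.
    //
    //     let (frontend_tx, frontend_rx) =
    //         tokio::sync::mpsc::unbounded_channel::<ListWriterQueueMessage>();
    //     let (backend_tx, backend_rx) =
    //         tokio::sync::mpsc::channel::<ValidatorQueueMessage>(16);
    //     let cancel = CancellationToken::new();
    //
    //     let mut writer = ListWriter::new(conf, frontend_rx, backend_tx, cancel.clone());
    //     tokio::spawn(async move { writer.background().await });
    //
    //     // `frontend_tx` now carries ListWriterQueueMessage values (see the enum above),
    //     // and `backend_rx` feeds the validator stage with ValidatorQueueMessage values.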
      132                 :     /// Try to flush the pending DeletionList to persistent storage
     133                 :     ///
     134                 :     /// This does not return errors, because on failure to flush we do not lose
     135                 :     /// any state: flushing will be retried implicitly on the next deadline
     136             521 :     async fn flush(&mut self) {
     137             521 :         if self.pending.is_empty() {
     138             472 :             for f in self.pending_flushes.drain(..) {
     139 UBC           0 :                 f.notify();
     140               0 :             }
     141 CBC         472 :             return;
     142              49 :         }
     143              49 : 
     144              49 :         match self.pending.save(self.conf).await {
     145                 :             Ok(_) => {
     146              49 :                 info!(sequence = self.pending.sequence, "Stored deletion list");
     147                 : 
     148              49 :                 for f in self.pending_flushes.drain(..) {
     149              31 :                     f.notify();
     150              31 :                 }
     151                 : 
     152                 :                 // Take the list we've accumulated, replace it with a fresh list for the next sequence
     153              49 :                 let next_list = DeletionList::new(self.pending.sequence + 1);
     154              49 :                 let list = std::mem::replace(&mut self.pending, next_list);
     155                 : 
     156              49 :                 if let Err(e) = self.tx.send(ValidatorQueueMessage::Delete(list)).await {
     157                 :                     // This is allowed to fail: it will only happen if the backend worker is shut down,
     158                 :                     // so we can just drop this on the floor.
     159 UBC           0 :                     info!("Deletion list dropped, this is normal during shutdown ({e:#})");
     160 CBC          49 :                 }
     161                 :             }
     162 UBC           0 :             Err(e) => {
     163               0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     164               0 :                 warn!(
     165               0 :                     sequence = self.pending.sequence,
     166               0 :                     "Failed to write deletion list, will retry later ({e:#})"
     167               0 :                 );
     168                 :             }
     169                 :         }
     170 CBC         521 :     }
     171                 : 
     172                 :     /// Load the header, to learn the sequence number up to which deletions
     173                 :     /// have been validated.  We will apply validated=true to DeletionLists
     174                 :     /// <= this sequence when loading them.
     175                 :     ///
     176                 :     /// It is not an error for the header to not exist: we return None, and
     177                 :     /// the caller should act as if validated_sequence is 0
     178             560 :     async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
     179             560 :         let header_path = self.conf.deletion_header_path();
     180             560 :         match tokio::fs::read(&header_path).await {
     181              23 :             Ok(header_bytes) => {
     182              23 :                 match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
     183              23 :                     Ok(h) => Ok(Some(h.validated_sequence)),
     184 UBC           0 :                     Err(e) => {
     185               0 :                         warn!(
     186               0 :                             "Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
     187               0 :                         );
     188                 :                         // This should never happen unless we make a mistake with our serialization.
      189                 :                         // Ignoring a deletion header is not consequential for correctness because all deletions
     190                 :                         // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
     191               0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     192               0 :                         Ok(None)
     193                 :                     }
     194                 :                 }
     195                 :             }
     196 CBC         537 :             Err(e) => {
     197             537 :                 if e.kind() == std::io::ErrorKind::NotFound {
     198 UBC           0 :                     debug!("Deletion header {header_path} not found, first start?");
     199 CBC         537 :                     Ok(None)
     200                 :                 } else {
     201 UBC           0 :                     on_fatal_io_error(&e, "reading deletion header");
     202                 :                 }
     203                 :             }
     204                 :         }
     205 CBC         560 :     }
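    // Worked example (hypothetical numbers): if the header records validated_sequence = 5
    // and lists with sequences 4, 5 and 6 exist on disk, then recover() below marks lists
    // 4 and 5 as already validated, sends list 6 through the generation check, and starts
    // writing new lists at sequence 7.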
     206                 : 
     207             560 :     async fn recover(
     208             560 :         &mut self,
     209             560 :         attached_tenants: HashMap<TenantShardId, Generation>,
     210             560 :     ) -> Result<(), anyhow::Error> {
     211 UBC           0 :         debug!(
     212               0 :             "recovering with {} attached tenants",
     213               0 :             attached_tenants.len()
     214               0 :         );
     215                 : 
     216                 :         // Load the header
     217 CBC         560 :         let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);
     218             560 : 
     219             560 :         self.pending.sequence = validated_sequence + 1;
     220             560 : 
     221             560 :         let deletion_directory = self.conf.deletion_prefix();
     222             560 :         let mut dir = tokio::fs::read_dir(&deletion_directory)
     223             549 :             .await
     224             560 :             .fatal_err("read deletion directory");
     225             560 : 
     226             560 :         let list_name_pattern =
     227             560 :             Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
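        // A matching name is 16 characters of (hex) sequence number, a dash, and a
        // 2-character format version, e.g. something like "00000000000000a1-01.list"
        // (the exact version string here is illustrative).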
     228             560 : 
     229             560 :         let temp_extension = format!(".{TEMP_SUFFIX}");
     230             560 :         let header_path = self.conf.deletion_header_path();
     231             560 :         let mut seqs: Vec<u64> = Vec::new();
     232             610 :         while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
     233              50 :             let file_name = dentry.file_name();
     234              50 :             let dentry_str = file_name.to_string_lossy();
     235              50 : 
     236              50 :             if file_name == header_path.file_name().unwrap_or("") {
     237                 :                 // Don't try and parse the header's name like a list
     238              23 :                 continue;
     239              27 :             }
     240              27 : 
     241              27 :             if dentry_str.ends_with(&temp_extension) {
     242 UBC           0 :                 info!("Cleaning up temporary file {dentry_str}");
     243               0 :                 let absolute_path =
     244               0 :                     deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
     245               0 :                 tokio::fs::remove_file(&absolute_path)
     246               0 :                     .await
     247               0 :                     .fatal_err("delete temp file");
     248               0 : 
     249               0 :                 continue;
     250 CBC          27 :             }
     251              27 : 
     252              27 :             let file_name = dentry.file_name().to_owned();
     253              27 :             let basename = file_name.to_string_lossy();
     254              27 :             let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
     255              27 :                 m.name("sequence")
     256              27 :                     .expect("Non optional group should be present")
     257              27 :                     .as_str()
     258                 :             } else {
     259 UBC           0 :                 warn!("Unexpected key in deletion queue: {basename}");
     260               0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     261               0 :                 continue;
     262                 :             };
     263                 : 
     264 CBC          27 :             let seq: u64 = match u64::from_str_radix(seq_part, 16) {
     265              27 :                 Ok(s) => s,
     266 UBC           0 :                 Err(e) => {
     267               0 :                     warn!("Malformed key '{basename}': {e}");
     268               0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     269               0 :                     continue;
     270                 :                 }
     271                 :             };
     272 CBC          27 :             seqs.push(seq);
     273                 :         }
     274             560 :         seqs.sort();
     275             560 : 
      276             560 :         // Start our next deletion list from after the last sequence validated by the
      277             560 :         // previous process lifetime, or after the last list found on disk (it is updated
      278             560 :         // below after enumerating the deletion lists)
     279             560 :         self.pending.sequence = validated_sequence + 1;
     280             560 :         if let Some(max_list_seq) = seqs.last() {
     281              26 :             self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
     282             534 :         }
     283                 : 
     284             587 :         for s in seqs {
     285              27 :             let list_path = self.conf.deletion_list_path(s);
     286                 : 
     287              27 :             let list_bytes = tokio::fs::read(&list_path)
     288              27 :                 .await
     289              27 :                 .fatal_err("read deletion list");
     290                 : 
     291              27 :             let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
     292              27 :                 Ok(l) => l,
     293 UBC           0 :                 Err(e) => {
     294                 :                     // Drop the list on the floor: any objects it referenced will be left behind
     295                 :                     // for scrubbing to clean up.  This should never happen unless we have a serialization bug.
     296               0 :                     warn!(sequence = s, "Failed to deserialize deletion list: {e}");
     297               0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     298               0 :                     continue;
     299                 :                 }
     300                 :             };
     301                 : 
     302 CBC          27 :             if deletion_list.sequence <= validated_sequence {
      303               2 :                 // If the deletion list falls at or below validated_sequence, we may assume that it was
     304               2 :                 // already validated the last time this pageserver ran.  Otherwise, we still
     305               2 :                 // load it, as it may still contain content valid in this generation.
     306               2 :                 deletion_list.validated = true;
     307               2 :             } else {
     308                 :                 // Special case optimization: if a tenant is still attached, and no other
     309                 :                 // generation was issued to another node in the interval while we restarted,
     310                 :                 // then we may treat deletion lists from the previous generation as if they
     311                 :                 // belong to our currently attached generation, and proceed to validate & execute.
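                // For example (hypothetical generation numbers): a list written while this
                // tenant was attached in generation 5, recovered while the tenant is attached
                // here in generation 6, satisfies attached_gen.previous() == tenant_list.generation
                // and is re-stamped with generation 6 below.  If the tenant had instead been
                // attached elsewhere in the meantime and come back in generation 8, the check
                // fails and the list keeps its stale generation (logged below).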
     312              53 :                 for (tenant_shard_id, tenant_list) in &mut deletion_list.tenants {
     313              28 :                     if let Some(attached_gen) = attached_tenants.get(tenant_shard_id) {
     314              18 :                         if attached_gen.previous() == tenant_list.generation {
     315              14 :                             info!(
     316              14 :                                 seq=%s, tenant_id=%tenant_shard_id.tenant_id,
     317              14 :                                 shard_id=%tenant_shard_id.shard_slug(),
     318              14 :                                 old_gen=?tenant_list.generation, new_gen=?attached_gen,
     319              14 :                                 "Updating gen on recovered list");
     320              14 :                             tenant_list.generation = *attached_gen;
     321                 :                         } else {
     322               4 :                             info!(
     323               4 :                                 seq=%s, tenant_id=%tenant_shard_id.tenant_id,
     324               4 :                                 shard_id=%tenant_shard_id.shard_slug(),
     325               4 :                                 old_gen=?tenant_list.generation, new_gen=?attached_gen,
     326               4 :                                 "Encountered stale generation on recovered list");
     327                 :                         }
     328              10 :                     }
     329                 :                 }
     330                 :             }
     331                 : 
     332              27 :             info!(
     333              27 :                 validated = deletion_list.validated,
     334              27 :                 sequence = deletion_list.sequence,
     335              27 :                 "Recovered deletion list"
     336              27 :             );
     337                 : 
     338                 :             // We will drop out of recovery if this fails: it indicates that we are shutting down
     339                 :             // or the backend has panicked
     340              27 :             metrics::DELETION_QUEUE
     341              27 :                 .keys_submitted
     342              27 :                 .inc_by(deletion_list.len() as u64);
     343              27 :             self.tx
     344              27 :                 .send(ValidatorQueueMessage::Delete(deletion_list))
     345 UBC           0 :                 .await?;
     346                 :         }
     347                 : 
     348 CBC         560 :         info!(next_sequence = self.pending.sequence, "Replay complete");
     349                 : 
     350             560 :         Ok(())
     351             560 :     }
     352                 : 
     353                 :     /// This is the front-end ingest, where we bundle up deletion requests into DeletionList
     354                 :     /// and write them out, for later validation by the backend and execution by the executor.
     355             561 :     pub(super) async fn background(&mut self) {
     356             561 :         info!("Started deletion frontend worker");
     357                 : 
     358                 :         // Synchronous, but we only do it once per process lifetime so it's tolerable
     359             561 :         if let Err(e) = create_dir_all(self.conf.deletion_prefix()) {
     360 UBC           0 :             tracing::error!(
     361               0 :                 "Failed to create deletion list directory {}, deletions will not be executed ({e})",
     362               0 :                 self.conf.deletion_prefix(),
     363               0 :             );
     364               0 :             metrics::DELETION_QUEUE.unexpected_errors.inc();
     365               0 :             return;
     366 CBC         561 :         }
     367                 : 
     368            5065 :         while !self.cancel.is_cancelled() {
     369            4905 :             let timeout = if self.pending_flushes.is_empty() {
     370            4905 :                 FRONTEND_DEFAULT_TIMEOUT
     371                 :             } else {
     372 UBC           0 :                 FRONTEND_FLUSHING_TIMEOUT
     373                 :             };
     374                 : 
     375 CBC        4905 :             let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
     376            4016 :                 Ok(Some(msg)) => msg,
     377                 :                 Ok(None) => {
     378                 :                     // Queue sender destroyed, shutting down
     379 UBC           0 :                     break;
     380                 :                 }
     381                 :                 Err(_) => {
     382                 :                     // Hit deadline, flush.
     383 CBC         488 :                     self.flush().await;
     384             488 :                     continue;
     385                 :                 }
     386                 :             };
     387                 : 
     388            4016 :             match msg {
     389            3149 :                 ListWriterQueueMessage::Delete(op) => {
     390            3149 :                     assert!(
     391            3149 :                         self.recovered,
     392 UBC           0 :                         "Cannot process deletions before recovery.  This is a bug."
     393                 :                     );
     394                 : 
     395               0 :                     debug!(
     396               0 :                         "Delete: ingesting {} layers, {} other objects",
     397               0 :                         op.layers.len(),
     398               0 :                         op.objects.len()
     399               0 :                     );
     400                 : 
     401 CBC        3149 :                     let mut layer_paths = Vec::new();
     402            5945 :                     for (layer, meta) in op.layers {
     403            2796 :                         layer_paths.push(remote_layer_path(
     404            2796 :                             &op.tenant_shard_id.tenant_id,
     405            2796 :                             &op.timeline_id,
     406            2796 :                             meta.shard,
     407            2796 :                             &layer,
     408            2796 :                             meta.generation,
     409            2796 :                         ));
     410            2796 :                     }
     411            3149 :                     layer_paths.extend(op.objects);
     412            3149 : 
     413            3149 :                     if !self.pending.push(
     414            3149 :                         &op.tenant_shard_id,
     415            3149 :                         &op.timeline_id,
     416            3149 :                         op.generation,
     417            3149 :                         &mut layer_paths,
     418            3149 :                     ) {
     419               2 :                         self.flush().await;
     420               2 :                         let retry_succeeded = self.pending.push(
     421               2 :                             &op.tenant_shard_id,
     422               2 :                             &op.timeline_id,
     423               2 :                             op.generation,
     424               2 :                             &mut layer_paths,
     425               2 :                         );
     426               2 :                         if !retry_succeeded {
     427                 :                             // Unexpected: after we flush, we should have
     428                 :                             // drained self.pending, so a conflict on
     429                 :                             // generation numbers should be impossible.
     430 UBC           0 :                             tracing::error!(
     431               0 :                                 "Failed to enqueue deletions, leaking objects.  This is a bug."
     432               0 :                             );
     433               0 :                             metrics::DELETION_QUEUE.unexpected_errors.inc();
     434 CBC           2 :                         }
     435            3147 :                     }
     436                 :                 }
     437             186 :                 ListWriterQueueMessage::Flush(op) => {
     438             186 :                     if self.pending.is_empty() {
     439                 :                         // Execute immediately
     440 UBC           0 :                         debug!("Flush: No pending objects, flushing immediately");
     441 CBC         155 :                         op.notify()
     442                 :                     } else {
     443                 :                         // Execute next time we flush
     444 UBC           0 :                         debug!("Flush: adding to pending flush list for next deadline flush");
     445 CBC          31 :                         self.pending_flushes.push(op);
     446                 :                     }
     447                 :                 }
     448             121 :                 ListWriterQueueMessage::FlushExecute(op) => {
     449 UBC           0 :                     debug!("FlushExecute: passing through to backend");
     450                 :                     // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
     451 CBC         121 :                     if let Err(e) = self.tx.send(ValidatorQueueMessage::Flush(op)).await {
     452 UBC           0 :                         info!("Can't flush, shutting down ({e})");
     453                 :                         // Caller will get error when their oneshot sender was dropped.
     454 CBC         121 :                     }
     455                 :                 }
     456             560 :                 ListWriterQueueMessage::Recover(op) => {
     457             560 :                     if self.recovered {
     458 UBC           0 :                         tracing::error!(
     459               0 :                             "Deletion queue recovery called more than once.  This is a bug."
     460               0 :                         );
     461               0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     462               0 :                         // Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
     463               0 :                         continue;
     464 CBC         560 :                     }
     465                 : 
     466            1134 :                     if let Err(e) = self.recover(op.attached_tenants).await {
     467                 :                         // This should only happen in truly unrecoverable cases, like the recovery finding that the backend
     468                 :                         // queue receiver has been dropped, or something is critically broken with
     469                 :                         // the local filesystem holding deletion lists.
     470 UBC           0 :                         info!(
     471               0 :                             "Deletion queue recover aborted, deletion queue will not proceed ({e})"
     472               0 :                         );
     473               0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     474               0 :                         return;
     475 CBC         560 :                     } else {
     476             560 :                         self.recovered = true;
     477             560 :                     }
     478                 :                 }
     479                 :             }
     480                 : 
     481            4016 :             if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
     482              31 :                 self.flush().await;
     483            3985 :             }
     484                 :         }
     485             160 :         info!("Deletion queue shut down.");
     486             160 :     }
     487                 : }
        

Generated by: LCOV version 2.1-beta