LCOV - differential code coverage report
Current view: top level - pageserver/src/deletion_queue - list_writer.rs (source / functions)
Current:       f6946e90941b557c917ac98cd5a7e9506d180f3e.info
Current Date:  2023-10-19 02:04:12
Baseline:      c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

                 Coverage    Total    Hit    UBC    CBC
Lines:             69.4 %      258    179     79    179
Functions:         32.4 %       37     12     25     12

           TLA  Line data    Source code
       1                 : //! The list writer is the first stage in the deletion queue.  It accumulates
       2                 : //! layers to delete, and periodically writes out these layers into a persistent
       3                 : //! DeletionList.
       4                 : //!
       5                 : //! The purpose of writing DeletionLists is to decouple the decision to
       6                 : //! delete an object from the validation required to execute it: even if
       7                 : //! validation is not possible, e.g. due to a control plane outage, we can
       8                 : //! still persist our intent to delete an object, in a way that would
       9                 : //! survive a restart.
      10                 : //!
      11                 : //! DeletionLists are passed onwards to the Validator.
      12                 : 
      13                 : use super::DeletionHeader;
      14                 : use super::DeletionList;
      15                 : use super::FlushOp;
      16                 : use super::ValidatorQueueMessage;
      17                 : 
      18                 : use std::collections::HashMap;
      19                 : use std::fs::create_dir_all;
      20                 : use std::time::Duration;
      21                 : 
      22                 : use regex::Regex;
      23                 : use remote_storage::RemotePath;
      24                 : use tokio_util::sync::CancellationToken;
      25                 : use tracing::debug;
      26                 : use tracing::info;
      27                 : use tracing::warn;
      28                 : use utils::generation::Generation;
      29                 : use utils::id::TenantId;
      30                 : use utils::id::TimelineId;
      31                 : 
      32                 : use crate::config::PageServerConf;
      33                 : use crate::deletion_queue::TEMP_SUFFIX;
      34                 : use crate::metrics;
      35                 : use crate::tenant::remote_timeline_client::remote_layer_path;
      36                 : use crate::tenant::storage_layer::LayerFileName;
      37                 : 
      38                 : // The number of keys in a DeletionList before we will proactively persist it
       39                 : // (without reaching a flush deadline).  This aims to deliver objects on the order
       40                 : // of 1MB in size when we are under heavy delete load.
      41                 : const DELETION_LIST_TARGET_SIZE: usize = 16384;
      42                 : 
       43                 : // Ordinarily, we only flush to a DeletionList periodically.  The timeout bounds the window
       44                 : // during which objects that are already unlinked from timeline metadata may be leaked
       45                 : // because they have not yet been persisted to a DeletionList.
      46                 : const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
      47                 : 
       48                 : // If someone is waiting for a flush to a DeletionList, only delay a little to accumulate
      49                 : // more objects before doing the flush.
      50                 : const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
      51                 : 
      52 UBC           0 : #[derive(Debug)]
      53                 : pub(super) struct DeletionOp {
      54                 :     pub(super) tenant_id: TenantId,
      55                 :     pub(super) timeline_id: TimelineId,
      56                 :     // `layers` and `objects` are both just lists of objects.  `layers` is used if you do not
      57                 :     // have a config object handy to project it to a remote key, and need the consuming worker
      58                 :     // to do it for you.
      59                 :     pub(super) layers: Vec<(LayerFileName, Generation)>,
      60                 :     pub(super) objects: Vec<RemotePath>,
      61                 : 
      62                 :     /// The _current_ generation of the Tenant attachment in which we are enqueuing
      63                 :     /// this deletion.
      64                 :     pub(super) generation: Generation,
      65                 : }
      66                 : 
      67               0 : #[derive(Debug)]
      68                 : pub(super) struct RecoverOp {
      69                 :     pub(super) attached_tenants: HashMap<TenantId, Generation>,
      70                 : }
      71                 : 
      72               0 : #[derive(Debug)]
      73                 : pub(super) enum ListWriterQueueMessage {
      74                 :     Delete(DeletionOp),
      75                 :     // Wait until all prior deletions make it into a persistent DeletionList
      76                 :     Flush(FlushOp),
      77                 :     // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
      78                 :     FlushExecute(FlushOp),
      79                 :     // Call once after re-attaching to control plane, to notify the deletion queue about
      80                 :     // latest attached generations & load any saved deletion lists from disk.
      81                 :     Recover(RecoverOp),
      82                 : }
      83                 : 
      84                 : pub(super) struct ListWriter {
      85                 :     conf: &'static PageServerConf,
      86                 : 
      87                 :     // Incoming frontend requests to delete some keys
      88                 :     rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
      89                 : 
      90                 :     // Outbound requests to the backend to execute deletion lists we have composed.
      91                 :     tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
      92                 : 
       93                 :     // The list we are currently building; it contains a buffer of keys to delete
       94                 :     // and our next sequence number.
      95                 :     pending: DeletionList,
      96                 : 
      97                 :     // These FlushOps should notify the next time we flush
      98                 :     pending_flushes: Vec<FlushOp>,
      99                 : 
     100                 :     // Worker loop is torn down when this fires.
     101                 :     cancel: CancellationToken,
     102                 : 
     103                 :     // Safety guard to do recovery exactly once
     104                 :     recovered: bool,
     105                 : }
     106                 : 
     107                 : impl ListWriter {
      108                 :     // Initially DeletionHeader.validated_sequence is zero.  The sequence number we
      109                 :     // start from must be higher than that.
     110                 :     const BASE_SEQUENCE: u64 = 1;
     111                 : 
     112 CBC         564 :     pub(super) fn new(
     113             564 :         conf: &'static PageServerConf,
     114             564 :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
     115             564 :         tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
     116             564 :         cancel: CancellationToken,
     117             564 :     ) -> Self {
     118             564 :         Self {
     119             564 :             pending: DeletionList::new(Self::BASE_SEQUENCE),
     120             564 :             conf,
     121             564 :             rx,
     122             564 :             tx,
     123             564 :             pending_flushes: Vec::new(),
     124             564 :             cancel,
     125             564 :             recovered: false,
     126             564 :         }
     127             564 :     }
     128                 : 
      129                 :     /// Try to flush the pending `DeletionList` to persistent storage
     130                 :     ///
     131                 :     /// This does not return errors, because on failure to flush we do not lose
     132                 :     /// any state: flushing will be retried implicitly on the next deadline
     133             341 :     async fn flush(&mut self) {
     134             341 :         if self.pending.is_empty() {
     135             326 :             for f in self.pending_flushes.drain(..) {
     136 UBC           0 :                 f.notify();
     137               0 :             }
     138 CBC         326 :             return;
     139              15 :         }
     140              15 : 
     141              15 :         match self.pending.save(self.conf).await {
     142                 :             Ok(_) => {
     143              15 :                 info!(sequence = self.pending.sequence, "Stored deletion list");
     144                 : 
     145              15 :                 for f in self.pending_flushes.drain(..) {
     146              12 :                     f.notify();
     147              12 :                 }
     148                 : 
     149                 :                 // Take the list we've accumulated, replace it with a fresh list for the next sequence
     150              15 :                 let next_list = DeletionList::new(self.pending.sequence + 1);
     151              15 :                 let list = std::mem::replace(&mut self.pending, next_list);
     152                 : 
     153              15 :                 if let Err(e) = self.tx.send(ValidatorQueueMessage::Delete(list)).await {
     154                 :                     // This is allowed to fail: it will only happen if the backend worker is shut down,
     155                 :                     // so we can just drop this on the floor.
     156 UBC           0 :                     info!("Deletion list dropped, this is normal during shutdown ({e:#})");
     157 CBC          15 :                 }
     158                 :             }
     159 UBC           0 :             Err(e) => {
     160               0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     161               0 :                 warn!(
     162               0 :                     sequence = self.pending.sequence,
     163               0 :                     "Failed to write deletion list, will retry later ({e:#})"
     164               0 :                 );
     165                 :             }
     166                 :         }
     167 CBC         341 :     }
     168                 : 
     169                 :     /// Load the header, to learn the sequence number up to which deletions
     170                 :     /// have been validated.  We will apply validated=true to DeletionLists
     171                 :     /// <= this sequence when loading them.
     172                 :     ///
     173                 :     /// It is not an error for the header to not exist: we return None, and
     174                 :     /// the caller should act as if validated_sequence is 0
     175              35 :     async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
     176              35 :         let header_path = self.conf.deletion_header_path();
     177              35 :         match tokio::fs::read(&header_path).await {
     178               7 :             Ok(header_bytes) => {
     179               7 :                 match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
     180               7 :                     Ok(h) => Ok(Some(h.validated_sequence)),
     181 UBC           0 :                     Err(e) => {
     182               0 :                         warn!(
     183               0 :                             "Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
     184               0 :                         );
     185                 :                         // This should never happen unless we make a mistake with our serialization.
      186                 :                         // Ignoring a deletion header is not consequential for correctness because all deletions
     187                 :                         // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
     188               0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     189               0 :                         Ok(None)
     190                 :                     }
     191                 :                 }
     192                 :             }
     193 CBC          28 :             Err(e) => {
     194              28 :                 if e.kind() == std::io::ErrorKind::NotFound {
     195 UBC           0 :                     debug!("Deletion header {header_path} not found, first start?");
     196 CBC          28 :                     Ok(None)
     197                 :                 } else {
     198 UBC           0 :                     Err(anyhow::anyhow!(e))
     199                 :                 }
     200                 :             }
     201                 :         }
     202 CBC          35 :     }
     203                 : 
     204              35 :     async fn recover(
     205              35 :         &mut self,
     206              35 :         attached_tenants: HashMap<TenantId, Generation>,
     207              35 :     ) -> Result<(), anyhow::Error> {
     208 UBC           0 :         debug!(
     209               0 :             "recovering with {} attached tenants",
     210               0 :             attached_tenants.len()
     211               0 :         );
     212                 : 
     213                 :         // Load the header
     214 CBC          35 :         let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);
     215              35 : 
     216              35 :         self.pending.sequence = validated_sequence + 1;
     217              35 : 
     218              35 :         let deletion_directory = self.conf.deletion_prefix();
     219              35 :         let mut dir = match tokio::fs::read_dir(&deletion_directory).await {
     220              35 :             Ok(d) => d,
     221 UBC           0 :             Err(e) => {
     222               0 :                 warn!("Failed to open deletion list directory {deletion_directory}: {e:#}");
     223                 : 
     224                 :                 // Give up: if we can't read the deletion list directory, we probably can't
     225                 :                 // write lists into it later, so the queue won't work.
     226               0 :                 return Err(e.into());
     227                 :             }
     228                 :         };
     229                 : 
     230 CBC          35 :         let list_name_pattern =
     231              35 :             Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
     232              35 : 
     233              35 :         let temp_extension = format!(".{TEMP_SUFFIX}");
     234              35 :         let header_path = self.conf.deletion_header_path();
     235              35 :         let mut seqs: Vec<u64> = Vec::new();
     236              50 :         while let Some(dentry) = dir.next_entry().await? {
     237              15 :             let file_name = dentry.file_name();
     238              15 :             let dentry_str = file_name.to_string_lossy();
     239              15 : 
     240              15 :             if file_name == header_path.file_name().unwrap_or("") {
     241                 :                 // Don't try and parse the header's name like a list
     242               7 :                 continue;
     243               8 :             }
     244               8 : 
     245               8 :             if dentry_str.ends_with(&temp_extension) {
     246 UBC           0 :                 info!("Cleaning up temporary file {dentry_str}");
     247               0 :                 let absolute_path =
     248               0 :                     deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
     249               0 :                 if let Err(e) = tokio::fs::remove_file(&absolute_path).await {
     250                 :                     // Non-fatal error: we will just leave the file behind but not
     251                 :                     // try and load it.
     252               0 :                     warn!("Failed to clean up temporary file {absolute_path}: {e:#}");
     253               0 :                 }
     254                 : 
     255               0 :                 continue;
     256 CBC           8 :             }
     257               8 : 
     258               8 :             let file_name = dentry.file_name().to_owned();
     259               8 :             let basename = file_name.to_string_lossy();
     260               8 :             let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
     261               8 :                 m.name("sequence")
     262               8 :                     .expect("Non optional group should be present")
     263               8 :                     .as_str()
     264                 :             } else {
     265 UBC           0 :                 warn!("Unexpected key in deletion queue: {basename}");
     266               0 :                 metrics::DELETION_QUEUE.unexpected_errors.inc();
     267               0 :                 continue;
     268                 :             };
     269                 : 
     270 CBC           8 :             let seq: u64 = match u64::from_str_radix(seq_part, 16) {
     271               8 :                 Ok(s) => s,
     272 UBC           0 :                 Err(e) => {
     273               0 :                     warn!("Malformed key '{basename}': {e}");
     274               0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     275               0 :                     continue;
     276                 :                 }
     277                 :             };
     278 CBC           8 :             seqs.push(seq);
     279                 :         }
     280              35 :         seqs.sort();
     281              35 : 
      282              35 :         // Start our next deletion list after the last sequence validated in a previous
      283              35 :         // process lifetime, or after the highest sequence found among the deletion lists
      284              35 :         // we just enumerated, whichever is greater.
     285              35 :         self.pending.sequence = validated_sequence + 1;
     286              35 :         if let Some(max_list_seq) = seqs.last() {
     287               7 :             self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
     288              28 :         }
     289                 : 
     290              43 :         for s in seqs {
     291               8 :             let list_path = self.conf.deletion_list_path(s);
     292                 : 
     293               8 :             let list_bytes = tokio::fs::read(&list_path).await?;
     294                 : 
     295               8 :             let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
     296               8 :                 Ok(l) => l,
     297 UBC           0 :                 Err(e) => {
     298                 :                     // Drop the list on the floor: any objects it referenced will be left behind
     299                 :                     // for scrubbing to clean up.  This should never happen unless we have a serialization bug.
     300               0 :                     warn!(sequence = s, "Failed to deserialize deletion list: {e}");
     301               0 :                     metrics::DELETION_QUEUE.unexpected_errors.inc();
     302               0 :                     continue;
     303                 :                 }
     304                 :             };
     305                 : 
     306 CBC           8 :             if deletion_list.sequence <= validated_sequence {
      307               2 :                 // If the deletion list's sequence is at or below validated_sequence, we may assume that it was
     308               2 :                 // already validated the last time this pageserver ran.  Otherwise, we still
     309               2 :                 // load it, as it may still contain content valid in this generation.
     310               2 :                 deletion_list.validated = true;
     311               2 :             } else {
     312                 :                 // Special case optimization: if a tenant is still attached, and no other
     313                 :                 // generation was issued to another node in the interval while we restarted,
     314                 :                 // then we may treat deletion lists from the previous generation as if they
     315                 :                 // belong to our currently attached generation, and proceed to validate & execute.
     316              12 :                 for (tenant_id, tenant_list) in &mut deletion_list.tenants {
     317               6 :                     if let Some(attached_gen) = attached_tenants.get(tenant_id) {
     318               5 :                         if attached_gen.previous() == tenant_list.generation {
     319               3 :                             tenant_list.generation = *attached_gen;
     320               3 :                         }
     321               1 :                     }
     322                 :                 }
     323                 :             }
     324                 : 
     325               8 :             info!(
     326               8 :                 validated = deletion_list.validated,
     327               8 :                 sequence = deletion_list.sequence,
     328               8 :                 "Recovered deletion list"
     329               8 :             );
     330                 : 
     331                 :             // We will drop out of recovery if this fails: it indicates that we are shutting down
     332                 :             // or the backend has panicked
     333               8 :             metrics::DELETION_QUEUE
     334               8 :                 .keys_submitted
     335               8 :                 .inc_by(deletion_list.len() as u64);
     336               8 :             self.tx
     337               8 :                 .send(ValidatorQueueMessage::Delete(deletion_list))
     338 UBC           0 :                 .await?;
     339                 :         }
     340                 : 
     341 CBC          35 :         info!(next_sequence = self.pending.sequence, "Replay complete");
     342                 : 
     343              35 :         Ok(())
     344              35 :     }
     345                 : 
      346                 :     /// This is the front-end ingest, where we bundle up deletion requests into DeletionLists
     347                 :     /// and write them out, for later validation by the backend and execution by the executor.
     348             564 :     pub(super) async fn background(&mut self) {
     349             564 :         info!("Started deletion frontend worker");
     350                 : 
     351                 :         // Synchronous, but we only do it once per process lifetime so it's tolerable
     352             564 :         if let Err(e) = create_dir_all(&self.conf.deletion_prefix()) {
     353 UBC           0 :             tracing::error!(
     354               0 :                 "Failed to create deletion list directory {}, deletions will not be executed ({e})",
     355               0 :                 self.conf.deletion_prefix(),
     356               0 :             );
     357               0 :             metrics::DELETION_QUEUE.unexpected_errors.inc();
     358               0 :             return;
     359 CBC         564 :         }
     360                 : 
     361            1212 :         while !self.cancel.is_cancelled() {
     362            1067 :             let timeout = if self.pending_flushes.is_empty() {
     363            1067 :                 FRONTEND_DEFAULT_TIMEOUT
     364                 :             } else {
     365 UBC           0 :                 FRONTEND_FLUSHING_TIMEOUT
     366                 :             };
     367                 : 
     368 CBC        1067 :             let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
     369             320 :                 Ok(Some(msg)) => msg,
     370                 :                 Ok(None) => {
     371                 :                     // Queue sender destroyed, shutting down
     372 UBC           0 :                     break;
     373                 :                 }
     374                 :                 Err(_) => {
     375                 :                     // Hit deadline, flush.
     376 CBC         328 :                     self.flush().await;
     377             328 :                     continue;
     378                 :                 }
     379                 :             };
     380                 : 
     381             320 :             match msg {
     382              61 :                 ListWriterQueueMessage::Delete(op) => {
     383              61 :                     assert!(
     384              61 :                         self.recovered,
     385 UBC           0 :                         "Cannot process deletions before recovery.  This is a bug."
     386                 :                     );
     387                 : 
     388               0 :                     debug!(
     389               0 :                         "Delete: ingesting {} layers, {} other objects",
     390               0 :                         op.layers.len(),
     391               0 :                         op.objects.len()
     392               0 :                     );
     393                 : 
     394 CBC          61 :                     let mut layer_paths = Vec::new();
     395            1733 :                     for (layer, generation) in op.layers {
     396            1672 :                         layer_paths.push(remote_layer_path(
     397            1672 :                             &op.tenant_id,
     398            1672 :                             &op.timeline_id,
     399            1672 :                             &layer,
     400            1672 :                             generation,
     401            1672 :                         ));
     402            1672 :                     }
     403              61 :                     layer_paths.extend(op.objects);
     404              61 : 
     405              61 :                     if !self.pending.push(
     406              61 :                         &op.tenant_id,
     407              61 :                         &op.timeline_id,
     408              61 :                         op.generation,
     409              61 :                         &mut layer_paths,
     410              61 :                     ) {
     411               1 :                         self.flush().await;
     412               1 :                         let retry_succeeded = self.pending.push(
     413               1 :                             &op.tenant_id,
     414               1 :                             &op.timeline_id,
     415               1 :                             op.generation,
     416               1 :                             &mut layer_paths,
     417               1 :                         );
     418               1 :                         if !retry_succeeded {
     419                 :                             // Unexpected: after we flush, we should have
     420                 :                             // drained self.pending, so a conflict on
     421                 :                             // generation numbers should be impossible.
     422 UBC           0 :                             tracing::error!(
     423               0 :                                 "Failed to enqueue deletions, leaking objects.  This is a bug."
     424               0 :                             );
     425               0 :                             metrics::DELETION_QUEUE.unexpected_errors.inc();
     426 CBC           1 :                         }
     427              60 :                     }
     428                 :                 }
     429             170 :                 ListWriterQueueMessage::Flush(op) => {
     430             170 :                     if self.pending.is_empty() {
     431                 :                         // Execute immediately
     432 UBC           0 :                         debug!("Flush: No pending objects, flushing immediately");
     433 CBC         158 :                         op.notify()
     434                 :                     } else {
     435                 :                         // Execute next time we flush
     436 UBC           0 :                         debug!("Flush: adding to pending flush list for next deadline flush");
     437 CBC          12 :                         self.pending_flushes.push(op);
     438                 :                     }
     439                 :                 }
     440              54 :                 ListWriterQueueMessage::FlushExecute(op) => {
     441 UBC           0 :                     debug!("FlushExecute: passing through to backend");
     442                 :                     // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
     443 CBC          54 :                     if let Err(e) = self.tx.send(ValidatorQueueMessage::Flush(op)).await {
     444 UBC           0 :                         info!("Can't flush, shutting down ({e})");
      445                 :                         // The caller will get an error when their oneshot sender is dropped.
     446 CBC          54 :                     }
     447                 :                 }
     448              35 :                 ListWriterQueueMessage::Recover(op) => {
     449              35 :                     if self.recovered {
     450 UBC           0 :                         tracing::error!(
     451               0 :                             "Deletion queue recovery called more than once.  This is a bug."
     452               0 :                         );
     453               0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     454               0 :                         // Non-fatal: although this is a bug, since we did recovery at least once we may proceed.
     455               0 :                         continue;
     456 CBC          35 :                     }
     457                 : 
     458             113 :                     if let Err(e) = self.recover(op.attached_tenants).await {
     459                 :                         // This should only happen in truly unrecoverable cases, like the recovery finding that the backend
     460                 :                         // queue receiver has been dropped, or something is critically broken with
     461                 :                         // the local filesystem holding deletion lists.
     462 UBC           0 :                         info!(
     463               0 :                             "Deletion queue recover aborted, deletion queue will not proceed ({e})"
     464               0 :                         );
     465               0 :                         metrics::DELETION_QUEUE.unexpected_errors.inc();
     466               0 :                         return;
     467 CBC          35 :                     } else {
     468              35 :                         self.recovered = true;
     469              35 :                     }
     470                 :                 }
     471                 :             }
     472                 : 
     473             320 :             if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
     474              12 :                 self.flush().await;
     475             308 :             }
     476                 :         }
     477             145 :         info!("Deletion queue shut down.");
     478             145 :     }
     479                 : }
        

Generated by: LCOV version 2.1-beta