LCOV - differential code coverage report
Current view:   top level - pageserver/src - deletion_queue.rs (source / functions)
Current:        cd44433dd675caa99df17a61b18949c8387e2242.info
Current Date:   2024-01-09 02:06:09
Baseline:       66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date:  2024-01-08 15:34:46

                Coverage    Total    Hit    UBC    CBC
Lines:            88.1 %      817    720     97    720
Functions:        58.9 %      192    113     79    113

           TLA  Line data    Source code
       1                 : mod deleter;
       2                 : mod list_writer;
       3                 : mod validator;
       4                 : 
       5                 : use std::collections::HashMap;
       6                 : use std::sync::Arc;
       7                 : use std::time::Duration;
       8                 : 
       9                 : use crate::control_plane_client::ControlPlaneGenerationsApi;
      10                 : use crate::metrics;
      11                 : use crate::tenant::remote_timeline_client::remote_layer_path;
      12                 : use crate::tenant::remote_timeline_client::remote_timeline_path;
      13                 : use crate::tenant::remote_timeline_client::LayerFileMetadata;
      14                 : use crate::virtual_file::MaybeFatalIo;
      15                 : use crate::virtual_file::VirtualFile;
      16                 : use anyhow::Context;
      17                 : use camino::Utf8PathBuf;
      18                 : use pageserver_api::shard::TenantShardId;
      19                 : use remote_storage::{GenericRemoteStorage, RemotePath};
      20                 : use serde::Deserialize;
      21                 : use serde::Serialize;
      22                 : use thiserror::Error;
      23                 : use tokio;
      24                 : use tokio_util::sync::CancellationToken;
      25                 : use tracing::Instrument;
      26                 : use tracing::{self, debug, error};
      27                 : use utils::crashsafe::path_with_suffix_extension;
      28                 : use utils::generation::Generation;
      29                 : use utils::id::TimelineId;
      30                 : use utils::lsn::AtomicLsn;
      31                 : use utils::lsn::Lsn;
      32                 : 
      33                 : use self::deleter::Deleter;
      34                 : use self::list_writer::DeletionOp;
      35                 : use self::list_writer::ListWriter;
      36                 : use self::list_writer::RecoverOp;
      37                 : use self::validator::Validator;
      38                 : use deleter::DeleterMessage;
      39                 : use list_writer::ListWriterQueueMessage;
      40                 : use validator::ValidatorQueueMessage;
      41                 : 
      42                 : use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
      43                 : 
       44                 : // TODO: make the delay before executing deletions configurable
      45                 : 
      46                 : /// We aggregate object deletions from many tenants in one place, for several reasons:
      47                 : /// - Coalesce deletions into fewer DeleteObjects calls
      48                 : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
      49                 : ///   to flush any outstanding deletions.
      50                 : /// - Globally control throughput of deletions, as these are a low priority task: do
      51                 : ///   not compete with the same S3 clients/connections used for higher priority uploads.
      52                 : /// - Enable gating deletions on validation of a tenant's generation number, to make
      53                 : ///   it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
      54                 : ///
      55                 : /// There are two kinds of deletion: deferred and immediate.  A deferred deletion
      56                 : /// may be intentionally delayed to protect passive readers of S3 data, and is
      57                 : /// subject to a generation number validation step.  An immediate deletion is
      58                 : /// ready to execute immediately, and is only queued up so that it can be coalesced
      59                 : /// with other deletions in flight.
      60                 : ///
      61                 : /// Deferred deletions pass through three steps:
      62                 : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
      63                 : ///   DeletionLists, which are persisted to disk.
       64                 : /// - Validator: accumulate deletion lists, and validate them en masse prior to passing
      65                 : ///   the keys in the list onward for actual deletion.  Also validate remote_consistent_lsn
      66                 : ///   updates for running timelines.
      67                 : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
      68                 : ///   batches of 1000 keys via DeleteObjects.
      69                 : ///
      70                 : /// Non-deferred deletions, such as during timeline deletion, bypass the first
      71                 : /// two stages and are passed straight into the Deleter.
      72                 : ///
      73                 : /// Internally, each stage is joined by a channel to the next.  On disk, there is only
      74                 : /// one queue (of DeletionLists), which is written by the frontend and consumed
      75                 : /// by the backend.
      76 CBC         159 : #[derive(Clone)]
      77                 : pub struct DeletionQueue {
      78                 :     client: DeletionQueueClient,
      79                 : 
      80                 :     // Parent cancellation token for the tokens passed into background workers
      81                 :     cancel: CancellationToken,
      82                 : }
      83                 : 
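
As a rough usage sketch of the deferred path described in the module comment above (hedged: the function name is illustrative, and the tenant/timeline identifiers, generation and layer list are placeholders for values the caller already holds), a caller with a DeletionQueueClient enqueues layers and optionally waits for the request to become persistent:

    // Illustrative sketch only, assuming it lives inside this crate.
    async fn queue_layer_deletions(
        client: &DeletionQueueClient,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        current_generation: Generation,
        layers: Vec<(LayerFileName, LayerFileMetadata)>,
    ) -> Result<(), DeletionQueueError> {
        // Deferred path: batched into a DeletionList, validated against the
        // control plane, and only then handed to the Deleter.
        client
            .push_layers(tenant_shard_id, timeline_id, current_generation, layers)
            .await?;
        // Optional: wait until the deletions are at least persistent on disk.
        client.flush().await
    }
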
      84                 : /// Opaque wrapper around individual worker tasks, to avoid making the
      85                 : /// worker objects themselves public
      86                 : pub struct DeletionQueueWorkers<C>
      87                 : where
      88                 :     C: ControlPlaneGenerationsApi + Send + Sync,
      89                 : {
      90                 :     frontend: ListWriter,
      91                 :     backend: Validator<C>,
      92                 :     executor: Deleter,
      93                 : }
      94                 : 
      95                 : impl<C> DeletionQueueWorkers<C>
      96                 : where
      97                 :     C: ControlPlaneGenerationsApi + Send + Sync + 'static,
      98                 : {
      99             561 :     pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
     100             561 :         let jh_frontend = runtime.spawn(async move {
     101             561 :             self.frontend
     102             561 :                 .background()
     103             561 :                 .instrument(tracing::info_span!(parent:None, "deletion frontend"))
     104            3452 :                 .await
     105             561 :         });
     106             561 :         let jh_backend = runtime.spawn(async move {
     107             561 :             self.backend
     108             561 :                 .background()
     109             561 :                 .instrument(tracing::info_span!(parent:None, "deletion backend"))
     110            1902 :                 .await
     111             561 :         });
     112             561 :         let jh_executor = runtime.spawn(async move {
     113             561 :             self.executor
     114             561 :                 .background()
     115             561 :                 .instrument(tracing::info_span!(parent:None, "deletion executor"))
     116           10532 :                 .await
     117             561 :         });
     118             561 : 
     119             561 :         runtime.spawn({
     120             561 :             async move {
     121             561 :                 jh_frontend.await.expect("error joining frontend worker");
     122             160 :                 jh_backend.await.expect("error joining backend worker");
     123             160 :                 drop(jh_executor.await.expect("error joining executor worker"));
     124             561 :             }
     125             561 :         })
     126             561 :     }
     127                 : }
     128                 : 
     129                 : /// A FlushOp is just a oneshot channel, where we send the transmit side down
     130                 : /// another channel, and the receive side will receive a message when the channel
     131                 : /// we're flushing has reached the FlushOp we sent into it.
     132                 : ///
     133                 : /// The only extra behavior beyond the channel is that the notify() method does not
     134                 : /// return an error when the receive side has been dropped, because in this use case
     135                 : /// it is harmless (the code that initiated the flush no longer cares about the result).
     136 UBC           0 : #[derive(Debug)]
     137                 : struct FlushOp {
     138                 :     tx: tokio::sync::oneshot::Sender<()>,
     139                 : }
     140                 : 
     141                 : impl FlushOp {
     142 CBC         721 :     fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
     143             721 :         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
     144             721 :         (Self { tx }, rx)
     145             721 :     }
     146                 : 
     147             719 :     fn notify(self) {
     148             719 :         if self.tx.send(()).is_err() {
     149                 :             // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
     150             104 :             debug!("deletion queue flush from dropped client");
     151             615 :         };
     152             719 :     }
     153                 : }
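
The flush pattern can also be shown in isolation; this is a small, self-contained sketch (the Msg type and worker are hypothetical, not part of this module) of how a FlushOp-style oneshot travels through a work queue:

    use tokio::sync::{mpsc, oneshot};

    enum Msg {
        Work(u64),
        Flush(oneshot::Sender<()>),
    }

    async fn worker(mut rx: mpsc::UnboundedReceiver<Msg>) {
        while let Some(msg) = rx.recv().await {
            match msg {
                Msg::Work(_n) => { /* process the item */ }
                // Like FlushOp::notify(): a dropped receiver is not an error.
                Msg::Flush(done) => { let _ = done.send(()); }
            }
        }
    }

    async fn flush(tx: &mpsc::UnboundedSender<Msg>) {
        let (done_tx, done_rx) = oneshot::channel();
        if tx.send(Msg::Flush(done_tx)).is_ok() {
            // Resolves once the worker has drained everything queued before this point.
            let _ = done_rx.await;
        }
    }
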
     154                 : 
     155            4635 : #[derive(Clone, Debug)]
     156                 : pub struct DeletionQueueClient {
     157                 :     tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
     158                 :     executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
     159                 : 
     160                 :     lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
     161                 : }
     162                 : 
     163             155 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     164                 : struct TenantDeletionList {
     165                 :     /// For each Timeline, a list of key fragments to append to the timeline remote path
     166                 :     /// when reconstructing a full key
     167                 :     timelines: HashMap<TimelineId, Vec<String>>,
     168                 : 
     169                 :     /// The generation in which this deletion was emitted: note that this may not be the
     170                 :     /// same as the generation of any layers being deleted.  The generation of the layer
      171                 :     /// has already been absorbed into the keys in `timelines`
     172                 :     generation: Generation,
     173                 : }
     174                 : 
     175                 : impl TenantDeletionList {
     176              38 :     pub(crate) fn len(&self) -> usize {
     177              51 :         self.timelines.values().map(|v| v.len()).sum()
     178              38 :     }
     179                 : }
     180                 : 
     181                 : /// Files ending with this suffix will be ignored and erased
      182                 : /// during recovery at startup.
     183                 : const TEMP_SUFFIX: &str = "tmp";
     184                 : 
     185             252 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     186                 : struct DeletionList {
     187                 :     /// Serialization version, for future use
     188                 :     version: u8,
     189                 : 
     190                 :     /// Used for constructing a unique key for each deletion list we write out.
     191                 :     sequence: u64,
     192                 : 
     193                 :     /// To avoid repeating tenant/timeline IDs in every key, we store keys in
     194                 :     /// nested HashMaps by TenantTimelineID.  Each Tenant only appears once
     195                 :     /// with one unique generation ID: if someone tries to push a second generation
     196                 :     /// ID for the same tenant, we will start a new DeletionList.
     197                 :     tenants: HashMap<TenantShardId, TenantDeletionList>,
     198                 : 
     199                 :     /// Avoid having to walk `tenants` to calculate the number of keys in
     200                 :     /// the nested deletion lists
     201                 :     size: usize,
     202                 : 
     203                 :     /// Set to true when the list has undergone validation with the control
     204                 :     /// plane and the remaining contents of `tenants` are valid.  A list may
     205                 :     /// also be implicitly marked valid by DeletionHeader.validated_sequence
     206                 :     /// advancing to >= DeletionList.sequence
     207                 :     #[serde(default)]
     208                 :     #[serde(skip_serializing_if = "std::ops::Not::not")]
     209                 :     validated: bool,
     210                 : }
     211                 : 
     212             115 : #[derive(Debug, Serialize, Deserialize)]
     213                 : struct DeletionHeader {
     214                 :     /// Serialization version, for future use
     215                 :     version: u8,
     216                 : 
     217                 :     /// The highest sequence number (inclusive) that has been validated.  All deletion
     218                 :     /// lists on disk with a sequence <= this value are safe to execute.
     219                 :     validated_sequence: u64,
     220                 : }
     221                 : 
     222                 : impl DeletionHeader {
     223                 :     const VERSION_LATEST: u8 = 1;
     224                 : 
     225              37 :     fn new(validated_sequence: u64) -> Self {
     226              37 :         Self {
     227              37 :             version: Self::VERSION_LATEST,
     228              37 :             validated_sequence,
     229              37 :         }
     230              37 :     }
     231                 : 
     232              37 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     233 UBC           0 :         debug!("Saving deletion list header {:?}", self);
     234 CBC          37 :         let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
     235              37 :         let header_path = conf.deletion_header_path();
     236              37 :         let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
     237              37 :         VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
     238 UBC           0 :             .await
     239 CBC          37 :             .maybe_fatal_err("save deletion header")?;
     240                 : 
     241              37 :         Ok(())
     242              37 :     }
     243                 : }
     244                 : 
     245                 : impl DeletionList {
     246                 :     const VERSION_LATEST: u8 = 1;
     247             611 :     fn new(sequence: u64) -> Self {
     248             611 :         Self {
     249             611 :             version: Self::VERSION_LATEST,
     250             611 :             sequence,
     251             611 :             tenants: HashMap::new(),
     252             611 :             size: 0,
     253             611 :             validated: false,
     254             611 :         }
     255             611 :     }
     256                 : 
     257             707 :     fn is_empty(&self) -> bool {
     258             707 :         self.tenants.is_empty()
     259             707 :     }
     260                 : 
     261            4115 :     fn len(&self) -> usize {
     262            4115 :         self.size
     263            4115 :     }
     264                 : 
     265                 :     /// Returns true if the push was accepted, false if the caller must start a new
     266                 :     /// deletion list.
     267            3152 :     fn push(
     268            3152 :         &mut self,
     269            3152 :         tenant: &TenantShardId,
     270            3152 :         timeline: &TimelineId,
     271            3152 :         generation: Generation,
     272            3152 :         objects: &mut Vec<RemotePath>,
     273            3152 :     ) -> bool {
     274            3152 :         if objects.is_empty() {
      275                 :             // Avoid inserting an empty TenantDeletionList: this preserves the property
      276                 :             // that if we have no keys, then self.tenants is empty (used in Self::is_empty)
     277             353 :             return true;
     278            2799 :         }
     279            2799 : 
     280            2799 :         let tenant_entry = self
     281            2799 :             .tenants
     282            2799 :             .entry(*tenant)
     283            2799 :             .or_insert_with(|| TenantDeletionList {
     284              92 :                 timelines: HashMap::new(),
     285              92 :                 generation,
     286            2799 :             });
     287            2799 : 
     288            2799 :         if tenant_entry.generation != generation {
     289                 :             // Only one generation per tenant per list: signal to
     290                 :             // caller to start a new list.
     291               2 :             return false;
     292            2797 :         }
     293            2797 : 
     294            2797 :         let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
     295            2797 : 
     296            2797 :         let timeline_remote_path = remote_timeline_path(tenant, timeline);
     297            2797 : 
     298            2797 :         self.size += objects.len();
     299            2797 :         timeline_entry.extend(objects.drain(..).map(|p| {
     300            2797 :             p.strip_prefix(&timeline_remote_path)
     301            2797 :                 .expect("Timeline paths always start with the timeline prefix")
     302            2797 :                 .to_string()
     303            2797 :         }));
     304            2797 :         true
     305            3152 :     }
     306                 : 
     307              40 :     fn into_remote_paths(self) -> Vec<RemotePath> {
     308              40 :         let mut result = Vec::new();
     309              40 :         for (tenant, tenant_deletions) in self.tenants.into_iter() {
     310              41 :             for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
     311              41 :                 let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
     312              41 :                 result.extend(
     313              41 :                     timeline_layers
     314              41 :                         .into_iter()
     315             819 :                         .map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
     316              41 :                 );
     317              41 :             }
     318                 :         }
     319                 : 
     320              40 :         result
     321              40 :     }
     322                 : 
     323              61 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     324              61 :         let path = conf.deletion_list_path(self.sequence);
     325              61 :         let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
     326              61 : 
     327              61 :         let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
     328              61 :         VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
     329 UBC           0 :             .await
     330 CBC          61 :             .maybe_fatal_err("save deletion list")
     331              61 :             .map_err(Into::into)
     332              61 :     }
     333                 : }
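
To make the push() contract concrete, here is a hedged sketch of the caller side (the rotation logic is simplified; the real ListWriter also persists and forwards the finished list, and `next_sequence` stands in for whatever sequence counter the caller maintains):

    fn push_or_rotate(
        pending: &mut DeletionList,
        next_sequence: &mut u64,
        tenant: &TenantShardId,
        timeline: &TimelineId,
        generation: Generation,
        objects: &mut Vec<RemotePath>,
    ) -> Option<DeletionList> {
        if pending.push(tenant, timeline, generation, objects) {
            return None;
        }
        // push() returned false: the tenant is already present with a different
        // generation, so hand back the finished list and start a fresh one.
        *next_sequence += 1;
        let finished = std::mem::replace(pending, DeletionList::new(*next_sequence));
        let accepted = pending.push(tenant, timeline, generation, objects);
        debug_assert!(accepted, "push into a fresh list always succeeds");
        Some(finished)
    }
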
     334                 : 
     335                 : impl std::fmt::Display for DeletionList {
     336 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     337               0 :         write!(
     338               0 :             f,
     339               0 :             "DeletionList<seq={}, tenants={}, keys={}>",
     340               0 :             self.sequence,
     341               0 :             self.tenants.len(),
     342               0 :             self.size
     343               0 :         )
     344               0 :     }
     345                 : }
     346                 : 
     347                 : struct PendingLsn {
     348                 :     projected: Lsn,
     349                 :     result_slot: Arc<AtomicLsn>,
     350                 : }
     351                 : 
     352                 : struct TenantLsnState {
     353                 :     timelines: HashMap<TimelineId, PendingLsn>,
     354                 : 
     355                 :     // In what generation was the most recent update proposed?
     356                 :     generation: Generation,
     357                 : }
     358                 : 
     359 CBC         659 : #[derive(Default)]
     360                 : struct VisibleLsnUpdates {
     361                 :     tenants: HashMap<TenantShardId, TenantLsnState>,
     362                 : }
     363                 : 
     364                 : impl VisibleLsnUpdates {
     365             602 :     fn new() -> Self {
     366             602 :         Self {
     367             602 :             tenants: HashMap::new(),
     368             602 :         }
     369             602 :     }
     370                 : }
     371                 : 
     372                 : impl std::fmt::Debug for VisibleLsnUpdates {
     373 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     374               0 :         write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
     375               0 :     }
     376                 : }
     377                 : 
     378               0 : #[derive(Error, Debug)]
     379                 : pub enum DeletionQueueError {
     380                 :     #[error("Deletion queue unavailable during shutdown")]
     381                 :     ShuttingDown,
     382                 : }
     383                 : 
     384                 : impl DeletionQueueClient {
     385               0 :     pub(crate) fn broken() -> Self {
     386               0 :         // Channels whose receivers are immediately dropped.
     387               0 :         let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
     388               0 :         let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
     389               0 :         Self {
     390               0 :             tx,
     391               0 :             executor_tx,
     392               0 :             lsn_table: Arc::default(),
     393               0 :         }
     394               0 :     }
     395                 : 
     396                 :     /// This is cancel-safe.  If you drop the future before it completes, the message
     397                 :     /// is not pushed, although in the context of the deletion queue it doesn't matter: once
     398                 :     /// we decide to do a deletion the decision is always final.
     399 CBC        3994 :     fn do_push<T>(
     400            3994 :         &self,
     401            3994 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     402            3994 :         msg: T,
     403            3994 :     ) -> Result<(), DeletionQueueError> {
     404            3994 :         match queue.send(msg) {
     405            3994 :             Ok(_) => Ok(()),
     406 UBC           0 :             Err(e) => {
     407               0 :                 // This shouldn't happen, we should shut down all tenants before
     408               0 :                 // we shut down the global delete queue.  If we encounter a bug like this,
     409               0 :                 // we may leak objects as deletions won't be processed.
     410               0 :                 error!("Deletion queue closed while pushing, shutting down? ({e})");
     411               0 :                 Err(DeletionQueueError::ShuttingDown)
     412                 :             }
     413                 :         }
     414 CBC        3994 :     }
     415                 : 
     416             560 :     pub(crate) fn recover(
     417             560 :         &self,
     418             560 :         attached_tenants: HashMap<TenantShardId, Generation>,
     419             560 :     ) -> Result<(), DeletionQueueError> {
     420             560 :         self.do_push(
     421             560 :             &self.tx,
     422             560 :             ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
     423             560 :         )
     424             560 :     }
     425                 : 
     426                 :     /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
     427                 :     /// world, it must validate its generation number before doing so.  Rather than do this synchronously,
     428                 :     /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
     429                 :     /// recently validated separately.
     430                 :     ///
     431                 :     /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates.  The
     432                 :     /// backend will later wake up and notice that the tenant's generation requires validation.
     433            5095 :     pub(crate) async fn update_remote_consistent_lsn(
     434            5095 :         &self,
     435            5095 :         tenant_shard_id: TenantShardId,
     436            5095 :         timeline_id: TimelineId,
     437            5095 :         current_generation: Generation,
     438            5095 :         lsn: Lsn,
     439            5095 :         result_slot: Arc<AtomicLsn>,
     440            5095 :     ) {
     441            5095 :         let mut locked = self
     442            5095 :             .lsn_table
     443            5095 :             .write()
     444            5095 :             .expect("Lock should never be poisoned");
     445            5095 : 
     446            5095 :         let tenant_entry = locked
     447            5095 :             .tenants
     448            5095 :             .entry(tenant_shard_id)
     449            5095 :             .or_insert(TenantLsnState {
     450            5095 :                 timelines: HashMap::new(),
     451            5095 :                 generation: current_generation,
     452            5095 :             });
     453            5095 : 
     454            5095 :         if tenant_entry.generation != current_generation {
     455               2 :             // Generation might have changed if we were detached and then re-attached: in this case,
     456               2 :             // state from the previous generation cannot be trusted.
     457               2 :             tenant_entry.timelines.clear();
     458               2 :             tenant_entry.generation = current_generation;
     459            5093 :         }
     460                 : 
     461            5095 :         tenant_entry.timelines.insert(
     462            5095 :             timeline_id,
     463            5095 :             PendingLsn {
     464            5095 :                 projected: lsn,
     465            5095 :                 result_slot,
     466            5095 :             },
     467            5095 :         );
     468            5095 :     }
     469                 : 
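
A hedged sketch of the calling pattern described above (variable and function names are illustrative, not the actual Timeline code; it assumes utils::lsn::AtomicLsn exposes new() and load()):

    async fn publish_remote_consistent_lsn(
        client: &DeletionQueueClient,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        current_generation: Generation,
        projected: Lsn,
        visible_slot: Arc<AtomicLsn>,
    ) {
        // Publish the projected value; the validator advances the shared slot
        // asynchronously once the generation has been checked.
        client
            .update_remote_consistent_lsn(
                tenant_shard_id,
                timeline_id,
                current_generation,
                projected,
                visible_slot.clone(),
            )
            .await;
        // Until validation completes, this still reads the previously validated value.
        let _currently_visible: Lsn = visible_slot.load();
    }
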
     470                 :     /// Submit a list of layers for deletion: this function will return before the deletion is
     471                 :     /// persistent, but it may be executed at any time after this function enters: do not push
     472                 :     /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
     473                 :     /// references them).
     474                 :     ///
     475                 :     /// The `current_generation` is the generation of this pageserver's current attachment.  The
     476                 :     /// generations in `layers` are the generations in which those layers were written.
     477            3255 :     pub(crate) async fn push_layers(
     478            3255 :         &self,
     479            3255 :         tenant_shard_id: TenantShardId,
     480            3255 :         timeline_id: TimelineId,
     481            3255 :         current_generation: Generation,
     482            3255 :         layers: Vec<(LayerFileName, LayerFileMetadata)>,
     483            3255 :     ) -> Result<(), DeletionQueueError> {
     484            3255 :         if current_generation.is_none() {
     485 UBC           0 :             debug!("Enqueuing deletions in legacy mode, skipping queue");
     486                 : 
     487 CBC          24 :             let mut layer_paths = Vec::new();
     488              48 :             for (layer, meta) in layers {
     489              24 :                 layer_paths.push(remote_layer_path(
     490              24 :                     &tenant_shard_id.tenant_id,
     491              24 :                     &timeline_id,
     492              24 :                     meta.shard,
     493              24 :                     &layer,
     494              24 :                     meta.generation,
     495              24 :                 ));
     496              24 :             }
     497              24 :             self.push_immediate(layer_paths).await?;
     498              33 :             return self.flush_immediate().await;
     499            3231 :         }
     500            3231 : 
     501            3231 :         self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
     502            3255 :     }
     503                 : 
     504                 :     /// When a Tenant has a generation, push_layers is always synchronous because
     505                 :     /// the ListValidator channel is an unbounded channel.
      506                 :     /// the ListWriter channel is an unbounded channel.
     507                 :     /// This can be merged into push_layers when we remove the Generation-less mode
     508                 :     /// support (`<https://github.com/neondatabase/neon/issues/5395>`)
     509            3231 :     pub(crate) fn push_layers_sync(
     510            3231 :         &self,
     511            3231 :         tenant_shard_id: TenantShardId,
     512            3231 :         timeline_id: TimelineId,
     513            3231 :         current_generation: Generation,
     514            3231 :         layers: Vec<(LayerFileName, LayerFileMetadata)>,
     515            3231 :     ) -> Result<(), DeletionQueueError> {
     516            3231 :         metrics::DELETION_QUEUE
     517            3231 :             .keys_submitted
     518            3231 :             .inc_by(layers.len() as u64);
     519            3231 :         self.do_push(
     520            3231 :             &self.tx,
     521            3231 :             ListWriterQueueMessage::Delete(DeletionOp {
     522            3231 :                 tenant_shard_id,
     523            3231 :                 timeline_id,
     524            3231 :                 layers,
     525            3231 :                 generation: current_generation,
     526            3231 :                 objects: Vec::new(),
     527            3231 :             }),
     528            3231 :         )
     529            3231 :     }
     530                 : 
     531                 :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     532             203 :     async fn do_flush<T>(
     533             203 :         &self,
     534             203 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     535             203 :         msg: T,
     536             203 :         rx: tokio::sync::oneshot::Receiver<()>,
     537             203 :     ) -> Result<(), DeletionQueueError> {
     538             203 :         self.do_push(queue, msg)?;
     539             205 :         if rx.await.is_err() {
     540                 :             // This shouldn't happen if tenants are shut down before deletion queue.  If we
     541                 :             // encounter a bug like this, then a flusher will incorrectly believe it has flushed
     542                 :             // when it hasn't, possibly leading to leaking objects.
     543 UBC           0 :             error!("Deletion queue dropped flush op while client was still waiting");
     544               0 :             Err(DeletionQueueError::ShuttingDown)
     545                 :         } else {
     546 CBC         203 :             Ok(())
     547                 :         }
     548             203 :     }
     549                 : 
     550                 :     /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
     551                 :     ///
     552                 :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     553             186 :     pub async fn flush(&self) -> Result<(), DeletionQueueError> {
     554             186 :         let (flush_op, rx) = FlushOp::new();
     555             186 :         self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
     556             188 :             .await
     557             186 :     }
     558                 : 
     559                 :     /// Issue a flush without waiting for it to complete.  This is useful on advisory flushes where
     560                 :     /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
     561                 :     /// detach where flushing is nice but not necessary.
     562                 :     ///
     563                 :     /// This function provides no guarantees of work being done.
     564             104 :     pub fn flush_advisory(&self) {
     565             104 :         let (flush_op, _) = FlushOp::new();
     566             104 : 
     567             104 :         // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
     568             104 :         drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
     569             104 :     }
     570                 : 
     571                 :     // Wait until all previous deletions are executed
     572              17 :     pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
     573 UBC           0 :         debug!("flush_execute: flushing to deletion lists...");
     574                 :         // Flush any buffered work to deletion lists
     575 CBC          17 :         self.flush().await?;
     576                 : 
     577                 :         // Flush the backend into the executor of deletion lists
     578              17 :         let (flush_op, rx) = FlushOp::new();
     579 UBC           0 :         debug!("flush_execute: flushing backend...");
     580 CBC          17 :         self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
     581              17 :             .await?;
     582 UBC           0 :         debug!("flush_execute: finished flushing backend...");
     583                 : 
     584                 :         // Flush any immediate-mode deletions (the above backend flush will only flush
     585                 :         // the executor if deletions had flowed through the backend)
     586               0 :         debug!("flush_execute: flushing execution...");
     587 CBC          17 :         self.flush_immediate().await?;
     588 UBC           0 :         debug!("flush_execute: finished flushing execution...");
     589 CBC          17 :         Ok(())
     590              17 :     }
     591                 : 
     592                 :     /// This interface bypasses the persistent deletion queue, and any validation
      593                 :     /// that this pageserver is still eligible to execute the deletions.  It is for
     594                 :     /// use in timeline deletions, where the control plane is telling us we may
     595                 :     /// delete everything in the timeline.
     596                 :     ///
     597                 :     /// DO NOT USE THIS FROM GC OR COMPACTION CODE.  Use the regular `push_layers`.
     598             456 :     pub(crate) async fn push_immediate(
     599             456 :         &self,
     600             456 :         objects: Vec<RemotePath>,
     601             456 :     ) -> Result<(), DeletionQueueError> {
     602             456 :         metrics::DELETION_QUEUE
     603             456 :             .keys_submitted
     604             456 :             .inc_by(objects.len() as u64);
     605             456 :         self.executor_tx
     606             456 :             .send(DeleterMessage::Delete(objects))
     607               9 :             .await
     608             456 :             .map_err(|_| DeletionQueueError::ShuttingDown)
     609             456 :     }
     610                 : 
     611                 :     /// Companion to push_immediate.  When this returns Ok, all prior objects sent
     612                 :     /// into push_immediate have been deleted from remote storage.
     613             375 :     pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
     614             375 :         let (flush_op, rx) = FlushOp::new();
     615             375 :         self.executor_tx
     616             375 :             .send(DeleterMessage::Flush(flush_op))
     617               9 :             .await
     618             375 :             .map_err(|_| DeletionQueueError::ShuttingDown)?;
     619                 : 
     620             375 :         rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
     621             375 :     }
     622                 : }
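
For the immediate path, a hedged sketch (only legal where the control plane has already authorised the deletion, e.g. timeline deletion; `objects` is whatever remote paths the caller has resolved, and the function name is illustrative):

    async fn delete_now(
        client: &DeletionQueueClient,
        objects: Vec<RemotePath>,
    ) -> Result<(), DeletionQueueError> {
        // Bypasses the persistent queue and generation validation.
        client.push_immediate(objects).await?;
        // Returns once every object pushed above has been removed from remote storage.
        client.flush_immediate().await
    }
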
     623                 : 
     624                 : impl DeletionQueue {
     625            1118 :     pub fn new_client(&self) -> DeletionQueueClient {
     626            1118 :         self.client.clone()
     627            1118 :     }
     628                 : 
     629                 :     /// Caller may use the returned object to construct clients with new_client.
      630                 :     /// Caller should spawn the returned DeletionQueueWorkers (e.g. via spawn_with):
     631                 :     /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
     632                 :     ///
     633                 :     /// If remote_storage is None, then the returned workers will also be None.
     634             561 :     pub fn new<C>(
     635             561 :         remote_storage: Option<GenericRemoteStorage>,
     636             561 :         control_plane_client: Option<C>,
     637             561 :         conf: &'static PageServerConf,
     638             561 :     ) -> (Self, Option<DeletionQueueWorkers<C>>)
     639             561 :     where
     640             561 :         C: ControlPlaneGenerationsApi + Send + Sync,
     641             561 :     {
     642             561 :         // Unbounded channel: enables non-async functions to submit deletions.  The actual length is
     643             561 :         // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
     644             561 :         // enough to avoid this taking pathologically large amount of memory.
     645             561 :         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
     646             561 : 
     647             561 :         // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
     648             561 :         let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
     649             561 : 
     650             561 :         // Shallow channel: it carries lists of paths, and we expect the main queueing to
     651             561 :         // happen in the backend (persistent), not in this queue.
     652             561 :         let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
     653             561 : 
     654             561 :         let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
     655             561 : 
     656             561 :         // The deletion queue has an independent cancellation token to
     657             561 :         // the general pageserver shutdown token, because it stays alive a bit
     658             561 :         // longer to flush after Tenants have all been torn down.
     659             561 :         let cancel = CancellationToken::new();
     660                 : 
     661             561 :         let remote_storage = match remote_storage {
     662                 :             None => {
     663 UBC           0 :                 return (
     664               0 :                     Self {
     665               0 :                         client: DeletionQueueClient {
     666               0 :                             tx,
     667               0 :                             executor_tx,
     668               0 :                             lsn_table: lsn_table.clone(),
     669               0 :                         },
     670               0 :                         cancel,
     671               0 :                     },
     672               0 :                     None,
     673               0 :                 )
     674                 :             }
     675 CBC         561 :             Some(r) => r,
     676             561 :         };
     677             561 : 
     678             561 :         (
     679             561 :             Self {
     680             561 :                 client: DeletionQueueClient {
     681             561 :                     tx,
     682             561 :                     executor_tx: executor_tx.clone(),
     683             561 :                     lsn_table: lsn_table.clone(),
     684             561 :                 },
     685             561 :                 cancel: cancel.clone(),
     686             561 :             },
     687             561 :             Some(DeletionQueueWorkers {
     688             561 :                 frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
     689             561 :                 backend: Validator::new(
     690             561 :                     conf,
     691             561 :                     backend_rx,
     692             561 :                     executor_tx,
     693             561 :                     control_plane_client,
     694             561 :                     lsn_table.clone(),
     695             561 :                     cancel.clone(),
     696             561 :                 ),
     697             561 :                 executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
     698             561 :             }),
     699             561 :         )
     700             561 :     }
     701                 : 
     702             159 :     pub async fn shutdown(&mut self, timeout: Duration) {
     703             159 :         self.cancel.cancel();
     704             159 : 
     705             161 :         match tokio::time::timeout(timeout, self.client.flush()).await {
     706                 :             Ok(Ok(())) => {
     707             159 :                 tracing::info!("Deletion queue flushed successfully on shutdown")
     708                 :             }
     709                 :             Ok(Err(DeletionQueueError::ShuttingDown)) => {
     710                 :                 // This is not harmful for correctness, but is unexpected: the deletion
     711                 :                 // queue's workers should stay alive as long as there are any client handles instantiated.
     712 UBC           0 :                 tracing::warn!("Deletion queue stopped prematurely");
     713                 :             }
     714               0 :             Err(_timeout) => {
     715               0 :                 tracing::warn!("Timed out flushing deletion queue on shutdown")
     716                 :             }
     717                 :         }
     718 CBC         159 :     }
     719                 : }
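
Putting the pieces together, a hedged sketch of startup and shutdown wiring (the surrounding initialisation is assumed; any ControlPlaneGenerationsApi implementation can stand in for C, and the function name is illustrative):

    fn start_deletion_queue<C>(
        conf: &'static PageServerConf,
        remote_storage: Option<GenericRemoteStorage>,
        control_plane_client: Option<C>,
    ) -> (DeletionQueue, Option<tokio::task::JoinHandle<()>>)
    where
        C: ControlPlaneGenerationsApi + Send + Sync + 'static,
    {
        let (queue, workers) = DeletionQueue::new(remote_storage, control_plane_client, conf);
        // Workers are only returned when remote storage is configured.
        let join = workers.map(|w| w.spawn_with(&tokio::runtime::Handle::current()));
        (queue, join)
    }

    // Later, once tenant attachments are known, startup code calls
    // queue.new_client().recover(attached_tenants), and on shutdown (after
    // tenants are torn down) it calls queue.shutdown(timeout).await.
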
     720                 : 
     721                 : #[cfg(test)]
     722                 : mod test {
     723                 :     use camino::Utf8Path;
     724                 :     use hex_literal::hex;
     725                 :     use pageserver_api::shard::ShardIndex;
     726                 :     use std::{io::ErrorKind, time::Duration};
     727                 :     use tracing::info;
     728                 : 
     729                 :     use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
     730                 :     use tokio::task::JoinHandle;
     731                 : 
     732                 :     use crate::{
     733                 :         control_plane_client::RetryForeverError,
     734                 :         repository::Key,
     735                 :         tenant::{
     736                 :             harness::TenantHarness, remote_timeline_client::remote_timeline_path,
     737                 :             storage_layer::DeltaFileName,
     738                 :         },
     739                 :     };
     740                 : 
     741                 :     use super::*;
     742                 :     pub const TIMELINE_ID: TimelineId =
     743                 :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
     744                 : 
     745                 :     pub const EXAMPLE_LAYER_NAME: LayerFileName = LayerFileName::Delta(DeltaFileName {
     746                 :         key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
     747                 :         lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
     748                 :     });
     749                 : 
     750                 :     // When you need a second layer in a test.
     751                 :     pub const EXAMPLE_LAYER_NAME_ALT: LayerFileName = LayerFileName::Delta(DeltaFileName {
     752                 :         key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
     753                 :         lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
     754                 :     });
     755                 : 
     756                 :     struct TestSetup {
     757                 :         harness: TenantHarness,
     758                 :         remote_fs_dir: Utf8PathBuf,
     759                 :         storage: GenericRemoteStorage,
     760                 :         mock_control_plane: MockControlPlane,
     761                 :         deletion_queue: DeletionQueue,
     762                 :         worker_join: JoinHandle<()>,
     763                 :     }
     764                 : 
     765                 :     impl TestSetup {
     766                 :         /// Simulate a pageserver restart by destroying and recreating the deletion queue
     767               1 :         async fn restart(&mut self) {
     768               1 :             let (deletion_queue, workers) = DeletionQueue::new(
     769               1 :                 Some(self.storage.clone()),
     770               1 :                 Some(self.mock_control_plane.clone()),
     771               1 :                 self.harness.conf,
     772               1 :             );
     773                 : 
      774 UBC           0 :             tracing::debug!("Spawning worker for new queue");
     775 CBC           1 :             let worker_join = workers
     776               1 :                 .unwrap()
     777               1 :                 .spawn_with(&tokio::runtime::Handle::current());
     778               1 : 
     779               1 :             let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
     780               1 :             let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
     781                 : 
     782 UBC           0 :             tracing::debug!("Joining worker from previous queue");
     783 CBC           1 :             old_deletion_queue.cancel.cancel();
     784               1 :             old_worker_join
     785               1 :                 .await
     786               1 :                 .expect("Failed to join workers for previous deletion queue");
     787               1 :         }
     788                 : 
     789               3 :         fn set_latest_generation(&self, gen: Generation) {
     790               3 :             let tenant_shard_id = self.harness.tenant_shard_id;
     791               3 :             self.mock_control_plane
     792               3 :                 .latest_generation
     793               3 :                 .lock()
     794               3 :                 .unwrap()
     795               3 :                 .insert(tenant_shard_id, gen);
     796               3 :         }
     797                 : 
     798                 :         /// Returns remote layer file name, suitable for use in assert_remote_files
     799               3 :         fn write_remote_layer(
     800               3 :             &self,
     801               3 :             file_name: LayerFileName,
     802               3 :             gen: Generation,
     803               3 :         ) -> anyhow::Result<String> {
     804               3 :             let tenant_shard_id = self.harness.tenant_shard_id;
     805               3 :             let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
     806               3 :             let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
     807               3 :             std::fs::create_dir_all(&remote_timeline_path)?;
     808               3 :             let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
     809               3 : 
     810               3 :             let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
     811               3 : 
     812               3 :             std::fs::write(
     813               3 :                 remote_timeline_path.join(remote_layer_file_name.clone()),
     814               3 :                 content,
     815               3 :             )?;
     816                 : 
     817               3 :             Ok(remote_layer_file_name)
     818               3 :         }
     819                 :     }
     820                 : 
     821               4 :     #[derive(Debug, Clone)]
     822                 :     struct MockControlPlane {
     823                 :         pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>,
     824                 :     }
     825                 : 
     826                 :     impl MockControlPlane {
     827               3 :         fn new() -> Self {
     828               3 :             Self {
     829               3 :                 latest_generation: Arc::default(),
     830               3 :             }
     831               3 :         }
     832                 :     }
     833                 : 
     834                 :     #[async_trait::async_trait]
     835                 :     impl ControlPlaneGenerationsApi for MockControlPlane {
     836                 :         #[allow(clippy::diverging_sub_expression)] // False positive via async_trait
     837 UBC           0 :         async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
     838               0 :             unimplemented!()
     839               0 :         }
     840 CBC           4 :         async fn validate(
     841               4 :             &self,
     842               4 :             tenants: Vec<(TenantShardId, Generation)>,
     843               4 :         ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     844               4 :             let mut result = HashMap::new();
     845               4 : 
     846               4 :             let latest_generation = self.latest_generation.lock().unwrap();
     847                 : 
     848               8 :             for (tenant_shard_id, generation) in tenants {
     849               4 :                 if let Some(latest) = latest_generation.get(&tenant_shard_id) {
     850               4 :                     result.insert(tenant_shard_id, *latest == generation);
     851               4 :                 }
     852                 :             }
     853                 : 
     854               4 :             Ok(result)
     855               8 :         }
     856                 :     }
     857                 : 
     858               3 :     fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
     859               3 :         let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
     860               3 :         let harness = TenantHarness::create(test_name)?;
     861                 : 
     862                 :         // We do not load() the harness: we only need its config and remote_storage
     863                 : 
      864                 :         // Set up a GenericRemoteStorage targeting a directory
     865               3 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs");
     866               3 :         std::fs::create_dir_all(remote_fs_dir)?;
     867               3 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
     868               3 :         let storage_config = RemoteStorageConfig {
     869               3 :             storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
     870               3 :         };
     871               3 :         let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
     872               3 : 
     873               3 :         let mock_control_plane = MockControlPlane::new();
     874               3 : 
     875               3 :         let (deletion_queue, worker) = DeletionQueue::new(
     876               3 :             Some(storage.clone()),
     877               3 :             Some(mock_control_plane.clone()),
     878               3 :             harness.conf,
     879               3 :         );
     880               3 : 
     881               3 :         let worker = worker.unwrap();
     882               3 :         let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
     883               3 : 
     884               3 :         Ok(TestSetup {
     885               3 :             harness,
     886               3 :             remote_fs_dir,
     887               3 :             storage,
     888               3 :             mock_control_plane,
     889               3 :             deletion_queue,
     890               3 :             worker_join,
     891               3 :         })
     892               3 :     }
     893                 : 
     894                 :     // TODO: put this in a common location so that we can share with remote_timeline_client's tests
     895               9 :     fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
     896               9 :         let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
     897               9 :         expected.sort();
     898               9 : 
     899               9 :         let mut found: Vec<String> = Vec::new();
     900               9 :         let dir = match std::fs::read_dir(remote_path) {
     901               9 :             Ok(d) => d,
     902 UBC           0 :             Err(e) => {
     903               0 :                 if e.kind() == ErrorKind::NotFound {
     904               0 :                     if expected.is_empty() {
     905                 :                         // We are asserting prefix is empty: it is expected that the dir is missing
     906               0 :                         return;
     907                 :                     } else {
     908               0 :                         assert_eq!(expected, Vec::<String>::new());
     909               0 :                         unreachable!();
     910                 :                     }
     911                 :                 } else {
     912               0 :                     panic!("Unexpected error listing {remote_path}: {e}");
     913                 :                 }
     914                 :             }
     915                 :         };
     916                 : 
     917 CBC           9 :         for entry in dir.flatten() {
     918               8 :             let entry_name = entry.file_name();
     919               8 :             let fname = entry_name.to_str().unwrap();
     920               8 :             found.push(String::from(fname));
     921               8 :         }
     922               9 :         found.sort();
     923               9 : 
     924               9 :         assert_eq!(expected, found);
     925               9 :     }
     926                 : 
     927               5 :     fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
     928               5 :         let dir = match std::fs::read_dir(directory) {
     929               4 :             Ok(d) => d,
     930                 :             Err(_) => {
     931               1 :                 assert_eq!(expected, &Vec::<String>::new());
     932               1 :                 return;
     933                 :             }
     934                 :         };
     935               4 :         let mut found = Vec::new();
     936               9 :         for dentry in dir {
     937               5 :             let dentry = dentry.unwrap();
     938               5 :             let file_name = dentry.file_name();
     939               5 :             let file_name_str = file_name.to_string_lossy();
     940               5 :             found.push(file_name_str.to_string());
     941               5 :         }
     942               4 :         found.sort();
     943               4 :         assert_eq!(expected, found);
     944               5 :     }
     945                 : 
     946               1 :     #[tokio::test]
     947               1 :     async fn deletion_queue_smoke() -> anyhow::Result<()> {
     948               1 :         // Basic test that the deletion queue processes the deletions we pass into it
     949               1 :         let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
     950               1 :         let client = ctx.deletion_queue.new_client();
     951               1 :         client.recover(HashMap::new())?;
     952                 : 
     953               1 :         let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
     954               1 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
     955               1 : 
     956               1 :         let content: Vec<u8> = "victim1 contents".into();
     957               1 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
     958               1 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
     959               1 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
     960               1 : 
     961               1 :         // Exercise the distinction between the generation of the layers
     962               1 :         // we delete, and the generation of the running Tenant.
     963               1 :         let layer_generation = Generation::new(0xdeadbeef);
     964               1 :         let now_generation = Generation::new(0xfeedbeef);
     965               1 :         let layer_metadata =
     966               1 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
     967               1 : 
     968               1 :         let remote_layer_file_name_1 =
     969               1 :             format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
     970               1 : 
     971               1 :         // Set mock control plane state to valid for our generation
     972               1 :         ctx.set_latest_generation(now_generation);
     973               1 : 
     974               1 :         // Inject a victim file to remote storage
     975               1 :         info!("Writing");
     976               1 :         std::fs::create_dir_all(&remote_timeline_path)?;
     977               1 :         std::fs::write(
     978               1 :             remote_timeline_path.join(remote_layer_file_name_1.clone()),
     979               1 :             content,
     980               1 :         )?;
     981               1 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
     982               1 : 
     983               1 :         // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
     984               1 :         info!("Pushing");
     985               1 :         client
     986               1 :             .push_layers(
     987               1 :                 tenant_shard_id,
     988               1 :                 TIMELINE_ID,
     989               1 :                 now_generation,
     990               1 :                 [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
     991               1 :             )
     992 UBC           0 :             .await?;
     993 CBC           1 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
     994               1 : 
     995               1 :         assert_local_files(&[], &deletion_prefix);
     996               1 : 
     997               1 :         // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
     998               1 :         info!("Flushing");
     999               1 :         client.flush().await?;
    1000               1 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
    1001               1 :         assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
    1002               1 : 
    1003               1 :         // File should go away when we execute
    1004               1 :         info!("Flush-executing");
    1005               3 :         client.flush_execute().await?;
    1006               1 :         assert_remote_files(&[], &remote_timeline_path);
    1007               1 :         assert_local_files(&["header-01"], &deletion_prefix);
    1008               1 : 
    1009               1 :         // Flushing on an empty queue should succeed immediately, and not write any lists
    1010               1 :         info!("Flush-executing on empty");
    1011               3 :         client.flush_execute().await?;
    1012               1 :         assert_local_files(&["header-01"], &deletion_prefix);
    1013               1 : 
    1014               1 :         Ok(())
    1015                 :     }
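    Summarizing the lifecycle this test exercises (a recap of the assertions above,
    not new behavior):

        // push_layers()    -> deletion queued; remote object untouched, no list on disk
        // flush()          -> deletion list persisted locally ("0000000000000001-01.list")
        // flush_execute()  -> validated deletions executed; remote object gone, "header-01" remains
        // flush_execute()  -> on an empty queue: succeeds immediately, writes no new lists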
    1016                 : 
    1017               1 :     #[tokio::test]
    1018               1 :     async fn deletion_queue_validation() -> anyhow::Result<()> {
    1019               1 :         let ctx = setup("deletion_queue_validation").expect("Failed test setup");
    1020               1 :         let client = ctx.deletion_queue.new_client();
    1021               1 :         client.recover(HashMap::new())?;
    1022                 : 
    1023                 :         // Generation that the control plane thinks is current
    1024               1 :         let latest_generation = Generation::new(0xdeadbeef);
    1025               1 :         // Generation that our DeletionQueue thinks the tenant is running with
    1026               1 :         let stale_generation = latest_generation.previous();
    1027               1 :         // Generation that our example layer file was written with
    1028               1 :         let layer_generation = stale_generation.previous();
    1029               1 :         let layer_metadata =
    1030               1 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
    1031               1 : 
    1032               1 :         ctx.set_latest_generation(latest_generation);
    1033               1 : 
    1034               1 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
    1035               1 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
    1036               1 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
    1037                 : 
    1038                 :         // Initial state: a remote layer exists
    1039               1 :         let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
    1040               1 :         assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
    1041               1 : 
    1042               1 :         tracing::debug!("Pushing...");
    1043               1 :         client
    1044               1 :             .push_layers(
    1045               1 :                 tenant_shard_id,
    1046               1 :                 TIMELINE_ID,
    1047               1 :                 stale_generation,
    1048               1 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1049               1 :             )
    1050 UBC           0 :             .await?;
    1051                 : 
    1052                 :         // We enqueued the operation in a stale generation: it should have failed validation
    1053 CBC           1 :         tracing::debug!("Flushing...");
    1054               3 :         tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
    1055               1 :         assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
    1056               1 : 
    1057               1 :         tracing::debug!("Pushing...");
    1058               1 :         client
    1059               1 :             .push_layers(
    1060               1 :                 tenant_shard_id,
    1061               1 :                 TIMELINE_ID,
    1062               1 :                 latest_generation,
    1063               1 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1064               1 :             )
    1065 UBC           0 :             .await?;
    1066                 : 
    1067                 :         // We enqueued the operation in a fresh generation: it should have passed validation
    1068 CBC           1 :         tracing::debug!("Flushing...");
    1069               3 :         tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
    1070               1 :         assert_remote_files(&[], &remote_timeline_path);
    1071               1 : 
    1072               1 :         Ok(())
    1073                 :     }
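    The generation ordering this test relies on, spelled out (same facts as the
    comments above, just condensed):

        // layer_generation < stale_generation < latest_generation
        // push in stale_generation  -> fails validation  -> layer survives flush_execute()
        // push in latest_generation -> passes validation -> layer is deleted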
    1074                 : 
    1075               1 :     #[tokio::test]
    1076               1 :     async fn deletion_queue_recovery() -> anyhow::Result<()> {
    1077               1 :         // Basic test that the deletion queue processes the deletions we pass into it
    1078               1 :         let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
    1079               1 :         let client = ctx.deletion_queue.new_client();
    1080               1 :         client.recover(HashMap::new())?;
    1081                 : 
    1082               1 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
    1083               1 : 
    1084               1 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
    1085               1 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
    1086               1 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
    1087               1 : 
    1088               1 :         let layer_generation = Generation::new(0xdeadbeef);
    1089               1 :         let now_generation = Generation::new(0xfeedbeef);
    1090               1 :         let layer_metadata =
    1091               1 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
    1092                 : 
     1093                 :         // Inject a deletion in now_generation.previous(): after restart (which
     1094                 :         // advances the generation once more) it is two generations behind, so it
     1095                 :         // should _not_ get executed (only the immediately previous generation gets that treatment)
    1096               1 :         let remote_layer_file_name_historical =
    1097               1 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
    1098               1 :         client
    1099               1 :             .push_layers(
    1100               1 :                 tenant_shard_id,
    1101               1 :                 TIMELINE_ID,
    1102               1 :                 now_generation.previous(),
    1103               1 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1104               1 :             )
    1105 UBC           0 :             .await?;
    1106                 : 
     1107                 :         // Inject a deletion in now_generation itself: after restart it becomes the
     1108                 :         // immediately previous generation, so this deletion should get executed,
     1109                 :         // because we execute deletions from the immediately previous generation on the same node.
    1110 CBC           1 :         let remote_layer_file_name_previous =
    1111               1 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
    1112               1 :         client
    1113               1 :             .push_layers(
    1114               1 :                 tenant_shard_id,
    1115               1 :                 TIMELINE_ID,
    1116               1 :                 now_generation,
    1117               1 :                 [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
    1118               1 :             )
    1119 UBC           0 :             .await?;
    1120                 : 
    1121 CBC           1 :         client.flush().await?;
    1122               1 :         assert_remote_files(
    1123               1 :             &[
    1124               1 :                 &remote_layer_file_name_historical,
    1125               1 :                 &remote_layer_file_name_previous,
    1126               1 :             ],
    1127               1 :             &remote_timeline_path,
    1128               1 :         );
    1129               1 : 
     1130               1 :         // Different generations for the same tenant will cause two separate
    1131               1 :         // deletion lists to be emitted.
    1132               1 :         assert_local_files(
    1133               1 :             &["0000000000000001-01.list", "0000000000000002-01.list"],
    1134               1 :             &deletion_prefix,
    1135               1 :         );
    1136               1 : 
    1137               1 :         // Simulate a node restart: the latest generation advances
    1138               1 :         let now_generation = now_generation.next();
    1139               1 :         ctx.set_latest_generation(now_generation);
    1140               1 : 
    1141               1 :         // Restart the deletion queue
    1142               1 :         drop(client);
    1143               1 :         ctx.restart().await;
    1144               1 :         let client = ctx.deletion_queue.new_client();
    1145               1 :         client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
    1146                 : 
    1147               1 :         info!("Flush-executing");
    1148               3 :         client.flush_execute().await?;
    1149                 :         // The deletion from immediately prior generation was executed, the one from
    1150                 :         // an older generation was not.
    1151               1 :         assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
    1152               1 :         Ok(())
    1153                 :     }
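    The recovery rule exercised here, restated against the generations used above
    (no new logic, just the arithmetic):

        // post-restart attached generation = now_generation.next()
        // list written in now_generation            -> immediately previous -> executed on recover()
        // list written in now_generation.previous() -> two generations back -> left alone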
    1154                 : }
    1155                 : 
    1156                 : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
    1157                 : /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
    1158                 : #[cfg(test)]
    1159                 : pub(crate) mod mock {
    1160                 :     use tracing::info;
    1161                 : 
    1162                 :     use crate::tenant::remote_timeline_client::remote_layer_path;
    1163                 : 
    1164                 :     use super::*;
    1165                 :     use std::sync::{
    1166                 :         atomic::{AtomicUsize, Ordering},
    1167                 :         Arc,
    1168                 :     };
    1169                 : 
    1170                 :     pub struct ConsumerState {
    1171                 :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
    1172                 :         executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
    1173                 :     }
    1174                 : 
    1175                 :     impl ConsumerState {
    1176               1 :         async fn consume(&mut self, remote_storage: &GenericRemoteStorage) -> usize {
    1177               1 :             let mut executed = 0;
    1178                 : 
    1179               1 :             info!("Executing all pending deletions");
    1180                 : 
     1181                 :             // Drain any messages already sent to the executor channel and execute them inline
    1182               1 :             while let Ok(msg) = self.executor_rx.try_recv() {
    1183 UBC           0 :                 match msg {
    1184               0 :                     DeleterMessage::Delete(objects) => {
    1185               0 :                         for path in objects {
    1186               0 :                             match remote_storage.delete(&path).await {
    1187                 :                                 Ok(_) => {
    1188               0 :                                     debug!("Deleted {path}");
    1189                 :                                 }
    1190               0 :                                 Err(e) => {
    1191               0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1192                 :                                 }
    1193                 :                             }
    1194               0 :                             executed += 1;
    1195                 :                         }
    1196                 :                     }
    1197               0 :                     DeleterMessage::Flush(flush_op) => {
    1198               0 :                         flush_op.notify();
    1199               0 :                     }
    1200                 :                 }
    1201                 :             }
    1202                 : 
    1203 CBC           2 :             while let Ok(msg) = self.rx.try_recv() {
    1204               1 :                 match msg {
    1205               1 :                     ListWriterQueueMessage::Delete(op) => {
    1206               1 :                         let mut objects = op.objects;
    1207               2 :                         for (layer, meta) in op.layers {
    1208               1 :                             objects.push(remote_layer_path(
    1209               1 :                                 &op.tenant_shard_id.tenant_id,
    1210               1 :                                 &op.timeline_id,
    1211               1 :                                 meta.shard,
    1212               1 :                                 &layer,
    1213               1 :                                 meta.generation,
    1214               1 :                             ));
    1215               1 :                         }
    1216                 : 
    1217               2 :                         for path in objects {
    1218               1 :                             info!("Executing deletion {path}");
    1219               1 :                             match remote_storage.delete(&path).await {
    1220                 :                                 Ok(_) => {
    1221 UBC           0 :                                     debug!("Deleted {path}");
    1222                 :                                 }
    1223               0 :                                 Err(e) => {
    1224               0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1225                 :                                 }
    1226                 :                             }
    1227 CBC           1 :                             executed += 1;
    1228                 :                         }
    1229                 :                     }
    1230 UBC           0 :                     ListWriterQueueMessage::Flush(op) => {
    1231               0 :                         op.notify();
    1232               0 :                     }
    1233               0 :                     ListWriterQueueMessage::FlushExecute(op) => {
    1234               0 :                         // We have already executed all prior deletions because mock does them inline
    1235               0 :                         op.notify();
    1236               0 :                     }
    1237               0 :                     ListWriterQueueMessage::Recover(_) => {
    1238               0 :                         // no-op in mock
    1239               0 :                     }
    1240                 :                 }
    1241 CBC           1 :                 info!("All pending deletions have been executed");
    1242                 :             }
    1243                 : 
    1244               1 :             executed
    1245               1 :         }
    1246                 :     }
    1247                 : 
    1248                 :     pub struct MockDeletionQueue {
    1249                 :         tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
    1250                 :         executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
    1251                 :         executed: Arc<AtomicUsize>,
    1252                 :         remote_storage: Option<GenericRemoteStorage>,
    1253                 :         consumer: std::sync::Mutex<ConsumerState>,
    1254                 :         lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
    1255                 :     }
    1256                 : 
    1257                 :     impl MockDeletionQueue {
    1258              41 :         pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
    1259              41 :             let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    1260              41 :             let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
    1261              41 : 
    1262              41 :             let executed = Arc::new(AtomicUsize::new(0));
    1263              41 : 
    1264              41 :             Self {
    1265              41 :                 tx,
    1266              41 :                 executor_tx,
    1267              41 :                 executed,
    1268              41 :                 remote_storage,
    1269              41 :                 consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }),
    1270              41 :                 lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
    1271              41 :             }
    1272              41 :         }
    1273                 : 
    1274                 :         #[allow(clippy::await_holding_lock)]
    1275               1 :         pub async fn pump(&self) {
    1276               1 :             if let Some(remote_storage) = &self.remote_storage {
    1277                 :                 // Permit holding mutex across await, because this is only ever
    1278                 :                 // called once at a time in tests.
    1279               1 :                 let mut locked = self.consumer.lock().unwrap();
    1280               1 :                 let count = locked.consume(remote_storage).await;
    1281               1 :                 self.executed.fetch_add(count, Ordering::Relaxed);
    1282 UBC           0 :             }
    1283 CBC           1 :         }
    1284                 : 
    1285              47 :         pub(crate) fn new_client(&self) -> DeletionQueueClient {
    1286              47 :             DeletionQueueClient {
    1287              47 :                 tx: self.tx.clone(),
    1288              47 :                 executor_tx: self.executor_tx.clone(),
    1289              47 :                 lsn_table: self.lsn_table.clone(),
    1290              47 :             }
    1291              47 :         }
    1292                 :     }
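    An illustrative use of the mock from a test, assuming a LocalFs `storage` and
    push arguments like those in the tests above (a sketch, not code from this file):

        let queue = MockDeletionQueue::new(Some(storage));
        let client = queue.new_client();
        client.push_layers(tenant_shard_id, TIMELINE_ID, generation, layers).await?;
        // Nothing is deleted until pump() drains the channels and executes inline.
        queue.pump().await;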
    1293                 : 
    1294                 :     /// Test round-trip serialization/deserialization, and test stability of the format
    1295                 :     /// vs. a static expected string for the serialized version.
    1296               1 :     #[test]
    1297               1 :     fn deletion_list_serialization() -> anyhow::Result<()> {
    1298               1 :         let tenant_id = "ad6c1a56f5680419d3a16ff55d97ec3c"
    1299               1 :             .to_string()
    1300               1 :             .parse::<TenantShardId>()?;
    1301               1 :         let timeline_id = "be322c834ed9e709e63b5c9698691910"
    1302               1 :             .to_string()
    1303               1 :             .parse::<TimelineId>()?;
    1304               1 :         let generation = Generation::new(123);
    1305                 : 
    1306               1 :         let object =
    1307               1 :             RemotePath::from_string(&format!("tenants/{tenant_id}/timelines/{timeline_id}/foo"))?;
    1308               1 :         let mut objects = [object].to_vec();
    1309               1 : 
    1310               1 :         let mut example = DeletionList::new(1);
    1311               1 :         example.push(&tenant_id, &timeline_id, generation, &mut objects);
    1312                 : 
    1313               1 :         let encoded = serde_json::to_string(&example)?;
    1314                 : 
    1315               1 :         let expected = "{\"version\":1,\"sequence\":1,\"tenants\":{\"ad6c1a56f5680419d3a16ff55d97ec3c\":{\"timelines\":{\"be322c834ed9e709e63b5c9698691910\":[\"foo\"]},\"generation\":123}},\"size\":1}".to_string();
    1316               1 :         assert_eq!(encoded, expected);
    1317                 : 
    1318               1 :         let decoded = serde_json::from_str::<DeletionList>(&encoded)?;
    1319               1 :         assert_eq!(example, decoded);
    1320                 : 
    1321               1 :         Ok(())
    1322               1 :     }
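    For readability, the expected serialization asserted above, pretty-printed
    (identical content; only whitespace differs):

        {
          "version": 1,
          "sequence": 1,
          "tenants": {
            "ad6c1a56f5680419d3a16ff55d97ec3c": {
              "timelines": { "be322c834ed9e709e63b5c9698691910": ["foo"] },
              "generation": 123
            }
          },
          "size": 1
        }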
    1323                 : }
        

Generated by: LCOV version 2.1-beta