LCOV - code coverage report
Current view: top level - pageserver/src - deletion_queue.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 89.0 % 802 714
Test Date: 2025-02-20 13:11:02 Functions: 57.6 % 118 68

            Line data    Source code
       1              : mod deleter;
       2              : mod list_writer;
       3              : mod validator;
       4              : 
       5              : use std::collections::HashMap;
       6              : use std::sync::Arc;
       7              : use std::time::Duration;
       8              : 
       9              : use crate::controller_upcall_client::ControlPlaneGenerationsApi;
      10              : use crate::metrics;
      11              : use crate::tenant::remote_timeline_client::remote_timeline_path;
      12              : use crate::tenant::remote_timeline_client::LayerFileMetadata;
      13              : use crate::virtual_file::MaybeFatalIo;
      14              : use crate::virtual_file::VirtualFile;
      15              : use anyhow::Context;
      16              : use camino::Utf8PathBuf;
      17              : use pageserver_api::shard::TenantShardId;
      18              : use remote_storage::{GenericRemoteStorage, RemotePath};
      19              : use serde::Deserialize;
      20              : use serde::Serialize;
      21              : use thiserror::Error;
      22              : use tokio_util::sync::CancellationToken;
      23              : use tracing::Instrument;
      24              : use tracing::{debug, error};
      25              : use utils::crashsafe::path_with_suffix_extension;
      26              : use utils::generation::Generation;
      27              : use utils::id::TimelineId;
      28              : use utils::lsn::AtomicLsn;
      29              : use utils::lsn::Lsn;
      30              : 
      31              : use self::deleter::Deleter;
      32              : use self::list_writer::DeletionOp;
      33              : use self::list_writer::ListWriter;
      34              : use self::list_writer::RecoverOp;
      35              : use self::validator::Validator;
      36              : use deleter::DeleterMessage;
      37              : use list_writer::ListWriterQueueMessage;
      38              : use validator::ValidatorQueueMessage;
      39              : 
      40              : use crate::{config::PageServerConf, tenant::storage_layer::LayerName};
      41              : 
      42              : // TODO: configurable for how long to wait before executing deletions
      43              : 
      44              : /// We aggregate object deletions from many tenants in one place, for several reasons:
      45              : /// - Coalesce deletions into fewer DeleteObjects calls
      46              : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
      47              : ///   to flush any outstanding deletions.
      48              : /// - Globally control throughput of deletions, as these are a low priority task: do
      49              : ///   not compete with the same S3 clients/connections used for higher priority uploads.
      50              : /// - Enable gating deletions on validation of a tenant's generation number, to make
      51              : ///   it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
      52              : ///
      53              : /// There are two kinds of deletion: deferred and immediate.  A deferred deletion
      54              : /// may be intentionally delayed to protect passive readers of S3 data, and is
      55              : /// subject to a generation number validation step.  An immediate deletion is
      56              : /// ready to execute immediately, and is only queued up so that it can be coalesced
      57              : /// with other deletions in flight.
      58              : ///
      59              : /// Deferred deletions pass through three steps:
      60              : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
      61              : ///   DeletionLists, which are persisted to disk.
      62              : /// - Validator: accumulate deletion lists, and validate them en-masse prior to passing
      63              : ///   the keys in the list onward for actual deletion.  Also validate remote_consistent_lsn
      64              : ///   updates for running timelines.
      65              : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
      66              : ///   batches of 1000 keys via DeleteObjects.
      67              : ///
      68              : /// Non-deferred deletions, such as during timeline deletion, bypass the first
      69              : /// two stages and are passed straight into the Deleter.
      70              : ///
      71              : /// Internally, each stage is joined by a channel to the next.  On disk, there is only
      72              : /// one queue (of DeletionLists), which is written by the frontend and consumed
      73              : /// by the backend.
      74              : #[derive(Clone)]
      75              : pub struct DeletionQueue {
                            // The queue's own `DeletionQueueClient`, i.e. the handle type through
                            // which deletions and flushes are submitted.
      76              :     client: DeletionQueueClient,
      77              : 
      78              :     // Parent cancellation token for the tokens passed into background workers
      79              :     cancel: CancellationToken,
      80              : }
      81              : 
      82              : /// Opaque wrapper around individual worker tasks, to avoid making the
      83              : /// worker objects themselves public
      84              : pub struct DeletionQueueWorkers<C>
      85              : where
      86              :     C: ControlPlaneGenerationsApi + Send + Sync,
      87              : {
      88              :     frontend: ListWriter,
      89              :     backend: Validator<C>,
      90              :     executor: Deleter,
      91              : }
      92              : 
      93              : impl<C> DeletionQueueWorkers<C>
      94              : where
      95              :     C: ControlPlaneGenerationsApi + Send + Sync + 'static,
      96              : {
      97           16 :     pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
      98           16 :         let jh_frontend = runtime.spawn(async move {
      99           16 :             self.frontend
     100           16 :                 .background()
     101           16 :                 .instrument(tracing::info_span!(parent:None, "deletion frontend"))
     102           16 :                 .await
     103           16 :         });
     104           16 :         let jh_backend = runtime.spawn(async move {
     105           16 :             self.backend
     106           16 :                 .background()
     107           16 :                 .instrument(tracing::info_span!(parent:None, "deletion backend"))
     108           16 :                 .await
     109           16 :         });
     110           16 :         let jh_executor = runtime.spawn(async move {
     111           16 :             self.executor
     112           16 :                 .background()
     113           16 :                 .instrument(tracing::info_span!(parent:None, "deletion executor"))
     114           16 :                 .await
     115           16 :         });
     116           16 : 
     117           16 :         runtime.spawn({
     118           16 :             async move {
     119           16 :                 jh_frontend.await.expect("error joining frontend worker");
     120            4 :                 jh_backend.await.expect("error joining backend worker");
     121            4 :                 drop(jh_executor.await.expect("error joining executor worker"));
     122           16 :             }
     123           16 :         })
     124           16 :     }
     125              : }
     126              : 
     127              : /// A FlushOp is just a oneshot channel, where we send the transmit side down
     128              : /// another channel, and the receive side will receive a message when the channel
     129              : /// we're flushing has reached the FlushOp we sent into it.
     130              : ///
     131              : /// The only extra behavior beyond the channel is that the notify() method does not
     132              : /// return an error when the receive side has been dropped, because in this use case
     133              : /// it is harmless (the code that initiated the flush no longer cares about the result).
     134              : #[derive(Debug)]
     135              : struct FlushOp {
     136              :     tx: tokio::sync::oneshot::Sender<()>,
     137              : }
     138              : 
     139              : impl FlushOp {
     140           84 :     fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
     141           84 :         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
     142           84 :         (Self { tx }, rx)
     143           84 :     }
     144              : 
     145           88 :     fn notify(self) {
     146           88 :         if self.tx.send(()).is_err() {
     147              :             // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
     148            0 :             debug!("deletion queue flush from dropped client");
     149           88 :         };
     150           88 :     }
     151              : }
     152              : 
                        /// Cloneable handle through which deletions, flushes and
                        /// remote_consistent_lsn updates are submitted to the deletion queue.
     153              : #[derive(Clone, Debug)]
     154              : pub struct DeletionQueueClient {
                            /// Unbounded channel carrying `ListWriterQueueMessage`s (recover, delete
                            /// and flush requests are sent through it).
     155              :     tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
                            /// Bounded channel carrying `DeleterMessage`s.
                            /// NOTE(review): presumably the path for immediate deletions that bypass
                            /// the first two stages; its use is not visible in this part of the file.
     156              :     executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
     157              : 
                            /// Shared table of projected remote_consistent_lsn values, written by
                            /// `update_remote_consistent_lsn`.
     158              :     lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
     159              : }
     160              : 
     161           24 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     162              : struct TenantDeletionList {
     163              :     /// For each Timeline, a list of key fragments to append to the timeline remote path
     164              :     /// when reconstructing a full key
     165              :     timelines: HashMap<TimelineId, Vec<String>>,
     166              : 
     167              :     /// The generation in which this deletion was emitted: note that this may not be the
     168              :     /// same as the generation of any layers being deleted.  The generation of the layer
     169              :     /// has already been absorbed into the keys in `objects`
     170              :     generation: Generation,
     171              : }
     172              : 
     173              : impl TenantDeletionList {
     174           20 :     pub(crate) fn len(&self) -> usize {
     175           20 :         self.timelines.values().map(|v| v.len()).sum()
     176           20 :     }
     177              : }
     178              : 
     179              : /// Files ending with this suffix will be ignored and erased
     180              : /// during recovery at startup.
     181              : const TEMP_SUFFIX: &str = "tmp";
     182              : 
                        /// A batch of deferred deletions, identified by `sequence` and written to
                        /// disk as a single JSON document (see `DeletionList::save`).
     183           48 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     184              : struct DeletionList {
     185              :     /// Serialization version, for future use
     186              :     version: u8,
     187              : 
     188              :     /// Used for constructing a unique key for each deletion list we write out.
     189              :     sequence: u64,
     190              : 
     191              :     /// To avoid repeating tenant/timeline IDs in every key, we store keys in
     192              :     /// nested HashMaps by TenantTimelineID.  Each Tenant only appears once
     193              :     /// with one unique generation ID: if someone tries to push a second generation
     194              :     /// ID for the same tenant, we will start a new DeletionList.
     195              :     tenants: HashMap<TenantShardId, TenantDeletionList>,
     196              : 
     197              :     /// Avoid having to walk `tenants` to calculate the number of keys in
     198              :     /// the nested deletion lists
     199              :     size: usize,
     200              : 
     201              :     /// Set to true when the list has undergone validation with the control
     202              :     /// plane and the remaining contents of `tenants` are valid.  A list may
     203              :     /// also be implicitly marked valid by DeletionHeader.validated_sequence
     204              :     /// advancing to >= DeletionList.sequence
                            // Deserializes to `false` when absent, and is left out of the serialized
                            // form while it is `false` (`Not::not` on `false` yields `true` => skip).
     205              :     #[serde(default)]
     206              :     #[serde(skip_serializing_if = "std::ops::Not::not")]
     207              :     validated: bool,
     208              : }
     209              : 
     210            0 : #[derive(Debug, Serialize, Deserialize)]
     211              : struct DeletionHeader {
     212              :     /// Serialization version, for future use
     213              :     version: u8,
     214              : 
     215              :     /// The highest sequence number (inclusive) that has been validated.  All deletion
     216              :     /// lists on disk with a sequence <= this value are safe to execute.
     217              :     validated_sequence: u64,
     218              : }
     219              : 
     220              : impl DeletionHeader {
     221              :     const VERSION_LATEST: u8 = 1;
     222              : 
     223           16 :     fn new(validated_sequence: u64) -> Self {
     224           16 :         Self {
     225           16 :             version: Self::VERSION_LATEST,
     226           16 :             validated_sequence,
     227           16 :         }
     228           16 :     }
     229              : 
     230           16 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     231           16 :         debug!("Saving deletion list header {:?}", self);
     232           16 :         let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
     233           16 :         let header_path = conf.deletion_header_path();
     234           16 :         let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
     235           16 :         VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
     236           16 :             .await
     237           16 :             .maybe_fatal_err("save deletion header")?;
     238              : 
     239           16 :         Ok(())
     240           16 :     }
     241              : }
     242              : 
     243              : impl DeletionList {
     244              :     const VERSION_LATEST: u8 = 1;
     245           40 :     fn new(sequence: u64) -> Self {
     246           40 :         Self {
     247           40 :             version: Self::VERSION_LATEST,
     248           40 :             sequence,
     249           40 :             tenants: HashMap::new(),
     250           40 :             size: 0,
     251           40 :             validated: false,
     252           40 :         }
     253           40 :     }
     254              : 
     255           56 :     fn is_empty(&self) -> bool {
     256           56 :         self.tenants.is_empty()
     257           56 :     }
     258              : 
     259          120 :     fn len(&self) -> usize {
     260          120 :         self.size
     261          120 :     }
     262              : 
     263              :     /// Returns true if the push was accepted, false if the caller must start a new
     264              :     /// deletion list.
     265           28 :     fn push(
     266           28 :         &mut self,
     267           28 :         tenant: &TenantShardId,
     268           28 :         timeline: &TimelineId,
     269           28 :         generation: Generation,
     270           28 :         objects: &mut Vec<RemotePath>,
     271           28 :     ) -> bool {
     272           28 :         if objects.is_empty() {
     273              :             // Avoid inserting an empty TimelineDeletionList: this preserves the property
     274              :             // that if we have no keys, then self.objects is empty (used in Self::is_empty)
     275            0 :             return true;
     276           28 :         }
     277           28 : 
     278           28 :         let tenant_entry = self
     279           28 :             .tenants
     280           28 :             .entry(*tenant)
     281           28 :             .or_insert_with(|| TenantDeletionList {
     282           24 :                 timelines: HashMap::new(),
     283           24 :                 generation,
     284           28 :             });
     285           28 : 
     286           28 :         if tenant_entry.generation != generation {
     287              :             // Only one generation per tenant per list: signal to
     288              :             // caller to start a new list.
     289            4 :             return false;
     290           24 :         }
     291           24 : 
     292           24 :         let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
     293           24 : 
     294           24 :         let timeline_remote_path = remote_timeline_path(tenant, timeline);
     295           24 : 
     296           24 :         self.size += objects.len();
     297           24 :         timeline_entry.extend(objects.drain(..).map(|p| {
     298           24 :             p.strip_prefix(&timeline_remote_path)
     299           24 :                 .expect("Timeline paths always start with the timeline prefix")
     300           24 :                 .to_string()
     301           24 :         }));
     302           24 :         true
     303           28 :     }
     304              : 
     305           20 :     fn into_remote_paths(self) -> Vec<RemotePath> {
     306           20 :         let mut result = Vec::new();
     307           20 :         for (tenant, tenant_deletions) in self.tenants.into_iter() {
     308           12 :             for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
     309           12 :                 let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
     310           12 :                 result.extend(
     311           12 :                     timeline_layers
     312           12 :                         .into_iter()
     313           12 :                         .map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
     314           12 :                 );
     315           12 :             }
     316              :         }
     317              : 
     318           20 :         result
     319           20 :     }
     320              : 
     321           28 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     322           28 :         let path = conf.deletion_list_path(self.sequence);
     323           28 :         let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
     324           28 : 
     325           28 :         let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
     326           28 : 
     327           28 :         VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
     328           28 :             .await
     329           28 :             .maybe_fatal_err("save deletion list")
     330           28 :             .map_err(Into::into)
     331           28 :     }
     332              : }
     333              : 
     334              : impl std::fmt::Display for DeletionList {
     335            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     336            0 :         write!(
     337            0 :             f,
     338            0 :             "DeletionList<seq={}, tenants={}, keys={}>",
     339            0 :             self.sequence,
     340            0 :             self.tenants.len(),
     341            0 :             self.size
     342            0 :         )
     343            0 :     }
     344              : }
     345              : 
                        /// One timeline's entry in the table of pending remote_consistent_lsn updates.
     346              : struct PendingLsn {
                            /// The LSN most recently passed to `update_remote_consistent_lsn`.
     347              :     projected: Lsn,
                            /// Caller-supplied shared slot.
                            /// NOTE(review): presumably where the validated LSN is published for the
                            /// timeline to read back; the write is not visible in this part of the file.
     348              :     result_slot: Arc<AtomicLsn>,
     349              : }
     350              : 
                        /// Pending remote_consistent_lsn updates for one tenant shard.  The
                        /// timeline entries are cleared whenever an update arrives with a
                        /// different generation (see `update_remote_consistent_lsn`).
     351              : struct TenantLsnState {
                            // Latest pending update per timeline.
     352              :     timelines: HashMap<TimelineId, PendingLsn>,
     353              : 
     354              :     // In what generation was the most recent update proposed?
     355              :     generation: Generation,
     356              : }
     357              : 
     358              : #[derive(Default)]
     359              : struct VisibleLsnUpdates {
     360              :     tenants: HashMap<TenantShardId, TenantLsnState>,
     361              : }
     362              : 
     363              : impl VisibleLsnUpdates {
     364          460 :     fn new() -> Self {
     365          460 :         Self {
     366          460 :             tenants: HashMap::new(),
     367          460 :         }
     368          460 :     }
     369              : }
     370              : 
     371              : impl std::fmt::Debug for VisibleLsnUpdates {
     372            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     373            0 :         write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
     374            0 :     }
     375              : }
     376              : 
                        /// Errors returned by `DeletionQueueClient` operations.
     377              : #[derive(Error, Debug)]
     378              : pub enum DeletionQueueError {
                            /// A queue channel was closed: either a message could not be sent, or a
                            /// flush op was dropped while the client was still waiting on it.
     379              :     #[error("Deletion queue unavailable during shutdown")]
     380              :     ShuttingDown,
     381              : }
     382              : 
     383              : impl DeletionQueueClient {
     384              :     /// This is cancel-safe.  If you drop the future before it completes, the message
     385              :     /// is not pushed, although in the context of the deletion queue it doesn't matter: once
     386              :     /// we decide to do a deletion the decision is always final.
     387          737 :     fn do_push<T>(
     388          737 :         &self,
     389          737 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     390          737 :         msg: T,
     391          737 :     ) -> Result<(), DeletionQueueError> {
     392          737 :         match queue.send(msg) {
     393          737 :             Ok(_) => Ok(()),
     394            0 :             Err(e) => {
     395            0 :                 // This shouldn't happen, we should shut down all tenants before
     396            0 :                 // we shut down the global delete queue.  If we encounter a bug like this,
     397            0 :                 // we may leak objects as deletions won't be processed.
     398            0 :                 error!("Deletion queue closed while pushing, shutting down? ({e})");
     399            0 :                 Err(DeletionQueueError::ShuttingDown)
     400              :             }
     401              :         }
     402          737 :     }
     403              : 
     404           16 :     pub(crate) fn recover(
     405           16 :         &self,
     406           16 :         attached_tenants: HashMap<TenantShardId, Generation>,
     407           16 :     ) -> Result<(), DeletionQueueError> {
     408           16 :         self.do_push(
     409           16 :             &self.tx,
     410           16 :             ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
     411           16 :         )
     412           16 :     }
     413              : 
     414              :     /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
     415              :     /// world, it must validate its generation number before doing so.  Rather than do this synchronously,
     416              :     /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
     417              :     /// recently validated separately.
     418              :     ///
     419              :     /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates.  The
     420              :     /// backend will later wake up and notice that the tenant's generation requires validation.
     421         2965 :     pub(crate) async fn update_remote_consistent_lsn(
     422         2965 :         &self,
     423         2965 :         tenant_shard_id: TenantShardId,
     424         2965 :         timeline_id: TimelineId,
     425         2965 :         current_generation: Generation,
     426         2965 :         lsn: Lsn,
     427         2965 :         result_slot: Arc<AtomicLsn>,
     428         2965 :     ) {
     429         2965 :         let mut locked = self
     430         2965 :             .lsn_table
     431         2965 :             .write()
     432         2965 :             .expect("Lock should never be poisoned");
     433         2965 : 
     434         2965 :         let tenant_entry = locked
     435         2965 :             .tenants
     436         2965 :             .entry(tenant_shard_id)
     437         2965 :             .or_insert(TenantLsnState {
     438         2965 :                 timelines: HashMap::new(),
     439         2965 :                 generation: current_generation,
     440         2965 :             });
     441         2965 : 
     442         2965 :         if tenant_entry.generation != current_generation {
     443            0 :             // Generation might have changed if we were detached and then re-attached: in this case,
     444            0 :             // state from the previous generation cannot be trusted.
     445            0 :             tenant_entry.timelines.clear();
     446            0 :             tenant_entry.generation = current_generation;
     447         2965 :         }
     448              : 
     449         2965 :         tenant_entry.timelines.insert(
     450         2965 :             timeline_id,
     451         2965 :             PendingLsn {
     452         2965 :                 projected: lsn,
     453         2965 :                 result_slot,
     454         2965 :             },
     455         2965 :         );
     456         2965 :     }
     457              : 
     458              :     /// Submit a list of layers for deletion: this function will return before the deletion is
     459              :     /// persistent, but it may be executed at any time after this function enters: do not push
     460              :     /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
     461              :     /// references them).
     462              :     ///
     463              :     /// The `current_generation` is the generation of this pageserver's current attachment.  The
     464              :     /// generations in `layers` are the generations in which those layers were written.
     465          673 :     pub(crate) fn push_layers(
     466          673 :         &self,
     467          673 :         tenant_shard_id: TenantShardId,
     468          673 :         timeline_id: TimelineId,
     469          673 :         current_generation: Generation,
     470          673 :         layers: Vec<(LayerName, LayerFileMetadata)>,
     471          673 :     ) -> Result<(), DeletionQueueError> {
     472          673 :         // None generations are not valid for attached tenants: they must always be attached in
     473          673 :         // a known generation.  None generations are still permitted for layers in the index because
     474          673 :         // they may be historical.
     475          673 :         assert!(!current_generation.is_none());
     476              : 
     477          673 :         metrics::DELETION_QUEUE
     478          673 :             .keys_submitted
     479          673 :             .inc_by(layers.len() as u64);
     480          673 :         self.do_push(
     481          673 :             &self.tx,
     482          673 :             ListWriterQueueMessage::Delete(DeletionOp {
     483          673 :                 tenant_shard_id,
     484          673 :                 timeline_id,
     485          673 :                 layers,
     486          673 :                 generation: current_generation,
     487          673 :                 objects: Vec::new(),
     488          673 :             }),
     489          673 :         )
     490          673 :     }
     491              : 
     492              :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     493           48 :     async fn do_flush<T>(
     494           48 :         &self,
     495           48 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     496           48 :         msg: T,
     497           48 :         rx: tokio::sync::oneshot::Receiver<()>,
     498           48 :     ) -> Result<(), DeletionQueueError> {
     499           48 :         self.do_push(queue, msg)?;
     500           48 :         if rx.await.is_err() {
     501              :             // This shouldn't happen if tenants are shut down before deletion queue.  If we
     502              :             // encounter a bug like this, then a flusher will incorrectly believe it has flushed
     503              :             // when it hasn't, possibly leading to leaking objects.
     504            0 :             error!("Deletion queue dropped flush op while client was still waiting");
     505            0 :             Err(DeletionQueueError::ShuttingDown)
     506              :         } else {
     507           48 :             Ok(())
     508              :         }
     509           48 :     }
     510              : 
     511              :     /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
     512              :     ///
     513              :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     514           28 :     pub async fn flush(&self) -> Result<(), DeletionQueueError> {
     515           28 :         let (flush_op, rx) = FlushOp::new();
     516           28 :         self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
     517           28 :             .await
     518           28 :     }
     519              : 
     520              :     /// Issue a flush without waiting for it to complete.  This is useful on advisory flushes where
     521              :     /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
     522              :     /// detach where flushing is nice but not necessary.
     523              :     ///
     524              :     /// This function provides no guarantees of work being done: the receiving half of the flush op is discarded up front and the send result is ignored.
     525            0 :     pub fn flush_advisory(&self) {
     526            0 :         let (flush_op, _) = FlushOp::new(); // `_`: nobody will wait on this op
     527            0 : 
     528            0 :         // Transmit the flush message (note: a `FlushExecute`, not a `Flush`), ignoring any result (such as a closed channel during shutdown).
     529            0 :         drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
     530            0 :     }
     531              : 
     532              :     /// Wait until all previous deletions are executed.  Runs three flushes in order and returns early on the first error: `flush`, then a `FlushExecute` sent on the same channel, then `flush_immediate`.
     533           20 :     pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
     534           20 :         debug!("flush_execute: flushing to deletion lists...");
     535              :         // Stage 1: flush any buffered work to deletion lists
     536           20 :         self.flush().await?;
     537              : 
     538              :         // Stage 2: flush the backend into the executor of deletion lists
     539           20 :         let (flush_op, rx) = FlushOp::new();
     540           20 :         debug!("flush_execute: flushing backend...");
     541           20 :         self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
     542           20 :             .await?;
     543           20 :         debug!("flush_execute: finished flushing backend...");
     544              : 
     545              :         // Stage 3: flush any immediate-mode deletions (the above backend flush will only flush
     546              :         // the executor if deletions had flowed through the backend)
     547           20 :         debug!("flush_execute: flushing execution...");
     548           20 :         self.flush_immediate().await?;
     549           20 :         debug!("flush_execute: finished flushing execution...");
     550           20 :         Ok(())
     551           20 :     }
     552              : 
     553              :     /// This interface bypasses the persistent deletion queue, and any validation
     554              :     /// that this pageserver is still eligible to execute the deletions.  It is for
     555              :     /// use in timeline deletions, where the control plane is telling us we may
     556              :     /// delete everything in the timeline.  A failed send is reported as `ShuttingDown`.
     557              :     ///
     558              :     /// DO NOT USE THIS FROM GC OR COMPACTION CODE.  Use the regular `push_layers`.
     559            0 :     pub(crate) async fn push_immediate(
     560            0 :         &self,
     561            0 :         objects: Vec<RemotePath>,
     562            0 :     ) -> Result<(), DeletionQueueError> {
     563            0 :         metrics::DELETION_QUEUE // counted at submission time, before the send below is attempted
     564            0 :             .keys_submitted
     565            0 :             .inc_by(objects.len() as u64);
     566            0 :         self.executor_tx // goes straight to the executor channel, not through `self.tx`
     567            0 :             .send(DeleterMessage::Delete(objects))
     568            0 :             .await
     569            0 :             .map_err(|_| DeletionQueueError::ShuttingDown)
     570            0 :     }
     571              : 
     572              :     /// Companion to push_immediate.  When this returns Ok, all prior objects sent
     573              :     /// into push_immediate have been deleted from remote storage.  Returns `ShuttingDown` if either the send or the wait on the reply fails.
     574           20 :     pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
     575           20 :         let (flush_op, rx) = FlushOp::new();
     576           20 :         self.executor_tx // same channel that push_immediate sends its `Delete` messages on
     577           20 :             .send(DeleterMessage::Flush(flush_op))
     578           20 :             .await
     579           20 :             .map_err(|_| DeletionQueueError::ShuttingDown)?;
     580              : 
     581           20 :         rx.await.map_err(|_| DeletionQueueError::ShuttingDown) // a receive error is also mapped to ShuttingDown
     582           20 :     }
     583              : }
     584              : 
     585              : impl DeletionQueue {
     586           16 :     pub fn new_client(&self) -> DeletionQueueClient { // hands out a clone of the client built in `new`
     587           16 :         self.client.clone()
     588           16 :     }
     589              : 
     590              :     /// Caller may use the returned object to construct clients with new_client.
     591              :     /// Caller should spawn the workers in the returned `DeletionQueueWorkers` (frontend `ListWriter`, backend `Validator`, executor `Deleter`):
     592              :     /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
     593           16 :     pub fn new<C>(
     594           16 :         remote_storage: GenericRemoteStorage,
     595           16 :         controller_upcall_client: Option<C>,
     596           16 :         conf: &'static PageServerConf,
     597           16 :     ) -> (Self, DeletionQueueWorkers<C>)
     598           16 :     where
     599           16 :         C: ControlPlaneGenerationsApi + Send + Sync,
     600           16 :     {
     601           16 :         // Unbounded channel: enables non-async functions to submit deletions.  The actual length is
     602           16 :         // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
     603           16 :         // enough to avoid this taking pathologically large amount of memory.
     604           16 :         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
     605           16 : 
     606           16 :         // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
     607           16 :         let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
     608           16 : 
     609           16 :         // Shallow channel: it carries lists of paths, and we expect the main queueing to
     610           16 :         // happen in the backend (persistent), not in this queue.
     611           16 :         let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
     612           16 : 
     613           16 :         let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())); // shared between the client and the Validator below
     614           16 : 
     615           16 :         // The deletion queue has an independent cancellation token to
     616           16 :         // the general pageserver shutdown token, because it stays alive a bit
     617           16 :         // longer to flush after Tenants have all been torn down.
     618           16 :         let cancel = CancellationToken::new();
     619           16 : 
     620           16 :         (
     621           16 :             Self {
     622           16 :                 client: DeletionQueueClient {
     623           16 :                     tx,
     624           16 :                     executor_tx: executor_tx.clone(), // the client keeps its own sender so it can reach the executor directly
     625           16 :                     lsn_table: lsn_table.clone(),
     626           16 :                 },
     627           16 :                 cancel: cancel.clone(),
     628           16 :             },
     629           16 :             DeletionQueueWorkers {
     630           16 :                 frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()), // receives from `tx`, sends on `backend_tx`
     631           16 :                 backend: Validator::new(
     632           16 :                     conf,
     633           16 :                     backend_rx,
     634           16 :                     executor_tx,
     635           16 :                     controller_upcall_client,
     636           16 :                     lsn_table.clone(),
     637           16 :                     cancel.clone(),
     638           16 :                 ),
     639           16 :                 executor: Deleter::new(remote_storage, executor_rx, cancel.clone()), // every worker gets a clone of the same `cancel` token
     640           16 :             },
     641           16 :         )
     642           16 :     }
     643              : 
     644            0 :     pub async fn shutdown(&mut self, timeout: Duration) { // best-effort: flush for at most `timeout`, log the outcome, then cancel; no error is returned
     645            0 :         match tokio::time::timeout(timeout, self.client.flush()).await {
     646              :             Ok(Ok(())) => {
     647            0 :                 tracing::info!("Deletion queue flushed successfully on shutdown")
     648              :             }
     649              :             Ok(Err(DeletionQueueError::ShuttingDown)) => {
     650              :                 // This is not harmful for correctness, but is unexpected: the deletion
     651              :                 // queue's workers should stay alive as long as there are any client handles instantiated.
     652            0 :                 tracing::warn!("Deletion queue stopped prematurely");
     653              :             }
     654            0 :             Err(_timeout) => {
     655            0 :                 tracing::warn!("Timed out flushing deletion queue on shutdown")
     656              :             }
     657              :         }
     658              : 
     659              :         // We only cancel _after_ flushing: otherwise we would be shutting down the
     660              :         // components that do the flush.  Cancellation happens on every path above, including timeout.
     661            0 :         self.cancel.cancel();
     662            0 :     }
     663              : }
     664              : 
     665              : #[cfg(test)]
     666              : mod test {
     667              :     use camino::Utf8Path;
     668              :     use hex_literal::hex;
     669              :     use pageserver_api::{key::Key, shard::ShardIndex, upcall_api::ReAttachResponseTenant};
     670              :     use std::{io::ErrorKind, time::Duration};
     671              :     use tracing::info;
     672              : 
     673              :     use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
     674              :     use tokio::task::JoinHandle;
     675              : 
     676              :     use crate::{
     677              :         controller_upcall_client::RetryForeverError,
     678              :         tenant::{harness::TenantHarness, storage_layer::DeltaLayerName},
     679              :     };
     680              : 
     681              :     use super::*;
     682              :     pub const TIMELINE_ID: TimelineId =
     683              :         TimelineId::from_array(hex!("11223344556677881122334455667788")); // fixed id; the tests in this module build their remote timeline paths from it
     684              : 
     685              :     pub const EXAMPLE_LAYER_NAME: LayerName = LayerName::Delta(DeltaLayerName {
     686              :         key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
     687              :         lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
     688              :     });
     689              : 
     690              :     // When you need a second layer in a test.  Same key range as EXAMPLE_LAYER_NAME; its LSN range starts where that one ends (0x16B5A51).
     691              :     pub const EXAMPLE_LAYER_NAME_ALT: LayerName = LayerName::Delta(DeltaLayerName {
     692              :         key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
     693              :         lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
     694              :     });
     695              : 
     696              :     struct TestSetup { // everything a test needs; built by `setup`
     697              :         harness: TenantHarness, // used here for its `conf` and `tenant_shard_id`
     698              :         remote_fs_dir: Utf8PathBuf, // canonicalized `<workdir>/remote_fs`, the directory backing `storage`
     699              :         storage: GenericRemoteStorage, // LocalFs storage rooted at `remote_fs_dir`
     700              :         mock_control_plane: MockControlPlane,
     701              :         deletion_queue: DeletionQueue,
     702              :         worker_join: JoinHandle<()>, // handle returned by `spawn_with` for the current queue's workers
     703              :     }
     704              : 
     705              :     impl TestSetup {
     706              :         /// Simulate a pageserver restart by destroying and recreating the deletion queue.  The new queue's workers are spawned before the old ones are cancelled and joined.
     707            4 :         async fn restart(&mut self) {
     708            4 :             let (deletion_queue, workers) = DeletionQueue::new(
     709            4 :                 self.storage.clone(),
     710            4 :                 Some(self.mock_control_plane.clone()),
     711            4 :                 self.harness.conf,
     712            4 :             );
     713            4 : 
     714            4 :             tracing::debug!("Spawning worker for new queue queue");
     715            4 :             let worker_join = workers.spawn_with(&tokio::runtime::Handle::current());
     716            4 : 
     717            4 :             let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join); // swap in the new handle, keep the old one to join below
     718            4 :             let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
     719            4 : 
     720            4 :             tracing::debug!("Joining worker from previous queue");
     721            4 :             old_deletion_queue.cancel.cancel(); // cancelled directly, without the flush that `shutdown` would attempt first
     722            4 :             old_worker_join
     723            4 :                 .await
     724            4 :                 .expect("Failed to join workers for previous deletion queue");
     725            4 :         }
     726              : 
     727           12 :         fn set_latest_generation(&self, gen: Generation) { // records `gen` as the generation the mock control plane will accept for this harness's tenant shard
     728           12 :             let tenant_shard_id = self.harness.tenant_shard_id;
     729           12 :             self.mock_control_plane
     730           12 :                 .latest_generation
     731           12 :                 .lock()
     732           12 :                 .unwrap()
     733           12 :                 .insert(tenant_shard_id, gen);
     734           12 :         }
     735              : 
     736              :         /// Writes a placeholder file named `<file_name><generation suffix>` into the timeline's directory under `remote_fs_dir`.  Returns remote layer file name, suitable for use in assert_remote_files
     737           12 :         fn write_remote_layer(
     738           12 :             &self,
     739           12 :             file_name: LayerName,
     740           12 :             gen: Generation,
     741           12 :         ) -> anyhow::Result<String> {
     742           12 :             let tenant_shard_id = self.harness.tenant_shard_id;
     743           12 :             let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
     744           12 :             let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path()); // local binding shadows the `remote_timeline_path` function from here on
     745           12 :             std::fs::create_dir_all(&remote_timeline_path)?;
     746           12 :             let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
     747           12 : 
     748           12 :             let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
     749           12 : 
     750           12 :             std::fs::write(
     751           12 :                 remote_timeline_path.join(remote_layer_file_name.clone()),
     752           12 :                 content,
     753           12 :             )?;
     754              : 
     755           12 :             Ok(remote_layer_file_name)
     756           12 :         }
     757              :     }
     758              : 
     759              :     #[derive(Debug, Clone)]
     760              :     struct MockControlPlane { // clones share one map, since the field is an `Arc`
     761              :         pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>, // per tenant shard: the generation `validate` compares against
     762              :     }
     763              : 
     764              :     impl MockControlPlane {
     765           12 :         fn new() -> Self { // starts with a default (empty) generation map
     766           12 :             Self {
     767           12 :                 latest_generation: Arc::default(),
     768           12 :             }
     769           12 :         }
     770              :     }
     771              : 
     772              :     impl ControlPlaneGenerationsApi for MockControlPlane {
     773            0 :         async fn re_attach( // not implemented by the mock: calling it panics via `unimplemented!()`
     774            0 :             &self,
     775            0 :             _conf: &PageServerConf,
     776            0 :         ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
     777            0 :             unimplemented!()
     778              :         }
     779              : 
     780           16 :         async fn validate( // maps each known tenant shard to "is the supplied generation the recorded latest?"
     781           16 :             &self,
     782           16 :             tenants: Vec<(TenantShardId, Generation)>,
     783           16 :         ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     784           16 :             let mut result = HashMap::new();
     785           16 : 
     786           16 :             let latest_generation = self.latest_generation.lock().unwrap();
     787              : 
     788           32 :             for (tenant_shard_id, generation) in tenants {
     789           16 :                 if let Some(latest) = latest_generation.get(&tenant_shard_id) { // shards with no recorded generation are left out of the result
     790           16 :                     result.insert(tenant_shard_id, *latest == generation);
     791           16 :                 }
     792              :             }
     793              : 
     794           16 :             Ok(result) // this mock never returns Err
     795           16 :         }
     796              :     }
     797              : 
     798           12 :     async fn setup(test_name: &str) -> anyhow::Result<TestSetup> { // builds a harness, LocalFs remote storage, a mock control plane and a running deletion queue
     799           12 :         let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}"))); // leaked on purpose so the prefixed name outlives this call (presumably `create` wants a long-lived name; confirm)
     800           12 :         let harness = TenantHarness::create(test_name).await?;
     801              : 
     802              :         // We do not load() the harness: we only need its config and remote_storage
     803              : 
     804              :         // Set up a GenericRemoteStorage targeting a directory
     805           12 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs");
     806           12 :         std::fs::create_dir_all(remote_fs_dir)?; // the directory must exist before it can be canonicalized below
     807           12 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
     808           12 :         let storage_config = RemoteStorageConfig {
     809           12 :             storage: RemoteStorageKind::LocalFs {
     810           12 :                 local_path: remote_fs_dir.clone(),
     811           12 :             },
     812           12 :             timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     813           12 :             small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
     814           12 :         };
     815           12 :         let storage = GenericRemoteStorage::from_config(&storage_config)
     816           12 :             .await
     817           12 :             .unwrap();
     818           12 : 
     819           12 :         let mock_control_plane = MockControlPlane::new();
     820           12 : 
     821           12 :         let (deletion_queue, worker) = DeletionQueue::new(
     822           12 :             storage.clone(),
     823           12 :             Some(mock_control_plane.clone()), // a clone shares the generation map with the copy kept in TestSetup
     824           12 :             harness.conf,
     825           12 :         );
     826           12 : 
     827           12 :         let worker_join = worker.spawn_with(&tokio::runtime::Handle::current()); // workers run on the test's own runtime
     828           12 : 
     829           12 :         Ok(TestSetup {
     830           12 :             harness,
     831           12 :             remote_fs_dir,
     832           12 :             storage,
     833           12 :             mock_control_plane,
     834           12 :             deletion_queue,
     835           12 :             worker_join,
     836           12 :         })
     837           12 :     }
     838              : 
     839              :     // Asserts that the entry names in `remote_path` equal `expected` (order-insensitive: both sides are sorted).  TODO: put this in a common location so that we can share with remote_timeline_client's tests
     840           36 :     fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
     841           36 :         let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
     842           36 :         expected.sort();
     843           36 : 
     844           36 :         let mut found: Vec<String> = Vec::new();
     845           36 :         let dir = match std::fs::read_dir(remote_path) {
     846           36 :             Ok(d) => d,
     847            0 :             Err(e) => {
     848            0 :                 if e.kind() == ErrorKind::NotFound {
     849            0 :                     if expected.is_empty() {
     850              :                         // We are asserting prefix is empty: it is expected that the dir is missing
     851            0 :                         return;
     852              :                     } else {
     853            0 :                         assert_eq!(expected, Vec::<String>::new()); // always fails here (expected is non-empty); used for its diff output
     854            0 :                         unreachable!();
     855              :                     }
     856              :                 } else {
     857            0 :                     panic!("Unexpected error listing {remote_path}: {e}");
     858              :                 }
     859              :             }
     860              :         };
     861              : 
     862           36 :         for entry in dir.flatten() { // `flatten` silently skips directory entries that failed to read
     863           32 :             let entry_name = entry.file_name();
     864           32 :             let fname = entry_name.to_str().unwrap(); // panics on a non-UTF-8 file name
     865           32 :             found.push(String::from(fname));
     866           32 :         }
     867           36 :         found.sort();
     868           36 : 
     869           36 :         assert_eq!(expected, found);
     870           36 :     }
     871              : 
     872           20 :     fn assert_local_files(expected: &[&str], directory: &Utf8Path) { // `expected` is compared as given, so callers must pass it already sorted
     873           20 :         let dir = match std::fs::read_dir(directory) {
     874           16 :             Ok(d) => d,
     875              :             Err(_) => { // any read_dir error (not only NotFound) is treated as "directory has no files"
     876            4 :                 assert_eq!(expected, &Vec::<String>::new());
     877            4 :                 return;
     878              :             }
     879              :         };
     880           16 :         let mut found = Vec::new();
     881           36 :         for dentry in dir {
     882           20 :             let dentry = dentry.unwrap(); // unlike assert_remote_files, an unreadable entry panics here
     883           20 :             let file_name = dentry.file_name();
     884           20 :             let file_name_str = file_name.to_string_lossy();
     885           20 :             found.push(file_name_str.to_string());
     886           20 :         }
     887           16 :         found.sort();
     888           16 :         assert_eq!(expected, found);
     889           20 :     }
     890              : 
     891              :     #[tokio::test]
     892            4 :     async fn deletion_queue_smoke() -> anyhow::Result<()> {
     893            4 :         // Basic test that the deletion queue processes the deletions we pass into it: push, then flush (list written), then flush_execute (file deleted)
     894            4 :         let ctx = setup("deletion_queue_smoke")
     895            4 :             .await
     896            4 :             .expect("Failed test setup");
     897            4 :         let client = ctx.deletion_queue.new_client();
     898            4 :         client.recover(HashMap::new())?;
     899            4 : 
     900            4 :         let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
     901            4 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
     902            4 : 
     903            4 :         let content: Vec<u8> = "victim1 contents".into();
     904            4 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
     905            4 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); // local binding shadows the `remote_timeline_path` function from here on
     906            4 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
     907            4 : 
     908            4 :         // Exercise the distinction between the generation of the layers
     909            4 :         // we delete, and the generation of the running Tenant.
     910            4 :         let layer_generation = Generation::new(0xdeadbeef);
     911            4 :         let now_generation = Generation::new(0xfeedbeef);
     912            4 :         let layer_metadata =
     913            4 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
     914            4 : 
     915            4 :         let remote_layer_file_name_1 =
     916            4 :             format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
     917            4 : 
     918            4 :         // Set mock control plane state to valid for our generation
     919            4 :         ctx.set_latest_generation(now_generation);
     920            4 : 
     921            4 :         // Inject a victim file to remote storage
     922            4 :         info!("Writing");
     923            4 :         std::fs::create_dir_all(&remote_timeline_path)?;
     924            4 :         std::fs::write(
     925            4 :             remote_timeline_path.join(remote_layer_file_name_1.clone()),
     926            4 :             content,
     927            4 :         )?;
     928            4 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
     929            4 : 
     930            4 :         // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
     931            4 :         info!("Pushing");
     932            4 :         client.push_layers(
     933            4 :             tenant_shard_id,
     934            4 :             TIMELINE_ID,
     935            4 :             now_generation,
     936            4 :             [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
     937            4 :         )?;
     938            4 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
     939            4 : 
     940            4 :         assert_local_files(&[], &deletion_prefix); // nothing has been written under the deletion prefix yet
     941            4 : 
     942            4 :         // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
     943            4 :         info!("Flushing");
     944            4 :         client.flush().await?;
     945            4 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
     946            4 :         assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
     947            4 : 
     948            4 :         // File should go away when we execute
     949            4 :         info!("Flush-executing");
     950            4 :         client.flush_execute().await?;
     951            4 :         assert_remote_files(&[], &remote_timeline_path);
     952            4 :         assert_local_files(&["header-01"], &deletion_prefix); // the list file is gone; only a header file remains
     953            4 : 
     954            4 :         // Flushing on an empty queue should succeed immediately, and not write any lists
     955            4 :         info!("Flush-executing on empty");
     956            4 :         client.flush_execute().await?;
     957            4 :         assert_local_files(&["header-01"], &deletion_prefix);
     958            4 : 
     959            4 :         Ok(())
     960            4 :     }
     961              : 
     962              :     #[tokio::test]
     963            4 :     async fn deletion_queue_validation() -> anyhow::Result<()> { // a deletion pushed with a stale generation must leave the file in place; the same deletion pushed with the latest generation must remove it
     964            4 :         let ctx = setup("deletion_queue_validation")
     965            4 :             .await
     966            4 :             .expect("Failed test setup");
     967            4 :         let client = ctx.deletion_queue.new_client();
     968            4 :         client.recover(HashMap::new())?;
     969            4 : 
     970            4 :         // Generation that the control plane thinks is current
     971            4 :         let latest_generation = Generation::new(0xdeadbeef);
     972            4 :         // Generation that our DeletionQueue thinks the tenant is running with
     973            4 :         let stale_generation = latest_generation.previous();
     974            4 :         // Generation that our example layer file was written with
     975            4 :         let layer_generation = stale_generation.previous();
     976            4 :         let layer_metadata =
     977            4 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
     978            4 : 
     979            4 :         ctx.set_latest_generation(latest_generation);
     980            4 : 
     981            4 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
     982            4 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
     983            4 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); // local binding shadows the `remote_timeline_path` function from here on
     984            4 : 
     985            4 :         // Initial state: a remote layer exists
     986            4 :         let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
     987            4 :         assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
     988            4 : 
     989            4 :         tracing::debug!("Pushing...");
     990            4 :         client.push_layers(
     991            4 :             tenant_shard_id,
     992            4 :             TIMELINE_ID,
     993            4 :             stale_generation,
     994            4 :             [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
     995            4 :         )?;
     996            4 : 
     997            4 :         // We enqueued the operation in a stale generation: it should have failed validation, so the file must still exist
     998            4 :         tracing::debug!("Flushing...");
     999            4 :         tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??; // first `?` is the 5s timeout, second is the flush result
    1000            4 :         assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
    1001            4 : 
    1002            4 :         tracing::debug!("Pushing...");
    1003            4 :         client.push_layers(
    1004            4 :             tenant_shard_id,
    1005            4 :             TIMELINE_ID,
    1006            4 :             latest_generation,
    1007            4 :             [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1008            4 :         )?;
    1009            4 : 
    1010            4 :         // We enqueued the operation in a fresh generation: it should have passed validation, so the file must be gone
    1011            4 :         tracing::debug!("Flushing...");
    1012            4 :         tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
    1013            4 :         assert_remote_files(&[], &remote_timeline_path);
    1014            4 : 
    1015            4 :         Ok(())
    1016            4 :     }
    1017              : 
    1018              :     #[tokio::test]
    1019            4 :     async fn deletion_queue_recovery() -> anyhow::Result<()> {
    1020            4 :         // Basic test that the deletion queue processes the deletions we pass into it
    1021            4 :         let mut ctx = setup("deletion_queue_recovery")
    1022            4 :             .await
    1023            4 :             .expect("Failed test setup");
    1024            4 :         let client = ctx.deletion_queue.new_client();
    1025            4 :         client.recover(HashMap::new())?;
    1026            4 : 
    1027            4 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
    1028            4 : 
    1029            4 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
    1030            4 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
    1031            4 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
    1032            4 : 
    1033            4 :         let layer_generation = Generation::new(0xdeadbeef);
    1034            4 :         let now_generation = Generation::new(0xfeedbeef);
    1035            4 :         let layer_metadata =
    1036            4 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
    1037            4 : 
    1038            4 :         // Inject a deletion in the generation before now_generation: after restart,
    1039            4 :         // this deletion should _not_ get executed (only the immediately previous
    1040            4 :         // generation gets that treatment)
    1041            4 :         let remote_layer_file_name_historical =
    1042            4 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
    1043            4 :         client.push_layers(
    1044            4 :             tenant_shard_id,
    1045            4 :             TIMELINE_ID,
    1046            4 :             now_generation.previous(),
    1047            4 :             [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1048            4 :         )?;
    1049            4 : 
    1050            4 :         // Inject a deletion in now_generation: after restart the generation advances,
    1051            4 :         // so this deletion should get executed, because we execute deletions in the
    1052            4 :         // immediately previous generation on the same node.
    1053            4 :         let remote_layer_file_name_previous =
    1054            4 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
    1055            4 :         client.push_layers(
    1056            4 :             tenant_shard_id,
    1057            4 :             TIMELINE_ID,
    1058            4 :             now_generation,
    1059            4 :             [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
    1060            4 :         )?;
    1061            4 : 
    1062            4 :         client.flush().await?;
    1063            4 :         assert_remote_files(
    1064            4 :             &[
    1065            4 :                 &remote_layer_file_name_historical,
    1066            4 :                 &remote_layer_file_name_previous,
    1067            4 :             ],
    1068            4 :             &remote_timeline_path,
    1069            4 :         );
    1070            4 : 
    1071            4 :         // Different generations for the same tenant will cause two separate
    1072            4 :         // deletion lists to be emitted.
    1073            4 :         assert_local_files(
    1074            4 :             &["0000000000000001-01.list", "0000000000000002-01.list"],
    1075            4 :             &deletion_prefix,
    1076            4 :         );
    1077            4 : 
    1078            4 :         // Simulate a node restart: the latest generation advances
    1079            4 :         let now_generation = now_generation.next();
    1080            4 :         ctx.set_latest_generation(now_generation);
    1081            4 : 
    1082            4 :         // Restart the deletion queue
    1083            4 :         drop(client);
    1084            4 :         ctx.restart().await;
    1085            4 :         let client = ctx.deletion_queue.new_client();
    1086            4 :         client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
    1087            4 : 
    1088            4 :         info!("Flush-executing");
    1089            4 :         client.flush_execute().await?;
    1090            4 :         // The deletion from immediately prior generation was executed, the one from
    1091            4 :         // an older generation was not.
    1092            4 :         assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
    1093            4 :         Ok(())
    1094            4 :     }
    1095              : }
    1096              : 
    1097              : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
    1098              : /// or coalescing: a task spawned in new() executes deletions as they arrive; pump() round-trips a flush through it.
    1099              : #[cfg(test)]
    1100              : pub(crate) mod mock {
    1101              :     use tracing::info;
    1102              : 
    1103              :     use super::*;
    1104              :     use crate::tenant::remote_timeline_client::remote_layer_path;
    1105              :     use std::sync::atomic::{AtomicUsize, Ordering};
    1106              : 
    1107              :     pub struct ConsumerState {
    1108              :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
    1109              :         executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
    1110              :         cancel: CancellationToken,
    1111              :         executed: Arc<AtomicUsize>,
    1112              :     }
    1113              : 
    1114              :     impl ConsumerState {
    1115          444 :         async fn consume(&mut self, remote_storage: &GenericRemoteStorage) {
    1116          444 :             info!("Executing all pending deletions");
    1117              : 
    1118              :             // Multiplex both channels: executor messages are Left, frontend messages are Right
    1119         1019 :             loop {
    1120         1019 :                 use either::Either;
    1121         1019 :                 let msg = tokio::select! {
    1122         1019 :                     left = self.executor_rx.recv() => Either::Left(left),
    1123         1019 :                     right = self.rx.recv() => Either::Right(right),
    1124              :                 };
    1125            4 :                 match msg {
    1126            0 :                     Either::Left(None) => break,
    1127            0 :                     Either::Right(None) => break,
    1128            0 :                     Either::Left(Some(DeleterMessage::Delete(objects))) => {
    1129            0 :                         for path in objects {
    1130            0 :                             match remote_storage.delete(&path, &self.cancel).await {
    1131              :                                 Ok(_) => {
    1132            0 :                                     debug!("Deleted {path}");
    1133              :                                 }
    1134            0 :                                 Err(e) => {
    1135            0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1136              :                                 }
    1137              :                             }
    1138            0 :                             self.executed.fetch_add(1, Ordering::Relaxed);
    1139              :                         }
    1140              :                     }
    1141            4 :                     Either::Left(Some(DeleterMessage::Flush(flush_op))) => {
    1142            4 :                         flush_op.notify();
    1143            4 :                     }
    1144          578 :                     Either::Right(Some(ListWriterQueueMessage::Delete(op))) => {
    1145          578 :                         let mut objects = op.objects;
    1146         1156 :                         for (layer, meta) in op.layers {
    1147          578 :                             objects.push(remote_layer_path(
    1148          578 :                                 &op.tenant_shard_id.tenant_id,
    1149          578 :                                 &op.timeline_id,
    1150          578 :                                 meta.shard,
    1151          578 :                                 &layer,
    1152          578 :                                 meta.generation,
    1153          578 :                             ));
    1154          578 :                         }
    1155              : 
    1156         1149 :                         for path in objects {
    1157          578 :                             info!("Executing deletion {path}");
    1158          578 :                             match remote_storage.delete(&path, &self.cancel).await {
    1159              :                                 Ok(_) => {
    1160          571 :                                     debug!("Deleted {path}");
    1161              :                                 }
    1162            0 :                                 Err(e) => {
    1163            0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1164              :                                 }
    1165              :                             }
    1166          571 :                             self.executed.fetch_add(1, Ordering::Relaxed);
    1167              :                         }
    1168              :                     }
    1169            0 :                     Either::Right(Some(ListWriterQueueMessage::Flush(op))) => {
    1170            0 :                         op.notify();
    1171            0 :                     }
    1172            0 :                     Either::Right(Some(ListWriterQueueMessage::FlushExecute(op))) => {
    1173            0 :                         // We have already executed all prior deletions because mock does them inline
    1174            0 :                         op.notify();
    1175            0 :                     }
    1176            0 :                     Either::Right(Some(ListWriterQueueMessage::Recover(_))) => {
    1177            0 :                         // no-op in mock
    1178            0 :                     }
    1179              :                 }
    1180              :             }
    1181            0 :         }
    1182              :     }
    1183              : 
    1184              :     pub struct MockDeletionQueue {
    1185              :         tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
    1186              :         executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
    1187              :         lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
    1188              :     }
    1189              : 
    1190              :     impl MockDeletionQueue {
    1191          444 :         pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
    1192          444 :             let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    1193          444 :             let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
    1194          444 : 
    1195          444 :             let executed = Arc::new(AtomicUsize::new(0));
    1196          444 : 
    1197          444 :             let mut consumer = ConsumerState {
    1198          444 :                 rx,
    1199          444 :                 executor_rx,
    1200          444 :                 cancel: CancellationToken::new(),
    1201          444 :                 executed: executed.clone(),
    1202          444 :             };
    1203          444 : 
    1204          444 :             tokio::spawn(async move {
    1205          444 :                 if let Some(remote_storage) = &remote_storage {
    1206          444 :                     consumer.consume(remote_storage).await;
    1207            0 :                 }
    1208          444 :             });
    1209          444 : 
    1210          444 :             Self {
    1211          444 :                 tx,
    1212          444 :                 executor_tx,
    1213          444 :                 lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
    1214          444 :             }
    1215          444 :         }
    1216              : 
    1217              :         #[allow(clippy::await_holding_lock)]
    1218            4 :         pub async fn pump(&self) {
    1219            4 :             let (tx, rx) = tokio::sync::oneshot::channel();
    1220            4 :             self.executor_tx
    1221            4 :                 .send(DeleterMessage::Flush(FlushOp { tx }))
    1222            4 :                 .await
    1223            4 :                 .expect("Failed to send flush message");
    1224            4 :             rx.await.ok();
    1225            4 :         }
    1226              : 
    1227          464 :         pub(crate) fn new_client(&self) -> DeletionQueueClient {
    1228          464 :             DeletionQueueClient {
    1229          464 :                 tx: self.tx.clone(),
    1230          464 :                 executor_tx: self.executor_tx.clone(),
    1231          464 :                 lsn_table: self.lsn_table.clone(),
    1232          464 :             }
    1233          464 :         }
    1234              :     }
    1235              : 
    1236              :     /// Test round-trip serialization/deserialization, and test stability of the format
    1237              :     /// vs. a static expected string for the serialized version.
    1238              :     #[test]
    1239            4 :     fn deletion_list_serialization() -> anyhow::Result<()> {
    1240            4 :         let tenant_id = "ad6c1a56f5680419d3a16ff55d97ec3c"
    1241            4 :             .to_string()
    1242            4 :             .parse::<TenantShardId>()?;
    1243            4 :         let timeline_id = "be322c834ed9e709e63b5c9698691910"
    1244            4 :             .to_string()
    1245            4 :             .parse::<TimelineId>()?;
    1246            4 :         let generation = Generation::new(123);
    1247              : 
    1248            4 :         let object =
    1249            4 :             RemotePath::from_string(&format!("tenants/{tenant_id}/timelines/{timeline_id}/foo"))?;
    1250            4 :         let mut objects = [object].to_vec();
    1251            4 : 
    1252            4 :         let mut example = DeletionList::new(1);
    1253            4 :         example.push(&tenant_id, &timeline_id, generation, &mut objects);
    1254              : 
    1255            4 :         let encoded = serde_json::to_string(&example)?;
    1256              : 
    1257            4 :         let expected = "{\"version\":1,\"sequence\":1,\"tenants\":{\"ad6c1a56f5680419d3a16ff55d97ec3c\":{\"timelines\":{\"be322c834ed9e709e63b5c9698691910\":[\"foo\"]},\"generation\":123}},\"size\":1}".to_string();
    1258            4 :         assert_eq!(encoded, expected);
    1259              : 
    1260            4 :         let decoded = serde_json::from_str::<DeletionList>(&encoded)?;
    1261            4 :         assert_eq!(example, decoded);
    1262              : 
    1263            4 :         Ok(())
    1264            4 :     }
    1265              : }
        

Generated by: LCOV version 2.1-beta