LCOV - code coverage report
Current view: top level - pageserver/src - deletion_queue.rs (source / functions)
Test:      fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info
Test Date: 2024-09-24 13:57:57

             Coverage    Total    Hit
Lines:       87.3 %        829    724
Functions:   56.8 %        132     75

            Line data    Source code
       1              : mod deleter;
       2              : mod list_writer;
       3              : mod validator;
       4              : 
       5              : use std::collections::HashMap;
       6              : use std::sync::Arc;
       7              : use std::time::Duration;
       8              : 
       9              : use crate::control_plane_client::ControlPlaneGenerationsApi;
      10              : use crate::metrics;
      11              : use crate::tenant::remote_timeline_client::remote_layer_path;
      12              : use crate::tenant::remote_timeline_client::remote_timeline_path;
      13              : use crate::tenant::remote_timeline_client::LayerFileMetadata;
      14              : use crate::virtual_file::MaybeFatalIo;
      15              : use crate::virtual_file::VirtualFile;
      16              : use anyhow::Context;
      17              : use camino::Utf8PathBuf;
      18              : use pageserver_api::shard::TenantShardId;
      19              : use remote_storage::{GenericRemoteStorage, RemotePath};
      20              : use serde::Deserialize;
      21              : use serde::Serialize;
      22              : use thiserror::Error;
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::Instrument;
      25              : use tracing::{debug, error};
      26              : use utils::crashsafe::path_with_suffix_extension;
      27              : use utils::generation::Generation;
      28              : use utils::id::TimelineId;
      29              : use utils::lsn::AtomicLsn;
      30              : use utils::lsn::Lsn;
      31              : 
      32              : use self::deleter::Deleter;
      33              : use self::list_writer::DeletionOp;
      34              : use self::list_writer::ListWriter;
      35              : use self::list_writer::RecoverOp;
      36              : use self::validator::Validator;
      37              : use deleter::DeleterMessage;
      38              : use list_writer::ListWriterQueueMessage;
      39              : use validator::ValidatorQueueMessage;
      40              : 
      41              : use crate::{config::PageServerConf, tenant::storage_layer::LayerName};
      42              : 
      43              : // TODO: configurable for how long to wait before executing deletions
      44              : 
      45              : /// We aggregate object deletions from many tenants in one place, for several reasons:
      46              : /// - Coalesce deletions into fewer DeleteObjects calls
      47              : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
      48              : ///   to flush any outstanding deletions.
      49              : /// - Globally control throughput of deletions, as these are a low priority task: do
      50              : ///   not compete with the same S3 clients/connections used for higher priority uploads.
      51              : /// - Enable gating deletions on validation of a tenant's generation number, to make
      52              : ///   it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
      53              : ///
      54              : /// There are two kinds of deletion: deferred and immediate.  A deferred deletion
      55              : /// may be intentionally delayed to protect passive readers of S3 data, and is
      56              : /// subject to a generation number validation step.  An immediate deletion is
      57              : /// ready to execute immediately, and is only queued up so that it can be coalesced
      58              : /// with other deletions in flight.
      59              : ///
      60              : /// Deferred deletions pass through three steps:
      61              : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
      62              : ///   DeletionLists, which are persisted to disk.
       63              : /// - Validator: accumulate deletion lists, and validate them en masse prior to passing
      64              : ///   the keys in the list onward for actual deletion.  Also validate remote_consistent_lsn
      65              : ///   updates for running timelines.
      66              : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
      67              : ///   batches of 1000 keys via DeleteObjects.
      68              : ///
      69              : /// Non-deferred deletions, such as during timeline deletion, bypass the first
      70              : /// two stages and are passed straight into the Deleter.
      71              : ///
      72              : /// Internally, each stage is joined by a channel to the next.  On disk, there is only
      73              : /// one queue (of DeletionLists), which is written by the frontend and consumed
      74              : /// by the backend.
      75              : #[derive(Clone)]
      76              : pub struct DeletionQueue {
      77              :     client: DeletionQueueClient,
      78              : 
      79              :     // Parent cancellation token for the tokens passed into background workers
      80              :     cancel: CancellationToken,
      81              : }
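
// Illustrative usage sketch (not part of the original source): how crate-internal code
// might exercise the two kinds of deletion described above. The function and its
// arguments are hypothetical; the client methods (`push_layers`, `push_immediate`,
// `flush_immediate`) are the ones defined later in this file.
#[allow(dead_code)]
async fn example_enqueue_deletions(
    client: &DeletionQueueClient,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    current_generation: Generation,
    layers: Vec<(LayerName, LayerFileMetadata)>,
    timeline_paths: Vec<RemotePath>,
) -> Result<(), DeletionQueueError> {
    // Deferred: batched into a DeletionList, validated against the control plane,
    // then executed by the Deleter.
    client
        .push_layers(tenant_shard_id, timeline_id, current_generation, layers)
        .await?;

    // Immediate: bypasses persistence and validation (e.g. timeline deletion), and is
    // queued only so it can be coalesced with other deletions in flight.
    client.push_immediate(timeline_paths).await?;
    client.flush_immediate().await
}
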
      82              : 
      83              : /// Opaque wrapper around individual worker tasks, to avoid making the
      84              : /// worker objects themselves public
      85              : pub struct DeletionQueueWorkers<C>
      86              : where
      87              :     C: ControlPlaneGenerationsApi + Send + Sync,
      88              : {
      89              :     frontend: ListWriter,
      90              :     backend: Validator<C>,
      91              :     executor: Deleter,
      92              : }
      93              : 
      94              : impl<C> DeletionQueueWorkers<C>
      95              : where
      96              :     C: ControlPlaneGenerationsApi + Send + Sync + 'static,
      97              : {
      98           24 :     pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
      99           24 :         let jh_frontend = runtime.spawn(async move {
     100           24 :             self.frontend
     101           24 :                 .background()
     102           24 :                 .instrument(tracing::info_span!(parent:None, "deletion frontend"))
     103          153 :                 .await
     104           24 :         });
     105           24 :         let jh_backend = runtime.spawn(async move {
     106           24 :             self.backend
     107           24 :                 .background()
     108           24 :                 .instrument(tracing::info_span!(parent:None, "deletion backend"))
     109          173 :                 .await
     110           24 :         });
     111           24 :         let jh_executor = runtime.spawn(async move {
     112           24 :             self.executor
     113           24 :                 .background()
     114           24 :                 .instrument(tracing::info_span!(parent:None, "deletion executor"))
     115           83 :                 .await
     116           24 :         });
     117           24 : 
     118           24 :         runtime.spawn({
     119           24 :             async move {
     120           24 :                 jh_frontend.await.expect("error joining frontend worker");
     121            6 :                 jh_backend.await.expect("error joining backend worker");
     122            6 :                 drop(jh_executor.await.expect("error joining executor worker"));
     123           24 :             }
     124           24 :         })
     125           24 :     }
     126              : }
     127              : 
     128              : /// A FlushOp is just a oneshot channel, where we send the transmit side down
      129              : /// another channel, and the receive side is notified once the worker draining the
      130              : /// channel we're flushing has reached the FlushOp we sent into it.
     131              : ///
     132              : /// The only extra behavior beyond the channel is that the notify() method does not
     133              : /// return an error when the receive side has been dropped, because in this use case
     134              : /// it is harmless (the code that initiated the flush no longer cares about the result).
     135              : #[derive(Debug)]
     136              : struct FlushOp {
     137              :     tx: tokio::sync::oneshot::Sender<()>,
     138              : }
     139              : 
     140              : impl FlushOp {
     141          126 :     fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
     142          126 :         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
     143          126 :         (Self { tx }, rx)
     144          126 :     }
     145              : 
     146          126 :     fn notify(self) {
     147          126 :         if self.tx.send(()).is_err() {
     148              :             // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
     149            0 :             debug!("deletion queue flush from dropped client");
     150          126 :         };
     151          126 :     }
     152              : }
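
// Worker-side sketch of the FlushOp pattern (illustrative, not part of the original
// source; the loop body is hypothetical): a flush request is just another message in
// the worker's queue, so by the time it is dequeued, everything enqueued before it has
// already been processed, and `notify()` releases the waiting flusher.
#[allow(dead_code)]
async fn example_worker_loop(mut rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>) {
    while let Some(msg) = rx.recv().await {
        match msg {
            ListWriterQueueMessage::Flush(op) => op.notify(),
            _ => {
                // Handle Delete / Recover / FlushExecute messages here.
            }
        }
    }
}
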
     153              : 
     154              : #[derive(Clone, Debug)]
     155              : pub struct DeletionQueueClient {
     156              :     tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
     157              :     executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
     158              : 
     159              :     lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
     160              : }
     161              : 
     162           54 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     163              : struct TenantDeletionList {
     164              :     /// For each Timeline, a list of key fragments to append to the timeline remote path
     165              :     /// when reconstructing a full key
     166              :     timelines: HashMap<TimelineId, Vec<String>>,
     167              : 
     168              :     /// The generation in which this deletion was emitted: note that this may not be the
     169              :     /// same as the generation of any layers being deleted.  The generation of the layer
      170              :     /// has already been absorbed into the keys in `timelines`.
     171              :     generation: Generation,
     172              : }
     173              : 
     174              : impl TenantDeletionList {
     175           30 :     pub(crate) fn len(&self) -> usize {
     176           30 :         self.timelines.values().map(|v| v.len()).sum()
     177           30 :     }
     178              : }
     179              : 
     180              : /// Files ending with this suffix will be ignored and erased
      181              : /// during recovery at startup.
     182              : const TEMP_SUFFIX: &str = "tmp";
     183              : 
     184           90 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     185              : struct DeletionList {
     186              :     /// Serialization version, for future use
     187              :     version: u8,
     188              : 
     189              :     /// Used for constructing a unique key for each deletion list we write out.
     190              :     sequence: u64,
     191              : 
     192              :     /// To avoid repeating tenant/timeline IDs in every key, we store keys in
      193              :     /// nested HashMaps keyed by tenant shard and timeline ID.  Each Tenant only appears once
     194              :     /// with one unique generation ID: if someone tries to push a second generation
     195              :     /// ID for the same tenant, we will start a new DeletionList.
     196              :     tenants: HashMap<TenantShardId, TenantDeletionList>,
     197              : 
     198              :     /// Avoid having to walk `tenants` to calculate the number of keys in
     199              :     /// the nested deletion lists
     200              :     size: usize,
     201              : 
     202              :     /// Set to true when the list has undergone validation with the control
     203              :     /// plane and the remaining contents of `tenants` are valid.  A list may
     204              :     /// also be implicitly marked valid by DeletionHeader.validated_sequence
     205              :     /// advancing to >= DeletionList.sequence
     206              :     #[serde(default)]
     207              :     #[serde(skip_serializing_if = "std::ops::Not::not")]
     208              :     validated: bool,
     209              : }
     210              : 
     211            0 : #[derive(Debug, Serialize, Deserialize)]
     212              : struct DeletionHeader {
     213              :     /// Serialization version, for future use
     214              :     version: u8,
     215              : 
     216              :     /// The highest sequence number (inclusive) that has been validated.  All deletion
     217              :     /// lists on disk with a sequence <= this value are safe to execute.
     218              :     validated_sequence: u64,
     219              : }
     220              : 
     221              : impl DeletionHeader {
     222              :     const VERSION_LATEST: u8 = 1;
     223              : 
     224           24 :     fn new(validated_sequence: u64) -> Self {
     225           24 :         Self {
     226           24 :             version: Self::VERSION_LATEST,
     227           24 :             validated_sequence,
     228           24 :         }
     229           24 :     }
     230              : 
     231           24 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     232           24 :         debug!("Saving deletion list header {:?}", self);
     233           24 :         let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
     234           24 :         let header_path = conf.deletion_header_path();
     235           24 :         let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
     236           24 :         VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
     237           24 :             .await
     238           24 :             .maybe_fatal_err("save deletion header")?;
     239              : 
     240           24 :         Ok(())
     241           24 :     }
     242              : }
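
// Recovery-time sketch (illustrative, not part of the original source): a DeletionList
// found on disk is executable either if it was individually marked `validated`, or if
// the header's `validated_sequence` has advanced to or past its sequence number.
#[allow(dead_code)]
fn example_list_is_executable(list: &DeletionList, header: &DeletionHeader) -> bool {
    list.validated || list.sequence <= header.validated_sequence
}
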
     243              : 
     244              : impl DeletionList {
     245              :     const VERSION_LATEST: u8 = 1;
     246           60 :     fn new(sequence: u64) -> Self {
     247           60 :         Self {
     248           60 :             version: Self::VERSION_LATEST,
     249           60 :             sequence,
     250           60 :             tenants: HashMap::new(),
     251           60 :             size: 0,
     252           60 :             validated: false,
     253           60 :         }
     254           60 :     }
     255              : 
     256           82 :     fn is_empty(&self) -> bool {
     257           82 :         self.tenants.is_empty()
     258           82 :     }
     259              : 
     260          180 :     fn len(&self) -> usize {
     261          180 :         self.size
     262          180 :     }
     263              : 
     264              :     /// Returns true if the push was accepted, false if the caller must start a new
     265              :     /// deletion list.
     266           42 :     fn push(
     267           42 :         &mut self,
     268           42 :         tenant: &TenantShardId,
     269           42 :         timeline: &TimelineId,
     270           42 :         generation: Generation,
     271           42 :         objects: &mut Vec<RemotePath>,
     272           42 :     ) -> bool {
     273           42 :         if objects.is_empty() {
     274              :             // Avoid inserting an empty TimelineDeletionList: this preserves the property
     275              :             // that if we have no keys, then self.objects is empty (used in Self::is_empty)
     276            0 :             return true;
     277           42 :         }
     278           42 : 
     279           42 :         let tenant_entry = self
     280           42 :             .tenants
     281           42 :             .entry(*tenant)
     282           42 :             .or_insert_with(|| TenantDeletionList {
     283           36 :                 timelines: HashMap::new(),
     284           36 :                 generation,
     285           42 :             });
     286           42 : 
     287           42 :         if tenant_entry.generation != generation {
     288              :             // Only one generation per tenant per list: signal to
     289              :             // caller to start a new list.
     290            6 :             return false;
     291           36 :         }
     292           36 : 
     293           36 :         let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
     294           36 : 
     295           36 :         let timeline_remote_path = remote_timeline_path(tenant, timeline);
     296           36 : 
     297           36 :         self.size += objects.len();
     298           36 :         timeline_entry.extend(objects.drain(..).map(|p| {
     299           36 :             p.strip_prefix(&timeline_remote_path)
     300           36 :                 .expect("Timeline paths always start with the timeline prefix")
     301           36 :                 .to_string()
     302           36 :         }));
     303           36 :         true
     304           42 :     }
     305              : 
     306           30 :     fn into_remote_paths(self) -> Vec<RemotePath> {
     307           30 :         let mut result = Vec::new();
     308           30 :         for (tenant, tenant_deletions) in self.tenants.into_iter() {
     309           18 :             for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
     310           18 :                 let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
     311           18 :                 result.extend(
     312           18 :                     timeline_layers
     313           18 :                         .into_iter()
     314           18 :                         .map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
     315           18 :                 );
     316           18 :             }
     317              :         }
     318              : 
     319           30 :         result
     320           30 :     }
     321              : 
     322           42 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     323           42 :         let path = conf.deletion_list_path(self.sequence);
     324           42 :         let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
     325           42 : 
     326           42 :         let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
     327           42 : 
     328           42 :         VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
     329           42 :             .await
     330           42 :             .maybe_fatal_err("save deletion list")
     331           42 :             .map_err(Into::into)
     332           42 :     }
     333              : }
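
// Caller-side sketch (illustrative, not part of the original source): `push` refuses to
// mix generations for one tenant within a single list, so on `false` the caller persists
// the current list and retries against a fresh one with the next sequence number.
#[allow(dead_code)]
async fn example_rotate_deletion_list(
    conf: &'static PageServerConf,
    pending: &mut DeletionList,
    tenant: &TenantShardId,
    timeline: &TimelineId,
    generation: Generation,
    objects: &mut Vec<RemotePath>,
) -> anyhow::Result<()> {
    if !pending.push(tenant, timeline, generation, objects) {
        // The list already holds a different generation for this tenant:
        // save it and start a new list before retrying the push.
        pending.save(conf).await?;
        *pending = DeletionList::new(pending.sequence + 1);
        assert!(pending.push(tenant, timeline, generation, objects));
    }
    Ok(())
}
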
     334              : 
     335              : impl std::fmt::Display for DeletionList {
     336            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     337            0 :         write!(
     338            0 :             f,
     339            0 :             "DeletionList<seq={}, tenants={}, keys={}>",
     340            0 :             self.sequence,
     341            0 :             self.tenants.len(),
     342            0 :             self.size
     343            0 :         )
     344            0 :     }
     345              : }
     346              : 
     347              : struct PendingLsn {
     348              :     projected: Lsn,
     349              :     result_slot: Arc<AtomicLsn>,
     350              : }
     351              : 
     352              : struct TenantLsnState {
     353              :     timelines: HashMap<TimelineId, PendingLsn>,
     354              : 
     355              :     // In what generation was the most recent update proposed?
     356              :     generation: Generation,
     357              : }
     358              : 
     359              : #[derive(Default)]
     360              : struct VisibleLsnUpdates {
     361              :     tenants: HashMap<TenantShardId, TenantLsnState>,
     362              : }
     363              : 
     364              : impl VisibleLsnUpdates {
     365          600 :     fn new() -> Self {
     366          600 :         Self {
     367          600 :             tenants: HashMap::new(),
     368          600 :         }
     369          600 :     }
     370              : }
     371              : 
     372              : impl std::fmt::Debug for VisibleLsnUpdates {
     373            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     374            0 :         write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
     375            0 :     }
     376              : }
     377              : 
     378            0 : #[derive(Error, Debug)]
     379              : pub enum DeletionQueueError {
     380              :     #[error("Deletion queue unavailable during shutdown")]
     381              :     ShuttingDown,
     382              : }
     383              : 
     384              : impl DeletionQueueClient {
     385              :     /// This is cancel-safe.  If you drop the future before it completes, the message
     386              :     /// is not pushed, although in the context of the deletion queue it doesn't matter: once
     387              :     /// we decide to do a deletion the decision is always final.
     388          839 :     fn do_push<T>(
     389          839 :         &self,
     390          839 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     391          839 :         msg: T,
     392          839 :     ) -> Result<(), DeletionQueueError> {
     393          839 :         match queue.send(msg) {
     394          839 :             Ok(_) => Ok(()),
     395            0 :             Err(e) => {
     396            0 :                 // This shouldn't happen, we should shut down all tenants before
     397            0 :                 // we shut down the global delete queue.  If we encounter a bug like this,
     398            0 :                 // we may leak objects as deletions won't be processed.
     399            0 :                 error!("Deletion queue closed while pushing, shutting down? ({e})");
     400            0 :                 Err(DeletionQueueError::ShuttingDown)
     401              :             }
     402              :         }
     403          839 :     }
     404              : 
     405           24 :     pub(crate) fn recover(
     406           24 :         &self,
     407           24 :         attached_tenants: HashMap<TenantShardId, Generation>,
     408           24 :     ) -> Result<(), DeletionQueueError> {
     409           24 :         self.do_push(
     410           24 :             &self.tx,
     411           24 :             ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
     412           24 :         )
     413           24 :     }
     414              : 
     415              :     /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
     416              :     /// world, it must validate its generation number before doing so.  Rather than do this synchronously,
     417              :     /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
     418              :     /// recently validated separately.
     419              :     ///
     420              :     /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates.  The
     421              :     /// backend will later wake up and notice that the tenant's generation requires validation.
     422         4223 :     pub(crate) async fn update_remote_consistent_lsn(
     423         4223 :         &self,
     424         4223 :         tenant_shard_id: TenantShardId,
     425         4223 :         timeline_id: TimelineId,
     426         4223 :         current_generation: Generation,
     427         4223 :         lsn: Lsn,
     428         4223 :         result_slot: Arc<AtomicLsn>,
     429         4223 :     ) {
     430         4223 :         let mut locked = self
     431         4223 :             .lsn_table
     432         4223 :             .write()
     433         4223 :             .expect("Lock should never be poisoned");
     434         4223 : 
     435         4223 :         let tenant_entry = locked
     436         4223 :             .tenants
     437         4223 :             .entry(tenant_shard_id)
     438         4223 :             .or_insert(TenantLsnState {
     439         4223 :                 timelines: HashMap::new(),
     440         4223 :                 generation: current_generation,
     441         4223 :             });
     442         4223 : 
     443         4223 :         if tenant_entry.generation != current_generation {
     444            0 :             // Generation might have changed if we were detached and then re-attached: in this case,
     445            0 :             // state from the previous generation cannot be trusted.
     446            0 :             tenant_entry.timelines.clear();
     447            0 :             tenant_entry.generation = current_generation;
     448         4223 :         }
     449              : 
     450         4223 :         tenant_entry.timelines.insert(
     451         4223 :             timeline_id,
     452         4223 :             PendingLsn {
     453         4223 :                 projected: lsn,
     454         4223 :                 result_slot,
     455         4223 :             },
     456         4223 :         );
     457         4223 :     }
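
// Sketch of the projected/validated LSN flow (illustrative, not part of the original
// source; assumes utils' `AtomicLsn::load`): a timeline publishes its projected
// remote_consistent_lsn here, and later reads back whatever the validator has stored
// into the shared slot once the generation was confirmed with the control plane.
#[allow(dead_code)]
async fn example_publish_and_read_lsn(
    client: &DeletionQueueClient,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    current_generation: Generation,
    projected: Lsn,
    result_slot: Arc<AtomicLsn>,
) -> Lsn {
    client
        .update_remote_consistent_lsn(
            tenant_shard_id,
            timeline_id,
            current_generation,
            projected,
            result_slot.clone(),
        )
        .await;

    // Lags the projected value until the backend next validates this tenant's generation.
    result_slot.load()
}
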
     458              : 
     459              :     /// Submit a list of layers for deletion: this function will return before the deletion is
      460              :     /// persistent, but it may be executed at any time after this function is entered: do not push
     461              :     /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
     462              :     /// references them).
     463              :     ///
     464              :     /// The `current_generation` is the generation of this pageserver's current attachment.  The
     465              :     /// generations in `layers` are the generations in which those layers were written.
     466          743 :     pub(crate) async fn push_layers(
     467          743 :         &self,
     468          743 :         tenant_shard_id: TenantShardId,
     469          743 :         timeline_id: TimelineId,
     470          743 :         current_generation: Generation,
     471          743 :         layers: Vec<(LayerName, LayerFileMetadata)>,
     472          743 :     ) -> Result<(), DeletionQueueError> {
     473          743 :         if current_generation.is_none() {
     474            0 :             debug!("Enqueuing deletions in legacy mode, skipping queue");
     475              : 
     476            0 :             let mut layer_paths = Vec::new();
     477            0 :             for (layer, meta) in layers {
     478            0 :                 layer_paths.push(remote_layer_path(
     479            0 :                     &tenant_shard_id.tenant_id,
     480            0 :                     &timeline_id,
     481            0 :                     meta.shard,
     482            0 :                     &layer,
     483            0 :                     meta.generation,
     484            0 :                 ));
     485            0 :             }
     486            0 :             self.push_immediate(layer_paths).await?;
     487            0 :             return self.flush_immediate().await;
     488          743 :         }
     489          743 : 
     490          743 :         self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
     491          743 :     }
     492              : 
     493              :     /// When a Tenant has a generation, push_layers is always synchronous because
      494              :     /// the ListWriter channel is an unbounded channel.
     495              :     ///
     496              :     /// This can be merged into push_layers when we remove the Generation-less mode
     497              :     /// support (`<https://github.com/neondatabase/neon/issues/5395>`)
     498          743 :     pub(crate) fn push_layers_sync(
     499          743 :         &self,
     500          743 :         tenant_shard_id: TenantShardId,
     501          743 :         timeline_id: TimelineId,
     502          743 :         current_generation: Generation,
     503          743 :         layers: Vec<(LayerName, LayerFileMetadata)>,
     504          743 :     ) -> Result<(), DeletionQueueError> {
     505          743 :         metrics::DELETION_QUEUE
     506          743 :             .keys_submitted
     507          743 :             .inc_by(layers.len() as u64);
     508          743 :         self.do_push(
     509          743 :             &self.tx,
     510          743 :             ListWriterQueueMessage::Delete(DeletionOp {
     511          743 :                 tenant_shard_id,
     512          743 :                 timeline_id,
     513          743 :                 layers,
     514          743 :                 generation: current_generation,
     515          743 :                 objects: Vec::new(),
     516          743 :             }),
     517          743 :         )
     518          743 :     }
     519              : 
     520              :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     521           72 :     async fn do_flush<T>(
     522           72 :         &self,
     523           72 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     524           72 :         msg: T,
     525           72 :         rx: tokio::sync::oneshot::Receiver<()>,
     526           72 :     ) -> Result<(), DeletionQueueError> {
     527           72 :         self.do_push(queue, msg)?;
     528           72 :         if rx.await.is_err() {
     529              :             // This shouldn't happen if tenants are shut down before deletion queue.  If we
     530              :             // encounter a bug like this, then a flusher will incorrectly believe it has flushed
     531              :             // when it hasn't, possibly leading to leaking objects.
     532            0 :             error!("Deletion queue dropped flush op while client was still waiting");
     533            0 :             Err(DeletionQueueError::ShuttingDown)
     534              :         } else {
     535           72 :             Ok(())
     536              :         }
     537           72 :     }
     538              : 
     539              :     /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
     540              :     ///
     541              :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     542           42 :     pub async fn flush(&self) -> Result<(), DeletionQueueError> {
     543           42 :         let (flush_op, rx) = FlushOp::new();
     544           42 :         self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
     545           42 :             .await
     546           42 :     }
     547              : 
     548              :     /// Issue a flush without waiting for it to complete.  This is useful on advisory flushes where
     549              :     /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
     550              :     /// detach where flushing is nice but not necessary.
     551              :     ///
     552              :     /// This function provides no guarantees of work being done.
     553            0 :     pub fn flush_advisory(&self) {
     554            0 :         let (flush_op, _) = FlushOp::new();
     555            0 : 
     556            0 :         // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
     557            0 :         drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
     558            0 :     }
     559              : 
     560              :     // Wait until all previous deletions are executed
     561           30 :     pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
     562           30 :         debug!("flush_execute: flushing to deletion lists...");
     563              :         // Flush any buffered work to deletion lists
     564           30 :         self.flush().await?;
     565              : 
     566              :         // Flush the backend into the executor of deletion lists
     567           30 :         let (flush_op, rx) = FlushOp::new();
     568           30 :         debug!("flush_execute: flushing backend...");
     569           30 :         self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
     570           30 :             .await?;
     571           30 :         debug!("flush_execute: finished flushing backend...");
     572              : 
     573              :         // Flush any immediate-mode deletions (the above backend flush will only flush
     574              :         // the executor if deletions had flowed through the backend)
     575           30 :         debug!("flush_execute: flushing execution...");
     576           30 :         self.flush_immediate().await?;
     577           30 :         debug!("flush_execute: finished flushing execution...");
     578           30 :         Ok(())
     579           30 :     }
     580              : 
     581              :     /// This interface bypasses the persistent deletion queue, and any validation
      582              :     /// that this pageserver is still eligible to execute the deletions.  It is for
     583              :     /// use in timeline deletions, where the control plane is telling us we may
     584              :     /// delete everything in the timeline.
     585              :     ///
     586              :     /// DO NOT USE THIS FROM GC OR COMPACTION CODE.  Use the regular `push_layers`.
     587            0 :     pub(crate) async fn push_immediate(
     588            0 :         &self,
     589            0 :         objects: Vec<RemotePath>,
     590            0 :     ) -> Result<(), DeletionQueueError> {
     591            0 :         metrics::DELETION_QUEUE
     592            0 :             .keys_submitted
     593            0 :             .inc_by(objects.len() as u64);
     594            0 :         self.executor_tx
     595            0 :             .send(DeleterMessage::Delete(objects))
     596            0 :             .await
     597            0 :             .map_err(|_| DeletionQueueError::ShuttingDown)
     598            0 :     }
     599              : 
     600              :     /// Companion to push_immediate.  When this returns Ok, all prior objects sent
     601              :     /// into push_immediate have been deleted from remote storage.
     602           30 :     pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
     603           30 :         let (flush_op, rx) = FlushOp::new();
     604           30 :         self.executor_tx
     605           30 :             .send(DeleterMessage::Flush(flush_op))
     606            0 :             .await
     607           30 :             .map_err(|_| DeletionQueueError::ShuttingDown)?;
     608              : 
     609           30 :         rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
     610           30 :     }
     611              : }
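
// Flushing sketch (illustrative, not part of the original source): `flush` only
// guarantees that prior deferred deletions are durable in a DeletionList on local disk,
// whereas `flush_execute` additionally waits for validation and actual object deletion.
#[allow(dead_code)]
async fn example_flush_levels(client: &DeletionQueueClient) -> Result<(), DeletionQueueError> {
    // Persistent, but the objects may still exist in remote storage.
    client.flush().await?;
    // Validated with the control plane and removed from remote storage.
    client.flush_execute().await
}
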
     612              : 
     613              : impl DeletionQueue {
     614           24 :     pub fn new_client(&self) -> DeletionQueueClient {
     615           24 :         self.client.clone()
     616           24 :     }
     617              : 
     618              :     /// Caller may use the returned object to construct clients with new_client.
      619              :     /// Caller should spawn the background tasks of the returned worker object (e.g. via `spawn_with`):
      620              :     /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
     621              :     ///
      622              :     /// The workers are returned in an `Option`; with the current signature they are always `Some`.
     623           24 :     pub fn new<C>(
     624           24 :         remote_storage: GenericRemoteStorage,
     625           24 :         control_plane_client: Option<C>,
     626           24 :         conf: &'static PageServerConf,
     627           24 :     ) -> (Self, Option<DeletionQueueWorkers<C>>)
     628           24 :     where
     629           24 :         C: ControlPlaneGenerationsApi + Send + Sync,
     630           24 :     {
     631           24 :         // Unbounded channel: enables non-async functions to submit deletions.  The actual length is
     632           24 :         // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
     633           24 :         // enough to avoid this taking pathologically large amount of memory.
     634           24 :         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
     635           24 : 
     636           24 :         // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
     637           24 :         let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
     638           24 : 
     639           24 :         // Shallow channel: it carries lists of paths, and we expect the main queueing to
     640           24 :         // happen in the backend (persistent), not in this queue.
     641           24 :         let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
     642           24 : 
     643           24 :         let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
     644           24 : 
     645           24 :         // The deletion queue has an independent cancellation token to
     646           24 :         // the general pageserver shutdown token, because it stays alive a bit
     647           24 :         // longer to flush after Tenants have all been torn down.
     648           24 :         let cancel = CancellationToken::new();
     649           24 : 
     650           24 :         (
     651           24 :             Self {
     652           24 :                 client: DeletionQueueClient {
     653           24 :                     tx,
     654           24 :                     executor_tx: executor_tx.clone(),
     655           24 :                     lsn_table: lsn_table.clone(),
     656           24 :                 },
     657           24 :                 cancel: cancel.clone(),
     658           24 :             },
     659           24 :             Some(DeletionQueueWorkers {
     660           24 :                 frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
     661           24 :                 backend: Validator::new(
     662           24 :                     conf,
     663           24 :                     backend_rx,
     664           24 :                     executor_tx,
     665           24 :                     control_plane_client,
     666           24 :                     lsn_table.clone(),
     667           24 :                     cancel.clone(),
     668           24 :                 ),
     669           24 :                 executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
     670           24 :             }),
     671           24 :         )
     672           24 :     }
     673              : 
     674            0 :     pub async fn shutdown(&mut self, timeout: Duration) {
     675            0 :         match tokio::time::timeout(timeout, self.client.flush()).await {
     676              :             Ok(Ok(())) => {
     677            0 :                 tracing::info!("Deletion queue flushed successfully on shutdown")
     678              :             }
     679              :             Ok(Err(DeletionQueueError::ShuttingDown)) => {
     680              :                 // This is not harmful for correctness, but is unexpected: the deletion
     681              :                 // queue's workers should stay alive as long as there are any client handles instantiated.
     682            0 :                 tracing::warn!("Deletion queue stopped prematurely");
     683              :             }
     684            0 :             Err(_timeout) => {
     685            0 :                 tracing::warn!("Timed out flushing deletion queue on shutdown")
     686              :             }
     687              :         }
     688              : 
     689              :         // We only cancel _after_ flushing: otherwise we would be shutting down the
     690              :         // components that do the flush.
     691            0 :         self.cancel.cancel();
     692            0 :     }
     693              : }
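
// Startup/shutdown sketch (illustrative, not part of the original source; the function
// and its arguments are hypothetical): the caller constructs the queue, spawns the
// returned workers on a runtime of its choice, and flushes with a timeout at shutdown.
#[allow(dead_code)]
async fn example_startup_shutdown<C>(
    remote_storage: GenericRemoteStorage,
    control_plane_client: Option<C>,
    conf: &'static PageServerConf,
) where
    C: ControlPlaneGenerationsApi + Send + Sync + 'static,
{
    let (mut queue, workers) = DeletionQueue::new(remote_storage, control_plane_client, conf);
    let worker_join = workers
        .expect("workers are returned alongside the queue")
        .spawn_with(&tokio::runtime::Handle::current());

    // ... hand out clients via queue.new_client() and run the pageserver ...

    queue.shutdown(Duration::from_secs(5)).await;
    worker_join.await.expect("deletion queue workers panicked");
}
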
     694              : 
     695              : #[cfg(test)]
     696              : mod test {
     697              :     use camino::Utf8Path;
     698              :     use hex_literal::hex;
     699              :     use pageserver_api::{shard::ShardIndex, upcall_api::ReAttachResponseTenant};
     700              :     use std::{io::ErrorKind, time::Duration};
     701              :     use tracing::info;
     702              : 
     703              :     use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
     704              :     use tokio::task::JoinHandle;
     705              : 
     706              :     use crate::{
     707              :         control_plane_client::RetryForeverError,
     708              :         repository::Key,
     709              :         tenant::{harness::TenantHarness, storage_layer::DeltaLayerName},
     710              :     };
     711              : 
     712              :     use super::*;
     713              :     pub const TIMELINE_ID: TimelineId =
     714              :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
     715              : 
     716              :     pub const EXAMPLE_LAYER_NAME: LayerName = LayerName::Delta(DeltaLayerName {
     717              :         key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
     718              :         lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
     719              :     });
     720              : 
     721              :     // When you need a second layer in a test.
     722              :     pub const EXAMPLE_LAYER_NAME_ALT: LayerName = LayerName::Delta(DeltaLayerName {
     723              :         key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
     724              :         lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
     725              :     });
     726              : 
     727              :     struct TestSetup {
     728              :         harness: TenantHarness,
     729              :         remote_fs_dir: Utf8PathBuf,
     730              :         storage: GenericRemoteStorage,
     731              :         mock_control_plane: MockControlPlane,
     732              :         deletion_queue: DeletionQueue,
     733              :         worker_join: JoinHandle<()>,
     734              :     }
     735              : 
     736              :     impl TestSetup {
     737              :         /// Simulate a pageserver restart by destroying and recreating the deletion queue
     738            6 :         async fn restart(&mut self) {
     739            6 :             let (deletion_queue, workers) = DeletionQueue::new(
     740            6 :                 self.storage.clone(),
     741            6 :                 Some(self.mock_control_plane.clone()),
     742            6 :                 self.harness.conf,
     743            6 :             );
     744            6 : 
      745            6 :         tracing::debug!("Spawning worker for new deletion queue");
     746            6 :             let worker_join = workers
     747            6 :                 .unwrap()
     748            6 :                 .spawn_with(&tokio::runtime::Handle::current());
     749            6 : 
     750            6 :             let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
     751            6 :             let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
     752            6 : 
     753            6 :             tracing::debug!("Joining worker from previous queue");
     754            6 :             old_deletion_queue.cancel.cancel();
     755            6 :             old_worker_join
     756            6 :                 .await
     757            6 :                 .expect("Failed to join workers for previous deletion queue");
     758            6 :         }
     759              : 
     760           18 :         fn set_latest_generation(&self, gen: Generation) {
     761           18 :             let tenant_shard_id = self.harness.tenant_shard_id;
     762           18 :             self.mock_control_plane
     763           18 :                 .latest_generation
     764           18 :                 .lock()
     765           18 :                 .unwrap()
     766           18 :                 .insert(tenant_shard_id, gen);
     767           18 :         }
     768              : 
     769              :         /// Returns remote layer file name, suitable for use in assert_remote_files
     770           18 :         fn write_remote_layer(
     771           18 :             &self,
     772           18 :             file_name: LayerName,
     773           18 :             gen: Generation,
     774           18 :         ) -> anyhow::Result<String> {
     775           18 :             let tenant_shard_id = self.harness.tenant_shard_id;
     776           18 :             let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
     777           18 :             let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
     778           18 :             std::fs::create_dir_all(&remote_timeline_path)?;
     779           18 :             let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
     780           18 : 
     781           18 :             let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
     782           18 : 
     783           18 :             std::fs::write(
     784           18 :                 remote_timeline_path.join(remote_layer_file_name.clone()),
     785           18 :                 content,
     786           18 :             )?;
     787              : 
     788           18 :             Ok(remote_layer_file_name)
     789           18 :         }
     790              :     }
     791              : 
     792              :     #[derive(Debug, Clone)]
     793              :     struct MockControlPlane {
     794              :         pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>,
     795              :     }
     796              : 
     797              :     impl MockControlPlane {
     798           18 :         fn new() -> Self {
     799           18 :             Self {
     800           18 :                 latest_generation: Arc::default(),
     801           18 :             }
     802           18 :         }
     803              :     }
     804              : 
     805              :     impl ControlPlaneGenerationsApi for MockControlPlane {
     806            0 :         async fn re_attach(
     807            0 :             &self,
     808            0 :             _conf: &PageServerConf,
     809            0 :         ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
     810            0 :             unimplemented!()
     811              :         }
     812              : 
     813           24 :         async fn validate(
     814           24 :             &self,
     815           24 :             tenants: Vec<(TenantShardId, Generation)>,
     816           24 :         ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     817           24 :             let mut result = HashMap::new();
     818           24 : 
     819           24 :             let latest_generation = self.latest_generation.lock().unwrap();
     820              : 
     821           48 :             for (tenant_shard_id, generation) in tenants {
     822           24 :                 if let Some(latest) = latest_generation.get(&tenant_shard_id) {
     823           24 :                     result.insert(tenant_shard_id, *latest == generation);
     824           24 :                 }
     825              :             }
     826              : 
     827           24 :             Ok(result)
     828           24 :         }
     829              :     }
     830              : 
     831           18 :     async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
     832           18 :         let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
     833           18 :         let harness = TenantHarness::create(test_name).await?;
     834              : 
     835              :         // We do not load() the harness: we only need its config and remote_storage
     836              : 
      837              :         // Set up a GenericRemoteStorage targeting a directory
     838           18 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs");
     839           18 :         std::fs::create_dir_all(remote_fs_dir)?;
     840           18 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
     841           18 :         let storage_config = RemoteStorageConfig {
     842           18 :             storage: RemoteStorageKind::LocalFs {
     843           18 :                 local_path: remote_fs_dir.clone(),
     844           18 :             },
     845           18 :             timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     846           18 :         };
     847           18 :         let storage = GenericRemoteStorage::from_config(&storage_config)
     848            0 :             .await
     849           18 :             .unwrap();
     850           18 : 
     851           18 :         let mock_control_plane = MockControlPlane::new();
     852           18 : 
     853           18 :         let (deletion_queue, worker) = DeletionQueue::new(
     854           18 :             storage.clone(),
     855           18 :             Some(mock_control_plane.clone()),
     856           18 :             harness.conf,
     857           18 :         );
     858           18 : 
     859           18 :         let worker = worker.unwrap();
     860           18 :         let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
     861           18 : 
     862           18 :         Ok(TestSetup {
     863           18 :             harness,
     864           18 :             remote_fs_dir,
     865           18 :             storage,
     866           18 :             mock_control_plane,
     867           18 :             deletion_queue,
     868           18 :             worker_join,
     869           18 :         })
     870           18 :     }
     871              : 
     872              :     // TODO: put this in a common location so that we can share with remote_timeline_client's tests
     873           54 :     fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
     874           54 :         let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
     875           54 :         expected.sort();
     876           54 : 
     877           54 :         let mut found: Vec<String> = Vec::new();
     878           54 :         let dir = match std::fs::read_dir(remote_path) {
     879           54 :             Ok(d) => d,
     880            0 :             Err(e) => {
     881            0 :                 if e.kind() == ErrorKind::NotFound {
     882            0 :                     if expected.is_empty() {
     883              :                         // We are asserting prefix is empty: it is expected that the dir is missing
     884            0 :                         return;
     885              :                     } else {
     886            0 :                         assert_eq!(expected, Vec::<String>::new());
     887            0 :                         unreachable!();
     888              :                     }
     889              :                 } else {
     890            0 :                     panic!("Unexpected error listing {remote_path}: {e}");
     891              :                 }
     892              :             }
     893              :         };
     894              : 
     895           54 :         for entry in dir.flatten() {
     896           48 :             let entry_name = entry.file_name();
     897           48 :             let fname = entry_name.to_str().unwrap();
     898           48 :             found.push(String::from(fname));
     899           48 :         }
     900           54 :         found.sort();
     901           54 : 
     902           54 :         assert_eq!(expected, found);
     903           54 :     }
     904              : 
     905           30 :     fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
     906           30 :         let dir = match std::fs::read_dir(directory) {
     907           24 :             Ok(d) => d,
     908              :             Err(_) => {
     909            6 :                 assert_eq!(expected, &Vec::<String>::new());
     910            6 :                 return;
     911              :             }
     912              :         };
     913           24 :         let mut found = Vec::new();
     914           54 :         for dentry in dir {
     915           30 :             let dentry = dentry.unwrap();
     916           30 :             let file_name = dentry.file_name();
     917           30 :             let file_name_str = file_name.to_string_lossy();
     918           30 :             found.push(file_name_str.to_string());
     919           30 :         }
     920           24 :         found.sort();
     921           24 :         assert_eq!(expected, found);
     922           30 :     }
     923              : 
     924              :     #[tokio::test]
     925            6 :     async fn deletion_queue_smoke() -> anyhow::Result<()> {
     926            6 :         // Basic test that the deletion queue processes the deletions we pass into it
     927            6 :         let ctx = setup("deletion_queue_smoke")
     928            6 :             .await
     929            6 :             .expect("Failed test setup");
     930            6 :         let client = ctx.deletion_queue.new_client();
     931            6 :         client.recover(HashMap::new())?;
     932            6 : 
     933            6 :         let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
     934            6 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
     935            6 : 
     936            6 :         let content: Vec<u8> = "victim1 contents".into();
     937            6 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
     938            6 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
     939            6 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
     940            6 : 
     941            6 :         // Exercise the distinction between the generation of the layers
     942            6 :         // we delete, and the generation of the running Tenant.
     943            6 :         let layer_generation = Generation::new(0xdeadbeef);
     944            6 :         let now_generation = Generation::new(0xfeedbeef);
     945            6 :         let layer_metadata =
     946            6 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
     947            6 : 
     948            6 :         let remote_layer_file_name_1 =
     949            6 :             format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
     950            6 : 
     951            6 :         // Set mock control plane state to valid for our generation
     952            6 :         ctx.set_latest_generation(now_generation);
     953            6 : 
     954            6 :         // Inject a victim file to remote storage
     955            6 :         info!("Writing");
     956            6 :         std::fs::create_dir_all(&remote_timeline_path)?;
     957            6 :         std::fs::write(
     958            6 :             remote_timeline_path.join(remote_layer_file_name_1.clone()),
     959            6 :             content,
     960            6 :         )?;
     961            6 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
     962            6 : 
     963            6 :         // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
     964            6 :         info!("Pushing");
     965            6 :         client
     966            6 :             .push_layers(
     967            6 :                 tenant_shard_id,
     968            6 :                 TIMELINE_ID,
     969            6 :                 now_generation,
     970            6 :                 [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
     971            6 :             )
     972            6 :             .await?;
     973            6 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
     974            6 : 
     975            6 :         assert_local_files(&[], &deletion_prefix);
     976            6 : 
     977            6 :         // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
     978            6 :         info!("Flushing");
     979            6 :         client.flush().await?;
     980            6 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
     981            6 :         assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
     982            6 : 
     983            6 :         // File should go away when we execute
     984            6 :         info!("Flush-executing");
     985           18 :         client.flush_execute().await?;
     986            6 :         assert_remote_files(&[], &remote_timeline_path);
     987            6 :         assert_local_files(&["header-01"], &deletion_prefix);
     988            6 : 
     989            6 :         // Flushing on an empty queue should succeed immediately, and not write any lists
     990            6 :         info!("Flush-executing on empty");
     991           18 :         client.flush_execute().await?;
     992            6 :         assert_local_files(&["header-01"], &deletion_prefix);
     993            6 : 
     994            6 :         Ok(())
     995            6 :     }
     996              : 
     997              :     #[tokio::test]
     998            6 :     async fn deletion_queue_validation() -> anyhow::Result<()> {
     999            6 :         let ctx = setup("deletion_queue_validation")
    1000            6 :             .await
    1001            6 :             .expect("Failed test setup");
    1002            6 :         let client = ctx.deletion_queue.new_client();
    1003            6 :         client.recover(HashMap::new())?;
    1004            6 : 
    1005            6 :         // Generation that the control plane thinks is current
    1006            6 :         let latest_generation = Generation::new(0xdeadbeef);
    1007            6 :         // Generation that our DeletionQueue thinks the tenant is running with
    1008            6 :         let stale_generation = latest_generation.previous();
    1009            6 :         // Generation that our example layer file was written with
    1010            6 :         let layer_generation = stale_generation.previous();
    1011            6 :         let layer_metadata =
    1012            6 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
    1013            6 : 
    1014            6 :         ctx.set_latest_generation(latest_generation);
    1015            6 : 
    1016            6 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
    1017            6 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
    1018            6 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
    1019            6 : 
    1020            6 :         // Initial state: a remote layer exists
    1021            6 :         let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
    1022            6 :         assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
    1023            6 : 
    1024            6 :         tracing::debug!("Pushing...");
    1025            6 :         client
    1026            6 :             .push_layers(
    1027            6 :                 tenant_shard_id,
    1028            6 :                 TIMELINE_ID,
    1029            6 :                 stale_generation,
    1030            6 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1031            6 :             )
    1032            6 :             .await?;
    1033            6 : 
    1034            6 :         // We enqueued the operation in a stale generation: it should have failed validation
    1035            6 :         tracing::debug!("Flushing...");
    1036           18 :         tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
    1037            6 :         assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
    1038            6 : 
    1039            6 :         tracing::debug!("Pushing...");
    1040            6 :         client
    1041            6 :             .push_layers(
    1042            6 :                 tenant_shard_id,
    1043            6 :                 TIMELINE_ID,
    1044            6 :                 latest_generation,
    1045            6 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1046            6 :             )
    1047            6 :             .await?;
    1048            6 : 
    1049            6 :         // We enqueued the operation in a fresh generation: it should have passed validation
    1050            6 :         tracing::debug!("Flushing...");
    1051           18 :         tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
    1052            6 :         assert_remote_files(&[], &remote_timeline_path);
    1053            6 : 
    1054            6 :         Ok(())
    1055            6 :     }
    1056              : 
    1057              :     #[tokio::test]
    1058            6 :     async fn deletion_queue_recovery() -> anyhow::Result<()> {
     1059            6 :         // Test that deletion lists written before a restart are recovered, and that only the immediately previous generation's deletions get executed
    1060            6 :         let mut ctx = setup("deletion_queue_recovery")
    1061            6 :             .await
    1062            6 :             .expect("Failed test setup");
    1063            6 :         let client = ctx.deletion_queue.new_client();
    1064            6 :         client.recover(HashMap::new())?;
    1065            6 : 
    1066            6 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
    1067            6 : 
    1068            6 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
    1069            6 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
    1070            6 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
    1071            6 : 
    1072            6 :         let layer_generation = Generation::new(0xdeadbeef);
    1073            6 :         let now_generation = Generation::new(0xfeedbeef);
    1074            6 :         let layer_metadata =
    1075            6 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
    1076            6 : 
     1077            6 :         // Inject a deletion in the generation before now_generation: after the restart
     1078            6 :         // advances the generation again, this deletion is two generations old, so it
     1079            6 :         // should _not_ get executed (only the immediately previous generation gets that treatment)
    1080            6 :         let remote_layer_file_name_historical =
    1081            6 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
    1082            6 :         client
    1083            6 :             .push_layers(
    1084            6 :                 tenant_shard_id,
    1085            6 :                 TIMELINE_ID,
    1086            6 :                 now_generation.previous(),
    1087            6 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1088            6 :             )
    1089            6 :             .await?;
    1090            6 : 
     1091            6 :         // Inject a deletion in now_generation itself: after the restart advances the
     1092            6 :         // generation, this becomes the immediately previous generation, whose
     1093            6 :         // deletions do get executed on the same node.
    1094            6 :         let remote_layer_file_name_previous =
    1095            6 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
    1096            6 :         client
    1097            6 :             .push_layers(
    1098            6 :                 tenant_shard_id,
    1099            6 :                 TIMELINE_ID,
    1100            6 :                 now_generation,
    1101            6 :                 [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
    1102            6 :             )
    1103            6 :             .await?;
    1104            6 : 
    1105            6 :         client.flush().await?;
    1106            6 :         assert_remote_files(
    1107            6 :             &[
    1108            6 :                 &remote_layer_file_name_historical,
    1109            6 :                 &remote_layer_file_name_previous,
    1110            6 :             ],
    1111            6 :             &remote_timeline_path,
    1112            6 :         );
    1113            6 : 
     1114            6 :         // Different generations for the same tenant will cause two separate
    1115            6 :         // deletion lists to be emitted.
    1116            6 :         assert_local_files(
    1117            6 :             &["0000000000000001-01.list", "0000000000000002-01.list"],
    1118            6 :             &deletion_prefix,
    1119            6 :         );
    1120            6 : 
    1121            6 :         // Simulate a node restart: the latest generation advances
    1122            6 :         let now_generation = now_generation.next();
    1123            6 :         ctx.set_latest_generation(now_generation);
    1124            6 : 
    1125            6 :         // Restart the deletion queue
    1126            6 :         drop(client);
    1127            6 :         ctx.restart().await;
    1128            6 :         let client = ctx.deletion_queue.new_client();
    1129            6 :         client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
    1130            6 : 
    1131            6 :         info!("Flush-executing");
    1132           18 :         client.flush_execute().await?;
     1133            6 :         // The deletion from the immediately prior generation was executed; the one from
    1134            6 :         // an older generation was not.
    1135            6 :         assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
    1136            6 :         Ok(())
    1137            6 :     }
    1138              : }
    1139              : 
    1140              : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
    1141              : /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
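                       : ///
                       : /// Illustrative only (not part of the covered source): a minimal usage sketch,
                       : /// assuming a `GenericRemoteStorage` handle named `remote_storage` and an async
                       : /// test context:
                       : /// ```ignore
                       : /// let mock = MockDeletionQueue::new(Some(remote_storage));
                       : /// let client = mock.new_client();
                       : /// // ... enqueue deletions through `client`, e.g. client.push_layers(...).await ...
                       : /// mock.pump().await; // nothing is actually deleted until pump() drains the queues
                       : /// ```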
    1142              : #[cfg(test)]
    1143              : pub(crate) mod mock {
    1144              :     use tracing::info;
    1145              : 
    1146              :     use super::*;
    1147              :     use std::sync::atomic::{AtomicUsize, Ordering};
    1148              : 
    1149              :     pub struct ConsumerState {
    1150              :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
    1151              :         executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
    1152              :         cancel: CancellationToken,
    1153              :     }
    1154              : 
    1155              :     impl ConsumerState {
    1156            6 :         async fn consume(&mut self, remote_storage: &GenericRemoteStorage) -> usize {
    1157            6 :             let mut executed = 0;
    1158            6 : 
    1159            6 :             info!("Executing all pending deletions");
    1160              : 
     1161              :             // Drain messages already forwarded to the deleter backend, executing deletions inline
    1162            6 :             while let Ok(msg) = self.executor_rx.try_recv() {
    1163            0 :                 match msg {
    1164            0 :                     DeleterMessage::Delete(objects) => {
    1165            0 :                         for path in objects {
    1166            0 :                             match remote_storage.delete(&path, &self.cancel).await {
    1167              :                                 Ok(_) => {
    1168            0 :                                     debug!("Deleted {path}");
    1169              :                                 }
    1170            0 :                                 Err(e) => {
    1171            0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1172              :                                 }
    1173              :                             }
    1174            0 :                             executed += 1;
    1175              :                         }
    1176              :                     }
    1177            0 :                     DeleterMessage::Flush(flush_op) => {
    1178            0 :                         flush_op.notify();
    1179            0 :                     }
    1180              :                 }
    1181              :             }
    1182              : 
    1183           12 :             while let Ok(msg) = self.rx.try_recv() {
    1184            6 :                 match msg {
    1185            6 :                     ListWriterQueueMessage::Delete(op) => {
    1186            6 :                         let mut objects = op.objects;
    1187           12 :                         for (layer, meta) in op.layers {
    1188            6 :                             objects.push(remote_layer_path(
    1189            6 :                                 &op.tenant_shard_id.tenant_id,
    1190            6 :                                 &op.timeline_id,
    1191            6 :                                 meta.shard,
    1192            6 :                                 &layer,
    1193            6 :                                 meta.generation,
    1194            6 :                             ));
    1195            6 :                         }
    1196              : 
    1197           12 :                         for path in objects {
    1198            6 :                             info!("Executing deletion {path}");
    1199            6 :                             match remote_storage.delete(&path, &self.cancel).await {
    1200              :                                 Ok(_) => {
    1201            6 :                                     debug!("Deleted {path}");
    1202              :                                 }
    1203            0 :                                 Err(e) => {
    1204            0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1205              :                                 }
    1206              :                             }
    1207            6 :                             executed += 1;
    1208              :                         }
    1209              :                     }
    1210            0 :                     ListWriterQueueMessage::Flush(op) => {
    1211            0 :                         op.notify();
    1212            0 :                     }
    1213            0 :                     ListWriterQueueMessage::FlushExecute(op) => {
    1214            0 :                         // We have already executed all prior deletions because mock does them inline
    1215            0 :                         op.notify();
    1216            0 :                     }
    1217            0 :                     ListWriterQueueMessage::Recover(_) => {
    1218            0 :                         // no-op in mock
    1219            0 :                     }
    1220              :                 }
    1221            6 :                 info!("All pending deletions have been executed");
    1222              :             }
    1223              : 
    1224            6 :             executed
    1225            6 :         }
    1226              :     }
    1227              : 
    1228              :     pub struct MockDeletionQueue {
    1229              :         tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
    1230              :         executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
    1231              :         executed: Arc<AtomicUsize>,
    1232              :         remote_storage: Option<GenericRemoteStorage>,
    1233              :         consumer: std::sync::Mutex<ConsumerState>,
    1234              :         lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
    1235              :     }
    1236              : 
    1237              :     impl MockDeletionQueue {
    1238          576 :         pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
    1239          576 :             let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    1240          576 :             let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
    1241          576 : 
    1242          576 :             let executed = Arc::new(AtomicUsize::new(0));
    1243          576 : 
    1244          576 :             Self {
    1245          576 :                 tx,
    1246          576 :                 executor_tx,
    1247          576 :                 executed,
    1248          576 :                 remote_storage,
    1249          576 :                 consumer: std::sync::Mutex::new(ConsumerState {
    1250          576 :                     rx,
    1251          576 :                     executor_rx,
    1252          576 :                     cancel: CancellationToken::new(),
    1253          576 :                 }),
    1254          576 :                 lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
    1255          576 :             }
    1256          576 :         }
    1257              : 
    1258              :         #[allow(clippy::await_holding_lock)]
    1259            6 :         pub async fn pump(&self) {
    1260            6 :             if let Some(remote_storage) = &self.remote_storage {
    1261              :                 // Permit holding mutex across await, because this is only ever
    1262              :                 // called once at a time in tests.
    1263            6 :                 let mut locked = self.consumer.lock().unwrap();
    1264            6 :                 let count = locked.consume(remote_storage).await;
    1265            6 :                 self.executed.fetch_add(count, Ordering::Relaxed);
    1266            0 :             }
    1267            6 :         }
    1268              : 
    1269          606 :         pub(crate) fn new_client(&self) -> DeletionQueueClient {
    1270          606 :             DeletionQueueClient {
    1271          606 :                 tx: self.tx.clone(),
    1272          606 :                 executor_tx: self.executor_tx.clone(),
    1273          606 :                 lsn_table: self.lsn_table.clone(),
    1274          606 :             }
    1275          606 :         }
    1276              :     }
    1277              : 
    1278              :     /// Test round-trip serialization/deserialization, and test stability of the format
    1279              :     /// vs. a static expected string for the serialized version.
    1280              :     #[test]
    1281            6 :     fn deletion_list_serialization() -> anyhow::Result<()> {
    1282            6 :         let tenant_id = "ad6c1a56f5680419d3a16ff55d97ec3c"
    1283            6 :             .to_string()
    1284            6 :             .parse::<TenantShardId>()?;
    1285            6 :         let timeline_id = "be322c834ed9e709e63b5c9698691910"
    1286            6 :             .to_string()
    1287            6 :             .parse::<TimelineId>()?;
    1288            6 :         let generation = Generation::new(123);
    1289              : 
    1290            6 :         let object =
    1291            6 :             RemotePath::from_string(&format!("tenants/{tenant_id}/timelines/{timeline_id}/foo"))?;
    1292            6 :         let mut objects = [object].to_vec();
    1293            6 : 
    1294            6 :         let mut example = DeletionList::new(1);
    1295            6 :         example.push(&tenant_id, &timeline_id, generation, &mut objects);
    1296              : 
    1297            6 :         let encoded = serde_json::to_string(&example)?;
    1298              : 
    1299            6 :         let expected = "{\"version\":1,\"sequence\":1,\"tenants\":{\"ad6c1a56f5680419d3a16ff55d97ec3c\":{\"timelines\":{\"be322c834ed9e709e63b5c9698691910\":[\"foo\"]},\"generation\":123}},\"size\":1}".to_string();
    1300            6 :         assert_eq!(encoded, expected);
    1301              : 
    1302            6 :         let decoded = serde_json::from_str::<DeletionList>(&encoded)?;
    1303            6 :         assert_eq!(example, decoded);
    1304              : 
    1305            6 :         Ok(())
    1306            6 :     }
    1307              : }
        

Generated by: LCOV version 2.1-beta