LCOV - code coverage report
Current view: top level - pageserver/src - deletion_queue.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 88.3 % 809 714
Test Date: 2025-04-24 20:31:15 Functions: 61.3 % 111 68

            Line data    Source code
       1              : mod deleter;
       2              : mod list_writer;
       3              : mod validator;
       4              : 
       5              : use std::collections::HashMap;
       6              : use std::sync::Arc;
       7              : use std::time::Duration;
       8              : 
       9              : use anyhow::Context;
      10              : use camino::Utf8PathBuf;
      11              : use deleter::DeleterMessage;
      12              : use list_writer::ListWriterQueueMessage;
      13              : use pageserver_api::shard::TenantShardId;
      14              : use remote_storage::{GenericRemoteStorage, RemotePath};
      15              : use serde::{Deserialize, Serialize};
      16              : use thiserror::Error;
      17              : use tokio_util::sync::CancellationToken;
      18              : use tracing::{Instrument, debug, error};
      19              : use utils::crashsafe::path_with_suffix_extension;
      20              : use utils::generation::Generation;
      21              : use utils::id::TimelineId;
      22              : use utils::lsn::{AtomicLsn, Lsn};
      23              : use validator::ValidatorQueueMessage;
      24              : 
      25              : use self::deleter::Deleter;
      26              : use self::list_writer::{DeletionOp, ListWriter, RecoverOp};
      27              : use self::validator::Validator;
      28              : use crate::config::PageServerConf;
      29              : use crate::controller_upcall_client::StorageControllerUpcallApi;
      30              : use crate::metrics;
      31              : use crate::tenant::remote_timeline_client::{LayerFileMetadata, remote_timeline_path};
      32              : use crate::tenant::storage_layer::LayerName;
      33              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      34              : 
      35              : // TODO: configurable for how long to wait before executing deletions
      36              : 
      37              : /// We aggregate object deletions from many tenants in one place, for several reasons:
      38              : /// - Coalesce deletions into fewer DeleteObjects calls
      39              : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
      40              : ///   to flush any outstanding deletions.
      41              : /// - Globally control throughput of deletions, as these are a low priority task: do
      42              : ///   not compete with the same S3 clients/connections used for higher priority uploads.
      43              : /// - Enable gating deletions on validation of a tenant's generation number, to make
      44              : ///   it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
      45              : ///
      46              : /// There are two kinds of deletion: deferred and immediate.  A deferred deletion
      47              : /// may be intentionally delayed to protect passive readers of S3 data, and is
      48              : /// subject to a generation number validation step.  An immediate deletion is
      49              : /// ready to execute immediately, and is only queued up so that it can be coalesced
      50              : /// with other deletions in flight.
      51              : ///
      52              : /// Deferred deletions pass through three steps:
      53              : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
      54              : ///   DeletionLists, which are persisted to disk.
      55              : /// - Validator: accumulate deletion lists, and validate them en-masse prior to passing
      56              : ///   the keys in the list onward for actual deletion.  Also validate remote_consistent_lsn
      57              : ///   updates for running timelines.
      58              : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
      59              : ///   batches of 1000 keys via DeleteObjects.
      60              : ///
      61              : /// Non-deferred deletions, such as during timeline deletion, bypass the first
      62              : /// two stages and are passed straight into the Deleter.
      63              : ///
      64              : /// Internally, each stage is joined by a channel to the next.  On disk, there is only
      65              : /// one queue (of DeletionLists), which is written by the frontend and consumed
      66              : /// by the backend.
      67              : #[derive(Clone)]
      68              : pub struct DeletionQueue {
      69              :     client: DeletionQueueClient,
      70              : 
      71              :     // Parent cancellation token for the tokens passed into background workers
      72              :     cancel: CancellationToken,
      73              : }
      74              : 
      75              : /// Opaque wrapper around individual worker tasks, to avoid making the
      76              : /// worker objects themselves public
      77              : pub struct DeletionQueueWorkers<C>
      78              : where
      79              :     C: StorageControllerUpcallApi + Send + Sync,
      80              : {
      81              :     frontend: ListWriter,
      82              :     backend: Validator<C>,
      83              :     executor: Deleter,
      84              : }
      85              : 
      86              : impl<C> DeletionQueueWorkers<C>
      87              : where
      88              :     C: StorageControllerUpcallApi + Send + Sync + 'static,
      89              : {
      90           48 :     pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
      91           48 :         let jh_frontend = runtime.spawn(async move {
      92           48 :             self.frontend
      93           48 :                 .background()
      94           48 :                 .instrument(tracing::info_span!(parent:None, "deletion frontend"))
      95           48 :                 .await
      96           48 :         });
      97           48 :         let jh_backend = runtime.spawn(async move {
      98           48 :             self.backend
      99           48 :                 .background()
     100           48 :                 .instrument(tracing::info_span!(parent:None, "deletion backend"))
     101           48 :                 .await
     102           48 :         });
     103           48 :         let jh_executor = runtime.spawn(async move {
     104           48 :             self.executor
     105           48 :                 .background()
     106           48 :                 .instrument(tracing::info_span!(parent:None, "deletion executor"))
     107           48 :                 .await
     108           48 :         });
     109           48 : 
     110           48 :         runtime.spawn({
     111           48 :             async move {
     112           48 :                 jh_frontend.await.expect("error joining frontend worker");
     113           12 :                 jh_backend.await.expect("error joining backend worker");
     114           12 :                 drop(jh_executor.await.expect("error joining executor worker"));
     115           48 :             }
     116           48 :         })
     117           48 :     }
     118              : }
     119              : 
     120              : /// A FlushOp is just a oneshot channel, where we send the transmit side down
     121              : /// another channel, and the receive side will receive a message when the channel
     122              : /// we're flushing has reached the FlushOp we sent into it.
     123              : ///
     124              : /// The only extra behavior beyond the channel is that the notify() method does not
     125              : /// return an error when the receive side has been dropped, because in this use case
     126              : /// it is harmless (the code that initiated the flush no longer cares about the result).
     127              : #[derive(Debug)]
     128              : struct FlushOp {
     129              :     tx: tokio::sync::oneshot::Sender<()>,
     130              : }
     131              : 
     132              : impl FlushOp {
     133          252 :     fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
     134          252 :         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
     135          252 :         (Self { tx }, rx)
     136          252 :     }
     137              : 
     138          264 :     fn notify(self) {
     139          264 :         if self.tx.send(()).is_err() {
     140              :             // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
     141            0 :             debug!("deletion queue flush from dropped client");
     142          264 :         };
     143          264 :     }
     144              : }
     145              : 
     146              : #[derive(Clone, Debug)]
     147              : pub struct DeletionQueueClient {
     148              :     tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
     149              :     executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
     150              : 
     151              :     lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
     152              : }
     153              : 
     154           72 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     155              : struct TenantDeletionList {
     156              :     /// For each Timeline, a list of key fragments to append to the timeline remote path
     157              :     /// when reconstructing a full key
     158              :     timelines: HashMap<TimelineId, Vec<String>>,
     159              : 
     160              :     /// The generation in which this deletion was emitted: note that this may not be the
     161              :     /// same as the generation of any layers being deleted.  The generation of the layer
     162              :     /// has already been absorbed into the keys in `objects`
     163              :     generation: Generation,
     164              : }
     165              : 
     166              : impl TenantDeletionList {
     167           60 :     pub(crate) fn len(&self) -> usize {
     168           60 :         self.timelines.values().map(|v| v.len()).sum()
     169           60 :     }
     170              : }
     171              : 
     172              : /// Files ending with this suffix will be ignored and erased
     173              : /// during recovery as startup.
     174              : const TEMP_SUFFIX: &str = "tmp";
     175              : 
     176          144 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     177              : struct DeletionList {
     178              :     /// Serialization version, for future use
     179              :     version: u8,
     180              : 
     181              :     /// Used for constructing a unique key for each deletion list we write out.
     182              :     sequence: u64,
     183              : 
     184              :     /// To avoid repeating tenant/timeline IDs in every key, we store keys in
     185              :     /// nested HashMaps by TenantTimelineID.  Each Tenant only appears once
     186              :     /// with one unique generation ID: if someone tries to push a second generation
     187              :     /// ID for the same tenant, we will start a new DeletionList.
     188              :     tenants: HashMap<TenantShardId, TenantDeletionList>,
     189              : 
     190              :     /// Avoid having to walk `tenants` to calculate the number of keys in
     191              :     /// the nested deletion lists
     192              :     size: usize,
     193              : 
     194              :     /// Set to true when the list has undergone validation with the control
     195              :     /// plane and the remaining contents of `tenants` are valid.  A list may
     196              :     /// also be implicitly marked valid by DeletionHeader.validated_sequence
     197              :     /// advancing to >= DeletionList.sequence
     198              :     #[serde(default)]
     199              :     #[serde(skip_serializing_if = "std::ops::Not::not")]
     200              :     validated: bool,
     201              : }
     202              : 
     203            0 : #[derive(Debug, Serialize, Deserialize)]
     204              : struct DeletionHeader {
     205              :     /// Serialization version, for future use
     206              :     version: u8,
     207              : 
     208              :     /// The highest sequence number (inclusive) that has been validated.  All deletion
     209              :     /// lists on disk with a sequence <= this value are safe to execute.
     210              :     validated_sequence: u64,
     211              : }
     212              : 
     213              : impl DeletionHeader {
     214              :     const VERSION_LATEST: u8 = 1;
     215              : 
     216           48 :     fn new(validated_sequence: u64) -> Self {
     217           48 :         Self {
     218           48 :             version: Self::VERSION_LATEST,
     219           48 :             validated_sequence,
     220           48 :         }
     221           48 :     }
     222              : 
     223           48 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     224           48 :         debug!("Saving deletion list header {:?}", self);
     225           48 :         let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
     226           48 :         let header_path = conf.deletion_header_path();
     227           48 :         let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
     228           48 :         VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
     229           48 :             .await
     230           48 :             .maybe_fatal_err("save deletion header")?;
     231              : 
     232           48 :         Ok(())
     233           48 :     }
     234              : }
     235              : 
     236              : impl DeletionList {
     237              :     const VERSION_LATEST: u8 = 1;
     238          120 :     fn new(sequence: u64) -> Self {
     239          120 :         Self {
     240          120 :             version: Self::VERSION_LATEST,
     241          120 :             sequence,
     242          120 :             tenants: HashMap::new(),
     243          120 :             size: 0,
     244          120 :             validated: false,
     245          120 :         }
     246          120 :     }
     247              : 
     248          156 :     fn is_empty(&self) -> bool {
     249          156 :         self.tenants.is_empty()
     250          156 :     }
     251              : 
     252          360 :     fn len(&self) -> usize {
     253          360 :         self.size
     254          360 :     }
     255              : 
     256              :     /// Returns true if the push was accepted, false if the caller must start a new
     257              :     /// deletion list.
     258           84 :     fn push(
     259           84 :         &mut self,
     260           84 :         tenant: &TenantShardId,
     261           84 :         timeline: &TimelineId,
     262           84 :         generation: Generation,
     263           84 :         objects: &mut Vec<RemotePath>,
     264           84 :     ) -> bool {
     265           84 :         if objects.is_empty() {
     266              :             // Avoid inserting an empty TimelineDeletionList: this preserves the property
     267              :             // that if we have no keys, then self.objects is empty (used in Self::is_empty)
     268            0 :             return true;
     269           84 :         }
     270           84 : 
     271           84 :         let tenant_entry = self
     272           84 :             .tenants
     273           84 :             .entry(*tenant)
     274           84 :             .or_insert_with(|| TenantDeletionList {
     275           72 :                 timelines: HashMap::new(),
     276           72 :                 generation,
     277           84 :             });
     278           84 : 
     279           84 :         if tenant_entry.generation != generation {
     280              :             // Only one generation per tenant per list: signal to
     281              :             // caller to start a new list.
     282           12 :             return false;
     283           72 :         }
     284           72 : 
     285           72 :         let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
     286           72 : 
     287           72 :         let timeline_remote_path = remote_timeline_path(tenant, timeline);
     288           72 : 
     289           72 :         self.size += objects.len();
     290           72 :         timeline_entry.extend(objects.drain(..).map(|p| {
     291           72 :             p.strip_prefix(&timeline_remote_path)
     292           72 :                 .expect("Timeline paths always start with the timeline prefix")
     293           72 :                 .to_string()
     294           72 :         }));
     295           72 :         true
     296           84 :     }
     297              : 
     298           60 :     fn into_remote_paths(self) -> Vec<RemotePath> {
     299           60 :         let mut result = Vec::new();
     300           60 :         for (tenant, tenant_deletions) in self.tenants.into_iter() {
     301           36 :             for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
     302           36 :                 let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
     303           36 :                 result.extend(
     304           36 :                     timeline_layers
     305           36 :                         .into_iter()
     306           36 :                         .map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
     307           36 :                 );
     308           36 :             }
     309              :         }
     310              : 
     311           60 :         result
     312           60 :     }
     313              : 
     314           84 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     315           84 :         let path = conf.deletion_list_path(self.sequence);
     316           84 :         let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
     317           84 : 
     318           84 :         let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
     319           84 : 
     320           84 :         VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
     321           84 :             .await
     322           84 :             .maybe_fatal_err("save deletion list")
     323           84 :             .map_err(Into::into)
     324           84 :     }
     325              : }
     326              : 
     327              : impl std::fmt::Display for DeletionList {
     328            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     329            0 :         write!(
     330            0 :             f,
     331            0 :             "DeletionList<seq={}, tenants={}, keys={}>",
     332            0 :             self.sequence,
     333            0 :             self.tenants.len(),
     334            0 :             self.size
     335            0 :         )
     336            0 :     }
     337              : }
     338              : 
     339              : struct PendingLsn {
     340              :     projected: Lsn,
     341              :     result_slot: Arc<AtomicLsn>,
     342              : }
     343              : 
     344              : struct TenantLsnState {
     345              :     timelines: HashMap<TimelineId, PendingLsn>,
     346              : 
     347              :     // In what generation was the most recent update proposed?
     348              :     generation: Generation,
     349              : }
     350              : 
     351              : #[derive(Default)]
     352              : struct VisibleLsnUpdates {
     353              :     tenants: HashMap<TenantShardId, TenantLsnState>,
     354              : }
     355              : 
     356              : impl VisibleLsnUpdates {
     357         1440 :     fn new() -> Self {
     358         1440 :         Self {
     359         1440 :             tenants: HashMap::new(),
     360         1440 :         }
     361         1440 :     }
     362              : }
     363              : 
     364              : impl std::fmt::Debug for VisibleLsnUpdates {
     365            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     366            0 :         write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
     367            0 :     }
     368              : }
     369              : 
     370              : #[derive(Error, Debug)]
     371              : pub enum DeletionQueueError {
     372              :     #[error("Deletion queue unavailable during shutdown")]
     373              :     ShuttingDown,
     374              : }
     375              : 
     376              : impl DeletionQueueClient {
     377              :     /// This is cancel-safe.  If you drop the future before it completes, the message
     378              :     /// is not pushed, although in the context of the deletion queue it doesn't matter: once
     379              :     /// we decide to do a deletion the decision is always final.
     380         2119 :     fn do_push<T>(
     381         2119 :         &self,
     382         2119 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     383         2119 :         msg: T,
     384         2119 :     ) -> Result<(), DeletionQueueError> {
     385         2119 :         match queue.send(msg) {
     386         2119 :             Ok(_) => Ok(()),
     387            0 :             Err(e) => {
     388            0 :                 // This shouldn't happen, we should shut down all tenants before
     389            0 :                 // we shut down the global delete queue.  If we encounter a bug like this,
     390            0 :                 // we may leak objects as deletions won't be processed.
     391            0 :                 error!("Deletion queue closed while pushing, shutting down? ({e})");
     392            0 :                 Err(DeletionQueueError::ShuttingDown)
     393              :             }
     394              :         }
     395         2119 :     }
     396              : 
     397           48 :     pub(crate) fn recover(
     398           48 :         &self,
     399           48 :         attached_tenants: HashMap<TenantShardId, Generation>,
     400           48 :     ) -> Result<(), DeletionQueueError> {
     401           48 :         self.do_push(
     402           48 :             &self.tx,
     403           48 :             ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
     404           48 :         )
     405           48 :     }
     406              : 
     407              :     /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
     408              :     /// world, it must validate its generation number before doing so.  Rather than do this synchronously,
     409              :     /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
     410              :     /// recently validated separately.
     411              :     ///
     412              :     /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates.  The
     413              :     /// backend will later wake up and notice that the tenant's generation requires validation.
     414         8905 :     pub(crate) async fn update_remote_consistent_lsn(
     415         8905 :         &self,
     416         8905 :         tenant_shard_id: TenantShardId,
     417         8905 :         timeline_id: TimelineId,
     418         8905 :         current_generation: Generation,
     419         8905 :         lsn: Lsn,
     420         8905 :         result_slot: Arc<AtomicLsn>,
     421         8905 :     ) {
     422         8905 :         let mut locked = self
     423         8905 :             .lsn_table
     424         8905 :             .write()
     425         8905 :             .expect("Lock should never be poisoned");
     426         8905 : 
     427         8905 :         let tenant_entry = locked
     428         8905 :             .tenants
     429         8905 :             .entry(tenant_shard_id)
     430         8905 :             .or_insert(TenantLsnState {
     431         8905 :                 timelines: HashMap::new(),
     432         8905 :                 generation: current_generation,
     433         8905 :             });
     434         8905 : 
     435         8905 :         if tenant_entry.generation != current_generation {
     436            0 :             // Generation might have changed if we were detached and then re-attached: in this case,
     437            0 :             // state from the previous generation cannot be trusted.
     438            0 :             tenant_entry.timelines.clear();
     439            0 :             tenant_entry.generation = current_generation;
     440         8905 :         }
     441              : 
     442         8905 :         tenant_entry.timelines.insert(
     443         8905 :             timeline_id,
     444         8905 :             PendingLsn {
     445         8905 :                 projected: lsn,
     446         8905 :                 result_slot,
     447         8905 :             },
     448         8905 :         );
     449         8905 :     }
     450              : 
     451              :     /// Submit a list of layers for deletion: this function will return before the deletion is
     452              :     /// persistent, but it may be executed at any time after this function enters: do not push
     453              :     /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
     454              :     /// references them).
     455              :     ///
     456              :     /// The `current_generation` is the generation of this pageserver's current attachment.  The
     457              :     /// generations in `layers` are the generations in which those layers were written.
     458         1927 :     pub(crate) fn push_layers(
     459         1927 :         &self,
     460         1927 :         tenant_shard_id: TenantShardId,
     461         1927 :         timeline_id: TimelineId,
     462         1927 :         current_generation: Generation,
     463         1927 :         layers: Vec<(LayerName, LayerFileMetadata)>,
     464         1927 :     ) -> Result<(), DeletionQueueError> {
     465         1927 :         // None generations are not valid for attached tenants: they must always be attached in
     466         1927 :         // a known generation.  None generations are still permitted for layers in the index because
     467         1927 :         // they may be historical.
     468         1927 :         assert!(!current_generation.is_none());
     469              : 
     470         1927 :         metrics::DELETION_QUEUE
     471         1927 :             .keys_submitted
     472         1927 :             .inc_by(layers.len() as u64);
     473         1927 :         self.do_push(
     474         1927 :             &self.tx,
     475         1927 :             ListWriterQueueMessage::Delete(DeletionOp {
     476         1927 :                 tenant_shard_id,
     477         1927 :                 timeline_id,
     478         1927 :                 layers,
     479         1927 :                 generation: current_generation,
     480         1927 :                 objects: Vec::new(),
     481         1927 :             }),
     482         1927 :         )
     483         1927 :     }
     484              : 
     485              :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     486          144 :     async fn do_flush<T>(
     487          144 :         &self,
     488          144 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     489          144 :         msg: T,
     490          144 :         rx: tokio::sync::oneshot::Receiver<()>,
     491          144 :     ) -> Result<(), DeletionQueueError> {
     492          144 :         self.do_push(queue, msg)?;
     493          144 :         if rx.await.is_err() {
     494              :             // This shouldn't happen if tenants are shut down before deletion queue.  If we
     495              :             // encounter a bug like this, then a flusher will incorrectly believe it has flushed
     496              :             // when it hasn't, possibly leading to leaking objects.
     497            0 :             error!("Deletion queue dropped flush op while client was still waiting");
     498            0 :             Err(DeletionQueueError::ShuttingDown)
     499              :         } else {
     500          144 :             Ok(())
     501              :         }
     502          144 :     }
     503              : 
     504              :     /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
     505              :     ///
     506              :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     507           84 :     pub async fn flush(&self) -> Result<(), DeletionQueueError> {
     508           84 :         let (flush_op, rx) = FlushOp::new();
     509           84 :         self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
     510           84 :             .await
     511           84 :     }
     512              : 
     513              :     /// Issue a flush without waiting for it to complete.  This is useful on advisory flushes where
     514              :     /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
     515              :     /// detach where flushing is nice but not necessary.
     516              :     ///
     517              :     /// This function provides no guarantees of work being done.
     518            0 :     pub fn flush_advisory(&self) {
     519            0 :         let (flush_op, _) = FlushOp::new();
     520            0 : 
     521            0 :         // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
     522            0 :         drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
     523            0 :     }
     524              : 
     525              :     // Wait until all previous deletions are executed
     526           60 :     pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
     527           60 :         debug!("flush_execute: flushing to deletion lists...");
     528              :         // Flush any buffered work to deletion lists
     529           60 :         self.flush().await?;
     530              : 
     531              :         // Flush the backend into the executor of deletion lists
     532           60 :         let (flush_op, rx) = FlushOp::new();
     533           60 :         debug!("flush_execute: flushing backend...");
     534           60 :         self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
     535           60 :             .await?;
     536           60 :         debug!("flush_execute: finished flushing backend...");
     537              : 
     538              :         // Flush any immediate-mode deletions (the above backend flush will only flush
     539              :         // the executor if deletions had flowed through the backend)
     540           60 :         debug!("flush_execute: flushing execution...");
     541           60 :         self.flush_immediate().await?;
     542           60 :         debug!("flush_execute: finished flushing execution...");
     543           60 :         Ok(())
     544           60 :     }
     545              : 
     546              :     /// This interface bypasses the persistent deletion queue, and any validation
     547              :     /// that this pageserver is still elegible to execute the deletions.  It is for
     548              :     /// use in timeline deletions, where the control plane is telling us we may
     549              :     /// delete everything in the timeline.
     550              :     ///
     551              :     /// DO NOT USE THIS FROM GC OR COMPACTION CODE.  Use the regular `push_layers`.
     552            0 :     pub(crate) async fn push_immediate(
     553            0 :         &self,
     554            0 :         objects: Vec<RemotePath>,
     555            0 :     ) -> Result<(), DeletionQueueError> {
     556            0 :         metrics::DELETION_QUEUE
     557            0 :             .keys_submitted
     558            0 :             .inc_by(objects.len() as u64);
     559            0 :         self.executor_tx
     560            0 :             .send(DeleterMessage::Delete(objects))
     561            0 :             .await
     562            0 :             .map_err(|_| DeletionQueueError::ShuttingDown)
     563            0 :     }
     564              : 
     565              :     /// Companion to push_immediate.  When this returns Ok, all prior objects sent
     566              :     /// into push_immediate have been deleted from remote storage.
     567           60 :     pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
     568           60 :         let (flush_op, rx) = FlushOp::new();
     569           60 :         self.executor_tx
     570           60 :             .send(DeleterMessage::Flush(flush_op))
     571           60 :             .await
     572           60 :             .map_err(|_| DeletionQueueError::ShuttingDown)?;
     573              : 
     574           60 :         rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
     575           60 :     }
     576              : }
     577              : 
     578              : impl DeletionQueue {
     579           48 :     pub fn new_client(&self) -> DeletionQueueClient {
     580           48 :         self.client.clone()
     581           48 :     }
     582              : 
     583              :     /// Caller may use the returned object to construct clients with new_client.
     584              :     /// Caller should tokio::spawn the background() members of the two worker objects returned:
     585              :     /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
     586           48 :     pub fn new<C>(
     587           48 :         remote_storage: GenericRemoteStorage,
     588           48 :         controller_upcall_client: Option<C>,
     589           48 :         conf: &'static PageServerConf,
     590           48 :     ) -> (Self, DeletionQueueWorkers<C>)
     591           48 :     where
     592           48 :         C: StorageControllerUpcallApi + Send + Sync,
     593           48 :     {
     594           48 :         // Unbounded channel: enables non-async functions to submit deletions.  The actual length is
     595           48 :         // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
     596           48 :         // enough to avoid this taking pathologically large amount of memory.
     597           48 :         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
     598           48 : 
     599           48 :         // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
     600           48 :         let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
     601           48 : 
     602           48 :         // Shallow channel: it carries lists of paths, and we expect the main queueing to
     603           48 :         // happen in the backend (persistent), not in this queue.
     604           48 :         let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
     605           48 : 
     606           48 :         let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
     607           48 : 
     608           48 :         // The deletion queue has an independent cancellation token to
     609           48 :         // the general pageserver shutdown token, because it stays alive a bit
     610           48 :         // longer to flush after Tenants have all been torn down.
     611           48 :         let cancel = CancellationToken::new();
     612           48 : 
     613           48 :         (
     614           48 :             Self {
     615           48 :                 client: DeletionQueueClient {
     616           48 :                     tx,
     617           48 :                     executor_tx: executor_tx.clone(),
     618           48 :                     lsn_table: lsn_table.clone(),
     619           48 :                 },
     620           48 :                 cancel: cancel.clone(),
     621           48 :             },
     622           48 :             DeletionQueueWorkers {
     623           48 :                 frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
     624           48 :                 backend: Validator::new(
     625           48 :                     conf,
     626           48 :                     backend_rx,
     627           48 :                     executor_tx,
     628           48 :                     controller_upcall_client,
     629           48 :                     lsn_table.clone(),
     630           48 :                     cancel.clone(),
     631           48 :                 ),
     632           48 :                 executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
     633           48 :             },
     634           48 :         )
     635           48 :     }
     636              : 
     637            0 :     pub async fn shutdown(&mut self, timeout: Duration) {
     638            0 :         match tokio::time::timeout(timeout, self.client.flush()).await {
     639              :             Ok(Ok(())) => {
     640            0 :                 tracing::info!("Deletion queue flushed successfully on shutdown")
     641              :             }
     642              :             Ok(Err(DeletionQueueError::ShuttingDown)) => {
     643              :                 // This is not harmful for correctness, but is unexpected: the deletion
     644              :                 // queue's workers should stay alive as long as there are any client handles instantiated.
     645            0 :                 tracing::warn!("Deletion queue stopped prematurely");
     646              :             }
     647            0 :             Err(_timeout) => {
     648            0 :                 tracing::warn!("Timed out flushing deletion queue on shutdown")
     649              :             }
     650              :         }
     651              : 
     652              :         // We only cancel _after_ flushing: otherwise we would be shutting down the
     653              :         // components that do the flush.
     654            0 :         self.cancel.cancel();
     655            0 :     }
     656              : }
     657              : 
     658              : #[cfg(test)]
     659              : mod test {
     660              :     use std::io::ErrorKind;
     661              :     use std::time::Duration;
     662              : 
     663              :     use camino::Utf8Path;
     664              :     use hex_literal::hex;
     665              :     use pageserver_api::key::Key;
     666              :     use pageserver_api::shard::ShardIndex;
     667              :     use pageserver_api::upcall_api::ReAttachResponseTenant;
     668              :     use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
     669              :     use tokio::task::JoinHandle;
     670              :     use tracing::info;
     671              : 
     672              :     use super::*;
     673              :     use crate::controller_upcall_client::RetryForeverError;
     674              :     use crate::tenant::harness::TenantHarness;
     675              :     use crate::tenant::storage_layer::DeltaLayerName;
    /// Fixed timeline ID used by the tests in this module when building remote timeline
    /// paths and pushing layer deletions.
    pub const TIMELINE_ID: TimelineId =
        TimelineId::from_array(hex!("11223344556677881122334455667788"));
     678              : 
    /// Example delta layer name for tests: keys `0x0..0xFFFFFFFFFFFFFFFF`,
    /// LSNs `0x16B59D8..0x16B5A51`.
    pub const EXAMPLE_LAYER_NAME: LayerName = LayerName::Delta(DeltaLayerName {
        key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
        lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
    });
     683              : 
    // When you need a second layer in a test.
    // Same key range as EXAMPLE_LAYER_NAME; its LSN range starts where that one ends
    // (`0x16B5A51..0x16B5A61`).
    pub const EXAMPLE_LAYER_NAME_ALT: LayerName = LayerName::Delta(DeltaLayerName {
        key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
        lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
    });
     689              : 
    /// Everything a deletion queue test needs: a tenant harness, a local-filesystem
    /// "remote" storage, a mock storage controller, and a running deletion queue.
    struct TestSetup {
        /// Tenant harness; the tests use its config and tenant shard id
        harness: TenantHarness,
        /// Canonicalized directory backing the LocalFs remote storage
        remote_fs_dir: Utf8PathBuf,
        /// Remote storage handle given to the deletion queue
        storage: GenericRemoteStorage,
        /// Mock controller whose recorded generations drive validation results
        mock_control_plane: MockStorageController,
        /// The queue under test
        deletion_queue: DeletionQueue,
        /// Join handle for the spawned workers of `deletion_queue`
        worker_join: JoinHandle<()>,
    }
     698              : 
     699              :     impl TestSetup {
     700              :         /// Simulate a pageserver restart by destroying and recreating the deletion queue
     701           12 :         async fn restart(&mut self) {
     702           12 :             let (deletion_queue, workers) = DeletionQueue::new(
     703           12 :                 self.storage.clone(),
     704           12 :                 Some(self.mock_control_plane.clone()),
     705           12 :                 self.harness.conf,
     706           12 :             );
     707           12 : 
     708           12 :             tracing::debug!("Spawning worker for new queue queue");
     709           12 :             let worker_join = workers.spawn_with(&tokio::runtime::Handle::current());
     710           12 : 
     711           12 :             let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
     712           12 :             let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
     713           12 : 
     714           12 :             tracing::debug!("Joining worker from previous queue");
     715           12 :             old_deletion_queue.cancel.cancel();
     716           12 :             old_worker_join
     717           12 :                 .await
     718           12 :                 .expect("Failed to join workers for previous deletion queue");
     719           12 :         }
     720              : 
     721           36 :         fn set_latest_generation(&self, gen_: Generation) {
     722           36 :             let tenant_shard_id = self.harness.tenant_shard_id;
     723           36 :             self.mock_control_plane
     724           36 :                 .latest_generation
     725           36 :                 .lock()
     726           36 :                 .unwrap()
     727           36 :                 .insert(tenant_shard_id, gen_);
     728           36 :         }
     729              : 
     730              :         /// Returns remote layer file name, suitable for use in assert_remote_files
     731           36 :         fn write_remote_layer(
     732           36 :             &self,
     733           36 :             file_name: LayerName,
     734           36 :             gen_: Generation,
     735           36 :         ) -> anyhow::Result<String> {
     736           36 :             let tenant_shard_id = self.harness.tenant_shard_id;
     737           36 :             let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
     738           36 :             let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
     739           36 :             std::fs::create_dir_all(&remote_timeline_path)?;
     740           36 :             let remote_layer_file_name = format!("{}{}", file_name, gen_.get_suffix());
     741           36 : 
     742           36 :             let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
     743           36 : 
     744           36 :             std::fs::write(
     745           36 :                 remote_timeline_path.join(remote_layer_file_name.clone()),
     746           36 :                 content,
     747           36 :             )?;
     748              : 
     749           36 :             Ok(remote_layer_file_name)
     750           36 :         }
     751              :     }
     752              : 
    /// Test double for the storage controller upcall API.  Clones share the same
    /// generation table, since it lives behind an `Arc`.
    #[derive(Debug, Clone)]
    struct MockStorageController {
        /// Generation the mock considers current for each tenant shard; `validate` compares
        /// incoming generations against this table.
        pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>,
    }
     757              : 
     758              :     impl MockStorageController {
     759           36 :         fn new() -> Self {
     760           36 :             Self {
     761           36 :                 latest_generation: Arc::default(),
     762           36 :             }
     763           36 :         }
     764              :     }
     765              : 
     766              :     impl StorageControllerUpcallApi for MockStorageController {
     767            0 :         async fn re_attach(
     768            0 :             &self,
     769            0 :             _conf: &PageServerConf,
     770            0 :         ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> {
     771            0 :             unimplemented!()
     772              :         }
     773              : 
     774           48 :         async fn validate(
     775           48 :             &self,
     776           48 :             tenants: Vec<(TenantShardId, Generation)>,
     777           48 :         ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     778           48 :             let mut result = HashMap::new();
     779           48 : 
     780           48 :             let latest_generation = self.latest_generation.lock().unwrap();
     781              : 
     782           96 :             for (tenant_shard_id, generation) in tenants {
     783           48 :                 if let Some(latest) = latest_generation.get(&tenant_shard_id) {
     784           48 :                     result.insert(tenant_shard_id, *latest == generation);
     785           48 :                 }
     786              :             }
     787              : 
     788           48 :             Ok(result)
     789           48 :         }
     790              : 
     791            0 :         async fn put_timeline_import_status(
     792            0 :             &self,
     793            0 :             _tenant_shard_id: TenantShardId,
     794            0 :             _timeline_id: TimelineId,
     795            0 :             _status: pageserver_api::models::ShardImportStatus,
     796            0 :         ) -> Result<(), RetryForeverError> {
     797            0 :             unimplemented!()
     798              :         }
     799              :     }
     800              : 
     801           36 :     async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
     802           36 :         let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
     803           36 :         let harness = TenantHarness::create(test_name).await?;
     804              : 
     805              :         // We do not load() the harness: we only need its config and remote_storage
     806              : 
     807              :         // Set up a GenericRemoteStorage targetting a directory
     808           36 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs");
     809           36 :         std::fs::create_dir_all(remote_fs_dir)?;
     810           36 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
     811           36 :         let storage_config = RemoteStorageConfig {
     812           36 :             storage: RemoteStorageKind::LocalFs {
     813           36 :                 local_path: remote_fs_dir.clone(),
     814           36 :             },
     815           36 :             timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     816           36 :             small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
     817           36 :         };
     818           36 :         let storage = GenericRemoteStorage::from_config(&storage_config)
     819           36 :             .await
     820           36 :             .unwrap();
     821           36 : 
     822           36 :         let mock_control_plane = MockStorageController::new();
     823           36 : 
     824           36 :         let (deletion_queue, worker) = DeletionQueue::new(
     825           36 :             storage.clone(),
     826           36 :             Some(mock_control_plane.clone()),
     827           36 :             harness.conf,
     828           36 :         );
     829           36 : 
     830           36 :         let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
     831           36 : 
     832           36 :         Ok(TestSetup {
     833           36 :             harness,
     834           36 :             remote_fs_dir,
     835           36 :             storage,
     836           36 :             mock_control_plane,
     837           36 :             deletion_queue,
     838           36 :             worker_join,
     839           36 :         })
     840           36 :     }
     841              : 
     842              :     // TODO: put this in a common location so that we can share with remote_timeline_client's tests
     843          108 :     fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
     844          108 :         let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
     845          108 :         expected.sort();
     846          108 : 
     847          108 :         let mut found: Vec<String> = Vec::new();
     848          108 :         let dir = match std::fs::read_dir(remote_path) {
     849          108 :             Ok(d) => d,
     850            0 :             Err(e) => {
     851            0 :                 if e.kind() == ErrorKind::NotFound {
     852            0 :                     if expected.is_empty() {
     853              :                         // We are asserting prefix is empty: it is expected that the dir is missing
     854            0 :                         return;
     855              :                     } else {
     856            0 :                         assert_eq!(expected, Vec::<String>::new());
     857            0 :                         unreachable!();
     858              :                     }
     859              :                 } else {
     860            0 :                     panic!("Unexpected error listing {remote_path}: {e}");
     861              :                 }
     862              :             }
     863              :         };
     864              : 
     865          108 :         for entry in dir.flatten() {
     866           96 :             let entry_name = entry.file_name();
     867           96 :             let fname = entry_name.to_str().unwrap();
     868           96 :             found.push(String::from(fname));
     869           96 :         }
     870          108 :         found.sort();
     871          108 : 
     872          108 :         assert_eq!(expected, found);
     873          108 :     }
     874              : 
     875           60 :     fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
     876           60 :         let dir = match std::fs::read_dir(directory) {
     877           48 :             Ok(d) => d,
     878              :             Err(_) => {
     879           12 :                 assert_eq!(expected, &Vec::<String>::new());
     880           12 :                 return;
     881              :             }
     882              :         };
     883           48 :         let mut found = Vec::new();
     884          108 :         for dentry in dir {
     885           60 :             let dentry = dentry.unwrap();
     886           60 :             let file_name = dentry.file_name();
     887           60 :             let file_name_str = file_name.to_string_lossy();
     888           60 :             found.push(file_name_str.to_string());
     889           60 :         }
     890           48 :         found.sort();
     891           48 :         assert_eq!(expected, found);
     892           60 :     }
     893              : 
    /// End-to-end smoke test: a layer pushed to the queue stays in remote storage through
    /// `push_layers` and `flush`, and is removed by `flush_execute`.  Also checks the
    /// contents of the local deletion prefix directory after each step.
    #[tokio::test]
    async fn deletion_queue_smoke() -> anyhow::Result<()> {
        // Basic test that the deletion queue processes the deletions we pass into it
        let ctx = setup("deletion_queue_smoke")
            .await
            .expect("Failed test setup");
        let client = ctx.deletion_queue.new_client();
        client.recover(HashMap::new())?;

        let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
        let tenant_shard_id = ctx.harness.tenant_shard_id;

        let content: Vec<u8> = "victim1 contents".into();
        let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
        let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
        let deletion_prefix = ctx.harness.conf.deletion_prefix();

        // Exercise the distinction between the generation of the layers
        // we delete, and the generation of the running Tenant.
        let layer_generation = Generation::new(0xdeadbeef);
        let now_generation = Generation::new(0xfeedbeef);
        let layer_metadata =
            LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());

        // Remote object name: layer name followed by the layer generation's suffix
        let remote_layer_file_name_1 =
            format!("{}{}", layer_file_name_1, layer_generation.get_suffix());

        // Set mock control plane state to valid for our generation
        ctx.set_latest_generation(now_generation);

        // Inject a victim file to remote storage
        info!("Writing");
        std::fs::create_dir_all(&remote_timeline_path)?;
        std::fs::write(
            remote_timeline_path.join(remote_layer_file_name_1.clone()),
            content,
        )?;
        assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);

        // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
        info!("Pushing");
        client.push_layers(
            tenant_shard_id,
            TIMELINE_ID,
            now_generation,
            [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
        )?;
        assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);

        // Nothing has been written under the deletion prefix yet
        assert_local_files(&[], &deletion_prefix);

        // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
        info!("Flushing");
        client.flush().await?;
        assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
        assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);

        // File should go away when we execute
        info!("Flush-executing");
        client.flush_execute().await?;
        assert_remote_files(&[], &remote_timeline_path);
        // The list file is gone; only the header file remains under the deletion prefix
        assert_local_files(&["header-01"], &deletion_prefix);

        // Flushing on an empty queue should succeed immediately, and not write any lists
        info!("Flush-executing on empty");
        client.flush_execute().await?;
        assert_local_files(&["header-01"], &deletion_prefix);

        Ok(())
    }
     964              : 
    /// Checks generation validation: a deletion enqueued under a generation that differs
    /// from the one recorded in the mock controller leaves the remote file in place, while
    /// the same deletion enqueued under the recorded generation removes it.
    #[tokio::test]
    async fn deletion_queue_validation() -> anyhow::Result<()> {
        let ctx = setup("deletion_queue_validation")
            .await
            .expect("Failed test setup");
        let client = ctx.deletion_queue.new_client();
        client.recover(HashMap::new())?;

        // Generation that the control plane thinks is current
        let latest_generation = Generation::new(0xdeadbeef);
        // Generation that our DeletionQueue thinks the tenant is running with
        let stale_generation = latest_generation.previous();
        // Generation that our example layer file was written with
        let layer_generation = stale_generation.previous();
        let layer_metadata =
            LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());

        ctx.set_latest_generation(latest_generation);

        let tenant_shard_id = ctx.harness.tenant_shard_id;
        let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
        let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());

        // Initial state: a remote layer exists
        let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
        assert_remote_files(&[&remote_layer_name], &remote_timeline_path);

        tracing::debug!("Pushing...");
        client.push_layers(
            tenant_shard_id,
            TIMELINE_ID,
            stale_generation,
            [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
        )?;

        // We enqueued the operation in a stale generation: it should have failed validation
        tracing::debug!("Flushing...");
        // The 5 second timeout turns a hung flush into a test failure
        tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
        assert_remote_files(&[&remote_layer_name], &remote_timeline_path);

        tracing::debug!("Pushing...");
        client.push_layers(
            tenant_shard_id,
            TIMELINE_ID,
            latest_generation,
            [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
        )?;

        // We enqueued the operation in a fresh generation: it should have passed validation
        tracing::debug!("Flushing...");
        tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
        assert_remote_files(&[], &remote_timeline_path);

        Ok(())
    }
    1020              : 
    1021              :     #[tokio::test]
    1022           12 :     async fn deletion_queue_recovery() -> anyhow::Result<()> {
    1023           12 :         // Basic test that the deletion queue processes the deletions we pass into it
    1024           12 :         let mut ctx = setup("deletion_queue_recovery")
    1025           12 :             .await
    1026           12 :             .expect("Failed test setup");
    1027           12 :         let client = ctx.deletion_queue.new_client();
    1028           12 :         client.recover(HashMap::new())?;
    1029           12 : 
    1030           12 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
    1031           12 : 
    1032           12 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
    1033           12 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
    1034           12 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
    1035           12 : 
    1036           12 :         let layer_generation = Generation::new(0xdeadbeef);
    1037           12 :         let now_generation = Generation::new(0xfeedbeef);
    1038           12 :         let layer_metadata =
    1039           12 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
    1040           12 : 
    1041           12 :         // Inject a deletion in the generation before now_generation: after restart,
    1042           12 :         // this deletion should _not_ get executed (only the immediately previous
    1043           12 :         // generation gets that treatment)
    1044           12 :         let remote_layer_file_name_historical =
    1045           12 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
    1046           12 :         client.push_layers(
    1047           12 :             tenant_shard_id,
    1048           12 :             TIMELINE_ID,
    1049           12 :             now_generation.previous(),
    1050           12 :             [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1051           12 :         )?;
    1052           12 : 
    1053           12 :         // Inject a deletion in now_generation itself: after the restart advances the
    1054           12 :         // generation, this deletion should get executed, because we execute deletions
    1055           12 :         // in the immediately previous generation on the same node.
    1056           12 :         let remote_layer_file_name_previous =
    1057           12 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
    1058           12 :         client.push_layers(
    1059           12 :             tenant_shard_id,
    1060           12 :             TIMELINE_ID,
    1061           12 :             now_generation,
    1062           12 :             [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
    1063           12 :         )?;
    1064           12 : 
    1065           12 :         client.flush().await?;
    1066           12 :         assert_remote_files(
    1067           12 :             &[
    1068           12 :                 &remote_layer_file_name_historical,
    1069           12 :                 &remote_layer_file_name_previous,
    1070           12 :             ],
    1071           12 :             &remote_timeline_path,
    1072           12 :         );
    1073           12 : 
    1074           12 :         // Different generations for the same tenant will cause two separate
    1075           12 :         // deletion lists to be emitted.
    1076           12 :         assert_local_files(
    1077           12 :             &["0000000000000001-01.list", "0000000000000002-01.list"],
    1078           12 :             &deletion_prefix,
    1079           12 :         );
    1080           12 : 
    1081           12 :         // Simulate a node restart: the latest generation advances
    1082           12 :         let now_generation = now_generation.next();
    1083           12 :         ctx.set_latest_generation(now_generation);
    1084           12 : 
    1085           12 :         // Restart the deletion queue
    1086           12 :         drop(client);
    1087           12 :         ctx.restart().await;
    1088           12 :         let client = ctx.deletion_queue.new_client();
    1089           12 :         client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
    1090           12 : 
    1091           12 :         info!("Flush-executing");
    1092           12 :         client.flush_execute().await?;
    1093           12 :         // The deletion from immediately prior generation was executed, the one from
    1094           12 :         // an older generation was not.
    1095           12 :         assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
    1096           12 :         Ok(())
    1097           12 :     }
    1098              : }
    1099              : 
    1100              : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
    1101              : /// or coalescing: a spawned task executes deletions as messages arrive, and pump() round-trips a flush through it.
    1102              : #[cfg(test)]
    1103              : pub(crate) mod mock {
    1104              :     use std::sync::atomic::{AtomicUsize, Ordering};
    1105              : 
    1106              :     use tracing::info;
    1107              : 
    1108              :     use super::*;
    1109              :     use crate::tenant::remote_timeline_client::remote_layer_path;
    1110              : 
    1111              :     pub struct ConsumerState {
    1112              :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
    1113              :         executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
    1114              :         cancel: CancellationToken,
    1115              :         executed: Arc<AtomicUsize>,
    1116              :     }
    1117              : 
    1118              :     impl ConsumerState {
    1119         1392 :         async fn consume(&mut self, remote_storage: &GenericRemoteStorage) {
    1120         1392 :             info!("Executing all pending deletions");
    1121              : 
    1122              :             // Transform all executor messages to generic frontend messages
    1123         3114 :             loop {
    1124         3114 :                 use either::Either;
    1125         3114 :                 let msg = tokio::select! {
    1126         3114 :                     left = self.executor_rx.recv() => Either::Left(left),
    1127         3114 :                     right = self.rx.recv() => Either::Right(right),
    1128              :                 };
    1129           12 :                 match msg {
    1130            0 :                     Either::Left(None) => break,
    1131            0 :                     Either::Right(None) => break,
    1132            0 :                     Either::Left(Some(DeleterMessage::Delete(objects))) => {
    1133            0 :                         for path in objects {
    1134            0 :                             match remote_storage.delete(&path, &self.cancel).await {
    1135              :                                 Ok(_) => {
    1136            0 :                                     debug!("Deleted {path}");
    1137              :                                 }
    1138            0 :                                 Err(e) => {
    1139            0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1140              :                                 }
    1141              :                             }
    1142            0 :                             self.executed.fetch_add(1, Ordering::Relaxed);
    1143              :                         }
    1144              :                     }
    1145           12 :                     Either::Left(Some(DeleterMessage::Flush(flush_op))) => {
    1146           12 :                         flush_op.notify();
    1147           12 :                     }
    1148         1736 :                     Either::Right(Some(ListWriterQueueMessage::Delete(op))) => {
    1149         1736 :                         let mut objects = op.objects;
    1150         3472 :                         for (layer, meta) in op.layers {
    1151         1736 :                             objects.push(remote_layer_path(
    1152         1736 :                                 &op.tenant_shard_id.tenant_id,
    1153         1736 :                                 &op.timeline_id,
    1154         1736 :                                 meta.shard,
    1155         1736 :                                 &layer,
    1156         1736 :                                 meta.generation,
    1157         1736 :                             ));
    1158         1736 :                         }
    1159              : 
    1160         3446 :                         for path in objects {
    1161         1736 :                             info!("Executing deletion {path}");
    1162         1736 :                             match remote_storage.delete(&path, &self.cancel).await {
    1163              :                                 Ok(_) => {
    1164         1710 :                                     debug!("Deleted {path}");
    1165              :                                 }
    1166            0 :                                 Err(e) => {
    1167            0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1168              :                                 }
    1169              :                             }
    1170         1710 :                             self.executed.fetch_add(1, Ordering::Relaxed);
    1171              :                         }
    1172              :                     }
    1173            0 :                     Either::Right(Some(ListWriterQueueMessage::Flush(op))) => {
    1174            0 :                         op.notify();
    1175            0 :                     }
    1176            0 :                     Either::Right(Some(ListWriterQueueMessage::FlushExecute(op))) => {
    1177            0 :                         // We have already executed all prior deletions because mock does them inline
    1178            0 :                         op.notify();
    1179            0 :                     }
    1180            0 :                     Either::Right(Some(ListWriterQueueMessage::Recover(_))) => {
    1181            0 :                         // no-op in mock
    1182            0 :                     }
    1183              :                 }
    1184              :             }
    1185            0 :         }
    1186              :     }
    1187              : 
    1188              :     pub struct MockDeletionQueue {
    1189              :         tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
    1190              :         executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
    1191              :         lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
    1192              :     }
    1193              : 
    1194              :     impl MockDeletionQueue {
    1195         1392 :         pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
    1196         1392 :             let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    1197         1392 :             let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
    1198         1392 : 
    1199         1392 :             let executed = Arc::new(AtomicUsize::new(0));
    1200         1392 : 
    1201         1392 :             let mut consumer = ConsumerState {
    1202         1392 :                 rx,
    1203         1392 :                 executor_rx,
    1204         1392 :                 cancel: CancellationToken::new(),
    1205         1392 :                 executed: executed.clone(),
    1206         1392 :             };
    1207         1392 : 
    1208         1392 :             tokio::spawn(async move {
    1209         1392 :                 if let Some(remote_storage) = &remote_storage {
    1210         1392 :                     consumer.consume(remote_storage).await;
    1211            0 :                 }
    1212         1392 :             });
    1213         1392 : 
    1214         1392 :             Self {
    1215         1392 :                 tx,
    1216         1392 :                 executor_tx,
    1217         1392 :                 lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
    1218         1392 :             }
    1219         1392 :         }
    1220              : 
    1221              :         #[allow(clippy::await_holding_lock)]
    1222           12 :         pub async fn pump(&self) {
    1223           12 :             let (tx, rx) = tokio::sync::oneshot::channel();
    1224           12 :             self.executor_tx
    1225           12 :                 .send(DeleterMessage::Flush(FlushOp { tx }))
    1226           12 :                 .await
    1227           12 :                 .expect("Failed to send flush message");
    1228           12 :             rx.await.ok();
    1229           12 :         }
    1230              : 
    1231         1452 :         pub(crate) fn new_client(&self) -> DeletionQueueClient {
    1232         1452 :             DeletionQueueClient {
    1233         1452 :                 tx: self.tx.clone(),
    1234         1452 :                 executor_tx: self.executor_tx.clone(),
    1235         1452 :                 lsn_table: self.lsn_table.clone(),
    1236         1452 :             }
    1237         1452 :         }
    1238              :     }
    1239              : 
    1240              :     /// Test round-trip serialization/deserialization, and test stability of the format
    1241              :     /// vs. a static expected string for the serialized version.
    1242              :     #[test]
    1243           12 :     fn deletion_list_serialization() -> anyhow::Result<()> {
    1244           12 :         let tenant_id = "ad6c1a56f5680419d3a16ff55d97ec3c"
    1245           12 :             .to_string()
    1246           12 :             .parse::<TenantShardId>()?;
    1247           12 :         let timeline_id = "be322c834ed9e709e63b5c9698691910"
    1248           12 :             .to_string()
    1249           12 :             .parse::<TimelineId>()?;
    1250           12 :         let generation = Generation::new(123);
    1251              : 
    1252           12 :         let object =
    1253           12 :             RemotePath::from_string(&format!("tenants/{tenant_id}/timelines/{timeline_id}/foo"))?;
    1254           12 :         let mut objects = [object].to_vec();
    1255           12 : 
    1256           12 :         let mut example = DeletionList::new(1);
    1257           12 :         example.push(&tenant_id, &timeline_id, generation, &mut objects);
    1258              : 
    1259           12 :         let encoded = serde_json::to_string(&example)?;
    1260              : 
    1261           12 :         let expected = "{\"version\":1,\"sequence\":1,\"tenants\":{\"ad6c1a56f5680419d3a16ff55d97ec3c\":{\"timelines\":{\"be322c834ed9e709e63b5c9698691910\":[\"foo\"]},\"generation\":123}},\"size\":1}".to_string();
    1262           12 :         assert_eq!(encoded, expected);
    1263              : 
    1264           12 :         let decoded = serde_json::from_str::<DeletionList>(&encoded)?;
    1265           12 :         assert_eq!(example, decoded);
    1266              : 
    1267           12 :         Ok(())
    1268           12 :     }
    1269              : }
        

Generated by: LCOV version 2.1-beta