LCOV - code coverage report
Current view: top level - pageserver/src - deletion_queue.rs (source / functions)
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info
Test Date: 2024-02-14 18:05:35
Coverage: Lines: 89.3 % (752 of 842)    Functions: 58.9 % (113 of 192)

            Line data    Source code
       1              : mod deleter;
       2              : mod list_writer;
       3              : mod validator;
       4              : 
       5              : use std::collections::HashMap;
       6              : use std::sync::Arc;
       7              : use std::time::Duration;
       8              : 
       9              : use crate::control_plane_client::ControlPlaneGenerationsApi;
      10              : use crate::metrics;
      11              : use crate::tenant::remote_timeline_client::remote_layer_path;
      12              : use crate::tenant::remote_timeline_client::remote_timeline_path;
      13              : use crate::tenant::remote_timeline_client::LayerFileMetadata;
      14              : use crate::virtual_file::MaybeFatalIo;
      15              : use crate::virtual_file::VirtualFile;
      16              : use anyhow::Context;
      17              : use camino::Utf8PathBuf;
      18              : use pageserver_api::shard::TenantShardId;
      19              : use remote_storage::{GenericRemoteStorage, RemotePath};
      20              : use serde::Deserialize;
      21              : use serde::Serialize;
      22              : use thiserror::Error;
      23              : use tokio;
      24              : use tokio_util::sync::CancellationToken;
      25              : use tracing::Instrument;
      26              : use tracing::{self, debug, error};
      27              : use utils::crashsafe::path_with_suffix_extension;
      28              : use utils::generation::Generation;
      29              : use utils::id::TimelineId;
      30              : use utils::lsn::AtomicLsn;
      31              : use utils::lsn::Lsn;
      32              : 
      33              : use self::deleter::Deleter;
      34              : use self::list_writer::DeletionOp;
      35              : use self::list_writer::ListWriter;
      36              : use self::list_writer::RecoverOp;
      37              : use self::validator::Validator;
      38              : use deleter::DeleterMessage;
      39              : use list_writer::ListWriterQueueMessage;
      40              : use validator::ValidatorQueueMessage;
      41              : 
      42              : use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
      43              : 
       44              : // TODO: make the delay before executing deletions configurable
      45              : 
      46              : /// We aggregate object deletions from many tenants in one place, for several reasons:
      47              : /// - Coalesce deletions into fewer DeleteObjects calls
      48              : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
      49              : ///   to flush any outstanding deletions.
      50              : /// - Globally control throughput of deletions, as these are a low priority task: do
      51              : ///   not compete with the same S3 clients/connections used for higher priority uploads.
      52              : /// - Enable gating deletions on validation of a tenant's generation number, to make
      53              : ///   it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
      54              : ///
      55              : /// There are two kinds of deletion: deferred and immediate.  A deferred deletion
      56              : /// may be intentionally delayed to protect passive readers of S3 data, and is
      57              : /// subject to a generation number validation step.  An immediate deletion is
      58              : /// ready to execute immediately, and is only queued up so that it can be coalesced
      59              : /// with other deletions in flight.
      60              : ///
      61              : /// Deferred deletions pass through three steps:
      62              : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
      63              : ///   DeletionLists, which are persisted to disk.
       64              : /// - Validator: accumulate deletion lists, and validate them en masse prior to passing
      65              : ///   the keys in the list onward for actual deletion.  Also validate remote_consistent_lsn
      66              : ///   updates for running timelines.
      67              : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
      68              : ///   batches of 1000 keys via DeleteObjects.
      69              : ///
      70              : /// Non-deferred deletions, such as during timeline deletion, bypass the first
      71              : /// two stages and are passed straight into the Deleter.
      72              : ///
      73              : /// Internally, each stage is joined by a channel to the next.  On disk, there is only
      74              : /// one queue (of DeletionLists), which is written by the frontend and consumed
      75              : /// by the backend.
      76          183 : #[derive(Clone)]
      77              : pub struct DeletionQueue {
      78              :     client: DeletionQueueClient,
      79              : 
      80              :     // Parent cancellation token for the tokens passed into background workers
      81              :     cancel: CancellationToken,
      82              : }
      83              : 
      84              : /// Opaque wrapper around individual worker tasks, to avoid making the
      85              : /// worker objects themselves public
      86              : pub struct DeletionQueueWorkers<C>
      87              : where
      88              :     C: ControlPlaneGenerationsApi + Send + Sync,
      89              : {
      90              :     frontend: ListWriter,
      91              :     backend: Validator<C>,
      92              :     executor: Deleter,
      93              : }
      94              : 
      95              : impl<C> DeletionQueueWorkers<C>
      96              : where
      97              :     C: ControlPlaneGenerationsApi + Send + Sync + 'static,
      98              : {
      99          633 :     pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
     100          633 :         let jh_frontend = runtime.spawn(async move {
     101          633 :             self.frontend
     102          633 :                 .background()
     103          633 :                 .instrument(tracing::info_span!(parent:None, "deletion frontend"))
     104         3970 :                 .await
     105          633 :         });
     106          633 :         let jh_backend = runtime.spawn(async move {
     107          633 :             self.backend
     108          633 :                 .background()
     109          633 :                 .instrument(tracing::info_span!(parent:None, "deletion backend"))
     110         2035 :                 .await
     111          633 :         });
     112          633 :         let jh_executor = runtime.spawn(async move {
     113          633 :             self.executor
     114          633 :                 .background()
     115          633 :                 .instrument(tracing::info_span!(parent:None, "deletion executor"))
     116        11233 :                 .await
     117          633 :         });
     118          633 : 
     119          633 :         runtime.spawn({
     120          633 :             async move {
     121          633 :                 jh_frontend.await.expect("error joining frontend worker");
     122            3 :                 jh_backend.await.expect("error joining backend worker");
     123            3 :                 drop(jh_executor.await.expect("error joining executor worker"));
     124          633 :             }
     125          633 :         })
     126          633 :     }
     127              : }
     128              : 
     129              : /// A FlushOp is just a oneshot channel, where we send the transmit side down
     130              : /// another channel, and the receive side will receive a message when the channel
     131              : /// we're flushing has reached the FlushOp we sent into it.
     132              : ///
     133              : /// The only extra behavior beyond the channel is that the notify() method does not
     134              : /// return an error when the receive side has been dropped, because in this use case
     135              : /// it is harmless (the code that initiated the flush no longer cares about the result).
     136            0 : #[derive(Debug)]
     137              : struct FlushOp {
     138              :     tx: tokio::sync::oneshot::Sender<()>,
     139              : }
     140              : 
     141              : impl FlushOp {
     142          909 :     fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
     143          909 :         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
     144          909 :         (Self { tx }, rx)
     145          909 :     }
     146              : 
     147          907 :     fn notify(self) {
     148          907 :         if self.tx.send(()).is_err() {
     149              :             // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
     150          163 :             debug!("deletion queue flush from dropped client");
     151          744 :         };
     152          907 :     }
     153              : }
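// Illustrative sketch only (not part of this module's API): the round-trip that a
// FlushOp enables, essentially what DeletionQueueClient::flush further down does.
async fn flush_example(
    queue: &tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
) -> Result<(), DeletionQueueError> {
    let (flush_op, rx) = FlushOp::new();
    // Send the transmit side down the channel being flushed...
    queue
        .send(ListWriterQueueMessage::Flush(flush_op))
        .map_err(|_| DeletionQueueError::ShuttingDown)?;
    // ...then wait for the worker to notify() once it has drained everything queued ahead of us.
    rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
}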
     154              : 
     155         5513 : #[derive(Clone, Debug)]
     156              : pub struct DeletionQueueClient {
     157              :     tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
     158              :     executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
     159              : 
     160              :     lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
     161              : }
     162              : 
     163          180 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     164              : struct TenantDeletionList {
     165              :     /// For each Timeline, a list of key fragments to append to the timeline remote path
     166              :     /// when reconstructing a full key
     167              :     timelines: HashMap<TimelineId, Vec<String>>,
     168              : 
     169              :     /// The generation in which this deletion was emitted: note that this may not be the
     170              :     /// same as the generation of any layers being deleted.  The generation of the layer
      171              :     /// has already been absorbed into the key fragments stored in `timelines`
     172              :     generation: Generation,
     173              : }
     174              : 
     175              : impl TenantDeletionList {
     176           40 :     pub(crate) fn len(&self) -> usize {
     177           54 :         self.timelines.values().map(|v| v.len()).sum()
     178           40 :     }
     179              : }
     180              : 
     181              : /// Files ending with this suffix will be ignored and erased
      182              : /// during recovery at startup.
     183              : const TEMP_SUFFIX: &str = "tmp";
     184              : 
     185          297 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     186              : struct DeletionList {
     187              :     /// Serialization version, for future use
     188              :     version: u8,
     189              : 
     190              :     /// Used for constructing a unique key for each deletion list we write out.
     191              :     sequence: u64,
     192              : 
     193              :     /// To avoid repeating tenant/timeline IDs in every key, we store keys in
     194              :     /// nested HashMaps by TenantTimelineID.  Each Tenant only appears once
     195              :     /// with one unique generation ID: if someone tries to push a second generation
     196              :     /// ID for the same tenant, we will start a new DeletionList.
     197              :     tenants: HashMap<TenantShardId, TenantDeletionList>,
     198              : 
     199              :     /// Avoid having to walk `tenants` to calculate the number of keys in
     200              :     /// the nested deletion lists
     201              :     size: usize,
     202              : 
     203              :     /// Set to true when the list has undergone validation with the control
     204              :     /// plane and the remaining contents of `tenants` are valid.  A list may
     205              :     /// also be implicitly marked valid by DeletionHeader.validated_sequence
     206              :     /// advancing to >= DeletionList.sequence
     207              :     #[serde(default)]
     208              :     #[serde(skip_serializing_if = "std::ops::Not::not")]
     209              :     validated: bool,
     210              : }
     211              : 
     212           90 : #[derive(Debug, Serialize, Deserialize)]
     213              : struct DeletionHeader {
     214              :     /// Serialization version, for future use
     215              :     version: u8,
     216              : 
     217              :     /// The highest sequence number (inclusive) that has been validated.  All deletion
     218              :     /// lists on disk with a sequence <= this value are safe to execute.
     219              :     validated_sequence: u64,
     220              : }
     221              : 
     222              : impl DeletionHeader {
     223              :     const VERSION_LATEST: u8 = 1;
     224              : 
     225           38 :     fn new(validated_sequence: u64) -> Self {
     226           38 :         Self {
     227           38 :             version: Self::VERSION_LATEST,
     228           38 :             validated_sequence,
     229           38 :         }
     230           38 :     }
     231              : 
     232           38 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     233            0 :         debug!("Saving deletion list header {:?}", self);
     234           38 :         let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
     235           38 :         let header_path = conf.deletion_header_path();
     236           38 :         let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
     237           38 :         VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes)
     238           38 :             .await
     239           38 :             .maybe_fatal_err("save deletion header")?;
     240              : 
     241           38 :         Ok(())
     242           38 :     }
     243              : }
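// Illustrative only: how DeletionHeader.validated_sequence is meant to be interpreted
// when deciding whether a recovered list may be executed (the real recovery logic lives
// in list_writer.rs; this helper is hypothetical).
fn is_safe_to_execute(header: &DeletionHeader, list: &DeletionList) -> bool {
    list.validated || list.sequence <= header.validated_sequence
}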
     244              : 
     245              : impl DeletionList {
     246              :     const VERSION_LATEST: u8 = 1;
     247          690 :     fn new(sequence: u64) -> Self {
     248          690 :         Self {
     249          690 :             version: Self::VERSION_LATEST,
     250          690 :             sequence,
     251          690 :             tenants: HashMap::new(),
     252          690 :             size: 0,
     253          690 :             validated: false,
     254          690 :         }
     255          690 :     }
     256              : 
     257          835 :     fn is_empty(&self) -> bool {
     258          835 :         self.tenants.is_empty()
     259          835 :     }
     260              : 
     261         5252 :     fn len(&self) -> usize {
     262         5252 :         self.size
     263         5252 :     }
     264              : 
     265              :     /// Returns true if the push was accepted, false if the caller must start a new
     266              :     /// deletion list.
     267         4100 :     fn push(
     268         4100 :         &mut self,
     269         4100 :         tenant: &TenantShardId,
     270         4100 :         timeline: &TimelineId,
     271         4100 :         generation: Generation,
     272         4100 :         objects: &mut Vec<RemotePath>,
     273         4100 :     ) -> bool {
     274         4100 :         if objects.is_empty() {
     275              :             // Avoid inserting an empty TimelineDeletionList: this preserves the property
     276              :             // that if we have no keys, then self.objects is empty (used in Self::is_empty)
     277          422 :             return true;
     278         3678 :         }
     279         3678 : 
     280         3678 :         let tenant_entry = self
     281         3678 :             .tenants
     282         3678 :             .entry(*tenant)
     283         3678 :             .or_insert_with(|| TenantDeletionList {
     284          112 :                 timelines: HashMap::new(),
     285          112 :                 generation,
     286         3678 :             });
     287         3678 : 
     288         3678 :         if tenant_entry.generation != generation {
     289              :             // Only one generation per tenant per list: signal to
     290              :             // caller to start a new list.
     291            3 :             return false;
     292         3675 :         }
     293         3675 : 
     294         3675 :         let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
     295         3675 : 
     296         3675 :         let timeline_remote_path = remote_timeline_path(tenant, timeline);
     297         3675 : 
     298         3675 :         self.size += objects.len();
     299         3675 :         timeline_entry.extend(objects.drain(..).map(|p| {
     300         3675 :             p.strip_prefix(&timeline_remote_path)
     301         3675 :                 .expect("Timeline paths always start with the timeline prefix")
     302         3675 :                 .to_string()
     303         3675 :         }));
     304         3675 :         true
     305         4100 :     }
     306              : 
     307           42 :     fn into_remote_paths(self) -> Vec<RemotePath> {
     308           42 :         let mut result = Vec::new();
     309           42 :         for (tenant, tenant_deletions) in self.tenants.into_iter() {
     310           42 :             for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
     311           42 :                 let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
     312           42 :                 result.extend(
     313           42 :                     timeline_layers
     314           42 :                         .into_iter()
     315         1082 :                         .map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
     316           42 :                 );
     317           42 :             }
     318              :         }
     319              : 
     320           42 :         result
     321           42 :     }
     322              : 
     323           69 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     324           69 :         let path = conf.deletion_list_path(self.sequence);
     325           69 :         let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
     326           69 : 
     327           69 :         let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
     328           69 : 
     329           69 :         VirtualFile::crashsafe_overwrite(path, temp_path, bytes)
     330           69 :             .await
     331           69 :             .maybe_fatal_err("save deletion list")
     332           69 :             .map_err(Into::into)
     333           69 :     }
     334              : }
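// A minimal sketch of how the `false` return from DeletionList::push is handled by the
// caller (the ListWriter stage in list_writer.rs); `push_or_rotate` is a hypothetical
// helper, and the sealed list it returns would be persisted via save().
fn push_or_rotate(
    current: &mut DeletionList,
    next_sequence: u64,
    tenant: &TenantShardId,
    timeline: &TimelineId,
    generation: Generation,
    objects: &mut Vec<RemotePath>,
) -> Option<DeletionList> {
    if current.push(tenant, timeline, generation, objects) {
        None
    } else {
        // Generation changed for this tenant: seal the current list and retry against a
        // fresh one.  The retry always succeeds because the new list is empty.
        let sealed = std::mem::replace(current, DeletionList::new(next_sequence));
        assert!(current.push(tenant, timeline, generation, objects));
        Some(sealed)
    }
}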
     335              : 
     336              : impl std::fmt::Display for DeletionList {
     337            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     338            0 :         write!(
     339            0 :             f,
     340            0 :             "DeletionList<seq={}, tenants={}, keys={}>",
     341            0 :             self.sequence,
     342            0 :             self.tenants.len(),
     343            0 :             self.size
     344            0 :         )
     345            0 :     }
     346              : }
     347              : 
     348              : struct PendingLsn {
     349              :     projected: Lsn,
     350              :     result_slot: Arc<AtomicLsn>,
     351              : }
     352              : 
     353              : struct TenantLsnState {
     354              :     timelines: HashMap<TimelineId, PendingLsn>,
     355              : 
     356              :     // In what generation was the most recent update proposed?
     357              :     generation: Generation,
     358              : }
     359              : 
     360          812 : #[derive(Default)]
     361              : struct VisibleLsnUpdates {
     362              :     tenants: HashMap<TenantShardId, TenantLsnState>,
     363              : }
     364              : 
     365              : impl VisibleLsnUpdates {
     366          717 :     fn new() -> Self {
     367          717 :         Self {
     368          717 :             tenants: HashMap::new(),
     369          717 :         }
     370          717 :     }
     371              : }
     372              : 
     373              : impl std::fmt::Debug for VisibleLsnUpdates {
     374            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     375            0 :         write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
     376            0 :     }
     377              : }
     378              : 
     379            0 : #[derive(Error, Debug)]
     380              : pub enum DeletionQueueError {
     381              :     #[error("Deletion queue unavailable during shutdown")]
     382              :     ShuttingDown,
     383              : }
     384              : 
     385              : impl DeletionQueueClient {
     386            0 :     pub(crate) fn broken() -> Self {
     387            0 :         // Channels whose receivers are immediately dropped.
     388            0 :         let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
     389            0 :         let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
     390            0 :         Self {
     391            0 :             tx,
     392            0 :             executor_tx,
     393            0 :             lsn_table: Arc::default(),
     394            0 :         }
     395            0 :     }
     396              : 
     397              :     /// This is cancel-safe.  If you drop the future before it completes, the message
     398              :     /// is not pushed, although in the context of the deletion queue it doesn't matter: once
     399              :     /// we decide to do a deletion the decision is always final.
     400         5140 :     fn do_push<T>(
     401         5140 :         &self,
     402         5140 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     403         5140 :         msg: T,
     404         5140 :     ) -> Result<(), DeletionQueueError> {
     405         5140 :         match queue.send(msg) {
     406         5140 :             Ok(_) => Ok(()),
     407            0 :             Err(e) => {
     408            0 :                 // This shouldn't happen, we should shut down all tenants before
     409            0 :                 // we shut down the global delete queue.  If we encounter a bug like this,
     410            0 :                 // we may leak objects as deletions won't be processed.
     411            0 :                 error!("Deletion queue closed while pushing, shutting down? ({e})");
     412            0 :                 Err(DeletionQueueError::ShuttingDown)
     413              :             }
     414              :         }
     415         5140 :     }
     416              : 
     417          632 :     pub(crate) fn recover(
     418          632 :         &self,
     419          632 :         attached_tenants: HashMap<TenantShardId, Generation>,
     420          632 :     ) -> Result<(), DeletionQueueError> {
     421          632 :         self.do_push(
     422          632 :             &self.tx,
     423          632 :             ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
     424          632 :         )
     425          632 :     }
     426              : 
     427              :     /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
     428              :     /// world, it must validate its generation number before doing so.  Rather than do this synchronously,
     429              :     /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
     430              :     /// recently validated separately.
     431              :     ///
     432              :     /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates.  The
     433              :     /// backend will later wake up and notice that the tenant's generation requires validation.
     434         5964 :     pub(crate) async fn update_remote_consistent_lsn(
     435         5964 :         &self,
     436         5964 :         tenant_shard_id: TenantShardId,
     437         5964 :         timeline_id: TimelineId,
     438         5964 :         current_generation: Generation,
     439         5964 :         lsn: Lsn,
     440         5964 :         result_slot: Arc<AtomicLsn>,
     441         5964 :     ) {
     442         5964 :         let mut locked = self
     443         5964 :             .lsn_table
     444         5964 :             .write()
     445         5964 :             .expect("Lock should never be poisoned");
     446         5964 : 
     447         5964 :         let tenant_entry = locked
     448         5964 :             .tenants
     449         5964 :             .entry(tenant_shard_id)
     450         5964 :             .or_insert(TenantLsnState {
     451         5964 :                 timelines: HashMap::new(),
     452         5964 :                 generation: current_generation,
     453         5964 :             });
     454         5964 : 
     455         5964 :         if tenant_entry.generation != current_generation {
     456            2 :             // Generation might have changed if we were detached and then re-attached: in this case,
     457            2 :             // state from the previous generation cannot be trusted.
     458            2 :             tenant_entry.timelines.clear();
     459            2 :             tenant_entry.generation = current_generation;
     460         5962 :         }
     461              : 
     462         5964 :         tenant_entry.timelines.insert(
     463         5964 :             timeline_id,
     464         5964 :             PendingLsn {
     465         5964 :                 projected: lsn,
     466         5964 :                 result_slot,
     467         5964 :             },
     468         5964 :         );
     469         5964 :     }
     470              : 
     471              :     /// Submit a list of layers for deletion: this function will return before the deletion is
     472              :     /// persistent, but it may be executed at any time after this function enters: do not push
     473              :     /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
     474              :     /// references them).
     475              :     ///
     476              :     /// The `current_generation` is the generation of this pageserver's current attachment.  The
     477              :     /// generations in `layers` are the generations in which those layers were written.
     478         4319 :     pub(crate) async fn push_layers(
     479         4319 :         &self,
     480         4319 :         tenant_shard_id: TenantShardId,
     481         4319 :         timeline_id: TimelineId,
     482         4319 :         current_generation: Generation,
     483         4319 :         layers: Vec<(LayerFileName, LayerFileMetadata)>,
     484         4319 :     ) -> Result<(), DeletionQueueError> {
     485         4319 :         if current_generation.is_none() {
     486            0 :             debug!("Enqueuing deletions in legacy mode, skipping queue");
     487              : 
     488           60 :             let mut layer_paths = Vec::new();
     489          120 :             for (layer, meta) in layers {
     490           60 :                 layer_paths.push(remote_layer_path(
     491           60 :                     &tenant_shard_id.tenant_id,
     492           60 :                     &timeline_id,
     493           60 :                     meta.shard,
     494           60 :                     &layer,
     495           60 :                     meta.generation,
     496           60 :                 ));
     497           60 :             }
     498           60 :             self.push_immediate(layer_paths).await?;
     499          105 :             return self.flush_immediate().await;
     500         4259 :         }
     501         4259 : 
     502         4259 :         self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
     503         4319 :     }
     504              : 
     505              :     /// When a Tenant has a generation, push_layers is always synchronous because
      506              :     /// the ListWriter channel is an unbounded channel.
     507              :     ///
     508              :     /// This can be merged into push_layers when we remove the Generation-less mode
     509              :     /// support (`<https://github.com/neondatabase/neon/issues/5395>`)
     510         4259 :     pub(crate) fn push_layers_sync(
     511         4259 :         &self,
     512         4259 :         tenant_shard_id: TenantShardId,
     513         4259 :         timeline_id: TimelineId,
     514         4259 :         current_generation: Generation,
     515         4259 :         layers: Vec<(LayerFileName, LayerFileMetadata)>,
     516         4259 :     ) -> Result<(), DeletionQueueError> {
     517         4259 :         metrics::DELETION_QUEUE
     518         4259 :             .keys_submitted
     519         4259 :             .inc_by(layers.len() as u64);
     520         4259 :         self.do_push(
     521         4259 :             &self.tx,
     522         4259 :             ListWriterQueueMessage::Delete(DeletionOp {
     523         4259 :                 tenant_shard_id,
     524         4259 :                 timeline_id,
     525         4259 :                 layers,
     526         4259 :                 generation: current_generation,
     527         4259 :                 objects: Vec::new(),
     528         4259 :             }),
     529         4259 :         )
     530         4259 :     }
     531              : 
     532              :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     533          249 :     async fn do_flush<T>(
     534          249 :         &self,
     535          249 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     536          249 :         msg: T,
     537          249 :         rx: tokio::sync::oneshot::Receiver<()>,
     538          249 :     ) -> Result<(), DeletionQueueError> {
     539          249 :         self.do_push(queue, msg)?;
     540          250 :         if rx.await.is_err() {
     541              :             // This shouldn't happen if tenants are shut down before deletion queue.  If we
     542              :             // encounter a bug like this, then a flusher will incorrectly believe it has flushed
     543              :             // when it hasn't, possibly leading to leaking objects.
     544            0 :             error!("Deletion queue dropped flush op while client was still waiting");
     545            0 :             Err(DeletionQueueError::ShuttingDown)
     546              :         } else {
     547          249 :             Ok(())
     548              :         }
     549          249 :     }
     550              : 
     551              :     /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
     552              :     ///
     553              :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     554          222 :     pub async fn flush(&self) -> Result<(), DeletionQueueError> {
     555          222 :         let (flush_op, rx) = FlushOp::new();
     556          222 :         self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
     557          223 :             .await
     558          222 :     }
     559              : 
     560              :     /// Issue a flush without waiting for it to complete.  This is useful on advisory flushes where
     561              :     /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
     562              :     /// detach where flushing is nice but not necessary.
     563              :     ///
     564              :     /// This function provides no guarantees of work being done.
     565          163 :     pub fn flush_advisory(&self) {
     566          163 :         let (flush_op, _) = FlushOp::new();
     567          163 : 
     568          163 :         // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
     569          163 :         drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
     570          163 :     }
     571              : 
     572              :     // Wait until all previous deletions are executed
     573           27 :     pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
     574            0 :         debug!("flush_execute: flushing to deletion lists...");
     575              :         // Flush any buffered work to deletion lists
     576           27 :         self.flush().await?;
     577              : 
     578              :         // Flush the backend into the executor of deletion lists
     579           27 :         let (flush_op, rx) = FlushOp::new();
     580            0 :         debug!("flush_execute: flushing backend...");
     581           27 :         self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
     582           27 :             .await?;
     583            0 :         debug!("flush_execute: finished flushing backend...");
     584              : 
     585              :         // Flush any immediate-mode deletions (the above backend flush will only flush
     586              :         // the executor if deletions had flowed through the backend)
     587            0 :         debug!("flush_execute: flushing execution...");
     588           27 :         self.flush_immediate().await?;
     589            0 :         debug!("flush_execute: finished flushing execution...");
     590           27 :         Ok(())
     591           27 :     }
     592              : 
     593              :     /// This interface bypasses the persistent deletion queue, and any validation
      594              :     /// that this pageserver is still eligible to execute the deletions.  It is for
     595              :     /// use in timeline deletions, where the control plane is telling us we may
     596              :     /// delete everything in the timeline.
     597              :     ///
     598              :     /// DO NOT USE THIS FROM GC OR COMPACTION CODE.  Use the regular `push_layers`.
     599          722 :     pub(crate) async fn push_immediate(
     600          722 :         &self,
     601          722 :         objects: Vec<RemotePath>,
     602          722 :     ) -> Result<(), DeletionQueueError> {
     603          722 :         metrics::DELETION_QUEUE
     604          722 :             .keys_submitted
     605          722 :             .inc_by(objects.len() as u64);
     606          722 :         self.executor_tx
     607          722 :             .send(DeleterMessage::Delete(objects))
     608           45 :             .await
     609          722 :             .map_err(|_| DeletionQueueError::ShuttingDown)
     610          722 :     }
     611              : 
     612              :     /// Companion to push_immediate.  When this returns Ok, all prior objects sent
     613              :     /// into push_immediate have been deleted from remote storage.
     614          457 :     pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
     615          457 :         let (flush_op, rx) = FlushOp::new();
     616          457 :         self.executor_tx
     617          457 :             .send(DeleterMessage::Flush(flush_op))
     618           45 :             .await
     619          457 :             .map_err(|_| DeletionQueueError::ShuttingDown)?;
     620              : 
     621          457 :         rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
     622          457 :     }
     623              : }
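// A hedged usage sketch of the deferred deletion path from a caller's point of view;
// the layer list is assumed to have been collected elsewhere (e.g. by GC or compaction).
async fn delete_layers_example(
    client: &DeletionQueueClient,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    current_generation: Generation,
    layers: Vec<(LayerFileName, LayerFileMetadata)>,
) -> Result<(), DeletionQueueError> {
    // Enqueue: returns once the deletions are queued, not once they are persistent or executed.
    client
        .push_layers(tenant_shard_id, timeline_id, current_generation, layers)
        .await?;
    // Optionally block until the queued work has been validated and executed against remote storage.
    client.flush_execute().await
}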
     624              : 
     625              : impl DeletionQueue {
     626         1258 :     pub fn new_client(&self) -> DeletionQueueClient {
     627         1258 :         self.client.clone()
     628         1258 :     }
     629              : 
     630              :     /// Caller may use the returned object to construct clients with new_client.
      631              :     /// Caller should call spawn_with() on the returned worker object to start the background
      632              :     /// tasks: we don't spawn them inside new() so that the caller can use their runtime/spans of choice.
     633              :     ///
     634              :     /// If remote_storage is None, then the returned workers will also be None.
     635          633 :     pub fn new<C>(
     636          633 :         remote_storage: Option<GenericRemoteStorage>,
     637          633 :         control_plane_client: Option<C>,
     638          633 :         conf: &'static PageServerConf,
     639          633 :     ) -> (Self, Option<DeletionQueueWorkers<C>>)
     640          633 :     where
     641          633 :         C: ControlPlaneGenerationsApi + Send + Sync,
     642          633 :     {
     643          633 :         // Unbounded channel: enables non-async functions to submit deletions.  The actual length is
     644          633 :         // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
     645          633 :         // enough to avoid this taking pathologically large amount of memory.
     646          633 :         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
     647          633 : 
     648          633 :         // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
     649          633 :         let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
     650          633 : 
     651          633 :         // Shallow channel: it carries lists of paths, and we expect the main queueing to
     652          633 :         // happen in the backend (persistent), not in this queue.
     653          633 :         let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
     654          633 : 
     655          633 :         let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
     656          633 : 
     657          633 :         // The deletion queue has an independent cancellation token to
     658          633 :         // the general pageserver shutdown token, because it stays alive a bit
     659          633 :         // longer to flush after Tenants have all been torn down.
     660          633 :         let cancel = CancellationToken::new();
     661              : 
     662          633 :         let remote_storage = match remote_storage {
     663              :             None => {
     664            0 :                 return (
     665            0 :                     Self {
     666            0 :                         client: DeletionQueueClient {
     667            0 :                             tx,
     668            0 :                             executor_tx,
     669            0 :                             lsn_table: lsn_table.clone(),
     670            0 :                         },
     671            0 :                         cancel,
     672            0 :                     },
     673            0 :                     None,
     674            0 :                 )
     675              :             }
     676          633 :             Some(r) => r,
     677          633 :         };
     678          633 : 
     679          633 :         (
     680          633 :             Self {
     681          633 :                 client: DeletionQueueClient {
     682          633 :                     tx,
     683          633 :                     executor_tx: executor_tx.clone(),
     684          633 :                     lsn_table: lsn_table.clone(),
     685          633 :                 },
     686          633 :                 cancel: cancel.clone(),
     687          633 :             },
     688          633 :             Some(DeletionQueueWorkers {
     689          633 :                 frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()),
     690          633 :                 backend: Validator::new(
     691          633 :                     conf,
     692          633 :                     backend_rx,
     693          633 :                     executor_tx,
     694          633 :                     control_plane_client,
     695          633 :                     lsn_table.clone(),
     696          633 :                     cancel.clone(),
     697          633 :                 ),
     698          633 :                 executor: Deleter::new(remote_storage, executor_rx, cancel.clone()),
     699          633 :             }),
     700          633 :         )
     701          633 :     }
     702              : 
     703          183 :     pub async fn shutdown(&mut self, timeout: Duration) {
     704          184 :         match tokio::time::timeout(timeout, self.client.flush()).await {
     705              :             Ok(Ok(())) => {
     706          183 :                 tracing::info!("Deletion queue flushed successfully on shutdown")
     707              :             }
     708              :             Ok(Err(DeletionQueueError::ShuttingDown)) => {
     709              :                 // This is not harmful for correctness, but is unexpected: the deletion
     710              :                 // queue's workers should stay alive as long as there are any client handles instantiated.
     711            0 :                 tracing::warn!("Deletion queue stopped prematurely");
     712              :             }
     713            0 :             Err(_timeout) => {
     714            0 :                 tracing::warn!("Timed out flushing deletion queue on shutdown")
     715              :             }
     716              :         }
     717              : 
     718              :         // We only cancel _after_ flushing: otherwise we would be shutting down the
     719              :         // components that do the flush.
     720          183 :         self.cancel.cancel();
     721          183 :     }
     722              : }
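// Putting the pieces together: a minimal startup/shutdown sketch under the same
// assumptions as the test setup below (a configured remote storage and a control
// plane client are available); the function name and timeout are placeholders.
async fn run_queue_example<C>(
    remote_storage: GenericRemoteStorage,
    control_plane: C,
    conf: &'static PageServerConf,
) -> anyhow::Result<()>
where
    C: ControlPlaneGenerationsApi + Send + Sync + 'static,
{
    let (mut queue, workers) =
        DeletionQueue::new(Some(remote_storage), Some(control_plane), conf);
    // `workers` is Some because we passed Some(remote_storage).
    let join = workers
        .expect("workers are returned when remote storage is configured")
        .spawn_with(&tokio::runtime::Handle::current());

    let client = queue.new_client();
    // Replay any deletion lists left on disk by a previous process lifetime.
    client.recover(HashMap::new())?;

    // ... hand clients out to tenants and run normally ...

    queue.shutdown(Duration::from_secs(5)).await;
    join.await?;
    Ok(())
}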
     723              : 
     724              : #[cfg(test)]
     725              : mod test {
     726              :     use camino::Utf8Path;
     727              :     use hex_literal::hex;
     728              :     use pageserver_api::shard::ShardIndex;
     729              :     use std::{io::ErrorKind, time::Duration};
     730              :     use tracing::info;
     731              : 
     732              :     use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
     733              :     use tokio::task::JoinHandle;
     734              : 
     735              :     use crate::{
     736              :         control_plane_client::RetryForeverError,
     737              :         repository::Key,
     738              :         tenant::{
     739              :             harness::TenantHarness, remote_timeline_client::remote_timeline_path,
     740              :             storage_layer::DeltaFileName,
     741              :         },
     742              :     };
     743              : 
     744              :     use super::*;
     745              :     pub const TIMELINE_ID: TimelineId =
     746              :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
     747              : 
     748              :     pub const EXAMPLE_LAYER_NAME: LayerFileName = LayerFileName::Delta(DeltaFileName {
     749              :         key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
     750              :         lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
     751              :     });
     752              : 
     753              :     // When you need a second layer in a test.
     754              :     pub const EXAMPLE_LAYER_NAME_ALT: LayerFileName = LayerFileName::Delta(DeltaFileName {
     755              :         key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
     756              :         lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
     757              :     });
     758              : 
     759              :     struct TestSetup {
     760              :         harness: TenantHarness,
     761              :         remote_fs_dir: Utf8PathBuf,
     762              :         storage: GenericRemoteStorage,
     763              :         mock_control_plane: MockControlPlane,
     764              :         deletion_queue: DeletionQueue,
     765              :         worker_join: JoinHandle<()>,
     766              :     }
     767              : 
     768              :     impl TestSetup {
     769              :         /// Simulate a pageserver restart by destroying and recreating the deletion queue
     770            2 :         async fn restart(&mut self) {
     771            2 :             let (deletion_queue, workers) = DeletionQueue::new(
     772            2 :                 Some(self.storage.clone()),
     773            2 :                 Some(self.mock_control_plane.clone()),
     774            2 :                 self.harness.conf,
     775            2 :             );
     776              : 
     777            0 :             tracing::debug!("Spawning worker for new queue queue");
     778            2 :             let worker_join = workers
     779            2 :                 .unwrap()
     780            2 :                 .spawn_with(&tokio::runtime::Handle::current());
     781            2 : 
     782            2 :             let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
     783            2 :             let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
     784              : 
     785            0 :             tracing::debug!("Joining worker from previous queue");
     786            2 :             old_deletion_queue.cancel.cancel();
     787            2 :             old_worker_join
     788            2 :                 .await
     789            2 :                 .expect("Failed to join workers for previous deletion queue");
     790            2 :         }
     791              : 
     792            6 :         fn set_latest_generation(&self, gen: Generation) {
     793            6 :             let tenant_shard_id = self.harness.tenant_shard_id;
     794            6 :             self.mock_control_plane
     795            6 :                 .latest_generation
     796            6 :                 .lock()
     797            6 :                 .unwrap()
     798            6 :                 .insert(tenant_shard_id, gen);
     799            6 :         }
     800              : 
     801              :         /// Returns remote layer file name, suitable for use in assert_remote_files
     802            6 :         fn write_remote_layer(
     803            6 :             &self,
     804            6 :             file_name: LayerFileName,
     805            6 :             gen: Generation,
     806            6 :         ) -> anyhow::Result<String> {
     807            6 :             let tenant_shard_id = self.harness.tenant_shard_id;
     808            6 :             let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
     809            6 :             let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
     810            6 :             std::fs::create_dir_all(&remote_timeline_path)?;
     811            6 :             let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
     812            6 : 
     813            6 :             let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
     814            6 : 
     815            6 :             std::fs::write(
     816            6 :                 remote_timeline_path.join(remote_layer_file_name.clone()),
     817            6 :                 content,
     818            6 :             )?;
     819              : 
     820            6 :             Ok(remote_layer_file_name)
     821            6 :         }
     822              :     }
     823              : 
     824            8 :     #[derive(Debug, Clone)]
     825              :     struct MockControlPlane {
     826              :         pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>,
     827              :     }
     828              : 
     829              :     impl MockControlPlane {
     830            6 :         fn new() -> Self {
     831            6 :             Self {
     832            6 :                 latest_generation: Arc::default(),
     833            6 :             }
     834            6 :         }
     835              :     }
     836              : 
     837              :     impl ControlPlaneGenerationsApi for MockControlPlane {
     838              :         #[allow(clippy::diverging_sub_expression)] // False positive via async_trait
     839            0 :         async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
     840            0 :             unimplemented!()
     841            0 :         }
     842            8 :         async fn validate(
     843            8 :             &self,
     844            8 :             tenants: Vec<(TenantShardId, Generation)>,
     845            8 :         ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
     846            8 :             let mut result = HashMap::new();
     847            8 : 
     848            8 :             let latest_generation = self.latest_generation.lock().unwrap();
     849              : 
     850           16 :             for (tenant_shard_id, generation) in tenants {
     851            8 :                 if let Some(latest) = latest_generation.get(&tenant_shard_id) {
     852            8 :                     result.insert(tenant_shard_id, *latest == generation);
     853            8 :                 }
     854              :             }
     855              : 
     856            8 :             Ok(result)
     857            8 :         }
     858              :     }
     859              : 
     860            6 :     fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
     861            6 :         let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
     862            6 :         let harness = TenantHarness::create(test_name)?;
     863              : 
     864              :         // We do not load() the harness: we only need its config and remote_storage
     865              : 
      866              :         // Set up a GenericRemoteStorage targeting a directory
     867            6 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs");
     868            6 :         std::fs::create_dir_all(remote_fs_dir)?;
     869            6 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
     870            6 :         let storage_config = RemoteStorageConfig {
     871            6 :             storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
     872            6 :         };
     873            6 :         let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
     874            6 : 
     875            6 :         let mock_control_plane = MockControlPlane::new();
     876            6 : 
     877            6 :         let (deletion_queue, worker) = DeletionQueue::new(
     878            6 :             Some(storage.clone()),
     879            6 :             Some(mock_control_plane.clone()),
     880            6 :             harness.conf,
     881            6 :         );
     882            6 : 
     883            6 :         let worker = worker.unwrap();
     884            6 :         let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
     885            6 : 
     886            6 :         Ok(TestSetup {
     887            6 :             harness,
     888            6 :             remote_fs_dir,
     889            6 :             storage,
     890            6 :             mock_control_plane,
     891            6 :             deletion_queue,
     892            6 :             worker_join,
     893            6 :         })
     894            6 :     }
     895              : 
     896              :     // TODO: put this in a common location so that we can share with remote_timeline_client's tests
     897           18 :     fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
     898           18 :         let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
     899           18 :         expected.sort();
     900           18 : 
     901           18 :         let mut found: Vec<String> = Vec::new();
     902           18 :         let dir = match std::fs::read_dir(remote_path) {
     903           18 :             Ok(d) => d,
     904            0 :             Err(e) => {
     905            0 :                 if e.kind() == ErrorKind::NotFound {
     906            0 :                     if expected.is_empty() {
     907              :                         // We are asserting prefix is empty: it is expected that the dir is missing
     908            0 :                         return;
     909              :                     } else {
     910            0 :                         assert_eq!(expected, Vec::<String>::new());
     911            0 :                         unreachable!();
     912              :                     }
     913              :                 } else {
     914            0 :                     panic!("Unexpected error listing {remote_path}: {e}");
     915              :                 }
     916              :             }
     917              :         };
     918              : 
     919           18 :         for entry in dir.flatten() {
     920           16 :             let entry_name = entry.file_name();
     921           16 :             let fname = entry_name.to_str().unwrap();
     922           16 :             found.push(String::from(fname));
     923           16 :         }
     924           18 :         found.sort();
     925           18 : 
     926           18 :         assert_eq!(expected, found);
     927           18 :     }
     928              : 
     929           10 :     fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
     930           10 :         let dir = match std::fs::read_dir(directory) {
     931            8 :             Ok(d) => d,
     932              :             Err(_) => {
     933            2 :                 assert_eq!(expected, &Vec::<String>::new());
     934            2 :                 return;
     935              :             }
     936              :         };
     937            8 :         let mut found = Vec::new();
     938           18 :         for dentry in dir {
     939           10 :             let dentry = dentry.unwrap();
     940           10 :             let file_name = dentry.file_name();
     941           10 :             let file_name_str = file_name.to_string_lossy();
     942           10 :             found.push(file_name_str.to_string());
     943           10 :         }
     944            8 :         found.sort();
     945            8 :         assert_eq!(expected, found);
     946           10 :     }
     947              : 
     948            2 :     #[tokio::test]
     949            2 :     async fn deletion_queue_smoke() -> anyhow::Result<()> {
     950            2 :         // Basic test that the deletion queue processes the deletions we pass into it
     951            2 :         let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
     952            2 :         let client = ctx.deletion_queue.new_client();
     953            2 :         client.recover(HashMap::new())?;
     954            2 : 
     955            2 :         let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
     956            2 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
     957            2 : 
     958            2 :         let content: Vec<u8> = "victim1 contents".into();
     959            2 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
     960            2 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
     961            2 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
     962            2 : 
     963            2 :         // Exercise the distinction between the generation of the layers
     964            2 :         // we delete, and the generation of the running Tenant.
     965            2 :         let layer_generation = Generation::new(0xdeadbeef);
     966            2 :         let now_generation = Generation::new(0xfeedbeef);
     967            2 :         let layer_metadata =
     968            2 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
     969            2 : 
     970            2 :         let remote_layer_file_name_1 =
     971            2 :             format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
     972            2 : 
     973            2 :         // Set mock control plane state to valid for our generation
     974            2 :         ctx.set_latest_generation(now_generation);
     975            2 : 
     976            2 :         // Inject a victim file to remote storage
     977            2 :         info!("Writing");
     978            2 :         std::fs::create_dir_all(&remote_timeline_path)?;
     979            2 :         std::fs::write(
     980            2 :             remote_timeline_path.join(remote_layer_file_name_1.clone()),
     981            2 :             content,
     982            2 :         )?;
     983            2 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
     984            2 : 
     985            2 :         // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
     986            2 :         info!("Pushing");
     987            2 :         client
     988            2 :             .push_layers(
     989            2 :                 tenant_shard_id,
     990            2 :                 TIMELINE_ID,
     991            2 :                 now_generation,
     992            2 :                 [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
     993            2 :             )
     994            2 :             .await?;
     995            2 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
     996            2 : 
     997            2 :         assert_local_files(&[], &deletion_prefix);
     998            2 : 
     999            2 :         // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
    1000            2 :         info!("Flushing");
    1001            2 :         client.flush().await?;
    1002            2 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
    1003            2 :         assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
    1004            2 : 
    1005            2 :         // File should go away when we execute
    1006            2 :         info!("Flush-executing");
    1007            6 :         client.flush_execute().await?;
    1008            2 :         assert_remote_files(&[], &remote_timeline_path);
    1009            2 :         assert_local_files(&["header-01"], &deletion_prefix);
    1010            2 : 
    1011            2 :         // Flushing on an empty queue should succeed immediately, and not write any lists
    1012            2 :         info!("Flush-executing on empty");
    1013            6 :         client.flush_execute().await?;
    1014            2 :         assert_local_files(&["header-01"], &deletion_prefix);
    1015            2 : 
    1016            2 :         Ok(())
    1017            2 :     }
    1018              : 
    1019            2 :     #[tokio::test]
    1020            2 :     async fn deletion_queue_validation() -> anyhow::Result<()> {
    1021            2 :         let ctx = setup("deletion_queue_validation").expect("Failed test setup");
    1022            2 :         let client = ctx.deletion_queue.new_client();
    1023            2 :         client.recover(HashMap::new())?;
    1024            2 : 
    1025            2 :         // Generation that the control plane thinks is current
    1026            2 :         let latest_generation = Generation::new(0xdeadbeef);
    1027            2 :         // Generation that our DeletionQueue thinks the tenant is running with
    1028            2 :         let stale_generation = latest_generation.previous();
    1029            2 :         // Generation that our example layer file was written with
    1030            2 :         let layer_generation = stale_generation.previous();
    1031            2 :         let layer_metadata =
    1032            2 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
    1033            2 : 
    1034            2 :         ctx.set_latest_generation(latest_generation);
    1035            2 : 
    1036            2 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
    1037            2 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
    1038            2 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
    1039            2 : 
    1040            2 :         // Initial state: a remote layer exists
    1041            2 :         let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
    1042            2 :         assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
    1043            2 : 
    1044            2 :         tracing::debug!("Pushing...");
    1045            2 :         client
    1046            2 :             .push_layers(
    1047            2 :                 tenant_shard_id,
    1048            2 :                 TIMELINE_ID,
    1049            2 :                 stale_generation,
    1050            2 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1051            2 :             )
    1052            2 :             .await?;
    1053            2 : 
    1054            2 :         // We enqueued the operation in a stale generation: it should have failed validation
    1055            2 :         tracing::debug!("Flushing...");
    1056            6 :         tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
    1057            2 :         assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
    1058            2 : 
    1059            2 :         tracing::debug!("Pushing...");
    1060            2 :         client
    1061            2 :             .push_layers(
    1062            2 :                 tenant_shard_id,
    1063            2 :                 TIMELINE_ID,
    1064            2 :                 latest_generation,
    1065            2 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1066            2 :             )
    1067            2 :             .await?;
    1068            2 : 
    1069            2 :         // We enqueued the operation in a fresh generation: it should have passed validation
    1070            2 :         tracing::debug!("Flushing...");
    1071            6 :         tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
    1072            2 :         assert_remote_files(&[], &remote_timeline_path);
    1073            2 : 
    1074            2 :         Ok(())
    1075            2 :     }
    1076              : 
    1077            2 :     #[tokio::test]
    1078            2 :     async fn deletion_queue_recovery() -> anyhow::Result<()> {
     1079            2 :         // Test that deletion lists written before a restart are recovered and applied according to generation
    1080            2 :         let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
    1081            2 :         let client = ctx.deletion_queue.new_client();
    1082            2 :         client.recover(HashMap::new())?;
    1083            2 : 
    1084            2 :         let tenant_shard_id = ctx.harness.tenant_shard_id;
    1085            2 : 
    1086            2 :         let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
    1087            2 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
    1088            2 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
    1089            2 : 
    1090            2 :         let layer_generation = Generation::new(0xdeadbeef);
    1091            2 :         let now_generation = Generation::new(0xfeedbeef);
    1092            2 :         let layer_metadata =
    1093            2 :             LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
    1094            2 : 
     1095            2 :         // Inject a deletion in the generation before generation_now: after restart
     1096            2 :         // (which advances the generation) it is two generations stale, so it should
     1097            2 :         // _not_ get executed (only the immediately previous generation gets that treatment)
    1098            2 :         let remote_layer_file_name_historical =
    1099            2 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
    1100            2 :         client
    1101            2 :             .push_layers(
    1102            2 :                 tenant_shard_id,
    1103            2 :                 TIMELINE_ID,
    1104            2 :                 now_generation.previous(),
    1105            2 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
    1106            2 :             )
    1107            2 :             .await?;
    1108            2 : 
     1109            2 :         // Inject a deletion in generation_now itself: after restart this becomes the
     1110            2 :         // immediately previous generation, so the deletion should get executed, because
     1111            2 :         // we execute deletions from the immediately previous generation on the same node.
    1112            2 :         let remote_layer_file_name_previous =
    1113            2 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
    1114            2 :         client
    1115            2 :             .push_layers(
    1116            2 :                 tenant_shard_id,
    1117            2 :                 TIMELINE_ID,
    1118            2 :                 now_generation,
    1119            2 :                 [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
    1120            2 :             )
    1121            2 :             .await?;
    1122            2 : 
    1123            2 :         client.flush().await?;
    1124            2 :         assert_remote_files(
    1125            2 :             &[
    1126            2 :                 &remote_layer_file_name_historical,
    1127            2 :                 &remote_layer_file_name_previous,
    1128            2 :             ],
    1129            2 :             &remote_timeline_path,
    1130            2 :         );
    1131            2 : 
     1132            2 :         // Different generations for the same tenant will cause two separate
    1133            2 :         // deletion lists to be emitted.
    1134            2 :         assert_local_files(
    1135            2 :             &["0000000000000001-01.list", "0000000000000002-01.list"],
    1136            2 :             &deletion_prefix,
    1137            2 :         );
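                       :         // The list file names above appear to encode the monotonically increasing
                       :         // sequence number plus an on-disk format version suffix ("-01"); this is an
                       :         // illustrative reading of the names asserted here, based on the two sequences observed.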
    1138            2 : 
    1139            2 :         // Simulate a node restart: the latest generation advances
    1140            2 :         let now_generation = now_generation.next();
    1141            2 :         ctx.set_latest_generation(now_generation);
    1142            2 : 
    1143            2 :         // Restart the deletion queue
    1144            2 :         drop(client);
    1145            2 :         ctx.restart().await;
    1146            2 :         let client = ctx.deletion_queue.new_client();
    1147            2 :         client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
    1148            2 : 
    1149            2 :         info!("Flush-executing");
    1150            6 :         client.flush_execute().await?;
    1151            2 :         // The deletion from immediately prior generation was executed, the one from
    1152            2 :         // an older generation was not.
    1153            2 :         assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
    1154            2 :         Ok(())
    1155            2 :     }
    1156              : }
    1157              : 
    1158              : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
    1159              : /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
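                       : ///
                       : /// A minimal usage sketch (illustrative only, not compiled as a doctest; the variable
                       : /// names are placeholders borrowed from the tests above):
                       : ///
                       : /// ```ignore
                       : /// let queue = MockDeletionQueue::new(Some(remote_storage));
                       : /// let client = queue.new_client();
                       : /// // Enqueue deletions through the ordinary client interface...
                       : /// client
                       : ///     .push_layers(tenant_shard_id, TIMELINE_ID, generation, vec![(layer_name, layer_metadata)])
                       : ///     .await?;
                       : /// // ...nothing is actually deleted until the test pumps the queue.
                       : /// queue.pump().await;
                       : /// ```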
    1160              : #[cfg(test)]
    1161              : pub(crate) mod mock {
    1162              :     use tracing::info;
    1163              : 
    1164              :     use crate::tenant::remote_timeline_client::remote_layer_path;
    1165              : 
    1166              :     use super::*;
    1167              :     use std::sync::{
    1168              :         atomic::{AtomicUsize, Ordering},
    1169              :         Arc,
    1170              :     };
    1171              : 
    1172              :     pub struct ConsumerState {
    1173              :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
    1174              :         executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
    1175              :     }
    1176              : 
    1177              :     impl ConsumerState {
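                       :         // Drain both queues synchronously: execute any deletions inline against
                       :         // remote_storage, acknowledge flush requests, and return the number of
                       :         // objects deleted.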
    1178            2 :         async fn consume(&mut self, remote_storage: &GenericRemoteStorage) -> usize {
    1179            2 :             let mut executed = 0;
    1180              : 
    1181            2 :             info!("Executing all pending deletions");
    1182              : 
     1183              :             // Drain any messages already forwarded to the deleter stage and execute them directly
    1184            2 :             while let Ok(msg) = self.executor_rx.try_recv() {
    1185            0 :                 match msg {
    1186            0 :                     DeleterMessage::Delete(objects) => {
    1187            0 :                         for path in objects {
    1188            0 :                             match remote_storage.delete(&path).await {
    1189              :                                 Ok(_) => {
    1190            0 :                                     debug!("Deleted {path}");
    1191              :                                 }
    1192            0 :                                 Err(e) => {
    1193            0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1194              :                                 }
    1195              :                             }
    1196            0 :                             executed += 1;
    1197              :                         }
    1198              :                     }
    1199            0 :                     DeleterMessage::Flush(flush_op) => {
    1200            0 :                         flush_op.notify();
    1201            0 :                     }
    1202              :                 }
    1203              :             }
    1204              : 
    1205            4 :             while let Ok(msg) = self.rx.try_recv() {
    1206            2 :                 match msg {
    1207            2 :                     ListWriterQueueMessage::Delete(op) => {
    1208            2 :                         let mut objects = op.objects;
    1209            4 :                         for (layer, meta) in op.layers {
    1210            2 :                             objects.push(remote_layer_path(
    1211            2 :                                 &op.tenant_shard_id.tenant_id,
    1212            2 :                                 &op.timeline_id,
    1213            2 :                                 meta.shard,
    1214            2 :                                 &layer,
    1215            2 :                                 meta.generation,
    1216            2 :                             ));
    1217            2 :                         }
    1218              : 
    1219            4 :                         for path in objects {
    1220            2 :                             info!("Executing deletion {path}");
    1221            2 :                             match remote_storage.delete(&path).await {
    1222              :                                 Ok(_) => {
    1223            0 :                                     debug!("Deleted {path}");
    1224              :                                 }
    1225            0 :                                 Err(e) => {
    1226            0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1227              :                                 }
    1228              :                             }
    1229            2 :                             executed += 1;
    1230              :                         }
    1231              :                     }
    1232            0 :                     ListWriterQueueMessage::Flush(op) => {
    1233            0 :                         op.notify();
    1234            0 :                     }
    1235            0 :                     ListWriterQueueMessage::FlushExecute(op) => {
    1236            0 :                         // We have already executed all prior deletions because mock does them inline
    1237            0 :                         op.notify();
    1238            0 :                     }
    1239            0 :                     ListWriterQueueMessage::Recover(_) => {
    1240            0 :                         // no-op in mock
    1241            0 :                     }
    1242              :                 }
    1243            2 :                 info!("All pending deletions have been executed");
    1244              :             }
    1245              : 
    1246            2 :             executed
    1247            2 :         }
    1248              :     }
    1249              : 
    1250              :     pub struct MockDeletionQueue {
    1251              :         tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
    1252              :         executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
    1253              :         executed: Arc<AtomicUsize>,
    1254              :         remote_storage: Option<GenericRemoteStorage>,
    1255              :         consumer: std::sync::Mutex<ConsumerState>,
    1256              :         lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
    1257              :     }
    1258              : 
    1259              :     impl MockDeletionQueue {
    1260           84 :         pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
    1261           84 :             let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    1262           84 :             let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
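                       :             // The frontend channel is unbounded and the deleter channel has a generous
                       :             // bound, so test code is never expected to block on send before pump() runs.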
    1263           84 : 
    1264           84 :             let executed = Arc::new(AtomicUsize::new(0));
    1265           84 : 
    1266           84 :             Self {
    1267           84 :                 tx,
    1268           84 :                 executor_tx,
    1269           84 :                 executed,
    1270           84 :                 remote_storage,
    1271           84 :                 consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }),
    1272           84 :                 lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
    1273           84 :             }
    1274           84 :         }
    1275              : 
    1276              :         #[allow(clippy::await_holding_lock)]
    1277            2 :         pub async fn pump(&self) {
    1278            2 :             if let Some(remote_storage) = &self.remote_storage {
    1279              :                 // Permit holding mutex across await, because this is only ever
    1280              :                 // called once at a time in tests.
    1281            2 :                 let mut locked = self.consumer.lock().unwrap();
    1282            2 :                 let count = locked.consume(remote_storage).await;
    1283            2 :                 self.executed.fetch_add(count, Ordering::Relaxed);
    1284            0 :             }
    1285            2 :         }
    1286              : 
    1287           96 :         pub(crate) fn new_client(&self) -> DeletionQueueClient {
    1288           96 :             DeletionQueueClient {
    1289           96 :                 tx: self.tx.clone(),
    1290           96 :                 executor_tx: self.executor_tx.clone(),
    1291           96 :                 lsn_table: self.lsn_table.clone(),
    1292           96 :             }
    1293           96 :         }
    1294              :     }
    1295              : 
     1296              :     /// Test round-trip serialization/deserialization, and check that the serialized
     1297              :     /// format stays stable by comparing against a static expected string.
    1298            2 :     #[test]
    1299            2 :     fn deletion_list_serialization() -> anyhow::Result<()> {
    1300            2 :         let tenant_id = "ad6c1a56f5680419d3a16ff55d97ec3c"
    1301            2 :             .to_string()
    1302            2 :             .parse::<TenantShardId>()?;
    1303            2 :         let timeline_id = "be322c834ed9e709e63b5c9698691910"
    1304            2 :             .to_string()
    1305            2 :             .parse::<TimelineId>()?;
    1306            2 :         let generation = Generation::new(123);
    1307              : 
    1308            2 :         let object =
    1309            2 :             RemotePath::from_string(&format!("tenants/{tenant_id}/timelines/{timeline_id}/foo"))?;
    1310            2 :         let mut objects = [object].to_vec();
    1311            2 : 
    1312            2 :         let mut example = DeletionList::new(1);
    1313            2 :         example.push(&tenant_id, &timeline_id, generation, &mut objects);
    1314              : 
    1315            2 :         let encoded = serde_json::to_string(&example)?;
    1316              : 
    1317            2 :         let expected = "{\"version\":1,\"sequence\":1,\"tenants\":{\"ad6c1a56f5680419d3a16ff55d97ec3c\":{\"timelines\":{\"be322c834ed9e709e63b5c9698691910\":[\"foo\"]},\"generation\":123}},\"size\":1}".to_string();
    1318            2 :         assert_eq!(encoded, expected);
    1319              : 
    1320            2 :         let decoded = serde_json::from_str::<DeletionList>(&encoded)?;
    1321            2 :         assert_eq!(example, decoded);
    1322              : 
    1323            2 :         Ok(())
    1324            2 :     }
    1325              : }
        

Generated by: LCOV version 2.1-beta