LCOV - differential code coverage report
Current view: top level - pageserver/src - deletion_queue.rs (source / functions) Coverage Total Hit UBC GIC CBC ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 87.4 % 818 715 103 715
Current Date: 2023-10-19 02:04:12 Functions: 52.3 % 220 115 105 3 112 3
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : mod deleter;
       2                 : mod list_writer;
       3                 : mod validator;
       4                 : 
       5                 : use std::collections::HashMap;
       6                 : use std::sync::Arc;
       7                 : use std::time::Duration;
       8                 : 
       9                 : use crate::control_plane_client::ControlPlaneGenerationsApi;
      10                 : use crate::metrics;
      11                 : use crate::tenant::remote_timeline_client::remote_layer_path;
      12                 : use crate::tenant::remote_timeline_client::remote_timeline_path;
      13                 : use crate::virtual_file::VirtualFile;
      14                 : use anyhow::Context;
      15                 : use camino::Utf8PathBuf;
      16                 : use hex::FromHex;
      17                 : use remote_storage::{GenericRemoteStorage, RemotePath};
      18                 : use serde::Deserialize;
      19                 : use serde::Serialize;
      20                 : use serde_with::serde_as;
      21                 : use thiserror::Error;
      22                 : use tokio;
      23                 : use tokio_util::sync::CancellationToken;
      24                 : use tracing::Instrument;
      25                 : use tracing::{self, debug, error};
      26                 : use utils::crashsafe::path_with_suffix_extension;
      27                 : use utils::generation::Generation;
      28                 : use utils::id::{TenantId, TimelineId};
      29                 : use utils::lsn::AtomicLsn;
      30                 : use utils::lsn::Lsn;
      31                 : 
      32                 : use self::deleter::Deleter;
      33                 : use self::list_writer::DeletionOp;
      34                 : use self::list_writer::ListWriter;
      35                 : use self::list_writer::RecoverOp;
      36                 : use self::validator::Validator;
      37                 : use deleter::DeleterMessage;
      38                 : use list_writer::ListWriterQueueMessage;
      39                 : use validator::ValidatorQueueMessage;
      40                 : 
      41                 : use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
      42                 : 
      43                 : // TODO: configurable for how long to wait before executing deletions
      44                 : 
      45                 : /// We aggregate object deletions from many tenants in one place, for several reasons:
      46                 : /// - Coalesce deletions into fewer DeleteObjects calls
      47                 : /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
      48                 : ///   to flush any outstanding deletions.
      49                 : /// - Globally control throughput of deletions, as these are a low priority task: do
      50                 : ///   not compete with the same S3 clients/connections used for higher priority uploads.
      51                 : /// - Enable gating deletions on validation of a tenant's generation number, to make
      52                 : ///   it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md)
      53                 : ///
      54                 : /// There are two kinds of deletion: deferred and immediate.  A deferred deletion
      55                 : /// may be intentionally delayed to protect passive readers of S3 data, and is
      56                 : /// subject to a generation number validation step.  An immediate deletion is
      57                 : /// ready to execute immediately, and is only queued up so that it can be coalesced
      58                 : /// with other deletions in flight.
      59                 : ///
      60                 : /// Deferred deletions pass through three steps:
      61                 : /// - ListWriter: accumulate deletion requests from Timelines, and batch them up into
      62                 : ///   DeletionLists, which are persisted to disk.
      63                 : /// - Validator: accumulate deletion lists, and validate them en-masse prior to passing
      64                 : ///   the keys in the list onward for actual deletion.  Also validate remote_consistent_lsn
      65                 : ///   updates for running timelines.
      66                 : /// - Deleter: accumulate object keys that the validator has validated, and execute them in
      67                 : ///   batches of 1000 keys via DeleteObjects.
      68                 : ///
      69                 : /// Non-deferred deletions, such as during timeline deletion, bypass the first
      70                 : /// two stages and are passed straight into the Deleter.
      71                 : ///
      72                 : /// Internally, each stage is joined by a channel to the next.  On disk, there is only
      73                 : /// one queue (of DeletionLists), which is written by the frontend and consumed
      74                 : /// by the backend.
      75 CBC         144 : #[derive(Clone)]
      76                 : pub struct DeletionQueue {
      77                 :     client: DeletionQueueClient,
      78                 : 
      79                 :     // Parent cancellation token for the tokens passed into background workers
      80                 :     cancel: CancellationToken,
      81                 : }
      82                 : 
      83                 : /// Opaque wrapper around individual worker tasks, to avoid making the
      84                 : /// worker objects themselves public
      85                 : pub struct DeletionQueueWorkers<C>
      86                 : where
      87                 :     C: ControlPlaneGenerationsApi + Send + Sync,
      88                 : {
      89                 :     frontend: ListWriter,
      90                 :     backend: Validator<C>,
      91                 :     executor: Deleter,
      92                 : }
      93                 : 
      94                 : impl<C> DeletionQueueWorkers<C>
      95                 : where
      96                 :     C: ControlPlaneGenerationsApi + Send + Sync + 'static,
      97                 : {
      98             564 :     pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
      99             564 :         let jh_frontend = runtime.spawn(async move {
     100             564 :             self.frontend
     101             564 :                 .background()
     102             564 :                 .instrument(tracing::info_span!(parent:None, "deletion frontend"))
     103             749 :                 .await
     104             564 :         });
     105             564 :         let jh_backend = runtime.spawn(async move {
     106             564 :             self.backend
     107             564 :                 .background()
     108             564 :                 .instrument(tracing::info_span!(parent:None, "deletion backend"))
     109             612 :                 .await
     110             564 :         });
     111             564 :         let jh_executor = runtime.spawn(async move {
     112             564 :             self.executor
     113             564 :                 .background()
     114             564 :                 .instrument(tracing::info_span!(parent:None, "deletion executor"))
     115           12750 :                 .await
     116             564 :         });
     117             564 : 
     118             564 :         runtime.spawn({
     119             564 :             async move {
     120             564 :                 jh_frontend.await.expect("error joining frontend worker");
     121             145 :                 jh_backend.await.expect("error joining backend worker");
     122             145 :                 drop(jh_executor.await.expect("error joining executor worker"));
     123             564 :             }
     124             564 :         })
     125             564 :     }
     126                 : }
     127                 : 
     128                 : /// A FlushOp is just a oneshot channel, where we send the transmit side down
     129                 : /// another channel, and the receive side will receive a message when the channel
     130                 : /// we're flushing has reached the FlushOp we sent into it.
     131                 : ///
     132                 : /// The only extra behavior beyond the channel is that the notify() method does not
     133                 : /// return an error when the receive side has been dropped, because in this use case
     134                 : /// it is harmless (the code that initiated the flush no longer cares about the result).
     135 UBC           0 : #[derive(Debug)]
     136                 : struct FlushOp {
     137                 :     tx: tokio::sync::oneshot::Sender<()>,
     138                 : }
     139                 : 
     140                 : impl FlushOp {
     141 CBC        1189 :     fn new() -> (Self, tokio::sync::oneshot::Receiver<()>) {
     142            1189 :         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
     143            1189 :         (Self { tx }, rx)
     144            1189 :     }
     145                 : 
     146            1187 :     fn notify(self) {
     147            1187 :         if self.tx.send(()).is_err() {
     148                 :             // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
     149              38 :             debug!("deletion queue flush from dropped client");
     150            1149 :         };
     151            1187 :     }
     152                 : }
     153                 : 
     154            5566 : #[derive(Clone, Debug)]
     155                 : pub struct DeletionQueueClient {
     156                 :     tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
     157                 :     executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
     158                 : 
     159                 :     lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
     160                 : }
     161                 : 
     162              48 : #[derive(Debug, Serialize, Deserialize)]
     163                 : struct TenantDeletionList {
     164                 :     /// For each Timeline, a list of key fragments to append to the timeline remote path
     165                 :     /// when reconstructing a full key
     166                 :     #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
     167                 :     timelines: HashMap<TimelineId, Vec<String>>,
     168                 : 
     169                 :     /// The generation in which this deletion was emitted: note that this may not be the
     170                 :     /// same as the generation of any layers being deleted.  The generation of the layer
     171                 :     /// has already been absorbed into the keys in `timelines`
     172                 :     generation: Generation,
     173                 : }
     174                 : 
     175                 : impl TenantDeletionList {
     176              14 :     pub(crate) fn len(&self) -> usize {
     177              14 :         self.timelines.values().map(|v| v.len()).sum()
     178              14 :     }
     179                 : }
     180                 : 
     181                 : /// For HashMaps using a `hex` compatible key, where we would like to encode the key as a string
     182              34 : fn to_hex_map<S, V, I>(input: &HashMap<I, V>, serializer: S) -> Result<S::Ok, S::Error>
     183              34 : where
     184              34 :     S: serde::Serializer,
     185              34 :     V: Serialize,
     186              34 :     I: AsRef<[u8]>,
     187              34 : {
     188              34 :     let transformed = input.iter().map(|(k, v)| (hex::encode(k), v));
     189              34 : 
     190              34 :     transformed
     191              34 :         .collect::<HashMap<String, &V>>()
     192              34 :         .serialize(serializer)
     193              34 : }
     194                 : 
     195                 : /// For HashMaps using a FromHex key, where we would like to decode the key
     196              16 : fn from_hex_map<'de, D, V, I>(deserializer: D) -> Result<HashMap<I, V>, D::Error>
     197              16 : where
     198              16 :     D: serde::de::Deserializer<'de>,
     199              16 :     V: Deserialize<'de>,
     200              16 :     I: FromHex + std::hash::Hash + Eq,
     201              16 : {
     202              16 :     let hex_map = HashMap::<String, V>::deserialize(deserializer)?;
     203              16 :     hex_map
     204              16 :         .into_iter()
     205              16 :         .map(|(k, v)| {
     206              16 :             I::from_hex(k)
     207              16 :                 .map(|k| (k, v))
     208              16 :                 .map_err(|_| serde::de::Error::custom("Invalid hex ID"))
     209              16 :         })
     210              16 :         .collect()
     211              16 : }
     212                 : 
     213                 : /// Files ending with this suffix will be ignored and erased
     214                 : /// during recovery at startup.
     215                 : const TEMP_SUFFIX: &str = "tmp";
     216                 : 
     217                 : #[serde_as]
     218              80 : #[derive(Debug, Serialize, Deserialize)]
     219                 : struct DeletionList {
     220                 :     /// Serialization version, for future use
     221                 :     version: u8,
     222                 : 
     223                 :     /// Used for constructing a unique key for each deletion list we write out.
     224                 :     sequence: u64,
     225                 : 
     226                 :     /// To avoid repeating tenant/timeline IDs in every key, we store keys in
     227                 :     /// nested HashMaps by TenantTimelineID.  Each Tenant only appears once
     228                 :     /// with one unique generation ID: if someone tries to push a second generation
     229                 :     /// ID for the same tenant, we will start a new DeletionList.
     230                 :     #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
     231                 :     tenants: HashMap<TenantId, TenantDeletionList>,
     232                 : 
     233                 :     /// Avoid having to walk `tenants` to calculate the number of keys in
     234                 :     /// the nested deletion lists
     235                 :     size: usize,
     236                 : 
     237                 :     /// Set to true when the list has undergone validation with the control
     238                 :     /// plane and the remaining contents of `tenants` are valid.  A list may
     239                 :     /// also be implicitly marked valid by DeletionHeader.validated_sequence
     240                 :     /// advancing to >= DeletionList.sequence
     241                 :     #[serde(default)]
     242                 :     #[serde(skip_serializing_if = "std::ops::Not::not")]
     243                 :     validated: bool,
     244                 : }
     245                 : 
     246                 : #[serde_as]
     247              35 : #[derive(Debug, Serialize, Deserialize)]
     248                 : struct DeletionHeader {
     249                 :     /// Serialization version, for future use
     250                 :     version: u8,
     251                 : 
     252                 :     /// The highest sequence number (inclusive) that has been validated.  All deletion
     253                 :     /// lists on disk with a sequence <= this value are safe to execute.
     254                 :     validated_sequence: u64,
     255                 : }
     256                 : 
     257                 : impl DeletionHeader {
     258                 :     const VERSION_LATEST: u8 = 1;
     259                 : 
     260              13 :     fn new(validated_sequence: u64) -> Self {
     261              13 :         Self {
     262              13 :             version: Self::VERSION_LATEST,
     263              13 :             validated_sequence,
     264              13 :         }
     265              13 :     }
     266                 : 
     267              13 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     268 UBC           0 :         debug!("Saving deletion list header {:?}", self);
     269 CBC          13 :         let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
     270              13 :         let header_path = conf.deletion_header_path();
     271              13 :         let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
     272              13 :         VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
     273 UBC           0 :             .await
     274 CBC          13 :             .map_err(Into::into)
     275              13 :     }
     276                 : }
     277                 : 
     278                 : impl DeletionList {
     279                 :     const VERSION_LATEST: u8 = 1;
     280             579 :     fn new(sequence: u64) -> Self {
     281             579 :         Self {
     282             579 :             version: Self::VERSION_LATEST,
     283             579 :             sequence,
     284             579 :             tenants: HashMap::new(),
     285             579 :             size: 0,
     286             579 :             validated: false,
     287             579 :         }
     288             579 :     }
     289                 : 
     290             511 :     fn is_empty(&self) -> bool {
     291             511 :         self.tenants.is_empty()
     292             511 :     }
     293                 : 
     294             349 :     fn len(&self) -> usize {
     295             349 :         self.size
     296             349 :     }
     297                 : 
     298                 :     /// Returns true if the push was accepted, false if the caller must start a new
     299                 :     /// deletion list.
     300              62 :     fn push(
     301              62 :         &mut self,
     302              62 :         tenant: &TenantId,
     303              62 :         timeline: &TimelineId,
     304              62 :         generation: Generation,
     305              62 :         objects: &mut Vec<RemotePath>,
     306              62 :     ) -> bool {
     307              62 :         if objects.is_empty() {
     308                 :             // Avoid inserting an empty TenantDeletionList: this preserves the property
     309                 :             // that if we have no keys, then self.tenants is empty (used in Self::is_empty)
     310              13 :             return true;
     311              49 :         }
     312              49 : 
     313              49 :         let tenant_entry = self
     314              49 :             .tenants
     315              49 :             .entry(*tenant)
     316              49 :             .or_insert_with(|| TenantDeletionList {
     317              16 :                 timelines: HashMap::new(),
     318              16 :                 generation,
     319              49 :             });
     320              49 : 
     321              49 :         if tenant_entry.generation != generation {
     322                 :             // Only one generation per tenant per list: signal to
     323                 :             // caller to start a new list.
     324               1 :             return false;
     325              48 :         }
     326              48 : 
     327              48 :         let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
     328              48 : 
     329              48 :         let timeline_remote_path = remote_timeline_path(tenant, timeline);
     330              48 : 
     331              48 :         self.size += objects.len();
     332            1672 :         timeline_entry.extend(objects.drain(..).map(|p| {
     333            1672 :             p.strip_prefix(&timeline_remote_path)
     334            1672 :                 .expect("Timeline paths always start with the timeline prefix")
     335            1672 :                 .to_string()
     336            1672 :         }));
     337              48 :         true
     338              62 :     }
     339                 : 
     340              16 :     fn into_remote_paths(self) -> Vec<RemotePath> {
     341              16 :         let mut result = Vec::new();
     342              16 :         for (tenant, tenant_deletions) in self.tenants.into_iter() {
     343              12 :             for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
     344              12 :                 let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
     345              12 :                 result.extend(
     346              12 :                     timeline_layers
     347              12 :                         .into_iter()
     348            1636 :                         .map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
     349              12 :                 );
     350              12 :             }
     351                 :         }
     352                 : 
     353              16 :         result
     354              16 :     }
     355                 : 
     356              19 :     async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> {
     357              19 :         let path = conf.deletion_list_path(self.sequence);
     358              19 :         let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
     359              19 : 
     360              19 :         let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
     361              19 :         VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
     362 UBC           0 :             .await
     363 CBC          19 :             .map_err(Into::into)
     364              19 :     }
     365                 : }
     366                 : 
     367                 : impl std::fmt::Display for DeletionList {
     368 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     369               0 :         write!(
     370               0 :             f,
     371               0 :             "DeletionList<seq={}, tenants={}, keys={}>",
     372               0 :             self.sequence,
     373               0 :             self.tenants.len(),
     374               0 :             self.size
     375               0 :         )
     376               0 :     }
     377                 : }
     378                 : 
     379                 : struct PendingLsn {
     380                 :     projected: Lsn,
     381                 :     result_slot: Arc<AtomicLsn>,
     382                 : }
     383                 : 
     384                 : struct TenantLsnState {
     385                 :     timelines: HashMap<TimelineId, PendingLsn>,
     386                 : 
     387                 :     // In what generation was the most recent update proposed?
     388                 :     generation: Generation,
     389                 : }
     390                 : 
     391 CBC         381 : #[derive(Default)]
     392                 : struct VisibleLsnUpdates {
     393                 :     tenants: HashMap<TenantId, TenantLsnState>,
     394                 : }
     395                 : 
     396                 : impl VisibleLsnUpdates {
     397             605 :     fn new() -> Self {
     398             605 :         Self {
     399             605 :             tenants: HashMap::new(),
     400             605 :         }
     401             605 :     }
     402                 : }
     403                 : 
     404                 : impl std::fmt::Debug for VisibleLsnUpdates {
     405 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     406               0 :         write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len())
     407               0 :     }
     408                 : }
     409                 : 
     410               0 : #[derive(Error, Debug)]
     411                 : pub enum DeletionQueueError {
     412                 :     #[error("Deletion queue unavailable during shutdown")]
     413                 :     ShuttingDown,
     414                 : }
     415                 : 
     416                 : impl DeletionQueueClient {
     417               0 :     pub(crate) fn broken() -> Self {
     418               0 :         // Channels whose receivers are immediately dropped.
     419               0 :         let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
     420               0 :         let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
     421               0 :         Self {
     422               0 :             tx,
     423               0 :             executor_tx,
     424               0 :             lsn_table: Arc::default(),
     425               0 :         }
     426               0 :     }
     427                 : 
     428                 :     /// This is cancel-safe.  If you drop the future before it completes, the message
     429                 :     /// is not pushed, although in the context of the deletion queue it doesn't matter: once
     430                 :     /// we decide to do a deletion the decision is always final.
     431 CBC         294 :     fn do_push<T>(
     432             294 :         &self,
     433             294 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     434             294 :         msg: T,
     435             294 :     ) -> Result<(), DeletionQueueError> {
     436             294 :         match queue.send(msg) {
     437             294 :             Ok(_) => Ok(()),
     438 UBC           0 :             Err(e) => {
     439               0 :                 // This shouldn't happen, we should shut down all tenants before
     440               0 :                 // we shut down the global delete queue.  If we encounter a bug like this,
     441               0 :                 // we may leak objects as deletions won't be processed.
     442               0 :                 error!("Deletion queue closed while pushing, shutting down? ({e})");
     443               0 :                 Err(DeletionQueueError::ShuttingDown)
     444                 :             }
     445                 :         }
     446 CBC         294 :     }
     447                 : 
     448              35 :     pub(crate) fn recover(
     449              35 :         &self,
     450              35 :         attached_tenants: HashMap<TenantId, Generation>,
     451              35 :     ) -> Result<(), DeletionQueueError> {
     452              35 :         self.do_push(
     453              35 :             &self.tx,
     454              35 :             ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
     455              35 :         )
     456              35 :     }
     457                 : 
     458                 :     /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
     459                 :     /// world, it must validate its generation number before doing so.  Rather than do this synchronously,
     460                 :     /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most
     461                 :     /// recently validated separately.
     462                 :     ///
     463                 :     /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates.  The
     464                 :     /// backend will later wake up and notice that the tenant's generation requires validation.
     465             428 :     pub(crate) async fn update_remote_consistent_lsn(
     466             428 :         &self,
     467             428 :         tenant_id: TenantId,
     468             428 :         timeline_id: TimelineId,
     469             428 :         current_generation: Generation,
     470             428 :         lsn: Lsn,
     471             428 :         result_slot: Arc<AtomicLsn>,
     472             428 :     ) {
     473             428 :         let mut locked = self
     474             428 :             .lsn_table
     475             428 :             .write()
     476             428 :             .expect("Lock should never be poisoned");
     477             428 : 
     478             428 :         let tenant_entry = locked.tenants.entry(tenant_id).or_insert(TenantLsnState {
     479             428 :             timelines: HashMap::new(),
     480             428 :             generation: current_generation,
     481             428 :         });
     482             428 : 
     483             428 :         if tenant_entry.generation != current_generation {
     484 UBC           0 :             // Generation might have changed if we were detached and then re-attached: in this case,
     485               0 :             // state from the previous generation cannot be trusted.
     486               0 :             tenant_entry.timelines.clear();
     487               0 :             tenant_entry.generation = current_generation;
     488 CBC         428 :         }
     489                 : 
     490             428 :         tenant_entry.timelines.insert(
     491             428 :             timeline_id,
     492             428 :             PendingLsn {
     493             428 :                 projected: lsn,
     494             428 :                 result_slot,
     495             428 :             },
     496             428 :         );
     497             428 :     }
     498                 : 
     499                 :     /// Submit a list of layers for deletion: this function will return before the deletion is
     500                 :     /// persistent, but it may be executed at any time after this function enters: do not push
     501                 :     /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
     502                 :     /// references them).
     503                 :     ///
     504                 :     /// The `current_generation` is the generation of this pageserver's current attachment.  The
     505                 :     /// generations in `layers` are the generations in which those layers were written.  If `current_generation.is_none()` (legacy mode) the persistent queue is skipped: the layer paths are sent through `push_immediate` and this function awaits `flush_immediate` before returning.
     506             591 :     pub(crate) async fn push_layers(
     507             591 :         &self,
     508             591 :         tenant_id: TenantId,
     509             591 :         timeline_id: TimelineId,
     510             591 :         current_generation: Generation,
     511             591 :         layers: Vec<(LayerFileName, Generation)>,
     512             591 :     ) -> Result<(), DeletionQueueError> {
     513             591 :         if current_generation.is_none() { // legacy mode: no generation to validate against
     514 UBC           0 :             debug!("Enqueuing deletions in legacy mode, skipping queue");
     515 CBC         518 :             let mut layer_paths = Vec::new();
     516            4356 :             for (layer, generation) in layers {
     517            3838 :                 layer_paths.push(remote_layer_path( // remote path is built from the generation the layer was written in
     518            3838 :                     &tenant_id,
     519            3838 :                     &timeline_id,
     520            3838 :                     &layer,
     521            3838 :                     generation,
     522            3838 :                 ));
     523            3838 :             }
     524             518 :             self.push_immediate(layer_paths).await?;
     525             518 :             return self.flush_immediate().await; // legacy path returns only once the flush has been acknowledged
     526              73 :         }
     527              73 : 
     528              73 :         self.push_layers_sync(tenant_id, timeline_id, current_generation, layers) // generation-aware path: no await needed
     529             591 :     }
     530                 : 
     531                 :     /// When a Tenant has a generation, push_layers is always synchronous because
     532                 :     /// the ListValidator channel is an unbounded channel.
     533                 :     ///
     534                 :     /// This can be merged into push_layers when we remove the Generation-less mode
     535                 :     /// support (`<https://github.com/neondatabase/neon/issues/5395>`)
     536              73 :     pub(crate) fn push_layers_sync(
     537              73 :         &self,
     538              73 :         tenant_id: TenantId,
     539              73 :         timeline_id: TimelineId,
     540              73 :         current_generation: Generation,
     541              73 :         layers: Vec<(LayerFileName, Generation)>,
     542              73 :     ) -> Result<(), DeletionQueueError> {
     543              73 :         metrics::DELETION_QUEUE // count every submitted layer before it is enqueued
     544              73 :             .keys_submitted
     545              73 :             .inc_by(layers.len() as u64);
     546              73 :         self.do_push(
     547              73 :             &self.tx,
     548              73 :             ListWriterQueueMessage::Delete(DeletionOp {
     549              73 :                 tenant_id,
     550              73 :                 timeline_id,
     551              73 :                 layers,
     552              73 :                 generation: current_generation, // generation of the current attachment, not of the individual layers
     553              73 :                 objects: Vec::new(), // this op carries layers only; no raw object paths
     554              73 :             }),
     555              73 :         )
     556              73 :     }
     557                 : 
     558                 :     /// Push `msg` onto `queue`, then wait on `rx` until the flush is acknowledged.  This is cancel-safe.  If you drop the future the flush may still happen in the background.
     559             186 :     async fn do_flush<T>(
     560             186 :         &self,
     561             186 :         queue: &tokio::sync::mpsc::UnboundedSender<T>,
     562             186 :         msg: T,
     563             186 :         rx: tokio::sync::oneshot::Receiver<()>,
     564             186 :     ) -> Result<(), DeletionQueueError> {
     565             186 :         self.do_push(queue, msg)?; // a failed push is returned to the caller without waiting on `rx`
     566             186 :         if rx.await.is_err() { // oneshot receive fails when the sending half was dropped without sending
     567                 :             // This shouldn't happen if tenants are shut down before deletion queue.  If we
     568                 :             // encounter a bug like this, then a flusher will incorrectly believe it has flushed
     569                 :             // when it hasn't, possibly leading to leaking objects.
     570 UBC           0 :             error!("Deletion queue dropped flush op while client was still waiting");
     571               0 :             Err(DeletionQueueError::ShuttingDown)
     572                 :         } else {
     573 CBC         186 :             Ok(())
     574                 :         }
     575             186 :     }
     576                 : 
     577                 :     /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
     578                 :     ///
     579                 :     /// This is cancel-safe.  If you drop the future the flush may still happen in the background.
     580             170 :     pub async fn flush(&self) -> Result<(), DeletionQueueError> {
     581             170 :         let (flush_op, rx) = FlushOp::new(); // `rx` is the completion side that `do_flush` awaits
     582             170 :         self.do_flush(&self.tx, ListWriterQueueMessage::Flush(flush_op), rx)
     583             170 :             .await
     584             170 :     }
     585                 : 
     586                 :     /// Issue a flush without waiting for it to complete.  This is useful on advisory flushes where
     587                 :     /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
     588                 :     /// detach where flushing is nice but not necessary.
     589                 :     ///
     590                 :     /// This function provides no guarantees of work being done.
     591              38 :     pub fn flush_advisory(&self) {
     592              38 :         let (flush_op, _) = FlushOp::new(); // completion receiver is discarded on purpose: nobody waits on this flush
     593              38 : 
     594              38 :         // Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
     595              38 :         drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
     596              38 :     }
     597                 : 
     598                 :     /// Wait until all previous deletions are executed.  Runs three stages in order, returning early on the first error: `flush()`, a `FlushExecute` message sent on `self.tx`, then `flush_immediate()`.
     599              16 :     pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
     600 UBC           0 :         debug!("flush_execute: flushing to deletion lists...");
     601                 :         // Flush any buffered work to deletion lists
     602 CBC          16 :         self.flush().await?;
     603                 : 
     604                 :         // Flush the backend into the executor of deletion lists
     605              16 :         let (flush_op, rx) = FlushOp::new();
     606 UBC           0 :         debug!("flush_execute: flushing backend...");
     607 CBC          16 :         self.do_flush(&self.tx, ListWriterQueueMessage::FlushExecute(flush_op), rx)
     608              16 :             .await?;
     609 UBC           0 :         debug!("flush_execute: finished flushing backend...");
     610                 : 
     611                 :         // Flush any immediate-mode deletions (the above backend flush will only flush
     612                 :         // the executor if deletions had flowed through the backend)
     613               0 :         debug!("flush_execute: flushing execution...");
     614 CBC          16 :         self.flush_immediate().await?;
     615 UBC           0 :         debug!("flush_execute: finished flushing execution...");
     616 CBC          16 :         Ok(())
     617              16 :     }
     618                 : 
     619                 :     /// This interface bypasses the persistent deletion queue, and any validation
     620                 :     /// that this pageserver is still eligible to execute the deletions.  It is for
     621                 :     /// use in timeline deletions, where the control plane is telling us we may
     622                 :     /// delete everything in the timeline.
     623                 :     ///
     624                 :     /// DO NOT USE THIS FROM GC OR COMPACTION CODE.  Use the regular `push_layers`.
     625             986 :     pub(crate) async fn push_immediate(
     626             986 :         &self,
     627             986 :         objects: Vec<RemotePath>,
     628             986 :     ) -> Result<(), DeletionQueueError> {
     629             986 :         metrics::DELETION_QUEUE // count every submitted object before it is sent
     630             986 :             .keys_submitted
     631             986 :             .inc_by(objects.len() as u64);
     632             986 :         self.executor_tx // sent straight to the executor channel
     633             986 :             .send(DeleterMessage::Delete(objects))
     634 UBC           0 :             .await
     635 CBC         986 :             .map_err(|_| DeletionQueueError::ShuttingDown) // any send failure is reported as ShuttingDown
     636             986 :     }
     637                 : 
     638                 :     /// Companion to push_immediate.  When this returns Ok, all prior objects sent
     639                 :     /// into push_immediate have been deleted from remote storage.  Both a failed send and a dropped flush op are reported as `DeletionQueueError::ShuttingDown`.
     640             950 :     pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
     641             950 :         let (flush_op, rx) = FlushOp::new();
     642             950 :         self.executor_tx
     643             950 :             .send(DeleterMessage::Flush(flush_op))
     644 UBC           0 :             .await
     645 CBC         950 :             .map_err(|_| DeletionQueueError::ShuttingDown)?;
     646                 : 
     647             950 :         rx.await.map_err(|_| DeletionQueueError::ShuttingDown) // wait for the flush op to be acknowledged
     648             950 :     }
     649                 : }
     650                 : 
     651                 : impl DeletionQueue {
     652            1124 :     pub fn new_client(&self) -> DeletionQueueClient { // hands out a clone of the queue's client handle
     653            1124 :         self.client.clone()
     654            1124 :     }
     655                 : 
     656                 :     /// Caller may use the returned object to construct clients with new_client.
     657                 :     /// Caller should tokio::spawn the background() members of the two worker objects returned:
     658                 :     /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
     659                 :     ///
     660                 :     /// If remote_storage is None, then the returned workers will also be None.  NOTE(review): the returned `DeletionQueueWorkers` has three members (frontend, backend, executor), not two -- confirm which of them expose background().
     661             564 :     pub fn new<C>(
     662             564 :         remote_storage: Option<GenericRemoteStorage>,
     663             564 :         control_plane_client: Option<C>,
     664             564 :         conf: &'static PageServerConf,
     665             564 :     ) -> (Self, Option<DeletionQueueWorkers<C>>)
     666             564 :     where
     667             564 :         C: ControlPlaneGenerationsApi + Send + Sync,
     668             564 :     {
     669             564 :         // Unbounded channel: enables non-async functions to submit deletions.  The actual length is
     670             564 :         // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
     671             564 :         // enough to avoid this taking pathologically large amount of memory.
     672             564 :         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
     673             564 : 
     674             564 :         // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
     675             564 :         let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
     676             564 : 
     677             564 :         // Shallow channel: it carries lists of paths, and we expect the main queueing to
     678             564 :         // happen in the backend (persistent), not in this queue.
     679             564 :         let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
     680             564 : 
     681             564 :         let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())); // shared between the client and the Validator
     682             564 : 
     683             564 :         // The deletion queue has an independent cancellation token to
     684             564 :         // the general pageserver shutdown token, because it stays alive a bit
     685             564 :         // longer to flush after Tenants have all been torn down.
     686             564 :         let cancel = CancellationToken::new();
     687                 : 
     688             564 :         let remote_storage = match remote_storage {
     689                 :             None => {
     690 UBC           0 :                 return ( // no workers are built; rx, backend_tx/backend_rx and executor_rx are dropped when this returns
     691               0 :                     Self {
     692               0 :                         client: DeletionQueueClient {
     693               0 :                             tx,
     694               0 :                             executor_tx,
     695               0 :                             lsn_table: lsn_table.clone(),
     696               0 :                         },
     697               0 :                         cancel,
     698               0 :                     },
     699               0 :                     None,
     700               0 :                 )
     701                 :             }
     702 CBC         564 :             Some(r) => r,
     703             564 :         };
     704             564 : 
     705             564 :         (
     706             564 :             Self {
     707             564 :                 client: DeletionQueueClient {
     708             564 :                     tx,
     709             564 :                     executor_tx: executor_tx.clone(), // the client and the Validator both send to the executor channel
     710             564 :                     lsn_table: lsn_table.clone(),
     711             564 :                 },
     712             564 :                 cancel: cancel.clone(),
     713             564 :             },
     714             564 :             Some(DeletionQueueWorkers {
     715             564 :                 frontend: ListWriter::new(conf, rx, backend_tx, cancel.clone()), // receives from clients (rx), sends on backend_tx
     716             564 :                 backend: Validator::new( // receives on backend_rx, sends on executor_tx
     717             564 :                     conf,
     718             564 :                     backend_rx,
     719             564 :                     executor_tx,
     720             564 :                     control_plane_client,
     721             564 :                     lsn_table.clone(),
     722             564 :                     cancel.clone(),
     723             564 :                 ),
     724             564 :                 executor: Deleter::new(remote_storage, executor_rx, cancel.clone()), // receives on executor_rx, owns the remote storage handle
     725             564 :             }),
     726             564 :         )
     727             564 :     }
     728                 : 
     729             144 :     pub async fn shutdown(&mut self, timeout: Duration) { // cancels the token, then waits at most `timeout` for a client flush; outcomes are only logged
     730             144 :         self.cancel.cancel(); // same token that was cloned into all three workers in new()
     731             144 : 
     732             144 :         match tokio::time::timeout(timeout, self.client.flush()).await {
     733                 :             Ok(Ok(())) => {
     734             144 :                 tracing::info!("Deletion queue flushed successfully on shutdown")
     735                 :             }
     736                 :             Ok(Err(DeletionQueueError::ShuttingDown)) => {
     737                 :                 // This is not harmful for correctness, but is unexpected: the deletion
     738                 :                 // queue's workers should stay alive as long as there are any client handles instantiated.
     739 UBC           0 :                 tracing::warn!("Deletion queue stopped prematurely");
     740                 :             }
     741               0 :             Err(_timeout) => {
     742               0 :                 tracing::warn!("Timed out flushing deletion queue on shutdown")
     743                 :             }
     744                 :         }
     745 CBC         144 :     }
     746                 : }
     747                 : 
     748                 : #[cfg(test)]
     749                 : mod test {
     750                 :     use camino::Utf8Path;
     751                 :     use hex_literal::hex;
     752                 :     use std::{io::ErrorKind, time::Duration};
     753                 :     use tracing::info;
     754                 : 
     755                 :     use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
     756                 :     use tokio::task::JoinHandle;
     757                 : 
     758                 :     use crate::{
     759                 :         control_plane_client::RetryForeverError,
     760                 :         repository::Key,
     761                 :         tenant::{
     762                 :             harness::TenantHarness, remote_timeline_client::remote_timeline_path,
     763                 :             storage_layer::DeltaFileName,
     764                 :         },
     765                 :     };
     766                 : 
     767                 :     use super::*;
     768                 :     pub const TIMELINE_ID: TimelineId = // fixed timeline id used by the helpers and tests in this module
     769                 :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
     770                 : 
     771                 :     pub const EXAMPLE_LAYER_NAME: LayerFileName = LayerFileName::Delta(DeltaFileName { // sample delta layer name for tests
     772                 :         key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
     773                 :         lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51),
     774                 :     });
     775                 : 
     776                 :     // When you need a second layer in a test.  Same key range as EXAMPLE_LAYER_NAME; its LSN range starts where that one ends.
     777                 :     pub const EXAMPLE_LAYER_NAME_ALT: LayerFileName = LayerFileName::Delta(DeltaFileName {
     778                 :         key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF),
     779                 :         lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61),
     780                 :     });
     781                 : 
     782                 :     struct TestSetup { // everything a deletion queue test needs; built by setup()
     783                 :         harness: TenantHarness, // source of the tenant id and the pageserver conf
     784                 :         remote_fs_dir: Utf8PathBuf, // canonicalized directory backing the LocalFs remote storage
     785                 :         storage: GenericRemoteStorage, // kept so restart() can build a new queue on the same storage
     786                 :         mock_control_plane: MockControlPlane,
     787                 :         deletion_queue: DeletionQueue,
     788                 :         worker_join: JoinHandle<()>, // join handle returned by spawn_with() for the current queue's workers
     789                 :     }
     790                 : 
     791                 :     impl TestSetup {
     792                 :         /// Simulate a pageserver restart by destroying and recreating the deletion queue.  The new queue is built and its workers spawned first; only then is the old queue cancelled and its worker task joined.
     793               1 :         async fn restart(&mut self) {
     794               1 :             let (deletion_queue, workers) = DeletionQueue::new(
     795               1 :                 Some(self.storage.clone()),
     796               1 :                 Some(self.mock_control_plane.clone()),
     797               1 :                 self.harness.conf,
     798               1 :             );
     799                 : 
     800 UBC           0 :             tracing::debug!("Spawning worker for new queue queue"); // NOTE(review): message contains a duplicated word ("queue queue")
     801 CBC           1 :             let worker_join = workers
     802               1 :                 .unwrap() // workers are Some because remote storage was supplied above
     803               1 :                 .spawn_with(&tokio::runtime::Handle::current());
     804               1 : 
     805               1 :             let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
     806               1 :             let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
     807                 : 
     808 UBC           0 :             tracing::debug!("Joining worker from previous queue");
     809 CBC           1 :             old_deletion_queue.cancel.cancel(); // cancel the old queue's token before joining its worker task
     810               1 :             old_worker_join
     811               1 :                 .await
     812               1 :                 .expect("Failed to join workers for previous deletion queue");
     813               1 :         }
     814                 : 
     815               3 :         fn set_latest_generation(&self, gen: Generation) { // records `gen` as the latest generation of the harness tenant in the mock control plane
     816               3 :             let tenant_id = self.harness.tenant_id;
     817               3 :             self.mock_control_plane
     818               3 :                 .latest_generation
     819               3 :                 .lock()
     820               3 :                 .unwrap()
     821               3 :                 .insert(tenant_id, gen);
     822               3 :         }
     823                 : 
     824                 :         /// Returns remote layer file name, suitable for use in assert_remote_files.  Writes a placeholder file named `<file_name><generation suffix>` into the remote timeline directory of TIMELINE_ID, creating the directory if needed.
     825               3 :         fn write_remote_layer(
     826               3 :             &self,
     827               3 :             file_name: LayerFileName,
     828               3 :             gen: Generation,
     829               3 :         ) -> anyhow::Result<String> {
     830               3 :             let tenant_id = self.harness.tenant_id;
     831               3 :             let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
     832               3 :             let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path()); // absolute location on the local filesystem
     833               3 :             std::fs::create_dir_all(&remote_timeline_path)?;
     834               3 :             let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
     835               3 : 
     836               3 :             let content: Vec<u8> = format!("placeholder contents of {file_name}").into();
     837               3 : 
     838               3 :             std::fs::write(
     839               3 :                 remote_timeline_path.join(remote_layer_file_name.clone()),
     840               3 :                 content,
     841               3 :             )?;
     842                 : 
     843               3 :             Ok(remote_layer_file_name)
     844               3 :         }
     845                 :     }
     846                 : 
     847               4 :     #[derive(Debug, Clone)]
     848                 :     struct MockControlPlane { // test double for the control plane generation API
     849                 :         pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantId, Generation>>>, // tenant -> generation treated as latest; behind an Arc, so clones share one map
     850                 :     }
     851                 : 
     852                 :     impl MockControlPlane {
     853               3 :         fn new() -> Self { // starts with an empty tenant -> generation map
     854               3 :             Self {
     855               3 :                 latest_generation: Arc::default(),
     856               3 :             }
     857               3 :         }
     858                 :     }
     859                 : 
     860                 :     #[async_trait::async_trait]
     861                 :     impl ControlPlaneGenerationsApi for MockControlPlane {
     862                 :         #[allow(clippy::diverging_sub_expression)] // False positive via async_trait
     863 UBC           0 :         async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError> {
     864               0 :             unimplemented!() // not implemented by this mock: calling it panics
     865               0 :         }
     866 CBC           4 :         async fn validate( // a tenant maps to true iff the queried generation equals the recorded latest one
     867               4 :             &self,
     868               4 :             tenants: Vec<(TenantId, Generation)>,
     869               4 :         ) -> Result<HashMap<TenantId, bool>, RetryForeverError> {
     870               4 :             let mut result = HashMap::new();
     871               4 : 
     872               4 :             let latest_generation = self.latest_generation.lock().unwrap();
     873                 : 
     874               8 :             for (tenant_id, generation) in tenants {
     875               4 :                 if let Some(latest) = latest_generation.get(&tenant_id) { // tenants with no recorded generation are left out of the result
     876               4 :                     result.insert(tenant_id, *latest == generation);
     877               4 :                 }
     878                 :             }
     879                 : 
     880               4 :             Ok(result)
     881               8 :         }
     882                 :     }
     883                 : 
     884               3 :     fn setup(test_name: &str) -> anyhow::Result<TestSetup> { // builds harness, LocalFs remote storage, mock control plane and a running deletion queue
     885               3 :         let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}"))); // leaked so the name lives for 'static -- presumably required by TenantHarness::create, confirm
     886               3 :         let harness = TenantHarness::create(test_name)?;
     887                 : 
     888                 :         // We do not load() the harness: we only need its config and remote_storage
     889                 : 
     890                 :         // Set up a GenericRemoteStorage targeting a directory
     891               3 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs");
     892               3 :         std::fs::create_dir_all(remote_fs_dir)?;
     893               3 :         let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?; // canonicalized only after the directory has been created
     894               3 :         let storage_config = RemoteStorageConfig {
     895               3 :             max_concurrent_syncs: std::num::NonZeroUsize::new(
     896               3 :                 remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
     897               3 :             )
     898               3 :             .unwrap(),
     899               3 :             max_sync_errors: std::num::NonZeroU32::new(
     900               3 :                 remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
     901               3 :             )
     902               3 :             .unwrap(),
     903               3 :             storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
     904               3 :         };
     905               3 :         let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
     906               3 : 
     907               3 :         let mock_control_plane = MockControlPlane::new();
     908               3 : 
     909               3 :         let (deletion_queue, worker) = DeletionQueue::new(
     910               3 :             Some(storage.clone()),
     911               3 :             Some(mock_control_plane.clone()),
     912               3 :             harness.conf,
     913               3 :         );
     914               3 : 
     915               3 :         let worker = worker.unwrap(); // workers are Some because remote storage was supplied above
     916               3 :         let worker_join = worker.spawn_with(&tokio::runtime::Handle::current()); // needs an active tokio runtime: Handle::current() panics otherwise
     917               3 : 
     918               3 :         Ok(TestSetup {
     919               3 :             harness,
     920               3 :             remote_fs_dir,
     921               3 :             storage,
     922               3 :             mock_control_plane,
     923               3 :             deletion_queue,
     924               3 :             worker_join,
     925               3 :         })
     926               3 :     }
     927                 : 
     928                 :     // TODO: put this in a common location so that we can share with remote_timeline_client's tests.  Asserts that the entry names directly inside `remote_path` equal `expected`; both sides are sorted, so order does not matter.
     929               9 :     fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
     930               9 :         let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
     931               9 :         expected.sort();
     932               9 : 
     933               9 :         let mut found: Vec<String> = Vec::new();
     934               9 :         let dir = match std::fs::read_dir(remote_path) {
     935               9 :             Ok(d) => d,
     936 UBC           0 :             Err(e) => {
     937               0 :                 if e.kind() == ErrorKind::NotFound {
     938               0 :                     if expected.is_empty() {
     939                 :                         // We are asserting prefix is empty: it is expected that the dir is missing
     940               0 :                         return;
     941                 :                     } else {
     942               0 :                         assert_eq!(expected, Vec::<String>::new()); // always fails here (expected is non-empty), reporting the missing names
     943               0 :                         unreachable!();
     944                 :                     }
     945                 :                 } else {
     946               0 :                     panic!("Unexpected error listing {remote_path}: {e}");
     947                 :                 }
     948                 :             }
     949                 :         };
     950                 : 
     951 CBC           9 :         for entry in dir.flatten() { // flatten() silently skips entries that failed to read
     952               8 :             let entry_name = entry.file_name();
     953               8 :             let fname = entry_name.to_str().unwrap(); // panics on a non-UTF-8 file name
     954               8 :             found.push(String::from(fname));
     955               8 :         }
     956               9 :         found.sort();
     957               9 : 
     958               9 :         assert_eq!(expected, found);
     959               9 :     }
     960                 : 
     961               5 :     fn assert_local_files(expected: &[&str], directory: &Utf8Path) { // asserts the entry names in `directory` equal `expected`; `expected` is NOT sorted here, so pass it in sorted order
     962               5 :         let dir = match std::fs::read_dir(directory) {
     963               4 :             Ok(d) => d,
     964                 :             Err(_) => { // any read_dir error (not only NotFound) is treated as an empty directory
     965               1 :                 assert_eq!(expected, &Vec::<String>::new());
     966               1 :                 return;
     967                 :             }
     968                 :         };
     969               4 :         let mut found = Vec::new();
     970               9 :         for dentry in dir {
     971               5 :             let dentry = dentry.unwrap(); // unlike assert_remote_files, an unreadable entry panics here
     972               5 :             let file_name = dentry.file_name();
     973               5 :             let file_name_str = file_name.to_string_lossy();
     974               5 :             found.push(file_name_str.to_string());
     975               5 :         }
     976               4 :         found.sort();
     977               4 :         assert_eq!(expected, found);
     978               5 :     }
     979                 : 
     980               1 :     #[tokio::test]
     981               1 :     async fn deletion_queue_smoke() -> anyhow::Result<()> {
     982               1 :         // Basic test that the deletion queue processes the deletions we pass into it
     983               1 :         let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
     984               1 :         let client = ctx.deletion_queue.new_client();
     985               1 :         client.recover(HashMap::new())?;
     986                 : 
     987               1 :         let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
     988               1 :         let tenant_id = ctx.harness.tenant_id;
     989               1 : 
     990               1 :         let content: Vec<u8> = "victim1 contents".into();
     991               1 :         let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
     992               1 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
     993               1 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
     994               1 : 
     995               1 :         // Exercise the distinction between the generation of the layers
     996               1 :         // we delete, and the generation of the running Tenant.
     997               1 :         let layer_generation = Generation::new(0xdeadbeef);
     998               1 :         let now_generation = Generation::new(0xfeedbeef);
     999               1 : 
    1000               1 :         let remote_layer_file_name_1 =
    1001               1 :             format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
    1002               1 : 
    1003               1 :         // Set mock control plane state to valid for our generation
    1004               1 :         ctx.set_latest_generation(now_generation);
    1005               1 : 
    1006               1 :         // Inject a victim file to remote storage
    1007               1 :         info!("Writing");
    1008               1 :         std::fs::create_dir_all(&remote_timeline_path)?;
    1009               1 :         std::fs::write(
    1010               1 :             remote_timeline_path.join(remote_layer_file_name_1.clone()),
    1011               1 :             content,
    1012               1 :         )?;
    1013               1 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
    1014               1 : 
    1015               1 :         // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
    1016               1 :         info!("Pushing");
    1017               1 :         client
    1018               1 :             .push_layers(
    1019               1 :                 tenant_id,
    1020               1 :                 TIMELINE_ID,
    1021               1 :                 now_generation,
    1022               1 :                 [(layer_file_name_1.clone(), layer_generation)].to_vec(),
    1023               1 :             )
    1024 UBC           0 :             .await?;
    1025 CBC           1 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
    1026               1 : 
    1027               1 :         assert_local_files(&[], &deletion_prefix);
    1028               1 : 
    1029               1 :         // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
    1030               1 :         info!("Flushing");
    1031               1 :         client.flush().await?;
    1032               1 :         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
    1033               1 :         assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
    1034               1 : 
    1035               1 :         // File should go away when we execute
    1036               1 :         info!("Flush-executing");
    1037               3 :         client.flush_execute().await?;
    1038               1 :         assert_remote_files(&[], &remote_timeline_path);
    1039               1 :         assert_local_files(&["header-01"], &deletion_prefix);
    1040               1 : 
    1041               1 :         // Flushing on an empty queue should succeed immediately, and not write any lists
    1042               1 :         info!("Flush-executing on empty");
    1043               3 :         client.flush_execute().await?;
    1044               1 :         assert_local_files(&["header-01"], &deletion_prefix);
    1045               1 : 
    1046               1 :         Ok(())
    1047                 :     }
    1048                 : 
    1049               1 :     #[tokio::test]
    1050               1 :     async fn deletion_queue_validation() -> anyhow::Result<()> {
    1051               1 :         let ctx = setup("deletion_queue_validation").expect("Failed test setup");
    1052               1 :         let client = ctx.deletion_queue.new_client();
    1053               1 :         client.recover(HashMap::new())?;
    1054                 : 
    1055                 :         // Generation that the control plane thinks is current
    1056               1 :         let latest_generation = Generation::new(0xdeadbeef);
    1057               1 :         // Generation that our DeletionQueue thinks the tenant is running with
    1058               1 :         let stale_generation = latest_generation.previous();
    1059               1 :         // Generation that our example layer file was written with
    1060               1 :         let layer_generation = stale_generation.previous();
    1061               1 : 
    1062               1 :         ctx.set_latest_generation(latest_generation);
    1063               1 : 
    1064               1 :         let tenant_id = ctx.harness.tenant_id;
    1065               1 :         let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
    1066               1 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
    1067                 : 
    1068                 :         // Initial state: a remote layer exists
    1069               1 :         let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
    1070               1 :         assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
    1071               1 : 
    1072               1 :         tracing::debug!("Pushing...");
    1073               1 :         client
    1074               1 :             .push_layers(
    1075               1 :                 tenant_id,
    1076               1 :                 TIMELINE_ID,
    1077               1 :                 stale_generation,
    1078               1 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
    1079               1 :             )
    1080 UBC           0 :             .await?;
    1081                 : 
    1082                 :         // We enqueued the operation in a stale generation: it should have failed validation
    1083 CBC           1 :         tracing::debug!("Flushing...");
    1084               3 :         tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
    1085               1 :         assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
    1086               1 : 
    1087               1 :         tracing::debug!("Pushing...");
    1088               1 :         client
    1089               1 :             .push_layers(
    1090               1 :                 tenant_id,
    1091               1 :                 TIMELINE_ID,
    1092               1 :                 latest_generation,
    1093               1 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
    1094               1 :             )
    1095 UBC           0 :             .await?;
    1096                 : 
    1097                 :         // We enqueued the operation in a fresh generation: it should have passed validation
    1098 CBC           1 :         tracing::debug!("Flushing...");
    1099               3 :         tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??;
    1100               1 :         assert_remote_files(&[], &remote_timeline_path);
    1101               1 : 
    1102               1 :         Ok(())
    1103                 :     }
    1104                 : 
    1105               1 :     #[tokio::test]
    1106               1 :     async fn deletion_queue_recovery() -> anyhow::Result<()> {
    1107               1 :         // Test that queued deletions survive a restart: only those from the immediately prior generation execute
    1108               1 :         let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
    1109               1 :         let client = ctx.deletion_queue.new_client();
    1110               1 :         client.recover(HashMap::new())?;
    1111                 : 
    1112               1 :         let tenant_id = ctx.harness.tenant_id;
    1113               1 : 
    1114               1 :         let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
    1115               1 :         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
    1116               1 :         let deletion_prefix = ctx.harness.conf.deletion_prefix();
    1117               1 : 
    1118               1 :         let layer_generation = Generation::new(0xdeadbeef);
    1119               1 :         let now_generation = Generation::new(0xfeedbeef);
    1120                 : 
    1121                 :         // Inject a deletion in the generation before now_generation: after restart,
    1122                 :         // this deletion should _not_ get executed (only the immediately previous
    1123                 :         // generation gets that treatment)
    1124               1 :         let remote_layer_file_name_historical =
    1125               1 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
    1126               1 :         client
    1127               1 :             .push_layers(
    1128               1 :                 tenant_id,
    1129               1 :                 TIMELINE_ID,
    1130               1 :                 now_generation.previous(),
    1131               1 :                 [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
    1132               1 :             )
    1133 UBC           0 :             .await?;
    1134                 : 
    1135                 :         // Inject a deletion in now_generation itself: after restart (which advances the generation),
    1136                 :         // this deletion should get executed, because we execute deletions in the
    1137                 :         // immediately previous generation on the same node.
    1138 CBC           1 :         let remote_layer_file_name_previous =
    1139               1 :             ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
    1140               1 :         client
    1141               1 :             .push_layers(
    1142               1 :                 tenant_id,
    1143               1 :                 TIMELINE_ID,
    1144               1 :                 now_generation,
    1145               1 :                 [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(),
    1146               1 :             )
    1147 UBC           0 :             .await?;
    1148                 : 
    1149 CBC           1 :         client.flush().await?;
    1150               1 :         assert_remote_files(
    1151               1 :             &[
    1152               1 :                 &remote_layer_file_name_historical,
    1153               1 :                 &remote_layer_file_name_previous,
    1154               1 :             ],
    1155               1 :             &remote_timeline_path,
    1156               1 :         );
    1157               1 : 
    1158               1 :         // Different generations for the same tenant will cause two separate
    1159               1 :         // deletion lists to be emitted.
    1160               1 :         assert_local_files(
    1161               1 :             &["0000000000000001-01.list", "0000000000000002-01.list"],
    1162               1 :             &deletion_prefix,
    1163               1 :         );
    1164               1 : 
    1165               1 :         // Simulate a node restart: the latest generation advances
    1166               1 :         let now_generation = now_generation.next();
    1167               1 :         ctx.set_latest_generation(now_generation);
    1168               1 : 
    1169               1 :         // Restart the deletion queue
    1170               1 :         drop(client);
    1171               1 :         ctx.restart().await;
    1172               1 :         let client = ctx.deletion_queue.new_client();
    1173               1 :         client.recover(HashMap::from([(tenant_id, now_generation)]))?;
    1174                 : 
    1175               1 :         info!("Flush-executing");
    1176               3 :         client.flush_execute().await?;
    1177                 :         // The deletion from immediately prior generation was executed, the one from
    1178                 :         // an older generation was not.
    1179               1 :         assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path);
    1180               1 :         Ok(())
    1181                 :     }
    1182                 : }
    1183                 : 
    1184                 : /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
    1185                 : /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
    1186                 : #[cfg(test)]
    1187                 : pub(crate) mod mock {
    1188                 :     use tracing::info;
    1189                 : 
    1190                 :     use crate::tenant::remote_timeline_client::remote_layer_path;
    1191                 : 
    1192                 :     use super::*;
    1193                 :     use std::sync::{
    1194                 :         atomic::{AtomicUsize, Ordering},
    1195                 :         Arc,
    1196                 :     };
    1197                 : 
    1198                 :     pub struct ConsumerState {
    1199                 :         rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
    1200                 :         executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
    1201                 :     }
    1202                 : 
    1203                 :     impl ConsumerState {
    1204               1 :         async fn consume(&mut self, remote_storage: &GenericRemoteStorage) -> usize {
    1205               1 :             let mut executed = 0;
    1206                 : 
    1207               1 :             info!("Executing all pending deletions");
    1208                 : 
    1209                 :             // Drain the executor channel, running deletions and acknowledging flushes inline
    1210               1 :             while let Ok(msg) = self.executor_rx.try_recv() {
    1211 UBC           0 :                 match msg {
    1212               0 :                     DeleterMessage::Delete(objects) => {
    1213               0 :                         for path in objects {
    1214               0 :                             match remote_storage.delete(&path).await {
    1215                 :                                 Ok(_) => {
    1216               0 :                                     debug!("Deleted {path}");
    1217                 :                                 }
    1218               0 :                                 Err(e) => {
    1219               0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1220                 :                                 }
    1221                 :                             }
    1222               0 :                             executed += 1;
    1223                 :                         }
    1224                 :                     }
    1225               0 :                     DeleterMessage::Flush(flush_op) => {
    1226               0 :                         flush_op.notify();
    1227               0 :                     }
    1228                 :                 }
    1229                 :             }
    1230                 : 
    1231 CBC           2 :             while let Ok(msg) = self.rx.try_recv() {
    1232               1 :                 match msg {
    1233               1 :                     ListWriterQueueMessage::Delete(op) => {
    1234               1 :                         let mut objects = op.objects;
    1235               2 :                         for (layer, generation) in op.layers {
    1236               1 :                             objects.push(remote_layer_path(
    1237               1 :                                 &op.tenant_id,
    1238               1 :                                 &op.timeline_id,
    1239               1 :                                 &layer,
    1240               1 :                                 generation,
    1241               1 :                             ));
    1242               1 :                         }
    1243                 : 
    1244               2 :                         for path in objects {
    1245               1 :                             info!("Executing deletion {path}");
    1246               1 :                             match remote_storage.delete(&path).await {
    1247                 :                                 Ok(_) => {
    1248 UBC           0 :                                     debug!("Deleted {path}");
    1249                 :                                 }
    1250               0 :                                 Err(e) => {
    1251               0 :                                     error!("Failed to delete {path}, leaking object! ({e})");
    1252                 :                                 }
    1253                 :                             }
    1254 CBC           1 :                             executed += 1;
    1255                 :                         }
    1256                 :                     }
    1257 UBC           0 :                     ListWriterQueueMessage::Flush(op) => {
    1258               0 :                         op.notify();
    1259               0 :                     }
    1260               0 :                     ListWriterQueueMessage::FlushExecute(op) => {
    1261               0 :                         // We have already executed all prior deletions because mock does them inline
    1262               0 :                         op.notify();
    1263               0 :                     }
    1264               0 :                     ListWriterQueueMessage::Recover(_) => {
    1265               0 :                         // no-op in mock
    1266               0 :                     }
    1267                 :                 }
    1268 CBC           1 :                 info!("All pending deletions have been executed");
    1269                 :             }
    1270                 : 
    1271               1 :             executed
    1272               1 :         }
    1273                 :     }
    1274                 : 
    1275                 :     pub struct MockDeletionQueue {
    1276                 :         tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
    1277                 :         executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
    1278                 :         executed: Arc<AtomicUsize>,
    1279                 :         remote_storage: Option<GenericRemoteStorage>,
    1280                 :         consumer: std::sync::Mutex<ConsumerState>,
    1281                 :         lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
    1282                 :     }
    1283                 : 
    1284                 :     impl MockDeletionQueue {
    1285              41 :         pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
    1286              41 :             let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    1287              41 :             let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
    1288              41 : 
    1289              41 :             let executed = Arc::new(AtomicUsize::new(0));
    1290              41 : 
    1291              41 :             Self {
    1292              41 :                 tx,
    1293              41 :                 executor_tx,
    1294              41 :                 executed,
    1295              41 :                 remote_storage,
    1296              41 :                 consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }),
    1297              41 :                 lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
    1298              41 :             }
    1299              41 :         }
    1300                 : 
    1301                 :         #[allow(clippy::await_holding_lock)]
    1302               1 :         pub async fn pump(&self) {
    1303               1 :             if let Some(remote_storage) = &self.remote_storage {
    1304                 :                 // Permit holding mutex across await, because this is only ever
    1305                 :                 // called once at a time in tests.
    1306               1 :                 let mut locked = self.consumer.lock().unwrap();
    1307               1 :                 let count = locked.consume(remote_storage).await;
    1308               1 :                 self.executed.fetch_add(count, Ordering::Relaxed);
    1309 UBC           0 :             }
    1310 CBC           1 :         }
    1311                 : 
    1312              47 :         pub(crate) fn new_client(&self) -> DeletionQueueClient {
    1313              47 :             DeletionQueueClient {
    1314              47 :                 tx: self.tx.clone(),
    1315              47 :                 executor_tx: self.executor_tx.clone(),
    1316              47 :                 lsn_table: self.lsn_table.clone(),
    1317              47 :             }
    1318              47 :         }
    1319                 :     }
    1320                 : }
        

Generated by: LCOV version 2.1-beta