LCOV - code coverage report
Current view: top level - pageserver/src/tenant - upload_queue.rs (source / functions)
Test:      47d527da5e8405637e322911c55c08727c2fd272.info
Test Date: 2025-01-16 17:37:50
Coverage:  Lines: 93.9 % (805 of 857)    Functions: 92.6 % (75 of 81)

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet, VecDeque};
       2              : use std::fmt::Debug;
       3              : use std::sync::atomic::AtomicU32;
       4              : use std::sync::Arc;
       5              : 
       6              : use super::remote_timeline_client::is_same_remote_layer_path;
       7              : use super::storage_layer::AsLayerDesc as _;
       8              : use super::storage_layer::LayerName;
       9              : use super::storage_layer::ResidentLayer;
      10              : use crate::tenant::metadata::TimelineMetadata;
      11              : use crate::tenant::remote_timeline_client::index::IndexPart;
      12              : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
      13              : use utils::generation::Generation;
      14              : use utils::lsn::{AtomicLsn, Lsn};
      15              : 
      16              : use chrono::NaiveDateTime;
      17              : use once_cell::sync::Lazy;
      18              : use tracing::info;
      19              : 
      20              : /// Kill switch for upload queue reordering in case it causes problems.
      21              : /// TODO: remove this once we have confidence in it.
      22              : static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy<bool> =
      23          192 :     Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true"));
      24              : 
      25              : /// Kill switch for index upload coalescing in case it causes problems.
      26              : /// TODO: remove this once we have confidence in it.
      27              : static DISABLE_UPLOAD_QUEUE_INDEX_COALESCING: Lazy<bool> =
      28            8 :     Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_INDEX_COALESCING").as_deref() == Ok("true"));
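                      : // For illustration only (not part of the measured source): both kill switches are
                      : // evaluated once per process via `Lazy`, so they only take effect when the
                      : // corresponding environment variable is set before the pageserver process starts,
                      : // e.g. something like:
                      : //
                      : //     DISABLE_UPLOAD_QUEUE_REORDERING=true pageserver ...
                      : //
                      : // Any value other than the exact string "true" leaves the feature enabled.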
      29              : 
      30              : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
      31              : // memory for Uninitialized variants. Doesn't matter in practice: there are not
      32              : // that many upload queues in a running pageserver, and most of them are initialized
      33              : // anyway.
      34              : #[allow(clippy::large_enum_variant)]
      35              : pub enum UploadQueue {
      36              :     Uninitialized,
      37              :     Initialized(UploadQueueInitialized),
      38              :     Stopped(UploadQueueStopped),
      39              : }
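                      : // For illustration only (not part of the measured source): the queue typically moves
                      : // Uninitialized -> Initialized (via `initialize_empty_remote` or
                      : // `initialize_with_current_remote_index_part` below) and eventually to Stopped;
                      : // `initialized_mut` returns a `NotInitialized` error in any other state.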
      40              : 
      41              : impl UploadQueue {
      42            0 :     pub fn as_str(&self) -> &'static str {
      43            0 :         match self {
      44            0 :             UploadQueue::Uninitialized => "Uninitialized",
      45            0 :             UploadQueue::Initialized(_) => "Initialized",
      46            0 :             UploadQueue::Stopped(_) => "Stopped",
      47              :         }
      48            0 :     }
      49              : }
      50              : 
      51              : #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
      52              : pub enum OpType {
      53              :     MayReorder,
      54              :     FlushDeletion,
      55              : }
      56              : 
      57              : /// This keeps track of queued and in-progress tasks.
      58              : pub struct UploadQueueInitialized {
      59              :     /// Maximum number of inprogress tasks to schedule. 0 is no limit.
      60              :     pub(crate) inprogress_limit: usize,
      61              : 
      62              :     /// Counter to assign task IDs
      63              :     pub(crate) task_counter: u64,
      64              : 
      65              :     /// The next uploaded index_part.json; assumed to be dirty.
      66              :     ///
       67              :     /// Should not be read directly, except for layer file updates. Instead you should add a
      68              :     /// projected field.
      69              :     pub(crate) dirty: IndexPart,
      70              : 
      71              :     /// The latest remote persisted IndexPart.
      72              :     ///
      73              :     /// Each completed metadata upload will update this. The second item is the task_id which last
      74              :     /// updated the value, used to ensure we never store an older value over a newer one.
      75              :     pub(crate) clean: (IndexPart, Option<u64>),
      76              : 
       77              :     /// How many file uploads or deletions have been scheduled since the
      78              :     /// last (scheduling of) metadata index upload?
      79              :     pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
      80              : 
      81              :     /// The Lsn is only updated after our generation has been validated with
       82              :     /// the control plane (unless a timeline's generation is None, in which case
      83              :     /// we skip validation)
      84              :     pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
      85              : 
      86              :     /// Tasks that are currently in-progress. In-progress means that a tokio Task
      87              :     /// has been launched for it. An in-progress task can be busy uploading, but it can
      88              :     /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
      89              :     /// be waiting for retry in `exponential_backoff`.
      90              :     pub inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
      91              : 
      92              :     /// Queued operations that have not been launched yet. They might depend on previous
      93              :     /// tasks to finish. For example, metadata upload cannot be performed before all
      94              :     /// preceding layer file uploads have completed.
      95              :     pub queued_operations: VecDeque<UploadOp>,
      96              : 
       97              :     /// Files which have been unlinked but for which a deletion has not yet been scheduled. Only kept around
      98              :     /// for error logging.
      99              :     ///
      100              :     /// This is behind a testing feature to catch problems in tests; assuming we could have a
      101              :     /// bug causing leaks, it's better not to leave this enabled in production builds.
     102              :     #[cfg(feature = "testing")]
     103              :     pub(crate) dangling_files: HashMap<LayerName, Generation>,
     104              : 
     105              :     /// Ensure we order file operations correctly.
     106              :     pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,
     107              : 
     108              :     /// Deletions that are blocked by the tenant configuration
     109              :     pub(crate) blocked_deletions: Vec<Delete>,
     110              : 
     111              :     /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
     112              :     pub(crate) shutting_down: bool,
     113              : 
     114              :     /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
     115              :     /// wait on until one of them stops the queue. The semaphore is closed when
     116              :     /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
     117              :     pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
     118              : }
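                      : // For illustration only (not part of the measured source): `dirty` is the index we
                      : // intend to upload next and is updated as operations are scheduled, while `clean`
                      : // only advances when an `UploadOp::UploadMetadata` task completes. For example, after
                      : // scheduling a layer upload followed by an index upload, `dirty` already references
                      : // the new layer, but `clean.0` still describes the old remote state until the index
                      : // upload finishes and `clean.1` records the completing task_id.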
     119              : 
     120              : impl UploadQueueInitialized {
     121            8 :     pub(super) fn no_pending_work(&self) -> bool {
     122            8 :         self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
     123            8 :     }
     124              : 
     125            0 :     pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
     126            0 :         self.visible_remote_consistent_lsn.load()
     127            0 :     }
     128              : 
     129            0 :     pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     130            0 :         let lsn = self.clean.0.metadata.disk_consistent_lsn();
     131            0 :         self.clean.1.map(|_| lsn)
     132            0 :     }
     133              : 
     134              :     /// Returns and removes the next ready operation from the queue, if any. This isn't necessarily
     135              :     /// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
     136              :     /// the queue if it doesn't conflict with operations ahead of it.
     137              :     ///
     138              :     /// Also returns any operations that were coalesced into this one, e.g. multiple index uploads.
     139              :     ///
     140              :     /// None may be returned even if the queue isn't empty, if no operations are ready yet.
     141              :     ///
     142              :     /// NB: this is quadratic, but queues are expected to be small, and bounded by inprogress_limit.
     143        12964 :     pub fn next_ready(&mut self) -> Option<(UploadOp, Vec<UploadOp>)> {
     144        12964 :         // If inprogress_tasks is already at limit, don't schedule anything more.
     145        12964 :         if self.inprogress_limit > 0 && self.inprogress_tasks.len() >= self.inprogress_limit {
     146            6 :             return None;
     147        12958 :         }
     148              : 
     149        25429 :         for (i, candidate) in self.queued_operations.iter().enumerate() {
     150              :             // If this candidate is ready, go for it. Otherwise, try the next one.
     151        25429 :             if self.is_ready(i) {
     152              :                 // Shutdown operations are left at the head of the queue, to prevent further
     153              :                 // operations from starting. Signal that we're ready to shut down.
     154         4924 :                 if matches!(candidate, UploadOp::Shutdown) {
     155           10 :                     assert!(self.inprogress_tasks.is_empty(), "shutdown with tasks");
     156           10 :                     assert_eq!(i, 0, "shutdown not at head of queue");
     157           10 :                     self.shutdown_ready.close();
     158           10 :                     return None;
     159         4914 :                 }
     160         4914 : 
     161         4914 :                 let mut op = self.queued_operations.remove(i).expect("i can't disappear");
     162         4914 : 
     163         4914 :                 // Coalesce any back-to-back index uploads by only uploading the newest one that's
     164         4914 :                 // ready. This typically happens with layer/index/layer/index/... sequences, where
     165         4914 :                 // the layers bypass the indexes, leaving the indexes queued.
     166         4914 :                 //
     167         4914 :                 // If other operations are interleaved between index uploads we don't try to
     168         4914 :                 // coalesce them, since we may as well update the index concurrently with them.
     169         4914 :                 // This keeps the index fresh and avoids starvation.
     170         4914 :                 //
     171         4914 :                 // NB: we assume that all uploaded indexes have the same remote path. This
     172         4914 :                 // is true at the time of writing: the path only depends on the tenant,
     173         4914 :                 // timeline and generation, all of which are static for a timeline instance.
     174         4914 :                 // Otherwise, we must be careful not to coalesce different paths.
     175         4914 :                 let mut coalesced_ops = Vec::new();
     176         4914 :                 if matches!(op, UploadOp::UploadMetadata { .. }) {
     177         1533 :                     while let Some(UploadOp::UploadMetadata { .. }) = self.queued_operations.get(i)
     178              :                     {
     179           16 :                         if *DISABLE_UPLOAD_QUEUE_INDEX_COALESCING {
     180            0 :                             break;
     181           16 :                         }
     182           16 :                         if !self.is_ready(i) {
     183            6 :                             break;
     184           10 :                         }
     185           10 :                         coalesced_ops.push(op);
     186           10 :                         op = self.queued_operations.remove(i).expect("i can't disappear");
     187              :                     }
     188         3391 :                 }
     189              : 
     190         4914 :                 return Some((op, coalesced_ops));
     191        20505 :             }
     192              : 
     193              :             // Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
     194        20505 :             if matches!(candidate, UploadOp::Barrier(_) | UploadOp::Shutdown) {
     195         2392 :                 return None;
     196        18113 :             }
     197        18113 : 
     198        18113 :             // If upload queue reordering is disabled, bail out after the first operation.
     199        18113 :             if *DISABLE_UPLOAD_QUEUE_REORDERING {
     200            0 :                 return None;
     201        18113 :             }
     202              :         }
     203         5642 :         None
     204        12964 :     }
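                      : // For illustration only (not part of the measured source): a worked example of the
                      : // reordering and coalescing above. Given a queue of
                      : //
                      : //     [UploadLayer(A), UploadMetadata(i1), UploadLayer(B), UploadMetadata(i2)]
                      : //
                      : // layer B may jump ahead of the queued index i1 (a layer can bypass a queued index
                      : // upload as long as neither that index nor the active index references it), so A and
                      : // B upload in parallel. The two index uploads are left adjacent at the head of the
                      : // queue, and once both are ready next_ready() returns i2 with i1 in coalesced_ops,
                      : // so only the newest index is actually uploaded.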
     205              : 
     206              :     /// Returns true if the queued operation at the given position is ready to be uploaded, i.e. if
     207              :     /// it doesn't conflict with any in-progress or queued operations ahead of it. Operations are
     208              :     /// allowed to skip the queue when it's safe to do so, to increase parallelism.
     209              :     ///
     210              :     /// The position must be valid for the queue size.
     211        25445 :     fn is_ready(&self, pos: usize) -> bool {
     212        25445 :         let candidate = self.queued_operations.get(pos).expect("invalid position");
     213        25445 :         self
     214        25445 :             // Look at in-progress operations, in random order.
     215        25445 :             .inprogress_tasks
     216        25445 :             .values()
     217      1167033 :             .map(|task| &task.op)
     218        25445 :             // Then queued operations ahead of the candidate, front-to-back.
     219        25445 :             .chain(self.queued_operations.iter().take(pos))
     220        25445 :             // Keep track of the active index ahead of each operation. This is used to ensure that
     221        25445 :             // an upload doesn't skip the queue too far, such that it modifies a layer that's
     222        25445 :             // referenced by an active index.
     223        25445 :             //
     224        25445 :             // It's okay that in-progress operations are emitted in random order above, since at
     225        25445 :             // most one of them can be an index upload (enforced by can_bypass).
     226      1181313 :             .scan(&self.clean.0, |next_active_index, op| {
     227      1181313 :                 let active_index = *next_active_index;
     228      1181313 :                 if let UploadOp::UploadMetadata { ref uploaded } = op {
     229        16182 :                     *next_active_index = uploaded; // stash index for next operation after this
     230      1165131 :                 }
     231      1181313 :                 Some((op, active_index))
     232      1181313 :             })
     233        25445 :             // Check if the candidate can bypass all of them.
     234      1181313 :             .all(|(op, active_index)| candidate.can_bypass(op, active_index))
     235        25445 :     }
     236              : 
     237              :     /// Returns the number of in-progress deletion operations.
     238              :     #[cfg(test)]
     239            2 :     pub(crate) fn num_inprogress_deletions(&self) -> usize {
     240            2 :         self.inprogress_tasks
     241            2 :             .iter()
     242            2 :             .filter(|(_, t)| matches!(t.op, UploadOp::Delete(_)))
     243            2 :             .count()
     244            2 :     }
     245              : 
     246              :     /// Returns the number of in-progress layer uploads.
     247              :     #[cfg(test)]
     248            4 :     pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
     249            4 :         self.inprogress_tasks
     250            4 :             .iter()
     251            6 :             .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
     252            4 :             .count()
     253            4 :     }
     254              : 
     255              :     /// Test helper that schedules all ready operations into inprogress_tasks, and returns
     256              :     /// references to them.
     257              :     ///
     258              :     /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into
     259              :     /// UploadQueue, so we can use the same code path.
     260              :     #[cfg(test)]
     261           78 :     fn schedule_ready(&mut self) -> Vec<Arc<UploadTask>> {
     262           78 :         let mut tasks = Vec::new();
     263              :         // NB: schedule operations one by one, to handle conflicts with inprogress_tasks.
     264          172 :         while let Some((op, coalesced_ops)) = self.next_ready() {
     265           94 :             self.task_counter += 1;
     266           94 :             let task = Arc::new(UploadTask {
     267           94 :                 task_id: self.task_counter,
     268           94 :                 op,
     269           94 :                 coalesced_ops,
     270           94 :                 retries: 0.into(),
     271           94 :             });
     272           94 :             self.inprogress_tasks.insert(task.task_id, task.clone());
     273           94 :             tasks.push(task);
     274           94 :         }
     275           78 :         tasks
     276           78 :     }
     277              : 
     278              :     /// Test helper that marks an operation as completed, removing it from inprogress_tasks.
     279              :     ///
     280              :     /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into
     281              :     /// UploadQueue, so we can use the same code path.
     282              :     #[cfg(test)]
     283           58 :     fn complete(&mut self, task_id: u64) {
     284           58 :         let Some(task) = self.inprogress_tasks.remove(&task_id) else {
     285            0 :             return;
     286              :         };
     287              :         // Update the clean index on uploads.
     288           58 :         if let UploadOp::UploadMetadata { ref uploaded } = task.op {
     289           16 :             if task.task_id > self.clean.1.unwrap_or_default() {
     290           16 :                 self.clean = (*uploaded.clone(), Some(task.task_id));
     291           16 :             }
     292           42 :         }
     293           58 :     }
     294              : }
     295              : 
     296              : #[derive(Clone, Copy)]
     297              : pub(super) enum SetDeletedFlagProgress {
     298              :     NotRunning,
     299              :     InProgress(NaiveDateTime),
     300              :     Successful(NaiveDateTime),
     301              : }
     302              : 
     303              : pub struct UploadQueueStoppedDeletable {
     304              :     pub(super) upload_queue_for_deletion: UploadQueueInitialized,
     305              :     pub(super) deleted_at: SetDeletedFlagProgress,
     306              : }
     307              : 
     308              : pub enum UploadQueueStopped {
     309              :     Deletable(UploadQueueStoppedDeletable),
     310              :     Uninitialized,
     311              : }
     312              : 
     313              : #[derive(thiserror::Error, Debug)]
     314              : pub enum NotInitialized {
     315              :     #[error("queue is in state Uninitialized")]
     316              :     Uninitialized,
     317              :     #[error("queue is in state Stopped")]
     318              :     Stopped,
     319              :     #[error("queue is shutting down")]
     320              :     ShuttingDown,
     321              : }
     322              : 
     323              : impl NotInitialized {
     324            0 :     pub(crate) fn is_stopping(&self) -> bool {
     325              :         use NotInitialized::*;
     326            0 :         match self {
     327            0 :             Uninitialized => false,
     328            0 :             Stopped => true,
     329            0 :             ShuttingDown => true,
     330              :         }
     331            0 :     }
     332              : }
     333              : 
     334              : impl UploadQueue {
     335          448 :     pub fn initialize_empty_remote(
     336          448 :         &mut self,
     337          448 :         metadata: &TimelineMetadata,
     338          448 :         inprogress_limit: usize,
     339          448 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     340          448 :         match self {
     341          448 :             UploadQueue::Uninitialized => (),
     342              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     343            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     344              :             }
     345              :         }
     346              : 
     347          448 :         info!("initializing upload queue for empty remote");
     348              : 
     349          448 :         let index_part = IndexPart::empty(metadata.clone());
     350          448 : 
     351          448 :         let state = UploadQueueInitialized {
     352          448 :             inprogress_limit,
     353          448 :             dirty: index_part.clone(),
     354          448 :             clean: (index_part, None),
     355          448 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     356          448 :             visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
     357          448 :             // what follows are boring default initializations
     358          448 :             task_counter: 0,
     359          448 :             inprogress_tasks: HashMap::new(),
     360          448 :             queued_operations: VecDeque::new(),
     361          448 :             #[cfg(feature = "testing")]
     362          448 :             dangling_files: HashMap::new(),
     363          448 :             recently_deleted: HashSet::new(),
     364          448 :             blocked_deletions: Vec::new(),
     365          448 :             shutting_down: false,
     366          448 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     367          448 :         };
     368          448 : 
     369          448 :         *self = UploadQueue::Initialized(state);
     370          448 :         Ok(self.initialized_mut().expect("we just set it"))
     371          448 :     }
     372              : 
     373           22 :     pub fn initialize_with_current_remote_index_part(
     374           22 :         &mut self,
     375           22 :         index_part: &IndexPart,
     376           22 :         inprogress_limit: usize,
     377           22 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     378           22 :         match self {
     379           22 :             UploadQueue::Uninitialized => (),
     380              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     381            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     382              :             }
     383              :         }
     384              : 
     385           22 :         info!(
     386            0 :             "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
     387            0 :             index_part.metadata.disk_consistent_lsn()
     388              :         );
     389              : 
     390           22 :         let state = UploadQueueInitialized {
     391           22 :             inprogress_limit,
     392           22 :             dirty: index_part.clone(),
     393           22 :             clean: (index_part.clone(), None),
     394           22 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     395           22 :             visible_remote_consistent_lsn: Arc::new(
     396           22 :                 index_part.metadata.disk_consistent_lsn().into(),
     397           22 :             ),
     398           22 :             // what follows are boring default initializations
     399           22 :             task_counter: 0,
     400           22 :             inprogress_tasks: HashMap::new(),
     401           22 :             queued_operations: VecDeque::new(),
     402           22 :             #[cfg(feature = "testing")]
     403           22 :             dangling_files: HashMap::new(),
     404           22 :             recently_deleted: HashSet::new(),
     405           22 :             blocked_deletions: Vec::new(),
     406           22 :             shutting_down: false,
     407           22 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     408           22 :         };
     409           22 : 
     410           22 :         *self = UploadQueue::Initialized(state);
     411           22 :         Ok(self.initialized_mut().expect("we just set it"))
     412           22 :     }
     413              : 
     414         8818 :     pub fn initialized_mut(&mut self) -> Result<&mut UploadQueueInitialized, NotInitialized> {
     415              :         use UploadQueue::*;
     416         8818 :         match self {
     417            0 :             Uninitialized => Err(NotInitialized::Uninitialized),
     418         8818 :             Initialized(x) => {
     419         8818 :                 if x.shutting_down {
     420            0 :                     Err(NotInitialized::ShuttingDown)
     421              :                 } else {
     422         8818 :                     Ok(x)
     423              :                 }
     424              :             }
     425            0 :             Stopped(_) => Err(NotInitialized::Stopped),
     426              :         }
     427         8818 :     }
     428              : 
     429            2 :     pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
     430            2 :         match self {
     431              :             UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
     432            0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     433              :             }
     434              :             UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
     435            0 :                 anyhow::bail!("queue is in state Stopped(Uninitialized)")
     436              :             }
     437            2 :             UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
     438              :         }
     439            2 :     }
     440              : }
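                      : // For illustration only (not part of the measured source): a minimal initialization
                      : // sketch, mirroring the tests below. An `inprogress_limit` of 0 disables the limit.
                      : //
                      : //     let mut queue = UploadQueue::Uninitialized;
                      : //     let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
                      : //     assert!(queue.no_pending_work());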
     441              : 
     442              : /// An in-progress upload or delete task.
     443              : #[derive(Debug)]
     444              : pub struct UploadTask {
     445              :     /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
     446              :     pub task_id: u64,
     447              :     /// Number of task retries.
     448              :     pub retries: AtomicU32,
     449              :     /// The upload operation.
     450              :     pub op: UploadOp,
     451              :     /// Any upload operations that were coalesced into this operation. This typically happens with
     452              :     /// back-to-back index uploads, see `UploadQueueInitialized::next_ready()`.
     453              :     pub coalesced_ops: Vec<UploadOp>,
     454              : }
     455              : 
     456              : /// A deletion of some layers within the lifetime of a timeline.  This is not used
     457              : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
     458              : #[derive(Debug, Clone)]
     459              : pub struct Delete {
     460              :     pub layers: Vec<(LayerName, LayerFileMetadata)>,
     461              : }
     462              : 
     463              : #[derive(Clone, Debug)]
     464              : pub enum UploadOp {
      465              :     /// Upload a layer file. The last field indicates the last operation for the file.
     466              :     UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
     467              : 
      468              :     /// Upload an index_part.json file
     469              :     UploadMetadata {
     470              :         /// The next [`UploadQueueInitialized::clean`] after this upload succeeds.
     471              :         uploaded: Box<IndexPart>,
     472              :     },
     473              : 
     474              :     /// Delete layer files
     475              :     Delete(Delete),
     476              : 
     477              :     /// Barrier. When the barrier operation is reached, the channel is closed.
     478              :     Barrier(tokio::sync::watch::Sender<()>),
     479              : 
     480              :     /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
     481              :     /// this is the same as a Barrier.
     482              :     Shutdown,
     483              : }
     484              : 
     485              : impl std::fmt::Display for UploadOp {
     486            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     487            0 :         match self {
     488            0 :             UploadOp::UploadLayer(layer, metadata, mode) => {
     489            0 :                 write!(
     490            0 :                     f,
     491            0 :                     "UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
     492            0 :                     layer, metadata.file_size, metadata.generation, mode
     493            0 :                 )
     494              :             }
     495            0 :             UploadOp::UploadMetadata { uploaded, .. } => {
     496            0 :                 write!(
     497            0 :                     f,
     498            0 :                     "UploadMetadata(lsn: {})",
     499            0 :                     uploaded.metadata.disk_consistent_lsn()
     500            0 :                 )
     501              :             }
     502            0 :             UploadOp::Delete(delete) => {
     503            0 :                 write!(f, "Delete({} layers)", delete.layers.len())
     504              :             }
     505            0 :             UploadOp::Barrier(_) => write!(f, "Barrier"),
     506            0 :             UploadOp::Shutdown => write!(f, "Shutdown"),
     507              :         }
     508            0 :     }
     509              : }
     510              : 
     511              : impl UploadOp {
     512              :     /// Returns true if self can bypass other, i.e. if the operations don't conflict. index is the
     513              :     /// active index when other would be uploaded -- if we allow self to bypass other, this would
     514              :     /// be the active index when self is uploaded.
     515      1181361 :     pub fn can_bypass(&self, other: &UploadOp, index: &IndexPart) -> bool {
     516      1181361 :         match (self, other) {
     517              :             // Nothing can bypass a barrier or shutdown, and it can't bypass anything.
     518         2400 :             (UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false,
     519            8 :             (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
     520              : 
     521              :             // Uploads and deletes can bypass each other unless they're for the same file.
     522        19877 :             (UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
     523        19877 :                 let aname = &a.layer_desc().layer_name();
     524        19877 :                 let bname = &b.layer_desc().layer_name();
     525        19877 :                 !is_same_remote_layer_path(aname, ameta, bname, bmeta)
     526              :             }
     527           27 :             (UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
     528      1137462 :             | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
     529      1137503 :                 d.layers.iter().all(|(dname, dmeta)| {
     530      1137503 :                     !is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
     531      1137503 :                 })
     532              :             }
     533              : 
     534              :             // Deletes are idempotent and can always bypass each other.
     535         3029 :             (UploadOp::Delete(_), UploadOp::Delete(_)) => true,
     536              : 
     537              :             // Uploads and deletes can bypass an index upload as long as neither the uploaded index
     538              :             // nor the active index below it references the file. A layer can't be modified or
     539              :             // deleted while referenced by an index.
     540              :             //
     541              :             // Similarly, index uploads can bypass uploads and deletes as long as neither the
     542              :             // uploaded index nor the active index references the file (the latter would be
     543              :             // incorrect use by the caller).
     544           78 :             (UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
     545         3378 :             | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
     546         3456 :                 let uname = u.layer_desc().layer_name();
     547         3456 :                 !i.references(&uname, umeta) && !index.references(&uname, umeta)
     548              :             }
     549        14730 :             (UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i })
     550          170 :             | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => {
     551        14900 :                 d.layers.iter().all(|(dname, dmeta)| {
     552        14900 :                     !i.references(dname, dmeta) && !index.references(dname, dmeta)
     553        14900 :                 })
     554              :             }
     555              : 
     556              :             // Indexes can never bypass each other. They can coalesce though, and
     557              :             // `UploadQueue::next_ready()` currently does this when possible.
     558          202 :             (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
     559              :         }
     560      1181361 :     }
     561              : }
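                      : // For illustration only (not part of the measured source): a summary of the bypass
                      : // rules above, where "unless same path" means the two operations touch the same remote
                      : // layer path, and "unless referenced" means the uploaded or active index references
                      : // the layer in question:
                      : //
                      : //     self \ other      UploadLayer        Delete             UploadMetadata     Barrier/Shutdown
                      : //     UploadLayer       unless same path   unless same path   unless referenced  never
                      : //     Delete            unless same path   always             unless referenced  never
                      : //     UploadMetadata    unless referenced  unless referenced  never              never
                      : //     Barrier/Shutdown  never              never              never              never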
     562              : 
     563              : #[cfg(test)]
     564              : mod tests {
     565              :     use super::*;
     566              :     use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
     567              :     use crate::tenant::storage_layer::layer::local_layer_path;
     568              :     use crate::tenant::storage_layer::Layer;
     569              :     use crate::tenant::Timeline;
     570              :     use crate::DEFAULT_PG_VERSION;
     571              :     use itertools::Itertools as _;
     572              :     use std::str::FromStr as _;
     573              :     use utils::shard::{ShardCount, ShardIndex, ShardNumber};
     574              : 
     575              :     /// Test helper which asserts that two operations are the same, in lieu of UploadOp PartialEq.
     576              :     #[track_caller]
     577           98 :     fn assert_same_op(a: &UploadOp, b: &UploadOp) {
     578              :         use UploadOp::*;
     579           98 :         match (a, b) {
     580           44 :             (UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
     581           44 :                 assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name());
     582           44 :                 assert_eq!(ameta, bmeta);
     583           44 :                 assert_eq!(atype, btype);
     584              :             }
     585           22 :             (Delete(a), Delete(b)) => assert_eq!(a.layers, b.layers),
     586           28 :             (UploadMetadata { uploaded: a }, UploadMetadata { uploaded: b }) => assert_eq!(a, b),
     587            4 :             (Barrier(_), Barrier(_)) => {}
     588            0 :             (Shutdown, Shutdown) => {}
     589            0 :             (a, b) => panic!("{a:?} != {b:?}"),
     590              :         }
     591           98 :     }
     592              : 
     593              :     /// Test helper which asserts that two sets of operations are the same.
     594              :     #[track_caller]
     595           22 :     fn assert_same_ops<'a>(
     596           22 :         a: impl IntoIterator<Item = &'a UploadOp>,
     597           22 :         b: impl IntoIterator<Item = &'a UploadOp>,
     598           22 :     ) {
     599           22 :         a.into_iter()
     600           22 :             .zip_eq(b)
     601           58 :             .for_each(|(a, b)| assert_same_op(a, b))
     602           22 :     }
     603              : 
     604              :     /// Test helper to construct a test timeline.
     605              :     ///
     606              :     /// TODO: it really shouldn't be necessary to construct an entire tenant and timeline just to
     607              :     /// test the upload queue -- decouple ResidentLayer from Timeline.
     608              :     ///
     609              :     /// TODO: the upload queue uses TimelineMetadata::example() instead, because there's no way to
     610              :     /// obtain a TimelineMetadata from a Timeline.
     611           24 :     fn make_timeline() -> Arc<Timeline> {
     612           24 :         // Grab the current test name from the current thread name.
     613           24 :         // TODO: TenantHarness shouldn't take a &'static str, but just leak the test name for now.
     614           24 :         let test_name = std::thread::current().name().unwrap().to_string();
     615           24 :         let test_name = Box::leak(test_name.into_boxed_str());
     616           24 : 
     617           24 :         let runtime = tokio::runtime::Builder::new_current_thread()
     618           24 :             .enable_all()
     619           24 :             .build()
     620           24 :             .expect("failed to create runtime");
     621           24 : 
     622           24 :         runtime
     623           24 :             .block_on(async {
     624           24 :                 let harness = TenantHarness::create(test_name).await?;
     625           24 :                 let (tenant, ctx) = harness.load().await;
     626           24 :                 tenant
     627           24 :                     .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
     628           24 :                     .await
     629           24 :             })
     630           24 :             .expect("failed to create timeline")
     631           24 :     }
     632              : 
     633              :     /// Test helper to construct an (empty) resident layer.
     634           60 :     fn make_layer(timeline: &Arc<Timeline>, name: &str) -> ResidentLayer {
     635           60 :         make_layer_with_size(timeline, name, 0)
     636           60 :     }
     637              : 
     638              :     /// Test helper to construct a resident layer with the given size.
     639           66 :     fn make_layer_with_size(timeline: &Arc<Timeline>, name: &str, size: usize) -> ResidentLayer {
     640           66 :         let metadata = LayerFileMetadata {
     641           66 :             generation: timeline.generation,
     642           66 :             shard: timeline.get_shard_index(),
     643           66 :             file_size: size as u64,
     644           66 :         };
     645           66 :         make_layer_with_metadata(timeline, name, metadata)
     646           66 :     }
     647              : 
     648              :     /// Test helper to construct a layer with the given metadata.
     649           98 :     fn make_layer_with_metadata(
     650           98 :         timeline: &Arc<Timeline>,
     651           98 :         name: &str,
     652           98 :         metadata: LayerFileMetadata,
     653           98 :     ) -> ResidentLayer {
     654           98 :         let name = LayerName::from_str(name).expect("invalid name");
     655           98 :         let local_path = local_layer_path(
     656           98 :             timeline.conf,
     657           98 :             &timeline.tenant_shard_id,
     658           98 :             &timeline.timeline_id,
     659           98 :             &name,
     660           98 :             &metadata.generation,
     661           98 :         );
     662           98 :         std::fs::write(&local_path, vec![0; metadata.file_size as usize])
     663           98 :             .expect("failed to write file");
     664           98 :         Layer::for_resident(timeline.conf, timeline, local_path, name, metadata)
     665           98 :     }
     666              : 
     667              :     /// Test helper to add a layer to an index and return a new index.
     668           12 :     fn index_with(index: &IndexPart, layer: &ResidentLayer) -> Box<IndexPart> {
     669           12 :         let mut index = index.clone();
     670           12 :         index
     671           12 :             .layer_metadata
     672           12 :             .insert(layer.layer_desc().layer_name(), layer.metadata());
     673           12 :         Box::new(index)
     674           12 :     }
     675              : 
     676              :     /// Test helper to remove a layer from an index and return a new index.
     677            4 :     fn index_without(index: &IndexPart, layer: &ResidentLayer) -> Box<IndexPart> {
     678            4 :         let mut index = index.clone();
     679            4 :         index
     680            4 :             .layer_metadata
     681            4 :             .remove(&layer.layer_desc().layer_name());
     682            4 :         Box::new(index)
     683            4 :     }
     684              : 
     685              :     /// Nothing can bypass a barrier, and it can't bypass inprogress tasks.
     686              :     #[test]
     687            2 :     fn schedule_barrier() -> anyhow::Result<()> {
     688            2 :         let mut queue = UploadQueue::Uninitialized;
     689            2 :         let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
     690            2 :         let tli = make_timeline();
     691            2 : 
     692            2 :         let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
     693            2 :         let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     694            2 :         let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     695            2 :         let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     696            2 :         let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     697            2 :         let (barrier, _) = tokio::sync::watch::channel(());
     698            2 : 
     699            2 :         // Enqueue non-conflicting upload, delete, and index before and after a barrier.
     700            2 :         let ops = [
     701            2 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
     702            2 :             UploadOp::Delete(Delete {
     703            2 :                 layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
     704            2 :             }),
     705            2 :             UploadOp::UploadMetadata {
     706            2 :                 uploaded: index.clone(),
     707            2 :             },
     708            2 :             UploadOp::Barrier(barrier),
     709            2 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
     710            2 :             UploadOp::Delete(Delete {
     711            2 :                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
     712            2 :             }),
     713            2 :             UploadOp::UploadMetadata {
     714            2 :                 uploaded: index.clone(),
     715            2 :             },
     716            2 :         ];
     717            2 : 
     718            2 :         queue.queued_operations.extend(ops.clone());
     719            2 : 
     720            2 :         // Schedule the initial operations ahead of the barrier.
     721            2 :         let tasks = queue.schedule_ready();
     722            2 : 
     723            6 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]);
     724            2 :         assert!(matches!(
     725            2 :             queue.queued_operations.front(),
     726              :             Some(&UploadOp::Barrier(_))
     727              :         ));
     728              : 
     729              :         // Complete the initial operations. The barrier isn't scheduled while they're pending.
     730            8 :         for task in tasks {
     731            6 :             assert!(queue.schedule_ready().is_empty());
     732            6 :             queue.complete(task.task_id);
     733              :         }
     734              : 
     735              :         // Schedule the barrier. The later tasks won't schedule until it completes.
     736            2 :         let tasks = queue.schedule_ready();
     737            2 : 
     738            2 :         assert_eq!(tasks.len(), 1);
     739            2 :         assert!(matches!(tasks[0].op, UploadOp::Barrier(_)));
     740            2 :         assert_eq!(queue.queued_operations.len(), 3);
     741              : 
     742              :         // Complete the barrier. The rest of the tasks schedule immediately.
     743            2 :         queue.complete(tasks[0].task_id);
     744            2 : 
     745            2 :         let tasks = queue.schedule_ready();
     746            6 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops[4..]);
     747            2 :         assert!(queue.queued_operations.is_empty());
     748              : 
     749            2 :         Ok(())
     750            2 :     }
     751              : 
     752              :     /// Deletes can be scheduled in parallel, even if they're for the same file.
     753              :     #[test]
     754            2 :     fn schedule_delete_parallel() -> anyhow::Result<()> {
     755            2 :         let mut queue = UploadQueue::Uninitialized;
     756            2 :         let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
     757            2 :         let tli = make_timeline();
     758            2 : 
     759            2 :         // Enqueue a bunch of deletes, some with conflicting names.
     760            2 :         let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     761            2 :         let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     762            2 :         let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     763            2 :         let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     764            2 : 
     765            2 :         let ops = [
     766            2 :             UploadOp::Delete(Delete {
     767            2 :                 layers: vec![(layer0.layer_desc().layer_name(), layer0.metadata())],
     768            2 :             }),
     769            2 :             UploadOp::Delete(Delete {
     770            2 :                 layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
     771            2 :             }),
     772            2 :             UploadOp::Delete(Delete {
     773            2 :                 layers: vec![
     774            2 :                     (layer1.layer_desc().layer_name(), layer1.metadata()),
     775            2 :                     (layer2.layer_desc().layer_name(), layer2.metadata()),
     776            2 :                 ],
     777            2 :             }),
     778            2 :             UploadOp::Delete(Delete {
     779            2 :                 layers: vec![(layer2.layer_desc().layer_name(), layer2.metadata())],
     780            2 :             }),
     781            2 :             UploadOp::Delete(Delete {
     782            2 :                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
     783            2 :             }),
     784            2 :         ];
     785            2 : 
     786            2 :         queue.queued_operations.extend(ops.clone());
     787            2 : 
     788            2 :         // Schedule all ready operations. Since deletes don't conflict, they're all scheduled.
     789            2 :         let tasks = queue.schedule_ready();
     790            2 : 
     791           10 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops);
     792            2 :         assert!(queue.queued_operations.is_empty());
     793              : 
     794            2 :         Ok(())
     795            2 :     }
     796              : 
     797              :     /// Conflicting uploads are serialized.
     798              :     #[test]
     799            2 :     fn schedule_upload_conflicts() -> anyhow::Result<()> {
     800            2 :         let mut queue = UploadQueue::Uninitialized;
     801            2 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
     802            2 :         let tli = make_timeline();
     803            2 : 
     804            2 :         // Enqueue three versions of the same layer, with different file sizes.
     805            2 :         let layer0a = make_layer_with_size(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51", 1);
     806            2 :         let layer0b = make_layer_with_size(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51", 2);
     807            2 :         let layer0c = make_layer_with_size(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51", 3);
     808            2 : 
     809            2 :         let ops = [
     810            2 :             UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
     811            2 :             UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
     812            2 :             UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
     813            2 :         ];
     814            2 : 
     815            2 :         queue.queued_operations.extend(ops.clone());
     816              : 
     817              :         // Only one version should be scheduled and uploaded at a time.
     818            8 :         for op in ops {
     819            6 :             let tasks = queue.schedule_ready();
     820            6 :             assert_eq!(tasks.len(), 1);
     821            6 :             assert_same_op(&tasks[0].op, &op);
     822            6 :             queue.complete(tasks[0].task_id);
     823              :         }
     824            2 :         assert!(queue.schedule_ready().is_empty());
     825            2 :         assert!(queue.queued_operations.is_empty());
     826              : 
     827            2 :         Ok(())
     828            2 :     }
     829              : 
     830              :     /// Conflicting uploads and deletes are serialized.
     831              :     #[test]
     832            2 :     fn schedule_upload_delete_conflicts() -> anyhow::Result<()> {
     833            2 :         let mut queue = UploadQueue::Uninitialized;
     834            2 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
     835            2 :         let tli = make_timeline();
     836            2 : 
     837            2 :         // Enqueue two layer uploads, with a delete of both layers in between them. These should be
     838            2 :         // scheduled one at a time, since deletes can't bypass uploads and vice versa.
     839            2 :         let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     840            2 :         let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     841            2 : 
     842            2 :         let ops = [
     843            2 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
     844            2 :             UploadOp::Delete(Delete {
     845            2 :                 layers: vec![
     846            2 :                     (layer0.layer_desc().layer_name(), layer0.metadata()),
     847            2 :                     (layer1.layer_desc().layer_name(), layer1.metadata()),
     848            2 :                 ],
     849            2 :             }),
     850            2 :             UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
     851            2 :         ];
     852            2 : 
     853            2 :         queue.queued_operations.extend(ops.clone());
     854              : 
      855              :         // Conflicting operations are scheduled and completed one at a time.
     856            8 :         for op in ops {
     857            6 :             let tasks = queue.schedule_ready();
     858            6 :             assert_eq!(tasks.len(), 1);
     859            6 :             assert_same_op(&tasks[0].op, &op);
     860            6 :             queue.complete(tasks[0].task_id);
     861              :         }
     862            2 :         assert!(queue.schedule_ready().is_empty());
     863            2 :         assert!(queue.queued_operations.is_empty());
     864              : 
     865            2 :         Ok(())
     866            2 :     }
     867              : 
     868              :     /// Non-conflicting uploads and deletes can bypass the queue, avoiding the conflicting
     869              :     /// delete/upload operations at the head of the queue.
     870              :     #[test]
     871            2 :     fn schedule_upload_delete_conflicts_bypass() -> anyhow::Result<()> {
     872            2 :         let mut queue = UploadQueue::Uninitialized;
     873            2 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
     874            2 :         let tli = make_timeline();
     875            2 : 
     876            2 :         // Enqueue two layer uploads, with a delete of both layers in between them. These should be
     877            2 :         // scheduled one at a time, since deletes can't bypass uploads and vice versa.
     878            2 :         //
     879            2 :         // Also enqueue non-conflicting uploads and deletes at the end. These can bypass the queue
     880            2 :         // and run immediately.
     881            2 :         let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     882            2 :         let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     883            2 :         let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     884            2 :         let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     885            2 : 
     886            2 :         let ops = [
     887            2 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
     888            2 :             UploadOp::Delete(Delete {
     889            2 :                 layers: vec![
     890            2 :                     (layer0.layer_desc().layer_name(), layer0.metadata()),
     891            2 :                     (layer1.layer_desc().layer_name(), layer1.metadata()),
     892            2 :                 ],
     893            2 :             }),
     894            2 :             UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
     895            2 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
     896            2 :             UploadOp::Delete(Delete {
     897            2 :                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
     898            2 :             }),
     899            2 :         ];
     900            2 : 
     901            2 :         queue.queued_operations.extend(ops.clone());
     902            2 : 
     903            2 :         // Operations 0, 3, and 4 are scheduled immediately.
     904            2 :         let tasks = queue.schedule_ready();
     905            6 :         assert_same_ops(tasks.iter().map(|t| &t.op), [&ops[0], &ops[3], &ops[4]]);
     906            2 :         assert_eq!(queue.queued_operations.len(), 2);
     907              : 
     908            2 :         Ok(())
     909            2 :     }
     910              : 
     911              :     /// Non-conflicting uploads are parallelized.
     912              :     #[test]
     913            2 :     fn schedule_upload_parallel() -> anyhow::Result<()> {
     914            2 :         let mut queue = UploadQueue::Uninitialized;
     915            2 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
     916            2 :         let tli = make_timeline();
     917            2 : 
     918            2 :         // Enqueue three different layer uploads.
     919            2 :         let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     920            2 :         let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     921            2 :         let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     922            2 : 
     923            2 :         let ops = [
     924            2 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
     925            2 :             UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
     926            2 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
     927            2 :         ];
     928            2 : 
     929            2 :         queue.queued_operations.extend(ops.clone());
     930            2 : 
     931            2 :         // All uploads should be scheduled concurrently.
     932            2 :         let tasks = queue.schedule_ready();
     933            2 : 
     934            6 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops);
     935            2 :         assert!(queue.queued_operations.is_empty());
     936              : 
     937            2 :         Ok(())
     938            2 :     }
     939              : 
     940              :     /// Index uploads are coalesced.
     941              :     #[test]
     942            2 :     fn schedule_index_coalesce() -> anyhow::Result<()> {
     943            2 :         let mut queue = UploadQueue::Uninitialized;
     944            2 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
     945              : 
     946              :         // Enqueue three uploads of the current empty index.
     947            2 :         let index = Box::new(queue.clean.0.clone());
     948            2 : 
     949            2 :         let ops = [
     950            2 :             UploadOp::UploadMetadata {
     951            2 :                 uploaded: index.clone(),
     952            2 :             },
     953            2 :             UploadOp::UploadMetadata {
     954            2 :                 uploaded: index.clone(),
     955            2 :             },
     956            2 :             UploadOp::UploadMetadata {
     957            2 :                 uploaded: index.clone(),
     958            2 :             },
     959            2 :         ];
     960            2 : 
     961            2 :         queue.queued_operations.extend(ops.clone());
     962            2 : 
     963            2 :         // The index uploads are coalesced into a single operation.
     964            2 :         let tasks = queue.schedule_ready();
     965            2 :         assert_eq!(tasks.len(), 1);
     966            2 :         assert_same_op(&tasks[0].op, &ops[2]);
     967            2 :         assert_same_ops(&tasks[0].coalesced_ops, &ops[0..2]);
     968            2 : 
     969            2 :         assert!(queue.queued_operations.is_empty());
     970              : 
     971            2 :         Ok(())
     972            2 :     }
     973              : 
     974              :     /// Chains of upload/index operations lead to parallel layer uploads and serial index uploads.
     975              :     /// This is the common case with layer flushes.
     976              :     #[test]
     977            2 :     fn schedule_index_upload_chain() -> anyhow::Result<()> {
     978            2 :         let mut queue = UploadQueue::Uninitialized;
     979            2 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
     980            2 :         let tli = make_timeline();
     981            2 : 
      982            2 :         // Enqueue three layer uploads, each followed by an upload of an index that references it.
     983            2 :         let index = Box::new(queue.clean.0.clone());
     984            2 :         let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     985            2 :         let index0 = index_with(&index, &layer0);
     986            2 :         let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     987            2 :         let index1 = index_with(&index0, &layer1);
     988            2 :         let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
     989            2 :         let index2 = index_with(&index1, &layer2);
     990            2 : 
     991            2 :         let ops = [
     992            2 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
     993            2 :             UploadOp::UploadMetadata {
     994            2 :                 uploaded: index0.clone(),
     995            2 :             },
     996            2 :             UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
     997            2 :             UploadOp::UploadMetadata {
     998            2 :                 uploaded: index1.clone(),
     999            2 :             },
    1000            2 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
    1001            2 :             UploadOp::UploadMetadata {
    1002            2 :                 uploaded: index2.clone(),
    1003            2 :             },
    1004            2 :         ];
    1005            2 : 
    1006            2 :         queue.queued_operations.extend(ops.clone());
    1007            2 : 
    1008            2 :         // The layer uploads should be scheduled immediately. The indexes must wait.
    1009            2 :         let upload_tasks = queue.schedule_ready();
    1010            2 :         assert_same_ops(
    1011            6 :             upload_tasks.iter().map(|t| &t.op),
    1012            2 :             [&ops[0], &ops[2], &ops[4]],
    1013            2 :         );
    1014            2 : 
    1015            2 :         // layer2 completes first. None of the indexes can upload yet.
    1016            2 :         queue.complete(upload_tasks[2].task_id);
    1017            2 :         assert!(queue.schedule_ready().is_empty());
    1018              : 
    1019              :         // layer0 completes. index0 can upload. It completes.
    1020            2 :         queue.complete(upload_tasks[0].task_id);
    1021            2 :         let index_tasks = queue.schedule_ready();
    1022            2 :         assert_eq!(index_tasks.len(), 1);
    1023            2 :         assert_same_op(&index_tasks[0].op, &ops[1]);
    1024            2 :         queue.complete(index_tasks[0].task_id);
    1025            2 : 
     1026            2 :         // layer1 completes. This unblocks index1 and index2, which coalesce into
     1027            2 :         // a single upload for index2.
    1028            2 :         queue.complete(upload_tasks[1].task_id);
    1029            2 : 
    1030            2 :         let index_tasks = queue.schedule_ready();
    1031            2 :         assert_eq!(index_tasks.len(), 1);
    1032            2 :         assert_same_op(&index_tasks[0].op, &ops[5]);
    1033            2 :         assert_same_ops(&index_tasks[0].coalesced_ops, &ops[3..4]);
    1034            2 : 
    1035            2 :         assert!(queue.queued_operations.is_empty());
    1036              : 
    1037            2 :         Ok(())
    1038            2 :     }
    1039              : 
    1040              :     /// A delete can't bypass an index upload if an index ahead of it still references it.
    1041              :     #[test]
    1042            2 :     fn schedule_index_delete_dereferenced() -> anyhow::Result<()> {
    1043            2 :         let mut queue = UploadQueue::Uninitialized;
    1044            2 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
    1045            2 :         let tli = make_timeline();
    1046            2 : 
    1047            2 :         // Create a layer to upload.
    1048            2 :         let layer = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
    1049            2 :         let index_upload = index_with(&queue.clean.0, &layer);
    1050            2 : 
    1051            2 :         // Remove the layer reference in a new index, then delete the layer.
    1052            2 :         let index_deref = index_without(&index_upload, &layer);
    1053            2 : 
    1054            2 :         let ops = [
    1055            2 :             // Initial upload, with a barrier to prevent index coalescing.
    1056            2 :             UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
    1057            2 :             UploadOp::UploadMetadata {
    1058            2 :                 uploaded: index_upload.clone(),
    1059            2 :             },
    1060            2 :             UploadOp::Barrier(tokio::sync::watch::channel(()).0),
    1061            2 :             // Dereference the layer and delete it.
    1062            2 :             UploadOp::UploadMetadata {
    1063            2 :                 uploaded: index_deref.clone(),
    1064            2 :             },
    1065            2 :             UploadOp::Delete(Delete {
    1066            2 :                 layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
    1067            2 :             }),
    1068            2 :         ];
    1069            2 : 
    1070            2 :         queue.queued_operations.extend(ops.clone());
    1071              : 
    1072              :         // Operations are serialized.
    1073           12 :         for op in ops {
    1074           10 :             let tasks = queue.schedule_ready();
    1075           10 :             assert_eq!(tasks.len(), 1);
    1076           10 :             assert_same_op(&tasks[0].op, &op);
    1077           10 :             queue.complete(tasks[0].task_id);
    1078              :         }
    1079            2 :         assert!(queue.queued_operations.is_empty());
    1080              : 
    1081            2 :         Ok(())
    1082            2 :     }
    1083              : 
    1084              :     /// An upload with a reused layer name doesn't clobber the previous layer. Specifically, a
    1085              :     /// dereference/upload/reference cycle can't allow the upload to bypass the reference.
    1086              :     #[test]
    1087            2 :     fn schedule_index_upload_dereferenced() -> anyhow::Result<()> {
    1088            2 :         let mut queue = UploadQueue::Uninitialized;
    1089            2 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
    1090            2 :         let tli = make_timeline();
    1091            2 : 
    1092            2 :         // Create a layer to upload.
    1093            2 :         let layer = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
    1094            2 : 
    1095            2 :         // Upload the layer. Then dereference the layer, and upload/reference it again.
    1096            2 :         let index_upload = index_with(&queue.clean.0, &layer);
    1097            2 :         let index_deref = index_without(&index_upload, &layer);
    1098            2 :         let index_ref = index_with(&index_deref, &layer);
    1099            2 : 
    1100            2 :         let ops = [
    1101            2 :             // Initial upload, with a barrier to prevent index coalescing.
    1102            2 :             UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
    1103            2 :             UploadOp::UploadMetadata {
    1104            2 :                 uploaded: index_upload.clone(),
    1105            2 :             },
    1106            2 :             UploadOp::Barrier(tokio::sync::watch::channel(()).0),
    1107            2 :             // Dereference the layer.
    1108            2 :             UploadOp::UploadMetadata {
    1109            2 :                 uploaded: index_deref.clone(),
    1110            2 :             },
    1111            2 :             // Replace and reference the layer.
    1112            2 :             UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
    1113            2 :             UploadOp::UploadMetadata {
    1114            2 :                 uploaded: index_ref.clone(),
    1115            2 :             },
    1116            2 :         ];
    1117            2 : 
    1118            2 :         queue.queued_operations.extend(ops.clone());
    1119              : 
    1120              :         // Operations are serialized.
    1121           14 :         for op in ops {
    1122           12 :             let tasks = queue.schedule_ready();
    1123           12 :             assert_eq!(tasks.len(), 1);
    1124           12 :             assert_same_op(&tasks[0].op, &op);
    1125           12 :             queue.complete(tasks[0].task_id);
    1126              :         }
    1127            2 :         assert!(queue.queued_operations.is_empty());
    1128              : 
    1129            2 :         Ok(())
    1130            2 :     }
    1131              : 
    1132              :     /// Nothing can bypass a shutdown, and it waits for inprogress tasks. It's never returned from
    1133              :     /// next_ready(), but is left at the head of the queue.
    1134              :     #[test]
    1135            2 :     fn schedule_shutdown() -> anyhow::Result<()> {
    1136            2 :         let mut queue = UploadQueue::Uninitialized;
    1137            2 :         let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
    1138            2 :         let tli = make_timeline();
    1139            2 : 
    1140            2 :         let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
    1141            2 :         let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
    1142            2 :         let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
    1143            2 :         let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
    1144            2 :         let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
    1145            2 : 
    1146            2 :         // Enqueue non-conflicting upload, delete, and index before and after a shutdown.
    1147            2 :         let ops = [
    1148            2 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
    1149            2 :             UploadOp::Delete(Delete {
    1150            2 :                 layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
    1151            2 :             }),
    1152            2 :             UploadOp::UploadMetadata {
    1153            2 :                 uploaded: index.clone(),
    1154            2 :             },
    1155            2 :             UploadOp::Shutdown,
    1156            2 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
    1157            2 :             UploadOp::Delete(Delete {
    1158            2 :                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
    1159            2 :             }),
    1160            2 :             UploadOp::UploadMetadata {
    1161            2 :                 uploaded: index.clone(),
    1162            2 :             },
    1163            2 :         ];
    1164            2 : 
    1165            2 :         queue.queued_operations.extend(ops.clone());
    1166            2 : 
    1167            2 :         // Schedule the initial operations ahead of the shutdown.
    1168            2 :         let tasks = queue.schedule_ready();
    1169            2 : 
    1170            6 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]);
    1171            2 :         assert!(matches!(
    1172            2 :             queue.queued_operations.front(),
    1173              :             Some(&UploadOp::Shutdown)
    1174              :         ));
    1175              : 
    1176              :         // Complete the initial operations. The shutdown isn't triggered while they're pending.
    1177            8 :         for task in tasks {
    1178            6 :             assert!(queue.schedule_ready().is_empty());
    1179            6 :             queue.complete(task.task_id);
    1180              :         }
    1181              : 
    1182              :         // The shutdown is triggered the next time we try to pull an operation. It isn't returned,
    1183              :         // but is left in the queue.
    1184            2 :         assert!(!queue.shutdown_ready.is_closed());
    1185            2 :         assert!(queue.next_ready().is_none());
    1186            2 :         assert!(queue.shutdown_ready.is_closed());
    1187              : 
    1188            2 :         Ok(())
    1189            2 :     }
    1190              : 
    1191              :     /// Scheduling respects inprogress_limit.
    1192              :     #[test]
    1193            2 :     fn schedule_inprogress_limit() -> anyhow::Result<()> {
    1194            2 :         // Create a queue with inprogress_limit=2.
    1195            2 :         let mut queue = UploadQueue::Uninitialized;
    1196            2 :         let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 2)?;
    1197            2 :         let tli = make_timeline();
    1198            2 : 
    1199            2 :         // Enqueue a bunch of uploads.
    1200            2 :         let layer0 = make_layer(&tli, "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
    1201            2 :         let layer1 = make_layer(&tli, "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
    1202            2 :         let layer2 = make_layer(&tli, "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
    1203            2 :         let layer3 = make_layer(&tli, "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51");
    1204            2 : 
    1205            2 :         let ops = [
    1206            2 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
    1207            2 :             UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
    1208            2 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
    1209            2 :             UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
    1210            2 :         ];
    1211            2 : 
    1212            2 :         queue.queued_operations.extend(ops.clone());
    1213            2 : 
    1214            2 :         // Schedule all ready operations. Only 2 are scheduled.
    1215            2 :         let tasks = queue.schedule_ready();
    1216            4 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..2]);
    1217            2 :         assert!(queue.next_ready().is_none());
    1218              : 
    1219              :         // When one completes, another is scheduled.
    1220            2 :         queue.complete(tasks[0].task_id);
    1221            2 :         let tasks = queue.schedule_ready();
    1222            2 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops[2..3]);
    1223            2 : 
    1224            2 :         Ok(())
    1225            2 :     }
    1226              : 
    1227              :     /// Tests that can_bypass takes name, generation and shard index into account for all operations.
    1228              :     #[test]
    1229            2 :     fn can_bypass_path() -> anyhow::Result<()> {
    1230            2 :         let tli = make_timeline();
    1231            2 : 
    1232            2 :         let name0 = &"000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51";
    1233            2 :         let name1 = &"100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51";
    1234              : 
    1235              :         // Asserts that layers a and b either can or can't bypass each other, for all combinations
     1236              :         // of operations (Delete/Delete and UploadMetadata/UploadMetadata pairs are special-cased).
    1237              :         #[track_caller]
    1238           16 :         fn assert_can_bypass(a: ResidentLayer, b: ResidentLayer, can_bypass: bool) {
    1239           16 :             let index = IndexPart::empty(TimelineMetadata::example());
    1240           48 :             for (a, b) in make_ops(a).into_iter().zip(make_ops(b)) {
    1241           48 :                 match (&a, &b) {
    1242              :                     // Deletes can always bypass each other.
    1243           16 :                     (UploadOp::Delete(_), UploadOp::Delete(_)) => assert!(a.can_bypass(&b, &index)),
    1244              :                     // Indexes can never bypass each other.
    1245              :                     (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => {
    1246           16 :                         assert!(!a.can_bypass(&b, &index))
    1247              :                     }
    1248              :                     // For other operations, assert as requested.
    1249           16 :                     (a, b) => assert_eq!(a.can_bypass(b, &index), can_bypass),
    1250              :                 }
    1251              :             }
    1252           16 :         }
    1253              : 
    1254           32 :         fn make_ops(layer: ResidentLayer) -> Vec<UploadOp> {
    1255           32 :             let mut index = IndexPart::empty(TimelineMetadata::example());
    1256           32 :             index
    1257           32 :                 .layer_metadata
    1258           32 :                 .insert(layer.layer_desc().layer_name(), layer.metadata());
    1259           32 :             vec![
    1260           32 :                 UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
    1261           32 :                 UploadOp::Delete(Delete {
    1262           32 :                     layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
    1263           32 :                 }),
    1264           32 :                 UploadOp::UploadMetadata {
    1265           32 :                     uploaded: Box::new(index),
    1266           32 :                 },
    1267           32 :             ]
    1268           32 :         }
    1269              : 
    1270              :         // Makes a ResidentLayer.
    1271           32 :         let layer = |name: &'static str, shard: Option<u8>, generation: u32| -> ResidentLayer {
    1272           32 :             let shard = shard
    1273           32 :                 .map(|n| ShardIndex::new(ShardNumber(n), ShardCount(8)))
    1274           32 :                 .unwrap_or(ShardIndex::unsharded());
    1275           32 :             let metadata = LayerFileMetadata {
    1276           32 :                 shard,
    1277           32 :                 generation: Generation::Valid(generation),
    1278           32 :                 file_size: 0,
    1279           32 :             };
    1280           32 :             make_layer_with_metadata(&tli, name, metadata)
    1281           32 :         };
    1282              : 
    1283              :         // Same name and metadata can't bypass. This goes both for unsharded and sharded, as well as
    1284              :         // 0 or >0 generation.
    1285            2 :         assert_can_bypass(layer(name0, None, 0), layer(name0, None, 0), false);
    1286            2 :         assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(0), 0), false);
    1287            2 :         assert_can_bypass(layer(name0, None, 1), layer(name0, None, 1), false);
    1288            2 : 
    1289            2 :         // Different names can bypass.
    1290            2 :         assert_can_bypass(layer(name0, None, 0), layer(name1, None, 0), true);
    1291            2 : 
    1292            2 :         // Different shards can bypass. Shard 0 is different from unsharded.
    1293            2 :         assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(1), 0), true);
    1294            2 :         assert_can_bypass(layer(name0, Some(0), 0), layer(name0, None, 0), true);
    1295            2 : 
    1296            2 :         // Different generations can bypass, both sharded and unsharded.
    1297            2 :         assert_can_bypass(layer(name0, None, 0), layer(name0, None, 1), true);
    1298            2 :         assert_can_bypass(layer(name0, Some(1), 0), layer(name0, Some(1), 1), true);
    1299            2 : 
    1300            2 :         Ok(())
    1301            2 :     }
    1302              : }
        

Generated by: LCOV version 2.1-beta