LCOV - code coverage report
Current view: top level - pageserver/src/tenant - upload_queue.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 94.7 % 959 908
Test Date: 2025-04-24 20:31:15 Functions: 92.6 % 81 75

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet, VecDeque};
       2              : use std::fmt::Debug;
       3              : use std::sync::Arc;
       4              : use std::sync::atomic::AtomicU32;
       5              : 
       6              : use chrono::NaiveDateTime;
       7              : use once_cell::sync::Lazy;
       8              : use tracing::info;
       9              : use utils::generation::Generation;
      10              : use utils::lsn::{AtomicLsn, Lsn};
      11              : 
      12              : use super::remote_timeline_client::is_same_remote_layer_path;
      13              : use super::storage_layer::{AsLayerDesc as _, LayerName, ResidentLayer};
      14              : use crate::tenant::metadata::TimelineMetadata;
      15              : use crate::tenant::remote_timeline_client::index::{IndexPart, LayerFileMetadata};
      16              : 
      17              : /// Kill switch for upload queue reordering in case it causes problems.
      18              : /// TODO: remove this once we have confidence in it.
      19              : static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy<bool> =
      20         1229 :     Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true"));
      21              : 
      22              : /// Kill switch for index upload coalescing in case it causes problems.
      23              : /// TODO: remove this once we have confidence in it.
      24              : static DISABLE_UPLOAD_QUEUE_INDEX_COALESCING: Lazy<bool> =
      25           72 :     Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_INDEX_COALESCING").as_deref() == Ok("true"));
      26              : 
      27              : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
      28              : // memory for Uninitialized variants. Doesn't matter in practice, there are not
      29              : // that many upload queues in a running pageserver, and most of them are initialized
      30              : // anyway.
      31              : #[allow(clippy::large_enum_variant)]
      32              : pub enum UploadQueue {
      33              :     Uninitialized,
      34              :     Initialized(UploadQueueInitialized),
      35              :     Stopped(UploadQueueStopped),
      36              : }
      37              : 
      38              : impl UploadQueue {
      39            0 :     pub fn as_str(&self) -> &'static str {
      40            0 :         match self {
      41            0 :             UploadQueue::Uninitialized => "Uninitialized",
      42            0 :             UploadQueue::Initialized(_) => "Initialized",
      43            0 :             UploadQueue::Stopped(_) => "Stopped",
      44              :         }
      45            0 :     }
      46              : }
      47              : 
      48              : #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
      49              : pub enum OpType {
      50              :     MayReorder,
      51              :     FlushDeletion,
      52              : }
      53              : 
      54              : /// This keeps track of queued and in-progress tasks.
      55              : pub struct UploadQueueInitialized {
      56              :     /// Maximum number of inprogress tasks to schedule. 0 is no limit.
      57              :     pub(crate) inprogress_limit: usize,
      58              : 
      59              :     /// Counter to assign task IDs
      60              :     pub(crate) task_counter: u64,
      61              : 
      62              :     /// The next uploaded index_part.json; assumed to be dirty.
      63              :     ///
      64              :     /// Should not be read directly, except for layer file updates. Instead you should add a
      65              :     /// projected field.
      66              :     pub(crate) dirty: IndexPart,
      67              : 
      68              :     /// The latest remote persisted IndexPart.
      69              :     ///
      70              :     /// Each completed metadata upload will update this. The second item is the task_id which last
      71              :     /// updated the value, used to ensure we never store an older value over a newer one.
      72              :     pub(crate) clean: (IndexPart, Option<u64>),
      73              : 
      74              :     /// How many file uploads or deletions have been scheduled since the
      75              :     /// last (scheduling of) metadata index upload?
      76              :     pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
      77              : 
      78              :     /// The Lsn is only updated after our generation has been validated with
      79              :     /// the control plane (unless a timeline's generation is None, in which case
      80              :     /// we skip validation)
      81              :     pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
      82              : 
      83              :     /// Tasks that are currently in-progress. In-progress means that a tokio Task
      84              :     /// has been launched for it. An in-progress task can be busy uploading, but it can
      85              :     /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
      86              :     /// be waiting for retry in `exponential_backoff`.
      87              :     pub inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
      88              : 
      89              :     /// Queued operations that have not been launched yet. They might depend on previous
      90              :     /// tasks to finish. For example, metadata upload cannot be performed before all
      91              :     /// preceding layer file uploads have completed.
      92              :     pub queued_operations: VecDeque<UploadOp>,
      93              : 
      94              :     /// Files which have been unlinked but have not yet had a deletion scheduled. Only kept around
      95              :     /// for error logging.
      96              :     ///
      97              :     /// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
      98              :     /// bug causing leaks, then it's better to not leave this enabled for production builds.
      99              :     #[cfg(feature = "testing")]
     100              :     pub(crate) dangling_files: HashMap<LayerName, Generation>,
     101              : 
     102              :     /// Ensure we order file operations correctly.
     103              :     pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,
     104              : 
     105              :     /// Deletions that are blocked by the tenant configuration
     106              :     pub(crate) blocked_deletions: Vec<Delete>,
     107              : 
     108              :     /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
     109              :     pub(crate) shutting_down: bool,
     110              : 
     111              :     /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
     112              :     /// wait until one of them stops the queue. The semaphore is closed when
     113              :     /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
     114              :     pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
     115              : }
     116              : 
     117              : impl UploadQueueInitialized {
     118           48 :     pub(super) fn no_pending_work(&self) -> bool {
     119           48 :         self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
     120           48 :     }
     121              : 
     122            0 :     pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
     123            0 :         self.visible_remote_consistent_lsn.load()
     124            0 :     }
     125              : 
     126            0 :     pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     127            0 :         let lsn = self.clean.0.metadata.disk_consistent_lsn();
     128            0 :         self.clean.1.map(|_| lsn)
     129            0 :     }
     130              : 
     131              :     /// Returns and removes the next ready operation from the queue, if any. This isn't necessarily
     132              :     /// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
     133              :     /// the queue if it doesn't conflict with operations ahead of it.
     134              :     ///
     135              :     /// Also returns any operations that were coalesced into this one, e.g. multiple index uploads.
     136              :     ///
     137              :     /// None may be returned even if the queue isn't empty, if no operations are ready yet.
     138              :     ///
     139              :     /// NB: this is quadratic, but queues are expected to be small, and bounded by inprogress_limit.
     140        66934 :     pub fn next_ready(&mut self) -> Option<(UploadOp, Vec<UploadOp>)> {
     141        66934 :         // If inprogress_tasks is already at limit, don't schedule anything more.
     142        66934 :         if self.inprogress_limit > 0 && self.inprogress_tasks.len() >= self.inprogress_limit {
     143           36 :             return None;
     144        66898 :         }
     145              : 
     146       131619 :         for (i, candidate) in self.queued_operations.iter().enumerate() {
     147              :             // If this candidate is ready, go for it. Otherwise, try the next one.
     148       131619 :             if self.is_ready(i) {
     149              :                 // Shutdown operations are left at the head of the queue, to prevent further
     150              :                 // operations from starting. Signal that we're ready to shut down.
     151        23540 :                 if matches!(candidate, UploadOp::Shutdown) {
     152           60 :                     assert!(self.inprogress_tasks.is_empty(), "shutdown with tasks");
     153           60 :                     assert_eq!(i, 0, "shutdown not at head of queue");
     154           60 :                     self.shutdown_ready.close();
     155           60 :                     return None;
     156        23480 :                 }
     157        23480 : 
     158        23480 :                 let mut op = self.queued_operations.remove(i).expect("i can't disappear");
     159        23480 : 
     160        23480 :                 // Coalesce any back-to-back index uploads by only uploading the newest one that's
     161        23480 :                 // ready. This typically happens with layer/index/layer/index/... sequences, where
     162        23480 :                 // the layers bypass the indexes, leaving the indexes queued.
     163        23480 :                 //
     164        23480 :                 // If other operations are interleaved between index uploads we don't try to
     165        23480 :                 // coalesce them, since we may as well update the index concurrently with them.
     166        23480 :                 // This keeps the index fresh and avoids starvation.
     167        23480 :                 //
     168        23480 :                 // NB: we assume that all uploaded indexes have the same remote path. This
     169        23480 :                 // is true at the time of writing: the path only depends on the tenant,
     170        23480 :                 // timeline and generation, all of which are static for a timeline instance.
     171        23480 :                 // Otherwise, we must be careful not to coalesce different paths.
     172        23480 :                 let mut coalesced_ops = Vec::new();
     173        23480 :                 if matches!(op, UploadOp::UploadMetadata { .. }) {
     174         9235 :                     while let Some(UploadOp::UploadMetadata { .. }) = self.queued_operations.get(i)
     175              :                     {
     176          208 :                         if *DISABLE_UPLOAD_QUEUE_INDEX_COALESCING {
     177            0 :                             break;
     178          208 :                         }
     179          208 :                         if !self.is_ready(i) {
     180          142 :                             break;
     181           66 :                         }
     182           66 :                         coalesced_ops.push(op);
     183           66 :                         op = self.queued_operations.remove(i).expect("i can't disappear");
     184              :                     }
     185        14311 :                 }
     186              : 
     187        23480 :                 return Some((op, coalesced_ops));
     188       108079 :             }
     189              : 
     190              :             // Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
     191       108079 :             if matches!(candidate, UploadOp::Barrier(_) | UploadOp::Shutdown) {
     192         2728 :                 return None;
     193       105351 :             }
     194       105351 : 
     195       105351 :             // If upload queue reordering is disabled, bail out after the first operation.
     196       105351 :             if *DISABLE_UPLOAD_QUEUE_REORDERING {
     197            0 :                 return None;
     198       105351 :             }
     199              :         }
     200        40630 :         None
     201        66934 :     }
     202              : 
     203              :     /// Returns true if the queued operation at the given position is ready to be uploaded, i.e. if
     204              :     /// it doesn't conflict with any in-progress or queued operations ahead of it. Operations are
     205              :     /// allowed to skip the queue when it's safe to do so, to increase parallelism.
     206              :     ///
     207              :     /// The position must be valid for the queue size.
     208       131827 :     fn is_ready(&self, pos: usize) -> bool {
     209       131827 :         let candidate = self.queued_operations.get(pos).expect("invalid position");
     210       131827 :         self
     211       131827 :             // Look at in-progress operations, in random order.
     212       131827 :             .inprogress_tasks
     213       131827 :             .values()
     214      6987629 :             .map(|task| &task.op)
     215       131827 :             // Then queued operations ahead of the candidate, front-to-back.
     216       131827 :             .chain(self.queued_operations.iter().take(pos))
     217       131827 :             // Keep track of the active index ahead of each operation. This is used to ensure that
     218       131827 :             // an upload doesn't skip the queue too far, such that it modifies a layer that's
     219       131827 :             // referenced by an active index.
     220       131827 :             //
     221       131827 :             // It's okay that in-progress operations are emitted in random order above, since at
     222       131827 :             // most one of them can be an index upload (enforced by can_bypass).
     223      7072965 :             .scan(&self.clean.0, |next_active_index, op| {
     224      7072965 :                 let active_index = *next_active_index;
     225      7072965 :                 if let UploadOp::UploadMetadata { uploaded } = op {
     226        93073 :                     *next_active_index = uploaded; // stash index for next operation after this
     227      6979892 :                 }
     228      7072965 :                 Some((op, active_index))
     229      7072965 :             })
     230       131827 :             // Check if the candidate can bypass all of them.
     231      7072965 :             .all(|(op, active_index)| candidate.can_bypass(op, active_index))
     232       131827 :     }
     233              : 
     234              :     /// Returns the number of in-progress deletion operations.
     235              :     #[cfg(test)]
     236           12 :     pub(crate) fn num_inprogress_deletions(&self) -> usize {
     237           12 :         self.inprogress_tasks
     238           12 :             .iter()
     239           12 :             .filter(|(_, t)| matches!(t.op, UploadOp::Delete(_)))
     240           12 :             .count()
     241           12 :     }
     242              : 
     243              :     /// Returns the number of in-progress layer uploads.
     244              :     #[cfg(test)]
     245           24 :     pub(crate) fn num_inprogress_layer_uploads(&self) -> usize {
     246           24 :         self.inprogress_tasks
     247           24 :             .iter()
     248           36 :             .filter(|(_, t)| matches!(t.op, UploadOp::UploadLayer(_, _, _)))
     249           24 :             .count()
     250           24 :     }
     251              : 
     252              :     /// Test helper that schedules all ready operations into inprogress_tasks, and returns
     253              :     /// references to them.
     254              :     ///
     255              :     /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into
     256              :     /// UploadQueue, so we can use the same code path.
     257              :     #[cfg(test)]
     258          468 :     fn schedule_ready(&mut self) -> Vec<Arc<UploadTask>> {
     259          468 :         let mut tasks = Vec::new();
     260              :         // NB: schedule operations one by one, to handle conflicts with inprogress_tasks.
     261         1032 :         while let Some((op, coalesced_ops)) = self.next_ready() {
     262          564 :             self.task_counter += 1;
     263          564 :             let task = Arc::new(UploadTask {
     264          564 :                 task_id: self.task_counter,
     265          564 :                 op,
     266          564 :                 coalesced_ops,
     267          564 :                 retries: 0.into(),
     268          564 :             });
     269          564 :             self.inprogress_tasks.insert(task.task_id, task.clone());
     270          564 :             tasks.push(task);
     271          564 :         }
     272          468 :         tasks
     273          468 :     }
     274              : 
     275              :     /// Test helper that marks an operation as completed, removing it from inprogress_tasks.
     276              :     ///
     277              :     /// TODO: the corresponding production logic should be moved from RemoteTimelineClient into
     278              :     /// UploadQueue, so we can use the same code path.
     279              :     #[cfg(test)]
     280          348 :     fn complete(&mut self, task_id: u64) {
     281          348 :         let Some(task) = self.inprogress_tasks.remove(&task_id) else {
     282            0 :             return;
     283              :         };
     284              :         // Update the clean index on uploads.
     285          348 :         if let UploadOp::UploadMetadata { ref uploaded } = task.op {
     286           96 :             if task.task_id > self.clean.1.unwrap_or_default() {
     287           96 :                 self.clean = (*uploaded.clone(), Some(task.task_id));
     288           96 :             }
     289          252 :         }
     290          348 :     }
     291              : }
     292              : 
     293              : #[derive(Clone, Copy)]
     294              : pub(super) enum SetDeletedFlagProgress {
     295              :     NotRunning,
     296              :     InProgress(NaiveDateTime),
     297              :     Successful(NaiveDateTime),
     298              : }
     299              : 
     300              : pub struct UploadQueueStoppedDeletable {
     301              :     pub(super) upload_queue_for_deletion: UploadQueueInitialized,
     302              :     pub(super) deleted_at: SetDeletedFlagProgress,
     303              : }
     304              : 
     305              : #[allow(clippy::large_enum_variant, reason = "TODO")]
     306              : pub enum UploadQueueStopped {
     307              :     Deletable(UploadQueueStoppedDeletable),
     308              :     Uninitialized,
     309              : }
     310              : 
     311              : #[derive(thiserror::Error, Debug)]
     312              : pub enum NotInitialized {
     313              :     #[error("queue is in state Uninitialized")]
     314              :     Uninitialized,
     315              :     #[error("queue is in state Stopped")]
     316              :     Stopped,
     317              :     #[error("queue is shutting down")]
     318              :     ShuttingDown,
     319              : }
     320              : 
     321              : impl NotInitialized {
     322            0 :     pub(crate) fn is_stopping(&self) -> bool {
     323              :         use NotInitialized::*;
     324            0 :         match self {
     325            0 :             Uninitialized => false,
     326            0 :             Stopped => true,
     327            0 :             ShuttingDown => true,
     328              :         }
     329            0 :     }
     330              : }
     331              : 
     332              : impl UploadQueue {
     333         2796 :     pub fn initialize_empty_remote(
     334         2796 :         &mut self,
     335         2796 :         metadata: &TimelineMetadata,
     336         2796 :         inprogress_limit: usize,
     337         2796 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     338         2796 :         match self {
     339         2796 :             UploadQueue::Uninitialized => (),
     340              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     341            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     342              :             }
     343              :         }
     344              : 
     345         2796 :         info!("initializing upload queue for empty remote");
     346              : 
     347         2796 :         let index_part = IndexPart::empty(metadata.clone());
     348         2796 : 
     349         2796 :         let state = UploadQueueInitialized {
     350         2796 :             inprogress_limit,
     351         2796 :             dirty: index_part.clone(),
     352         2796 :             clean: (index_part, None),
     353         2796 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     354         2796 :             visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
     355         2796 :             // what follows are boring default initializations
     356         2796 :             task_counter: 0,
     357         2796 :             inprogress_tasks: HashMap::new(),
     358         2796 :             queued_operations: VecDeque::new(),
     359         2796 :             #[cfg(feature = "testing")]
     360         2796 :             dangling_files: HashMap::new(),
     361         2796 :             recently_deleted: HashSet::new(),
     362         2796 :             blocked_deletions: Vec::new(),
     363         2796 :             shutting_down: false,
     364         2796 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     365         2796 :         };
     366         2796 : 
     367         2796 :         *self = UploadQueue::Initialized(state);
     368         2796 :         Ok(self.initialized_mut().expect("we just set it"))
     369         2796 :     }
     370              : 
     371          132 :     pub fn initialize_with_current_remote_index_part(
     372          132 :         &mut self,
     373          132 :         index_part: &IndexPart,
     374          132 :         inprogress_limit: usize,
     375          132 :     ) -> anyhow::Result<&mut UploadQueueInitialized> {
     376          132 :         match self {
     377          132 :             UploadQueue::Uninitialized => (),
     378              :             UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
     379            0 :                 anyhow::bail!("already initialized, state {}", self.as_str())
     380              :             }
     381              :         }
     382              : 
     383          132 :         info!(
     384            0 :             "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
     385            0 :             index_part.metadata.disk_consistent_lsn()
     386              :         );
     387              : 
     388          132 :         let state = UploadQueueInitialized {
     389          132 :             inprogress_limit,
     390          132 :             dirty: index_part.clone(),
     391          132 :             clean: (index_part.clone(), None),
     392          132 :             latest_files_changes_since_metadata_upload_scheduled: 0,
     393          132 :             visible_remote_consistent_lsn: Arc::new(
     394          132 :                 index_part.metadata.disk_consistent_lsn().into(),
     395          132 :             ),
     396          132 :             // what follows are boring default initializations
     397          132 :             task_counter: 0,
     398          132 :             inprogress_tasks: HashMap::new(),
     399          132 :             queued_operations: VecDeque::new(),
     400          132 :             #[cfg(feature = "testing")]
     401          132 :             dangling_files: HashMap::new(),
     402          132 :             recently_deleted: HashSet::new(),
     403          132 :             blocked_deletions: Vec::new(),
     404          132 :             shutting_down: false,
     405          132 :             shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
     406          132 :         };
     407          132 : 
     408          132 :         *self = UploadQueue::Initialized(state);
     409          132 :         Ok(self.initialized_mut().expect("we just set it"))
     410          132 :     }
     411              : 
     412        48745 :     pub fn initialized_mut(&mut self) -> Result<&mut UploadQueueInitialized, NotInitialized> {
     413              :         use UploadQueue::*;
     414        48745 :         match self {
     415            0 :             Uninitialized => Err(NotInitialized::Uninitialized),
     416        48745 :             Initialized(x) => {
     417        48745 :                 if x.shutting_down {
     418           24 :                     Err(NotInitialized::ShuttingDown)
     419              :                 } else {
     420        48721 :                     Ok(x)
     421              :                 }
     422              :             }
     423            0 :             Stopped(_) => Err(NotInitialized::Stopped),
     424              :         }
     425        48745 :     }
     426              : 
     427           12 :     pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
     428           12 :         match self {
     429              :             UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
     430            0 :                 anyhow::bail!("queue is in state {}", self.as_str())
     431              :             }
     432              :             UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
     433            0 :                 anyhow::bail!("queue is in state Stopped(Uninitialized)")
     434              :             }
     435           12 :             UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
     436              :         }
     437           12 :     }
     438              : }
     439              : 
     440              : /// An in-progress upload or delete task.
     441              : #[derive(Debug)]
     442              : pub struct UploadTask {
     443              :     /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
     444              :     pub task_id: u64,
     445              :     /// Number of task retries.
     446              :     pub retries: AtomicU32,
     447              :     /// The upload operation.
     448              :     pub op: UploadOp,
     449              :     /// Any upload operations that were coalesced into this operation. This typically happens with
     450              :     /// back-to-back index uploads, see `UploadQueueInitialized::next_ready()`.
     451              :     pub coalesced_ops: Vec<UploadOp>,
     452              : }
     453              : 
     454              : /// A deletion of some layers within the lifetime of a timeline.  This is not used
     455              : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
     456              : #[derive(Debug, Clone)]
     457              : pub struct Delete {
     458              :     pub layers: Vec<(LayerName, LayerFileMetadata)>,
     459              : }
     460              : 
     461              : #[derive(Clone, Debug)]
     462              : pub enum UploadOp {
     463              :     /// Upload a layer file. The last field indicates the last operation for this file.
     464              :     UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),
     465              : 
     466              :     /// Upload an index_part.json file
     467              :     UploadMetadata {
     468              :         /// The next [`UploadQueueInitialized::clean`] after this upload succeeds.
     469              :         uploaded: Box<IndexPart>,
     470              :     },
     471              : 
     472              :     /// Delete layer files
     473              :     Delete(Delete),
     474              : 
     475              :     /// Barrier. When the barrier operation is reached, the channel is closed.
     476              :     Barrier(tokio::sync::watch::Sender<()>),
     477              : 
     478              :     /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
     479              :     /// this is the same as a Barrier.
     480              :     Shutdown,
     481              : }
     482              : 
     483              : impl std::fmt::Display for UploadOp { // Compact, human-readable one-line form of each op.
     484            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     485            0 :         match self {
     486            0 :             UploadOp::UploadLayer(layer, metadata, mode) => {
     487            0 :                 write!(
     488            0 :                     f,
     489            0 :                     "UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
     490            0 :                     layer, metadata.file_size, metadata.generation, mode
     491            0 :                 )
     492              :             }
     493            0 :             UploadOp::UploadMetadata { uploaded, .. } => {
     494            0 :                 write!(
     495            0 :                     f,
     496            0 :                     "UploadMetadata(lsn: {})",
     497            0 :                     uploaded.metadata.disk_consistent_lsn() // only the LSN is printed, not the index
     498            0 :                 )
     499              :             }
     500            0 :             UploadOp::Delete(delete) => {
     501            0 :                 write!(f, "Delete({} layers)", delete.layers.len()) // count only, names omitted
     502              :             }
     503            0 :             UploadOp::Barrier(_) => write!(f, "Barrier"),
     504            0 :             UploadOp::Shutdown => write!(f, "Shutdown"),
     505              :         }
     506            0 :     }
     507              : }
     508              : 
     509              : impl UploadOp {
     510              :     /// Returns true if `self` can bypass `other`, i.e. if the operations don't conflict. `index` is
     511              :     /// the active index when `other` would be uploaded -- if we allow `self` to bypass `other`,
     512              :     /// this would be the active index when `self` is uploaded.
     513      7073253 :     pub fn can_bypass(&self, other: &UploadOp, index: &IndexPart) -> bool {
     514      7073253 :         match (self, other) {
     515              :             // Nothing can bypass a barrier or shutdown, and it can't bypass anything.
     516         2728 :             (UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false,
     517           96 :             (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
     518              : 
     519              :             // Uploads and deletes can bypass each other unless they're for the same file.
     520       119868 :             (UploadOp::UploadLayer(a, ameta, _), UploadOp::UploadLayer(b, bmeta, _)) => {
     521       119868 :                 let aname = &a.layer_desc().layer_name();
     522       119868 :                 let bname = &b.layer_desc().layer_name();
     523       119868 :                 !is_same_remote_layer_path(aname, ameta, bname, bmeta)
     524              :             }
     525          311 :             (UploadOp::UploadLayer(u, umeta, _), UploadOp::Delete(d))
     526      6826282 :             | (UploadOp::Delete(d), UploadOp::UploadLayer(u, umeta, _)) => {
     527      6826669 :                 d.layers.iter().all(|(dname, dmeta)| {
     528      6826669 :                     !is_same_remote_layer_path(&u.layer_desc().layer_name(), umeta, dname, dmeta)
     529      6826669 :                 })
     530              :             }
     531              : 
     532              :             // Deletes are idempotent and can always bypass each other.
     533        16905 :             (UploadOp::Delete(_), UploadOp::Delete(_)) => true,
     534              : 
     535              :             // Uploads and deletes can bypass an index upload as long as neither the uploaded index
     536              :             // nor the active index below it references the file. A layer can't be modified or
     537              :             // deleted while referenced by an index.
     538              :             //
     539              :             // Similarly, index uploads can bypass uploads and deletes as long as neither the
     540              :             // uploaded index nor the active index references the file (the latter would be
     541              :             // incorrect use by the caller).
     542          830 :             (UploadOp::UploadLayer(u, umeta, _), UploadOp::UploadMetadata { uploaded: i })
     543        15137 :             | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, umeta, _)) => {
     544        15967 :                 let uname = u.layer_desc().layer_name();
     545        15967 :                 !i.references(&uname, umeta) && !index.references(&uname, umeta)
     546              :             }
     547        89722 :             (UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i })
     548          129 :             | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => {
     549        89851 :                 d.layers.iter().all(|(dname, dmeta)| {
     550        89851 :                     !i.references(dname, dmeta) && !index.references(dname, dmeta)
     551        89851 :                 })
     552              :             }
     553              : 
     554              :             // Indexes can never bypass each other. They can coalesce though, and
     555              :             // `UploadQueue::next_ready()` currently does this when possible.
     556         1245 :             (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
     557              :         }
     558      7073253 :     }
     559              : }
     560              : 
     561              : #[cfg(test)]
     562              : mod tests {
     563              :     use std::str::FromStr as _;
     564              : 
     565              :     use itertools::Itertools as _;
     566              :     use utils::shard::{ShardCount, ShardIndex, ShardNumber};
     567              : 
     568              :     use super::*;
     569              :     use crate::DEFAULT_PG_VERSION;
     570              :     use crate::tenant::Timeline;
     571              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     572              :     use crate::tenant::storage_layer::Layer;
     573              :     use crate::tenant::storage_layer::layer::local_layer_path;
     574              : 
     575              :     /// Test helper which asserts that two operations are the same, in lieu of `UploadOp: PartialEq`.
     576              :     #[track_caller]
     577          588 :     fn assert_same_op(a: &UploadOp, b: &UploadOp) {
     578              :         use UploadOp::*;
     579          588 :         match (a, b) {
     580          264 :             (UploadLayer(a, ameta, atype), UploadLayer(b, bmeta, btype)) => {
     581          264 :                 assert_eq!(a.layer_desc().layer_name(), b.layer_desc().layer_name()); // by name
     582          264 :                 assert_eq!(ameta, bmeta);
     583          264 :                 assert_eq!(atype, btype);
     584              :             }
     585          132 :             (Delete(a), Delete(b)) => assert_eq!(a.layers, b.layers),
     586          168 :             (UploadMetadata { uploaded: a }, UploadMetadata { uploaded: b }) => assert_eq!(a, b),
     587           24 :             (Barrier(_), Barrier(_)) => {} // the channel sender payload is not compared
     588            0 :             (Shutdown, Shutdown) => {}
     589            0 :             (a, b) => panic!("{a:?} != {b:?}"), // mismatched variants
     590              :         }
     591          588 :     }
     592              : 
     593              :     /// Test helper which asserts that two sequences of operations are pairwise the same.
     594              :     #[track_caller]
     595          132 :     fn assert_same_ops<'a>(
     596          132 :         a: impl IntoIterator<Item = &'a UploadOp>,
     597          132 :         b: impl IntoIterator<Item = &'a UploadOp>,
     598          132 :     ) {
     599          132 :         a.into_iter()
     600          132 :             .zip_eq(b) // itertools: panics if the two sequences differ in length
     601          348 :             .for_each(|(a, b)| assert_same_op(a, b))
     602          132 :     }
     603              : 
     604              :     /// Test helper to construct a test timeline on a throwaway current-thread Tokio runtime.
     605              :     ///
     606              :     /// TODO: it really shouldn't be necessary to construct an entire tenant and timeline just to
     607              :     /// test the upload queue -- decouple ResidentLayer from Timeline.
     608              :     ///
     609              :     /// TODO: the upload queue uses TimelineMetadata::example() instead, because there's no way to
     610              :     /// obtain a TimelineMetadata from a Timeline.
     611          144 :     fn make_timeline() -> Arc<Timeline> {
     612          144 :         // Grab the current test name from the current thread name.
     613          144 :         // TODO: TenantHarness shouldn't take a &'static str, but just leak the test name for now.
     614          144 :         let test_name = std::thread::current().name().unwrap().to_string();
     615          144 :         let test_name = Box::leak(test_name.into_boxed_str());
     616          144 : 
     617          144 :         let runtime = tokio::runtime::Builder::new_current_thread()
     618          144 :             .enable_all()
     619          144 :             .build()
     620          144 :             .expect("failed to create runtime");
     621          144 : 
     622          144 :         runtime
     623          144 :             .block_on(async {
     624          144 :                 let harness = TenantHarness::create(test_name).await?;
     625          144 :                 let (tenant, ctx) = harness.load().await;
     626          144 :                 tenant
     627          144 :                     .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
     628          144 :                     .await
     629          144 :             })
     630          144 :             .expect("failed to create timeline")
     631          144 :     }
     632              : 
     633              :     /// Test helper to construct an empty (zero-size) resident layer.
     634          360 :     fn make_layer(timeline: &Arc<Timeline>, name: &str) -> ResidentLayer {
     635          360 :         make_layer_with_size(timeline, name, 0)
     636          360 :     }
     637              : 
     638              :     /// Test helper to construct a resident layer with the given file size.
     639          396 :     fn make_layer_with_size(timeline: &Arc<Timeline>, name: &str, size: usize) -> ResidentLayer {
     640          396 :         let metadata = LayerFileMetadata {
     641          396 :             generation: timeline.generation, // generation and shard are taken from the timeline
     642          396 :             shard: timeline.get_shard_index(),
     643          396 :             file_size: size as u64,
     644          396 :         };
     645          396 :         make_layer_with_metadata(timeline, name, metadata)
     646          396 :     }
     647              : 
     648              :     /// Test helper to construct a resident layer with the given metadata and a zero-filled file.
     649          588 :     fn make_layer_with_metadata(
     650          588 :         timeline: &Arc<Timeline>,
     651          588 :         name: &str,
     652          588 :         metadata: LayerFileMetadata,
     653          588 :     ) -> ResidentLayer {
     654          588 :         let name = LayerName::from_str(name).expect("invalid name");
     655          588 :         let local_path = local_layer_path(
     656          588 :             timeline.conf,
     657          588 :             &timeline.tenant_shard_id,
     658          588 :             &timeline.timeline_id,
     659          588 :             &name,
     660          588 :             &metadata.generation,
     661          588 :         );
     662          588 :         std::fs::write(&local_path, vec![0; metadata.file_size as usize]) // file_size zero bytes
     663          588 :             .expect("failed to write file");
     664          588 :         Layer::for_resident(timeline.conf, timeline, local_path, name, metadata)
     665          588 :     }
     666              : 
     667              :     /// Test helper which returns a boxed copy of the index with the given layer added.
     668           72 :     fn index_with(index: &IndexPart, layer: &ResidentLayer) -> Box<IndexPart> {
     669           72 :         let mut index = index.clone(); // shadow with a clone; the caller's index is not modified
     670           72 :         index
     671           72 :             .layer_metadata
     672           72 :             .insert(layer.layer_desc().layer_name(), layer.metadata());
     673           72 :         Box::new(index)
     674           72 :     }
     675              : 
     676              :     /// Test helper which returns a boxed copy of the index with the given layer removed.
     677           24 :     fn index_without(index: &IndexPart, layer: &ResidentLayer) -> Box<IndexPart> {
     678           24 :         let mut index = index.clone(); // shadow with a clone; the caller's index is not modified
     679           24 :         index
     680           24 :             .layer_metadata
     681           24 :             .remove(&layer.layer_desc().layer_name());
     682           24 :         Box::new(index)
     683           24 :     }
     684              : 
     685              :     /// Nothing can bypass a barrier, and it can't bypass in-progress tasks.
     686              :     #[test]
     687           12 :     fn schedule_barrier() -> anyhow::Result<()> {
     688           12 :         let mut queue = UploadQueue::Uninitialized;
     689           12 :         let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
     690           12 :         let tli = make_timeline();
     691           12 : 
     692           12 :         let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
     693           12 :         let layer0 = make_layer(
     694           12 :             &tli,
     695           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     696           12 :         );
     697           12 :         let layer1 = make_layer(
     698           12 :             &tli,
     699           12 :             "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     700           12 :         );
     701           12 :         let layer2 = make_layer(
     702           12 :             &tli,
     703           12 :             "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     704           12 :         );
     705           12 :         let layer3 = make_layer(
     706           12 :             &tli,
     707           12 :             "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     708           12 :         );
     709           12 :         let (barrier, _) = tokio::sync::watch::channel(());
     710           12 : 
     711           12 :         // Enqueue non-conflicting upload, delete, and index before and after a barrier.
     712           12 :         let ops = [
     713           12 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
     714           12 :             UploadOp::Delete(Delete {
     715           12 :                 layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
     716           12 :             }),
     717           12 :             UploadOp::UploadMetadata {
     718           12 :                 uploaded: index.clone(),
     719           12 :             },
     720           12 :             UploadOp::Barrier(barrier),
     721           12 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
     722           12 :             UploadOp::Delete(Delete {
     723           12 :                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
     724           12 :             }),
     725           12 :             UploadOp::UploadMetadata {
     726           12 :                 uploaded: index.clone(),
     727           12 :             },
     728           12 :         ];
     729           12 : 
     730           12 :         queue.queued_operations.extend(ops.clone());
     731           12 : 
     732           12 :         // Schedule the initial operations ahead of the barrier.
     733           12 :         let tasks = queue.schedule_ready();
     734           12 : 
     735           36 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]);
     736           12 :         assert!(matches!(
     737           12 :             queue.queued_operations.front(),
     738              :             Some(&UploadOp::Barrier(_))
     739              :         ));
     740              : 
     741              :         // Complete the initial operations. The barrier isn't scheduled while they're pending.
     742           48 :         for task in tasks {
     743           36 :             assert!(queue.schedule_ready().is_empty());
     744           36 :             queue.complete(task.task_id);
     745              :         }
     746              : 
     747              :         // Schedule the barrier. The later tasks won't schedule until it completes.
     748           12 :         let tasks = queue.schedule_ready();
     749           12 : 
     750           12 :         assert_eq!(tasks.len(), 1);
     751           12 :         assert!(matches!(tasks[0].op, UploadOp::Barrier(_)));
     752           12 :         assert_eq!(queue.queued_operations.len(), 3);
     753              : 
     754              :         // Complete the barrier. The rest of the tasks (ops[4..]) schedule immediately.
     755           12 :         queue.complete(tasks[0].task_id);
     756           12 : 
     757           12 :         let tasks = queue.schedule_ready();
     758           36 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops[4..]);
     759           12 :         assert!(queue.queued_operations.is_empty());
     760              : 
     761           12 :         Ok(())
     762           12 :     }
     763              : 
     764              :     /// Deletes can be scheduled in parallel, even if they're for the same file.
     765              :     #[test]
     766           12 :     fn schedule_delete_parallel() -> anyhow::Result<()> {
     767           12 :         let mut queue = UploadQueue::Uninitialized;
     768           12 :         let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
     769           12 :         let tli = make_timeline();
     770           12 : 
     771           12 :         // Enqueue a bunch of deletes; layer1 and layer2 each appear in two of them.
     772           12 :         let layer0 = make_layer(
     773           12 :             &tli,
     774           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     775           12 :         );
     776           12 :         let layer1 = make_layer(
     777           12 :             &tli,
     778           12 :             "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     779           12 :         );
     780           12 :         let layer2 = make_layer(
     781           12 :             &tli,
     782           12 :             "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     783           12 :         );
     784           12 :         let layer3 = make_layer(
     785           12 :             &tli,
     786           12 :             "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     787           12 :         );
     788           12 : 
     789           12 :         let ops = [
     790           12 :             UploadOp::Delete(Delete {
     791           12 :                 layers: vec![(layer0.layer_desc().layer_name(), layer0.metadata())],
     792           12 :             }),
     793           12 :             UploadOp::Delete(Delete {
     794           12 :                 layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
     795           12 :             }),
     796           12 :             UploadOp::Delete(Delete {
     797           12 :                 layers: vec![
     798           12 :                     (layer1.layer_desc().layer_name(), layer1.metadata()),
     799           12 :                     (layer2.layer_desc().layer_name(), layer2.metadata()),
     800           12 :                 ],
     801           12 :             }),
     802           12 :             UploadOp::Delete(Delete {
     803           12 :                 layers: vec![(layer2.layer_desc().layer_name(), layer2.metadata())],
     804           12 :             }),
     805           12 :             UploadOp::Delete(Delete {
     806           12 :                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
     807           12 :             }),
     808           12 :         ];
     809           12 : 
     810           12 :         queue.queued_operations.extend(ops.clone());
     811           12 : 
     812           12 :         // Schedule all ready operations. Since deletes don't conflict, they're all scheduled.
     813           12 :         let tasks = queue.schedule_ready();
     814           12 : 
     815           60 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops);
     816           12 :         assert!(queue.queued_operations.is_empty());
     817              : 
     818           12 :         Ok(())
     819           12 :     }
     820              : 
     821              :     /// Conflicting uploads (here: the same layer name with different sizes) are serialized.
     822              :     #[test]
     823           12 :     fn schedule_upload_conflicts() -> anyhow::Result<()> {
     824           12 :         let mut queue = UploadQueue::Uninitialized;
     825           12 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
     826           12 :         let tli = make_timeline();
     827           12 : 
     828           12 :         // Enqueue three versions of the same layer, with different file sizes.
     829           12 :         let layer0a = make_layer_with_size(
     830           12 :             &tli,
     831           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     832           12 :             1,
     833           12 :         );
     834           12 :         let layer0b = make_layer_with_size(
     835           12 :             &tli,
     836           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     837           12 :             2,
     838           12 :         );
     839           12 :         let layer0c = make_layer_with_size(
     840           12 :             &tli,
     841           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     842           12 :             3,
     843           12 :         );
     844           12 : 
     845           12 :         let ops = [
     846           12 :             UploadOp::UploadLayer(layer0a.clone(), layer0a.metadata(), None),
     847           12 :             UploadOp::UploadLayer(layer0b.clone(), layer0b.metadata(), None),
     848           12 :             UploadOp::UploadLayer(layer0c.clone(), layer0c.metadata(), None),
     849           12 :         ];
     850           12 : 
     851           12 :         queue.queued_operations.extend(ops.clone());
     852              : 
     853              :         // Only one version should be scheduled and uploaded at a time, in queue order.
     854           48 :         for op in ops {
     855           36 :             let tasks = queue.schedule_ready();
     856           36 :             assert_eq!(tasks.len(), 1);
     857           36 :             assert_same_op(&tasks[0].op, &op);
     858           36 :             queue.complete(tasks[0].task_id);
     859              :         }
     860           12 :         assert!(queue.schedule_ready().is_empty());
     861           12 :         assert!(queue.queued_operations.is_empty());
     862              : 
     863           12 :         Ok(())
     864           12 :     }
     865              : 
     866              :     /// Conflicting uploads and deletes are serialized.
     867              :     #[test]
     868           12 :     fn schedule_upload_delete_conflicts() -> anyhow::Result<()> {
     869           12 :         let mut queue = UploadQueue::Uninitialized;
     870           12 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
     871           12 :         let tli = make_timeline();
     872           12 : 
     873           12 :         // Enqueue two layer uploads, with a delete of both layers in between them. These should be
     874           12 :         // scheduled one at a time, since deletes can't bypass uploads and vice versa.
     875           12 :         let layer0 = make_layer(
     876           12 :             &tli,
     877           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     878           12 :         );
     879           12 :         let layer1 = make_layer(
     880           12 :             &tli,
     881           12 :             "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     882           12 :         );
     883           12 : 
     884           12 :         let ops = [
     885           12 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
     886           12 :             UploadOp::Delete(Delete {
     887           12 :                 layers: vec![
     888           12 :                     (layer0.layer_desc().layer_name(), layer0.metadata()),
     889           12 :                     (layer1.layer_desc().layer_name(), layer1.metadata()),
     890           12 :                 ],
     891           12 :             }),
     892           12 :             UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
     893           12 :         ];
     894           12 : 
     895           12 :         queue.queued_operations.extend(ops.clone());
     896              : 
     897              :         // Only one operation should be scheduled at a time, in queue order.
     898           48 :         for op in ops {
     899           36 :             let tasks = queue.schedule_ready();
     900           36 :             assert_eq!(tasks.len(), 1);
     901           36 :             assert_same_op(&tasks[0].op, &op);
     902           36 :             queue.complete(tasks[0].task_id);
     903              :         }
     904           12 :         assert!(queue.schedule_ready().is_empty());
     905           12 :         assert!(queue.queued_operations.is_empty());
     906              : 
     907           12 :         Ok(())
     908           12 :     }
     909              : 
     910              :     /// Non-conflicting uploads and deletes can bypass the queue, avoiding the conflicting
     911              :     /// delete/upload operations at the head of the queue.
     912              :     #[test]
     913           12 :     fn schedule_upload_delete_conflicts_bypass() -> anyhow::Result<()> {
     914           12 :         let mut queue = UploadQueue::Uninitialized;
     915           12 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
     916           12 :         let tli = make_timeline();
     917           12 : 
     918           12 :         // Enqueue two layer uploads, with a delete of both layers in between them. These should be
     919           12 :         // scheduled one at a time, since deletes can't bypass uploads and vice versa.
     920           12 :         //
     921           12 :         // Also enqueue non-conflicting uploads and deletes at the end. These can bypass the queue
     922           12 :         // and run immediately.
     923           12 :         let layer0 = make_layer(
     924           12 :             &tli,
     925           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     926           12 :         );
     927           12 :         let layer1 = make_layer(
     928           12 :             &tli,
     929           12 :             "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     930           12 :         );
     931           12 :         let layer2 = make_layer(
     932           12 :             &tli,
     933           12 :             "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     934           12 :         );
     935           12 :         let layer3 = make_layer(
     936           12 :             &tli,
     937           12 :             "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     938           12 :         );
     939           12 : 
     940           12 :         let ops = [
     941           12 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
     942           12 :             UploadOp::Delete(Delete {
     943           12 :                 layers: vec![
     944           12 :                     (layer0.layer_desc().layer_name(), layer0.metadata()),
     945           12 :                     (layer1.layer_desc().layer_name(), layer1.metadata()),
     946           12 :                 ],
     947           12 :             }),
     948           12 :             UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
     949           12 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
     950           12 :             UploadOp::Delete(Delete {
     951           12 :                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
     952           12 :             }),
     953           12 :         ];
     954           12 : 
     955           12 :         queue.queued_operations.extend(ops.clone());
     956           12 : 
     957           12 :         // Operations 0, 3, and 4 are scheduled immediately; operations 1 and 2 stay queued.
     958           12 :         let tasks = queue.schedule_ready();
     959           36 :         assert_same_ops(tasks.iter().map(|t| &t.op), [&ops[0], &ops[3], &ops[4]]);
     960           12 :         assert_eq!(queue.queued_operations.len(), 2);
     961              : 
     962           12 :         Ok(())
     963           12 :     }
     964              : 
     965              :     /// Non-conflicting uploads are parallelized.
     966              :     #[test]
     967           12 :     fn schedule_upload_parallel() -> anyhow::Result<()> {
     968           12 :         let mut queue = UploadQueue::Uninitialized;
     969           12 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
     970           12 :         let tli = make_timeline();
     971           12 : 
     972           12 :         // Enqueue uploads of three distinct layers.
     973           12 :         let layer0 = make_layer(
     974           12 :             &tli,
     975           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     976           12 :         );
     977           12 :         let layer1 = make_layer(
     978           12 :             &tli,
     979           12 :             "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     980           12 :         );
     981           12 :         let layer2 = make_layer(
     982           12 :             &tli,
     983           12 :             "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
     984           12 :         );
     985           12 : 
     986           12 :         let ops = [
     987           12 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
     988           12 :             UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
     989           12 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
     990           12 :         ];
     991           12 : 
     992           12 :         queue.queued_operations.extend(ops.clone());
     993           12 : 
     994           12 :         // All uploads should be scheduled by a single schedule_ready() call, leaving the queue empty.
     995           12 :         let tasks = queue.schedule_ready();
     996           12 : 
     997           36 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops);
     998           12 :         assert!(queue.queued_operations.is_empty());
     999              : 
    1000           12 :         Ok(())
    1001           12 :     }
    1002              : 
    1003              :     /// Index uploads are coalesced: back-to-back index uploads become one task for the last index, with the earlier ones attached as coalesced_ops.
    1004              :     #[test]
    1005           12 :     fn schedule_index_coalesce() -> anyhow::Result<()> {
    1006           12 :         let mut queue = UploadQueue::Uninitialized;
    1007           12 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
    1008              : 
    1009              :         // Enqueue three uploads of the current empty index.
    1010           12 :         let index = Box::new(queue.clean.0.clone());
    1011           12 : 
    1012           12 :         let ops = [
    1013           12 :             UploadOp::UploadMetadata {
    1014           12 :                 uploaded: index.clone(),
    1015           12 :             },
    1016           12 :             UploadOp::UploadMetadata {
    1017           12 :                 uploaded: index.clone(),
    1018           12 :             },
    1019           12 :             UploadOp::UploadMetadata {
    1020           12 :                 uploaded: index.clone(),
    1021           12 :             },
    1022           12 :         ];
    1023           12 : 
    1024           12 :         queue.queued_operations.extend(ops.clone());
    1025           12 : 
    1026           12 :         // The index uploads are coalesced into a single operation: the last index is uploaded, and the first two ride along as coalesced_ops.
    1027           12 :         let tasks = queue.schedule_ready();
    1028           12 :         assert_eq!(tasks.len(), 1);
    1029           12 :         assert_same_op(&tasks[0].op, &ops[2]);
    1030           12 :         assert_same_ops(&tasks[0].coalesced_ops, &ops[0..2]);
    1031           12 : 
    1032           12 :         assert!(queue.queued_operations.is_empty());
    1033              : 
    1034           12 :         Ok(())
    1035           12 :     }
    1036              : 
    1037              :     /// Chains of upload/index operations lead to parallel layer uploads and serial index uploads.
    1038              :     /// This is the common case with layer flushes.
    1039              :     #[test]
    1040           12 :     fn schedule_index_upload_chain() -> anyhow::Result<()> {
    1041           12 :         let mut queue = UploadQueue::Uninitialized;
    1042           12 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
    1043           12 :         let tli = make_timeline();
    1044           12 : 
    1045           12 :         // Create three layers and a chain of indexes, each built from the previous index plus one more layer via index_with().
    1046           12 :         let index = Box::new(queue.clean.0.clone());
    1047           12 :         let layer0 = make_layer(
    1048           12 :             &tli,
    1049           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1050           12 :         );
    1051           12 :         let index0 = index_with(&index, &layer0);
    1052           12 :         let layer1 = make_layer(
    1053           12 :             &tli,
    1054           12 :             "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1055           12 :         );
    1056           12 :         let index1 = index_with(&index0, &layer1);
    1057           12 :         let layer2 = make_layer(
    1058           12 :             &tli,
    1059           12 :             "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1060           12 :         );
    1061           12 :         let index2 = index_with(&index1, &layer2);
    1062           12 : 
    1063           12 :         let ops = [
    1064           12 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
    1065           12 :             UploadOp::UploadMetadata {
    1066           12 :                 uploaded: index0.clone(),
    1067           12 :             },
    1068           12 :             UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
    1069           12 :             UploadOp::UploadMetadata {
    1070           12 :                 uploaded: index1.clone(),
    1071           12 :             },
    1072           12 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
    1073           12 :             UploadOp::UploadMetadata {
    1074           12 :                 uploaded: index2.clone(),
    1075           12 :             },
    1076           12 :         ];
    1077           12 : 
    1078           12 :         queue.queued_operations.extend(ops.clone());
    1079           12 : 
    1080           12 :         // The layer uploads should be scheduled immediately. The indexes must wait.
    1081           12 :         let upload_tasks = queue.schedule_ready();
    1082           12 :         assert_same_ops(
    1083           36 :             upload_tasks.iter().map(|t| &t.op),
    1084           12 :             [&ops[0], &ops[2], &ops[4]],
    1085           12 :         );
    1086           12 : 
    1087           12 :         // layer2 completes first. None of the indexes can upload yet.
    1088           12 :         queue.complete(upload_tasks[2].task_id);
    1089           12 :         assert!(queue.schedule_ready().is_empty());
    1090              : 
    1091              :         // layer0 completes. index0 can upload. It completes.
    1092           12 :         queue.complete(upload_tasks[0].task_id);
    1093           12 :         let index_tasks = queue.schedule_ready();
    1094           12 :         assert_eq!(index_tasks.len(), 1);
    1095           12 :         assert_same_op(&index_tasks[0].op, &ops[1]);
    1096           12 :         queue.complete(index_tasks[0].task_id);
    1097           12 : 
    1098           12 :         // layer1 completes. This unblocks index1 and index2, which coalesce into
    1099           12 :         // a single upload for index2, with index1 attached as a coalesced op.
    1100           12 :         queue.complete(upload_tasks[1].task_id);
    1101           12 : 
    1102           12 :         let index_tasks = queue.schedule_ready();
    1103           12 :         assert_eq!(index_tasks.len(), 1);
    1104           12 :         assert_same_op(&index_tasks[0].op, &ops[5]);
    1105           12 :         assert_same_ops(&index_tasks[0].coalesced_ops, &ops[3..4]);
    1106           12 : 
    1107           12 :         assert!(queue.queued_operations.is_empty());
    1108              : 
    1109           12 :         Ok(())
    1110           12 :     }
    1111              : 
    1112              :     /// A delete can't bypass an index upload if an index ahead of it still references the layer being deleted.
    1113              :     #[test]
    1114           12 :     fn schedule_index_delete_dereferenced() -> anyhow::Result<()> {
    1115           12 :         let mut queue = UploadQueue::Uninitialized;
    1116           12 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
    1117           12 :         let tli = make_timeline();
    1118           12 : 
    1119           12 :         // Create a layer to upload.
    1120           12 :         let layer = make_layer(
    1121           12 :             &tli,
    1122           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1123           12 :         );
    1124           12 :         let index_upload = index_with(&queue.clean.0, &layer);
    1125           12 : 
    1126           12 :         // Remove the layer reference in a new index, then delete the layer.
    1127           12 :         let index_deref = index_without(&index_upload, &layer);
    1128           12 : 
    1129           12 :         let ops = [
    1130           12 :             // Initial upload, with a barrier to prevent index coalescing.
    1131           12 :             UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
    1132           12 :             UploadOp::UploadMetadata {
    1133           12 :                 uploaded: index_upload.clone(),
    1134           12 :             },
    1135           12 :             UploadOp::Barrier(tokio::sync::watch::channel(()).0),
    1136           12 :             // Dereference the layer and delete it.
    1137           12 :             UploadOp::UploadMetadata {
    1138           12 :                 uploaded: index_deref.clone(),
    1139           12 :             },
    1140           12 :             UploadOp::Delete(Delete {
    1141           12 :                 layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
    1142           12 :             }),
    1143           12 :         ];
    1144           12 : 
    1145           12 :         queue.queued_operations.extend(ops.clone());
    1146              : 
    1147              :         // Operations are serialized: each schedule_ready() returns exactly the next op in queue order.
    1148           72 :         for op in ops {
    1149           60 :             let tasks = queue.schedule_ready();
    1150           60 :             assert_eq!(tasks.len(), 1);
    1151           60 :             assert_same_op(&tasks[0].op, &op);
    1152           60 :             queue.complete(tasks[0].task_id);
    1153              :         }
    1154           12 :         assert!(queue.queued_operations.is_empty());
    1155              : 
    1156           12 :         Ok(())
    1157           12 :     }
    1158              : 
    1159              :     /// An upload with a reused layer name doesn't clobber the previous layer. Specifically, a
    1160              :     /// dereference/upload/reference cycle can't allow the upload to bypass the dereferencing index upload.
    1161              :     #[test]
    1162           12 :     fn schedule_index_upload_dereferenced() -> anyhow::Result<()> {
    1163           12 :         let mut queue = UploadQueue::Uninitialized;
    1164           12 :         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example(), 0)?;
    1165           12 :         let tli = make_timeline();
    1166           12 : 
    1167           12 :         // Create a layer to upload.
    1168           12 :         let layer = make_layer(
    1169           12 :             &tli,
    1170           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1171           12 :         );
    1172           12 : 
    1173           12 :         // Upload the layer. Then dereference the layer, and upload/reference it again.
    1174           12 :         let index_upload = index_with(&queue.clean.0, &layer);
    1175           12 :         let index_deref = index_without(&index_upload, &layer);
    1176           12 :         let index_ref = index_with(&index_deref, &layer);
    1177           12 : 
    1178           12 :         let ops = [
    1179           12 :             // Initial upload, with a barrier to prevent index coalescing.
    1180           12 :             UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
    1181           12 :             UploadOp::UploadMetadata {
    1182           12 :                 uploaded: index_upload.clone(),
    1183           12 :             },
    1184           12 :             UploadOp::Barrier(tokio::sync::watch::channel(()).0),
    1185           12 :             // Dereference the layer.
    1186           12 :             UploadOp::UploadMetadata {
    1187           12 :                 uploaded: index_deref.clone(),
    1188           12 :             },
    1189           12 :             // Replace and reference the layer.
    1190           12 :             UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
    1191           12 :             UploadOp::UploadMetadata {
    1192           12 :                 uploaded: index_ref.clone(),
    1193           12 :             },
    1194           12 :         ];
    1195           12 : 
    1196           12 :         queue.queued_operations.extend(ops.clone());
    1197              : 
    1198              :         // Operations are serialized: each schedule_ready() returns exactly the next op in queue order.
    1199           84 :         for op in ops {
    1200           72 :             let tasks = queue.schedule_ready();
    1201           72 :             assert_eq!(tasks.len(), 1);
    1202           72 :             assert_same_op(&tasks[0].op, &op);
    1203           72 :             queue.complete(tasks[0].task_id);
    1204              :         }
    1205           12 :         assert!(queue.queued_operations.is_empty());
    1206              : 
    1207           12 :         Ok(())
    1208           12 :     }
    1209              : 
    1210              :     /// Nothing can bypass a shutdown, and it waits for inprogress tasks. It's never returned from
    1211              :     /// next_ready(), but is left at the head of the queue.
    1212              :     #[test]
    1213           12 :     fn schedule_shutdown() -> anyhow::Result<()> {
    1214           12 :         let mut queue = UploadQueue::Uninitialized;
    1215           12 :         let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 0)?;
    1216           12 :         let tli = make_timeline();
    1217           12 : 
    1218           12 :         let index = Box::new(queue.clean.0.clone()); // empty, doesn't matter
    1219           12 :         let layer0 = make_layer(
    1220           12 :             &tli,
    1221           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1222           12 :         );
    1223           12 :         let layer1 = make_layer(
    1224           12 :             &tli,
    1225           12 :             "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1226           12 :         );
    1227           12 :         let layer2 = make_layer(
    1228           12 :             &tli,
    1229           12 :             "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1230           12 :         );
    1231           12 :         let layer3 = make_layer(
    1232           12 :             &tli,
    1233           12 :             "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1234           12 :         );
    1235           12 : 
    1236           12 :         // Enqueue non-conflicting upload, delete, and index before and after a shutdown.
    1237           12 :         let ops = [
    1238           12 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
    1239           12 :             UploadOp::Delete(Delete {
    1240           12 :                 layers: vec![(layer1.layer_desc().layer_name(), layer1.metadata())],
    1241           12 :             }),
    1242           12 :             UploadOp::UploadMetadata {
    1243           12 :                 uploaded: index.clone(),
    1244           12 :             },
    1245           12 :             UploadOp::Shutdown,
    1246           12 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
    1247           12 :             UploadOp::Delete(Delete {
    1248           12 :                 layers: vec![(layer3.layer_desc().layer_name(), layer3.metadata())],
    1249           12 :             }),
    1250           12 :             UploadOp::UploadMetadata {
    1251           12 :                 uploaded: index.clone(),
    1252           12 :             },
    1253           12 :         ];
    1254           12 : 
    1255           12 :         queue.queued_operations.extend(ops.clone());
    1256           12 : 
    1257           12 :         // Schedule the initial operations ahead of the shutdown. The shutdown stays at the head of the queue.
    1258           12 :         let tasks = queue.schedule_ready();
    1259           12 : 
    1260           36 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..3]);
    1261           12 :         assert!(matches!(
    1262           12 :             queue.queued_operations.front(),
    1263              :             Some(&UploadOp::Shutdown)
    1264              :         ));
    1265              : 
    1266              :         // Complete the initial operations. The shutdown isn't triggered while they're pending.
    1267           48 :         for task in tasks {
    1268           36 :             assert!(queue.schedule_ready().is_empty());
    1269           36 :             queue.complete(task.task_id);
    1270              :         }
    1271              : 
    1272              :         // The shutdown is triggered the next time we try to pull an operation, which closes
    1273              :         // shutdown_ready. It isn't returned, but is left in the queue.
    1274           12 :         assert!(!queue.shutdown_ready.is_closed());
    1275           12 :         assert!(queue.next_ready().is_none());
    1276           12 :         assert!(queue.shutdown_ready.is_closed());
    1277              : 
    1278           12 :         Ok(())
    1279           12 :     }
    1280              : 
    1281              :     /// Scheduling respects inprogress_limit: no more than that many tasks are handed out at once.
    1282              :     #[test]
    1283           12 :     fn schedule_inprogress_limit() -> anyhow::Result<()> {
    1284           12 :         // Create a queue with inprogress_limit=2.
    1285           12 :         let mut queue = UploadQueue::Uninitialized;
    1286           12 :         let queue = queue.initialize_empty_remote(&TimelineMetadata::example(), 2)?;
    1287           12 :         let tli = make_timeline();
    1288           12 : 
    1289           12 :         // Enqueue a bunch of uploads.
    1290           12 :         let layer0 = make_layer(
    1291           12 :             &tli,
    1292           12 :             "000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1293           12 :         );
    1294           12 :         let layer1 = make_layer(
    1295           12 :             &tli,
    1296           12 :             "100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1297           12 :         );
    1298           12 :         let layer2 = make_layer(
    1299           12 :             &tli,
    1300           12 :             "200000000000000000000000000000000000-300000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1301           12 :         );
    1302           12 :         let layer3 = make_layer(
    1303           12 :             &tli,
    1304           12 :             "300000000000000000000000000000000000-400000000000000000000000000000000000__00000000016B59D8-00000000016B5A51",
    1305           12 :         );
    1306           12 : 
    1307           12 :         let ops = [
    1308           12 :             UploadOp::UploadLayer(layer0.clone(), layer0.metadata(), None),
    1309           12 :             UploadOp::UploadLayer(layer1.clone(), layer1.metadata(), None),
    1310           12 :             UploadOp::UploadLayer(layer2.clone(), layer2.metadata(), None),
    1311           12 :             UploadOp::UploadLayer(layer3.clone(), layer3.metadata(), None),
    1312           12 :         ];
    1313           12 : 
    1314           12 :         queue.queued_operations.extend(ops.clone());
    1315           12 : 
    1316           12 :         // Schedule all ready operations. Only the first 2 are scheduled, and nothing more is ready.
    1317           12 :         let tasks = queue.schedule_ready();
    1318           24 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops[0..2]);
    1319           12 :         assert!(queue.next_ready().is_none());
    1320              : 
    1321              :         // When one completes, exactly one more (ops[2]) is scheduled.
    1322           12 :         queue.complete(tasks[0].task_id);
    1323           12 :         let tasks = queue.schedule_ready();
    1324           12 :         assert_same_ops(tasks.iter().map(|t| &t.op), &ops[2..3]);
    1325           12 : 
    1326           12 :         Ok(())
    1327           12 :     }
    1328              : 
    1329              :     /// Tests that can_bypass takes layer name, generation and shard index into account. Operations are compared in same-kind pairs only (see assert_can_bypass).
    1330              :     #[test]
    1331           12 :     fn can_bypass_path() -> anyhow::Result<()> {
    1332           12 :         let tli = make_timeline();
    1333           12 : 
    1334           12 :         let name0 = &"000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51";
    1335           12 :         let name1 = &"100000000000000000000000000000000000-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51";
    1336              : 
    1337              :         // Asserts that layers a and b either can or can't bypass each other. make_ops() builds an upload, delete and
    1338              :         // index op per layer, and zip() pairs them by kind; delete/delete and index/index pairs are special-cased below.
    1339              :         #[track_caller]
    1340           96 :         fn assert_can_bypass(a: ResidentLayer, b: ResidentLayer, can_bypass: bool) {
    1341           96 :             let index = IndexPart::empty(TimelineMetadata::example());
    1342          288 :             for (a, b) in make_ops(a).into_iter().zip(make_ops(b)) {
    1343          288 :                 match (&a, &b) {
    1344              :                     // Deletes can always bypass each other.
    1345           96 :                     (UploadOp::Delete(_), UploadOp::Delete(_)) => assert!(a.can_bypass(&b, &index)),
    1346              :                     // Indexes can never bypass each other.
    1347              :                     (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => {
    1348           96 :                         assert!(!a.can_bypass(&b, &index))
    1349              :                     }
    1350              :                     // For other operations (here: the upload/upload pair), assert as requested.
    1351           96 :                     (a, b) => assert_eq!(a.can_bypass(b, &index), can_bypass),
    1352              :                 }
    1353              :             }
    1354           96 :         }
    1355              : 
    1356          192 :         fn make_ops(layer: ResidentLayer) -> Vec<UploadOp> {
    1357          192 :             let mut index = IndexPart::empty(TimelineMetadata::example());
    1358          192 :             index
    1359          192 :                 .layer_metadata
    1360          192 :                 .insert(layer.layer_desc().layer_name(), layer.metadata());
    1361          192 :             vec![
    1362          192 :                 UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
    1363          192 :                 UploadOp::Delete(Delete {
    1364          192 :                     layers: vec![(layer.layer_desc().layer_name(), layer.metadata())],
    1365          192 :                 }),
    1366          192 :                 UploadOp::UploadMetadata {
    1367          192 :                     uploaded: Box::new(index),
    1368          192 :                 },
    1369          192 :             ]
    1370          192 :         }
    1371              : 
    1372              :         // Makes a ResidentLayer with the given name, generation, and shard (Some(n) = shard n of 8, None = unsharded).
    1373          192 :         let layer = |name: &'static str, shard: Option<u8>, generation: u32| -> ResidentLayer {
    1374          192 :             let shard = shard
    1375          192 :                 .map(|n| ShardIndex::new(ShardNumber(n), ShardCount(8)))
    1376          192 :                 .unwrap_or(ShardIndex::unsharded());
    1377          192 :             let metadata = LayerFileMetadata {
    1378          192 :                 shard,
    1379          192 :                 generation: Generation::Valid(generation),
    1380          192 :                 file_size: 0,
    1381          192 :             };
    1382          192 :             make_layer_with_metadata(&tli, name, metadata)
    1383          192 :         };
    1384              : 
    1385              :         // Same name and metadata can't bypass. This goes both for unsharded and sharded, as well as
    1386              :         // 0 or >0 generation.
    1387           12 :         assert_can_bypass(layer(name0, None, 0), layer(name0, None, 0), false);
    1388           12 :         assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(0), 0), false);
    1389           12 :         assert_can_bypass(layer(name0, None, 1), layer(name0, None, 1), false);
    1390           12 : 
    1391           12 :         // Different names can bypass.
    1392           12 :         assert_can_bypass(layer(name0, None, 0), layer(name1, None, 0), true);
    1393           12 : 
    1394           12 :         // Different shards can bypass. Shard 0 is different from unsharded.
    1395           12 :         assert_can_bypass(layer(name0, Some(0), 0), layer(name0, Some(1), 0), true);
    1396           12 :         assert_can_bypass(layer(name0, Some(0), 0), layer(name0, None, 0), true);
    1397           12 : 
    1398           12 :         // Different generations can bypass, both sharded and unsharded.
    1399           12 :         assert_can_bypass(layer(name0, None, 0), layer(name0, None, 1), true);
    1400           12 :         assert_can_bypass(layer(name0, Some(1), 0), layer(name0, Some(1), 1), true);
    1401           12 : 
    1402           12 :         Ok(())
    1403           12 :     }
    1404              : }
        

Generated by: LCOV version 2.1-beta