LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - remote_timeline_client.rs (source / functions) Coverage Total Hit UBC GIC CBC ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 96.9 % 1182 1145 37 1145
Current Date: 2023-10-19 02:04:12 Functions: 81.5 % 130 106 24 3 103 3
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! This module manages synchronizing local FS with remote storage.
       2                 : //!
       3                 : //! # Overview
       4                 : //!
       5                 : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
       6                 : //!   It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
       7                 : //!   when it's safe to do so.
       8                 : //!
       9                 : //! * Stand-alone function, [`list_remote_timelines`], to get list of timelines of a tenant.
      10                 : //!
      11                 : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
      12                 : //!
      13                 : //! # APIs & How To Use Them
      14                 : //!
      15                 : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
      16                 : //! unless the pageserver is configured without remote storage.
      17                 : //!
      18                 : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
      19                 : //! either in [`crate::tenant::mgr`] during startup or when creating a new
      20                 : //! timeline.
      21                 : //! However, the client does not become ready for use until we've initialized its upload queue:
      22                 : //!
      23                 : //! - For timelines that already have some state on the remote storage, we use
      24                 : //!   [`RemoteTimelineClient::init_upload_queue`] .
      25                 : //! - For newly created timelines, we use
      26                 : //!   [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
      27                 : //!
      28                 : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
      29                 : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
      30                 : //!
      31                 : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
      32                 : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
      33                 : //!
      34                 : //! - [`RemoteTimelineClient::schedule_layer_file_upload`]  when we've created a new layer file.
      35                 : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
      36                 : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads
      37                 : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
      38                 : //!
      39                 : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
      40                 : //!
      41                 : //! There are also APIs for downloading files.
      42                 : //! These are not part of the aforementioned queuing and will not be discussed
      43                 : //! further here, except in the section covering tenant attach.
      44                 : //!
      45                 : //! # Remote Storage Structure & [`IndexPart`] Index File
      46                 : //!
      47                 : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
      48                 : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
      49                 : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
      50                 : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
      51                 : //! files for a given timeline.
      52                 : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
      53                 : //!
      54                 : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
      55                 : //!
      56                 : //! # Consistency
      57                 : //!
      58                 : //! To have a consistent remote structure, it's important that uploads and
      59                 : //! deletions are performed in the right order. For example, the index file
      60                 : //! contains a list of layer files, so it must not be uploaded until all the
      61                 : //! layer files that are in its list have been successfully uploaded.
      62                 : //!
      63                 : //! The contract between client and its user is that the user is responsible for
      64                 : //! scheduling operations in an order that keeps the remote consistent as
      65                 : //! described above.
      66                 : //! From the user's perspective, the operations are executed sequentially.
      67                 : //! Internally, the client knows which operations can be performed in parallel,
      68                 : //! and which operations act like a "barrier" that require preceding operations
      69                 : //! to finish. The calling code just needs to call the schedule-functions in the
      70                 : //! correct order, and the client will parallelize the operations in a way that
      71                 : //! is safe.
      72                 : //!
      73                 : //! The caller should be careful with deletion, though. They should not delete
      74                 : //! local files that have been scheduled for upload but not yet finished uploading.
      75                 : //! Otherwise the upload will fail. To wait for an upload to finish, use
      76                 : //! the 'wait_completion' function (more on that later.)
      77                 : //!
      78                 : //! All of this relies on the following invariants:
      79                 : //!
      80                 : //! - We rely on read-after-write consistency in the remote storage.
      81                 : //! - Layer files are immutable
      82                 : //!
      83                 : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
      84                 : //! storage. Different tenants can be attached to different pageservers, but if the
      85                 : //! same tenant is attached to two pageservers at the same time, they will overwrite
      86                 : //! each other's index file updates, and confusion will ensue. There's no interlock or
      87                 : //! mechanism to detect that in the pageserver; we rely on the control plane to ensure
      88                 : //! that this doesn't happen.
      89                 : //!
      90                 : //! ## Implementation Note
      91                 : //!
      92                 : //! The *actual* remote state lags behind the *desired* remote state while
      93                 : //! there are in-flight operations.
      94                 : //! We keep track of the desired remote state in
      95                 : //! [`UploadQueueInitialized::latest_files`] and [`UploadQueueInitialized::latest_metadata`].
      96                 : //! It is initialized based on the [`IndexPart`] that was passed during init
      97                 : //! and updated with every `schedule_*` function call.
      98                 : //! All this is necessary to compute the future [`IndexPart`]s
      99                 : //! when scheduling an operation while other operations that also affect the
     100                 : //! remote [`IndexPart`] are in flight.
     101                 : //!
     102                 : //! # Retries & Error Handling
     103                 : //!
     104                 : //! The client retries operations indefinitely, using exponential back-off.
     105                 : //! There is no way to force a retry, i.e., interrupt the back-off.
     106                 : //! This could be built easily.
     107                 : //!
     108                 : //! # Cancellation
     109                 : //!
     110                 : //! The operations execute as plain [`task_mgr`] tasks, scoped to
     111                 : //! the client's tenant and timeline.
     112                 : //! Dropping the client will drop queued operations but not executing operations.
     113                 : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
     114                 : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
     115                 : //!
     116                 : //! # Completion
     117                 : //!
     118                 : //! Once an operation has completed, we update
     119                 : //! [`UploadQueueInitialized::projected_remote_consistent_lsn`] immediately,
     120                 : //! and submit a request through the DeletionQueue to update
     121                 : //! [`UploadQueueInitialized::visible_remote_consistent_lsn`] after it has
     122                 : //! validated that our generation is not stale.  It is this visible value
     123                 : //! that is advertised to safekeepers as a signal that they can
     124                 : //! delete the WAL up to that LSN.
     125                 : //!
     126                 : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
     127                 : //! for all pending operations to complete. It does not prevent more
     128                 : //! operations from getting scheduled.
     129                 : //!
     130                 : //! # Crash Consistency
     131                 : //!
     132                 : //! We do not persist the upload queue state.
     133                 : //! If we drop the client, or crash, all unfinished operations are lost.
     134                 : //!
     135                 : //! To recover, the following steps need to be taken:
     136                 : //! - Retrieve the current remote [`IndexPart`]. This gives us a
     137                 : //!   consistent remote state, assuming the user scheduled the operations in
     138                 : //!   the correct order.
     139                 : //! - Initiate upload queue with that [`IndexPart`].
     140                 : //! - Reschedule all lost operations by comparing the local filesystem state
     141                 : //!   and remote state as per [`IndexPart`]. This is done in
     142                 : //!   [`Tenant::timeline_init_and_sync`].
     143                 : //!
     144                 : //! Note that if we crash during file deletion between the index update
     145                 : //! that removes the file from the list of files, and deleting the remote file,
     146                 : //! the file is leaked in the remote storage. Similarly, if a new file is created
     147                 : //! and uploaded, but the pageserver dies permanently before updating the
     148                 : //! remote index file, the new file is leaked in remote storage. We accept and
     149                 : //! tolerate that for now.
     150                 : //! Note further that we cannot easily fix this by scheduling deletes for every
     151                 : //! file that is present only on the remote, because we cannot distinguish the
     152                 : //! following two cases:
     153                 : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
     154                 : //!   but crashed before it finished remotely.
     155                 : //! - (2) We never had the file locally because we haven't on-demand downloaded
     156                 : //!   it yet.
     157                 : //!
     158                 : //! # Downloads
     159                 : //!
     160                 : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
     161                 : //! downloading files from the remote storage. Downloads are performed immediately
     162                 : //! against the `RemoteStorage`, independently of the upload queue.
     163                 : //!
     164                 : //! When we attach a tenant, we perform the following steps:
     165                 : //! - create `Tenant` object in `TenantState::Attaching` state
     166                 : //! - List timelines that are present in remote storage, and for each:
     167                 : //!   - download their remote [`IndexPart`]s
     168                 : //!   - create `Timeline` struct and a `RemoteTimelineClient`
     169                 : //!   - initialize the client's upload queue with its `IndexPart`
     170                 : //!   - create [`RemoteLayer`](super::storage_layer::RemoteLayer) instances
     171                 : //!     for layers that are referenced by `IndexPart` but not present locally
     172                 : //!   - schedule uploads for layers that are only present locally.
     173                 : //!   - if the remote `IndexPart`'s metadata was newer than the metadata in
     174                 : //!     the local filesystem, write the remote metadata to the local filesystem
     175                 : //! - After the above is done for each timeline, open the tenant for business by
     176                 : //!   transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
     177                 : //!   This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
     178                 : //!
     179                 : //! We keep track of the fact that a client is in `Attaching` state in a marker
     180                 : //! file on the local disk. This is critical because, when we restart the pageserver,
     181                 : //! we do not want to do the `List timelines` step for each tenant that has already
     182                 : //! been successfully attached (for performance & cost reasons).
     183                 : //! Instead, for a tenant without the attach marker file, we assume that the
     184                 : //! local state is in sync or ahead of the remote state. This includes the list
     185                 : //! of all of the tenant's timelines, which is particularly critical to be up-to-date:
     186                 : //! if there's a timeline on the remote that the pageserver doesn't know about,
     187                 : //! the GC will not consider its branch point, leading to data loss.
     188                 : //! So, for a tenant with the attach marker file, we know that we do not yet have
     189                 : //! persisted all the remote timeline's metadata files locally. To exclude the
     190                 : //! risk above, we re-run the procedure for such tenants.
     191                 : //!
     192                 : //! # Operating Without Remote Storage
     193                 : //!
     194                 : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
     195                 : //! not created and the uploads are skipped.
     196                 : //! Theoretically, it should be ok to remove and re-add remote storage configuration to
     197                 : //! the pageserver config at any time, since it doesn't make a difference to
     198                 : //! [`Timeline::load_layer_map`].
     199                 : //! Of course, the remote timeline dir must not change while we have de-configured
     200                 : //! remote storage, i.e., the pageserver must remain the owner of the given prefix
     201                 : //! in remote storage.
     202                 : //! But note that we don't test any of this right now.
     203                 : //!
     204                 : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
     205                 : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
     206                 : 
     207                 : mod download;
     208                 : pub mod index;
     209                 : mod upload;
     210                 : 
     211                 : use anyhow::Context;
     212                 : use camino::Utf8Path;
     213                 : use chrono::{NaiveDateTime, Utc};
     214                 : // re-export these
     215                 : pub use download::{is_temp_download_file, list_remote_timelines};
     216                 : use scopeguard::ScopeGuard;
     217                 : use tokio_util::sync::CancellationToken;
     218                 : use utils::backoff::{
     219                 :     self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
     220                 : };
     221                 : 
     222                 : use std::collections::{HashMap, VecDeque};
     223                 : use std::sync::atomic::{AtomicU32, Ordering};
     224                 : use std::sync::{Arc, Mutex};
     225                 : 
     226                 : use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
     227                 : use std::ops::DerefMut;
     228                 : use tracing::{debug, error, info, instrument, warn};
     229                 : use tracing::{info_span, Instrument};
     230                 : use utils::lsn::Lsn;
     231                 : 
     232                 : use crate::deletion_queue::DeletionQueueClient;
     233                 : use crate::metrics::{
     234                 :     MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
     235                 :     RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
     236                 :     REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
     237                 : };
     238                 : use crate::task_mgr::shutdown_token;
     239                 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
     240                 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
     241                 : use crate::tenant::upload_queue::Delete;
     242                 : use crate::tenant::TIMELINES_SEGMENT_NAME;
     243                 : use crate::{
     244                 :     config::PageServerConf,
     245                 :     task_mgr,
     246                 :     task_mgr::TaskKind,
     247                 :     task_mgr::BACKGROUND_RUNTIME,
     248                 :     tenant::metadata::TimelineMetadata,
     249                 :     tenant::upload_queue::{
     250                 :         UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
     251                 :     },
     252                 : };
     253                 : 
     254                 : use utils::id::{TenantId, TimelineId};
     255                 : 
     256                 : use self::index::IndexPart;
     257                 : 
     258                 : use super::storage_layer::LayerFileName;
     259                 : use super::upload_queue::SetDeletedFlagProgress;
     260                 : use super::Generation;
     261                 : 
     262                 : // Occasional network issues and such can cause remote operations to fail, and
     263                 : // that's expected. If a download fails, we log it at info-level, and retry.
     264                 : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
     265                 : // level instead, as repeated failures can mean a more serious problem. If it
     266                 : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
     267                 : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
     268                 : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
     269                 : 
     270                 : // Similarly log failed uploads and deletions at WARN level, after this many
     271                 : // retries. Uploads and deletions are retried forever, though.
     272                 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
     273                 : 
     274                 : pub enum MaybeDeletedIndexPart {
     275                 :     IndexPart(IndexPart),
     276                 :     Deleted(IndexPart),
     277                 : }
     278                 : 
     279                 : /// Errors that can arise when calling [`RemoteTimelineClient::stop`].
     280 UBC           0 : #[derive(Debug, thiserror::Error)]
     281                 : pub enum StopError {
     282                 :     /// Returned if the upload queue was never initialized.
     283                 :     /// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
     284                 :     #[error("queue is not initialized")]
     285                 :     QueueUninitialized,
     286                 : }
     287                 : 
     288               0 : #[derive(Debug, thiserror::Error)]
     289                 : pub enum PersistIndexPartWithDeletedFlagError {
     290                 :     #[error("another task is already setting the deleted_flag, started at {0:?}")]
     291                 :     AlreadyInProgress(NaiveDateTime),
     292                 :     #[error("the deleted_flag was already set, value is {0:?}")]
     293                 :     AlreadyDeleted(NaiveDateTime),
     294                 :     #[error(transparent)]
     295                 :     Other(#[from] anyhow::Error),
     296                 : }
     297                 : 
     298                 : /// A client for accessing a timeline's data in remote storage.
     299                 : ///
     300                 : /// This takes care of managing the number of connections, and balancing them
     301                 : /// across tenants. This also handles retries of failed uploads.
     302                 : ///
     303                 : /// Upload and delete requests are ordered so that before a deletion is
     304                 : /// performed, we wait for all preceding uploads to finish. This ensures
     305                 : /// that if you perform a compaction operation that reshuffles data in layer
     306                 : /// files, we don't have a transient state where the old files have already been
     307                 : /// deleted, but new files have not yet been uploaded.
     308                 : ///
     309                 : /// Similarly, this enforces an order between index-file uploads, and layer
     310                 : /// uploads.  Before an index-file upload is performed, all preceding layer
     311                 : /// uploads must be finished.
     312                 : ///
     313                 : /// This also maintains a list of remote files, and automatically includes that
     314                 : /// in the index part file, whenever timeline metadata is uploaded.
     315                 : ///
     316                 : /// Downloads are not queued, they are performed immediately.
     317                 : pub struct RemoteTimelineClient {
     318                 :     conf: &'static PageServerConf,
     319                 : 
     320                 :     runtime: tokio::runtime::Handle,
     321                 : 
     322                 :     tenant_id: TenantId,
     323                 :     timeline_id: TimelineId,
     324                 :     generation: Generation,
     325                 : 
     326                 :     upload_queue: Mutex<UploadQueue>,
     327                 : 
     328                 :     metrics: Arc<RemoteTimelineClientMetrics>,
     329                 : 
     330                 :     storage_impl: GenericRemoteStorage,
     331                 : 
     332                 :     deletion_queue_client: DeletionQueueClient,
     333                 : }
     334                 : 
     335                 : impl RemoteTimelineClient {
     336                 :     ///
     337                 :     /// Create a remote storage client for given timeline
     338                 :     ///
     339                 :     /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
     340                 :     /// by calling init_upload_queue.
     341                 :     ///
     342 CBC        1595 :     pub fn new(
     343            1595 :         remote_storage: GenericRemoteStorage,
     344            1595 :         deletion_queue_client: DeletionQueueClient,
     345            1595 :         conf: &'static PageServerConf,
     346            1595 :         tenant_id: TenantId,
     347            1595 :         timeline_id: TimelineId,
     348            1595 :         generation: Generation,
     349            1595 :     ) -> RemoteTimelineClient {
     350            1595 :         RemoteTimelineClient {
     351            1595 :             conf,
     352            1595 :             runtime: if cfg!(test) {
     353                 :                 // remote_timeline_client.rs tests rely on current-thread runtime
     354             148 :                 tokio::runtime::Handle::current()
     355                 :             } else {
     356            1447 :                 BACKGROUND_RUNTIME.handle().clone()
     357                 :             },
     358            1595 :             tenant_id,
     359            1595 :             timeline_id,
     360            1595 :             generation,
     361            1595 :             storage_impl: remote_storage,
     362            1595 :             deletion_queue_client,
     363            1595 :             upload_queue: Mutex::new(UploadQueue::Uninitialized),
     364            1595 :             metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
     365            1595 :         }
     366            1595 :     }
     367                 : 
     368                 :     /// Initialize the upload queue for a remote storage that already received
     369                 :     /// an index file upload, i.e., it's not empty.
     370                 :     /// The given `index_part` must be the one on the remote.
     371             314 :     pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
     372             314 :         let mut upload_queue = self.upload_queue.lock().unwrap();
     373             314 :         upload_queue.initialize_with_current_remote_index_part(index_part)?;
     374             314 :         self.update_remote_physical_size_gauge(Some(index_part));
     375             314 :         info!(
     376             314 :             "initialized upload queue from remote index with {} layer files",
     377             314 :             index_part.layer_metadata.len()
     378             314 :         );
     379             314 :         Ok(())
     380             314 :     }
     381                 : 
     382                 :     /// Initialize the upload queue for the case where the remote storage is empty,
     383                 :     /// i.e., it doesn't have an `IndexPart`.
     384             967 :     pub fn init_upload_queue_for_empty_remote(
     385             967 :         &self,
     386             967 :         local_metadata: &TimelineMetadata,
     387             967 :     ) -> anyhow::Result<()> {
     388             967 :         let mut upload_queue = self.upload_queue.lock().unwrap();
     389             967 :         upload_queue.initialize_empty_remote(local_metadata)?;
     390             967 :         self.update_remote_physical_size_gauge(None);
     391             967 :         info!("initialized upload queue as empty");
     392             967 :         Ok(())
     393             967 :     }
     394                 : 
     395                 :     /// Initialize the queue in stopped state. Used in startup path
     396                 :     /// to continue deletion operation interrupted by pageserver crash or restart.
     397              21 :     pub fn init_upload_queue_stopped_to_continue_deletion(
     398              21 :         &self,
     399              21 :         index_part: &IndexPart,
     400              21 :     ) -> anyhow::Result<()> {
     401                 :         // FIXME: consider newtype for DeletedIndexPart.
     402              21 :         let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
     403              21 :             "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
     404              21 :         ))?;
     405                 : 
     406                 :         {
     407              21 :             let mut upload_queue = self.upload_queue.lock().unwrap();
     408              21 :             upload_queue.initialize_with_current_remote_index_part(index_part)?;
     409              21 :             self.update_remote_physical_size_gauge(Some(index_part));
     410              21 :         }
     411              21 :         // also locks upload queue, without dropping the guard above it will be a deadlock
     412              21 :         self.stop().expect("initialized line above");
     413              21 : 
     414              21 :         let mut upload_queue = self.upload_queue.lock().unwrap();
     415              21 : 
     416              21 :         upload_queue
     417              21 :             .stopped_mut()
     418              21 :             .expect("stopped above")
     419              21 :             .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
     420              21 : 
     421              21 :         Ok(())
     422              21 :     }
     423                 : 
     424            2671 :     pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     425            2671 :         match &mut *self.upload_queue.lock().unwrap() {
     426 UBC           0 :             UploadQueue::Uninitialized => None,
     427 CBC        2509 :             UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
     428             162 :             UploadQueue::Stopped(q) => q
     429             162 :                 .upload_queue_for_deletion
     430             162 :                 .get_last_remote_consistent_lsn_projected(),
     431                 :         }
     432            2671 :     }
     433                 : 
     434          779573 :     pub fn remote_consistent_lsn_visible(&self) -> Option<Lsn> {
     435          779573 :         match &mut *self.upload_queue.lock().unwrap() {
     436 UBC           0 :             UploadQueue::Uninitialized => None,
     437 CBC      779411 :             UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
     438             162 :             UploadQueue::Stopped(q) => Some(
     439             162 :                 q.upload_queue_for_deletion
     440             162 :                     .get_last_remote_consistent_lsn_visible(),
     441             162 :             ),
     442                 :         }
     443          779573 :     }
     444                 : 
     445            7290 :     fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
     446            7290 :         let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
     447            6323 :             current_remote_index_part
     448            6323 :                 .layer_metadata
     449            6323 :                 .values()
     450            6323 :                 // If we don't have the file size for the layer, don't account for it in the metric.
     451          417447 :                 .map(|ilmd| ilmd.file_size)
     452            6323 :                 .sum()
     453                 :         } else {
     454             967 :             0
     455                 :         };
     456            7290 :         self.metrics.remote_physical_size_set(size);
     457            7290 :     }
     458                 : 
     459              12 :     pub fn get_remote_physical_size(&self) -> u64 {
     460              12 :         self.metrics.remote_physical_size_get()
     461              12 :     }
     462                 : 
     463                 :     //
     464                 :     // Download operations.
     465                 :     //
     466                 :     // These don't use the per-timeline queue. They do use the global semaphore in
     467                 :     // S3Bucket, to limit the total number of concurrent operations, though.
     468                 :     //
     469                 : 
     470                 :     /// Download index file
     471             353 :     pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
     472             353 :         let _unfinished_gauge_guard = self.metrics.call_begin(
     473             353 :             &RemoteOpFileKind::Index,
     474             353 :             &RemoteOpKind::Download,
     475             353 :             crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
     476             353 :                 reason: "no need for a downloads gauge",
     477             353 :             },
     478             353 :         );
     479                 : 
     480             353 :         let index_part = download::download_index_part(
     481             353 :             &self.storage_impl,
     482             353 :             &self.tenant_id,
     483             353 :             &self.timeline_id,
     484             353 :             self.generation,
     485             353 :         )
     486             353 :         .measure_remote_op(
     487             353 :             self.tenant_id,
     488             353 :             self.timeline_id,
     489             353 :             RemoteOpFileKind::Index,
     490             353 :             RemoteOpKind::Download,
     491             353 :             Arc::clone(&self.metrics),
     492             353 :         )
     493            1372 :         .await?;
     494                 : 
     495             342 :         if index_part.deleted_at.is_some() {
     496              21 :             Ok(MaybeDeletedIndexPart::Deleted(index_part))
     497                 :         } else {
     498             321 :             Ok(MaybeDeletedIndexPart::IndexPart(index_part))
     499                 :         }
     500             353 :     }
     501                 : 
     502                 :     /// Download a (layer) file from `path`, into local filesystem.
     503                 :     ///
     504                 :     /// 'layer_metadata' is the metadata from the remote index file.
     505                 :     ///
     506                 :     /// On success, returns the size of the downloaded file.
     507            1379 :     pub async fn download_layer_file(
     508            1379 :         &self,
     509            1379 :         layer_file_name: &LayerFileName,
     510            1379 :         layer_metadata: &LayerFileMetadata,
     511            1379 :     ) -> anyhow::Result<u64> {
     512            1370 :         let downloaded_size = {
     513            1379 :             let _unfinished_gauge_guard = self.metrics.call_begin(
     514            1379 :                 &RemoteOpFileKind::Layer,
     515            1379 :                 &RemoteOpKind::Download,
     516            1379 :                 crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
     517            1379 :                     reason: "no need for a downloads gauge",
     518            1379 :                 },
     519            1379 :             );
     520            1379 :             download::download_layer_file(
     521            1379 :                 self.conf,
     522            1379 :                 &self.storage_impl,
     523            1379 :                 self.tenant_id,
     524            1379 :                 self.timeline_id,
     525            1379 :                 layer_file_name,
     526            1379 :                 layer_metadata,
     527            1379 :             )
     528            1379 :             .measure_remote_op(
     529            1379 :                 self.tenant_id,
     530            1379 :                 self.timeline_id,
     531            1379 :                 RemoteOpFileKind::Layer,
     532            1379 :                 RemoteOpKind::Download,
     533            1379 :                 Arc::clone(&self.metrics),
     534            1379 :             )
     535          459967 :             .await?
     536                 :         };
     537                 : 
     538            1370 :         REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
     539            1370 :         REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
     540            1370 : 
     541            1370 :         Ok(downloaded_size)
     542            1376 :     }
     543                 : 
     544                 :     //
     545                 :     // Upload operations.
     546                 :     //
     547                 : 
     548                 :     ///
     549                 :     /// Launch an index-file upload operation in the background, with
     550                 :     /// updated metadata.
     551                 :     ///
     552                 :     /// The upload will be added to the queue immediately, but it
     553                 :     /// won't be performed until all previously scheduled layer file
     554                 :     /// upload operations have completed successfully.  This is to
     555                 :     /// ensure that when the index file claims that layers X, Y and Z
     556                 :     /// exist in remote storage, they really do. To wait for the upload
     557                 :     /// to complete, use `wait_completion`.
     558                 :     ///
     559                 :     /// If there were any changes to the list of files, i.e. if any
     560                 :     /// layer file uploads were scheduled, since the last index file
     561                 :     /// upload, those will be included too.
     562            5719 :     pub fn schedule_index_upload_for_metadata_update(
     563            5719 :         self: &Arc<Self>,
     564            5719 :         metadata: &TimelineMetadata,
     565            5719 :     ) -> anyhow::Result<()> {
     566            5719 :         let mut guard = self.upload_queue.lock().unwrap();
     567            5719 :         let upload_queue = guard.initialized_mut()?;
     568                 : 
     569                 :         // As documented in the struct definition, it's ok for latest_metadata to be
     570                 :         // ahead of what's _actually_ on the remote during index upload.
     571            5719 :         upload_queue.latest_metadata = metadata.clone();
     572            5719 : 
     573            5719 :         self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
     574            5719 : 
     575            5719 :         Ok(())
     576            5719 :     }
     577                 : 
     578                 :     ///
     579                 :     /// Launch an index-file upload operation in the background, if necessary.
     580                 :     ///
     581                 :     /// Use this function to schedule the update of the index file after
     582                 :     /// scheduling file uploads or deletions. If no file uploads or deletions
     583                 :     /// have been scheduled since the last index file upload, this does
     584                 :     /// nothing.
     585                 :     ///
     586                 :     /// Like schedule_index_upload_for_metadata_update(), this merely adds
     587                 :     /// the upload to the upload queue and returns quickly.
     588            1566 :     pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
     589            1566 :         let mut guard = self.upload_queue.lock().unwrap();
     590            1566 :         let upload_queue = guard.initialized_mut()?;
     591                 : 
     592            1566 :         if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
     593              15 :             self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
     594            1551 :         }
     595                 : 
     596            1566 :         Ok(())
     597            1566 :     }
     598                 : 
     599                 :     /// Launch an index-file upload operation in the background (internal function)
     600            6059 :     fn schedule_index_upload(
     601            6059 :         self: &Arc<Self>,
     602            6059 :         upload_queue: &mut UploadQueueInitialized,
     603            6059 :         metadata: TimelineMetadata,
     604            6059 :     ) {
     605            6059 :         info!(
     606            6059 :             "scheduling metadata upload with {} files ({} changed)",
     607            6059 :             upload_queue.latest_files.len(),
     608            6059 :             upload_queue.latest_files_changes_since_metadata_upload_scheduled,
     609            6059 :         );
     610                 : 
     611            6059 :         let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
     612            6059 : 
     613            6059 :         let index_part = IndexPart::new(
     614            6059 :             upload_queue.latest_files.clone(),
     615            6059 :             disk_consistent_lsn,
     616            6059 :             metadata,
     617            6059 :         );
     618            6059 :         let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
     619            6059 :         self.calls_unfinished_metric_begin(&op);
     620            6059 :         upload_queue.queued_operations.push_back(op);
     621            6059 :         upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
     622            6059 : 
     623            6059 :         // Launch the task immediately, if possible
     624            6059 :         self.launch_queued_tasks(upload_queue);
     625            6059 :     }
     626                 : 
     627                 :     ///
     628                 :     /// Launch an upload operation in the background.
     629                 :     ///
     630           18600 :     pub fn schedule_layer_file_upload(
     631           18600 :         self: &Arc<Self>,
     632           18600 :         layer_file_name: &LayerFileName,
     633           18600 :         layer_metadata: &LayerFileMetadata,
     634           18600 :     ) -> anyhow::Result<()> {
     635           18600 :         let mut guard = self.upload_queue.lock().unwrap();
     636           18600 :         let upload_queue = guard.initialized_mut()?;
     637                 : 
     638           18600 :         upload_queue
     639           18600 :             .latest_files
     640           18600 :             .insert(layer_file_name.clone(), layer_metadata.clone());
     641           18600 :         upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
     642           18600 : 
     643           18600 :         let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
     644           18600 :         self.calls_unfinished_metric_begin(&op);
     645           18600 :         upload_queue.queued_operations.push_back(op);
     646           18600 : 
     647           18600 :         info!("scheduled layer file upload {layer_file_name}");
     648                 : 
     649                 :         // Launch the task immediately, if possible
     650           18600 :         self.launch_queued_tasks(upload_queue);
     651           18600 :         Ok(())
     652           18600 :     }
     653                 : 
     654                 :     /// Launch a delete operation in the background.
     655                 :     ///
     656                 :     /// The operation does not modify local state but assumes the local files have already been
     657                 :     /// deleted, and is used to mirror those changes to remote.
     658                 :     ///
     659                 :     /// Note: This schedules an index file upload before the deletions.  The
     660                 :     /// deletion won't actually be performed, until any previously scheduled
     661                 :     /// upload operations, and the index file upload, have completed
     662                 :     /// successfully.
     663             639 :     pub fn schedule_layer_file_deletion(
     664             639 :         self: &Arc<Self>,
     665             639 :         names: Vec<LayerFileName>,
     666             639 :     ) -> anyhow::Result<()> {
     667             639 :         let mut guard = self.upload_queue.lock().unwrap();
     668             639 :         let upload_queue = guard.initialized_mut()?;
     669                 : 
     670                 :         // Deleting layers doesn't affect the values stored in TimelineMetadata,
     671                 :         // so we don't need to update it. Just serialize it.
     672             639 :         let metadata = upload_queue.latest_metadata.clone();
     673             639 : 
     674             639 :         // Update the remote index file, removing the to-be-deleted files from the index,
     675             639 :         // before deleting the actual files.
     676             639 :         //
     677             639 :         // Once we start removing files from upload_queue.latest_files, there's
     678             639 :         // no going back! Otherwise, some of the files would already be removed
     679             639 :         // from latest_files, but not yet scheduled for deletion. Use a closure
     680             639 :         // to syntactically forbid ? or bail! calls here.
     681             639 :         let no_bail_here = || {
     682             639 :             // Decorate our list of names with each name's generation, dropping
     683             639 :             // names that are unexpectedly missing from our metadata.
     684             639 :             let with_generations: Vec<_> = names
     685             639 :                 .into_iter()
     686            6649 :                 .filter_map(|name| {
     687            6649 :                     // Remove from latest_files, learning the file's remote generation in the process
     688            6649 :                     let meta = upload_queue.latest_files.remove(&name);
     689                 : 
     690            6649 :                     if let Some(meta) = meta {
     691            6641 :                         upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
     692            6641 :                         Some((name, meta.generation))
     693                 :                     } else {
     694                 :                         // This can only happen if we forgot to schedule the file upload
     695                 :                         // before scheduling the delete. Log it because it is a rare/strange
     696                 :                         // situation, and in case something is misbehaving, we'd like to know which
     697                 :                         // layers experienced this.
     698               8 :                         info!(
     699               8 :                             "Deleting layer {name} not found in latest_files list, never uploaded?"
     700               8 :                         );
     701               8 :                         None
     702                 :                     }
     703            6649 :                 })
     704             639 :                 .collect();
     705             639 : 
     706             639 :             if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
     707             325 :                 self.schedule_index_upload(upload_queue, metadata);
     708             325 :             }
     709                 : 
     710            7280 :             for (name, gen) in &with_generations {
     711            6641 :                 info!("scheduling deletion of layer {}{}", name, gen.get_suffix());
     712                 :             }
     713                 : 
     714                 :             // schedule the actual deletions
     715             639 :             let op = UploadOp::Delete(Delete {
     716             639 :                 layers: with_generations,
     717             639 :             });
     718             639 :             self.calls_unfinished_metric_begin(&op);
     719             639 :             upload_queue.queued_operations.push_back(op);
     720             639 : 
     721             639 :             // Launch the tasks immediately, if possible
     722             639 :             self.launch_queued_tasks(upload_queue);
     723             639 :         };
     724             639 :         no_bail_here();
     725             639 :         Ok(())
     726             639 :     }
     727                 : 
     728                 :     ///
     729                 :     /// Wait for all previously scheduled uploads/deletions to complete
     730                 :     ///
     731            2365 :     pub async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
     732            2365 :         let mut receiver = {
     733            2365 :             let mut guard = self.upload_queue.lock().unwrap();
     734            2365 :             let upload_queue = guard.initialized_mut()?;
     735            2365 :             self.schedule_barrier(upload_queue)
     736            2365 :         };
     737            2365 : 
     738            2365 :         if receiver.changed().await.is_err() {
     739               1 :             anyhow::bail!("wait_completion aborted because upload queue was stopped");
     740            2359 :         }
     741            2359 :         Ok(())
     742            2360 :     }
     743                 : 
     744            2365 :     fn schedule_barrier(
     745            2365 :         self: &Arc<Self>,
     746            2365 :         upload_queue: &mut UploadQueueInitialized,
     747            2365 :     ) -> tokio::sync::watch::Receiver<()> {
     748            2365 :         let (sender, receiver) = tokio::sync::watch::channel(());
     749            2365 :         let barrier_op = UploadOp::Barrier(sender);
     750            2365 : 
     751            2365 :         upload_queue.queued_operations.push_back(barrier_op);
     752            2365 :         // Don't count this kind of operation!
     753            2365 : 
     754            2365 :         // Launch the task immediately, if possible
     755            2365 :         self.launch_queued_tasks(upload_queue);
     756            2365 : 
     757            2365 :         receiver
     758            2365 :     }
     759                 : 
     760                 :     /// Set the deleted_at field in the remote index file.
     761                 :     ///
     762                 :     /// This fails if the upload queue has not been `stop()`ed.
     763                 :     ///
     764                 :     /// The caller is responsible for calling `stop()` AND for waiting
     765                 :     /// for any ongoing upload tasks to finish after `stop()` has succeeded.
     766                 :     /// Check method [`RemoteTimelineClient::stop`] for details.
     767             846 :     #[instrument(skip_all)]
     768                 :     pub(crate) async fn persist_index_part_with_deleted_flag(
     769                 :         self: &Arc<Self>,
     770                 :     ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
     771                 :         let index_part_with_deleted_at = {
     772                 :             let mut locked = self.upload_queue.lock().unwrap();
     773                 : 
     774                 :             // We must be in the stopped state, because otherwise an in-progress
     775                 :             // index part upload could overwrite the file with a version that is
     776                 :             // missing the deleted_at flag that we are going to set below.
     777                 :             let stopped = locked.stopped_mut()?;
     778                 : 
     779                 :             match stopped.deleted_at {
     780                 :                 SetDeletedFlagProgress::NotRunning => (), // proceed
     781                 :                 SetDeletedFlagProgress::InProgress(at) => {
     782                 :                     return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
     783                 :                 }
     784                 :                 SetDeletedFlagProgress::Successful(at) => {
     785                 :                     return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
     786                 :                 }
     787                 :             };
     788                 :             let deleted_at = Utc::now().naive_utc();
     789                 :             stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
     790                 : 
     791                 :             let mut index_part = IndexPart::try_from(&stopped.upload_queue_for_deletion)
     792                 :                 .context("IndexPart serialize")?;
     793                 :             index_part.deleted_at = Some(deleted_at);
     794                 :             index_part
     795                 :         };
     796                 : 
     797 UBC           0 :         let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
     798               0 :             let mut locked = self_clone.upload_queue.lock().unwrap();
     799               0 :             let stopped = locked
     800               0 :                 .stopped_mut()
     801               0 :                 .expect("there's no way out of Stopping, and we checked it's Stopping above");
     802               0 :             stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
     803               0 :         });
     804                 : 
     805 CBC         195 :         pausable_failpoint!("persist_deleted_index_part");
     806                 : 
     807                 :         backoff::retry(
     808             256 :             || {
     809             256 :                 upload::upload_index_part(
     810             256 :                     &self.storage_impl,
     811             256 :                     &self.tenant_id,
     812             256 :                     &self.timeline_id,
     813             256 :                     self.generation,
     814             256 :                     &index_part_with_deleted_at,
     815             256 :                 )
     816             256 :             },
     817              61 :             |_e| false,
     818                 :             1,
     819                 :             // have just a couple of attempts
     820                 :             // when executed as part of timeline deletion this happens in context of api call
     821                 :             // when executed as part of tenant deletion this happens in the background
     822                 :             2,
     823                 :             "persist_index_part_with_deleted_flag",
     824                 :             // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
     825 UBC           0 :             backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
     826                 :         )
     827                 :         .await?;
     828                 : 
     829                 :         // all good, disarm the guard and mark as success
     830                 :         ScopeGuard::into_inner(undo_deleted_at);
     831                 :         {
     832                 :             let mut locked = self.upload_queue.lock().unwrap();
     833                 : 
     834                 :             let stopped = locked
     835                 :                 .stopped_mut()
     836                 :                 .expect("there's no way out of Stopping, and we checked it's Stopping above");
     837                 :             stopped.deleted_at = SetDeletedFlagProgress::Successful(
     838                 :                 index_part_with_deleted_at
     839                 :                     .deleted_at
     840                 :                     .expect("we set it above"),
     841                 :             );
     842                 :         }
     843                 : 
     844                 :         Ok(())
     845                 :     }
     846                 : 
     847                 :     /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfully set.
     848                 :     /// The function deletes layer files one by one, then lists the prefix to see if we leaked something,
     849                 :     /// deletes leaked files if any, and proceeds with deletion of the index file at the end.
     850 CBC         214 :     pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
     851             214 :         debug_assert_current_span_has_tenant_and_timeline_id();
     852                 : 
     853             214 :         let layers: Vec<RemotePath> = {
     854             214 :             let mut locked = self.upload_queue.lock().unwrap();
     855             214 :             let stopped = locked.stopped_mut()?;
     856                 : 
     857             214 :             if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
     858 UBC           0 :                 anyhow::bail!("deleted_at is not set")
     859 CBC         214 :             }
     860                 : 
     861             214 :             debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
     862                 : 
     863             214 :             stopped
     864             214 :                 .upload_queue_for_deletion
     865             214 :                 .latest_files
     866             214 :                 .drain()
     867            4663 :                 .map(|(file_name, meta)| {
     868            4663 :                     remote_layer_path(
     869            4663 :                         &self.tenant_id,
     870            4663 :                         &self.timeline_id,
     871            4663 :                         &file_name,
     872            4663 :                         meta.generation,
     873            4663 :                     )
     874            4663 :                 })
     875             214 :                 .collect()
     876             214 :         };
     877             214 : 
     878             214 :         let layer_deletion_count = layers.len();
     879             214 :         self.deletion_queue_client.push_immediate(layers).await?;
     880                 : 
     881                 :         // Do not delete index part yet, it is needed for a possible retry. If we removed it first
     882                 :         // and the retry arrived at a different pageserver, there wouldn't be any traces of it on remote storage
     883             214 :         let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id);
     884             214 : 
     885             214 :         // Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't
     886             214 :         // taking the burden of listing all the layers that we already know we should delete.
     887             214 :         self.deletion_queue_client.flush_immediate().await?;
     888                 : 
     889             214 :         let remaining = backoff::retry(
     890             278 :             || async {
     891             278 :                 self.storage_impl
     892             278 :                     .list_files(Some(&timeline_storage_path))
     893             824 :                     .await
     894             278 :             },
     895             214 :             |_e| false,
     896             214 :             FAILED_DOWNLOAD_WARN_THRESHOLD,
     897             214 :             FAILED_REMOTE_OP_RETRIES,
     898             214 :             "list_prefixes",
     899             214 :             backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")),
     900             214 :         )
     901             824 :         .await
     902             214 :         .context("list prefixes")?;
     903                 : 
     904                 :         // We will delete the current index_part object last, since it acts as a deletion
     905                 :         // marker via its deleted_at attribute
     906             214 :         let latest_index = remaining
     907             214 :             .iter()
     908            1296 :             .filter(|p| {
     909            1296 :                 p.object_name()
     910            1296 :                     .map(|n| n.starts_with(IndexPart::FILE_NAME))
     911            1296 :                     .unwrap_or(false)
     912            1296 :             })
     913             214 :             .filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen)))
     914             214 :             .max_by_key(|i| i.1)
     915             214 :             .map(|i| i.0.clone())
     916             214 :             .unwrap_or(
     917             214 :                 // No generation-suffixed indices, assume we are dealing with
     918             214 :                 // a legacy index.
     919             214 :                 remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
     920             214 :             );
     921             214 : 
     922             214 :         let remaining_layers: Vec<RemotePath> = remaining
     923             214 :             .into_iter()
     924            1296 :             .filter(|p| p!= &latest_index)
     925             214 :             .inspect(|path| {
     926            1090 :                 if let Some(name) = path.object_name() {
     927            1090 :                     info!(%name, "deleting a file not referenced from index_part.json");
     928                 :                 } else {
     929 UBC           0 :                     warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
     930                 :                 }
     931 CBC        1090 :             })
     932             214 :             .collect();
     933             214 : 
     934             214 :         let not_referenced_count = remaining_layers.len();
     935             214 :         if !remaining_layers.is_empty() {
     936              52 :             self.deletion_queue_client
     937              52 :                 .push_immediate(remaining_layers)
     938 UBC           0 :                 .await?;
     939 CBC         162 :         }
     940                 : 
     941             214 :         fail::fail_point!("timeline-delete-before-index-delete", |_| {
     942              12 :             Err(anyhow::anyhow!(
     943              12 :                 "failpoint: timeline-delete-before-index-delete"
     944              12 :             ))?
     945             214 :         });
     946                 : 
     947 UBC           0 :         debug!("enqueuing index part deletion");
     948 CBC         202 :         self.deletion_queue_client
     949             202 :             .push_immediate([latest_index].to_vec())
     950 UBC           0 :             .await?;
     951                 : 
     952                 :         // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
     953                 :         // for a flush to a persistent deletion list so that we may be sure deletion will occur.
     954 CBC         202 :         self.deletion_queue_client.flush_immediate().await?;
     955                 : 
     956             202 :         fail::fail_point!("timeline-delete-after-index-delete", |_| {
     957               4 :             Err(anyhow::anyhow!(
     958               4 :                 "failpoint: timeline-delete-after-index-delete"
     959               4 :             ))?
     960             202 :         });
     961                 : 
     962             198 :         info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
     963                 : 
     964             198 :         Ok(())
     965             214 :     }
     966                 : 
     967                 :     ///
     968                 :     /// Pick next tasks from the queue, and start as many of them as possible without violating
     969                 :     /// the ordering constraints.
     970                 :     ///
     971                 :     /// The caller needs to already hold the `upload_queue` lock.
     972           51336 :     fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
     973           78876 :         while let Some(next_op) = upload_queue.queued_operations.front() {
     974                 :             // Can we run this task now?
     975           52694 :             let can_run_now = match next_op {
     976                 :                 UploadOp::UploadLayer(_, _) => {
     977                 :                     // Can always be scheduled.
     978           18593 :                     true
     979                 :                 }
     980                 :                 UploadOp::UploadMetadata(_, _) => {
     981                 :                     // These can only be performed after all the preceding operations
     982                 :                     // have finished.
     983           27392 :                     upload_queue.inprogress_tasks.is_empty()
     984                 :                 }
     985                 :                 UploadOp::Delete(_) => {
     986                 :                     // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
     987             891 :                     upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
     988                 :                 }
     989                 : 
     990            5818 :                 UploadOp::Barrier(_) => upload_queue.inprogress_tasks.is_empty(),
     991                 :             };
     992                 : 
     993                 :             // If we cannot launch this task, don't look any further.
     994                 :             //
     995                 :             // In some cases, we could let some non-frontmost tasks "jump the queue" and launch
     996                 :             // them now, but we don't try to do that currently.  For example, if the frontmost task
     997                 :             // is an index-file upload that cannot proceed until preceding uploads have finished, we
     998                 :             // could still start layer uploads that were scheduled later.
     999           52694 :             if !can_run_now {
    1000           25154 :                 break;
    1001           27540 :             }
    1002           27540 : 
    1003           27540 :             // We can launch this task. Remove it from the queue first.
    1004           27540 :             let next_op = upload_queue.queued_operations.pop_front().unwrap();
    1005           27540 : 
    1006           27540 :             debug!("starting op: {}", next_op);
    1007                 : 
    1008                 :             // Update the counters
    1009           27540 :             match next_op {
    1010           18593 :                 UploadOp::UploadLayer(_, _) => {
    1011           18593 :                     upload_queue.num_inprogress_layer_uploads += 1;
    1012           18593 :                 }
    1013            6000 :                 UploadOp::UploadMetadata(_, _) => {
    1014            6000 :                     upload_queue.num_inprogress_metadata_uploads += 1;
    1015            6000 :                 }
    1016             588 :                 UploadOp::Delete(_) => {
    1017             588 :                     upload_queue.num_inprogress_deletions += 1;
    1018             588 :                 }
    1019            2359 :                 UploadOp::Barrier(sender) => {
    1020            2359 :                     sender.send_replace(());
    1021            2359 :                     continue;
    1022                 :                 }
    1023                 :             };
    1024                 : 
    1025                 :             // Assign unique ID to this task
    1026           25181 :             upload_queue.task_counter += 1;
    1027           25181 :             let upload_task_id = upload_queue.task_counter;
    1028           25181 : 
    1029           25181 :             // Add it to the in-progress map
    1030           25181 :             let task = Arc::new(UploadTask {
    1031           25181 :                 task_id: upload_task_id,
    1032           25181 :                 op: next_op,
    1033           25181 :                 retries: AtomicU32::new(0),
    1034           25181 :             });
    1035           25181 :             upload_queue
    1036           25181 :                 .inprogress_tasks
    1037           25181 :                 .insert(task.task_id, Arc::clone(&task));
    1038           25181 : 
    1039           25181 :             // Spawn task to perform the task
    1040           25181 :             let self_rc = Arc::clone(self);
    1041           25181 :             let tenant_id = self.tenant_id;
    1042           25181 :             let timeline_id = self.timeline_id;
    1043           25181 :             task_mgr::spawn(
    1044           25181 :                 &self.runtime,
    1045           25181 :                 TaskKind::RemoteUploadTask,
    1046           25181 :                 Some(self.tenant_id),
    1047           25181 :                 Some(self.timeline_id),
    1048           25181 :                 "remote upload",
    1049                 :                 false,
    1050           25173 :                 async move {
    1051         3273479 :                     self_rc.perform_upload_task(task).await;
    1052           25091 :                     Ok(())
    1053           25091 :                 }
    1054           25181 :                 .instrument(info_span!(parent: None, "remote_upload", %tenant_id, %timeline_id, %upload_task_id)),
    1055                 :             );
    1056                 : 
    1057                 :             // Loop back to process next task
    1058                 :         }
    1059           51336 :     }
    1060                 : 
    1061                 :     ///
    1062                 :     /// Perform an upload task.
    1063                 :     ///
    1064                 :     /// The task is in the `inprogress_tasks` list. This function will try to
    1065                 :     /// execute it, retrying forever. On successful completion, the task is
    1066                 :     /// removed from the `inprogress_tasks` list, and any next task(s) in the
    1067                 :     /// queue that were waiting for its completion are launched.
    1068                 :     ///
    1069                 :     /// The task can be shut down, however. That leads to stopping the whole
    1070                 :     /// queue.
    1071                 :     ///
    1072           25173 :     async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
    1073                 :         // Loop to retry until it completes.
    1074           27817 :         loop {
    1075           27817 :             // If we're requested to shut down, close up shop and exit.
    1076           27817 :             //
    1077           27817 :             // Note: We only check for the shutdown requests between retries, so
    1078           27817 :             // if a shutdown request arrives while we're busy uploading, in the
    1079           27817 :             // upload::upload_*() call below, we will not exit until it has
    1080           27817 :             // finished. We probably could cancel the upload by simply dropping
    1081           27817 :             // the Future, but we're not 100% sure if the remote storage library
    1082           27817 :             // is cancellation safe, so we don't dare to do that. Hopefully, the
    1083           27817 :             // upload finishes or times out soon enough.
    1084           27817 :             if task_mgr::is_shutdown_requested() {
    1085             136 :                 info!("upload task cancelled by shutdown request");
    1086             136 :                 match self.stop() {
    1087             136 :                     Ok(()) => {}
    1088                 :                     Err(StopError::QueueUninitialized) => {
    1089 UBC           0 :                         unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
    1090                 :                     }
    1091                 :                 }
    1092 CBC         136 :                 return;
    1093           27681 :             }
    1094                 : 
    1095           27681 :             let upload_result: anyhow::Result<()> = match &task.op {
    1096           20377 :                 UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
    1097           20377 :                     let path = self
    1098           20377 :                         .conf
    1099           20377 :                         .timeline_path(&self.tenant_id, &self.timeline_id)
    1100           20377 :                         .join(layer_file_name.file_name());
    1101           20377 : 
    1102           20377 :                     upload::upload_timeline_layer(
    1103           20377 :                         self.conf,
    1104           20377 :                         &self.storage_impl,
    1105           20377 :                         &path,
    1106           20377 :                         layer_metadata,
    1107           20377 :                         self.generation,
    1108           20377 :                     )
    1109           20377 :                     .measure_remote_op(
    1110           20377 :                         self.tenant_id,
    1111           20377 :                         self.timeline_id,
    1112           20377 :                         RemoteOpFileKind::Layer,
    1113           20377 :                         RemoteOpKind::Upload,
    1114           20377 :                         Arc::clone(&self.metrics),
    1115           20377 :                     )
    1116         3246491 :                     .await
    1117                 :                 }
    1118            6718 :                 UploadOp::UploadMetadata(ref index_part, _lsn) => {
    1119            6718 :                     let mention_having_future_layers = if cfg!(feature = "testing") {
    1120            6718 :                         index_part
    1121            6718 :                             .layer_metadata
    1122            6718 :                             .keys()
    1123          415960 :                             .any(|x| x.is_in_future(*_lsn))
    1124                 :                     } else {
    1125 UBC           0 :                         false
    1126                 :                     };
    1127                 : 
    1128 CBC        6718 :                     let res = upload::upload_index_part(
    1129            6718 :                         &self.storage_impl,
    1130            6718 :                         &self.tenant_id,
    1131            6718 :                         &self.timeline_id,
    1132            6718 :                         self.generation,
    1133            6718 :                         index_part,
    1134            6718 :                     )
    1135            6718 :                     .measure_remote_op(
    1136            6718 :                         self.tenant_id,
    1137            6718 :                         self.timeline_id,
    1138            6718 :                         RemoteOpFileKind::Index,
    1139            6718 :                         RemoteOpKind::Upload,
    1140            6718 :                         Arc::clone(&self.metrics),
    1141            6718 :                     )
    1142           26464 :                     .await;
    1143            6713 :                     if res.is_ok() {
    1144            5988 :                         self.update_remote_physical_size_gauge(Some(index_part));
    1145            5988 :                         if mention_having_future_layers {
    1146                 :                             // find rationale near crate::tenant::timeline::init::cleanup_future_layer
    1147              12 :                             tracing::info!(disk_consistent_lsn=%_lsn, "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup");
    1148            5976 :                         }
    1149             725 :                     }
    1150            6713 :                     res
    1151                 :                 }
    1152             586 :                 UploadOp::Delete(delete) => self
    1153             586 :                     .deletion_queue_client
    1154             586 :                     .push_layers(
    1155             586 :                         self.tenant_id,
    1156             586 :                         self.timeline_id,
    1157             586 :                         self.generation,
    1158             586 :                         delete.layers.clone(),
    1159             586 :                     )
    1160             518 :                     .await
    1161             586 :                     .map_err(|e| anyhow::anyhow!(e)),
    1162                 :                 UploadOp::Barrier(_) => {
    1163                 :                     // unreachable. Barrier operations are handled synchronously in
    1164                 :                     // launch_queued_tasks
    1165 UBC           0 :                     warn!("unexpected Barrier operation in perform_upload_task");
    1166               0 :                     break;
    1167                 :                 }
    1168                 :             };
    1169                 : 
    1170 CBC       27600 :             match upload_result {
    1171                 :                 Ok(()) => {
    1172           24955 :                     break;
    1173                 :                 }
    1174            2645 :                 Err(e) => {
    1175            2645 :                     let retries = task.retries.fetch_add(1, Ordering::SeqCst);
    1176            2645 : 
    1177            2645 :                     // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
    1178            2645 :                     // or other external reasons. Such issues are relatively regular, so log them
    1179            2645 :                     // at info level at first, and only WARN if the operation fails repeatedly.
    1180            2645 :                     //
    1181            2645 :                     // (See similar logic for downloads in `download::download_retry`)
    1182            2645 :                     if retries < FAILED_UPLOAD_WARN_THRESHOLD {
    1183            2644 :                         info!(
    1184            2644 :                             "failed to perform remote task {}, will retry (attempt {}): {:#}",
    1185            2644 :                             task.op, retries, e
    1186            2644 :                         );
    1187                 :                     } else {
    1188               1 :                         warn!(
    1189               1 :                             "failed to perform remote task {}, will retry (attempt {}): {:?}",
    1190               1 :                             task.op, retries, e
    1191               1 :                         );
    1192                 :                     }
    1193                 : 
    1194                 :                     // sleep until it's time to retry, or we're cancelled
    1195            2645 :                     exponential_backoff(
    1196            2645 :                         retries,
    1197            2645 :                         DEFAULT_BASE_BACKOFF_SECONDS,
    1198            2645 :                         DEFAULT_MAX_BACKOFF_SECONDS,
    1199            2645 :                         &shutdown_token(),
    1200            2645 :                     )
    1201               6 :                     .await;
    1202                 :                 }
    1203                 :             }
    1204                 :         }
    1205                 : 
    1206           24955 :         let retries = task.retries.load(Ordering::SeqCst);
    1207           24955 :         if retries > 0 {
    1208            2501 :             info!(
    1209            2501 :                 "remote task {} completed successfully after {} retries",
    1210            2501 :                 task.op, retries
    1211            2501 :             );
    1212                 :         } else {
    1213 UBC           0 :             debug!("remote task {} completed successfully", task.op);
    1214                 :         }
    1215                 : 
    1216                 :         // The task has completed successfully. Remove it from the in-progress list.
    1217 CBC       23673 :         let lsn_update = {
    1218           24955 :             let mut upload_queue_guard = self.upload_queue.lock().unwrap();
    1219           24955 :             let upload_queue = match upload_queue_guard.deref_mut() {
    1220 UBC           0 :                 UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
    1221 CBC        1282 :                 UploadQueue::Stopped(_stopped) => {
    1222            1282 :                     None
    1223                 :                 },
    1224           23673 :                 UploadQueue::Initialized(qi) => { Some(qi) }
    1225                 :             };
    1226                 : 
    1227           24955 :             let upload_queue = match upload_queue {
    1228           23673 :                 Some(upload_queue) => upload_queue,
    1229                 :                 None => {
    1230            1282 :                     info!("another concurrent task already stopped the queue");
    1231            1282 :                     return;
    1232                 :                 }
    1233                 :             };
    1234                 : 
    1235           23673 :             upload_queue.inprogress_tasks.remove(&task.task_id);
    1236                 : 
    1237           23673 :             let lsn_update = match task.op {
    1238                 :                 UploadOp::UploadLayer(_, _) => {
    1239           17123 :                     upload_queue.num_inprogress_layer_uploads -= 1;
    1240           17123 :                     None
    1241                 :                 }
    1242            5975 :                 UploadOp::UploadMetadata(_, lsn) => {
    1243            5975 :                     upload_queue.num_inprogress_metadata_uploads -= 1;
    1244            5975 :                     // XXX monotonicity check?
    1245            5975 : 
    1246            5975 :                     upload_queue.projected_remote_consistent_lsn = Some(lsn);
    1247            5975 :                     if self.generation.is_none() {
    1248                 :                         // Legacy mode: skip validating generation
    1249            5547 :                         upload_queue.visible_remote_consistent_lsn.store(lsn);
    1250            5547 :                         None
    1251                 :                     } else {
    1252             428 :                         Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
    1253                 :                     }
    1254                 :                 }
    1255                 :                 UploadOp::Delete(_) => {
    1256             575 :                     upload_queue.num_inprogress_deletions -= 1;
    1257             575 :                     None
    1258                 :                 }
    1259 UBC           0 :                 UploadOp::Barrier(_) => unreachable!(),
    1260                 :             };
    1261                 : 
    1262                 :             // Launch any queued tasks that were unblocked by this one.
    1263 CBC       23673 :             self.launch_queued_tasks(upload_queue);
    1264           23673 :             lsn_update
    1265                 :         };
    1266                 : 
    1267           23673 :         if let Some((lsn, slot)) = lsn_update {
    1268                 :             // Updates to the remote_consistent_lsn we advertise to pageservers
    1269                 :             // are all routed through the DeletionQueue, to enforce important
    1270                 :             // data safety guarantees (see docs/rfcs/025-generation-numbers.md)
    1271             428 :             self.deletion_queue_client
    1272             428 :                 .update_remote_consistent_lsn(
    1273             428 :                     self.tenant_id,
    1274             428 :                     self.timeline_id,
    1275             428 :                     self.generation,
    1276             428 :                     lsn,
    1277             428 :                     slot,
    1278             428 :                 )
    1279 UBC           0 :                 .await;
    1280 CBC       23245 :         }
    1281                 : 
    1282           23673 :         self.calls_unfinished_metric_end(&task.op);
    1283           25091 :     }
    1284                 : 
    1285           49074 :     fn calls_unfinished_metric_impl(
    1286           49074 :         &self,
    1287           49074 :         op: &UploadOp,
    1288           49074 :     ) -> Option<(
    1289           49074 :         RemoteOpFileKind,
    1290           49074 :         RemoteOpKind,
    1291           49074 :         RemoteTimelineClientMetricsCallTrackSize,
    1292           49074 :     )> {
    1293                 :         use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
    1294           49074 :         let res = match op {
    1295           35730 :             UploadOp::UploadLayer(_, m) => (
    1296           35730 :                 RemoteOpFileKind::Layer,
    1297           35730 :                 RemoteOpKind::Upload,
    1298           35730 :                 RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
    1299           35730 :             ),
    1300           12080 :             UploadOp::UploadMetadata(_, _) => (
    1301           12080 :                 RemoteOpFileKind::Index,
    1302           12080 :                 RemoteOpKind::Upload,
    1303           12080 :                 DontTrackSize {
    1304           12080 :                     reason: "metadata uploads are tiny",
    1305           12080 :                 },
    1306           12080 :             ),
    1307            1263 :             UploadOp::Delete(_delete) => (
    1308            1263 :                 RemoteOpFileKind::Layer,
    1309            1263 :                 RemoteOpKind::Delete,
    1310            1263 :                 DontTrackSize {
    1311            1263 :                     reason: "should we track deletes? positive or negative sign?",
    1312            1263 :                 },
    1313            1263 :             ),
    1314                 :             UploadOp::Barrier(_) => {
    1315                 :                 // we do not account these
    1316               1 :                 return None;
    1317                 :             }
    1318                 :         };
    1319           49073 :         Some(res)
    1320           49074 :     }
    1321                 : 
    1322           25298 :     fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
    1323           25298 :         let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
    1324           25298 :             Some(x) => x,
    1325 UBC           0 :             None => return,
    1326                 :         };
    1327 CBC       25298 :         let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
    1328           25298 :         guard.will_decrement_manually(); // in calls_unfinished_metric_end()
    1329           25298 :     }
    1330                 : 
    1331           23776 :     fn calls_unfinished_metric_end(&self, op: &UploadOp) {
    1332           23776 :         let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
    1333           23775 :             Some(x) => x,
    1334               1 :             None => return,
    1335                 :         };
    1336           23775 :         self.metrics.call_end(&file_kind, &op_kind, track_bytes);
    1337           23776 :     }
    1338                 : 
    1339                 :     /// Close the upload queue for new operations and cancel queued operations.
    1340                 :     /// In-progress operations will still be running after this function returns.
    1341                 :     /// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
    1342                 :     /// to wait for them to complete, after calling this function.
    1343             382 :     pub fn stop(&self) -> Result<(), StopError> {
    1344             382 :         // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
    1345             382 :         // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
    1346             382 :         // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
    1347             382 :         let mut guard = self.upload_queue.lock().unwrap();
    1348             382 :         match &mut *guard {
    1349 UBC           0 :             UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
    1350                 :             UploadQueue::Stopped(_) => {
    1351                 :                 // nothing to do
    1352 CBC         160 :                 info!("another concurrent task already shut down the queue");
    1353             160 :                 Ok(())
    1354                 :             }
    1355             222 :             UploadQueue::Initialized(initialized) => {
    1356             222 :                 info!("shutting down upload queue");
    1357                 : 
    1358                 :                 // Replace the queue with the Stopped state, taking ownership of the old
    1359                 :                 // Initialized queue. We will do some checks on it, and then drop it.
    1360             222 :                 let qi = {
    1361                 :                     // Here we preserve working version of the upload queue for possible use during deletions.
    1362                 :                     // In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut
    1363                 :                     // but for this use case it doesn't really make sense to bring in unsafe code only for this usage point.
    1364                 :                     // Deletion is not really perf sensitive so there shouldn't be any problems with cloning a fraction of it.
    1365             222 :                     let upload_queue_for_deletion = UploadQueueInitialized {
    1366             222 :                         task_counter: 0,
    1367             222 :                         latest_files: initialized.latest_files.clone(),
    1368             222 :                         latest_files_changes_since_metadata_upload_scheduled: 0,
    1369             222 :                         latest_metadata: initialized.latest_metadata.clone(),
    1370             222 :                         projected_remote_consistent_lsn: None,
    1371             222 :                         visible_remote_consistent_lsn: initialized
    1372             222 :                             .visible_remote_consistent_lsn
    1373             222 :                             .clone(),
    1374             222 :                         num_inprogress_layer_uploads: 0,
    1375             222 :                         num_inprogress_metadata_uploads: 0,
    1376             222 :                         num_inprogress_deletions: 0,
    1377             222 :                         inprogress_tasks: HashMap::default(),
    1378             222 :                         queued_operations: VecDeque::default(),
    1379             222 :                     };
    1380             222 : 
    1381             222 :                     let upload_queue = std::mem::replace(
    1382             222 :                         &mut *guard,
    1383             222 :                         UploadQueue::Stopped(UploadQueueStopped {
    1384             222 :                             upload_queue_for_deletion,
    1385             222 :                             deleted_at: SetDeletedFlagProgress::NotRunning,
    1386             222 :                         }),
    1387             222 :                     );
    1388             222 :                     if let UploadQueue::Initialized(qi) = upload_queue {
    1389             222 :                         qi
    1390                 :                     } else {
    1391 UBC           0 :                         unreachable!("we checked in the match above that it is Initialized");
    1392                 :                     }
    1393                 :                 };
    1394                 : 
    1395                 :                 // consistency check
    1396 CBC         222 :                 assert_eq!(
    1397             222 :                     qi.num_inprogress_layer_uploads
    1398             222 :                         + qi.num_inprogress_metadata_uploads
    1399             222 :                         + qi.num_inprogress_deletions,
    1400             222 :                     qi.inprogress_tasks.len()
    1401             222 :                 );
    1402                 : 
    1403                 :                 // We don't need to do anything here for in-progress tasks. They will finish
    1404                 :                 // on their own, decrement the unfinished-task counter themselves, and observe
    1405                 :                 // that the queue is Stopped.
    1406             222 :                 drop(qi.inprogress_tasks);
    1407                 : 
    1408                 :                 // Tear down queued ops
    1409             222 :                 for op in qi.queued_operations.into_iter() {
    1410             103 :                     self.calls_unfinished_metric_end(&op);
    1411             103 :                     // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
    1412             103 :                     // which is exactly what we want to happen.
    1413             103 :                     drop(op);
    1414             103 :                 }
    1415                 : 
    1416                 :                 // We're done.
    1417             222 :                 drop(guard);
    1418             222 :                 Ok(())
    1419                 :             }
    1420                 :         }
    1421             382 :     }
    1422                 : 
    1423             591 :     pub(crate) fn get_layer_metadata(
    1424             591 :         &self,
    1425             591 :         name: &LayerFileName,
    1426             591 :     ) -> anyhow::Result<Option<LayerFileMetadata>> {
    1427             591 :         self.upload_queue.lock().unwrap().get_layer_metadata(name)
    1428             591 :     }
    1429                 : }
    1430                 : 
    1431             328 : pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
    1432             328 :     let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
    1433             328 :     RemotePath::from_string(&path).expect("Failed to construct path")
    1434             328 : }
    1435                 : 
    1436             280 : pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
    1437             280 :     remote_timelines_path(tenant_id).join(Utf8Path::new(&timeline_id.to_string()))
    1438             280 : }
    1439                 : 
    1440           11553 : pub fn remote_layer_path(
    1441           11553 :     tenant_id: &TenantId,
    1442           11553 :     timeline_id: &TimelineId,
    1443           11553 :     layer_file_name: &LayerFileName,
    1444           11553 :     generation: Generation,
    1445           11553 : ) -> RemotePath {
    1446           11553 :     // Generation-aware key format
    1447           11553 :     let path = format!(
    1448           11553 :         "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
    1449           11553 :         layer_file_name.file_name(),
    1450           11553 :         generation.get_suffix()
    1451           11553 :     );
    1452           11553 : 
    1453           11553 :     RemotePath::from_string(&path).expect("Failed to construct path")
    1454           11553 : }
    1455                 : 
    1456            7573 : pub fn remote_index_path(
    1457            7573 :     tenant_id: &TenantId,
    1458            7573 :     timeline_id: &TimelineId,
    1459            7573 :     generation: Generation,
    1460            7573 : ) -> RemotePath {
    1461            7573 :     RemotePath::from_string(&format!(
    1462            7573 :         "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
    1463            7573 :         IndexPart::FILE_NAME,
    1464            7573 :         generation.get_suffix()
    1465            7573 :     ))
    1466            7573 :     .expect("Failed to construct path")
    1467            7573 : }
    1468                 : 
    1469                 : /// Given the key of an index, parse out the generation part of the name
    1470             221 : pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
    1471             221 :     let file_name = match path.get_path().file_name() {
    1472             221 :         Some(f) => f,
    1473                 :         None => {
    1474                 :             // Unexpected: we should be seeing index_part.json paths only
    1475 UBC           0 :             tracing::warn!("Malformed index key {}", path);
    1476               0 :             return None;
    1477                 :         }
    1478                 :     };
    1479                 : 
    1480 CBC         221 :     match file_name.split_once('-') {
    1481              11 :         Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
    1482             210 :         None => None,
    1483                 :     }
    1484             221 : }
    1485                 : 
    1486                 : /// Files on the remote storage are stored with paths relative to the workdir.
    1487                 : /// Each such path includes both the tenant and timeline ids, which makes the remote storage path unique.
    1488                 : ///
    1489                 : /// Errors if the provided path does not start with the pageserver's workdir.
    1490           20372 : pub fn remote_path(
    1491           20372 :     conf: &PageServerConf,
    1492           20372 :     local_path: &Utf8Path,
    1493           20372 :     generation: Generation,
    1494           20372 : ) -> anyhow::Result<RemotePath> {
    1495           20372 :     let stripped = local_path
    1496           20372 :         .strip_prefix(&conf.workdir)
    1497           20372 :         .context("Failed to strip workdir prefix")?;
    1498                 : 
    1499           20372 :     let suffixed = format!("{0}{1}", stripped, generation.get_suffix());
    1500           20372 : 
    1501           20372 :     RemotePath::new(Utf8Path::new(&suffixed)).with_context(|| {
    1502 UBC           0 :         format!(
    1503               0 :             "to resolve remote part of path {:?} for base {:?}",
    1504               0 :             local_path, conf.workdir
    1505               0 :         )
    1506 CBC       20372 :     })
    1507           20372 : }
    1508                 : 
    1509                 : #[cfg(test)]
    1510                 : mod tests {
    1511                 :     use super::*;
    1512                 :     use crate::{
    1513                 :         context::RequestContext,
    1514                 :         tenant::{
    1515                 :             harness::{TenantHarness, TIMELINE_ID},
    1516                 :             Generation, Tenant, Timeline,
    1517                 :         },
    1518                 :         DEFAULT_PG_VERSION,
    1519                 :     };
    1520                 : 
    1521                 :     use std::collections::HashSet;
    1522                 :     use utils::lsn::Lsn;
    1523                 : 
    1524               4 :     pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
    1525               4 :         format!("contents for {name}").into()
    1526               4 :     }
    1527                 : 
    1528               1 :     pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
    1529               1 :         let metadata = TimelineMetadata::new(
    1530               1 :             disk_consistent_lsn,
    1531               1 :             None,
    1532               1 :             None,
    1533               1 :             Lsn(0),
    1534               1 :             Lsn(0),
    1535               1 :             Lsn(0),
    1536               1 :             // Any version will do
    1537               1 :             // but it should be consistent with the one in the tests
    1538               1 :             crate::DEFAULT_PG_VERSION,
    1539               1 :         );
    1540               1 : 
    1541               1 :         // go through serialize + deserialize to fix the header, including checksum
    1542               1 :         TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
    1543               1 :     }
    1544                 : 
    1545               1 :     fn assert_file_list(a: &HashSet<LayerFileName>, b: &[&str]) {
    1546               3 :         let mut avec: Vec<String> = a.iter().map(|x| x.file_name()).collect();
    1547               1 :         avec.sort();
    1548               1 : 
    1549               1 :         let mut bvec = b.to_vec();
    1550               1 :         bvec.sort_unstable();
    1551               1 : 
    1552               1 :         assert_eq!(avec, bvec);
    1553               1 :     }
    1554                 : 
    1555               2 :     fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
    1556               2 :         let mut expected: Vec<String> = expected
    1557               2 :             .iter()
    1558               8 :             .map(|x| format!("{}{}", x, generation.get_suffix()))
    1559               2 :             .collect();
    1560               2 :         expected.sort();
    1561               2 : 
    1562               2 :         let mut found: Vec<String> = Vec::new();
    1563               8 :         for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
    1564               8 :             let entry_name = entry.file_name();
    1565               8 :             let fname = entry_name.to_str().unwrap();
    1566               8 :             found.push(String::from(fname));
    1567               8 :         }
    1568               2 :         found.sort();
    1569               2 : 
    1570               2 :         assert_eq!(found, expected);
    1571               2 :     }
    1572                 : 
    1573                 :     struct TestSetup {
    1574                 :         harness: TenantHarness,
    1575                 :         tenant: Arc<Tenant>,
    1576                 :         timeline: Arc<Timeline>,
    1577                 :         tenant_ctx: RequestContext,
    1578                 :     }
    1579                 : 
    1580                 :     impl TestSetup {
    1581               4 :         async fn new(test_name: &str) -> anyhow::Result<Self> {
    1582               4 :             let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
    1583               4 :             let harness = TenantHarness::create(test_name)?;
    1584               4 :             let (tenant, ctx) = harness.load().await;
    1585                 : 
    1586               4 :             let timeline = tenant
    1587               4 :                 .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1588               8 :                 .await?;
    1589                 : 
    1590               4 :             Ok(Self {
    1591               4 :                 harness,
    1592               4 :                 tenant,
    1593               4 :                 timeline,
    1594               4 :                 tenant_ctx: ctx,
    1595               4 :             })
    1596               4 :         }
    1597                 : 
    1598                 :         /// Construct a RemoteTimelineClient in an arbitrary generation
    1599               5 :         fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
    1600               5 :             Arc::new(RemoteTimelineClient {
    1601               5 :                 conf: self.harness.conf,
    1602               5 :                 runtime: tokio::runtime::Handle::current(),
    1603               5 :                 tenant_id: self.harness.tenant_id,
    1604               5 :                 timeline_id: TIMELINE_ID,
    1605               5 :                 generation,
    1606               5 :                 storage_impl: self.harness.remote_storage.clone(),
    1607               5 :                 deletion_queue_client: self.harness.deletion_queue.new_client(),
    1608               5 :                 upload_queue: Mutex::new(UploadQueue::Uninitialized),
    1609               5 :                 metrics: Arc::new(RemoteTimelineClientMetrics::new(
    1610               5 :                     &self.harness.tenant_id,
    1611               5 :                     &TIMELINE_ID,
    1612               5 :                 )),
    1613               5 :             })
    1614               5 :         }
    1615                 : 
    1616                 :         /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
    1617                 :         /// and timeline_id are present.
    1618               3 :         fn span(&self) -> tracing::Span {
    1619               3 :             tracing::info_span!(
    1620               3 :                 "test",
    1621               3 :                 tenant_id = %self.harness.tenant_id,
    1622               3 :                 timeline_id = %TIMELINE_ID
    1623               3 :             )
    1624               3 :         }
    1625                 :     }
    1626                 : 
    1627                 :     // Test scheduling
    1628               1 :     #[tokio::test]
    1629               1 :     async fn upload_scheduling() {
    1630                 :         // Test outline:
    1631                 :         //
    1632                 :         // Schedule upload of a bunch of layers. Check that they are started immediately, not queued
    1633                 :         // Schedule upload of index. Check that it is queued
    1634                 :         // let the layer file uploads finish. Check that the index-upload is now started
    1635                 :         // let the index-upload finish.
    1636                 :         //
    1637                 :         // Download back the index_part.json. Check that the list of files is correct
    1638                 :         //
    1639                 :         // Schedule upload. Schedule deletion. Check that the deletion is queued
    1640                 :         // let upload finish. Check that deletion is now started
    1641                 :         // Schedule another deletion. Check that it's launched immediately.
    1642                 :         // Schedule index upload. Check that it's queued
    1643                 : 
    1644               3 :         let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
    1645               1 :         let span = test_setup.span();
    1646               1 :         let _guard = span.enter();
    1647               1 : 
    1648               1 :         let TestSetup {
    1649               1 :             harness,
    1650               1 :             tenant: _tenant,
    1651               1 :             timeline,
    1652               1 :             tenant_ctx: _tenant_ctx,
    1653               1 :         } = test_setup;
    1654               1 : 
    1655               1 :         let client = timeline.remote_client.as_ref().unwrap();
    1656                 : 
    1657                 :         // Download back the index_part.json, and check that the list of files is correct
    1658               3 :         let initial_index_part = match client.download_index_file().await.unwrap() {
    1659               1 :             MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
    1660 UBC           0 :             MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
    1661                 :         };
    1662 CBC           1 :         let initial_layers = initial_index_part
    1663               1 :             .layer_metadata
    1664               1 :             .keys()
    1665               1 :             .map(|f| f.to_owned())
    1666               1 :             .collect::<HashSet<LayerFileName>>();
    1667               1 :         let initial_layer = {
    1668               1 :             assert!(initial_layers.len() == 1);
    1669               1 :             initial_layers.into_iter().next().unwrap()
    1670               1 :         };
    1671               1 : 
    1672               1 :         let timeline_path = harness.timeline_path(&TIMELINE_ID);
    1673               1 : 
    1674               1 :         println!("workdir: {}", harness.conf.workdir);
    1675               1 : 
    1676               1 :         let remote_timeline_dir = harness
    1677               1 :             .remote_fs_dir
    1678               1 :             .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
    1679               1 :         println!("remote_timeline_dir: {remote_timeline_dir}");
    1680               1 : 
    1681               1 :         let generation = harness.generation;
    1682               1 : 
    1683               1 :         // Create a couple of dummy files, schedule upload for them
    1684               1 :         let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
    1685               1 :         let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
    1686               1 :         let layer_file_name_3: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap();
    1687               1 :         let content_1 = dummy_contents("foo");
    1688               1 :         let content_2 = dummy_contents("bar");
    1689               1 :         let content_3 = dummy_contents("baz");
    1690                 : 
    1691               3 :         for (filename, content) in [
    1692               1 :             (&layer_file_name_1, &content_1),
    1693               1 :             (&layer_file_name_2, &content_2),
    1694               1 :             (&layer_file_name_3, &content_3),
    1695               3 :         ] {
    1696               3 :             std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
    1697               3 :         }
    1698                 : 
    1699               1 :         client
    1700               1 :             .schedule_layer_file_upload(
    1701               1 :                 &layer_file_name_1,
    1702               1 :                 &LayerFileMetadata::new(content_1.len() as u64, generation),
    1703               1 :             )
    1704               1 :             .unwrap();
    1705               1 :         client
    1706               1 :             .schedule_layer_file_upload(
    1707               1 :                 &layer_file_name_2,
    1708               1 :                 &LayerFileMetadata::new(content_2.len() as u64, generation),
    1709               1 :             )
    1710               1 :             .unwrap();
    1711               1 : 
    1712               1 :         // Check that they are started immediately, not queued
    1713               1 :         //
    1714               1 :         // this works because we are running within block_on, so any futures are now queued up until
    1715               1 :         // our next await point.
    1716               1 :         {
    1717               1 :             let mut guard = client.upload_queue.lock().unwrap();
    1718               1 :             let upload_queue = guard.initialized_mut().unwrap();
    1719               1 :             assert!(upload_queue.queued_operations.is_empty());
    1720               1 :             assert!(upload_queue.inprogress_tasks.len() == 2);
    1721               1 :             assert!(upload_queue.num_inprogress_layer_uploads == 2);
    1722                 : 
    1723                 :             // also check that `latest_files_changes_since_metadata_upload_scheduled` was updated
    1724               1 :             assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
    1725                 :         }
    1726                 : 
    1727                 :         // Schedule upload of index. Check that it is queued
    1728               1 :         let metadata = dummy_metadata(Lsn(0x20));
    1729               1 :         client
    1730               1 :             .schedule_index_upload_for_metadata_update(&metadata)
    1731               1 :             .unwrap();
    1732               1 :         {
    1733               1 :             let mut guard = client.upload_queue.lock().unwrap();
    1734               1 :             let upload_queue = guard.initialized_mut().unwrap();
    1735               1 :             assert!(upload_queue.queued_operations.len() == 1);
    1736               1 :             assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
    1737                 :         }
    1738                 : 
    1739                 :         // Wait for the uploads to finish
    1740               1 :         client.wait_completion().await.unwrap();
    1741               1 :         {
    1742               1 :             let mut guard = client.upload_queue.lock().unwrap();
    1743               1 :             let upload_queue = guard.initialized_mut().unwrap();
    1744               1 : 
    1745               1 :             assert!(upload_queue.queued_operations.is_empty());
    1746               1 :             assert!(upload_queue.inprogress_tasks.is_empty());
    1747                 :         }
    1748                 : 
    1749                 :         // Download back the index_part.json, and check that the list of files is correct
    1750               3 :         let index_part = match client.download_index_file().await.unwrap() {
    1751               1 :             MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
    1752 UBC           0 :             MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
    1753                 :         };
    1754                 : 
    1755 CBC           1 :         assert_file_list(
    1756               1 :             &index_part
    1757               1 :                 .layer_metadata
    1758               1 :                 .keys()
    1759               3 :                 .map(|f| f.to_owned())
    1760               1 :                 .collect(),
    1761               1 :             &[
    1762               1 :                 &initial_layer.file_name(),
    1763               1 :                 &layer_file_name_1.file_name(),
    1764               1 :                 &layer_file_name_2.file_name(),
    1765               1 :             ],
    1766               1 :         );
    1767               1 :         assert_eq!(index_part.metadata, metadata);
    1768                 : 
    1769                 :         // Schedule upload and then a deletion. Check that the deletion is queued
    1770               1 :         client
    1771               1 :             .schedule_layer_file_upload(
    1772               1 :                 &layer_file_name_3,
    1773               1 :                 &LayerFileMetadata::new(content_3.len() as u64, generation),
    1774               1 :             )
    1775               1 :             .unwrap();
    1776               1 :         client
    1777               1 :             .schedule_layer_file_deletion([layer_file_name_1.clone()].to_vec())
    1778               1 :             .unwrap();
    1779               1 :         {
    1780               1 :             let mut guard = client.upload_queue.lock().unwrap();
    1781               1 :             let upload_queue = guard.initialized_mut().unwrap();
    1782               1 : 
    1783               1 :             // Deletion schedules upload of the index file, and the file deletion itself
    1784               1 :             assert!(upload_queue.queued_operations.len() == 2);
    1785               1 :             assert!(upload_queue.inprogress_tasks.len() == 1);
    1786               1 :             assert!(upload_queue.num_inprogress_layer_uploads == 1);
    1787               1 :             assert!(upload_queue.num_inprogress_deletions == 0);
    1788               1 :             assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
    1789                 :         }
    1790               1 :         assert_remote_files(
    1791               1 :             &[
    1792               1 :                 &initial_layer.file_name(),
    1793               1 :                 &layer_file_name_1.file_name(),
    1794               1 :                 &layer_file_name_2.file_name(),
    1795               1 :                 "index_part.json",
    1796               1 :             ],
    1797               1 :             &remote_timeline_dir,
    1798               1 :             generation,
    1799               1 :         );
    1800               1 : 
    1801               1 :         // Finish them
    1802               1 :         client.wait_completion().await.unwrap();
    1803               1 :         harness.deletion_queue.pump().await;
    1804                 : 
    1805               1 :         assert_remote_files(
    1806               1 :             &[
    1807               1 :                 &initial_layer.file_name(),
    1808               1 :                 &layer_file_name_2.file_name(),
    1809               1 :                 &layer_file_name_3.file_name(),
    1810               1 :                 "index_part.json",
    1811               1 :             ],
    1812               1 :             &remote_timeline_dir,
    1813               1 :             generation,
    1814               1 :         );
    1815                 :     }
    1816                 : 
    1817               1 :     #[tokio::test]
    1818               1 :     async fn bytes_unfinished_gauge_for_layer_file_uploads() {
    1819                 :         // Setup
    1820                 : 
    1821                 :         let TestSetup {
    1822               1 :             harness,
    1823               1 :             tenant: _tenant,
    1824               1 :             timeline,
    1825                 :             ..
    1826               3 :         } = TestSetup::new("metrics").await.unwrap();
    1827               1 :         let client = timeline.remote_client.as_ref().unwrap();
    1828               1 :         let timeline_path = harness.timeline_path(&TIMELINE_ID);
    1829               1 : 
    1830               1 :         let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
    1831               1 :         let content_1 = dummy_contents("foo");
    1832               1 :         std::fs::write(
    1833               1 :             timeline_path.join(layer_file_name_1.file_name()),
    1834               1 :             &content_1,
    1835               1 :         )
    1836               1 :         .unwrap();
    1837               1 : 
    1838               2 :         #[derive(Debug, PartialEq, Clone, Copy)]
    1839               1 :         struct BytesStartedFinished {
    1840               1 :             started: Option<usize>,
    1841               1 :             finished: Option<usize>,
    1842               1 :         }
    1843               1 :         impl std::ops::Add for BytesStartedFinished {
    1844               1 :             type Output = Self;
    1845               2 :             fn add(self, rhs: Self) -> Self::Output {
    1846               2 :                 Self {
    1847               2 :                     started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
    1848               2 :                     finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
    1849               2 :                 }
    1850               2 :             }
    1851               1 :         }
    1852               3 :         let get_bytes_started_stopped = || {
    1853               3 :             let started = client
    1854               3 :                 .metrics
    1855               3 :                 .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
    1856               3 :                 .map(|v| v.try_into().unwrap());
    1857               3 :             let stopped = client
    1858               3 :                 .metrics
    1859               3 :                 .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
    1860               3 :                 .map(|v| v.try_into().unwrap());
    1861               3 :             BytesStartedFinished {
    1862               3 :                 started,
    1863               3 :                 finished: stopped,
    1864               3 :             }
    1865               3 :         };
    1866                 : 
    1867                 :         // Test
    1868               1 :         tracing::info!("now doing actual test");
    1869                 : 
    1870               1 :         let actual_a = get_bytes_started_stopped();
    1871               1 : 
    1872               1 :         client
    1873               1 :             .schedule_layer_file_upload(
    1874               1 :                 &layer_file_name_1,
    1875               1 :                 &LayerFileMetadata::new(content_1.len() as u64, harness.generation),
    1876               1 :             )
    1877               1 :             .unwrap();
    1878               1 : 
    1879               1 :         let actual_b = get_bytes_started_stopped();
    1880               1 : 
    1881               1 :         client.wait_completion().await.unwrap();
    1882               1 : 
    1883               1 :         let actual_c = get_bytes_started_stopped();
    1884               1 : 
    1885               1 :         // Validate
    1886               1 : 
    1887               1 :         let expected_b = actual_a
    1888               1 :             + BytesStartedFinished {
    1889               1 :                 started: Some(content_1.len()),
    1890               1 :                 // assert that the _finished metric is created eagerly so that subtractions work on first sample
    1891               1 :                 finished: Some(0),
    1892               1 :             };
    1893               1 :         assert_eq!(actual_b, expected_b);
    1894                 : 
    1895               1 :         let expected_c = actual_a
    1896               1 :             + BytesStartedFinished {
    1897               1 :                 started: Some(content_1.len()),
    1898               1 :                 finished: Some(content_1.len()),
    1899               1 :             };
    1900               1 :         assert_eq!(actual_c, expected_c);
    1901                 :     }
    1902                 : 
    1903               6 :     async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
    1904               6 :         // An empty IndexPart, just sufficient to ensure deserialization will succeed
    1905               6 :         let example_metadata = TimelineMetadata::example();
    1906               6 :         let example_index_part = IndexPart::new(
    1907               6 :             HashMap::new(),
    1908               6 :             example_metadata.disk_consistent_lsn(),
    1909               6 :             example_metadata,
    1910               6 :         );
    1911               6 : 
    1912               6 :         let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
    1913               6 : 
    1914               6 :         let timeline_path = test_state.harness.timeline_path(&TIMELINE_ID);
    1915               6 :         let remote_timeline_dir = test_state.harness.remote_fs_dir.join(
    1916               6 :             timeline_path
    1917               6 :                 .strip_prefix(&test_state.harness.conf.workdir)
    1918               6 :                 .unwrap(),
    1919               6 :         );
    1920               6 : 
    1921               6 :         std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work");
    1922               6 : 
    1923               6 :         let index_path = test_state.harness.remote_fs_dir.join(
    1924               6 :             remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
    1925               6 :         );
    1926               6 :         eprintln!("Writing {index_path}");
    1927               6 :         std::fs::write(&index_path, index_part_bytes).unwrap();
    1928               6 :         example_index_part
    1929               6 :     }
    1930                 : 
    1931                 :     /// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
    1932                 :     /// index, the IndexPart returned is equal to `expected`
    1933               5 :     async fn assert_got_index_part(
    1934               5 :         test_state: &TestSetup,
    1935               5 :         get_generation: Generation,
    1936               5 :         expected: &IndexPart,
    1937               5 :     ) {
    1938               5 :         let client = test_state.build_client(get_generation);
    1939                 : 
    1940               5 :         let download_r = client
    1941               5 :             .download_index_file()
    1942              18 :             .await
    1943               5 :             .expect("download should always succeed");
    1944               5 :         assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
    1945               5 :         match download_r {
    1946               5 :             MaybeDeletedIndexPart::IndexPart(index_part) => {
    1947               5 :                 assert_eq!(&index_part, expected);
    1948                 :             }
    1949 UBC           0 :             MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
    1950                 :         }
    1951 CBC           5 :     }
    1952                 : 
    1953               1 :     #[tokio::test]
    1954               1 :     async fn index_part_download_simple() -> anyhow::Result<()> {
    1955               3 :         let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
    1956               1 :         let span = test_state.span();
    1957               1 :         let _guard = span.enter();
    1958               1 : 
    1959               1 :         // Simple case: we are in generation N, load the index from generation N - 1
    1960               1 :         let generation_n = 5;
    1961               1 :         let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
    1962                 : 
    1963               3 :         assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
    1964                 : 
    1965               1 :         Ok(())
    1966                 :     }
    1967                 : 
    1968               1 :     #[tokio::test]
    1969               1 :     async fn index_part_download_ordering() -> anyhow::Result<()> {
    1970               1 :         let test_state = TestSetup::new("index_part_download_ordering")
    1971               3 :             .await
    1972               1 :             .unwrap();
    1973               1 : 
    1974               1 :         let span = test_state.span();
    1975               1 :         let _guard = span.enter();
    1976               1 : 
    1977               1 :         // A generation-less IndexPart exists in the bucket, we should find it
    1978               1 :         let generation_n = 5;
    1979               1 :         let injected_none = inject_index_part(&test_state, Generation::none()).await;
    1980               4 :         assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
    1981                 : 
    1982                 :         // If a more recent-than-none generation exists, we should prefer to load that
    1983               1 :         let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
    1984               4 :         assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
    1985                 : 
    1986                 :         // If a more-recent-than-me generation exists, we should ignore it.
    1987               1 :         let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
    1988               4 :         assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
    1989                 : 
    1990                 :         // If a directly previous generation exists, _and_ an index exists in my own
    1991                 :         // generation, I should prefer my own generation.
    1992               1 :         let _injected_prev =
    1993               1 :             inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
    1994               1 :         let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
    1995               1 :         assert_got_index_part(
    1996               1 :             &test_state,
    1997               1 :             Generation::new(generation_n),
    1998               1 :             &injected_current,
    1999               1 :         )
    2000               3 :         .await;
    2001                 : 
    2002               1 :         Ok(())
    2003                 :     }
    2004                 : }
        

Generated by: LCOV version 2.1-beta