LCOV - code coverage report
Current view: top level - pageserver/src/tenant - remote_timeline_client.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 97.1 % 1050 1020
Test Date: 2023-09-06 10:18:01 Functions: 82.1 % 117 96

            Line data    Source code
       1              : //! This module manages synchronizing local FS with remote storage.
       2              : //!
       3              : //! # Overview
       4              : //!
       5              : //! * [`RemoteTimelineClient`] provides functions related to upload/download of a particular timeline.
       6              : //!   It contains a queue of pending uploads, and manages the queue, performing uploads in parallel
       7              : //!   when it's safe to do so.
       8              : //!
       9              : //! * A stand-alone function, [`list_remote_timelines`], lists the timelines of a tenant.
      10              : //!
      11              : //! These functions use the low-level remote storage client, [`remote_storage::RemoteStorage`].
      12              : //!
      13              : //! # APIs & How To Use Them
      14              : //!
      15              : //! There is a [RemoteTimelineClient] for each [Timeline][`crate::tenant::Timeline`] in the system,
      16              : //! unless the pageserver is configured without remote storage.
      17              : //!
      18              : //! We allocate the client instance in [Timeline][`crate::tenant::Timeline`], i.e.,
      19              : //! either in [`crate::tenant::mgr`] during startup or when creating a new
      20              : //! timeline.
      21              : //! However, the client does not become ready for use until we've initialized its upload queue:
      22              : //!
      23              : //! - For timelines that already have some state on the remote storage, we use
      24              : //!   [`RemoteTimelineClient::init_upload_queue`].
      25              : //! - For newly created timelines, we use
      26              : //!   [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
      27              : //!
      28              : //! The former takes the remote's [`IndexPart`] as an argument, possibly retrieved
      29              : //! using [`list_remote_timelines`]. We'll elaborate on [`IndexPart`] in the next section.
      30              : //!
      31              : //! Whenever we've created/updated/deleted a file in a timeline directory, we schedule
      32              : //! the corresponding remote operation with the timeline's [`RemoteTimelineClient`]:
      33              : //!
      34              : //! - [`RemoteTimelineClient::schedule_layer_file_upload`] when we've created a new layer file.
      35              : //! - [`RemoteTimelineClient::schedule_index_upload_for_metadata_update`] when we've updated the timeline metadata file.
      36              : //! - [`RemoteTimelineClient::schedule_index_upload_for_file_changes`] to upload an updated index file, after we've scheduled file uploads.
      37              : //! - [`RemoteTimelineClient::schedule_layer_file_deletion`] when we've deleted one or more layer files.
      38              : //!
      39              : //! Internally, these functions create [`UploadOp`]s and put them in a queue.
      40              : //!
      41              : //! There are also APIs for downloading files.
      42              : //! These are not part of the aforementioned queuing and will not be discussed
      43              : //! further here, except in the section covering tenant attach.
      44              : //!
      45              : //! # Remote Storage Structure & [`IndexPart`] Index File
      46              : //!
      47              : //! The "directory structure" in the remote storage mirrors the local directory structure, with paths
      48              : //! like `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
      49              : //! Yet instead of keeping the `metadata` file remotely, we wrap it with more
      50              : //! data in an "index file" aka [`IndexPart`], containing the list of **all** remote
      51              : //! files for a given timeline.
      52              : //! If a file is not referenced from [`IndexPart`], it's not part of the remote storage state.
      53              : //!
      54              : //! Having the `IndexPart` also avoids expensive and slow `S3 list` commands.
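
The remote layout and the "index is the source of truth" rule described above can be sketched as follows. This is a minimal illustration, not the module's implementation: `remote_layer_path` and `is_in_remote_state` are hypothetical helpers, and the index is modeled as a plain set of layer file names rather than the real `IndexPart`.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper: builds a remote path of the documented form
/// `tenants/<tenant_id>/timelines/<timeline_id>/<layer filename>`.
pub fn remote_layer_path(tenant_id: &str, timeline_id: &str, layer_file_name: &str) -> String {
    format!(
        "tenants/{}/timelines/{}/{}",
        tenant_id, timeline_id, layer_file_name
    )
}

/// Hypothetical helper: a file belongs to the remote state only if the index
/// references it. The index is modeled here as a set of layer file names,
/// which is an assumption made for this sketch.
pub fn is_in_remote_state(index: &BTreeSet<String>, layer_file_name: &str) -> bool {
    index.contains(layer_file_name)
}
```

Because membership is decided by the index alone, a lookup like this never needs to list the objects under the timeline prefix.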
      55              : //!
      56              : //! # Consistency
      57              : //!
      58              : //! To have a consistent remote structure, it's important that uploads and
      59              : //! deletions are performed in the right order. For example, the index file
      60              : //! contains a list of layer files, so it must not be uploaded until all the
      61              : //! layer files that are in its list have been successfully uploaded.
      62              : //!
      63              : //! The contract between the client and its user is that the user is responsible for
      64              : //! scheduling operations in an order that keeps the remote consistent as
      65              : //! described above.
      66              : //! From the user's perspective, the operations are executed sequentially.
      67              : //! Internally, the client knows which operations can be performed in parallel,
      68              : //! and which operations act like a "barrier" that requires preceding operations
      69              : //! to finish. The calling code just needs to call the schedule-functions in the
      70              : //! correct order, and the client will parallelize the operations in a way that
      71              : //! is safe.
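
The barrier behaviour described above can be sketched with a small queue. This is a simplified model under stated assumptions, not the actual `UploadQueue`: the `Op` and `Queue` types are hypothetical, and the sketch treats both index uploads and deletions as barriers, while layer uploads may overlap with each other.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified stand-in for the module's `UploadOp`.
#[derive(Debug, Clone, PartialEq)]
pub enum Op {
    UploadLayer(String),
    UploadIndex,
    Delete(String),
}

impl Op {
    /// Assumption of this sketch: everything except a layer upload is a barrier.
    fn is_barrier(&self) -> bool {
        !matches!(self, Op::UploadLayer(_))
    }
}

/// Operations are queued in scheduling order and started only when safe.
#[derive(Default)]
pub struct Queue {
    queued: VecDeque<Op>,
    in_progress: Vec<Op>,
}

impl Queue {
    /// Appends an operation; the caller is responsible for the order.
    pub fn schedule(&mut self, op: Op) {
        self.queued.push_back(op);
    }

    /// Starts as many operations from the front of the queue as is safe
    /// and returns them in the order they were started.
    pub fn launch_ready(&mut self) -> Vec<Op> {
        let mut started = Vec::new();
        while let Some(next) = self.queued.front() {
            let can_start = if next.is_barrier() {
                // A barrier waits for everything that is still running.
                self.in_progress.is_empty()
            } else {
                // A layer upload may overlap with other layer uploads only.
                !self.in_progress.iter().any(Op::is_barrier)
            };
            if !can_start {
                break;
            }
            let op = self.queued.pop_front().expect("front() returned Some");
            self.in_progress.push(op.clone());
            started.push(op);
        }
        started
    }

    /// Marks a running operation as finished.
    pub fn complete(&mut self, op: &Op) {
        if let Some(pos) = self.in_progress.iter().position(|o| o == op) {
            self.in_progress.swap_remove(pos);
        }
    }
}
```

Scheduling two layer uploads followed by an index upload starts both layer uploads at once; the index upload starts only after both have been marked complete.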
      72              : //!
      73              : //! The caller should be careful with deletion, though. They should not delete
      74              : //! local files that have been scheduled for upload but not yet finished uploading.
      75              : //! Otherwise the upload will fail. To wait for an upload to finish, use
      76              : //! the `wait_completion` function (more on that later).
      77              : //!
      78              : //! All of this relies on the following invariants:
      79              : //!
      80              : //! - We rely on read-after-write consistency in the remote storage.
      81              : //! - Layer files are immutable.
      82              : //!
      83              : //! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
      84              : //! storage. Different tenants can be attached to different pageservers, but if the
      85              : //! same tenant is attached to two pageservers at the same time, they will overwrite
      86              : //! each other's index file updates, and confusion will ensue. There's no interlock or
      87              : //! mechanism to detect that in the pageserver; we rely on the control plane to ensure
      88              : //! that it doesn't happen.
      89              : //!
      90              : //! ## Implementation Note
      91              : //!
      92              : //! The *actual* remote state lags behind the *desired* remote state while
      93              : //! there are in-flight operations.
      94              : //! We keep track of the desired remote state in
      95              : //! [`UploadQueueInitialized::latest_files`] and [`UploadQueueInitialized::latest_metadata`].
      96              : //! It is initialized based on the [`IndexPart`] that was passed during init
      97              : //! and updated with every `schedule_*` function call.
      98              : //! All this is necessary to compute the future [`IndexPart`]s
      99              : //! when scheduling an operation while other operations that also affect the
     100              : //! remote [`IndexPart`] are in flight.
     101              : //!
     102              : //! # Retries & Error Handling
     103              : //!
     104              : //! The client retries operations indefinitely, using exponential back-off.
     105              : //! There is no way to force a retry, i.e., interrupt the back-off.
     106              : //! This could be built easily.
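
A retry loop with exponential back-off, as described above, might look like the following sketch. It does not use the `utils::backoff` helpers imported by this module; `backoff_delay` and `retry_forever` are hypothetical names, the doubling schedule is an assumption, and the sleep function is injected so that the example stays synchronous.

```rust
use std::time::Duration;

/// Hypothetical helper: delay to wait after `failures` consecutive failures.
/// Doubles from `base` and is capped at `max`; zero failures means no delay.
pub fn backoff_delay(failures: u32, base: Duration, max: Duration) -> Duration {
    if failures == 0 {
        return Duration::ZERO;
    }
    // base * 2^(failures - 1), saturating instead of overflowing.
    let factor = 2u32.checked_pow(failures - 1).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}

/// Hypothetical helper: retries `op` until it succeeds, calling `sleep`
/// with the back-off delay after every failure. It never gives up.
pub fn retry_forever<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
    base: Duration,
    max: Duration,
) -> T {
    let mut failures = 0u32;
    loop {
        match op() {
            Ok(value) => return value,
            Err(_) => {
                failures = failures.saturating_add(1);
                sleep(backoff_delay(failures, base, max));
            }
        }
    }
}
```

Passing `std::thread::sleep` as the `sleep` argument gives a blocking variant; an async version would await a timer instead.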
     107              : //!
     108              : //! # Cancellation
     109              : //!
     110              : //! The operations execute as plain [`task_mgr`] tasks, scoped to
     111              : //! the client's tenant and timeline.
     112              : //! Dropping the client will drop queued operations but not executing operations.
     113              : //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
     114              : //! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
     115              : //!
     116              : //! # Completion
     117              : //!
     118              : //! Once an operation has completed, we update
     119              : //! [`UploadQueueInitialized::last_uploaded_consistent_lsn`] which indicates
     120              : //! to safekeepers that they can delete the WAL up to that LSN.
     121              : //!
     122              : //! The [`RemoteTimelineClient::wait_completion`] method can be used to wait
     123              : //! for all pending operations to complete. It does not prevent more
     124              : //! operations from getting scheduled.
     125              : //!
     126              : //! # Crash Consistency
     127              : //!
     128              : //! We do not persist the upload queue state.
     129              : //! If we drop the client, or crash, all unfinished operations are lost.
     130              : //!
     131              : //! To recover, the following steps need to be taken:
     132              : //! - Retrieve the current remote [`IndexPart`]. This gives us a
     133              : //!   consistent remote state, assuming the user scheduled the operations in
     134              : //!   the correct order.
     135              : //! - Initialize the upload queue with that [`IndexPart`].
     136              : //! - Reschedule all lost operations by comparing the local filesystem state
     137              : //!   and remote state as per [`IndexPart`]. This is done in
     138              : //!   [`Tenant::timeline_init_and_sync`].
     139              : //!
     140              : //! Note that if we crash during file deletion between the index update
     141              : //! that removes the file from the list of files, and deleting the remote file,
     142              : //! the file is leaked in the remote storage. Similarly, if a new file is created
     143              : //! and uploaded, but the pageserver dies permanently before updating the
     144              : //! remote index file, the new file is leaked in remote storage. We accept and
     145              : //! tolerate that for now.
     146              : //! Note further that we cannot easily fix this by scheduling deletes for every
     147              : //! file that is present only on the remote, because we cannot distinguish the
     148              : //! following two cases:
     149              : //! - (1) We had the file locally, deleted it locally, scheduled a remote delete,
     150              : //!   but crashed before it finished remotely.
     151              : //! - (2) We never had the file locally because we haven't on-demand downloaded
     152              : //!   it yet.
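
The recovery comparison described above can be sketched as a set difference. This is an illustration only, not what `Tenant::timeline_init_and_sync` does internally: `reconcile` and `Reconciliation` are hypothetical names, and both the local directory and the remote index are modeled as sets of layer file names.

```rust
use std::collections::BTreeSet;

/// Hypothetical result type for this sketch.
#[derive(Debug, PartialEq)]
pub struct Reconciliation {
    /// Present locally but not referenced by the remote index:
    /// uploads for these were lost and need to be rescheduled.
    pub to_upload: Vec<String>,
    /// Referenced by the remote index but absent locally. These are left
    /// alone, because a lost delete and a not-yet-downloaded layer look
    /// the same from here.
    pub remote_only: Vec<String>,
}

/// Compares local layer files with the files listed in the remote index.
pub fn reconcile(local: &BTreeSet<String>, remote_index: &BTreeSet<String>) -> Reconciliation {
    Reconciliation {
        to_upload: local.difference(remote_index).cloned().collect(),
        remote_only: remote_index.difference(local).cloned().collect(),
    }
}
```

Only `to_upload` leads to new scheduled operations in this sketch; `remote_only` is reported but deliberately not turned into deletions.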
     153              : //!
     154              : //! # Downloads
     155              : //!
     156              : //! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
     157              : //! downloading files from the remote storage. Downloads are performed immediately
     158              : //! against the `RemoteStorage`, independently of the upload queue.
     159              : //!
     160              : //! When we attach a tenant, we perform the following steps:
     161              : //! - create `Tenant` object in `TenantState::Attaching` state
     162              : //! - List timelines that are present in remote storage, and for each:
     163              : //!   - download their remote [`IndexPart`]s
     164              : //!   - create `Timeline` struct and a `RemoteTimelineClient`
     165              : //!   - initialize the client's upload queue with its `IndexPart`
     166              : //!   - create [`RemoteLayer`](super::storage_layer::RemoteLayer) instances
     167              : //!     for layers that are referenced by `IndexPart` but not present locally
     168              : //!   - schedule uploads for layers that are only present locally.
     169              : //!   - if the remote `IndexPart`'s metadata was newer than the metadata in
     170              : //!     the local filesystem, write the remote metadata to the local filesystem
     171              : //! - After the above is done for each timeline, open the tenant for business by
     172              : //!   transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
     173              : //!   This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
     174              : //!
     175              : //! We keep track of the fact that a tenant is in `Attaching` state in a marker
     176              : //! file on the local disk. This is critical because, when we restart the pageserver,
     177              : //! we do not want to do the `List timelines` step for each tenant that has already
     178              : //! been successfully attached (for performance & cost reasons).
     179              : //! Instead, for a tenant without the attach marker file, we assume that the
     180              : //! local state is in sync with or ahead of the remote state. This includes the list
     181              : //! of all of the tenant's timelines, which is particularly critical to be up-to-date:
     182              : //! if there's a timeline on the remote that the pageserver doesn't know about,
     183              : //! the GC will not consider its branch point, leading to data loss.
     184              : //! So, for a tenant with the attach marker file, we know that we have not yet
     185              : //! persisted all the remote timelines' metadata files locally. To exclude the
     186              : //! risk above, we re-run the procedure for such tenants.
     187              : //!
     188              : //! # Operating Without Remote Storage
     189              : //!
     190              : //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
     191              : //! not created and the uploads are skipped.
     192              : //! Theoretically, it should be ok to remove and re-add remote storage configuration to
     193              : //! the pageserver config at any time, since it doesn't make a difference to
     194              : //! [`Timeline::load_layer_map`].
     195              : //! Of course, the remote timeline dir must not change while we have de-configured
     196              : //! remote storage, i.e., the pageserver must remain the owner of the given prefix
     197              : //! in remote storage.
     198              : //! But note that we don't test any of this right now.
     199              : //!
     200              : //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
     201              : //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
     202              : 
     203              : mod delete;
     204              : mod download;
     205              : pub mod index;
     206              : mod upload;
     207              : 
     208              : use anyhow::Context;
     209              : use chrono::{NaiveDateTime, Utc};
     210              : // re-export these
     211              : pub use download::{is_temp_download_file, list_remote_timelines};
     212              : use scopeguard::ScopeGuard;
     213              : use tokio_util::sync::CancellationToken;
     214              : use utils::backoff::{
     215              :     self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
     216              : };
     217              : 
     218              : use std::collections::{HashMap, VecDeque};
     219              : use std::path::{Path, PathBuf};
     220              : use std::sync::atomic::{AtomicU32, Ordering};
     221              : use std::sync::{Arc, Mutex};
     222              : 
     223              : use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
     224              : use std::ops::DerefMut;
     225              : use tracing::{debug, error, info, instrument, warn};
     226              : use tracing::{info_span, Instrument};
     227              : use utils::lsn::Lsn;
     228              : 
     229              : use crate::metrics::{
     230              :     MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
     231              :     RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
     232              :     REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
     233              : };
     234              : use crate::task_mgr::shutdown_token;
     235              : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
     236              : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
     237              : use crate::tenant::upload_queue::Delete;
     238              : use crate::tenant::TIMELINES_SEGMENT_NAME;
     239              : use crate::{
     240              :     config::PageServerConf,
     241              :     task_mgr,
     242              :     task_mgr::TaskKind,
     243              :     task_mgr::BACKGROUND_RUNTIME,
     244              :     tenant::metadata::TimelineMetadata,
     245              :     tenant::upload_queue::{
     246              :         UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
     247              :     },
     248              : };
     249              : 
     250              : use utils::id::{TenantId, TimelineId};
     251              : 
     252              : use self::index::IndexPart;
     253              : 
     254              : use super::storage_layer::LayerFileName;
     255              : use super::upload_queue::SetDeletedFlagProgress;
     256              : use super::Generation;
     257              : 
     258              : // Occasional network issues and such can cause remote operations to fail, and
     259              : // that's expected. If a download fails, we log it at info-level, and retry.
     260              : // But after FAILED_DOWNLOAD_WARN_THRESHOLD retries, we start to log it at WARN
     261              : // level instead, as repeated failures can mean a more serious problem. If it
     262              : // fails more than FAILED_REMOTE_OP_RETRIES times, we give up.
     263              : pub(crate) const FAILED_DOWNLOAD_WARN_THRESHOLD: u32 = 3;
     264              : pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
     265              : 
     266              : // Similarly log failed uploads and deletions at WARN level, after this many
     267              : // retries. Uploads and deletions are retried forever, though.
     268              : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
     269              : 
     270              : pub enum MaybeDeletedIndexPart {
     271              :     IndexPart(IndexPart),
     272              :     Deleted(IndexPart),
     273              : }
     274              : 
     275              : /// Errors that can arise when calling [`RemoteTimelineClient::stop`].
     276            0 : #[derive(Debug, thiserror::Error)]
     277              : pub enum StopError {
     278              :     /// Returned if the upload queue was never initialized.
     279              :     /// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
     280              :     #[error("queue is not initialized")]
     281              :     QueueUninitialized,
     282              : }
     283              : 
     284            0 : #[derive(Debug, thiserror::Error)]
     285              : pub enum PersistIndexPartWithDeletedFlagError {
     286              :     #[error("another task is already setting the deleted_flag, started at {0:?}")]
     287              :     AlreadyInProgress(NaiveDateTime),
     288              :     #[error("the deleted_flag was already set, value is {0:?}")]
     289              :     AlreadyDeleted(NaiveDateTime),
     290              :     #[error(transparent)]
     291              :     Other(#[from] anyhow::Error),
     292              : }
     293              : 
     294              : /// A client for accessing a timeline's data in remote storage.
     295              : ///
     296              : /// This takes care of managing the number of connections, and balancing them
     297              : /// across tenants. This also handles retries of failed uploads.
     298              : ///
     299              : /// Upload and delete requests are ordered so that before a deletion is
     300              : /// performed, we wait for all preceding uploads to finish. This ensures
     301              : /// that if you perform a compaction operation that reshuffles data in layer
     302              : /// files, we don't have a transient state where the old files have already been
     303              : /// deleted, but new files have not yet been uploaded.
     304              : ///
     305              : /// Similarly, this enforces an order between index-file uploads, and layer
     306              : /// uploads.  Before an index-file upload is performed, all preceding layer
     307              : /// uploads must be finished.
     308              : ///
     309              : /// This also maintains a list of remote files, and automatically includes that
     310              : /// in the index part file, whenever timeline metadata is uploaded.
     311              : ///
     312              : /// Downloads are not queued, they are performed immediately.
     313              : pub struct RemoteTimelineClient {
     314              :     conf: &'static PageServerConf,
     315              : 
     316              :     runtime: tokio::runtime::Handle,
     317              : 
     318              :     tenant_id: TenantId,
     319              :     timeline_id: TimelineId,
     320              :     generation: Generation,
     321              : 
     322              :     upload_queue: Mutex<UploadQueue>,
     323              : 
     324              :     metrics: Arc<RemoteTimelineClientMetrics>,
     325              : 
     326              :     storage_impl: GenericRemoteStorage,
     327              : }
     328              : 
     329              : impl RemoteTimelineClient {
     330              :     ///
     331              :     /// Create a remote storage client for given timeline
     332              :     ///
     333              :     /// Note: the caller must initialize the upload queue before any uploads can be scheduled,
     334              :     /// by calling init_upload_queue.
     335              :     ///
     336          765 :     pub fn new(
     337          765 :         remote_storage: GenericRemoteStorage,
     338          765 :         conf: &'static PageServerConf,
     339          765 :         tenant_id: TenantId,
     340          765 :         timeline_id: TimelineId,
     341          765 :         generation: Generation,
     342          765 :     ) -> RemoteTimelineClient {
     343          765 :         RemoteTimelineClient {
     344          765 :             conf,
     345          765 :             runtime: if cfg!(test) {
     346              :                 // remote_timeline_client.rs tests rely on current-thread runtime
     347          143 :                 tokio::runtime::Handle::current()
     348              :             } else {
     349          622 :                 BACKGROUND_RUNTIME.handle().clone()
     350              :             },
     351          765 :             tenant_id,
     352          765 :             timeline_id,
     353          765 :             generation,
     354          765 :             storage_impl: remote_storage,
     355          765 :             upload_queue: Mutex::new(UploadQueue::Uninitialized),
     356          765 :             metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
     357          765 :         }
     358          765 :     }
     359              : 
     360              :     /// Initialize the upload queue for a remote storage that already received
     361              :     /// an index file upload, i.e., it's not empty.
     362              :     /// The given `index_part` must be the one on the remote.
     363          178 :     pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
     364          178 :         let mut upload_queue = self.upload_queue.lock().unwrap();
     365          178 :         upload_queue.initialize_with_current_remote_index_part(index_part)?;
     366          178 :         self.update_remote_physical_size_gauge(Some(index_part));
     367          178 :         info!(
     368          178 :             "initialized upload queue from remote index with {} layer files",
     369          178 :             index_part.layer_metadata.len()
     370          178 :         );
     371          178 :         Ok(())
     372          178 :     }
     373              : 
     374              :     /// Initialize the upload queue for the case where the remote storage is empty,
     375              :     /// i.e., it doesn't have an `IndexPart`.
     376          564 :     pub fn init_upload_queue_for_empty_remote(
     377          564 :         &self,
     378          564 :         local_metadata: &TimelineMetadata,
     379          564 :     ) -> anyhow::Result<()> {
     380          564 :         let mut upload_queue = self.upload_queue.lock().unwrap();
     381          564 :         upload_queue.initialize_empty_remote(local_metadata)?;
     382          564 :         self.update_remote_physical_size_gauge(None);
     383          564 :         info!("initialized upload queue as empty");
     384          564 :         Ok(())
     385          564 :     }
     386              : 
     387              :     /// Initialize the queue in stopped state. Used in startup path
     388              :     /// to continue deletion operation interrupted by pageserver crash or restart.
     389           21 :     pub fn init_upload_queue_stopped_to_continue_deletion(
     390           21 :         &self,
     391           21 :         index_part: &IndexPart,
     392           21 :     ) -> anyhow::Result<()> {
     393              :         // FIXME: consider newtype for DeletedIndexPart.
     394           21 :         let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
     395           21 :             "bug: it is the responsibility of the caller to provide the index part from MaybeDeletedIndexPart::Deleted"
     396           21 :         ))?;
     397              : 
     398              :         {
     399           21 :             let mut upload_queue = self.upload_queue.lock().unwrap();
     400           21 :             upload_queue.initialize_with_current_remote_index_part(index_part)?;
     401           21 :             self.update_remote_physical_size_gauge(Some(index_part));
     402           21 :         }
     403           21 :         // also locks upload queue, without dropping the guard above it will be a deadlock
     404           21 :         self.stop().expect("initialized line above");
     405           21 : 
     406           21 :         let mut upload_queue = self.upload_queue.lock().unwrap();
     407           21 : 
     408           21 :         upload_queue
     409           21 :             .stopped_mut()
     410           21 :             .expect("stopped above")
     411           21 :             .deleted_at = SetDeletedFlagProgress::Successful(deleted_at);
     412           21 : 
     413           21 :         Ok(())
     414           21 :     }
     415              : 
     416       116151 :     pub fn last_uploaded_consistent_lsn(&self) -> Option<Lsn> {
     417       116151 :         match &*self.upload_queue.lock().unwrap() {
     418            0 :             UploadQueue::Uninitialized => None,
     419       116020 :             UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn),
     420          131 :             UploadQueue::Stopped(q) => {
     421          131 :                 Some(q.upload_queue_for_deletion.last_uploaded_consistent_lsn)
     422              :             }
     423              :         }
     424       116151 :     }
     425              : 
     426         5039 :     fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
     427         5039 :         let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
     428         4475 :             current_remote_index_part
     429         4475 :                 .layer_metadata
     430         4475 :                 .values()
     431         4475 :                 // Sum the file sizes recorded in the index for all layers.
     432       337618 :                 .map(|ilmd| ilmd.file_size)
     433         4475 :                 .sum()
     434              :         } else {
     435          564 :             0
     436              :         };
     437         5039 :         self.metrics.remote_physical_size_gauge().set(size);
     438         5039 :     }
     439              : 
     440           38 :     pub fn get_remote_physical_size(&self) -> u64 {
     441           38 :         self.metrics.remote_physical_size_gauge().get()
     442           38 :     }
     443              : 
     444              :     //
     445              :     // Download operations.
     446              :     //
     447              :     // These don't use the per-timeline queue. They do use the global semaphore in
     448              :     // S3Bucket, to limit the total number of concurrent operations, though.
     449              :     //
     450              : 
     451              :     /// Download index file
     452          204 :     pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
     453          204 :         let _unfinished_gauge_guard = self.metrics.call_begin(
     454          204 :             &RemoteOpFileKind::Index,
     455          204 :             &RemoteOpKind::Download,
     456          204 :             crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
     457          204 :                 reason: "no need for a downloads gauge",
     458          204 :             },
     459          204 :         );
     460              : 
     461          204 :         let index_part = download::download_index_part(
     462          204 :             &self.storage_impl,
     463          204 :             &self.tenant_id,
     464          204 :             &self.timeline_id,
     465          204 :             self.generation,
     466          204 :         )
     467          204 :         .measure_remote_op(
     468          204 :             self.tenant_id,
     469          204 :             self.timeline_id,
     470          204 :             RemoteOpFileKind::Index,
     471          204 :             RemoteOpKind::Download,
     472          204 :             Arc::clone(&self.metrics),
     473          204 :         )
     474          667 :         .await?;
     475              : 
     476          201 :         if index_part.deleted_at.is_some() {
     477           21 :             Ok(MaybeDeletedIndexPart::Deleted(index_part))
     478              :         } else {
     479          180 :             Ok(MaybeDeletedIndexPart::IndexPart(index_part))
     480              :         }
     481          204 :     }
     482              : 
     483              :     /// Download a (layer) file from `path`, into local filesystem.
     484              :     ///
     485              :     /// 'layer_metadata' is the metadata from the remote index file.
     486              :     ///
     487              :     /// On success, returns the size of the downloaded file.
     488         1043 :     pub async fn download_layer_file(
     489         1043 :         &self,
     490         1043 :         layer_file_name: &LayerFileName,
     491         1043 :         layer_metadata: &LayerFileMetadata,
     492         1043 :     ) -> anyhow::Result<u64> {
     493         1008 :         let downloaded_size = {
     494         1043 :             let _unfinished_gauge_guard = self.metrics.call_begin(
     495         1043 :                 &RemoteOpFileKind::Layer,
     496         1043 :                 &RemoteOpKind::Download,
     497         1043 :                 crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
     498         1043 :                     reason: "no need for a downloads gauge",
     499         1043 :                 },
     500         1043 :             );
     501         1043 :             download::download_layer_file(
     502         1043 :                 self.conf,
     503         1043 :                 &self.storage_impl,
     504         1043 :                 self.tenant_id,
     505         1043 :                 self.timeline_id,
     506         1043 :                 layer_file_name,
     507         1043 :                 layer_metadata,
     508         1043 :             )
     509         1043 :             .measure_remote_op(
     510         1043 :                 self.tenant_id,
     511         1043 :                 self.timeline_id,
     512         1043 :                 RemoteOpFileKind::Layer,
     513         1043 :                 RemoteOpKind::Download,
     514         1043 :                 Arc::clone(&self.metrics),
     515         1043 :             )
     516       360525 :             .await?
     517              :         };
     518              : 
     519         1008 :         REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
     520         1008 :         REMOTE_ONDEMAND_DOWNLOADED_BYTES.inc_by(downloaded_size);
     521         1008 : 
     522         1008 :         Ok(downloaded_size)
     523         1040 :     }
     524              : 
     525              :     //
     526              :     // Upload operations.
     527              :     //
     528              : 
     529              :     ///
     530              :     /// Launch an index-file upload operation in the background, with
     531              :     /// updated metadata.
     532              :     ///
     533              :     /// The upload will be added to the queue immediately, but it
     534              :     /// won't be performed until all previously scheduled layer file
     535              :     /// upload operations have completed successfully.  This is to
     536              :     /// ensure that when the index file claims that layers X, Y and Z
     537              :     /// exist in remote storage, they really do. To wait for the upload
     538              :     /// to complete, use `wait_completion`.
     539              :     ///
     540              :     /// If there were any changes to the list of files, i.e. if any
     541              :     /// layer file uploads were scheduled, since the last index file
     542              :     /// upload, those will be included too.
     543         4109 :     pub fn schedule_index_upload_for_metadata_update(
     544         4109 :         self: &Arc<Self>,
     545         4109 :         metadata: &TimelineMetadata,
     546         4109 :     ) -> anyhow::Result<()> {
     547         4109 :         let mut guard = self.upload_queue.lock().unwrap();
     548         4109 :         let upload_queue = guard.initialized_mut()?;
     549              : 
     550              :         // As documented in the struct definition, it's ok for latest_metadata to be
     551              :         // ahead of what's _actually_ on the remote during index upload.
     552         4109 :         upload_queue.latest_metadata = metadata.clone();
     553         4109 : 
     554         4109 :         self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
     555         4109 : 
     556         4109 :         Ok(())
     557         4109 :     }
     558              : 
     559              :     ///
     560              :     /// Launch an index-file upload operation in the background, if necessary.
     561              :     ///
     562              :     /// Use this function to schedule the update of the index file after
     563              :     /// scheduling file uploads or deletions. If no file uploads or deletions
     564              :     /// have been scheduled since the last index file upload, this does
     565              :     /// nothing.
     566              :     ///
     567              :     /// Like schedule_index_upload_for_metadata_update(), this merely adds
     568              :     /// the upload to the upload queue and returns quickly.
     569          812 :     pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
     570          812 :         let mut guard = self.upload_queue.lock().unwrap();
     571          812 :         let upload_queue = guard.initialized_mut()?;
     572              : 
     573          812 :         if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
     574           11 :             self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
     575          801 :         }
     576              : 
     577          812 :         Ok(())
     578          812 :     }
     579              : 
     580              :     /// Launch an index-file upload operation in the background (internal function)
     581         4336 :     fn schedule_index_upload(
     582         4336 :         self: &Arc<Self>,
     583         4336 :         upload_queue: &mut UploadQueueInitialized,
     584         4336 :         metadata: TimelineMetadata,
     585         4336 :     ) {
     586         4336 :         info!(
     587         4336 :             "scheduling metadata upload with {} files ({} changed)",
     588         4336 :             upload_queue.latest_files.len(),
     589         4336 :             upload_queue.latest_files_changes_since_metadata_upload_scheduled,
     590         4336 :         );
     591              : 
     592         4336 :         let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
     593         4336 : 
     594         4336 :         let index_part = IndexPart::new(
     595         4336 :             upload_queue.latest_files.clone(),
     596         4336 :             disk_consistent_lsn,
     597         4336 :             metadata,
     598         4336 :         );
     599         4336 :         let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
     600         4336 :         self.calls_unfinished_metric_begin(&op);
     601         4336 :         upload_queue.queued_operations.push_back(op);
     602         4336 :         upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
     603         4336 : 
     604         4336 :         // Launch the task immediately, if possible
     605         4336 :         self.launch_queued_tasks(upload_queue);
     606         4336 :     }
     607              : 
     608              :     ///
     609              :     /// Launch an upload operation in the background.
     610              :     ///
     611        12223 :     pub fn schedule_layer_file_upload(
     612        12223 :         self: &Arc<Self>,
     613        12223 :         layer_file_name: &LayerFileName,
     614        12223 :         layer_metadata: &LayerFileMetadata,
     615        12223 :     ) -> anyhow::Result<()> {
     616        12223 :         let mut guard = self.upload_queue.lock().unwrap();
     617        12223 :         let upload_queue = guard.initialized_mut()?;
     618              : 
     619        12223 :         upload_queue
     620        12223 :             .latest_files
     621        12223 :             .insert(layer_file_name.clone(), layer_metadata.clone());
     622        12223 :         upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
     623        12223 : 
     624        12223 :         let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
     625        12223 :         self.calls_unfinished_metric_begin(&op);
     626        12223 :         upload_queue.queued_operations.push_back(op);
     627        12223 : 
     628        12223 :         info!("scheduled layer file upload {layer_file_name}");
     629              : 
     630              :         // Launch the task immediately, if possible
     631        12223 :         self.launch_queued_tasks(upload_queue);
     632        12223 :         Ok(())
     633        12223 :     }
     634              : 
     635              :     /// Launch a delete operation in the background.
     636              :     ///
     637              :     /// The operation does not modify local state but assumes the local files have already been
     638              :     /// deleted, and is used to mirror those changes to remote.
     639              :     ///
     640              :     /// Note: This schedules an index file upload before the deletions.  The
     641              :     /// deletion won't actually be performed, until any previously scheduled
     642              :     /// upload operations, and the index file upload, have completed
     643              :     /// successfully.
     644          387 :     pub fn schedule_layer_file_deletion(
     645          387 :         self: &Arc<Self>,
     646          387 :         names: &[LayerFileName],
     647          387 :     ) -> anyhow::Result<()> {
     648          387 :         let mut guard = self.upload_queue.lock().unwrap();
     649          387 :         let upload_queue = guard.initialized_mut()?;
     650              : 
     651              :         // Deleting layers doesn't affect the values stored in TimelineMetadata,
     652              :         // so we don't need to update it. Just serialize it.
     653          387 :         let metadata = upload_queue.latest_metadata.clone();
     654          387 : 
     655          387 :         // Update the remote index file, removing the to-be-deleted files from the index,
     656          387 :         // before deleting the actual files.
     657          387 :         //
     658          387 :         // Once we start removing files from upload_queue.latest_files, there's
     659          387 :         // no going back! Otherwise, some of the files would already be removed
     660          387 :         // from latest_files, but not yet scheduled for deletion. Use a closure
     661          387 :         // to syntactically forbid ? or bail! calls here.
     662          387 :         let no_bail_here = || {
     663          387 :             // Decorate our list of names with each name's generation, dropping
     664          387 :             // names that are unexpectedly missing from our metadata.
     665          387 :             let with_generations: Vec<_> = names
     666          387 :                 .iter()
     667         3548 :                 .filter_map(|name| {
     668         3548 :                     // Remove from latest_files, learning the file's remote generation in the process
     669         3548 :                     let meta = upload_queue.latest_files.remove(name);
     670              : 
     671         3548 :                     if let Some(meta) = meta {
     672         3547 :                         upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
     673         3547 :                         Some((name, meta.generation))
     674              :                     } else {
     675              :                         // This can only happen if we forgot to schedule the file upload
     676              :                         // before scheduling the delete. Log it because it is a rare/strange
     677              :                         // situation, and in case something is misbehaving, we'd like to know which
     678              :                         // layers experienced this.
     679            1 :                         info!(
     680            1 :                             "Deleting layer {name} not found in latest_files list, never uploaded?"
     681            1 :                         );
     682            1 :                         None
     683              :                     }
     684         3548 :                 })
     685          387 :                 .collect();
     686          387 : 
     687          387 :             if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
     688          216 :                 self.schedule_index_upload(upload_queue, metadata);
     689          216 :             }
     690              : 
     691              :             // schedule the actual deletions
     692         3934 :             for (name, generation) in with_generations {
     693         3547 :                 let op = UploadOp::Delete(Delete {
     694         3547 :                     file_kind: RemoteOpFileKind::Layer,
     695         3547 :                     layer_file_name: name.clone(),
     696         3547 :                     scheduled_from_timeline_delete: false,
     697         3547 :                     generation,
     698         3547 :                 });
     699         3547 :                 self.calls_unfinished_metric_begin(&op);
     700         3547 :                 upload_queue.queued_operations.push_back(op);
     701         3547 :                 info!("scheduled layer file deletion {name}");
     702              :             }
     703              : 
     704              :             // Launch the tasks immediately, if possible
     705          387 :             self.launch_queued_tasks(upload_queue);
     706          387 :         };
     707          387 :         no_bail_here();
     708          387 :         Ok(())
     709          387 :     }
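The closure trick used in `schedule_layer_file_deletion` can be tried in isolation. The following is a minimal sketch using only the standard library. `QueueModel`, its fields, and `schedule_deletion` are hypothetical stand-ins and not the pageserver's types. The point it illustrates: because the closure returns a plain value rather than a `Result`, a stray `?` or early error return inside it would not compile, so the index update and the deletion scheduling cannot be separated by an early exit.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the parts of the upload queue touched by a deletion.
pub struct QueueModel {
    /// Layer name -> generation (stand-in for `latest_files`).
    pub latest_files: HashMap<String, u32>,
    /// Stand-in for `latest_files_changes_since_metadata_upload_scheduled`.
    pub changes_since_index_upload: usize,
    /// Deletions queued so far, as (layer name, generation).
    pub queued_deletions: Vec<(String, u32)>,
}

/// Removes `names` from the index and queues a deletion for each name found.
/// Returns how many names were missing from `latest_files`.
pub fn schedule_deletion(queue: &mut QueueModel, names: &[&str]) -> usize {
    // The closure returns `usize`, not `Result`, so `?` cannot be used inside it.
    let mut no_bail_here = || {
        let mut missing = 0;
        let found: Vec<(String, u32)> = names
            .iter()
            .filter_map(|name| match queue.latest_files.remove(*name) {
                Some(generation) => {
                    queue.changes_since_index_upload += 1;
                    Some((name.to_string(), generation))
                }
                None => {
                    // Never uploaded (or already removed): skip it, but count it.
                    missing += 1;
                    None
                }
            })
            .collect();
        // Past this point the index and the deletion list agree again.
        queue.queued_deletions.extend(found);
        missing
    };
    no_bail_here()
}
```

Calling `schedule_deletion` with one known and one unknown name removes the known entry, bumps the change counter once, and reports one missing name.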
     710              : 
     711              :     ///
     712              :     /// Wait for all previously scheduled uploads/deletions to complete
     713              :     ///
     714         1027 :     pub async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
     715         1027 :         let mut receiver = {
     716         1027 :             let mut guard = self.upload_queue.lock().unwrap();
     717         1027 :             let upload_queue = guard.initialized_mut()?;
     718         1027 :             self.schedule_barrier(upload_queue)
     719         1027 :         };
     720         1027 : 
     721         1027 :         if receiver.changed().await.is_err() {
     722            1 :             anyhow::bail!("wait_completion aborted because upload queue was stopped");
     723         1025 :         }
     724         1025 :         Ok(())
     725         1026 :     }
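The error branch of `wait_completion` relies on the channel reporting that its sender went away without signalling. A rough analogue, swapping in `std::sync::mpsc` for `tokio::sync::watch` so that it runs without an async runtime, might look like this. `wait_for_barrier` is a hypothetical name, and the blocking `recv` is only a stand-in for `changed().await`.

```rust
use std::sync::mpsc;

/// Blocks until the barrier is signalled.
/// Returns an error if the sender was dropped without signalling, which in
/// this model corresponds to the queue being stopped before the barrier ran.
pub fn wait_for_barrier(receiver: mpsc::Receiver<()>) -> Result<(), String> {
    receiver
        .recv()
        .map_err(|_| "wait aborted because the queue was stopped".to_string())
}
```

A sender that signals before being dropped yields `Ok(())`. A sender that is dropped without sending yields the error, mirroring the `bail!` path above.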
     726              : 
     727         1230 :     fn schedule_barrier(
     728         1230 :         self: &Arc<Self>,
     729         1230 :         upload_queue: &mut UploadQueueInitialized,
     730         1230 :     ) -> tokio::sync::watch::Receiver<()> {
     731         1230 :         let (sender, receiver) = tokio::sync::watch::channel(());
     732         1230 :         let barrier_op = UploadOp::Barrier(sender);
     733         1230 : 
     734         1230 :         upload_queue.queued_operations.push_back(barrier_op);
     735         1230 :         // Don't count this kind of operation!
     736         1230 : 
     737         1230 :         // Launch the task immediately, if possible
     738         1230 :         self.launch_queued_tasks(upload_queue);
     739         1230 : 
     740         1230 :         receiver
     741         1230 :     }
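The ordering rules that the scheduling functions rely on (layer uploads may always start, index uploads and barriers wait for an empty in-progress set, deletions wait for everything except other deletions) can be modelled without any I/O. This is a simplified sketch and not the real queue: `Op`, `Queue`, `launch`, and `complete` are hypothetical names, and treating a barrier as finished the moment it is reached is an assumption made to keep the model small.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Op {
    UploadLayer,
    UploadIndex,
    Delete,
    Barrier,
}

#[derive(Default)]
pub struct Queue {
    pub queued: VecDeque<Op>,
    pub in_progress: Vec<Op>,
}

impl Queue {
    /// Starts operations from the front of the queue until one is blocked.
    /// Returns the operations started by this call.
    pub fn launch(&mut self) -> Vec<Op> {
        let mut started = Vec::new();
        while let Some(&next) = self.queued.front() {
            let can_run_now = match next {
                // Layer uploads never have to wait.
                Op::UploadLayer => true,
                // Index uploads and barriers need everything before them finished.
                Op::UploadIndex | Op::Barrier => self.in_progress.is_empty(),
                // Deletions may overlap with other deletions only.
                Op::Delete => self.in_progress.iter().all(|op| *op == Op::Delete),
            };
            if !can_run_now {
                // Strict FIFO: nothing behind a blocked operation may jump ahead.
                break;
            }
            self.queued.pop_front();
            if next != Op::Barrier {
                // Assumption: a barrier completes immediately and is not tracked.
                self.in_progress.push(next);
            }
            started.push(next);
        }
        started
    }

    /// Marks one in-progress operation of the given kind as finished.
    pub fn complete(&mut self, op: Op) {
        if let Some(pos) = self.in_progress.iter().position(|o| *o == op) {
            self.in_progress.swap_remove(pos);
        }
    }
}
```

With two layer uploads, an index upload, a deletion, and a barrier queued in that order, the first `launch` starts only the two layer uploads. The index upload starts once both have completed, the deletion once the index upload has, and the barrier once the deletion has.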
     742              : 
     743              :     /// Set the deleted_at field in the remote index file.
     744              :     ///
     745              :     /// This fails if the upload queue has not been `stop()`ed.
     746              :     ///
     747              :     /// The caller is responsible for calling `stop()` AND for waiting
     748              :     /// for any ongoing upload tasks to finish after `stop()` has succeeded.
     749              :     /// Check method [`RemoteTimelineClient::stop`] for details.
     750          802 :     #[instrument(skip_all)]
     751              :     pub(crate) async fn persist_index_part_with_deleted_flag(
     752              :         self: &Arc<Self>,
     753              :     ) -> Result<(), PersistIndexPartWithDeletedFlagError> {
     754              :         let index_part_with_deleted_at = {
     755              :             let mut locked = self.upload_queue.lock().unwrap();
     756              : 
     757              :             // We must be in stopped state because otherwise
     758              :             // an in-progress index part upload could overwrite the file
     759              :             // without the deleted_at flag that we are going to set below.
     760              :             let stopped = locked.stopped_mut()?;
     761              : 
     762              :             match stopped.deleted_at {
     763              :                 SetDeletedFlagProgress::NotRunning => (), // proceed
     764              :                 SetDeletedFlagProgress::InProgress(at) => {
     765              :                     return Err(PersistIndexPartWithDeletedFlagError::AlreadyInProgress(at));
     766              :                 }
     767              :                 SetDeletedFlagProgress::Successful(at) => {
     768              :                     return Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(at));
     769              :                 }
     770              :             };
     771              :             let deleted_at = Utc::now().naive_utc();
     772              :             stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
     773              : 
     774              :             let mut index_part = IndexPart::try_from(&stopped.upload_queue_for_deletion)
     775              :                 .context("IndexPart serialize")?;
     776              :             index_part.deleted_at = Some(deleted_at);
     777              :             index_part
     778              :         };
     779              : 
     780            0 :         let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
     781            0 :             let mut locked = self_clone.upload_queue.lock().unwrap();
     782            0 :             let stopped = locked
     783            0 :                 .stopped_mut()
     784            0 :                 .expect("there's no way out of Stopping, and we checked it's Stopping above");
     785            0 :             stopped.deleted_at = SetDeletedFlagProgress::NotRunning;
     786            0 :         });
     787              : 
     788          184 :         pausable_failpoint!("persist_deleted_index_part");
     789              : 
     790              :         backoff::retry(
     791          245 :             || {
     792          245 :                 upload::upload_index_part(
     793          245 :                     &self.storage_impl,
     794          245 :                     &self.tenant_id,
     795          245 :                     &self.timeline_id,
     796          245 :                     self.generation,
     797          245 :                     &index_part_with_deleted_at,
     798          245 :                 )
     799          245 :             },
     800           61 :             |_e| false,
     801              :             1,
     802              :             // have just a couple of attempts
     803              :             // when executed as part of timeline deletion this happens in context of api call
     804              :             // when executed as part of tenant deletion this happens in the background
     805              :             2,
     806              :             "persist_index_part_with_deleted_flag",
     807              :             // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
     808            0 :             backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
     809              :         )
     810              :         .await?;
     811              : 
     812              :         // all good, disarm the guard and mark as success
     813              :         ScopeGuard::into_inner(undo_deleted_at);
     814              :         {
     815              :             let mut locked = self.upload_queue.lock().unwrap();
     816              : 
     817              :             let stopped = locked
     818              :                 .stopped_mut()
     819              :                 .expect("there's no way out of Stopping, and we checked it's Stopping above");
     820              :             stopped.deleted_at = SetDeletedFlagProgress::Successful(
     821              :                 index_part_with_deleted_at
     822              :                     .deleted_at
     823              :                     .expect("we set it above"),
     824              :             );
     825              :         }
     826              : 
     827              :         Ok(())
     828              :     }
     829              : 
     830              :     /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfully set.
     831              :     /// The function deletes layer files one by one, then lists the prefix to see if we leaked something,
     832              :     /// deletes leaked files if any, and proceeds with deletion of the index file at the end.
     833          203 :     pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
     834          203 :         debug_assert_current_span_has_tenant_and_timeline_id();
     835              : 
     836          203 :         let (mut receiver, deletions_queued) = {
     837          203 :             let mut deletions_queued = 0;
     838          203 : 
     839          203 :             let mut locked = self.upload_queue.lock().unwrap();
     840          203 :             let stopped = locked.stopped_mut()?;
     841              : 
     842          203 :             if !matches!(stopped.deleted_at, SetDeletedFlagProgress::Successful(_)) {
     843            0 :                 anyhow::bail!("deleted_at is not set")
     844          203 :             }
     845              : 
     846          203 :             debug_assert!(stopped.upload_queue_for_deletion.no_pending_work());
     847              : 
     848          203 :             stopped
     849          203 :                 .upload_queue_for_deletion
     850          203 :                 .queued_operations
     851          203 :                 .reserve(stopped.upload_queue_for_deletion.latest_files.len());
     852              : 
     853              :             // schedule the actual deletions
     854         5573 :             for (name, meta) in &stopped.upload_queue_for_deletion.latest_files {
     855         5370 :                 let op = UploadOp::Delete(Delete {
     856         5370 :                     file_kind: RemoteOpFileKind::Layer,
     857         5370 :                     layer_file_name: name.clone(),
     858         5370 :                     scheduled_from_timeline_delete: true,
     859         5370 :                     generation: meta.generation,
     860         5370 :                 });
     861         5370 : 
     862         5370 :                 self.calls_unfinished_metric_begin(&op);
     863         5370 :                 stopped
     864         5370 :                     .upload_queue_for_deletion
     865         5370 :                     .queued_operations
     866         5370 :                     .push_back(op);
     867              : 
     868         5370 :                 info!("scheduled layer file deletion {name}");
     869         5370 :                 deletions_queued += 1;
     870              :             }
     871              : 
     872          203 :             self.launch_queued_tasks(&mut stopped.upload_queue_for_deletion);
     873          203 : 
     874          203 :             (
     875          203 :                 self.schedule_barrier(&mut stopped.upload_queue_for_deletion),
     876          203 :                 deletions_queued,
     877          203 :             )
     878          203 :         };
     879          203 : 
     880          203 :         receiver.changed().await.context("upload queue shut down")?;
     881              : 
     882              :         // Do not delete index part yet, it is needed for a possible retry. If we removed it first
     883              :         // and the retry arrived at a different pageserver, there would be no trace of it on remote storage.
     884          203 :         let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id);
     885              : 
     886          203 :         let remaining = backoff::retry(
     887          267 :             || async {
     888          267 :                 self.storage_impl
     889          267 :                     .list_files(Some(&timeline_storage_path))
     890         1172 :                     .await
     891          267 :             },
     892          203 :             |_e| false,
     893          203 :             FAILED_DOWNLOAD_WARN_THRESHOLD,
     894          203 :             FAILED_REMOTE_OP_RETRIES,
     895          203 :             "list_prefixes",
     896          203 :             backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")),
     897          203 :         )
     898         1172 :         .await
     899          203 :         .context("list prefixes")?;
     900              : 
     901          203 :         let remaining: Vec<RemotePath> = remaining
     902          203 :             .into_iter()
     903         1086 :             .filter(|p| p.object_name() != Some(IndexPart::FILE_NAME))
     904          203 :             .inspect(|path| {
     905          891 :                 if let Some(name) = path.object_name() {
     906          891 :                     info!(%name, "deleting a file not referenced from index_part.json");
     907              :                 } else {
     908            0 :                     warn!(%path, "deleting a nameless or non-utf8 object not referenced from index_part.json");
     909              :                 }
     910          891 :             })
     911          203 :             .collect();
     912          203 : 
     913          203 :         if !remaining.is_empty() {
     914           46 :             backoff::retry(
     915          638 :                 || async { self.storage_impl.delete_objects(&remaining).await },
     916           46 :                 |_e| false,
     917           46 :                 FAILED_UPLOAD_WARN_THRESHOLD,
     918           46 :                 FAILED_REMOTE_OP_RETRIES,
     919           46 :                 "delete_objects",
     920           46 :                 backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")),
     921           46 :             )
     922          638 :             .await
     923           46 :             .context("delete_objects")?;
     924          157 :         }
     925              : 
     926          203 :         fail::fail_point!("timeline-delete-before-index-delete", |_| {
     927           12 :             Err(anyhow::anyhow!(
     928           12 :                 "failpoint: timeline-delete-before-index-delete"
     929           12 :             ))?
     930          203 :         });
     931              : 
     932          191 :         let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));
     933              : 
     934            0 :         debug!("deleting index part");
     935              : 
     936          191 :         backoff::retry(
     937          729 :             || async { self.storage_impl.delete(&index_file_path).await },
     938          191 :             |_e| false,
     939          191 :             FAILED_UPLOAD_WARN_THRESHOLD,
     940          191 :             FAILED_REMOTE_OP_RETRIES,
     941          191 :             "delete_index",
     942          191 :             backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled")),
     943          191 :         )
     944          729 :         .await
     945          191 :         .context("delete_index")?;
     946              : 
     947          191 :         fail::fail_point!("timeline-delete-after-index-delete", |_| {
     948            4 :             Err(anyhow::anyhow!(
     949            4 :                 "failpoint: timeline-delete-after-index-delete"
     950            4 :             ))?
     951          191 :         });
     952              : 
     953          187 :         info!(prefix=%timeline_storage_path, referenced=deletions_queued, not_referenced=%remaining.len(), "done deleting in timeline prefix, including index_part.json");
     954              : 
     955          187 :         Ok(())
     956          203 :     }
     957              : 
     958              :     ///
     959              :     /// Pick next tasks from the queue, and start as many of them as possible without violating
     960              :     /// the ordering constraints.
     961              :     ///
     962              :     /// The caller needs to already hold the `upload_queue` lock.
     963        41305 :     fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
     964        67122 :         while let Some(next_op) = upload_queue.queued_operations.front() {
     965              :             // Can we run this task now?
     966        46320 :             let can_run_now = match next_op {
     967              :                 UploadOp::UploadLayer(_, _) => {
     968              :                     // Can always be scheduled.
     969        12215 :                     true
     970              :                 }
     971              :                 UploadOp::UploadMetadata(_, _) => {
     972              :                     // These can only be performed after all the preceding operations
     973              :                     // have finished.
     974        18365 :                     upload_queue.inprogress_tasks.is_empty()
     975              :                 }
     976              :                 UploadOp::Delete(_) => {
     977              :                     // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
     978         8283 :                     upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
     979              :                 }
     980              : 
     981         7457 :                 UploadOp::Barrier(_) => upload_queue.inprogress_tasks.is_empty(),
     982              :             };
     983              : 
     984              :             // If we cannot launch this task, don't look any further.
     985              :             //
     986              :             // In some cases, we could let some non-frontmost tasks "jump the queue" and launch
     987              :             // them now, but we don't try to do that currently.  For example, if the frontmost task
     988              :             // is an index-file upload that cannot proceed until preceding uploads have finished, we
     989              :             // could still start layer uploads that were scheduled later.
     990        46320 :             if !can_run_now {
     991        20503 :                 break;
     992        25817 :             }
     993        25817 : 
     994        25817 :             // We can launch this task. Remove it from the queue first.
     995        25817 :             let next_op = upload_queue.queued_operations.pop_front().unwrap();
     996        25817 : 
     997        25817 :             debug!("starting op: {}", next_op);
     998              : 
     999              :             // Update the counters
    1000        25817 :             match next_op {
    1001        12215 :                 UploadOp::UploadLayer(_, _) => {
    1002        12215 :                     upload_queue.num_inprogress_layer_uploads += 1;
    1003        12215 :                 }
    1004         4280 :                 UploadOp::UploadMetadata(_, _) => {
    1005         4280 :                     upload_queue.num_inprogress_metadata_uploads += 1;
    1006         4280 :                 }
    1007         8094 :                 UploadOp::Delete(_) => {
    1008         8094 :                     upload_queue.num_inprogress_deletions += 1;
    1009         8094 :                 }
    1010         1228 :                 UploadOp::Barrier(sender) => {
    1011         1228 :                     sender.send_replace(());
    1012         1228 :                     continue;
    1013              :                 }
    1014              :             };
    1015              : 
    1016              :             // Assign unique ID to this task
    1017        24589 :             upload_queue.task_counter += 1;
    1018        24589 :             let upload_task_id = upload_queue.task_counter;
    1019        24589 : 
    1020        24589 :             // Add it to the in-progress map
    1021        24589 :             let task = Arc::new(UploadTask {
    1022        24589 :                 task_id: upload_task_id,
    1023        24589 :                 op: next_op,
    1024        24589 :                 retries: AtomicU32::new(0),
    1025        24589 :             });
    1026        24589 :             upload_queue
    1027        24589 :                 .inprogress_tasks
    1028        24589 :                 .insert(task.task_id, Arc::clone(&task));
    1029        24589 : 
    1030        24589 :             // Spawn a task to perform the operation
    1031        24589 :             let self_rc = Arc::clone(self);
    1032        24589 :             let tenant_id = self.tenant_id;
    1033        24589 :             let timeline_id = self.timeline_id;
    1034        24589 :             task_mgr::spawn(
    1035        24589 :                 &self.runtime,
    1036        24589 :                 TaskKind::RemoteUploadTask,
    1037        24589 :                 Some(self.tenant_id),
    1038        24589 :                 Some(self.timeline_id),
    1039        24589 :                 "remote upload",
    1040              :                 false,
    1041        24588 :                 async move {
    1042      1386546 :                     self_rc.perform_upload_task(task).await;
    1043        24576 :                     Ok(())
    1044        24576 :                 }
    1045        24589 :                 .instrument(info_span!(parent: None, "remote_upload", %tenant_id, %timeline_id, %upload_task_id)),
    1046              :             );
    1047              : 
    1048              :             // Loop back to process next task
    1049              :         }
    1050        41305 :     }
    1051              : 
    1052              :     ///
    1053              :     /// Perform an upload task.
    1054              :     ///
    1055              :     /// The task is in the `inprogress_tasks` list. This function will try to
    1056              :     /// execute it, retrying forever. On successful completion, the task is
    1057              :     /// removed from the `inprogress_tasks` list, and any next task(s) in the
    1058              :     /// queue that were waiting for its completion are launched.
    1059              :     ///
    1060              :     /// The task can be shut down, however. That leads to stopping the whole
    1061              :     /// queue.
    1062              :     ///
    1063        24588 :     async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
    1064              :         // Loop to retry until it completes.
    1065        29222 :         loop {
    1066        29222 :             // If we're requested to shut down, close up shop and exit.
    1067        29222 :             //
    1068        29222 :             // Note: We only check for the shutdown requests between retries, so
    1069        29222 :             // if a shutdown request arrives while we're busy uploading, in the
    1070        29222 :             // upload::upload_*() call below, we will not exit until it has
    1071        29222 :             // finished. We probably could cancel the upload by simply dropping
    1072        29222 :             // the Future, but we're not 100% sure if the remote storage library
    1073        29222 :             // is cancellation safe, so we don't dare to do that. Hopefully, the
    1074        29222 :             // upload finishes or times out soon enough.
    1075        29222 :             if task_mgr::is_shutdown_requested() {
    1076          109 :                 info!("upload task cancelled by shutdown request");
    1077          109 :                 match self.stop() {
    1078          109 :                     Ok(()) => {}
    1079              :                     Err(StopError::QueueUninitialized) => {
    1080            0 :                         unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
    1081              :                     }
    1082              :                 }
    1083          109 :                 return;
    1084        29113 :             }
    1085              : 
    1086        29113 :             let upload_result: anyhow::Result<()> = match &task.op {
    1087        14125 :                 UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
    1088        14125 :                     let path = self
    1089        14125 :                         .conf
    1090        14125 :                         .timeline_path(&self.tenant_id, &self.timeline_id)
    1091        14125 :                         .join(layer_file_name.file_name());
    1092        14125 : 
    1093        14125 :                     upload::upload_timeline_layer(
    1094        14125 :                         self.conf,
    1095        14125 :                         &self.storage_impl,
    1096        14125 :                         &path,
    1097        14125 :                         layer_metadata,
    1098        14125 :                         self.generation,
    1099        14125 :                     )
    1100        14125 :                     .measure_remote_op(
    1101        14125 :                         self.tenant_id,
    1102        14125 :                         self.timeline_id,
    1103        14125 :                         RemoteOpFileKind::Layer,
    1104        14125 :                         RemoteOpKind::Upload,
    1105        14125 :                         Arc::clone(&self.metrics),
    1106        14125 :                     )
    1107      1341928 :                     .await
    1108              :                 }
    1109         5001 :                 UploadOp::UploadMetadata(ref index_part, _lsn) => {
    1110         5001 :                     let mention_having_future_layers = if cfg!(feature = "testing") {
    1111         5001 :                         index_part
    1112         5001 :                             .layer_metadata
    1113         5001 :                             .keys()
    1114       339100 :                             .any(|x| x.is_in_future(*_lsn))
    1115              :                     } else {
    1116            0 :                         false
    1117              :                     };
    1118              : 
    1119         5001 :                     let res = upload::upload_index_part(
    1120         5001 :                         &self.storage_impl,
    1121         5001 :                         &self.tenant_id,
    1122         5001 :                         &self.timeline_id,
    1123         5001 :                         self.generation,
    1124         5001 :                         index_part,
    1125         5001 :                     )
    1126         5001 :                     .measure_remote_op(
    1127         5001 :                         self.tenant_id,
    1128         5001 :                         self.timeline_id,
    1129         5001 :                         RemoteOpFileKind::Index,
    1130         5001 :                         RemoteOpKind::Upload,
    1131         5001 :                         Arc::clone(&self.metrics),
    1132         5001 :                     )
    1133        13983 :                     .await;
    1134         4999 :                     if res.is_ok() {
    1135         4276 :                         self.update_remote_physical_size_gauge(Some(index_part));
    1136         4276 :                         if mention_having_future_layers {
    1137              :                             // find rationale near crate::tenant::timeline::init::cleanup_future_layer
    1138            5 :                             tracing::info!(disk_consistent_lsn=%_lsn, "uploaded an index_part.json with future layers -- this is ok! if shutdown now, expect future layer cleanup");
    1139         4271 :                         }
    1140          723 :                     }
    1141         4999 :                     res
    1142              :                 }
    1143         9987 :                 UploadOp::Delete(delete) => {
    1144         9987 :                     let path = &self
    1145         9987 :                         .conf
    1146         9987 :                         .timeline_path(&self.tenant_id, &self.timeline_id)
    1147         9987 :                         .join(delete.layer_file_name.file_name());
    1148         9987 :                     delete::delete_layer(self.conf, &self.storage_impl, path, delete.generation)
    1149         9987 :                         .measure_remote_op(
    1150         9987 :                             self.tenant_id,
    1151         9987 :                             self.timeline_id,
    1152         9987 :                             delete.file_kind,
    1153         9987 :                             RemoteOpKind::Delete,
    1154         9987 :                             Arc::clone(&self.metrics),
    1155         9987 :                         )
    1156        30629 :                         .await
    1157              :                 }
    1158              :                 UploadOp::Barrier(_) => {
    1159              :                     // unreachable. Barrier operations are handled synchronously in
    1160              :                     // launch_queued_tasks
    1161            0 :                     warn!("unexpected Barrier operation in perform_upload_task");
    1162            0 :                     break;
    1163              :                 }
    1164              :             };
    1165              : 
    1166        29102 :             match upload_result {
    1167              :                 Ok(()) => {
    1168        24467 :                     break;
    1169              :                 }
    1170         4635 :                 Err(e) => {
    1171         4635 :                     let retries = task.retries.fetch_add(1, Ordering::SeqCst);
    1172         4635 : 
    1173         4635 :                     // Uploads can fail due to rate limits (IAM, S3), spurious network problems,
    1174         4635 :                     // or other external reasons. Such issues are relatively regular, so log them
    1175         4635 :                     // at info level at first, and only WARN if the operation fails repeatedly.
    1176         4635 :                     //
    1177         4635 :                     // (See similar logic for downloads in `download::download_retry`)
    1178         4635 :                     if retries < FAILED_UPLOAD_WARN_THRESHOLD {
    1179         4633 :                         info!(
    1180         4633 :                             "failed to perform remote task {}, will retry (attempt {}): {:#}",
    1181         4633 :                             task.op, retries, e
    1182         4633 :                         );
    1183              :                     } else {
    1184            2 :                         warn!(
    1185            2 :                             "failed to perform remote task {}, will retry (attempt {}): {:?}",
    1186            2 :                             task.op, retries, e
    1187            2 :                         );
    1188              :                     }
    1189              : 
    1190              :                     // sleep until it's time to retry, or we're cancelled
    1191         4635 :                     exponential_backoff(
    1192         4635 :                         retries,
    1193         4635 :                         DEFAULT_BASE_BACKOFF_SECONDS,
    1194         4635 :                         DEFAULT_MAX_BACKOFF_SECONDS,
    1195         4635 :                         &shutdown_token(),
    1196         4635 :                     )
    1197            6 :                     .await;
    1198              :                 }
    1199              :             }
    1200              :         }
    1201              : 
    1202        24467 :         let retries = task.retries.load(Ordering::SeqCst);
    1203        24467 :         if retries > 0 {
    1204         4517 :             info!(
    1205         4517 :                 "remote task {} completed successfully after {} retries",
    1206         4517 :                 task.op, retries
    1207         4517 :             );
    1208              :         } else {
    1209            0 :             debug!("remote task {} completed successfully", task.op);
    1210              :         }
    1211              : 
    1212              :         // The task has completed successfully. Remove it from the in-progress list.
    1213              :         {
    1214        24467 :             let mut upload_queue_guard = self.upload_queue.lock().unwrap();
    1215        24467 :             let upload_queue = match upload_queue_guard.deref_mut() {
    1216            0 :                 UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
    1217         6911 :                 UploadQueue::Stopped(stopped) => {
    1218         6911 :                     // Special care is needed for deletions. If this was an earlier deletion (not scheduled from timeline deletion),
    1219         6911 :                     // then stop() took care of it, so we just return.
    1220         6911 :                     // For deletions that come from delete_all, we still want to maintain metrics, launch following tasks, etc.
    1221         6911 :                     match &task.op {
    1222         5440 :                         UploadOp::Delete(delete) if delete.scheduled_from_timeline_delete => Some(&mut stopped.upload_queue_for_deletion),
    1223         1541 :                         _ => None
    1224              :                     }
    1225              :                 },
    1226        17556 :                 UploadQueue::Initialized(qi) => { Some(qi) }
    1227              :             };
    1228              : 
    1229        24467 :             let upload_queue = match upload_queue {
    1230        22926 :                 Some(upload_queue) => upload_queue,
    1231              :                 None => {
    1232         1541 :                     info!("another concurrent task already stopped the queue");
    1233         1541 :                     return;
    1234              :                 }
    1235              :             };
    1236              : 
    1237        22926 :             upload_queue.inprogress_tasks.remove(&task.task_id);
    1238        22926 : 
    1239        22926 :             match task.op {
    1240        10742 :                 UploadOp::UploadLayer(_, _) => {
    1241        10742 :                     upload_queue.num_inprogress_layer_uploads -= 1;
    1242        10742 :                 }
    1243         4268 :                 UploadOp::UploadMetadata(_, lsn) => {
    1244         4268 :                     upload_queue.num_inprogress_metadata_uploads -= 1;
    1245         4268 :                     upload_queue.last_uploaded_consistent_lsn = lsn; // XXX monotonicity check?
    1246         4268 :                 }
    1247         7916 :                 UploadOp::Delete(_) => {
    1248         7916 :                     upload_queue.num_inprogress_deletions -= 1;
    1249         7916 :                 }
    1250            0 :                 UploadOp::Barrier(_) => unreachable!(),
    1251              :             };
    1252              : 
    1253              :             // Launch any queued tasks that were unblocked by this one.
    1254        22926 :             self.launch_queued_tasks(upload_queue);
    1255        22926 :         }
    1256        22926 :         self.calls_unfinished_metric_end(&task.op);
    1257        24576 :     }
    1258              : 
    1259        49278 :     fn calls_unfinished_metric_impl(
    1260        49278 :         &self,
    1261        49278 :         op: &UploadOp,
    1262        49278 :     ) -> Option<(
    1263        49278 :         RemoteOpFileKind,
    1264        49278 :         RemoteOpKind,
    1265        49278 :         RemoteTimelineClientMetricsCallTrackSize,
    1266        49278 :     )> {
    1267              :         use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
    1268        49278 :         let res = match op {
    1269        22973 :             UploadOp::UploadLayer(_, m) => (
    1270        22973 :                 RemoteOpFileKind::Layer,
    1271        22973 :                 RemoteOpKind::Upload,
    1272        22973 :                 RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
    1273        22973 :             ),
    1274         8651 :             UploadOp::UploadMetadata(_, _) => (
    1275         8651 :                 RemoteOpFileKind::Index,
    1276         8651 :                 RemoteOpKind::Upload,
    1277         8651 :                 DontTrackSize {
    1278         8651 :                     reason: "metadata uploads are tiny",
    1279         8651 :                 },
    1280         8651 :             ),
    1281        17653 :             UploadOp::Delete(delete) => (
    1282        17653 :                 delete.file_kind,
    1283        17653 :                 RemoteOpKind::Delete,
    1284        17653 :                 DontTrackSize {
    1285        17653 :                     reason: "should we track deletes? positive or negative sign?",
    1286        17653 :                 },
    1287        17653 :             ),
    1288              :             UploadOp::Barrier(_) => {
    1289              :                 // we do not account these
    1290            1 :                 return None;
    1291              :             }
    1292              :         };
    1293        49277 :         Some(res)
    1294        49278 :     }
    1295              : 
    1296        25476 :     fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
    1297        25476 :         let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
    1298        25476 :             Some(x) => x,
    1299            0 :             None => return,
    1300              :         };
    1301        25476 :         let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
    1302        25476 :         guard.will_decrement_manually(); // in calls_unfinished_metric_end()
    1303        25476 :     }
    1304              : 
    1305        23802 :     fn calls_unfinished_metric_end(&self, op: &UploadOp) {
    1306        23802 :         let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
    1307        23801 :             Some(x) => x,
    1308            1 :             None => return,
    1309              :         };
    1310        23801 :         self.metrics.call_end(&file_kind, &op_kind, track_bytes);
    1311        23802 :     }
    1312              : 
    1313              :     /// Close the upload queue for new operations and cancel queued operations.
    1314              :     /// In-progress operations will still be running after this function returns.
    1315              :     /// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
    1316              :     /// to wait for them to complete, after calling this function.
    1317          344 :     pub fn stop(&self) -> Result<(), StopError> {
    1318          344 :         // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
    1319          344 :         // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
    1320          344 :         // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
    1321          344 :         let mut guard = self.upload_queue.lock().unwrap();
    1322          344 :         match &mut *guard {
    1323            0 :             UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
    1324              :             UploadQueue::Stopped(_) => {
    1325              :                 // nothing to do
    1326          135 :                 info!("another concurrent task already shut down the queue");
    1327          135 :                 Ok(())
    1328              :             }
    1329          209 :             UploadQueue::Initialized(initialized) => {
    1330          209 :                 info!("shutting down upload queue");
    1331              : 
    1332              :                 // Replace the queue with the Stopped state, taking ownership of the old
    1333              :                 // Initialized queue. We will do some checks on it, and then drop it.
    1334          209 :                 let qi = {
    1335              :                     // Here we preserve working version of the upload queue for possible use during deletions.
    1336              :                     // In-place replace of Initialized to Stopped can be done with the help of https://github.com/Sgeo/take_mut
    1337              :                     // but for this use case it doesn't really make sense to bring in unsafe code just for this one call site.
    1338              :                     // Deletion is not really perf sensitive, so there shouldn't be any problems with cloning a fraction of it.
    1339          209 :                     let upload_queue_for_deletion = UploadQueueInitialized {
    1340          209 :                         task_counter: 0,
    1341          209 :                         latest_files: initialized.latest_files.clone(),
    1342          209 :                         latest_files_changes_since_metadata_upload_scheduled: 0,
    1343          209 :                         latest_metadata: initialized.latest_metadata.clone(),
    1344          209 :                         last_uploaded_consistent_lsn: initialized.last_uploaded_consistent_lsn,
    1345          209 :                         num_inprogress_layer_uploads: 0,
    1346          209 :                         num_inprogress_metadata_uploads: 0,
    1347          209 :                         num_inprogress_deletions: 0,
    1348          209 :                         inprogress_tasks: HashMap::default(),
    1349          209 :                         queued_operations: VecDeque::default(),
    1350          209 :                     };
    1351          209 : 
    1352          209 :                     let upload_queue = std::mem::replace(
    1353          209 :                         &mut *guard,
    1354          209 :                         UploadQueue::Stopped(UploadQueueStopped {
    1355          209 :                             upload_queue_for_deletion,
    1356          209 :                             deleted_at: SetDeletedFlagProgress::NotRunning,
    1357          209 :                         }),
    1358          209 :                     );
    1359          209 :                     if let UploadQueue::Initialized(qi) = upload_queue {
    1360          209 :                         qi
    1361              :                     } else {
    1362            0 :                         unreachable!("we checked in the match above that it is Initialized");
    1363              :                     }
    1364              :                 };
    1365              : 
    1366              :                 // consistency check
    1367          209 :                 assert_eq!(
    1368          209 :                     qi.num_inprogress_layer_uploads
    1369          209 :                         + qi.num_inprogress_metadata_uploads
    1370          209 :                         + qi.num_inprogress_deletions,
    1371          209 :                     qi.inprogress_tasks.len()
    1372          209 :                 );
    1373              : 
    1374              :                 // We don't need to do anything here for in-progress tasks. They will finish
    1375              :                 // on their own, decrement the unfinished-task counter themselves, and observe
    1376              :                 // that the queue is Stopped.
    1377          209 :                 drop(qi.inprogress_tasks);
    1378              : 
    1379              :                 // Tear down queued ops
    1380          876 :                 for op in qi.queued_operations.into_iter() {
    1381          876 :                     self.calls_unfinished_metric_end(&op);
    1382          876 :                     // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
    1383          876 :                     // which is exactly what we want to happen.
    1384          876 :                     drop(op);
    1385          876 :                 }
    1386              : 
    1387              :                 // We're done.
    1388          209 :                 drop(guard);
    1389          209 :                 Ok(())
    1390              :             }
    1391              :         }
    1392          344 :     }
    1393              : }
    1394              : 
    1395          245 : pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
    1396          245 :     let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
    1397          245 :     RemotePath::from_string(&path).expect("Failed to construct path")
    1398          245 : }
    1399              : 
    1400          203 : pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
    1401          203 :     remote_timelines_path(tenant_id).join(&PathBuf::from(timeline_id.to_string()))
    1402          203 : }
    1403              : 
    1404         1043 : pub fn remote_layer_path(
    1405         1043 :     tenant_id: &TenantId,
    1406         1043 :     timeline_id: &TimelineId,
    1407         1043 :     layer_file_name: &LayerFileName,
    1408         1043 :     layer_meta: &LayerFileMetadata,
    1409         1043 : ) -> RemotePath {
    1410         1043 :     // Generation-aware key format
    1411         1043 :     let path = format!(
    1412         1043 :         "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
    1413         1043 :         layer_file_name.file_name(),
    1414         1043 :         layer_meta.generation.get_suffix()
    1415         1043 :     );
    1416         1043 : 
    1417         1043 :     RemotePath::from_string(&path).expect("Failed to construct path")
    1418         1043 : }
    1419              : 
    1420         5443 : pub fn remote_index_path(
    1421         5443 :     tenant_id: &TenantId,
    1422         5443 :     timeline_id: &TimelineId,
    1423         5443 :     generation: Generation,
    1424         5443 : ) -> RemotePath {
    1425         5443 :     RemotePath::from_string(&format!(
    1426         5443 :         "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
    1427         5443 :         IndexPart::FILE_NAME,
    1428         5443 :         generation.get_suffix()
    1429         5443 :     ))
    1430         5443 :     .expect("Failed to construct path")
    1431         5443 : }
    1432              : 
    1433              : /// Files on the remote storage are stored with paths relative to the workdir.
    1434              : /// Such a path includes both the tenant and timeline IDs, which makes the remote storage path unique.
    1435              : ///
    1436              : /// Errors if the provided path does not start with the pageserver's workdir.
    1437        24108 : pub fn remote_path(
    1438        24108 :     conf: &PageServerConf,
    1439        24108 :     local_path: &Path,
    1440        24108 :     generation: Generation,
    1441        24108 : ) -> anyhow::Result<RemotePath> {
    1442        24108 :     let stripped = local_path
    1443        24108 :         .strip_prefix(&conf.workdir)
    1444        24108 :         .context("Failed to strip workdir prefix")?;
    1445              : 
    1446        24108 :     let suffixed = format!(
    1447        24108 :         "{0}{1}",
    1448        24108 :         stripped.to_string_lossy(),
    1449        24108 :         generation.get_suffix()
    1450        24108 :     );
    1451        24108 : 
    1452        24108 :     RemotePath::new(&PathBuf::from(suffixed)).with_context(|| {
    1453            0 :         format!(
    1454            0 :             "to resolve remote part of path {:?} for base {:?}",
    1455            0 :             local_path, conf.workdir
    1456            0 :         )
    1457        24108 :     })
    1458        24108 : }
    1459              : 
    1460              : #[cfg(test)]
    1461              : mod tests {
    1462              :     use super::*;
    1463              :     use crate::{
    1464              :         context::RequestContext,
    1465              :         tenant::{
    1466              :             harness::{TenantHarness, TIMELINE_ID},
    1467              :             Generation, Tenant, Timeline,
    1468              :         },
    1469              :         DEFAULT_PG_VERSION,
    1470              :     };
    1471              : 
    1472              :     use std::{collections::HashSet, path::Path};
    1473              :     use utils::lsn::Lsn;
    1474              : 
    1475            4 :     pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
    1476            4 :         format!("contents for {name}").into()
    1477            4 :     }
    1478              : 
    1479            1 :     pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
    1480            1 :         let metadata = TimelineMetadata::new(
    1481            1 :             disk_consistent_lsn,
    1482            1 :             None,
    1483            1 :             None,
    1484            1 :             Lsn(0),
    1485            1 :             Lsn(0),
    1486            1 :             Lsn(0),
    1487            1 :             // Any version will do
    1488            1 :             // but it should be consistent with the one in the tests
    1489            1 :             crate::DEFAULT_PG_VERSION,
    1490            1 :         );
    1491            1 : 
    1492            1 :         // go through serialize + deserialize to fix the header, including checksum
    1493            1 :         TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
    1494            1 :     }
    1495              : 
    1496            1 :     fn assert_file_list(a: &HashSet<LayerFileName>, b: &[&str]) {
    1497            3 :         let mut avec: Vec<String> = a.iter().map(|x| x.file_name()).collect();
    1498            1 :         avec.sort();
    1499            1 : 
    1500            1 :         let mut bvec = b.to_vec();
    1501            1 :         bvec.sort_unstable();
    1502            1 : 
    1503            1 :         assert_eq!(avec, bvec);
    1504            1 :     }
    1505              : 
    1506            2 :     fn assert_remote_files(expected: &[&str], remote_path: &Path, generation: Generation) {
    1507            2 :         let mut expected: Vec<String> = expected
    1508            2 :             .iter()
    1509            8 :             .map(|x| format!("{}{}", x, generation.get_suffix()))
    1510            2 :             .collect();
    1511            2 :         expected.sort();
    1512            2 : 
    1513            2 :         let mut found: Vec<String> = Vec::new();
    1514            8 :         for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
    1515            8 :             let entry_name = entry.file_name();
    1516            8 :             let fname = entry_name.to_str().unwrap();
    1517            8 :             found.push(String::from(fname));
    1518            8 :         }
    1519            2 :         found.sort();
    1520            2 : 
    1521            2 :         assert_eq!(found, expected);
    1522            2 :     }
    1523              : 
    1524              :     struct TestSetup {
    1525              :         harness: TenantHarness,
    1526              :         tenant: Arc<Tenant>,
    1527              :         timeline: Arc<Timeline>,
    1528              :         tenant_ctx: RequestContext,
    1529              :     }
    1530              : 
    1531              :     impl TestSetup {
    1532            2 :         async fn new(test_name: &str) -> anyhow::Result<Self> {
    1533            2 :             // Use a current-thread runtime in the test
    1534            2 :             let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
    1535            2 :             let harness = TenantHarness::create(test_name)?;
    1536            2 :             let (tenant, ctx) = harness.load().await;
    1537              : 
    1538            2 :             let timeline = tenant
    1539            2 :                 .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
    1540            4 :                 .await?;
    1541              : 
    1542            2 :             Ok(Self {
    1543            2 :                 harness,
    1544            2 :                 tenant,
    1545            2 :                 timeline,
    1546            2 :                 tenant_ctx: ctx,
    1547            2 :             })
    1548            2 :         }
    1549              :     }
    1550              : 
    1551              :     // Test scheduling
    1552            1 :     #[tokio::test]
    1553            1 :     async fn upload_scheduling() {
    1554              :         // Test outline:
    1555              :         //
    1556              :         // Schedule upload of a bunch of layers. Check that they are started immediately, not queued.
    1557              :         // Schedule upload of index. Check that it is queued.
    1558              :         // Let the layer file uploads finish. Check that the index upload is now started.
    1559              :         // Let the index upload finish.
    1560              :         //
    1561              :         // Download back the index_part.json. Check that the list of files is correct.
    1562              :         //
    1563              :         // Schedule upload. Schedule deletion. Check that the deletion is queued.
    1564              :         // Let the upload finish. Check that the deletion is now started.
    1565              :         // Schedule another deletion. Check that it's launched immediately.
    1566              :         // Schedule index upload. Check that it's queued.
    1567              : 
    1568              :         let TestSetup {
    1569            1 :             harness,
    1570            1 :             tenant: _tenant,
    1571            1 :             timeline,
    1572            1 :             tenant_ctx: _tenant_ctx,
    1573            3 :         } = TestSetup::new("upload_scheduling").await.unwrap();
    1574            1 : 
    1575            1 :         let client = timeline.remote_client.as_ref().unwrap();
    1576              : 
    1577              :         // Download back the index_part.json, and check that the list of files is correct
    1578            3 :         let initial_index_part = match client.download_index_file().await.unwrap() {
    1579            1 :             MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
    1580            0 :             MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
    1581              :         };
    1582            1 :         let initial_layers = initial_index_part
    1583            1 :             .layer_metadata
    1584            1 :             .keys()
    1585            1 :             .map(|f| f.to_owned())
    1586            1 :             .collect::<HashSet<LayerFileName>>();
    1587            1 :         let initial_layer = {
    1588            1 :             assert!(initial_layers.len() == 1);
    1589            1 :             initial_layers.into_iter().next().unwrap()
    1590            1 :         };
    1591            1 : 
    1592            1 :         let timeline_path = harness.timeline_path(&TIMELINE_ID);
    1593            1 : 
    1594            1 :         println!("workdir: {}", harness.conf.workdir.display());
    1595            1 : 
    1596            1 :         let remote_timeline_dir = harness
    1597            1 :             .remote_fs_dir
    1598            1 :             .join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
    1599            1 :         println!("remote_timeline_dir: {}", remote_timeline_dir.display());
    1600            1 : 
    1601            1 :         let generation = harness.generation;
    1602            1 : 
    1603            1 :         // Create a few dummy files and schedule uploads for them
    1604            1 :         let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
    1605            1 :         let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
    1606            1 :         let layer_file_name_3: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap();
    1607            1 :         let content_1 = dummy_contents("foo");
    1608            1 :         let content_2 = dummy_contents("bar");
    1609            1 :         let content_3 = dummy_contents("baz");
    1610              : 
    1611            3 :         for (filename, content) in [
    1612            1 :             (&layer_file_name_1, &content_1),
    1613            1 :             (&layer_file_name_2, &content_2),
    1614            1 :             (&layer_file_name_3, &content_3),
    1615            3 :         ] {
    1616            3 :             std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
    1617            3 :         }
    1618              : 
    1619            1 :         client
    1620            1 :             .schedule_layer_file_upload(
    1621            1 :                 &layer_file_name_1,
    1622            1 :                 &LayerFileMetadata::new(content_1.len() as u64, generation),
    1623            1 :             )
    1624            1 :             .unwrap();
    1625            1 :         client
    1626            1 :             .schedule_layer_file_upload(
    1627            1 :                 &layer_file_name_2,
    1628            1 :                 &LayerFileMetadata::new(content_2.len() as u64, generation),
    1629            1 :             )
    1630            1 :             .unwrap();
    1631            1 : 
    1632            1 :         // Check that they are started immediately, not queued
    1633            1 :         //
    1634            1 :         // This works because we are running within block_on, so any futures stay queued up until
    1635            1 :         // our next await point.
    1636            1 :         {
    1637            1 :             let mut guard = client.upload_queue.lock().unwrap();
    1638            1 :             let upload_queue = guard.initialized_mut().unwrap();
    1639            1 :             assert!(upload_queue.queued_operations.is_empty());
    1640            1 :             assert!(upload_queue.inprogress_tasks.len() == 2);
    1641            1 :             assert!(upload_queue.num_inprogress_layer_uploads == 2);
    1642              : 
    1643              :             // Also check that `latest_files_changes_since_metadata_upload_scheduled` was updated
    1644            1 :             assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
    1645              :         }
    1646              : 
    1647              :         // Schedule upload of index. Check that it is queued
    1648            1 :         let metadata = dummy_metadata(Lsn(0x20));
    1649            1 :         client
    1650            1 :             .schedule_index_upload_for_metadata_update(&metadata)
    1651            1 :             .unwrap();
    1652            1 :         {
    1653            1 :             let mut guard = client.upload_queue.lock().unwrap();
    1654            1 :             let upload_queue = guard.initialized_mut().unwrap();
    1655            1 :             assert!(upload_queue.queued_operations.len() == 1);
    1656            1 :             assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
    1657              :         }
    1658              : 
    1659              :         // Wait for the uploads to finish
    1660            1 :         client.wait_completion().await.unwrap();
    1661            1 :         {
    1662            1 :             let mut guard = client.upload_queue.lock().unwrap();
    1663            1 :             let upload_queue = guard.initialized_mut().unwrap();
    1664            1 : 
    1665            1 :             assert!(upload_queue.queued_operations.is_empty());
    1666            1 :             assert!(upload_queue.inprogress_tasks.is_empty());
    1667              :         }
    1668              : 
    1669              :         // Download back the index_part.json, and check that the list of files is correct
    1670            3 :         let index_part = match client.download_index_file().await.unwrap() {
    1671            1 :             MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
    1672            0 :             MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
    1673              :         };
    1674              : 
    1675            1 :         assert_file_list(
    1676            1 :             &index_part
    1677            1 :                 .layer_metadata
    1678            1 :                 .keys()
    1679            3 :                 .map(|f| f.to_owned())
    1680            1 :                 .collect(),
    1681            1 :             &[
    1682            1 :                 &initial_layer.file_name(),
    1683            1 :                 &layer_file_name_1.file_name(),
    1684            1 :                 &layer_file_name_2.file_name(),
    1685            1 :             ],
    1686            1 :         );
    1687            1 :         assert_eq!(index_part.metadata, metadata);
    1688              : 
    1689              :         // Schedule upload and then a deletion. Check that the deletion is queued
    1690            1 :         client
    1691            1 :             .schedule_layer_file_upload(
    1692            1 :                 &layer_file_name_3,
    1693            1 :                 &LayerFileMetadata::new(content_3.len() as u64, generation),
    1694            1 :             )
    1695            1 :             .unwrap();
    1696            1 :         client
    1697            1 :             .schedule_layer_file_deletion(&[layer_file_name_1.clone()])
    1698            1 :             .unwrap();
    1699            1 :         {
    1700            1 :             let mut guard = client.upload_queue.lock().unwrap();
    1701            1 :             let upload_queue = guard.initialized_mut().unwrap();
    1702            1 : 
    1703            1 :             // Deletion schedules upload of the index file, and the file deletion itself
    1704            1 :             assert!(upload_queue.queued_operations.len() == 2);
    1705            1 :             assert!(upload_queue.inprogress_tasks.len() == 1);
    1706            1 :             assert!(upload_queue.num_inprogress_layer_uploads == 1);
    1707            1 :             assert!(upload_queue.num_inprogress_deletions == 0);
    1708            1 :             assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
    1709              :         }
    1710            1 :         assert_remote_files(
    1711            1 :             &[
    1712            1 :                 &initial_layer.file_name(),
    1713            1 :                 &layer_file_name_1.file_name(),
    1714            1 :                 &layer_file_name_2.file_name(),
    1715            1 :                 "index_part.json",
    1716            1 :             ],
    1717            1 :             &remote_timeline_dir,
    1718            1 :             generation,
    1719            1 :         );
    1720            1 : 
    1721            1 :         // Finish them
    1722            1 :         client.wait_completion().await.unwrap();
    1723            1 : 
    1724            1 :         assert_remote_files(
    1725            1 :             &[
    1726            1 :                 &initial_layer.file_name(),
    1727            1 :                 &layer_file_name_2.file_name(),
    1728            1 :                 &layer_file_name_3.file_name(),
    1729            1 :                 "index_part.json",
    1730            1 :             ],
    1731            1 :             &remote_timeline_dir,
    1732            1 :             generation,
    1733            1 :         );
    1734              :     }
    1735              : 
    1736            1 :     #[tokio::test]
    1737            1 :     async fn bytes_unfinished_gauge_for_layer_file_uploads() {
    1738              :         // Setup
    1739              : 
    1740              :         let TestSetup {
    1741            1 :             harness,
    1742            1 :             tenant: _tenant,
    1743            1 :             timeline,
    1744              :             ..
    1745            3 :         } = TestSetup::new("metrics").await.unwrap();
    1746            1 :         let client = timeline.remote_client.as_ref().unwrap();
    1747            1 :         let timeline_path = harness.timeline_path(&TIMELINE_ID);
    1748            1 : 
    1749            1 :         let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
    1750            1 :         let content_1 = dummy_contents("foo");
    1751            1 :         std::fs::write(
    1752            1 :             timeline_path.join(layer_file_name_1.file_name()),
    1753            1 :             &content_1,
    1754            1 :         )
    1755            1 :         .unwrap();
    1756            1 : 
    1757            2 :         #[derive(Debug, PartialEq, Clone, Copy)]
    1758            1 :         struct BytesStartedFinished {
    1759            1 :             started: Option<usize>,
    1760            1 :             finished: Option<usize>,
    1761            1 :         }
    1762            1 :         impl std::ops::Add for BytesStartedFinished {
    1763            1 :             type Output = Self;
    1764            2 :             fn add(self, rhs: Self) -> Self::Output {
    1765            2 :                 Self {
    1766            2 :                     started: self.started.map(|v| v + rhs.started.unwrap_or(0)),
    1767            2 :                     finished: self.finished.map(|v| v + rhs.finished.unwrap_or(0)),
    1768            2 :                 }
    1769            2 :             }
    1770            1 :         }
    1771            3 :         let get_bytes_started_stopped = || {
    1772            3 :             let started = client
    1773            3 :                 .metrics
    1774            3 :                 .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
    1775            3 :                 .map(|v| v.try_into().unwrap());
    1776            3 :             let stopped = client
    1777            3 :                 .metrics
    1778            3 :                 .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
    1779            3 :                 .map(|v| v.try_into().unwrap());
    1780            3 :             BytesStartedFinished {
    1781            3 :                 started,
    1782            3 :                 finished: stopped,
    1783            3 :             }
    1784            3 :         };
    1785              : 
    1786              :         // Test
    1787            1 :         tracing::info!("now doing actual test");
    1788              : 
    1789            1 :         let actual_a = get_bytes_started_stopped();
    1790            1 : 
    1791            1 :         client
    1792            1 :             .schedule_layer_file_upload(
    1793            1 :                 &layer_file_name_1,
    1794            1 :                 &LayerFileMetadata::new(content_1.len() as u64, harness.generation),
    1795            1 :             )
    1796            1 :             .unwrap();
    1797            1 : 
    1798            1 :         let actual_b = get_bytes_started_stopped();
    1799            1 : 
    1800            1 :         client.wait_completion().await.unwrap();
    1801            1 : 
    1802            1 :         let actual_c = get_bytes_started_stopped();
    1803            1 : 
    1804            1 :         // Validate
    1805            1 : 
    1806            1 :         let expected_b = actual_a
    1807            1 :             + BytesStartedFinished {
    1808            1 :                 started: Some(content_1.len()),
    1809            1 :                 // assert that the _finished metric is created eagerly so that subtractions work on first sample
    1810            1 :                 finished: Some(0),
    1811            1 :             };
    1812            1 :         assert_eq!(actual_b, expected_b);
    1813              : 
    1814            1 :         let expected_c = actual_a
    1815            1 :             + BytesStartedFinished {
    1816            1 :                 started: Some(content_1.len()),
    1817            1 :                 finished: Some(content_1.len()),
    1818            1 :             };
    1819            1 :         assert_eq!(actual_c, expected_c);
    1820              :     }
    1821              : }
        

Generated by: LCOV version 2.1-beta