LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 89.4 % 433 387
Test Date: 2025-04-24 20:31:15 Functions: 48.6 % 111 54

            Line data    Source code
       1              : //! Helper functions to download files from remote storage with a RemoteStorage
       2              : //!
       3              : //! The functions in this module retry failed operations automatically, according
       4              : //! to the FAILED_REMOTE_OP_RETRIES constant.
       5              : 
       6              : use std::collections::HashSet;
       7              : use std::future::Future;
       8              : use std::str::FromStr;
       9              : use std::sync::atomic::AtomicU64;
      10              : use std::time::SystemTime;
      11              : 
      12              : use anyhow::{Context, anyhow};
      13              : use camino::{Utf8Path, Utf8PathBuf};
      14              : use pageserver_api::shard::TenantShardId;
      15              : use remote_storage::{
      16              :     DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
      17              : };
      18              : use tokio::fs::{self, File, OpenOptions};
      19              : use tokio::io::AsyncSeekExt;
      20              : use tokio_util::io::StreamReader;
      21              : use tokio_util::sync::CancellationToken;
      22              : use tracing::warn;
      23              : use utils::crashsafe::path_with_suffix_extension;
      24              : use utils::id::{TenantId, TimelineId};
      25              : use utils::{backoff, pausable_failpoint};
      26              : 
      27              : use super::index::{IndexPart, LayerFileMetadata};
      28              : use super::manifest::TenantManifest;
      29              : use super::{
      30              :     FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
      31              :     parse_remote_tenant_manifest_path, remote_index_path, remote_initdb_archive_path,
      32              :     remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
      33              :     remote_tenant_manifest_prefix, remote_tenant_path,
      34              : };
      35              : use crate::TEMP_FILE_SUFFIX;
      36              : use crate::config::PageServerConf;
      37              : use crate::context::RequestContext;
      38              : use crate::span::{
      39              :     debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
      40              : };
      41              : use crate::tenant::Generation;
      42              : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
      43              : use crate::tenant::storage_layer::LayerName;
      44              : use crate::virtual_file;
      45              : use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
      46              : use crate::virtual_file::{IoBufferMut, MaybeFatalIo, VirtualFile};
      47              : use crate::virtual_file::{TempVirtualFile, owned_buffers_io};
      48              : 
      49              : ///
      50              : /// If 'metadata' is given, we will validate that the downloaded file's size matches that
      51              : /// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
      52              : ///
      53              : /// Returns the size of the downloaded file.
      54              : #[allow(clippy::too_many_arguments)]
      55           84 : pub async fn download_layer_file<'a>(
      56           84 :     conf: &'static PageServerConf,
      57           84 :     storage: &'a GenericRemoteStorage,
      58           84 :     tenant_shard_id: TenantShardId,
      59           84 :     timeline_id: TimelineId,
      60           84 :     layer_file_name: &'a LayerName,
      61           84 :     layer_metadata: &'a LayerFileMetadata,
      62           84 :     local_path: &Utf8Path,
      63           84 :     gate: &utils::sync::gate::Gate,
      64           84 :     cancel: &CancellationToken,
      65           84 :     ctx: &RequestContext,
      66           84 : ) -> Result<u64, DownloadError> {
      67           84 :     debug_assert_current_span_has_tenant_and_timeline_id();
      68           84 : 
      69           84 :     let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
      70           84 : 
      71           84 :     let remote_path = remote_layer_path(
      72           84 :         &tenant_shard_id.tenant_id,
      73           84 :         &timeline_id,
      74           84 :         layer_metadata.shard,
      75           84 :         layer_file_name,
      76           84 :         layer_metadata.generation,
      77           84 :     );
      78              : 
      79           84 :     let (bytes_amount, temp_file) = download_retry(
      80           84 :         || async {
      81              :             // TempVirtualFile requires us to never reuse a filename while an old
      82              :             // instance of TempVirtualFile created with that filename is not done dropping yet.
      83              :             // So, we use a monotonic counter to disambiguate the filenames.
      84              :             static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
      85           84 :             let filename_disambiguator =
      86           84 :                 NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      87           84 : 
      88           84 :             let temp_file_path = path_with_suffix_extension(
      89           84 :                 local_path,
      90           84 :                 &format!("{filename_disambiguator:x}.{TEMP_DOWNLOAD_EXTENSION}"),
      91           84 :             );
      92              : 
      93           84 :             let temp_file = TempVirtualFile::new(
      94           84 :                 VirtualFile::open_with_options_v2(
      95           84 :                     &temp_file_path,
      96           84 :                     virtual_file::OpenOptions::new()
      97           84 :                         .create_new(true)
      98           84 :                         .write(true),
      99           84 :                     ctx,
     100           84 :                 )
     101           84 :                 .await
     102           84 :                 .with_context(|| format!("create a temp file for layer download: {temp_file_path}"))
     103           84 :                 .map_err(DownloadError::Other)?,
     104           84 :                 gate.enter().map_err(|_| DownloadError::Cancelled)?,
     105              :             );
     106           84 :             download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await
     107          168 :         },
     108           84 :         &format!("download {remote_path:?}"),
     109           84 :         cancel,
     110           84 :     )
     111           84 :     .await?;
     112              : 
     113           84 :     let expected = layer_metadata.file_size;
     114           84 :     if expected != bytes_amount {
     115            0 :         return Err(DownloadError::Other(anyhow!(
     116            0 :             "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}",
     117            0 :             temp_file.path()
     118            0 :         )));
     119           84 :     }
     120           84 : 
     121           84 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
     122            0 :         Err(DownloadError::Other(anyhow!(
     123            0 :             "remote-storage-download-pre-rename failpoint triggered"
     124            0 :         )))
     125           84 :     });
     126              : 
     127              :     // Try rename before disarming the temp file.
     128              :     // That way, if rename fails for whatever reason, we clean up the temp file on the return path.
     129              : 
     130           84 :     fs::rename(temp_file.path(), &local_path)
     131           84 :         .await
     132           84 :         .with_context(|| format!("rename download layer file to {local_path}"))
     133           84 :         .map_err(DownloadError::Other)?;
     134              : 
     135              :     // The temp file's VirtualFile points to the temp_file_path which we moved above.
     136              :     // Drop it immediately, it's invalid.
     137              :     // This will get better in https://github.com/neondatabase/neon/issues/11692
     138           84 :     let _: VirtualFile = temp_file.disarm_into_inner();
     139           84 :     // NB: The gate guard that was stored in `temp_file` is dropped but we continue
     140           84 :     // to operate on it and on the parent timeline directory.
     141           84 :     // Those operations are safe to do because higher-level code is holding another gate guard:
     142           84 :     // - attached mode: the download task spawned by struct Layer is holding the gate guard
     143           84 :     // - secondary mode: The TenantDownloader::download holds the gate open
     144           84 : 
     145           84 :     // The rename above is not durable yet.
     146           84 :     // It doesn't matter for crash consistency because pageserver startup deletes temp
     147           84 :     // files and we'll re-download on demand if necessary.
     148           84 : 
     149           84 :     // We use fatal_err() below because after the rename above,
     150           84 :     // the in-memory state of the filesystem already has the layer file in its final place,
     151           84 :     // and subsequent pageserver code could think it's durable while it really isn't.
     152           84 :     let work = {
     153           84 :         let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
     154           84 :         async move {
     155           84 :             let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
     156           84 :                 .await
     157           84 :                 .fatal_err("VirtualFile::open for timeline dir fsync");
     158           84 :             timeline_dir
     159           84 :                 .sync_all()
     160           84 :                 .await
     161           84 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     162           84 :         }
     163              :     };
     164           84 :     crate::virtual_file::io_engine::get()
     165           84 :         .spawn_blocking_and_block_on_if_std(work)
     166           84 :         .await;
     167              : 
     168           84 :     tracing::debug!("download complete: {local_path}");
     169              : 
     170           84 :     Ok(bytes_amount)
     171           84 : }
     172              : 
     173              : /// Download the object `src_path` in the remote `storage` into the local file `destination_file`.
     174              : ///
     175              : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
     176              : /// (Note that the directory entry for the inode is not made durable.)
     177              : /// The file size in bytes is returned together with the destination file.
     178              : ///
     179              : /// If Err() is returned, there was some error. The destination file has been unlinked.
     180              : /// The unlinking has _not_ been made durable.
     181           84 : async fn download_object(
     182           84 :     storage: &GenericRemoteStorage,
     183           84 :     src_path: &RemotePath,
     184           84 :     destination_file: TempVirtualFile,
     185           84 :     gate: &utils::sync::gate::Gate,
     186           84 :     cancel: &CancellationToken,
     187           84 :     ctx: &RequestContext,
     188           84 : ) -> Result<(u64, TempVirtualFile), DownloadError> {
     189           84 :     let mut download = storage
     190           84 :         .download(src_path, &DownloadOpts::default(), cancel)
     191           84 :         .await?;
     192              : 
     193           84 :     pausable_failpoint!("before-downloading-layer-stream-pausable");
     194              : 
     195           84 :     let dst_path = destination_file.path().to_owned();
     196           84 :     let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
     197           84 :         destination_file,
     198              :         0,
     199          168 :         || IoBufferMut::with_capacity(super::BUFFER_SIZE),
     200           84 :         gate.enter().map_err(|_| DownloadError::Cancelled)?,
     201           84 :         cancel.child_token(),
     202           84 :         ctx,
     203           84 :         tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
     204              :     );
     205              : 
     206              :     // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
     207              :     // There's chunks_vectored() on the stream.
     208           84 :     let (bytes_amount, destination_file) = async {
     209          588 :         while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await {
     210          504 :             let chunk = match res {
     211          504 :                 Ok(chunk) => chunk,
     212            0 :                 Err(e) => return Err(DownloadError::from(e)),
     213              :             };
     214          504 :             buffered
     215          504 :                 .write_buffered_borrowed(&chunk, ctx)
     216          504 :                 .await
     217          504 :                 .map_err(|e| match e {
     218            0 :                     FlushTaskError::Cancelled => DownloadError::Cancelled,
     219          504 :                 })?;
     220              :         }
     221           84 :         buffered
     222           84 :             .shutdown(
     223           84 :                 owned_buffers_io::write::BufferedWriterShutdownMode::PadThenTruncate,
     224           84 :                 ctx,
     225           84 :             )
     226           84 :             .await
     227           84 :             .map_err(|e| match e {
     228            0 :                 FlushTaskError::Cancelled => DownloadError::Cancelled,
     229           84 :             })
     230           84 :     }
     231           84 :     .await?;
     232              : 
     233              :     // not using sync_data because it can lose file size update
     234           84 :     destination_file
     235           84 :         .sync_all()
     236           84 :         .await
     237           84 :         .maybe_fatal_err("download_object sync_all")
     238           84 :         .with_context(|| format!("failed to fsync source file at {dst_path}"))
     239           84 :         .map_err(DownloadError::Other)?;
     240              : 
     241           84 :     Ok((bytes_amount, destination_file))
     242           84 : }
     243              : 
     244              : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     245              : 
     246            0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
     247            0 :     let extension = path.extension();
     248            0 :     match extension {
     249            0 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     250            0 :         Some(_) => false,
     251            0 :         None => false,
     252              :     }
     253            0 : }
     254              : 
     255         1392 : async fn list_identifiers<T>(
     256         1392 :     storage: &GenericRemoteStorage,
     257         1392 :     prefix: RemotePath,
     258         1392 :     cancel: CancellationToken,
     259         1392 : ) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
     260         1392 : where
     261         1392 :     T: FromStr + Eq + std::hash::Hash,
     262         1392 : {
     263         1392 :     let listing = download_retry_forever(
     264         1392 :         || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
     265         1392 :         &format!("list identifiers in prefix {prefix}"),
     266         1392 :         &cancel,
     267         1392 :     )
     268         1392 :     .await?;
     269              : 
     270         1392 :     let mut parsed_ids = HashSet::new();
     271         1392 :     let mut other_prefixes = HashSet::new();
     272              : 
     273         1428 :     for id_remote_storage_key in listing.prefixes {
     274           36 :         let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
     275            0 :             anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
     276           36 :         })?;
     277              : 
     278           36 :         match object_name.parse::<T>() {
     279           36 :             Ok(t) => parsed_ids.insert(t),
     280            0 :             Err(_) => other_prefixes.insert(object_name.to_string()),
     281              :         };
     282              :     }
     283              : 
     284         1392 :     for object in listing.keys {
     285            0 :         let object_name = object
     286            0 :             .key
     287            0 :             .object_name()
     288            0 :             .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
     289            0 :         other_prefixes.insert(object_name.to_string());
     290              :     }
     291              : 
     292         1392 :     Ok((parsed_ids, other_prefixes))
     293         1392 : }
     294              : 
     295              : /// List shards of given tenant in remote storage
     296            0 : pub(crate) async fn list_remote_tenant_shards(
     297            0 :     storage: &GenericRemoteStorage,
     298            0 :     tenant_id: TenantId,
     299            0 :     cancel: CancellationToken,
     300            0 : ) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
     301            0 :     let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
     302            0 :     list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
     303            0 : }
     304              : 
     305              : /// List timelines of given tenant shard in remote storage
     306         1392 : pub async fn list_remote_timelines(
     307         1392 :     storage: &GenericRemoteStorage,
     308         1392 :     tenant_shard_id: TenantShardId,
     309         1392 :     cancel: CancellationToken,
     310         1392 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     311         1392 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     312            0 :         anyhow::bail!("storage-sync-list-remote-timelines");
     313         1392 :     });
     314              : 
     315         1392 :     let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
     316         1392 :     list_identifiers::<TimelineId>(storage, remote_path, cancel).await
     317         1392 : }
     318              : 
     319         4308 : async fn do_download_remote_path_retry_forever(
     320         4308 :     storage: &GenericRemoteStorage,
     321         4308 :     remote_path: &RemotePath,
     322         4308 :     download_opts: DownloadOpts,
     323         4308 :     cancel: &CancellationToken,
     324         4308 : ) -> Result<(Vec<u8>, SystemTime), DownloadError> {
     325         4308 :     download_retry_forever(
     326         4308 :         || async {
     327         4308 :             let download = storage
     328         4308 :                 .download(remote_path, &download_opts, cancel)
     329         4308 :                 .await?;
     330              : 
     331          156 :             let mut bytes = Vec::new();
     332          156 : 
     333          156 :             let stream = download.download_stream;
     334          156 :             let mut stream = StreamReader::new(stream);
     335          156 : 
     336          156 :             tokio::io::copy_buf(&mut stream, &mut bytes).await?;
     337              : 
     338          156 :             Ok((bytes, download.last_modified))
     339         8616 :         },
     340         4308 :         &format!("download {remote_path:?}"),
     341         4308 :         cancel,
     342         4308 :     )
     343         4308 :     .await
     344         4308 : }
     345              : 
     346         4104 : async fn do_download_tenant_manifest(
     347         4104 :     storage: &GenericRemoteStorage,
     348         4104 :     tenant_shard_id: &TenantShardId,
     349         4104 :     _timeline_id: Option<&TimelineId>,
     350         4104 :     generation: Generation,
     351         4104 :     cancel: &CancellationToken,
     352         4104 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
     353         4104 :     let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
     354         4104 : 
     355         4104 :     let download_opts = DownloadOpts {
     356         4104 :         kind: DownloadKind::Small,
     357         4104 :         ..Default::default()
     358         4104 :     };
     359              : 
     360           36 :     let (manifest_bytes, manifest_bytes_mtime) =
     361         4104 :         do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
     362              : 
     363           36 :     let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
     364           36 :         .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
     365           36 :         .map_err(DownloadError::Other)?;
     366              : 
     367           36 :     Ok((tenant_manifest, generation, manifest_bytes_mtime))
     368         4104 : }
     369              : 
     370          204 : async fn do_download_index_part(
     371          204 :     storage: &GenericRemoteStorage,
     372          204 :     tenant_shard_id: &TenantShardId,
     373          204 :     timeline_id: Option<&TimelineId>,
     374          204 :     index_generation: Generation,
     375          204 :     cancel: &CancellationToken,
     376          204 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
     377          204 :     let timeline_id =
     378          204 :         timeline_id.expect("A timeline ID is always provided when downloading an index");
     379          204 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
     380          204 : 
     381          204 :     let download_opts = DownloadOpts {
     382          204 :         kind: DownloadKind::Small,
     383          204 :         ..Default::default()
     384          204 :     };
     385              : 
     386          120 :     let (index_part_bytes, index_part_mtime) =
     387          204 :         do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
     388              : 
     389          120 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     390          120 :         .with_context(|| format!("deserialize index part file at {remote_path:?}"))
     391          120 :         .map_err(DownloadError::Other)?;
     392              : 
     393          120 :     Ok((index_part, index_generation, index_part_mtime))
     394          204 : }
     395              : 
     396              : /// Metadata objects are "generationed", meaning that they include a generation suffix.  This
     397              : /// function downloads the object with the highest generation <= `my_generation`.
     398              : ///
     399              : /// Data objects (layer files) also include a generation in their path, but there is no equivalent
     400              : /// search process, because their reference from an index includes the generation.
     401              : ///
     402              : /// An expensive object listing operation is only done if necessary: the typical fast path is to issue two
     403              : /// GET operations, one to our own generation (stale attachment case), and one to the immediately preceding
     404              : /// generation (normal case when migrating/restarting).  Only if both of these return 404 do we fall back
     405              : /// to listing objects.
     406              : ///
     407              : /// * `my_generation`: the value of `[crate::tenant::TenantShard::generation]`
     408              : /// * `what`: for logging, what object are we downloading
     409              : /// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
     410              : /// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless
     411              : ///                  `cancel` has fired.  This function does not do its own retries of GET operations, and relies
     412              : ///                  on the function passed in to do so.
     413              : /// * `parse_path`: parse a fully qualified remote storage path to get the generation of the object.
     414              : #[allow(clippy::too_many_arguments)]
     415              : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     416              : pub(crate) async fn download_generation_object<'a, T, DF, DFF, PF>(
     417              :     storage: &'a GenericRemoteStorage,
     418              :     tenant_shard_id: &'a TenantShardId,
     419              :     timeline_id: Option<&'a TimelineId>,
     420              :     my_generation: Generation,
     421              :     what: &str,
     422              :     prefix: RemotePath,
     423              :     do_download: DF,
     424              :     parse_path: PF,
     425              :     cancel: &'a CancellationToken,
     426              : ) -> Result<(T, Generation, SystemTime), DownloadError>
     427              : where
     428              :     DF: Fn(
     429              :         &'a GenericRemoteStorage,
     430              :         &'a TenantShardId,
     431              :         Option<&'a TimelineId>,
     432              :         Generation,
     433              :         &'a CancellationToken,
     434              :     ) -> DFF,
     435              :     DFF: Future<Output = Result<(T, Generation, SystemTime), DownloadError>>,
     436              :     PF: Fn(RemotePath) -> Option<Generation>,
     437              :     T: 'static,
     438              : {
     439              :     debug_assert_current_span_has_tenant_id();
     440              : 
     441              :     if my_generation.is_none() {
     442              :         // Operating without generations: just fetch the generation-less path
     443              :         return do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     444              :     }
     445              : 
     446              :     // Stale case: If we were intentionally attached in a stale generation, the remote object may already
     447              :     // exist in our generation.
     448              :     //
     449              :     // This is an optimization to avoid doing the listing for the general case below.
     450              :     let res = do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     451              :     match res {
     452              :         Ok(decoded) => {
     453              :             tracing::debug!("Found {what} from current generation (this is a stale attachment)");
     454              :             return Ok(decoded);
     455              :         }
     456              :         Err(DownloadError::NotFound) => {}
     457              :         Err(e) => return Err(e),
     458              :     };
     459              : 
     460              :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded the object
     461              :     // we are seeking in that generation.  We may safely start from this index without doing a listing, because:
     462              :     //  - We checked for current generation case above
     463              :     //  - generations > my_generation are to be ignored
     464              :     //  - any other objects that exist would have an older generation than `previous_gen`, and
     465              :     //    we want to find the most recent object from a previous generation.
     466              :     //
     467              :     // This is an optimization to avoid doing the listing for the general case below.
     468              :     let res = do_download(
     469              :         storage,
     470              :         tenant_shard_id,
     471              :         timeline_id,
     472              :         my_generation.previous(),
     473              :         cancel,
     474              :     )
     475              :     .await;
     476              :     match res {
     477              :         Ok(decoded) => {
     478              :             tracing::debug!("Found {what} from previous generation");
     479              :             return Ok(decoded);
     480              :         }
     481              :         Err(DownloadError::NotFound) => {
     482              :             tracing::debug!("No {what} found from previous generation, falling back to listing");
     483              :         }
     484              :         Err(e) => {
     485              :             return Err(e);
     486              :         }
     487              :     }
     488              : 
     489              :     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
     490              :     // objects, and select the highest one with a generation <= my_generation.  Constructing the prefix is equivalent
     491              :     // to constructing a full index path with no generation, because the generation is a suffix.
     492              :     let paths = download_retry(
     493         1392 :         || async {
     494         1392 :             storage
     495         1392 :                 .list(Some(&prefix), ListingMode::NoDelimiter, None, cancel)
     496         1392 :                 .await
     497         2784 :         },
     498              :         "list index_part files",
     499              :         cancel,
     500              :     )
     501              :     .await?
     502              :     .keys;
     503              : 
     504              :     // General case logic for which index to use: the latest index whose generation
     505              :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     506              :     let max_previous_generation = paths
     507              :         .into_iter()
     508          108 :         .filter_map(|o| parse_path(o.key))
     509           72 :         .filter(|g| g <= &my_generation)
     510              :         .max();
     511              : 
     512              :     match max_previous_generation {
     513              :         Some(g) => {
     514              :             tracing::debug!("Found {what} in generation {g:?}");
     515              :             do_download(storage, tenant_shard_id, timeline_id, g, cancel).await
     516              :         }
     517              :         None => {
     518              :             // Migration from legacy pre-generation state: we have a generation but no prior
     519              :             // attached pageservers did.  Try to load from a no-generation path.
     520              :             tracing::debug!("No {what}* found");
     521              :             do_download(
     522              :                 storage,
     523              :                 tenant_shard_id,
     524              :                 timeline_id,
     525              :                 Generation::none(),
     526              :                 cancel,
     527              :             )
     528              :             .await
     529              :         }
     530              :     }
     531              : }
     532              : 
     533              : /// index_part.json objects are suffixed with a generation number, so we cannot
     534              : /// directly GET the latest index part without doing some probing.
     535              : ///
     536              : /// In this function we probe for the most recent index in a generation <= our current generation.
     537              : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
                        ///
                        /// The probing itself is delegated to `download_generation_object`; this wrapper only
                        /// supplies the index-specific pieces: the label `"index_part"`, the generation-less
                        /// index path as prefix, `do_download_index_part` as the download callback and
                        /// `parse_remote_index_path` as the path parser.
                        ///
                        /// Returns whatever `download_generation_object` returns: on success an
                        /// `(IndexPart, Generation, SystemTime)` tuple (presumably the decoded index, the
                        /// generation it was found in and a timestamp of the remote object -- confirm against
                        /// `do_download_index_part`), otherwise a `DownloadError`.
     538          120 : pub(crate) async fn download_index_part(
     539          120 :     storage: &GenericRemoteStorage,
     540          120 :     tenant_shard_id: &TenantShardId,
     541          120 :     timeline_id: &TimelineId,
     542          120 :     my_generation: Generation,
     543          120 :     cancel: &CancellationToken,
     544          120 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
     545          120 :     debug_assert_current_span_has_tenant_and_timeline_id();
     546          120 : 
                            // Because the generation is a suffix of the object name, the path built with
                            // `Generation::none()` is handed over as the prefix argument below.
     547          120 :     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
     548          120 :     download_generation_object(
     549          120 :         storage,
     550          120 :         tenant_shard_id,
     551          120 :         Some(timeline_id),
     552          120 :         my_generation,
     553          120 :         "index_part",
     554          120 :         index_prefix,
     555          120 :         do_download_index_part,
     556          120 :         parse_remote_index_path,
     557          120 :         cancel,
     558          120 :     )
     559          120 :     .await
     560          120 : }
     561              : 
                        /// Downloads the tenant manifest for `tenant_shard_id`.
                        ///
                        /// Thin wrapper around `download_generation_object`: it passes the prefix from
                        /// `remote_tenant_manifest_prefix`, the label `"tenant-manifest"`, no timeline id
                        /// (`None`), `do_download_tenant_manifest` as the download callback and
                        /// `parse_remote_tenant_manifest_path` as the path parser.
                        ///
                        /// Returns whatever `download_generation_object` returns: on success a
                        /// `(TenantManifest, Generation, SystemTime)` tuple (presumably the decoded manifest,
                        /// the generation it was found in and a timestamp of the remote object -- confirm
                        /// against `do_download_tenant_manifest`), otherwise a `DownloadError`.
     562         1392 : pub(crate) async fn download_tenant_manifest(
     563         1392 :     storage: &GenericRemoteStorage,
     564         1392 :     tenant_shard_id: &TenantShardId,
     565         1392 :     my_generation: Generation,
     566         1392 :     cancel: &CancellationToken,
     567         1392 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
     568         1392 :     let manifest_prefix = remote_tenant_manifest_prefix(tenant_shard_id);
     569         1392 : 
     570         1392 :     download_generation_object(
     571         1392 :         storage,
     572         1392 :         tenant_shard_id,
     573         1392 :         None,
     574         1392 :         my_generation,
     575         1392 :         "tenant-manifest",
     576         1392 :         manifest_prefix,
     577         1392 :         do_download_tenant_manifest,
     578         1392 :         parse_remote_tenant_manifest_path,
     579         1392 :         cancel,
     580         1392 :     )
     581         1392 :     .await
     582         1392 : }
     583              : 
                        /// Downloads the initdb archive for `timeline_id` into a temporary file and returns the
                        /// temp file's path together with an open handle to it, rewound to offset 0.
                        ///
                        /// The archive is fetched from `remote_initdb_archive_path(..)`; if that download reports
                        /// `DownloadError::NotFound`, `remote_initdb_preserved_archive_path(..)` is tried instead.
                        /// The temp file lives in `conf.timelines_path(tenant_shard_id)` (created if it does not
                        /// exist) and is named `{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}`.
                        ///
                        /// Opening the temp file, downloading, copying and rewinding all happen inside a single
                        /// `download_retry` operation, so every attempt starts again from a truncated file.
                        ///
                        /// # Errors
                        ///
                        /// Local filesystem failures (directory creation, temp file creation, seek) are wrapped
                        /// in `DownloadError::Other` with context. Errors from the remote download and from the
                        /// stream copy are propagated with `?`. When the retried operation ultimately fails,
                        /// removal of the temp file is attempted (best effort) before the error is returned.
                        ///
                        /// On success the temp file is left in place; presumably the caller is responsible for
                        /// removing it -- confirm at the call sites.
     584           12 : pub(crate) async fn download_initdb_tar_zst(
     585           12 :     conf: &'static PageServerConf,
     586           12 :     storage: &GenericRemoteStorage,
     587           12 :     tenant_shard_id: &TenantShardId,
     588           12 :     timeline_id: &TimelineId,
     589           12 :     cancel: &CancellationToken,
     590           12 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
     591           12 :     debug_assert_current_span_has_tenant_and_timeline_id();
     592           12 : 
     593           12 :     let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     594           12 : 
                            // Fallback location, only consulted when `remote_path` is not found (see below).
     595           12 :     let remote_preserved_path =
     596           12 :         remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     597           12 : 
     598           12 :     let timeline_path = conf.timelines_path(tenant_shard_id);
     599           12 : 
                            // NOTE(review): `exists()` is not awaited, i.e. it is a synchronous check inside an
                            // async fn, and `create_dir_all` on its own should also succeed for an existing
                            // directory -- confirm whether this pre-check is intentional.
     600           12 :     if !timeline_path.exists() {
     601            0 :         tokio::fs::create_dir_all(&timeline_path)
     602            0 :             .await
     603            0 :             .with_context(|| format!("timeline dir creation {timeline_path}"))
     604            0 :             .map_err(DownloadError::Other)?;
     605           12 :     }
     606           12 :     let temp_path = timeline_path.join(format!(
     607           12 :         "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
     608           12 :     ));
     609              : 
     610           12 :     let file = download_retry(
     611           12 :         || async {
                                    // Re-opened on every attempt; `truncate(true)` discards whatever an earlier,
                                    // failed attempt wrote. The handle is opened readable as well because it is
                                    // returned to the caller after the download.
     612           12 :             let file = OpenOptions::new()
     613           12 :                 .create(true)
     614           12 :                 .truncate(true)
     615           12 :                 .read(true)
     616           12 :                 .write(true)
     617           12 :                 .open(&temp_path)
     618           12 :                 .await
     619           12 :                 .with_context(|| format!("tempfile creation {temp_path}"))
     620           12 :                 .map_err(DownloadError::Other)?;
     621              : 
     622           12 :             let download = match storage
     623           12 :                 .download(&remote_path, &DownloadOpts::default(), cancel)
     624           12 :                 .await
     625              :             {
     626           12 :                 Ok(dl) => dl,
                                        // Primary object missing: try the preserved archive path instead. Any
                                        // error from this second download is propagated as-is.
     627              :                 Err(DownloadError::NotFound) => {
     628            0 :                     storage
     629            0 :                         .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
     630            0 :                         .await?
     631              :                 }
                                        // Every other error ends this attempt; `?` returns it from the closure.
     632            0 :                 Err(other) => Err(other)?,
     633              :             };
                                    // Stream the remote body into the temp file through a buffered writer of
                                    // `super::BUFFER_SIZE` capacity.
     634           12 :             let mut download = tokio_util::io::StreamReader::new(download.download_stream);
     635           12 :             let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
     636           12 : 
     637           12 :             tokio::io::copy_buf(&mut download, &mut writer).await?;
     638              : 
                                    // NOTE(review): there is no explicit flush before `into_inner()`; this relies
                                    // on `copy_buf` having flushed the writer when it completed -- confirm against
                                    // the tokio documentation.
     639           12 :             let mut file = writer.into_inner();
     640           12 : 
                                    // Rewind so the returned handle reads from the start of the archive.
                                    // NOTE(review): the context message prints `remote_path`, although the seek is
                                    // performed on the local temp file.
     641           12 :             file.seek(std::io::SeekFrom::Start(0))
     642           12 :                 .await
     643           12 :                 .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
     644           12 :                 .map_err(DownloadError::Other)?;
     645              : 
     646           12 :             Ok(file)
     647           24 :         },
     648           12 :         &format!("download {remote_path}"),
     649           12 :         cancel,
     650           12 :     )
     651           12 :     .await
     652           12 :     .inspect_err(|_e| {
     653              :         // Do a best-effort attempt at deleting the temporary file upon encountering an error.
     654              :         // We don't have async here nor do we want to pile on any extra errors.
     655            0 :         if let Err(e) = std::fs::remove_file(&temp_path) {
                                    // A file that is already gone is not worth a warning.
     656            0 :             if e.kind() != std::io::ErrorKind::NotFound {
     657            0 :                 warn!("error deleting temporary file {temp_path}: {e}");
     658            0 :             }
     659            0 :         }
     660           12 :     })?;
     661              : 
     662           12 :     Ok((temp_path, file))
     663           12 : }
     664              : 
     665              : /// Helper function to handle retries for a download operation.
     666              : ///
     667              : /// Remote operations can fail due to rate limits (S3), spurious network
     668              : /// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
     669              : /// with backoff.
     670              : ///
     671              : /// (See similar logic for uploads in `perform_upload_task`)
                        ///
                        /// `op` produces a fresh future each time it is called and is driven by
                        /// `backoff::retry`. `DownloadError::is_permanent` is passed as the error predicate
                        /// (presumably errors it classifies as permanent stop the retrying early -- confirm in
                        /// `utils::backoff`), `FAILED_DOWNLOAD_WARN_THRESHOLD` as the warn threshold and
                        /// `description` as the operation label.
                        ///
                        /// If `backoff::retry` yields `None` (presumably because `cancel` fired), this returns
                        /// `DownloadError::Cancelled`; otherwise the final result of `op` is returned unchanged.
     672         1488 : pub(super) async fn download_retry<T, O, F>(
     673         1488 :     op: O,
     674         1488 :     description: &str,
     675         1488 :     cancel: &CancellationToken,
     676         1488 : ) -> Result<T, DownloadError>
     677         1488 : where
     678         1488 :     O: FnMut() -> F,
     679         1488 :     F: Future<Output = Result<T, DownloadError>>,
     680         1488 : {
     681         1488 :     backoff::retry(
     682         1488 :         op,
     683         1488 :         DownloadError::is_permanent,
     684         1488 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     685         1488 :         FAILED_REMOTE_OP_RETRIES,
     686         1488 :         description,
     687         1488 :         cancel,
     688         1488 :     )
     689         1488 :     .await
                            // Option<Result<T, _>> -> Result<T, _>: `None` becomes `Cancelled`, then the nested
                            // `Result` is flattened.
     690         1488 :     .ok_or_else(|| DownloadError::Cancelled)
     691         1488 :     .and_then(|x| x)
     692         1488 : }
     693              : 
                        /// Like `download_retry`, but passes `u32::MAX` as the retry limit to `backoff::retry`
                        /// instead of `FAILED_REMOTE_OP_RETRIES`, i.e. it is effectively unbounded.
                        ///
                        /// All other arguments to `backoff::retry` are the same: `DownloadError::is_permanent`
                        /// as the error predicate (presumably permanent errors still end the loop -- confirm in
                        /// `utils::backoff`), `FAILED_DOWNLOAD_WARN_THRESHOLD` as the warn threshold, and
                        /// `description` as the operation label.
                        ///
                        /// If `backoff::retry` yields `None` (presumably because `cancel` fired), this returns
                        /// `DownloadError::Cancelled`; otherwise the final result of `op` is returned unchanged.
     694         5700 : pub(crate) async fn download_retry_forever<T, O, F>(
     695         5700 :     op: O,
     696         5700 :     description: &str,
     697         5700 :     cancel: &CancellationToken,
     698         5700 : ) -> Result<T, DownloadError>
     699         5700 : where
     700         5700 :     O: FnMut() -> F,
     701         5700 :     F: Future<Output = Result<T, DownloadError>>,
     702         5700 : {
     703         5700 :     backoff::retry(
     704         5700 :         op,
     705         5700 :         DownloadError::is_permanent,
     706         5700 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     707         5700 :         u32::MAX,
     708         5700 :         description,
     709         5700 :         cancel,
     710         5700 :     )
     711         5700 :     .await
                            // Option<Result<T, _>> -> Result<T, _>: `None` becomes `Cancelled`, then the nested
                            // `Result` is flattened.
     712         5700 :     .ok_or_else(|| DownloadError::Cancelled)
     713         5700 :     .and_then(|x| x)
     714         5700 : }
        

Generated by: LCOV version 2.1-beta