LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions)
Test: 07bee600374ccd486c69370d0972d9035964fe68.info
Test Date: 2025-02-20 13:11:02

             Coverage    Total    Hit
Lines:       87.6 %      458      401
Functions:   49.6 %      113      56

            Line data    Source code
       1              : //! Helper functions to download files from remote storage with a RemoteStorage.
       2              : //!
       3              : //! The functions in this module retry failed operations automatically, according
       4              : //! to the FAILED_REMOTE_OP_RETRIES constant.
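//!
//! For example, a one-off fetch can be wrapped in the retry helper like this
//! (a sketch, not a verbatim call site; `storage`, `remote_path`, and `cancel`
//! are assumed to be in scope):
//!
//! ```ignore
//! let download = download_retry(
//!     || async { storage.download(&remote_path, &DownloadOpts::default(), &cancel).await },
//!     "download example object",
//!     &cancel,
//! )
//! .await?;
//! ```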
       5              : 
       6              : use std::collections::HashSet;
       7              : use std::future::Future;
       8              : use std::str::FromStr;
       9              : use std::time::SystemTime;
      10              : 
      11              : use anyhow::{anyhow, Context};
      12              : use camino::{Utf8Path, Utf8PathBuf};
      13              : use pageserver_api::shard::TenantShardId;
      14              : use tokio::fs::{self, File, OpenOptions};
      15              : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
      16              : use tokio_util::io::StreamReader;
      17              : use tokio_util::sync::CancellationToken;
      18              : use tracing::warn;
      19              : use utils::backoff;
      20              : 
      21              : use crate::config::PageServerConf;
      22              : use crate::context::RequestContext;
      23              : use crate::span::{
      24              :     debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
      25              : };
      26              : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
      27              : use crate::tenant::storage_layer::LayerName;
      28              : use crate::tenant::Generation;
      29              : use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
      30              : use crate::TEMP_FILE_SUFFIX;
      31              : use remote_storage::{
      32              :     DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
      33              : };
      34              : use utils::crashsafe::path_with_suffix_extension;
      35              : use utils::id::{TenantId, TimelineId};
      36              : use utils::pausable_failpoint;
      37              : 
      38              : use super::index::{IndexPart, LayerFileMetadata};
      39              : use super::manifest::TenantManifest;
      40              : use super::{
      41              :     parse_remote_index_path, parse_remote_tenant_manifest_path, remote_index_path,
      42              :     remote_initdb_archive_path, remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
      43              :     remote_tenant_manifest_prefix, remote_tenant_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
      44              :     FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
      45              : };
      46              : 
       47              : /// Download a layer file from remote storage to `local_path`.
       48              : /// The downloaded file's size is validated against the size recorded in
       49              : /// 'layer_metadata'. (In the future, we might do more cross-checks, like CRC validation.)
       50              : ///
       51              : /// Returns the size of the downloaded file.
      52              : #[allow(clippy::too_many_arguments)]
      53           28 : pub async fn download_layer_file<'a>(
      54           28 :     conf: &'static PageServerConf,
      55           28 :     storage: &'a GenericRemoteStorage,
      56           28 :     tenant_shard_id: TenantShardId,
      57           28 :     timeline_id: TimelineId,
      58           28 :     layer_file_name: &'a LayerName,
      59           28 :     layer_metadata: &'a LayerFileMetadata,
      60           28 :     local_path: &Utf8Path,
      61           28 :     gate: &utils::sync::gate::Gate,
      62           28 :     cancel: &CancellationToken,
      63           28 :     ctx: &RequestContext,
      64           28 : ) -> Result<u64, DownloadError> {
      65           28 :     debug_assert_current_span_has_tenant_and_timeline_id();
      66           28 : 
      67           28 :     let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
      68           28 : 
      69           28 :     let remote_path = remote_layer_path(
      70           28 :         &tenant_shard_id.tenant_id,
      71           28 :         &timeline_id,
      72           28 :         layer_metadata.shard,
      73           28 :         layer_file_name,
      74           28 :         layer_metadata.generation,
      75           28 :     );
      76           28 : 
      77           28 :     // Perform a rename inspired by durable_rename from file_utils.c.
      78           28 :     // The sequence:
      79           28 :     //     write(tmp)
      80           28 :     //     fsync(tmp)
      81           28 :     //     rename(tmp, new)
      82           28 :     //     fsync(new)
      83           28 :     //     fsync(parent)
       84           28 :     // For more context on durable_rename, see this email from the postgres mailing list:
       85           28 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
       86           28 :     // If the pageserver crashes, the temp file is deleted on startup and the layer is re-downloaded.
      87           28 :     let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
      88              : 
      89           28 :     let bytes_amount = download_retry(
      90           28 :         || async {
      91           28 :             download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
      92           56 :         },
      93           28 :         &format!("download {remote_path:?}"),
      94           28 :         cancel,
      95           28 :     )
      96           28 :     .await?;
      97              : 
      98           28 :     let expected = layer_metadata.file_size;
      99           28 :     if expected != bytes_amount {
     100            0 :         return Err(DownloadError::Other(anyhow!(
     101            0 :             "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
     102            0 :         )));
     103           28 :     }
     104           28 : 
     105           28 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
     106            0 :         Err(DownloadError::Other(anyhow!(
     107            0 :             "remote-storage-download-pre-rename failpoint triggered"
     108            0 :         )))
     109           28 :     });
     110              : 
     111           28 :     fs::rename(&temp_file_path, &local_path)
     112           28 :         .await
     113           28 :         .with_context(|| format!("rename download layer file to {local_path}"))
     114           28 :         .map_err(DownloadError::Other)?;
     115              : 
      116              :     // We use fatal_err() below because, after the rename above,
     117              :     // the in-memory state of the filesystem already has the layer file in its final place,
     118              :     // and subsequent pageserver code could think it's durable while it really isn't.
     119           28 :     let work = {
     120           28 :         let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
     121           28 :         async move {
     122           28 :             let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
     123           28 :                 .await
     124           28 :                 .fatal_err("VirtualFile::open for timeline dir fsync");
     125           28 :             timeline_dir
     126           28 :                 .sync_all()
     127           28 :                 .await
     128           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     129           28 :         }
     130              :     };
     131           28 :     crate::virtual_file::io_engine::get()
     132           28 :         .spawn_blocking_and_block_on_if_std(work)
     133           28 :         .await;
     134              : 
     135           28 :     tracing::debug!("download complete: {local_path}");
     136              : 
     137           28 :     Ok(bytes_amount)
     138           28 : }
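// A minimal sketch of the durable_rename sequence referenced above, written
// against plain tokio::fs (illustrative only; the real code above goes through
// VirtualFile and the configured io engine instead, and the temp file is
// assumed to be fully written already):
#[allow(dead_code)]
async fn durable_rename_sketch(
    tmp: &Utf8Path,
    dst: &Utf8Path,
    parent_dir: &Utf8Path,
) -> std::io::Result<()> {
    // fsync(tmp): the file contents are on disk before the rename publishes them
    fs::File::open(tmp).await?.sync_all().await?;
    // rename(tmp, new): atomically swap the temp file into its final place
    fs::rename(tmp, dst).await?;
    // fsync(new) and fsync(parent): make the rename itself durable
    fs::File::open(dst).await?.sync_all().await?;
    fs::File::open(parent_dir).await?.sync_all().await?;
    Ok(())
}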
     139              : 
     140              : /// Download the object `src_path` in the remote `storage` to local path `dst_path`.
     141              : ///
     142              : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
     143              : /// (Note that the directory entry for the inode is not made durable.)
     144              : /// The file size in bytes is returned.
     145              : ///
     146              : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
     147              : /// The unlinking has _not_ been made durable.
     148           28 : async fn download_object(
     149           28 :     storage: &GenericRemoteStorage,
     150           28 :     src_path: &RemotePath,
     151           28 :     dst_path: &Utf8PathBuf,
     152           28 :     #[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
     153           28 :     cancel: &CancellationToken,
     154           28 :     #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
     155           28 : ) -> Result<u64, DownloadError> {
     156           28 :     let res = match crate::virtual_file::io_engine::get() {
     157            0 :         crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
     158              :         crate::virtual_file::io_engine::IoEngine::StdFs => {
     159           14 :             async {
     160           14 :                 let destination_file = tokio::fs::File::create(dst_path)
     161           14 :                     .await
     162           14 :                     .with_context(|| format!("create a destination file for layer '{dst_path}'"))
     163           14 :                     .map_err(DownloadError::Other)?;
     164              : 
     165           14 :                 let download = storage
     166           14 :                     .download(src_path, &DownloadOpts::default(), cancel)
     167           14 :                     .await?;
     168              : 
     169           14 :                 pausable_failpoint!("before-downloading-layer-stream-pausable");
     170              : 
     171           14 :                 let mut buf_writer =
     172           14 :                     tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
     173           14 : 
     174           14 :                 let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
     175              : 
     176           14 :                 let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
     177           14 :                 buf_writer.flush().await?;
     178              : 
     179           14 :                 let mut destination_file = buf_writer.into_inner();
     180           14 : 
     181           14 :                 // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     182           14 :                 // A file will not be closed immediately when it goes out of scope if there are any IO operations
     183           14 :                 // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     184           14 :                 // you should call flush before dropping it.
     185           14 :                 //
      186           14 :                 // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
      187           14 :                 // we assume that the `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
     188           14 :                 // But for additional safety lets check/wait for any pending operations.
     189           14 :                 destination_file
     190           14 :                     .flush()
     191           14 :                     .await
      192           14 :                     .maybe_fatal_err("download_object flush")
      193           14 :                     .with_context(|| format!("flush destination file at {dst_path}"))
     194           14 :                     .map_err(DownloadError::Other)?;
     195              : 
     196              :                 // not using sync_data because it can lose file size update
     197           14 :                 destination_file
     198           14 :                     .sync_all()
     199           14 :                     .await
     200           14 :                     .maybe_fatal_err("download_object sync_all")
      201           14 :                     .with_context(|| format!("failed to fsync destination file at {dst_path}"))
     202           14 :                     .map_err(DownloadError::Other)?;
     203              : 
     204           14 :                 Ok(bytes_amount)
     205           14 :             }
     206           14 :             .await
     207              :         }
     208              :         #[cfg(target_os = "linux")]
     209              :         crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
     210              :             use crate::virtual_file::owned_buffers_io;
     211              :             use crate::virtual_file::IoBufferMut;
     212              :             use std::sync::Arc;
     213           14 :             async {
     214           14 :                 let destination_file = Arc::new(
     215           14 :                     VirtualFile::create(dst_path, ctx)
     216           14 :                         .await
     217           14 :                         .with_context(|| {
     218            0 :                             format!("create a destination file for layer '{dst_path}'")
     219           14 :                         })
     220           14 :                         .map_err(DownloadError::Other)?,
     221              :                 );
     222              : 
     223           14 :                 let mut download = storage
     224           14 :                     .download(src_path, &DownloadOpts::default(), cancel)
     225           14 :                     .await?;
     226              : 
     227           14 :                 pausable_failpoint!("before-downloading-layer-stream-pausable");
     228              : 
     229           14 :                 let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
     230           14 :                     destination_file,
     231           28 :                     || IoBufferMut::with_capacity(super::BUFFER_SIZE),
     232           14 :                     gate.enter().map_err(|_| DownloadError::Cancelled)?,
     233           14 :                     ctx,
     234              :                 );
     235              : 
     236              :                 // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
     237              :                 // There's chunks_vectored() on the stream.
     238           14 :                 let (bytes_amount, destination_file) = async {
     239           84 :                     while let Some(res) =
     240           98 :                         futures::StreamExt::next(&mut download.download_stream).await
     241              :                     {
     242           84 :                         let chunk = match res {
     243           84 :                             Ok(chunk) => chunk,
     244            0 :                             Err(e) => return Err(e),
     245              :                         };
     246           84 :                         buffered.write_buffered_borrowed(&chunk, ctx).await?;
     247              :                     }
     248           14 :                     let inner = buffered.flush_and_into_inner(ctx).await?;
     249           14 :                     Ok(inner)
     250           14 :                 }
     251           14 :                 .await?;
     252              : 
     253              :                 // not using sync_data because it can lose file size update
     254           14 :                 destination_file
     255           14 :                     .sync_all()
     256           14 :                     .await
     257           14 :                     .maybe_fatal_err("download_object sync_all")
      258           14 :                     .with_context(|| format!("failed to fsync destination file at {dst_path}"))
     259           14 :                     .map_err(DownloadError::Other)?;
     260              : 
     261           14 :                 Ok(bytes_amount)
     262           14 :             }
     263           14 :             .await
     264              :         }
     265              :     };
     266              : 
     267              :     // in case the download failed, clean up
     268           28 :     match res {
     269           28 :         Ok(bytes_amount) => Ok(bytes_amount),
     270            0 :         Err(e) => {
     271            0 :             if let Err(e) = tokio::fs::remove_file(dst_path).await {
     272            0 :                 if e.kind() != std::io::ErrorKind::NotFound {
     273            0 :                     on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
     274            0 :                 }
     275            0 :             }
     276            0 :             Err(e)
     277              :         }
     278              :     }
     279           28 : }
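// The flush-before-drop pattern from the StdFs branch above, in isolation
// (a sketch; `write_then_close` is a hypothetical helper, not part of this module):
#[allow(dead_code)]
async fn write_then_close(path: &Utf8Path, data: &[u8]) -> std::io::Result<()> {
    let mut file = fs::File::create(path).await?;
    file.write_all(data).await?;
    // Wait for any in-flight writes before the file is dropped ...
    file.flush().await?;
    // ... and fsync both data and metadata, so the file size update is durable too.
    file.sync_all().await?;
    Ok(())
}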
     280              : 
     281              : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     282              : 
     283            0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
     284            0 :     let extension = path.extension();
     285            0 :     match extension {
     286            0 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     287            0 :         Some(_) => false,
     288            0 :         None => false,
     289              :     }
     290            0 : }
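// How the naming and the check fit together (a sketch; the path is hypothetical):
#[allow(dead_code)]
fn temp_download_example() {
    let tmp = path_with_suffix_extension(Utf8Path::new("/data/layer_x"), TEMP_DOWNLOAD_EXTENSION);
    // Assuming the suffix is appended as an extra extension:
    // tmp == "/data/layer_x.temp_download"
    assert!(is_temp_download_file(&tmp));
}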
     291              : 
     292          444 : async fn list_identifiers<T>(
     293          444 :     storage: &GenericRemoteStorage,
     294          444 :     prefix: RemotePath,
     295          444 :     cancel: CancellationToken,
     296          444 : ) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
     297          444 : where
     298          444 :     T: FromStr + Eq + std::hash::Hash,
     299          444 : {
     300          444 :     let listing = download_retry_forever(
     301          444 :         || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
     302          444 :         &format!("list identifiers in prefix {prefix}"),
     303          444 :         &cancel,
     304          444 :     )
     305          444 :     .await?;
     306              : 
     307          444 :     let mut parsed_ids = HashSet::new();
     308          444 :     let mut other_prefixes = HashSet::new();
     309              : 
     310          456 :     for id_remote_storage_key in listing.prefixes {
     311           12 :         let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
     312            0 :             anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
     313           12 :         })?;
     314              : 
     315           12 :         match object_name.parse::<T>() {
     316           12 :             Ok(t) => parsed_ids.insert(t),
     317            0 :             Err(_) => other_prefixes.insert(object_name.to_string()),
     318              :         };
     319              :     }
     320              : 
     321          444 :     for object in listing.keys {
     322            0 :         let object_name = object
     323            0 :             .key
     324            0 :             .object_name()
     325            0 :             .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
     326            0 :         other_prefixes.insert(object_name.to_string());
     327              :     }
     328              : 
     329          444 :     Ok((parsed_ids, other_prefixes))
     330          444 : }
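// Sketch of the parse-or-bucket split above, with T = TimelineId (the hex name
// below is hypothetical): well-formed ids land in `parsed_ids`, anything else
// is reported back in `other_prefixes`.
#[allow(dead_code)]
fn classify_example() {
    assert!("6a0f6d3295bb5304fcd5b04c0b4a2df2".parse::<TimelineId>().is_ok());
    assert!("unexpected-dir".parse::<TimelineId>().is_err());
}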
     331              : 
     332              : /// List shards of given tenant in remote storage
     333            0 : pub(crate) async fn list_remote_tenant_shards(
     334            0 :     storage: &GenericRemoteStorage,
     335            0 :     tenant_id: TenantId,
     336            0 :     cancel: CancellationToken,
     337            0 : ) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
     338            0 :     let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
     339            0 :     list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
     340            0 : }
     341              : 
     342              : /// List timelines of given tenant shard in remote storage
     343          444 : pub async fn list_remote_timelines(
     344          444 :     storage: &GenericRemoteStorage,
     345          444 :     tenant_shard_id: TenantShardId,
     346          444 :     cancel: CancellationToken,
     347          444 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     348          444 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     349            0 :         anyhow::bail!("storage-sync-list-remote-timelines");
     350          444 :     });
     351              : 
     352          444 :     let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
     353          444 :     list_identifiers::<TimelineId>(storage, remote_path, cancel).await
     354          444 : }
     355              : 
     356         1400 : async fn do_download_remote_path_retry_forever(
     357         1400 :     storage: &GenericRemoteStorage,
     358         1400 :     remote_path: &RemotePath,
     359         1400 :     download_opts: DownloadOpts,
     360         1400 :     cancel: &CancellationToken,
     361         1400 : ) -> Result<(Vec<u8>, SystemTime), DownloadError> {
     362         1400 :     download_retry_forever(
     363         1400 :         || async {
     364         1400 :             let download = storage
     365         1400 :                 .download(remote_path, &download_opts, cancel)
     366         1400 :                 .await?;
     367              : 
     368           40 :             let mut bytes = Vec::new();
     369           40 : 
     370           40 :             let stream = download.download_stream;
     371           40 :             let mut stream = StreamReader::new(stream);
     372           40 : 
     373           40 :             tokio::io::copy_buf(&mut stream, &mut bytes).await?;
     374              : 
     375           40 :             Ok((bytes, download.last_modified))
     376         2800 :         },
     377         1400 :         &format!("download {remote_path:?}"),
     378         1400 :         cancel,
     379         1400 :     )
     380         1400 :     .await
     381         1400 : }
     382              : 
     383         1332 : async fn do_download_tenant_manifest(
     384         1332 :     storage: &GenericRemoteStorage,
     385         1332 :     tenant_shard_id: &TenantShardId,
     386         1332 :     _timeline_id: Option<&TimelineId>,
     387         1332 :     generation: Generation,
     388         1332 :     cancel: &CancellationToken,
     389         1332 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
     390         1332 :     let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
     391         1332 : 
     392         1332 :     let download_opts = DownloadOpts {
     393         1332 :         kind: DownloadKind::Small,
     394         1332 :         ..Default::default()
     395         1332 :     };
     396              : 
     397            0 :     let (manifest_bytes, manifest_bytes_mtime) =
     398         1332 :         do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
     399              : 
     400            0 :     let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
     401            0 :         .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
     402            0 :         .map_err(DownloadError::Other)?;
     403              : 
     404            0 :     Ok((tenant_manifest, generation, manifest_bytes_mtime))
     405         1332 : }
     406              : 
     407           68 : async fn do_download_index_part(
     408           68 :     storage: &GenericRemoteStorage,
     409           68 :     tenant_shard_id: &TenantShardId,
     410           68 :     timeline_id: Option<&TimelineId>,
     411           68 :     index_generation: Generation,
     412           68 :     cancel: &CancellationToken,
     413           68 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
     414           68 :     let timeline_id =
     415           68 :         timeline_id.expect("A timeline ID is always provided when downloading an index");
     416           68 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
     417           68 : 
     418           68 :     let download_opts = DownloadOpts {
     419           68 :         kind: DownloadKind::Small,
     420           68 :         ..Default::default()
     421           68 :     };
     422              : 
     423           40 :     let (index_part_bytes, index_part_mtime) =
     424           68 :         do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
     425              : 
     426           40 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     427           40 :         .with_context(|| format!("deserialize index part file at {remote_path:?}"))
     428           40 :         .map_err(DownloadError::Other)?;
     429              : 
     430           40 :     Ok((index_part, index_generation, index_part_mtime))
     431           68 : }
     432              : 
     433              : /// Metadata objects are "generationed", meaning that they include a generation suffix.  This
     434              : /// function downloads the object with the highest generation <= `my_generation`.
     435              : ///
     436              : /// Data objects (layer files) also include a generation in their path, but there is no equivalent
     437              : /// search process, because their reference from an index includes the generation.
     438              : ///
     439              : /// An expensive object listing operation is only done if necessary: the typical fast path is to issue two
     440              : /// GET operations, one to our own generation (stale attachment case), and one to the immediately preceding
     441              : /// generation (normal case when migrating/restarting).  Only if both of these return 404 do we fall back
     442              : /// to listing objects.
     443              : ///
      444              : /// * `my_generation`: the value of [`crate::tenant::Tenant::generation`]
     445              : /// * `what`: for logging, what object are we downloading
     446              : /// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
     447              : /// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless
      448              : ///                  `cancel` has fired. This function does not do its own retries of GET operations, and relies
     449              : ///                  on the function passed in to do so.
     450              : /// * `parse_path`: parse a fully qualified remote storage path to get the generation of the object.
     451              : #[allow(clippy::too_many_arguments)]
     452              : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     453              : pub(crate) async fn download_generation_object<'a, T, DF, DFF, PF>(
     454              :     storage: &'a GenericRemoteStorage,
     455              :     tenant_shard_id: &'a TenantShardId,
     456              :     timeline_id: Option<&'a TimelineId>,
     457              :     my_generation: Generation,
     458              :     what: &str,
     459              :     prefix: RemotePath,
     460              :     do_download: DF,
     461              :     parse_path: PF,
     462              :     cancel: &'a CancellationToken,
     463              : ) -> Result<(T, Generation, SystemTime), DownloadError>
     464              : where
     465              :     DF: Fn(
     466              :         &'a GenericRemoteStorage,
     467              :         &'a TenantShardId,
     468              :         Option<&'a TimelineId>,
     469              :         Generation,
     470              :         &'a CancellationToken,
     471              :     ) -> DFF,
     472              :     DFF: Future<Output = Result<(T, Generation, SystemTime), DownloadError>>,
     473              :     PF: Fn(RemotePath) -> Option<Generation>,
     474              :     T: 'static,
     475              : {
     476              :     debug_assert_current_span_has_tenant_id();
     477              : 
     478              :     if my_generation.is_none() {
     479              :         // Operating without generations: just fetch the generation-less path
     480              :         return do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     481              :     }
     482              : 
     483              :     // Stale case: If we were intentionally attached in a stale generation, the remote object may already
     484              :     // exist in our generation.
     485              :     //
     486              :     // This is an optimization to avoid doing the listing for the general case below.
     487              :     let res = do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     488              :     match res {
     489              :         Ok(decoded) => {
     490              :             tracing::debug!("Found {what} from current generation (this is a stale attachment)");
     491              :             return Ok(decoded);
     492              :         }
     493              :         Err(DownloadError::NotFound) => {}
     494              :         Err(e) => return Err(e),
     495              :     };
     496              : 
     497              :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded the object
     498              :     // we are seeking in that generation.  We may safely start from this index without doing a listing, because:
     499              :     //  - We checked for current generation case above
     500              :     //  - generations > my_generation are to be ignored
     501              :     //  - any other objects that exist would have an older generation than `previous_gen`, and
     502              :     //    we want to find the most recent object from a previous generation.
     503              :     //
     504              :     // This is an optimization to avoid doing the listing for the general case below.
     505              :     let res = do_download(
     506              :         storage,
     507              :         tenant_shard_id,
     508              :         timeline_id,
     509              :         my_generation.previous(),
     510              :         cancel,
     511              :     )
     512              :     .await;
     513              :     match res {
     514              :         Ok(decoded) => {
     515              :             tracing::debug!("Found {what} from previous generation");
     516              :             return Ok(decoded);
     517              :         }
     518              :         Err(DownloadError::NotFound) => {
     519              :             tracing::debug!("No {what} found from previous generation, falling back to listing");
     520              :         }
     521              :         Err(e) => {
     522              :             return Err(e);
     523              :         }
     524              :     }
     525              : 
      526              :     // General case/fallback: if there is no object at my_generation or prev_generation, list all objects under
      527              :     // the prefix and select the highest one with a generation <= my_generation. Constructing the prefix is
      528              :     // equivalent to constructing a full object path with no generation, because the generation is a suffix.
     529              :     let paths = download_retry(
     530          456 :         || async {
     531          456 :             storage
     532          456 :                 .list(Some(&prefix), ListingMode::NoDelimiter, None, cancel)
     533          456 :                 .await
     534          912 :         },
     535              :         "list index_part files",
     536              :         cancel,
     537              :     )
     538              :     .await?
     539              :     .keys;
     540              : 
     541              :     // General case logic for which index to use: the latest index whose generation
     542              :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     543              :     let max_previous_generation = paths
     544              :         .into_iter()
     545           36 :         .filter_map(|o| parse_path(o.key))
     546           24 :         .filter(|g| g <= &my_generation)
     547              :         .max();
     548              : 
     549              :     match max_previous_generation {
     550              :         Some(g) => {
     551              :             tracing::debug!("Found {what} in generation {g:?}");
     552              :             do_download(storage, tenant_shard_id, timeline_id, g, cancel).await
     553              :         }
     554              :         None => {
     555              :             // Migration from legacy pre-generation state: we have a generation but no prior
     556              :             // attached pageservers did.  Try to load from a no-generation path.
     557              :             tracing::debug!("No {what}* found");
     558              :             do_download(
     559              :                 storage,
     560              :                 tenant_shard_id,
     561              :                 timeline_id,
     562              :                 Generation::none(),
     563              :                 cancel,
     564              :             )
     565              :             .await
     566              :         }
     567              :     }
     568              : }
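// Worked example of the fallback selection above (a sketch): running in
// generation 5 with listed generations [2, 4, 7], generation 7 is discarded
// (newer than ours) and 4 wins as the newest generation <= 5. The same
// filter/max logic, extracted:
#[allow(dead_code)]
fn max_usable_generation(
    parsed: impl Iterator<Item = Generation>,
    mine: Generation,
) -> Option<Generation> {
    parsed.filter(|g| g <= &mine).max()
}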
     569              : 
     570              : /// index_part.json objects are suffixed with a generation number, so we cannot
     571              : /// directly GET the latest index part without doing some probing.
     572              : ///
     573              : /// In this function we probe for the most recent index in a generation <= our current generation.
     574              : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     575           40 : pub(crate) async fn download_index_part(
     576           40 :     storage: &GenericRemoteStorage,
     577           40 :     tenant_shard_id: &TenantShardId,
     578           40 :     timeline_id: &TimelineId,
     579           40 :     my_generation: Generation,
     580           40 :     cancel: &CancellationToken,
     581           40 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
     582           40 :     debug_assert_current_span_has_tenant_and_timeline_id();
     583           40 : 
     584           40 :     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
     585           40 :     download_generation_object(
     586           40 :         storage,
     587           40 :         tenant_shard_id,
     588           40 :         Some(timeline_id),
     589           40 :         my_generation,
     590           40 :         "index_part",
     591           40 :         index_prefix,
     592           40 :         do_download_index_part,
     593           40 :         parse_remote_index_path,
     594           40 :         cancel,
     595           40 :     )
     596           40 :     .await
     597           40 : }
     598              : 
     599          444 : pub(crate) async fn download_tenant_manifest(
     600          444 :     storage: &GenericRemoteStorage,
     601          444 :     tenant_shard_id: &TenantShardId,
     602          444 :     my_generation: Generation,
     603          444 :     cancel: &CancellationToken,
     604          444 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
     605          444 :     let manifest_prefix = remote_tenant_manifest_prefix(tenant_shard_id);
     606          444 : 
     607          444 :     download_generation_object(
     608          444 :         storage,
     609          444 :         tenant_shard_id,
     610          444 :         None,
     611          444 :         my_generation,
     612          444 :         "tenant-manifest",
     613          444 :         manifest_prefix,
     614          444 :         do_download_tenant_manifest,
     615          444 :         parse_remote_tenant_manifest_path,
     616          444 :         cancel,
     617          444 :     )
     618          444 :     .await
     619          444 : }
     620              : 
     621            4 : pub(crate) async fn download_initdb_tar_zst(
     622            4 :     conf: &'static PageServerConf,
     623            4 :     storage: &GenericRemoteStorage,
     624            4 :     tenant_shard_id: &TenantShardId,
     625            4 :     timeline_id: &TimelineId,
     626            4 :     cancel: &CancellationToken,
     627            4 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
     628            4 :     debug_assert_current_span_has_tenant_and_timeline_id();
     629            4 : 
     630            4 :     let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     631            4 : 
     632            4 :     let remote_preserved_path =
     633            4 :         remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     634            4 : 
     635            4 :     let timeline_path = conf.timelines_path(tenant_shard_id);
     636            4 : 
     637            4 :     if !timeline_path.exists() {
     638            0 :         tokio::fs::create_dir_all(&timeline_path)
     639            0 :             .await
     640            0 :             .with_context(|| format!("timeline dir creation {timeline_path}"))
     641            0 :             .map_err(DownloadError::Other)?;
     642            4 :     }
     643            4 :     let temp_path = timeline_path.join(format!(
     644            4 :         "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
     645            4 :     ));
     646              : 
     647            4 :     let file = download_retry(
     648            4 :         || async {
     649            4 :             let file = OpenOptions::new()
     650            4 :                 .create(true)
     651            4 :                 .truncate(true)
     652            4 :                 .read(true)
     653            4 :                 .write(true)
     654            4 :                 .open(&temp_path)
     655            4 :                 .await
     656            4 :                 .with_context(|| format!("tempfile creation {temp_path}"))
     657            4 :                 .map_err(DownloadError::Other)?;
     658              : 
     659            4 :             let download = match storage
     660            4 :                 .download(&remote_path, &DownloadOpts::default(), cancel)
     661            4 :                 .await
     662              :             {
     663            4 :                 Ok(dl) => dl,
     664              :                 Err(DownloadError::NotFound) => {
     665            0 :                     storage
     666            0 :                         .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
     667            0 :                         .await?
     668              :                 }
     669            0 :                 Err(other) => Err(other)?,
     670              :             };
     671            4 :             let mut download = tokio_util::io::StreamReader::new(download.download_stream);
     672            4 :             let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
     673            4 : 
     674            4 :             tokio::io::copy_buf(&mut download, &mut writer).await?;
     675              : 
     676            4 :             let mut file = writer.into_inner();
     677            4 : 
     678            4 :             file.seek(std::io::SeekFrom::Start(0))
     679            4 :                 .await
     680            4 :                 .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
     681            4 :                 .map_err(DownloadError::Other)?;
     682              : 
     683            4 :             Ok(file)
     684            8 :         },
     685            4 :         &format!("download {remote_path}"),
     686            4 :         cancel,
     687            4 :     )
     688            4 :     .await
     689            4 :     .inspect_err(|_e| {
     690              :         // Do a best-effort attempt at deleting the temporary file upon encountering an error.
     691              :         // We don't have async here nor do we want to pile on any extra errors.
     692            0 :         if let Err(e) = std::fs::remove_file(&temp_path) {
     693            0 :             if e.kind() != std::io::ErrorKind::NotFound {
     694            0 :                 warn!("error deleting temporary file {temp_path}: {e}");
     695            0 :             }
     696            0 :         }
     697            4 :     })?;
     698              : 
     699            4 :     Ok((temp_path, file))
     700            4 : }
     701              : 
     702              : /// Helper function to handle retries for a download operation.
     703              : ///
     704              : /// Remote operations can fail due to rate limits (S3), spurious network
     705              : /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
     706              : /// with backoff.
     707              : ///
     708              : /// (See similar logic for uploads in `perform_upload_task`)
     709          488 : pub(super) async fn download_retry<T, O, F>(
     710          488 :     op: O,
     711          488 :     description: &str,
     712          488 :     cancel: &CancellationToken,
     713          488 : ) -> Result<T, DownloadError>
     714          488 : where
     715          488 :     O: FnMut() -> F,
     716          488 :     F: Future<Output = Result<T, DownloadError>>,
     717          488 : {
     718          488 :     backoff::retry(
     719          488 :         op,
     720          488 :         DownloadError::is_permanent,
     721          488 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     722          488 :         FAILED_REMOTE_OP_RETRIES,
     723          488 :         description,
     724          488 :         cancel,
     725          488 :     )
     726          488 :     .await
     727          488 :     .ok_or_else(|| DownloadError::Cancelled)
     728          488 :     .and_then(|x| x)
     729          488 : }
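// Shape of the Option<Result> flattening above (a sketch): backoff::retry
// yields None when the cancellation token fires while waiting between attempts.
#[allow(dead_code)]
fn flatten_retry_result<T>(r: Option<Result<T, DownloadError>>) -> Result<T, DownloadError> {
    match r {
        None => Err(DownloadError::Cancelled), // cancelled mid-backoff
        Some(Ok(v)) => Ok(v),                  // eventual success
        Some(Err(e)) => Err(e),                // permanent error or retries exhausted
    }
}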
     730              : 
     731         1844 : pub(crate) async fn download_retry_forever<T, O, F>(
     732         1844 :     op: O,
     733         1844 :     description: &str,
     734         1844 :     cancel: &CancellationToken,
     735         1844 : ) -> Result<T, DownloadError>
     736         1844 : where
     737         1844 :     O: FnMut() -> F,
     738         1844 :     F: Future<Output = Result<T, DownloadError>>,
     739         1844 : {
     740         1844 :     backoff::retry(
     741         1844 :         op,
     742         1844 :         DownloadError::is_permanent,
     743         1844 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     744         1844 :         u32::MAX,
     745         1844 :         description,
     746         1844 :         cancel,
     747         1844 :     )
     748         1844 :     .await
     749         1844 :     .ok_or_else(|| DownloadError::Cancelled)
     750         1844 :     .and_then(|x| x)
     751         1844 : }
        

Generated by: LCOV version 2.1-beta