LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions)
Test: 309d74c7615cee485c3506897fba3db69780454f.info    Test Date: 2025-03-19 16:14:43
Coverage: Lines: 87.4 % (411 of 470)    Functions: 48.7 % (56 of 115)

            Line data    Source code
       1              : //! Helper functions to download files from remote storage with a RemoteStorage implementation.
       2              : //!
       3              : //! The functions in this module retry failed operations automatically, according
       4              : //! to the FAILED_REMOTE_OP_RETRIES constant.
       5              : 
       6              : use std::collections::HashSet;
       7              : use std::future::Future;
       8              : use std::str::FromStr;
       9              : use std::time::SystemTime;
      10              : 
      11              : use anyhow::{Context, anyhow};
      12              : use camino::{Utf8Path, Utf8PathBuf};
      13              : use pageserver_api::shard::TenantShardId;
      14              : use remote_storage::{
      15              :     DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
      16              : };
      17              : use tokio::fs::{self, File, OpenOptions};
      18              : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
      19              : use tokio_util::io::StreamReader;
      20              : use tokio_util::sync::CancellationToken;
      21              : use tracing::warn;
      22              : use utils::crashsafe::path_with_suffix_extension;
      23              : use utils::id::{TenantId, TimelineId};
      24              : use utils::{backoff, pausable_failpoint};
      25              : 
      26              : use super::index::{IndexPart, LayerFileMetadata};
      27              : use super::manifest::TenantManifest;
      28              : use super::{
      29              :     FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
      30              :     parse_remote_tenant_manifest_path, remote_index_path, remote_initdb_archive_path,
      31              :     remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
      32              :     remote_tenant_manifest_prefix, remote_tenant_path,
      33              : };
      34              : use crate::TEMP_FILE_SUFFIX;
      35              : use crate::config::PageServerConf;
      36              : use crate::context::RequestContext;
      37              : use crate::span::{
      38              :     debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
      39              : };
      40              : use crate::tenant::Generation;
      41              : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
      42              : use crate::tenant::storage_layer::LayerName;
      43              : use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
      44              : 
       45              : /// Download a layer file into `local_path`, validating that the downloaded
       46              : /// file's size matches the size recorded in `layer_metadata`.
       47              : /// (In the future, we might do more cross-checks, like CRC validation.)
      48              : ///
      49              : /// Returns the size of the downloaded file.
      50              : #[allow(clippy::too_many_arguments)]
      51           28 : pub async fn download_layer_file<'a>(
      52           28 :     conf: &'static PageServerConf,
      53           28 :     storage: &'a GenericRemoteStorage,
      54           28 :     tenant_shard_id: TenantShardId,
      55           28 :     timeline_id: TimelineId,
      56           28 :     layer_file_name: &'a LayerName,
      57           28 :     layer_metadata: &'a LayerFileMetadata,
      58           28 :     local_path: &Utf8Path,
      59           28 :     gate: &utils::sync::gate::Gate,
      60           28 :     cancel: &CancellationToken,
      61           28 :     ctx: &RequestContext,
      62           28 : ) -> Result<u64, DownloadError> {
      63           28 :     debug_assert_current_span_has_tenant_and_timeline_id();
      64           28 : 
      65           28 :     let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
      66           28 : 
      67           28 :     let remote_path = remote_layer_path(
      68           28 :         &tenant_shard_id.tenant_id,
      69           28 :         &timeline_id,
      70           28 :         layer_metadata.shard,
      71           28 :         layer_file_name,
      72           28 :         layer_metadata.generation,
      73           28 :     );
      74           28 : 
      75           28 :     // Perform a rename inspired by durable_rename from file_utils.c.
      76           28 :     // The sequence:
      77           28 :     //     write(tmp)
      78           28 :     //     fsync(tmp)
      79           28 :     //     rename(tmp, new)
      80           28 :     //     fsync(new)
      81           28 :     //     fsync(parent)
      82           28 :     // For more context about durable_rename check this email from postgres mailing list:
      83           28 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
       84           28 :     // If the pageserver crashes, the temp file will be deleted on startup and re-downloaded.
      85           28 :     let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
      86              : 
      87           28 :     let bytes_amount = download_retry(
      88           28 :         || async {
      89           28 :             download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
      90           56 :         },
      91           28 :         &format!("download {remote_path:?}"),
      92           28 :         cancel,
      93           28 :     )
      94           28 :     .await?;
      95              : 
      96           28 :     let expected = layer_metadata.file_size;
      97           28 :     if expected != bytes_amount {
      98            0 :         return Err(DownloadError::Other(anyhow!(
       99            0 :             "according to layer file metadata, expected {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
     100            0 :         )));
     101           28 :     }
     102           28 : 
     103           28 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
     104            0 :         Err(DownloadError::Other(anyhow!(
     105            0 :             "remote-storage-download-pre-rename failpoint triggered"
     106            0 :         )))
     107           28 :     });
     108              : 
     109           28 :     fs::rename(&temp_file_path, &local_path)
     110           28 :         .await
     111           28 :         .with_context(|| format!("rename download layer file to {local_path}"))
     112           28 :         .map_err(DownloadError::Other)?;
     113              : 
      114              :     // We use fatal_err() below because, after the rename above,
     115              :     // the in-memory state of the filesystem already has the layer file in its final place,
     116              :     // and subsequent pageserver code could think it's durable while it really isn't.
     117           28 :     let work = {
     118           28 :         let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
     119           28 :         async move {
     120           28 :             let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
     121           28 :                 .await
     122           28 :                 .fatal_err("VirtualFile::open for timeline dir fsync");
     123           28 :             timeline_dir
     124           28 :                 .sync_all()
     125           28 :                 .await
     126           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     127           28 :         }
     128              :     };
     129           28 :     crate::virtual_file::io_engine::get()
     130           28 :         .spawn_blocking_and_block_on_if_std(work)
     131           28 :         .await;
     132              : 
     133           28 :     tracing::debug!("download complete: {local_path}");
     134              : 
     135           28 :     Ok(bytes_amount)
     136           28 : }
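
The durable_rename sequence documented in the comment above is self-contained enough to sketch on its own. A minimal standalone version using std::fs (a hypothetical `durable_write` helper, not part of this module; error handling and crash-time cleanup of stale temp files are omitted):

    use std::fs::{self, File, OpenOptions};
    use std::io::Write;
    use std::path::Path;

    /// Hypothetical helper: durably replace `dst` with `contents`, following the
    /// write(tmp)/fsync(tmp)/rename/fsync(new)/fsync(parent) sequence.
    fn durable_write(dst: &Path, tmp: &Path, contents: &[u8]) -> std::io::Result<()> {
        // write(tmp) + fsync(tmp): the data is durable under the temp name.
        let mut f = OpenOptions::new().create(true).truncate(true).write(true).open(tmp)?;
        f.write_all(contents)?;
        f.sync_all()?;
        // rename(tmp, new): atomically swap in the final name.
        fs::rename(tmp, dst)?;
        // fsync(new): re-open and fsync the file under its final name.
        File::open(dst)?.sync_all()?;
        // fsync(parent): make the directory entry itself durable (works on Linux).
        if let Some(parent) = dst.parent() {
            File::open(parent)?.sync_all()?;
        }
        Ok(())
    }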
     137              : 
     138              : /// Download the object `src_path` in the remote `storage` to local path `dst_path`.
     139              : ///
     140              : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
     141              : /// (Note that the directory entry for the inode is not made durable.)
     142              : /// The file size in bytes is returned.
     143              : ///
     144              : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
     145              : /// The unlinking has _not_ been made durable.
     146           28 : async fn download_object(
     147           28 :     storage: &GenericRemoteStorage,
     148           28 :     src_path: &RemotePath,
     149           28 :     dst_path: &Utf8PathBuf,
     150           28 :     #[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
     151           28 :     cancel: &CancellationToken,
     152           28 :     #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
     153           28 : ) -> Result<u64, DownloadError> {
     154           28 :     let res = match crate::virtual_file::io_engine::get() {
     155            0 :         crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
     156              :         crate::virtual_file::io_engine::IoEngine::StdFs => {
     157           14 :             async {
     158           14 :                 let destination_file = tokio::fs::File::create(dst_path)
     159           14 :                     .await
     160           14 :                     .with_context(|| format!("create a destination file for layer '{dst_path}'"))
     161           14 :                     .map_err(DownloadError::Other)?;
     162              : 
     163           14 :                 let download = storage
     164           14 :                     .download(src_path, &DownloadOpts::default(), cancel)
     165           14 :                     .await?;
     166              : 
     167           14 :                 pausable_failpoint!("before-downloading-layer-stream-pausable");
     168              : 
     169           14 :                 let mut buf_writer =
     170           14 :                     tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
     171           14 : 
     172           14 :                 let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
     173              : 
     174           14 :                 let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
     175           14 :                 buf_writer.flush().await?;
     176              : 
     177           14 :                 let mut destination_file = buf_writer.into_inner();
     178           14 : 
     179           14 :                 // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     180           14 :                 // A file will not be closed immediately when it goes out of scope if there are any IO operations
     181           14 :                 // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     182           14 :                 // you should call flush before dropping it.
     183           14 :                 //
      184           14 :                 // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
      185           14 :                 // we assume that the `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
      186           14 :                 // But for additional safety, let's check/wait for any pending operations.
     187           14 :                 destination_file
     188           14 :                     .flush()
     189           14 :                     .await
      190           14 :                     .maybe_fatal_err("download_object flush")
      191           14 :                     .with_context(|| format!("flush destination file at {dst_path}"))
     192           14 :                     .map_err(DownloadError::Other)?;
     193              : 
     194              :                 // not using sync_data because it can lose file size update
     195           14 :                 destination_file
     196           14 :                     .sync_all()
     197           14 :                     .await
     198           14 :                     .maybe_fatal_err("download_object sync_all")
      199           14 :                     .with_context(|| format!("failed to fsync destination file at {dst_path}"))
     200           14 :                     .map_err(DownloadError::Other)?;
     201              : 
     202           14 :                 Ok(bytes_amount)
     203           14 :             }
     204           14 :             .await
     205              :         }
     206              :         #[cfg(target_os = "linux")]
     207              :         crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
     208              :             use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
     209              :             use std::sync::Arc;
     210              : 
     211              :             use crate::virtual_file::{IoBufferMut, owned_buffers_io};
     212           14 :             async {
     213           14 :                 let destination_file = Arc::new(
     214           14 :                     VirtualFile::create(dst_path, ctx)
     215           14 :                         .await
     216           14 :                         .with_context(|| {
     217            0 :                             format!("create a destination file for layer '{dst_path}'")
     218           14 :                         })
     219           14 :                         .map_err(DownloadError::Other)?,
     220              :                 );
     221              : 
     222           14 :                 let mut download = storage
     223           14 :                     .download(src_path, &DownloadOpts::default(), cancel)
     224           14 :                     .await?;
     225              : 
     226           14 :                 pausable_failpoint!("before-downloading-layer-stream-pausable");
     227              : 
     228           14 :                 let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
     229           14 :                     destination_file,
     230           28 :                     || IoBufferMut::with_capacity(super::BUFFER_SIZE),
     231           14 :                     gate.enter().map_err(|_| DownloadError::Cancelled)?,
     232           14 :                     cancel.child_token(),
     233           14 :                     ctx,
     234           14 :                     tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
     235              :                 );
     236              : 
     237              :                 // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
     238              :                 // There's chunks_vectored() on the stream.
     239           14 :                 let (bytes_amount, destination_file) = async {
     240           84 :                     while let Some(res) =
     241           98 :                         futures::StreamExt::next(&mut download.download_stream).await
     242              :                     {
     243           84 :                         let chunk = match res {
     244           84 :                             Ok(chunk) => chunk,
     245            0 :                             Err(e) => return Err(DownloadError::from(e)),
     246              :                         };
     247           84 :                         buffered
     248           84 :                             .write_buffered_borrowed(&chunk, ctx)
     249           84 :                             .await
     250           84 :                             .map_err(|e| match e {
     251            0 :                                 FlushTaskError::Cancelled => DownloadError::Cancelled,
     252           84 :                             })?;
     253              :                     }
     254           14 :                     let inner = buffered
     255           14 :                         .flush_and_into_inner(ctx)
     256           14 :                         .await
     257           14 :                         .map_err(|e| match e {
     258            0 :                             FlushTaskError::Cancelled => DownloadError::Cancelled,
     259           14 :                         })?;
     260           14 :                     Ok(inner)
     261           14 :                 }
     262           14 :                 .await?;
     263              : 
     264              :                 // not using sync_data because it can lose file size update
     265           14 :                 destination_file
     266           14 :                     .sync_all()
     267           14 :                     .await
     268           14 :                     .maybe_fatal_err("download_object sync_all")
      269           14 :                     .with_context(|| format!("failed to fsync destination file at {dst_path}"))
     270           14 :                     .map_err(DownloadError::Other)?;
     271              : 
     272           14 :                 Ok(bytes_amount)
     273           14 :             }
     274           14 :             .await
     275              :         }
     276              :     };
     277              : 
     278              :     // in case the download failed, clean up
     279           28 :     match res {
     280           28 :         Ok(bytes_amount) => Ok(bytes_amount),
     281            0 :         Err(e) => {
     282            0 :             if let Err(e) = tokio::fs::remove_file(dst_path).await {
     283            0 :                 if e.kind() != std::io::ErrorKind::NotFound {
     284            0 :                     on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
     285            0 :                 }
     286            0 :             }
     287            0 :             Err(e)
     288              :         }
     289              :     }
     290           28 : }
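
The flush-before-drop caveat quoted from the Tokio docs in the StdFs branch above is easy to demonstrate in isolation. A minimal sketch, assuming only tokio with the `fs` and `io-util` features enabled:

    use tokio::fs::File;
    use tokio::io::AsyncWriteExt;

    async fn write_and_sync(path: &str, data: &[u8]) -> std::io::Result<()> {
        let mut file = File::create(path).await?;
        file.write_all(data).await?;
        // tokio::fs::File runs writes as background blocking operations; flush()
        // waits for any still-pending write to complete before the handle drops.
        file.flush().await?;
        // sync_all (fsync) also persists the file size, which sync_data may not.
        file.sync_all().await?;
        Ok(())
    }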
     291              : 
     292              : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     293              : 
     294            0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
     295            0 :     let extension = path.extension();
     296            0 :     match extension {
     297            0 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     298            0 :         Some(_) => false,
     299            0 :         None => false,
     300              :     }
     301            0 : }
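
To illustrate how the temp-file naming round-trips with the check above (assuming path_with_suffix_extension simply appends the suffix as an extra extension, which is what TEMP_DOWNLOAD_EXTENSION and this predicate rely on):

    use camino::Utf8Path;

    fn main() {
        let final_path = Utf8Path::new("/data/timelines/tl1/000000016B59D8-layer");
        // A crashed download leaves `<final_path>.temp_download` behind ...
        let temp = format!("{final_path}.temp_download");
        // ... which startup cleanup recognizes by its extension.
        assert_eq!(Utf8Path::new(&temp).extension(), Some("temp_download"));
    }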
     302              : 
     303          452 : async fn list_identifiers<T>(
     304          452 :     storage: &GenericRemoteStorage,
     305          452 :     prefix: RemotePath,
     306          452 :     cancel: CancellationToken,
     307          452 : ) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
     308          452 : where
     309          452 :     T: FromStr + Eq + std::hash::Hash,
     310          452 : {
     311          452 :     let listing = download_retry_forever(
     312          452 :         || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
     313          452 :         &format!("list identifiers in prefix {prefix}"),
     314          452 :         &cancel,
     315          452 :     )
     316          452 :     .await?;
     317              : 
     318          452 :     let mut parsed_ids = HashSet::new();
     319          452 :     let mut other_prefixes = HashSet::new();
     320              : 
     321          464 :     for id_remote_storage_key in listing.prefixes {
     322           12 :         let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
     323            0 :             anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
     324           12 :         })?;
     325              : 
     326           12 :         match object_name.parse::<T>() {
     327           12 :             Ok(t) => parsed_ids.insert(t),
     328            0 :             Err(_) => other_prefixes.insert(object_name.to_string()),
     329              :         };
     330              :     }
     331              : 
     332          452 :     for object in listing.keys {
     333            0 :         let object_name = object
     334            0 :             .key
     335            0 :             .object_name()
     336            0 :             .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
     337            0 :         other_prefixes.insert(object_name.to_string());
     338              :     }
     339              : 
     340          452 :     Ok((parsed_ids, other_prefixes))
     341          452 : }
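
The heart of list_identifiers is splitting the trailing path segments of a WithDelimiter listing by whether they parse as T. The same split as a self-contained sketch, with u32 standing in for TimelineId or TenantShardId:

    use std::collections::HashSet;
    use std::str::FromStr;

    fn split_parsed<T: FromStr + Eq + std::hash::Hash>(
        names: &[&str],
    ) -> (HashSet<T>, HashSet<String>) {
        let mut parsed = HashSet::new();
        let mut other = HashSet::new();
        for name in names {
            match name.parse::<T>() {
                // Segments that parse become typed identifiers ...
                Ok(t) => {
                    parsed.insert(t);
                }
                // ... everything else is reported back as an unrecognized prefix.
                Err(_) => {
                    other.insert(name.to_string());
                }
            }
        }
        (parsed, other)
    }

    fn main() {
        let (ids, other) = split_parsed::<u32>(&["17", "42", "tenant-manifest-v1"]);
        assert_eq!(ids.len(), 2);
        assert!(other.contains("tenant-manifest-v1"));
    }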
     342              : 
      343              : /// List shards of the given tenant in remote storage
     344            0 : pub(crate) async fn list_remote_tenant_shards(
     345            0 :     storage: &GenericRemoteStorage,
     346            0 :     tenant_id: TenantId,
     347            0 :     cancel: CancellationToken,
     348            0 : ) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
     349            0 :     let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
     350            0 :     list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
     351            0 : }
     352              : 
      353              : /// List timelines of the given tenant shard in remote storage
     354          452 : pub async fn list_remote_timelines(
     355          452 :     storage: &GenericRemoteStorage,
     356          452 :     tenant_shard_id: TenantShardId,
     357          452 :     cancel: CancellationToken,
     358          452 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     359          452 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     360            0 :         anyhow::bail!("storage-sync-list-remote-timelines");
     361          452 :     });
     362              : 
     363          452 :     let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
     364          452 :     list_identifiers::<TimelineId>(storage, remote_path, cancel).await
     365          452 : }
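
A minimal sketch of the fail::fail_point! pattern used here, with a hypothetical failpoint name; the closure only runs when the failpoint is configured (e.g. via fail::cfg or a FAILPOINTS scenario), otherwise the macro is a no-op:

    fn list_things() -> anyhow::Result<Vec<String>> {
        // When configured (e.g. fail::cfg("demo-list-things", "return")), the
        // closure runs and the enclosing function returns its value.
        fail::fail_point!("demo-list-things", |_| {
            anyhow::bail!("demo-list-things failpoint triggered");
        });
        Ok(vec!["real result".to_string()])
    }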
     366              : 
     367         1424 : async fn do_download_remote_path_retry_forever(
     368         1424 :     storage: &GenericRemoteStorage,
     369         1424 :     remote_path: &RemotePath,
     370         1424 :     download_opts: DownloadOpts,
     371         1424 :     cancel: &CancellationToken,
     372         1424 : ) -> Result<(Vec<u8>, SystemTime), DownloadError> {
     373         1424 :     download_retry_forever(
     374         1424 :         || async {
     375         1424 :             let download = storage
     376         1424 :                 .download(remote_path, &download_opts, cancel)
     377         1424 :                 .await?;
     378              : 
     379           40 :             let mut bytes = Vec::new();
     380           40 : 
     381           40 :             let stream = download.download_stream;
     382           40 :             let mut stream = StreamReader::new(stream);
     383           40 : 
     384           40 :             tokio::io::copy_buf(&mut stream, &mut bytes).await?;
     385              : 
     386           40 :             Ok((bytes, download.last_modified))
     387         2848 :         },
     388         1424 :         &format!("download {remote_path:?}"),
     389         1424 :         cancel,
     390         1424 :     )
     391         1424 :     .await
     392         1424 : }
     393              : 
     394         1356 : async fn do_download_tenant_manifest(
     395         1356 :     storage: &GenericRemoteStorage,
     396         1356 :     tenant_shard_id: &TenantShardId,
     397         1356 :     _timeline_id: Option<&TimelineId>,
     398         1356 :     generation: Generation,
     399         1356 :     cancel: &CancellationToken,
     400         1356 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
     401         1356 :     let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
     402         1356 : 
     403         1356 :     let download_opts = DownloadOpts {
     404         1356 :         kind: DownloadKind::Small,
     405         1356 :         ..Default::default()
     406         1356 :     };
     407              : 
     408            0 :     let (manifest_bytes, manifest_bytes_mtime) =
     409         1356 :         do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
     410              : 
     411            0 :     let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
     412            0 :         .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
     413            0 :         .map_err(DownloadError::Other)?;
     414              : 
     415            0 :     Ok((tenant_manifest, generation, manifest_bytes_mtime))
     416         1356 : }
     417              : 
     418           68 : async fn do_download_index_part(
     419           68 :     storage: &GenericRemoteStorage,
     420           68 :     tenant_shard_id: &TenantShardId,
     421           68 :     timeline_id: Option<&TimelineId>,
     422           68 :     index_generation: Generation,
     423           68 :     cancel: &CancellationToken,
     424           68 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
     425           68 :     let timeline_id =
     426           68 :         timeline_id.expect("A timeline ID is always provided when downloading an index");
     427           68 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
     428           68 : 
     429           68 :     let download_opts = DownloadOpts {
     430           68 :         kind: DownloadKind::Small,
     431           68 :         ..Default::default()
     432           68 :     };
     433              : 
     434           40 :     let (index_part_bytes, index_part_mtime) =
     435           68 :         do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
     436              : 
     437           40 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     438           40 :         .with_context(|| format!("deserialize index part file at {remote_path:?}"))
     439           40 :         .map_err(DownloadError::Other)?;
     440              : 
     441           40 :     Ok((index_part, index_generation, index_part_mtime))
     442           68 : }
     443              : 
     444              : /// Metadata objects are "generationed", meaning that they include a generation suffix.  This
     445              : /// function downloads the object with the highest generation <= `my_generation`.
     446              : ///
     447              : /// Data objects (layer files) also include a generation in their path, but there is no equivalent
     448              : /// search process, because their reference from an index includes the generation.
     449              : ///
     450              : /// An expensive object listing operation is only done if necessary: the typical fast path is to issue two
     451              : /// GET operations, one to our own generation (stale attachment case), and one to the immediately preceding
     452              : /// generation (normal case when migrating/restarting).  Only if both of these return 404 do we fall back
     453              : /// to listing objects.
     454              : ///
      455              : /// * `my_generation`: the value of [`crate::tenant::Tenant::generation`]
     456              : /// * `what`: for logging, what object are we downloading
     457              : /// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
     458              : /// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless
     459              : ///                  `cancel`` has fired.  This function does not do its own retries of GET operations, and relies
      460              : ///                  `cancel` has fired.  This function does not do its own retries of GET operations, and relies
     461              : /// * `parse_path`: parse a fully qualified remote storage path to get the generation of the object.
     462              : #[allow(clippy::too_many_arguments)]
     463              : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     464              : pub(crate) async fn download_generation_object<'a, T, DF, DFF, PF>(
     465              :     storage: &'a GenericRemoteStorage,
     466              :     tenant_shard_id: &'a TenantShardId,
     467              :     timeline_id: Option<&'a TimelineId>,
     468              :     my_generation: Generation,
     469              :     what: &str,
     470              :     prefix: RemotePath,
     471              :     do_download: DF,
     472              :     parse_path: PF,
     473              :     cancel: &'a CancellationToken,
     474              : ) -> Result<(T, Generation, SystemTime), DownloadError>
     475              : where
     476              :     DF: Fn(
     477              :         &'a GenericRemoteStorage,
     478              :         &'a TenantShardId,
     479              :         Option<&'a TimelineId>,
     480              :         Generation,
     481              :         &'a CancellationToken,
     482              :     ) -> DFF,
     483              :     DFF: Future<Output = Result<(T, Generation, SystemTime), DownloadError>>,
     484              :     PF: Fn(RemotePath) -> Option<Generation>,
     485              :     T: 'static,
     486              : {
     487              :     debug_assert_current_span_has_tenant_id();
     488              : 
     489              :     if my_generation.is_none() {
     490              :         // Operating without generations: just fetch the generation-less path
     491              :         return do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     492              :     }
     493              : 
     494              :     // Stale case: If we were intentionally attached in a stale generation, the remote object may already
     495              :     // exist in our generation.
     496              :     //
     497              :     // This is an optimization to avoid doing the listing for the general case below.
     498              :     let res = do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     499              :     match res {
     500              :         Ok(decoded) => {
     501              :             tracing::debug!("Found {what} from current generation (this is a stale attachment)");
     502              :             return Ok(decoded);
     503              :         }
     504              :         Err(DownloadError::NotFound) => {}
     505              :         Err(e) => return Err(e),
     506              :     };
     507              : 
     508              :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded the object
     509              :     // we are seeking in that generation.  We may safely start from this index without doing a listing, because:
     510              :     //  - We checked for current generation case above
     511              :     //  - generations > my_generation are to be ignored
     512              :     //  - any other objects that exist would have an older generation than `previous_gen`, and
     513              :     //    we want to find the most recent object from a previous generation.
     514              :     //
     515              :     // This is an optimization to avoid doing the listing for the general case below.
     516              :     let res = do_download(
     517              :         storage,
     518              :         tenant_shard_id,
     519              :         timeline_id,
     520              :         my_generation.previous(),
     521              :         cancel,
     522              :     )
     523              :     .await;
     524              :     match res {
     525              :         Ok(decoded) => {
     526              :             tracing::debug!("Found {what} from previous generation");
     527              :             return Ok(decoded);
     528              :         }
     529              :         Err(DownloadError::NotFound) => {
     530              :             tracing::debug!("No {what} found from previous generation, falling back to listing");
     531              :         }
     532              :         Err(e) => {
     533              :             return Err(e);
     534              :         }
     535              :     }
     536              : 
      537              :     // General case/fallback: if there is no object at my_generation or prev_generation, then list all candidate
      538              :     // objects under the prefix, and select the highest one with a generation <= my_generation.  Constructing the prefix is equivalent
     539              :     // to constructing a full index path with no generation, because the generation is a suffix.
     540              :     let paths = download_retry(
     541          464 :         || async {
     542          464 :             storage
     543          464 :                 .list(Some(&prefix), ListingMode::NoDelimiter, None, cancel)
     544          464 :                 .await
     545          928 :         },
     546              :         "list index_part files",
     547              :         cancel,
     548              :     )
     549              :     .await?
     550              :     .keys;
     551              : 
     552              :     // General case logic for which index to use: the latest index whose generation
     553              :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     554              :     let max_previous_generation = paths
     555              :         .into_iter()
     556           36 :         .filter_map(|o| parse_path(o.key))
     557           24 :         .filter(|g| g <= &my_generation)
     558              :         .max();
     559              : 
     560              :     match max_previous_generation {
     561              :         Some(g) => {
     562              :             tracing::debug!("Found {what} in generation {g:?}");
     563              :             do_download(storage, tenant_shard_id, timeline_id, g, cancel).await
     564              :         }
     565              :         None => {
     566              :             // Migration from legacy pre-generation state: we have a generation but no prior
     567              :             // attached pageservers did.  Try to load from a no-generation path.
     568              :             tracing::debug!("No {what}* found");
     569              :             do_download(
     570              :                 storage,
     571              :                 tenant_shard_id,
     572              :                 timeline_id,
     573              :                 Generation::none(),
     574              :                 cancel,
     575              :             )
     576              :             .await
     577              :         }
     578              :     }
     579              : }
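
The probe order implemented above condenses to a small decision procedure. A simplified sketch with storage access stubbed out and u32 standing in for Generation:

    /// Simplified probe order for a generation-suffixed object.
    /// `get(gen)` returns Ok(Some(obj)), Ok(None) for a 404, or Err otherwise.
    fn probe_generations<T, E>(
        my_gen: u32,
        mut get: impl FnMut(u32) -> Result<Option<T>, E>,
        list_generations: impl FnOnce() -> Result<Vec<u32>, E>,
    ) -> Result<Option<T>, E> {
        // 1. Stale-attachment fast path: the object may exist in our own generation.
        if let Some(obj) = get(my_gen)? {
            return Ok(Some(obj));
        }
        // 2. Migration/restart fast path: the immediately preceding generation.
        if my_gen > 0 {
            if let Some(obj) = get(my_gen - 1)? {
                return Ok(Some(obj));
            }
        }
        // 3. Fallback: list all generations and take the newest one <= ours.
        let best = list_generations()?.into_iter().filter(|g| *g <= my_gen).max();
        match best {
            Some(g) => get(g),
            // The real code additionally tries a legacy, generation-less path here.
            None => Ok(None),
        }
    }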
     580              : 
     581              : /// index_part.json objects are suffixed with a generation number, so we cannot
     582              : /// directly GET the latest index part without doing some probing.
     583              : ///
     584              : /// In this function we probe for the most recent index in a generation <= our current generation.
     585              : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     586           40 : pub(crate) async fn download_index_part(
     587           40 :     storage: &GenericRemoteStorage,
     588           40 :     tenant_shard_id: &TenantShardId,
     589           40 :     timeline_id: &TimelineId,
     590           40 :     my_generation: Generation,
     591           40 :     cancel: &CancellationToken,
     592           40 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
     593           40 :     debug_assert_current_span_has_tenant_and_timeline_id();
     594           40 : 
     595           40 :     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
     596           40 :     download_generation_object(
     597           40 :         storage,
     598           40 :         tenant_shard_id,
     599           40 :         Some(timeline_id),
     600           40 :         my_generation,
     601           40 :         "index_part",
     602           40 :         index_prefix,
     603           40 :         do_download_index_part,
     604           40 :         parse_remote_index_path,
     605           40 :         cancel,
     606           40 :     )
     607           40 :     .await
     608           40 : }
     609              : 
     610          452 : pub(crate) async fn download_tenant_manifest(
     611          452 :     storage: &GenericRemoteStorage,
     612          452 :     tenant_shard_id: &TenantShardId,
     613          452 :     my_generation: Generation,
     614          452 :     cancel: &CancellationToken,
     615          452 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
     616          452 :     let manifest_prefix = remote_tenant_manifest_prefix(tenant_shard_id);
     617          452 : 
     618          452 :     download_generation_object(
     619          452 :         storage,
     620          452 :         tenant_shard_id,
     621          452 :         None,
     622          452 :         my_generation,
     623          452 :         "tenant-manifest",
     624          452 :         manifest_prefix,
     625          452 :         do_download_tenant_manifest,
     626          452 :         parse_remote_tenant_manifest_path,
     627          452 :         cancel,
     628          452 :     )
     629          452 :     .await
     630          452 : }
     631              : 
     632            4 : pub(crate) async fn download_initdb_tar_zst(
     633            4 :     conf: &'static PageServerConf,
     634            4 :     storage: &GenericRemoteStorage,
     635            4 :     tenant_shard_id: &TenantShardId,
     636            4 :     timeline_id: &TimelineId,
     637            4 :     cancel: &CancellationToken,
     638            4 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
     639            4 :     debug_assert_current_span_has_tenant_and_timeline_id();
     640            4 : 
     641            4 :     let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     642            4 : 
     643            4 :     let remote_preserved_path =
     644            4 :         remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     645            4 : 
     646            4 :     let timeline_path = conf.timelines_path(tenant_shard_id);
     647            4 : 
     648            4 :     if !timeline_path.exists() {
     649            0 :         tokio::fs::create_dir_all(&timeline_path)
     650            0 :             .await
     651            0 :             .with_context(|| format!("timeline dir creation {timeline_path}"))
     652            0 :             .map_err(DownloadError::Other)?;
     653            4 :     }
     654            4 :     let temp_path = timeline_path.join(format!(
     655            4 :         "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
     656            4 :     ));
     657              : 
     658            4 :     let file = download_retry(
     659            4 :         || async {
     660            4 :             let file = OpenOptions::new()
     661            4 :                 .create(true)
     662            4 :                 .truncate(true)
     663            4 :                 .read(true)
     664            4 :                 .write(true)
     665            4 :                 .open(&temp_path)
     666            4 :                 .await
     667            4 :                 .with_context(|| format!("tempfile creation {temp_path}"))
     668            4 :                 .map_err(DownloadError::Other)?;
     669              : 
     670            4 :             let download = match storage
     671            4 :                 .download(&remote_path, &DownloadOpts::default(), cancel)
     672            4 :                 .await
     673              :             {
     674            4 :                 Ok(dl) => dl,
     675              :                 Err(DownloadError::NotFound) => {
     676            0 :                     storage
     677            0 :                         .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
     678            0 :                         .await?
     679              :                 }
     680            0 :                 Err(other) => Err(other)?,
     681              :             };
     682            4 :             let mut download = tokio_util::io::StreamReader::new(download.download_stream);
     683            4 :             let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
     684            4 : 
     685            4 :             tokio::io::copy_buf(&mut download, &mut writer).await?;
     686              : 
     687            4 :             let mut file = writer.into_inner();
     688            4 : 
     689            4 :             file.seek(std::io::SeekFrom::Start(0))
     690            4 :                 .await
     691            4 :                 .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
     692            4 :                 .map_err(DownloadError::Other)?;
     693              : 
     694            4 :             Ok(file)
     695            8 :         },
     696            4 :         &format!("download {remote_path}"),
     697            4 :         cancel,
     698            4 :     )
     699            4 :     .await
     700            4 :     .inspect_err(|_e| {
     701              :         // Do a best-effort attempt at deleting the temporary file upon encountering an error.
     702              :         // We don't have async here nor do we want to pile on any extra errors.
     703            0 :         if let Err(e) = std::fs::remove_file(&temp_path) {
     704            0 :             if e.kind() != std::io::ErrorKind::NotFound {
     705            0 :                 warn!("error deleting temporary file {temp_path}: {e}");
     706            0 :             }
     707            0 :         }
     708            4 :     })?;
     709              : 
     710            4 :     Ok((temp_path, file))
     711            4 : }
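
The fallback from remote_path to remote_preserved_path only on NotFound is a small, reusable shape. A sketch with a hypothetical fetch function; note that any error other than NotFound is surfaced immediately rather than masked by the fallback:

    #[derive(Debug)]
    enum FetchError {
        NotFound,
        Other(String),
    }

    fn fetch_with_fallback<T>(
        fetch: impl Fn(&str) -> Result<T, FetchError>,
        primary: &str,
        preserved: &str,
    ) -> Result<T, FetchError> {
        match fetch(primary) {
            Ok(v) => Ok(v),
            // Only a missing primary object falls through to the preserved copy.
            Err(FetchError::NotFound) => fetch(preserved),
            // Any other error is surfaced immediately.
            Err(other) => Err(other),
        }
    }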
     712              : 
     713              : /// Helper function to handle retries for a download operation.
     714              : ///
     715              : /// Remote operations can fail due to rate limits (S3), spurious network
      716              : /// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
     717              : /// with backoff.
     718              : ///
     719              : /// (See similar logic for uploads in `perform_upload_task`)
     720          496 : pub(super) async fn download_retry<T, O, F>(
     721          496 :     op: O,
     722          496 :     description: &str,
     723          496 :     cancel: &CancellationToken,
     724          496 : ) -> Result<T, DownloadError>
     725          496 : where
     726          496 :     O: FnMut() -> F,
     727          496 :     F: Future<Output = Result<T, DownloadError>>,
     728          496 : {
     729          496 :     backoff::retry(
     730          496 :         op,
     731          496 :         DownloadError::is_permanent,
     732          496 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     733          496 :         FAILED_REMOTE_OP_RETRIES,
     734          496 :         description,
     735          496 :         cancel,
     736          496 :     )
     737          496 :     .await
     738          496 :     .ok_or_else(|| DownloadError::Cancelled)
     739          496 :     .and_then(|x| x)
     740          496 : }
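
For reference, a minimal retry loop of the same shape as the backoff::retry call above, assuming a caller-supplied is_permanent predicate and capped exponential delays (the real utils::backoff implementation also handles cancellation and the warning threshold):

    use std::future::Future;
    use std::time::Duration;

    async fn retry_with_backoff<T, E, F, Fut>(
        mut op: F,
        is_permanent: impl Fn(&E) -> bool,
        max_retries: u32,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut attempt: u32 = 0;
        loop {
            match op().await {
                Ok(v) => return Ok(v),
                // Permanent errors and exhausted budgets end the loop.
                Err(e) if is_permanent(&e) || attempt >= max_retries => return Err(e),
                Err(_) => {
                    // Exponential backoff: 0.5s, 1s, 2s, 4s, then capped at 8s.
                    let delay = Duration::from_millis(500u64 << attempt.min(4));
                    tokio::time::sleep(delay).await;
                    attempt += 1;
                }
            }
        }
    }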
     741              : 
     742         1876 : pub(crate) async fn download_retry_forever<T, O, F>(
     743         1876 :     op: O,
     744         1876 :     description: &str,
     745         1876 :     cancel: &CancellationToken,
     746         1876 : ) -> Result<T, DownloadError>
     747         1876 : where
     748         1876 :     O: FnMut() -> F,
     749         1876 :     F: Future<Output = Result<T, DownloadError>>,
     750         1876 : {
     751         1876 :     backoff::retry(
     752         1876 :         op,
     753         1876 :         DownloadError::is_permanent,
     754         1876 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     755         1876 :         u32::MAX,
     756         1876 :         description,
     757         1876 :         cancel,
     758         1876 :     )
     759         1876 :     .await
     760         1876 :     .ok_or_else(|| DownloadError::Cancelled)
     761         1876 :     .and_then(|x| x)
     762         1876 : }
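
As the two functions above show, backoff::retry returns None when it is cancelled before producing a result, hence the ok_or_else(Cancelled) followed by and_then(|x| x) to flatten the Option<Result<T, E>> into Result<T, E>. The same flattening in isolation:

    fn flatten_retry_result<T, E>(outer: Option<Result<T, E>>, cancelled: E) -> Result<T, E> {
        // None: retrying was cancelled before any attempt produced a result.
        // Some(inner): pass the final attempt's result through unchanged.
        outer.ok_or(cancelled).and_then(|inner| inner)
    }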
        

Generated by: LCOV version 2.1-beta