LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions) Coverage Total Hit
Test: a2f0f8a80fbf1089336086fa360ce27fa555cb1a.info Lines: 87.1 % 441 384
Test Date: 2024-11-20 17:59:39 Functions: 58.8 % 97 57

            Line data    Source code
       1              : //! Helper functions to download files from remote storage with a RemoteStorage
       2              : //!
       3              : //! The functions in this module retry failed operations automatically; see the
       4              : //! `FAILED_REMOTE_OP_RETRIES` and `FAILED_DOWNLOAD_WARN_THRESHOLD` constants.
       5              : 
       6              : use std::collections::HashSet;
       7              : use std::future::Future;
       8              : use std::str::FromStr;
       9              : use std::time::SystemTime;
      10              : 
      11              : use anyhow::{anyhow, Context};
      12              : use camino::{Utf8Path, Utf8PathBuf};
      13              : use pageserver_api::shard::TenantShardId;
      14              : use tokio::fs::{self, File, OpenOptions};
      15              : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
      16              : use tokio_util::io::StreamReader;
      17              : use tokio_util::sync::CancellationToken;
      18              : use tracing::warn;
      19              : use utils::backoff;
      20              : 
      21              : use crate::config::PageServerConf;
      22              : use crate::context::RequestContext;
      23              : use crate::span::{
      24              :     debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
      25              : };
      26              : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
      27              : use crate::tenant::storage_layer::LayerName;
      28              : use crate::tenant::Generation;
      29              : #[cfg_attr(target_os = "macos", allow(unused_imports))]
      30              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      31              : use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
      32              : use crate::TEMP_FILE_SUFFIX;
      33              : use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
      34              : use utils::crashsafe::path_with_suffix_extension;
      35              : use utils::id::{TenantId, TimelineId};
      36              : use utils::pausable_failpoint;
      37              : 
      38              : use super::index::{IndexPart, LayerFileMetadata};
      39              : use super::manifest::TenantManifest;
      40              : use super::{
      41              :     parse_remote_index_path, parse_remote_tenant_manifest_path, remote_index_path,
      42              :     remote_initdb_archive_path, remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
      43              :     remote_tenant_manifest_prefix, remote_tenant_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
      44              :     FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
      45              : };
      46              : 
      47              : ///
      48              : /// If 'metadata' is given, we will validate that the downloaded file's size matches that
      49              : /// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
      50              : ///
      51              : /// Returns the size of the downloaded file.
      52              : #[allow(clippy::too_many_arguments)]
      53            6 : pub async fn download_layer_file<'a>(
      54            6 :     conf: &'static PageServerConf,
      55            6 :     storage: &'a GenericRemoteStorage,
      56            6 :     tenant_shard_id: TenantShardId,
      57            6 :     timeline_id: TimelineId,
      58            6 :     layer_file_name: &'a LayerName,
      59            6 :     layer_metadata: &'a LayerFileMetadata,
      60            6 :     local_path: &Utf8Path,
      61            6 :     cancel: &CancellationToken,
      62            6 :     ctx: &RequestContext,
      63            6 : ) -> Result<u64, DownloadError> {
      64            6 :     debug_assert_current_span_has_tenant_and_timeline_id();
      65            6 : 
      66            6 :     let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
      67            6 : 
      68            6 :     let remote_path = remote_layer_path(
      69            6 :         &tenant_shard_id.tenant_id,
      70            6 :         &timeline_id,
      71            6 :         layer_metadata.shard,
      72            6 :         layer_file_name,
      73            6 :         layer_metadata.generation,
      74            6 :     );
      75            6 : 
      76            6 :     // Perform a rename inspired by durable_rename from file_utils.c.
      77            6 :     // The sequence:
      78            6 :     //     write(tmp)
      79            6 :     //     fsync(tmp)
      80            6 :     //     rename(tmp, new)
      81            6 :     //     fsync(new)
      82            6 :     //     fsync(parent)
      83            6 :     // For more context about durable_rename check this email from postgres mailing list:
      84            6 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
      85            6 :     // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
      86            6 :     let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
      87              : 
      88            6 :     let bytes_amount = download_retry(
      89           70 :         || async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
      90            6 :         &format!("download {remote_path:?}"),
      91            6 :         cancel,
      92            6 :     )
      93           70 :     .await?;
      94              : 
      95            6 :     let expected = layer_metadata.file_size;
      96            6 :     if expected != bytes_amount {
      97            0 :         return Err(DownloadError::Other(anyhow!(
      98            0 :             "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
      99            0 :         )));
     100            6 :     }
     101            6 : 
     102            6 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
     103            0 :         Err(DownloadError::Other(anyhow!(
     104            0 :             "remote-storage-download-pre-rename failpoint triggered"
     105            0 :         )))
     106            6 :     });
     107              : 
     108            6 :     fs::rename(&temp_file_path, &local_path)
     109            6 :         .await
     110            6 :         .with_context(|| format!("rename download layer file to {local_path}"))
     111            6 :         .map_err(DownloadError::Other)?;
     112              : 
     113              :     // We use fatal_err() below because the after the rename above,
     114              :     // the in-memory state of the filesystem already has the layer file in its final place,
     115              :     // and subsequent pageserver code could think it's durable while it really isn't.
     116            6 :     let work = {
     117            6 :         let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
     118            6 :         async move {
     119            6 :             let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
     120            3 :                 .await
     121            6 :                 .fatal_err("VirtualFile::open for timeline dir fsync");
     122            6 :             timeline_dir
     123            6 :                 .sync_all()
     124            3 :                 .await
     125            6 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     126            6 :         }
     127              :     };
     128            6 :     crate::virtual_file::io_engine::get()
     129            6 :         .spawn_blocking_and_block_on_if_std(work)
     130            9 :         .await;
     131              : 
     132            6 :     tracing::debug!("download complete: {local_path}");
     133              : 
     134            6 :     Ok(bytes_amount)
     135            6 : }
     136              : 
     137              : /// Download the object `src_path` in the remote `storage` to local path `dst_path`.
     138              : ///
     139              : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
     140              : /// (Note that the directory entry for the inode is not made durable.)
     141              : /// The file size in bytes is returned.
     142              : ///
     143              : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
     144              : /// The unlinking has _not_ been made durable.
     145            6 : async fn download_object<'a>(
     146            6 :     storage: &'a GenericRemoteStorage,
     147            6 :     src_path: &RemotePath,
     148            6 :     dst_path: &Utf8PathBuf,
     149            6 :     cancel: &CancellationToken,
     150            6 :     #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
     151            6 : ) -> Result<u64, DownloadError> {
     152            6 :     let res = match crate::virtual_file::io_engine::get() {
     153            0 :         crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
     154              :         crate::virtual_file::io_engine::IoEngine::StdFs => {
     155            3 :             async {
     156            3 :                 let destination_file = tokio::fs::File::create(dst_path)
     157            3 :                     .await
     158            3 :                     .with_context(|| format!("create a destination file for layer '{dst_path}'"))
     159            3 :                     .map_err(DownloadError::Other)?;
     160              : 
     161            3 :                 let download = storage
     162            3 :                     .download(src_path, &DownloadOpts::default(), cancel)
     163            6 :                     .await?;
     164              : 
     165            3 :                 pausable_failpoint!("before-downloading-layer-stream-pausable");
     166              : 
     167            3 :                 let mut buf_writer =
     168            3 :                     tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
     169            3 : 
     170            3 :                 let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
     171              : 
     172           20 :                 let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
     173            3 :                 buf_writer.flush().await?;
     174              : 
     175            3 :                 let mut destination_file = buf_writer.into_inner();
     176            3 : 
     177            3 :                 // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     178            3 :                 // A file will not be closed immediately when it goes out of scope if there are any IO operations
     179            3 :                 // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     180            3 :                 // you should call flush before dropping it.
     181            3 :                 //
     182            3 :                 // From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
     183            3 :                 // we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
     184            3 :                 // But for additional safety lets check/wait for any pending operations.
     185            3 :                 destination_file
     186            3 :                     .flush()
     187            0 :                     .await
     188            3 :                     .maybe_fatal_err("download_object sync_all")
     189            3 :                     .with_context(|| format!("flush source file at {dst_path}"))
     190            3 :                     .map_err(DownloadError::Other)?;
     191              : 
     192              :                 // not using sync_data because it can lose file size update
     193            3 :                 destination_file
     194            3 :                     .sync_all()
     195            3 :                     .await
     196            3 :                     .maybe_fatal_err("download_object sync_all")
     197            3 :                     .with_context(|| format!("failed to fsync source file at {dst_path}"))
     198            3 :                     .map_err(DownloadError::Other)?;
     199              : 
     200            3 :                 Ok(bytes_amount)
     201            3 :             }
     202           35 :             .await
     203              :         }
     204              :         #[cfg(target_os = "linux")]
     205              :         crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
     206              :             use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
     207              :             use bytes::BytesMut;
     208            3 :             async {
     209            3 :                 let destination_file = VirtualFile::create(dst_path, ctx)
     210            3 :                     .await
     211            3 :                     .with_context(|| format!("create a destination file for layer '{dst_path}'"))
     212            3 :                     .map_err(DownloadError::Other)?;
     213              : 
     214            3 :                 let mut download = storage
     215            3 :                     .download(src_path, &DownloadOpts::default(), cancel)
     216            6 :                     .await?;
     217              : 
     218            3 :                 pausable_failpoint!("before-downloading-layer-stream-pausable");
     219              : 
     220              :                 // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
     221              :                 // There's chunks_vectored() on the stream.
     222            3 :                 let (bytes_amount, destination_file) = async {
     223            3 :                     let size_tracking = size_tracking_writer::Writer::new(destination_file);
     224            3 :                     let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
     225            3 :                         size_tracking,
     226            3 :                         BytesMut::with_capacity(super::BUFFER_SIZE),
     227            3 :                     );
     228           18 :                     while let Some(res) =
     229           21 :                         futures::StreamExt::next(&mut download.download_stream).await
     230              :                     {
     231           18 :                         let chunk = match res {
     232           18 :                             Ok(chunk) => chunk,
     233            0 :                             Err(e) => return Err(e),
     234              :                         };
     235           18 :                         buffered.write_buffered(chunk.slice_len(), ctx).await?;
     236              :                     }
     237            3 :                     let size_tracking = buffered.flush_and_into_inner(ctx).await?;
     238            3 :                     Ok(size_tracking.into_inner())
     239            3 :                 }
     240           20 :                 .await?;
     241              : 
     242              :                 // not using sync_data because it can lose file size update
     243            3 :                 destination_file
     244            3 :                     .sync_all()
     245            3 :                     .await
     246            3 :                     .maybe_fatal_err("download_object sync_all")
     247            3 :                     .with_context(|| format!("failed to fsync source file at {dst_path}"))
     248            3 :                     .map_err(DownloadError::Other)?;
     249              : 
     250            3 :                 Ok(bytes_amount)
     251            3 :             }
     252           35 :             .await
     253              :         }
     254              :     };
     255              : 
     256              :     // in case the download failed, clean up
     257            6 :     match res {
     258            6 :         Ok(bytes_amount) => Ok(bytes_amount),
     259            0 :         Err(e) => {
     260            0 :             if let Err(e) = tokio::fs::remove_file(dst_path).await {
     261            0 :                 if e.kind() != std::io::ErrorKind::NotFound {
     262            0 :                     on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
     263            0 :                 }
     264            0 :             }
     265            0 :             Err(e)
     266              :         }
     267              :     }
     268            6 : }
     269              : 
     270              : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     271              : 
     272            0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
     273            0 :     let extension = path.extension();
     274            0 :     match extension {
     275            0 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     276            0 :         Some(_) => false,
     277            0 :         None => false,
     278              :     }
     279            0 : }
     280              : 
     281          192 : async fn list_identifiers<T>(
     282          192 :     storage: &GenericRemoteStorage,
     283          192 :     prefix: RemotePath,
     284          192 :     cancel: CancellationToken,
     285          192 : ) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
     286          192 : where
     287          192 :     T: FromStr + Eq + std::hash::Hash,
     288          192 : {
     289          192 :     let listing = download_retry_forever(
     290          192 :         || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
     291          192 :         &format!("list identifiers in prefix {prefix}"),
     292          192 :         &cancel,
     293          192 :     )
     294          777 :     .await?;
     295              : 
     296          192 :     let mut parsed_ids = HashSet::new();
     297          192 :     let mut other_prefixes = HashSet::new();
     298              : 
     299          198 :     for id_remote_storage_key in listing.prefixes {
     300            6 :         let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
     301            0 :             anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
     302            6 :         })?;
     303              : 
     304            6 :         match object_name.parse::<T>() {
     305            6 :             Ok(t) => parsed_ids.insert(t),
     306            0 :             Err(_) => other_prefixes.insert(object_name.to_string()),
     307              :         };
     308              :     }
     309              : 
     310          192 :     for object in listing.keys {
     311            0 :         let object_name = object
     312            0 :             .key
     313            0 :             .object_name()
     314            0 :             .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
     315            0 :         other_prefixes.insert(object_name.to_string());
     316              :     }
     317              : 
     318          192 :     Ok((parsed_ids, other_prefixes))
     319          192 : }
     320              : 
     321              : /// List shards of given tenant in remote storage
     322            0 : pub(crate) async fn list_remote_tenant_shards(
     323            0 :     storage: &GenericRemoteStorage,
     324            0 :     tenant_id: TenantId,
     325            0 :     cancel: CancellationToken,
     326            0 : ) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
     327            0 :     let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
     328            0 :     list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
     329            0 : }
     330              : 
     331              : /// List timelines of given tenant shard in remote storage
     332          192 : pub async fn list_remote_timelines(
     333          192 :     storage: &GenericRemoteStorage,
     334          192 :     tenant_shard_id: TenantShardId,
     335          192 :     cancel: CancellationToken,
     336          192 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     337          192 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     338            0 :         anyhow::bail!("storage-sync-list-remote-timelines");
     339          192 :     });
     340              : 
     341          192 :     let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
     342          777 :     list_identifiers::<TimelineId>(storage, remote_path, cancel).await
     343          192 : }
     344              : 
     345          610 : async fn do_download_remote_path_retry_forever(
     346          610 :     storage: &GenericRemoteStorage,
     347          610 :     remote_path: &RemotePath,
     348          610 :     cancel: &CancellationToken,
     349          610 : ) -> Result<(Vec<u8>, SystemTime), DownloadError> {
     350          610 :     download_retry_forever(
     351          610 :         || async {
     352          610 :             let download = storage
     353          610 :                 .download(remote_path, &DownloadOpts::default(), cancel)
     354          617 :                 .await?;
     355              : 
     356           20 :             let mut bytes = Vec::new();
     357           20 : 
     358           20 :             let stream = download.download_stream;
     359           20 :             let mut stream = StreamReader::new(stream);
     360           20 : 
     361           20 :             tokio::io::copy_buf(&mut stream, &mut bytes).await?;
     362              : 
     363           20 :             Ok((bytes, download.last_modified))
     364         1220 :         },
     365          610 :         &format!("download {remote_path:?}"),
     366          610 :         cancel,
     367          610 :     )
     368          635 :     .await
     369          610 : }
     370              : 
     371          576 : async fn do_download_tenant_manifest(
     372          576 :     storage: &GenericRemoteStorage,
     373          576 :     tenant_shard_id: &TenantShardId,
     374          576 :     _timeline_id: Option<&TimelineId>,
     375          576 :     generation: Generation,
     376          576 :     cancel: &CancellationToken,
     377          576 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
     378          576 :     let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
     379              : 
     380            0 :     let (manifest_bytes, manifest_bytes_mtime) =
     381          576 :         do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;
     382              : 
     383            0 :     let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
     384            0 :         .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
     385            0 :         .map_err(DownloadError::Other)?;
     386              : 
     387            0 :     Ok((tenant_manifest, generation, manifest_bytes_mtime))
     388          576 : }
     389              : 
     390           34 : async fn do_download_index_part(
     391           34 :     storage: &GenericRemoteStorage,
     392           34 :     tenant_shard_id: &TenantShardId,
     393           34 :     timeline_id: Option<&TimelineId>,
     394           34 :     index_generation: Generation,
     395           34 :     cancel: &CancellationToken,
     396           34 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
     397           34 :     let timeline_id =
     398           34 :         timeline_id.expect("A timeline ID is always provided when downloading an index");
     399           34 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
     400              : 
     401           20 :     let (index_part_bytes, index_part_mtime) =
     402           68 :         do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;
     403              : 
     404           20 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     405           20 :         .with_context(|| format!("deserialize index part file at {remote_path:?}"))
     406           20 :         .map_err(DownloadError::Other)?;
     407              : 
     408           20 :     Ok((index_part, index_generation, index_part_mtime))
     409           34 : }
     410              : 
     411              : /// Metadata objects are "generationed", meaning that they include a generation suffix.  This
     412              : /// function downloads the object with the highest generation <= `my_generation`.
     413              : ///
     414              : /// Data objects (layer files) also include a generation in their path, but there is no equivalent
     415              : /// search process, because their reference from an index includes the generation.
     416              : ///
     417              : /// An expensive object listing operation is only done if necessary: the typical fast path is to issue two
     418              : /// GET operations, one to our own generation (stale attachment case), and one to the immediately preceding
     419              : /// generation (normal case when migrating/restarting).  Only if both of these return 404 do we fall back
     420              : /// to listing objects.
     421              : ///
     422              : /// * `my_generation`: the value of `[crate::tenant::Tenant::generation]`
     423              : /// * `what`: for logging, what object are we downloading
     424              : /// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
     425              : /// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless
     426              : ///                  `cancel`` has fired.  This function does not do its own retries of GET operations, and relies
     427              : ///                  on the function passed in to do so.
     428              : /// * `parse_path`: parse a fully qualified remote storage path to get the generation of the object.
     429              : #[allow(clippy::too_many_arguments)]
     430          212 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     431              : pub(crate) async fn download_generation_object<'a, T, DF, DFF, PF>(
     432              :     storage: &'a GenericRemoteStorage,
     433              :     tenant_shard_id: &'a TenantShardId,
     434              :     timeline_id: Option<&'a TimelineId>,
     435              :     my_generation: Generation,
     436              :     what: &str,
     437              :     prefix: RemotePath,
     438              :     do_download: DF,
     439              :     parse_path: PF,
     440              :     cancel: &'a CancellationToken,
     441              : ) -> Result<(T, Generation, SystemTime), DownloadError>
     442              : where
     443              :     DF: Fn(
     444              :         &'a GenericRemoteStorage,
     445              :         &'a TenantShardId,
     446              :         Option<&'a TimelineId>,
     447              :         Generation,
     448              :         &'a CancellationToken,
     449              :     ) -> DFF,
     450              :     DFF: Future<Output = Result<(T, Generation, SystemTime), DownloadError>>,
     451              :     PF: Fn(RemotePath) -> Option<Generation>,
     452              :     T: 'static,
     453              : {
     454              :     debug_assert_current_span_has_tenant_id();
     455              : 
     456              :     if my_generation.is_none() {
     457              :         // Operating without generations: just fetch the generation-less path
     458              :         return do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     459              :     }
     460              : 
     461              :     // Stale case: If we were intentionally attached in a stale generation, the remote object may already
     462              :     // exist in our generation.
     463              :     //
     464              :     // This is an optimization to avoid doing the listing for the general case below.
     465              :     let res = do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     466              :     match res {
     467              :         Ok(decoded) => {
     468              :             tracing::debug!("Found {what} from current generation (this is a stale attachment)");
     469              :             return Ok(decoded);
     470              :         }
     471              :         Err(DownloadError::NotFound) => {}
     472              :         Err(e) => return Err(e),
     473              :     };
     474              : 
     475              :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded the object
     476              :     // we are seeking in that generation.  We may safely start from this index without doing a listing, because:
     477              :     //  - We checked for current generation case above
     478              :     //  - generations > my_generation are to be ignored
     479              :     //  - any other objects that exist would have an older generation than `previous_gen`, and
     480              :     //    we want to find the most recent object from a previous generation.
     481              :     //
     482              :     // This is an optimization to avoid doing the listing for the general case below.
     483              :     let res = do_download(
     484              :         storage,
     485              :         tenant_shard_id,
     486              :         timeline_id,
     487              :         my_generation.previous(),
     488              :         cancel,
     489              :     )
     490              :     .await;
     491              :     match res {
     492              :         Ok(decoded) => {
     493              :             tracing::debug!("Found {what} from previous generation");
     494              :             return Ok(decoded);
     495              :         }
     496              :         Err(DownloadError::NotFound) => {
     497              :             tracing::debug!("No {what} found from previous generation, falling back to listing");
     498              :         }
     499              :         Err(e) => {
     500              :             return Err(e);
     501              :         }
     502              :     }
     503              : 
     504              :     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
     505              :     // objects, and select the highest one with a generation <= my_generation.  Constructing the prefix is equivalent
     506              :     // to constructing a full index path with no generation, because the generation is a suffix.
     507              :     let paths = download_retry(
     508          198 :         || async {
     509          198 :             storage
     510          198 :                 .list(Some(&prefix), ListingMode::NoDelimiter, None, cancel)
     511          585 :                 .await
     512          396 :         },
     513              :         "list index_part files",
     514              :         cancel,
     515              :     )
     516              :     .await?
     517              :     .keys;
     518              : 
     519              :     // General case logic for which index to use: the latest index whose generation
     520              :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     521              :     let max_previous_generation = paths
     522              :         .into_iter()
     523           18 :         .filter_map(|o| parse_path(o.key))
     524           12 :         .filter(|g| g <= &my_generation)
     525              :         .max();
     526              : 
     527              :     match max_previous_generation {
     528              :         Some(g) => {
     529              :             tracing::debug!("Found {what} in generation {g:?}");
     530              :             do_download(storage, tenant_shard_id, timeline_id, g, cancel).await
     531              :         }
     532              :         None => {
     533              :             // Migration from legacy pre-generation state: we have a generation but no prior
     534              :             // attached pageservers did.  Try to load from a no-generation path.
     535              :             tracing::debug!("No {what}* found");
     536              :             do_download(
     537              :                 storage,
     538              :                 tenant_shard_id,
     539              :                 timeline_id,
     540              :                 Generation::none(),
     541              :                 cancel,
     542              :             )
     543              :             .await
     544              :         }
     545              :     }
     546              : }
     547              : 
     548              : /// index_part.json objects are suffixed with a generation number, so we cannot
     549              : /// directly GET the latest index part without doing some probing.
     550              : ///
     551              : /// In this function we probe for the most recent index in a generation <= our current generation.
     552              : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     553           20 : pub(crate) async fn download_index_part(
     554           20 :     storage: &GenericRemoteStorage,
     555           20 :     tenant_shard_id: &TenantShardId,
     556           20 :     timeline_id: &TimelineId,
     557           20 :     my_generation: Generation,
     558           20 :     cancel: &CancellationToken,
     559           20 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
     560           20 :     debug_assert_current_span_has_tenant_and_timeline_id();
     561           20 : 
     562           20 :     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
     563           20 :     download_generation_object(
     564           20 :         storage,
     565           20 :         tenant_shard_id,
     566           20 :         Some(timeline_id),
     567           20 :         my_generation,
     568           20 :         "index_part",
     569           20 :         index_prefix,
     570           20 :         do_download_index_part,
     571           20 :         parse_remote_index_path,
     572           20 :         cancel,
     573           20 :     )
     574           92 :     .await
     575           20 : }
     576              : 
     577          192 : pub(crate) async fn download_tenant_manifest(
     578          192 :     storage: &GenericRemoteStorage,
     579          192 :     tenant_shard_id: &TenantShardId,
     580          192 :     my_generation: Generation,
     581          192 :     cancel: &CancellationToken,
     582          192 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
     583          192 :     let manifest_prefix = remote_tenant_manifest_prefix(tenant_shard_id);
     584          192 : 
     585          192 :     download_generation_object(
     586          192 :         storage,
     587          192 :         tenant_shard_id,
     588          192 :         None,
     589          192 :         my_generation,
     590          192 :         "tenant-manifest",
     591          192 :         manifest_prefix,
     592          192 :         do_download_tenant_manifest,
     593          192 :         parse_remote_tenant_manifest_path,
     594          192 :         cancel,
     595          192 :     )
     596         1128 :     .await
     597          192 : }
     598              : 
     599            2 : pub(crate) async fn download_initdb_tar_zst(
     600            2 :     conf: &'static PageServerConf,
     601            2 :     storage: &GenericRemoteStorage,
     602            2 :     tenant_shard_id: &TenantShardId,
     603            2 :     timeline_id: &TimelineId,
     604            2 :     cancel: &CancellationToken,
     605            2 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
     606            2 :     debug_assert_current_span_has_tenant_and_timeline_id();
     607            2 : 
     608            2 :     let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     609            2 : 
     610            2 :     let remote_preserved_path =
     611            2 :         remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     612            2 : 
     613            2 :     let timeline_path = conf.timelines_path(tenant_shard_id);
     614            2 : 
     615            2 :     if !timeline_path.exists() {
     616            0 :         tokio::fs::create_dir_all(&timeline_path)
     617            0 :             .await
     618            0 :             .with_context(|| format!("timeline dir creation {timeline_path}"))
     619            0 :             .map_err(DownloadError::Other)?;
     620            2 :     }
     621            2 :     let temp_path = timeline_path.join(format!(
     622            2 :         "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
     623            2 :     ));
     624              : 
     625            2 :     let file = download_retry(
     626            2 :         || async {
     627            2 :             let file = OpenOptions::new()
     628            2 :                 .create(true)
     629            2 :                 .truncate(true)
     630            2 :                 .read(true)
     631            2 :                 .write(true)
     632            2 :                 .open(&temp_path)
     633            2 :                 .await
     634            2 :                 .with_context(|| format!("tempfile creation {temp_path}"))
     635            2 :                 .map_err(DownloadError::Other)?;
     636              : 
     637            2 :             let download = match storage
     638            2 :                 .download(&remote_path, &DownloadOpts::default(), cancel)
     639            4 :                 .await
     640              :             {
     641            2 :                 Ok(dl) => dl,
     642              :                 Err(DownloadError::NotFound) => {
     643            0 :                     storage
     644            0 :                         .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
     645            0 :                         .await?
     646              :                 }
     647            0 :                 Err(other) => Err(other)?,
     648              :             };
     649            2 :             let mut download = tokio_util::io::StreamReader::new(download.download_stream);
     650            2 :             let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
     651            2 : 
     652          517 :             tokio::io::copy_buf(&mut download, &mut writer).await?;
     653              : 
     654            2 :             let mut file = writer.into_inner();
     655            2 : 
     656            2 :             file.seek(std::io::SeekFrom::Start(0))
     657            1 :                 .await
     658            2 :                 .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
     659            2 :                 .map_err(DownloadError::Other)?;
     660              : 
     661            2 :             Ok(file)
     662            4 :         },
     663            2 :         &format!("download {remote_path}"),
     664            2 :         cancel,
     665            2 :     )
     666          524 :     .await
     667            2 :     .inspect_err(|_e| {
     668              :         // Do a best-effort attempt at deleting the temporary file upon encountering an error.
     669              :         // We don't have async here nor do we want to pile on any extra errors.
     670            0 :         if let Err(e) = std::fs::remove_file(&temp_path) {
     671            0 :             if e.kind() != std::io::ErrorKind::NotFound {
     672            0 :                 warn!("error deleting temporary file {temp_path}: {e}");
     673            0 :             }
     674            0 :         }
     675            2 :     })?;
     676              : 
     677            2 :     Ok((temp_path, file))
     678            2 : }
     679              : 
     680              : /// Helper function to handle retries for a download operation.
     681              : ///
     682              : /// Remote operations can fail due to rate limits (S3), spurious network
     683              : /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
     684              : /// with backoff.
     685              : ///
     686              : /// (See similar logic for uploads in `perform_upload_task`)
     687          206 : pub(super) async fn download_retry<T, O, F>(
     688          206 :     op: O,
     689          206 :     description: &str,
     690          206 :     cancel: &CancellationToken,
     691          206 : ) -> Result<T, DownloadError>
     692          206 : where
     693          206 :     O: FnMut() -> F,
     694          206 :     F: Future<Output = Result<T, DownloadError>>,
     695          206 : {
     696          206 :     backoff::retry(
     697          206 :         op,
     698          206 :         DownloadError::is_permanent,
     699          206 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     700          206 :         FAILED_REMOTE_OP_RETRIES,
     701          206 :         description,
     702          206 :         cancel,
     703          206 :     )
     704         1179 :     .await
     705          206 :     .ok_or_else(|| DownloadError::Cancelled)
     706          206 :     .and_then(|x| x)
     707          206 : }
     708              : 
     709          802 : async fn download_retry_forever<T, O, F>(
     710          802 :     op: O,
     711          802 :     description: &str,
     712          802 :     cancel: &CancellationToken,
     713          802 : ) -> Result<T, DownloadError>
     714          802 : where
     715          802 :     O: FnMut() -> F,
     716          802 :     F: Future<Output = Result<T, DownloadError>>,
     717          802 : {
     718          802 :     backoff::retry(
     719          802 :         op,
     720          802 :         DownloadError::is_permanent,
     721          802 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     722          802 :         u32::MAX,
     723          802 :         description,
     724          802 :         cancel,
     725          802 :     )
     726         1412 :     .await
     727          802 :     .ok_or_else(|| DownloadError::Cancelled)
     728          802 :     .and_then(|x| x)
     729          802 : }
        

Generated by: LCOV version 2.1-beta