LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions) Coverage Total Hit
Test: 2b0730d767f560e20b6748f57465922aa8bb805e.info Lines: 86.1 % 359 309
Test Date: 2024-09-25 14:04:07 Functions: 54.4 % 79 43

            Line data    Source code
       1              : //! Helper functions to download files from remote storage with a RemoteStorage
       2              : //!
       3              : //! The functions in this module retry failed operations automatically, according
       4              : //! to the FAILED_REMOTE_OP_RETRIES constant.
       5              : 
       6              : use std::collections::HashSet;
       7              : use std::future::Future;
       8              : use std::str::FromStr;
       9              : 
      10              : use anyhow::{anyhow, Context};
      11              : use camino::{Utf8Path, Utf8PathBuf};
      12              : use pageserver_api::shard::TenantShardId;
      13              : use tokio::fs::{self, File, OpenOptions};
      14              : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
      15              : use tokio_util::io::StreamReader;
      16              : use tokio_util::sync::CancellationToken;
      17              : use tracing::warn;
      18              : use utils::backoff;
      19              : 
      20              : use crate::config::PageServerConf;
      21              : use crate::context::RequestContext;
      22              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      23              : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
      24              : use crate::tenant::storage_layer::LayerName;
      25              : use crate::tenant::Generation;
      26              : #[cfg_attr(target_os = "macos", allow(unused_imports))]
      27              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      28              : use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
      29              : use crate::TEMP_FILE_SUFFIX;
      30              : use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
      31              : use utils::crashsafe::path_with_suffix_extension;
      32              : use utils::id::{TenantId, TimelineId};
      33              : use utils::pausable_failpoint;
      34              : 
      35              : use super::index::{IndexPart, LayerFileMetadata};
      36              : use super::{
      37              :     parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
      38              :     remote_initdb_preserved_archive_path, remote_tenant_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
      39              :     FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
      40              : };
      41              : 
      42              : ///
      43              : /// If 'metadata' is given, we will validate that the downloaded file's size matches that
      44              : /// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
      45              : ///
      46              : /// Returns the size of the downloaded file.
      47              : #[allow(clippy::too_many_arguments)]
      48           18 : pub async fn download_layer_file<'a>(
      49           18 :     conf: &'static PageServerConf,
      50           18 :     storage: &'a GenericRemoteStorage,
      51           18 :     tenant_shard_id: TenantShardId,
      52           18 :     timeline_id: TimelineId,
      53           18 :     layer_file_name: &'a LayerName,
      54           18 :     layer_metadata: &'a LayerFileMetadata,
      55           18 :     local_path: &Utf8Path,
      56           18 :     cancel: &CancellationToken,
      57           18 :     ctx: &RequestContext,
      58           18 : ) -> Result<u64, DownloadError> {
      59           18 :     debug_assert_current_span_has_tenant_and_timeline_id();
      60           18 : 
      61           18 :     let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
      62           18 : 
      63           18 :     let remote_path = remote_layer_path(
      64           18 :         &tenant_shard_id.tenant_id,
      65           18 :         &timeline_id,
      66           18 :         layer_metadata.shard,
      67           18 :         layer_file_name,
      68           18 :         layer_metadata.generation,
      69           18 :     );
      70           18 : 
      71           18 :     // Perform a rename inspired by durable_rename from file_utils.c.
      72           18 :     // The sequence:
      73           18 :     //     write(tmp)
      74           18 :     //     fsync(tmp)
      75           18 :     //     rename(tmp, new)
      76           18 :     //     fsync(new)
      77           18 :     //     fsync(parent)
      78           18 :     // For more context about durable_rename check this email from postgres mailing list:
      79           18 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
      80           18 :     // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
      81           18 :     let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
      82              : 
      83           18 :     let bytes_amount = download_retry(
      84          204 :         || async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
      85           18 :         &format!("download {remote_path:?}"),
      86           18 :         cancel,
      87           18 :     )
      88          204 :     .await?;
      89              : 
      90           18 :     let expected = layer_metadata.file_size;
      91           18 :     if expected != bytes_amount {
      92            0 :         return Err(DownloadError::Other(anyhow!(
      93            0 :             "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
      94            0 :         )));
      95           18 :     }
      96           18 : 
      97           18 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
      98            0 :         Err(DownloadError::Other(anyhow!(
      99            0 :             "remote-storage-download-pre-rename failpoint triggered"
     100            0 :         )))
     101           18 :     });
     102              : 
     103           18 :     fs::rename(&temp_file_path, &local_path)
     104           18 :         .await
     105           18 :         .with_context(|| format!("rename download layer file to {local_path}"))
     106           18 :         .map_err(DownloadError::Other)?;
     107              : 
     108              :     // We use fatal_err() below because after the rename above,
     109              :     // the in-memory state of the filesystem already has the layer file in its final place,
     110              :     // and subsequent pageserver code could think it's durable while it really isn't.
     111           18 :     let work = {
     112           18 :         let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
     113           18 :         async move {
     114           18 :             let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
     115            9 :                 .await
     116           18 :                 .fatal_err("VirtualFile::open for timeline dir fsync");
     117           18 :             timeline_dir
     118           18 :                 .sync_all()
     119            9 :                 .await
     120           18 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     121           18 :         }
     122              :     };
     123           18 :     crate::virtual_file::io_engine::get()
     124           18 :         .spawn_blocking_and_block_on_if_std(work)
     125           27 :         .await;
     126              : 
     127           18 :     tracing::debug!("download complete: {local_path}");
     128              : 
     129           18 :     Ok(bytes_amount)
     130           18 : }
     131              : 
     132              : /// Download the object `src_path` in the remote `storage` to local path `dst_path`.
     133              : ///
     134              : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
     135              : /// (Note that the directory entry for the inode is not made durable.)
     136              : /// The file size in bytes is returned.
     137              : ///
     138              : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
     139              : /// The unlinking has _not_ been made durable.
     140           18 : async fn download_object<'a>(
     141           18 :     storage: &'a GenericRemoteStorage,
     142           18 :     src_path: &RemotePath,
     143           18 :     dst_path: &Utf8PathBuf,
     144           18 :     cancel: &CancellationToken,
     145           18 :     #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
     146           18 : ) -> Result<u64, DownloadError> {
     147           18 :     let res = match crate::virtual_file::io_engine::get() {
     148            0 :         crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
     149              :         crate::virtual_file::io_engine::IoEngine::StdFs => {
     150            9 :             async {
     151            9 :                 let destination_file = tokio::fs::File::create(dst_path)
     152            8 :                     .await
     153            9 :                     .with_context(|| format!("create a destination file for layer '{dst_path}'"))
     154            9 :                     .map_err(DownloadError::Other)?;
     155              : 
     156           16 :                 let download = storage.download(src_path, cancel).await?;
     157              : 
     158            9 :                 pausable_failpoint!("before-downloading-layer-stream-pausable");
     159              : 
     160            9 :                 let mut buf_writer =
     161            9 :                     tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
     162            9 : 
     163            9 :                 let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
     164              : 
     165           57 :                 let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
     166            9 :                 buf_writer.flush().await?;
     167              : 
     168            9 :                 let mut destination_file = buf_writer.into_inner();
     169            9 : 
     170            9 :                 // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     171            9 :                 // A file will not be closed immediately when it goes out of scope if there are any IO operations
     172            9 :                 // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     173            9 :                 // you should call flush before dropping it.
     174            9 :                 //
     175            9 :                 // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
     176            9 :                 // we assume that `destination_file` file is fully written, i.e. there are no pending .write(...).await operations.
     177            9 :                 // But for additional safety let's check/wait for any pending operations.
     178            9 :                 destination_file
     179            9 :                     .flush()
     180            0 :                     .await
     181            9 :                     .with_context(|| format!("flush source file at {dst_path}"))
     182            9 :                     .map_err(DownloadError::Other)?;
     183              : 
     184              :                 // not using sync_data because it can lose file size update
     185            9 :                 destination_file
     186            9 :                     .sync_all()
     187            9 :                     .await
     188            9 :                     .with_context(|| format!("failed to fsync source file at {dst_path}"))
     189            9 :                     .map_err(DownloadError::Other)?;
     190              : 
     191            9 :                 Ok(bytes_amount)
     192            9 :             }
     193           98 :             .await
     194              :         }
     195              :         #[cfg(target_os = "linux")]
     196              :         crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
     197              :             use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
     198              :             use bytes::BytesMut;
     199            9 :             async {
     200            9 :                 let destination_file = VirtualFile::create(dst_path, ctx)
     201            9 :                     .await
     202            9 :                     .with_context(|| format!("create a destination file for layer '{dst_path}'"))
     203            9 :                     .map_err(DownloadError::Other)?;
     204              : 
     205           16 :                 let mut download = storage.download(src_path, cancel).await?;
     206              : 
     207            9 :                 pausable_failpoint!("before-downloading-layer-stream-pausable");
     208              : 
     209              :                 // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
     210              :                 // There's chunks_vectored() on the stream.
     211            9 :                 let (bytes_amount, destination_file) = async {
     212            9 :                     let size_tracking = size_tracking_writer::Writer::new(destination_file);
     213            9 :                     let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
     214            9 :                         size_tracking,
     215            9 :                         BytesMut::with_capacity(super::BUFFER_SIZE),
     216            9 :                     );
     217           54 :                     while let Some(res) =
     218           63 :                         futures::StreamExt::next(&mut download.download_stream).await
     219              :                     {
     220           54 :                         let chunk = match res {
     221           54 :                             Ok(chunk) => chunk,
     222            0 :                             Err(e) => return Err(e),
     223              :                         };
     224           54 :                         buffered.write_buffered(chunk.slice_len(), ctx).await?;
     225              :                     }
     226            9 :                     let size_tracking = buffered.flush_and_into_inner(ctx).await?;
     227            9 :                     Ok(size_tracking.into_inner())
     228            9 :                 }
     229           64 :                 .await?;
     230              : 
     231              :                 // not using sync_data because it can lose file size update
     232            9 :                 destination_file
     233            9 :                     .sync_all()
     234            9 :                     .await
     235            9 :                     .with_context(|| format!("failed to fsync source file at {dst_path}"))
     236            9 :                     .map_err(DownloadError::Other)?;
     237              : 
     238            9 :                 Ok(bytes_amount)
     239            9 :             }
     240          106 :             .await
     241              :         }
     242              :     };
     243              : 
     244              :     // in case the download failed, clean up
     245           18 :     match res {
     246           18 :         Ok(bytes_amount) => Ok(bytes_amount),
     247            0 :         Err(e) => {
     248            0 :             if let Err(e) = tokio::fs::remove_file(dst_path).await {
     249            0 :                 if e.kind() != std::io::ErrorKind::NotFound {
     250            0 :                     on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
     251            0 :                 }
     252            0 :             }
     253            0 :             Err(e)
     254              :         }
     255              :     }
     256           18 : }
     257              : 
     258              : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     259              : 
     260            0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
     261            0 :     let extension = path.extension();
     262            0 :     match extension {
     263            0 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     264            0 :         Some(_) => false,
     265            0 :         None => false,
     266              :     }
     267            0 : }
     268              : 
     269          570 : async fn list_identifiers<T>(
     270          570 :     storage: &GenericRemoteStorage,
     271          570 :     prefix: RemotePath,
     272          570 :     cancel: CancellationToken,
     273          570 : ) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
     274          570 : where
     275          570 :     T: FromStr + Eq + std::hash::Hash,
     276          570 : {
     277          570 :     let listing = download_retry_forever(
     278          570 :         || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
     279          570 :         &format!("list identifiers in prefix {prefix}"),
     280          570 :         &cancel,
     281          570 :     )
     282         2238 :     .await?;
     283              : 
     284          570 :     let mut parsed_ids = HashSet::new();
     285          570 :     let mut other_prefixes = HashSet::new();
     286              : 
     287          588 :     for id_remote_storage_key in listing.prefixes {
     288           18 :         let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
     289            0 :             anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
     290           18 :         })?;
     291              : 
     292           18 :         match object_name.parse::<T>() {
     293           18 :             Ok(t) => parsed_ids.insert(t),
     294            0 :             Err(_) => other_prefixes.insert(object_name.to_string()),
     295              :         };
     296              :     }
     297              : 
     298          570 :     for object in listing.keys {
     299            0 :         let object_name = object
     300            0 :             .key
     301            0 :             .object_name()
     302            0 :             .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
     303            0 :         other_prefixes.insert(object_name.to_string());
     304              :     }
     305              : 
     306          570 :     Ok((parsed_ids, other_prefixes))
     307          570 : }
     308              : 
     309              : /// List shards of given tenant in remote storage
     310            0 : pub(crate) async fn list_remote_tenant_shards(
     311            0 :     storage: &GenericRemoteStorage,
     312            0 :     tenant_id: TenantId,
     313            0 :     cancel: CancellationToken,
     314            0 : ) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
     315            0 :     let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
     316            0 :     list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
     317            0 : }
     318              : 
     319              : /// List timelines of given tenant shard in remote storage
     320          570 : pub async fn list_remote_timelines(
     321          570 :     storage: &GenericRemoteStorage,
     322          570 :     tenant_shard_id: TenantShardId,
     323          570 :     cancel: CancellationToken,
     324          570 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     325          570 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     326            0 :         anyhow::bail!("storage-sync-list-remote-timelines");
     327          570 :     });
     328              : 
     329          570 :     let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
     330         2238 :     list_identifiers::<TimelineId>(storage, remote_path, cancel).await
     331          570 : }
     332              : 
     333          102 : async fn do_download_index_part(
     334          102 :     storage: &GenericRemoteStorage,
     335          102 :     tenant_shard_id: &TenantShardId,
     336          102 :     timeline_id: &TimelineId,
     337          102 :     index_generation: Generation,
     338          102 :     cancel: &CancellationToken,
     339          102 : ) -> Result<(IndexPart, Generation), DownloadError> {
     340          102 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
     341              : 
     342          102 :     let index_part_bytes = download_retry_forever(
     343          102 :         || async {
     344          161 :             let download = storage.download(&remote_path, cancel).await?;
     345              : 
     346           60 :             let mut bytes = Vec::new();
     347           60 : 
     348           60 :             let stream = download.download_stream;
     349           60 :             let mut stream = StreamReader::new(stream);
     350           60 : 
     351          120 :             tokio::io::copy_buf(&mut stream, &mut bytes).await?;
     352              : 
     353           60 :             Ok(bytes)
     354          204 :         },
     355          102 :         &format!("download {remote_path:?}"),
     356          102 :         cancel,
     357          102 :     )
     358          281 :     .await?;
     359              : 
     360           60 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     361           60 :         .with_context(|| format!("deserialize index part file at {remote_path:?}"))
     362           60 :         .map_err(DownloadError::Other)?;
     363              : 
     364           60 :     Ok((index_part, index_generation))
     365          102 : }
     366              : 
     367              : /// index_part.json objects are suffixed with a generation number, so we cannot
     368              : /// directly GET the latest index part without doing some probing.
     369              : ///
     370              : /// In this function we probe for the most recent index in a generation <= our current generation.
     371              : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     372           60 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     373              : pub(crate) async fn download_index_part(
     374              :     storage: &GenericRemoteStorage,
     375              :     tenant_shard_id: &TenantShardId,
     376              :     timeline_id: &TimelineId,
     377              :     my_generation: Generation,
     378              :     cancel: &CancellationToken,
     379              : ) -> Result<(IndexPart, Generation), DownloadError> {
     380              :     debug_assert_current_span_has_tenant_and_timeline_id();
     381              : 
     382              :     if my_generation.is_none() {
     383              :         // Operating without generations: just fetch the generation-less path
     384              :         return do_download_index_part(
     385              :             storage,
     386              :             tenant_shard_id,
     387              :             timeline_id,
     388              :             my_generation,
     389              :             cancel,
     390              :         )
     391              :         .await;
     392              :     }
     393              : 
     394              :     // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
     395              :     // index in our generation.
     396              :     //
     397              :     // This is an optimization to avoid doing the listing for the general case below.
     398              :     let res =
     399              :         do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     400              :     match res {
     401              :         Ok(index_part) => {
     402              :             tracing::debug!(
     403              :                 "Found index_part from current generation (this is a stale attachment)"
     404              :             );
     405              :             return Ok(index_part);
     406              :         }
     407              :         Err(DownloadError::NotFound) => {}
     408              :         Err(e) => return Err(e),
     409              :     };
     410              : 
     411              :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded
     412              :     // an index part.  We may safely start from this index without doing a listing, because:
     413              :     //  - We checked for current generation case above
     414              :     //  - generations > my_generation are to be ignored
     415              :     //  - any other indices that exist would have an older generation than `previous_gen`, and
     416              :     //    we want to find the most recent index from a previous generation.
     417              :     //
     418              :     // This is an optimization to avoid doing the listing for the general case below.
     419              :     let res = do_download_index_part(
     420              :         storage,
     421              :         tenant_shard_id,
     422              :         timeline_id,
     423              :         my_generation.previous(),
     424              :         cancel,
     425              :     )
     426              :     .await;
     427              :     match res {
     428              :         Ok(index_part) => {
     429              :             tracing::debug!("Found index_part from previous generation");
     430              :             return Ok(index_part);
     431              :         }
     432              :         Err(DownloadError::NotFound) => {
     433              :             tracing::debug!(
     434              :                 "No index_part found from previous generation, falling back to listing"
     435              :             );
     436              :         }
     437              :         Err(e) => {
     438              :             return Err(e);
     439              :         }
     440              :     }
     441              : 
     442              :     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
     443              :     // objects, and select the highest one with a generation <= my_generation.  Constructing the prefix is equivalent
     444              :     // to constructing a full index path with no generation, because the generation is a suffix.
     445              :     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
     446              : 
     447              :     let indices = download_retry(
     448           18 :         || async {
     449           18 :             storage
     450           18 :                 .list(Some(&index_prefix), ListingMode::NoDelimiter, None, cancel)
     451           18 :                 .await
     452           36 :         },
     453              :         "list index_part files",
     454              :         cancel,
     455              :     )
     456              :     .await?
     457              :     .keys;
     458              : 
     459              :     // General case logic for which index to use: the latest index whose generation
     460              :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     461              :     let max_previous_generation = indices
     462              :         .into_iter()
     463           54 :         .filter_map(|o| parse_remote_index_path(o.key))
     464           36 :         .filter(|g| g <= &my_generation)
     465              :         .max();
     466              : 
     467              :     match max_previous_generation {
     468              :         Some(g) => {
     469              :             tracing::debug!("Found index_part in generation {g:?}");
     470              :             do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
     471              :         }
     472              :         None => {
     473              :             // Migration from legacy pre-generation state: we have a generation but no prior
     474              :             // attached pageservers did.  Try to load from a no-generation path.
     475              :             tracing::debug!("No index_part.json* found");
     476              :             do_download_index_part(
     477              :                 storage,
     478              :                 tenant_shard_id,
     479              :                 timeline_id,
     480              :                 Generation::none(),
     481              :                 cancel,
     482              :             )
     483              :             .await
     484              :         }
     485              :     }
     486              : }
     487              : 
     488            6 : pub(crate) async fn download_initdb_tar_zst(
     489            6 :     conf: &'static PageServerConf,
     490            6 :     storage: &GenericRemoteStorage,
     491            6 :     tenant_shard_id: &TenantShardId,
     492            6 :     timeline_id: &TimelineId,
     493            6 :     cancel: &CancellationToken,
     494            6 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
     495            6 :     debug_assert_current_span_has_tenant_and_timeline_id();
     496            6 : 
     497            6 :     let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     498            6 : 
     499            6 :     let remote_preserved_path =
     500            6 :         remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     501            6 : 
     502            6 :     let timeline_path = conf.timelines_path(tenant_shard_id);
     503            6 : 
     504            6 :     if !timeline_path.exists() {
     505            0 :         tokio::fs::create_dir_all(&timeline_path)
     506            0 :             .await
     507            0 :             .with_context(|| format!("timeline dir creation {timeline_path}"))
     508            0 :             .map_err(DownloadError::Other)?;
     509            6 :     }
     510            6 :     let temp_path = timeline_path.join(format!(
     511            6 :         "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
     512            6 :     ));
     513              : 
     514            6 :     let file = download_retry(
     515            6 :         || async {
     516            6 :             let file = OpenOptions::new()
     517            6 :                 .create(true)
     518            6 :                 .truncate(true)
     519            6 :                 .read(true)
     520            6 :                 .write(true)
     521            6 :                 .open(&temp_path)
     522            6 :                 .await
     523            6 :                 .with_context(|| format!("tempfile creation {temp_path}"))
     524            6 :                 .map_err(DownloadError::Other)?;
     525              : 
     526           12 :             let download = match storage.download(&remote_path, cancel).await {
     527            6 :                 Ok(dl) => dl,
     528              :                 Err(DownloadError::NotFound) => {
     529            0 :                     storage.download(&remote_preserved_path, cancel).await?
     530              :                 }
     531            0 :                 Err(other) => Err(other)?,
     532              :             };
     533            6 :             let mut download = tokio_util::io::StreamReader::new(download.download_stream);
     534            6 :             let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
     535            6 : 
     536         2131 :             tokio::io::copy_buf(&mut download, &mut writer).await?;
     537              : 
     538            6 :             let mut file = writer.into_inner();
     539            6 : 
     540            6 :             file.seek(std::io::SeekFrom::Start(0))
     541            5 :                 .await
     542            6 :                 .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
     543            6 :                 .map_err(DownloadError::Other)?;
     544              : 
     545            6 :             Ok(file)
     546           12 :         },
     547            6 :         &format!("download {remote_path}"),
     548            6 :         cancel,
     549            6 :     )
     550         2154 :     .await
     551            6 :     .inspect_err(|_e| {
     552              :         // Do a best-effort attempt at deleting the temporary file upon encountering an error.
     553              :         // We don't have async here nor do we want to pile on any extra errors.
     554            0 :         if let Err(e) = std::fs::remove_file(&temp_path) {
     555            0 :             if e.kind() != std::io::ErrorKind::NotFound {
     556            0 :                 warn!("error deleting temporary file {temp_path}: {e}");
     557            0 :             }
     558            0 :         }
     559            6 :     })?;
     560              : 
     561            6 :     Ok((temp_path, file))
     562            6 : }
     563              : 
     564              : /// Helper function to handle retries for a download operation.
     565              : ///
     566              : /// Remote operations can fail due to rate limits (S3), spurious network
     567              : /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
     568              : /// with backoff.
     569              : ///
     570              : /// (See similar logic for uploads in `perform_upload_task`)
     571           42 : pub(super) async fn download_retry<T, O, F>(
     572           42 :     op: O,
     573           42 :     description: &str,
     574           42 :     cancel: &CancellationToken,
     575           42 : ) -> Result<T, DownloadError>
     576           42 : where
     577           42 :     O: FnMut() -> F,
     578           42 :     F: Future<Output = Result<T, DownloadError>>,
     579           42 : {
     580           42 :     backoff::retry(
     581           42 :         op,
     582           42 :         DownloadError::is_permanent,
     583           42 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     584           42 :         FAILED_REMOTE_OP_RETRIES,
     585           42 :         description,
     586           42 :         cancel,
     587           42 :     )
     588         2376 :     .await
     589           42 :     .ok_or_else(|| DownloadError::Cancelled)
     590           42 :     .and_then(|x| x)
     591           42 : }
     592              : 
     593          672 : async fn download_retry_forever<T, O, F>(
     594          672 :     op: O,
     595          672 :     description: &str,
     596          672 :     cancel: &CancellationToken,
     597          672 : ) -> Result<T, DownloadError>
     598          672 : where
     599          672 :     O: FnMut() -> F,
     600          672 :     F: Future<Output = Result<T, DownloadError>>,
     601          672 : {
     602          672 :     backoff::retry(
     603          672 :         op,
     604          672 :         DownloadError::is_permanent,
     605          672 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     606          672 :         u32::MAX,
     607          672 :         description,
     608          672 :         cancel,
     609          672 :     )
     610         2519 :     .await
     611          672 :     .ok_or_else(|| DownloadError::Cancelled)
     612          672 :     .and_then(|x| x)
     613          672 : }
        

Generated by: LCOV version 2.1-beta