LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions)
Test: b837401fb09d2d9818b70e630fdb67e9799b7b0d.info
Test Date: 2024-04-18 15:32:49
Coverage: Lines: 85.1 % (297 of 349) | Functions: 55.4 % (41 of 74)

            Line data    Source code
       1              : //! Helper functions to download files from remote storage with a RemoteStorage
       2              : //!
       3              : //! The functions in this module retry failed operations automatically, according
       4              : //! to the FAILED_REMOTE_OP_RETRIES constant.
       5              : 
       6              : use std::collections::HashSet;
       7              : use std::future::Future;
       8              : 
       9              : use anyhow::{anyhow, Context};
      10              : use camino::{Utf8Path, Utf8PathBuf};
      11              : use pageserver_api::shard::TenantShardId;
      12              : use tokio::fs::{self, File, OpenOptions};
      13              : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
      14              : use tokio_util::io::StreamReader;
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::warn;
      17              : use utils::backoff;
      18              : 
      19              : use crate::config::PageServerConf;
      20              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      21              : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
      22              : use crate::tenant::storage_layer::LayerFileName;
      23              : use crate::tenant::Generation;
      24              : use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
      25              : use crate::TEMP_FILE_SUFFIX;
      26              : use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
      27              : use utils::crashsafe::path_with_suffix_extension;
      28              : use utils::id::TimelineId;
      29              : 
      30              : use super::index::{IndexPart, LayerFileMetadata};
      31              : use super::{
      32              :     parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
      33              :     remote_initdb_preserved_archive_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
      34              :     INITDB_PATH,
      35              : };
      36              : 
      37              : /// Download a layer file from remote storage into the local timeline directory.
      38              : ///
      39              : /// We validate that the downloaded file's size matches the size recorded in `layer_metadata`. (In the future, we might do more cross-checks, like CRC validation.)
      40              : ///
      41              : /// Returns the size of the downloaded file.
      42            6 : pub async fn download_layer_file<'a>(
      43            6 :     conf: &'static PageServerConf,
      44            6 :     storage: &'a GenericRemoteStorage,
      45            6 :     tenant_shard_id: TenantShardId,
      46            6 :     timeline_id: TimelineId,
      47            6 :     layer_file_name: &'a LayerFileName,
      48            6 :     layer_metadata: &'a LayerFileMetadata,
      49            6 :     cancel: &CancellationToken,
      50            6 : ) -> Result<u64, DownloadError> {
      51            6 :     debug_assert_current_span_has_tenant_and_timeline_id();
      52            6 : 
      53            6 :     let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
      54            6 :     let local_path = timeline_path.join(layer_file_name.file_name());
      55            6 : 
      56            6 :     let remote_path = remote_layer_path(
      57            6 :         &tenant_shard_id.tenant_id,
      58            6 :         &timeline_id,
      59            6 :         layer_metadata.shard,
      60            6 :         layer_file_name,
      61            6 :         layer_metadata.generation,
      62            6 :     );
      63            6 : 
      64            6 :     // Perform a rename inspired by durable_rename from file_utils.c.
      65            6 :     // The sequence:
      66            6 :     //     write(tmp)
      67            6 :     //     fsync(tmp)
      68            6 :     //     rename(tmp, new)
      69            6 :     //     fsync(new)
      70            6 :     //     fsync(parent)
      71            6 :     // For more context about durable_rename, see this email from the postgres mailing list:
      72            6 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
      73            6 :     // If the pageserver crashes, the temp file will be deleted on startup and re-downloaded.
      74            6 :     let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
      75              : 
      76            6 :     let bytes_amount = download_retry(
      77           72 :         || async { download_object(storage, &remote_path, &temp_file_path, cancel).await },
      78            6 :         &format!("download {remote_path:?}"),
      79            6 :         cancel,
      80            6 :     )
      81           72 :     .await?;
      82              : 
      83            6 :     let expected = layer_metadata.file_size();
      84            6 :     if expected != bytes_amount {
      85            0 :         return Err(DownloadError::Other(anyhow!(
      86            0 :             "According to layer file metadata, expected to download {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
      87            0 :         )));
      88            6 :     }
      89            6 : 
      90            6 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
      91            0 :         Err(DownloadError::Other(anyhow!(
      92            0 :             "remote-storage-download-pre-rename failpoint triggered"
      93            0 :         )))
      94            6 :     });
      95              : 
      96            6 :     fs::rename(&temp_file_path, &local_path)
      97            6 :         .await
      98            6 :         .with_context(|| format!("rename download layer file to {local_path}"))
      99            6 :         .map_err(DownloadError::Other)?;
     100              : 
     101              :     // We use fatal_err() below because, after the rename above,
     102              :     // the in-memory state of the filesystem already has the layer file in its final place,
     103              :     // and subsequent pageserver code could think it's durable while it really isn't.
     104            6 :     let work = async move {
     105            6 :         let timeline_dir = VirtualFile::open(&timeline_path)
     106            3 :             .await
     107            6 :             .fatal_err("VirtualFile::open for timeline dir fsync");
     108            6 :         timeline_dir
     109            6 :             .sync_all()
     110            3 :             .await
     111            6 :             .fatal_err("VirtualFile::sync_all timeline dir");
     112            6 :     };
     113            6 :     crate::virtual_file::io_engine::get()
     114            6 :         .spawn_blocking_and_block_on_if_std(work)
     115            9 :         .await;
     116              : 
     117            6 :     tracing::debug!("download complete: {local_path}");
     118              : 
     119            6 :     Ok(bytes_amount)
     120            6 : }
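
As an aside, the durable_rename sequence from the comment above can be written out in isolation. A minimal, hypothetical sketch using tokio::fs (durable_rename_sketch and its arguments are illustrative names, not part of this module; error contexts elided):

    use std::path::Path;
    use tokio::fs::{self, File};
    use tokio::io::AsyncWriteExt;

    // Hypothetical sketch of durable_rename: write and fsync the temp file,
    // rename it into place, fsync the renamed file, then fsync the parent
    // directory so the new directory entry itself survives a crash.
    async fn durable_rename_sketch(tmp: &Path, dst: &Path, data: &[u8]) -> std::io::Result<()> {
        let mut f = File::create(tmp).await?; // write(tmp)
        f.write_all(data).await?;
        f.sync_all().await?; // fsync(tmp)
        fs::rename(tmp, dst).await?; // rename(tmp, new)
        File::open(dst).await?.sync_all().await?; // fsync(new)
        let parent = dst.parent().expect("dst is not a filesystem root");
        File::open(parent).await?.sync_all().await?; // fsync(parent); fsync on a directory fd works on Linux
        Ok(())
    }

Note that download_layer_file itself splits this sequence up: the write and fsync of the temp file happen inside download_object, while the directory fsync goes through VirtualFile so it can run on the configured io engine.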
     121              : 
     122              : /// Download the object `src_path` in the remote `storage` to local path `dst_path`.
     123              : ///
     124              : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
     125              : /// (Note that the directory entry for the inode is not made durable.)
     126              : /// The file size in bytes is returned.
     127              : ///
     128              : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
     129              : /// The unlinking has _not_ been made durable.
     130            6 : async fn download_object<'a>(
     131            6 :     storage: &'a GenericRemoteStorage,
     132            6 :     src_path: &RemotePath,
     133            6 :     dst_path: &Utf8PathBuf,
     134            6 :     cancel: &CancellationToken,
     135            6 : ) -> Result<u64, DownloadError> {
     136            6 :     let res = match crate::virtual_file::io_engine::get() {
     137            0 :         crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
     138              :         crate::virtual_file::io_engine::IoEngine::StdFs => {
     139            3 :             async {
     140            3 :                 let destination_file = tokio::fs::File::create(dst_path)
     141            3 :                     .await
     142            3 :                     .with_context(|| format!("create a destination file for layer '{dst_path}'"))
     143            3 :                     .map_err(DownloadError::Other)?;
     144              : 
     145            6 :                 let download = storage.download(src_path, cancel).await?;
     146              : 
     147            3 :                 let mut buf_writer =
     148            3 :                     tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
     149            3 : 
     150            3 :                 let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
     151              : 
     152           24 :                 let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
     153            3 :                 buf_writer.flush().await?;
     154              : 
     155            3 :                 let mut destination_file = buf_writer.into_inner();
     156            3 : 
     157            3 :                 // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     158            3 :                 // A file will not be closed immediately when it goes out of scope if there are any IO operations
     159            3 :                 // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     160            3 :                 // you should call flush before dropping it.
     161            3 :                 //
      162            3 :                 // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any, because
      163            3 :                 // we assume that the `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
      164            3 :                 // But for additional safety, let's check/wait for any pending operations.
     165            3 :                 destination_file
     166            3 :                     .flush()
     167            0 :                     .await
      168            3 :                     .with_context(|| format!("flush destination file at {dst_path}"))
     169            3 :                     .map_err(DownloadError::Other)?;
     170              : 
     171              :                 // not using sync_data because it can lose file size update
     172            3 :                 destination_file
     173            3 :                     .sync_all()
     174            3 :                     .await
      175            3 :                     .with_context(|| format!("failed to fsync destination file at {dst_path}"))
     176            3 :                     .map_err(DownloadError::Other)?;
     177              : 
     178            3 :                 Ok(bytes_amount)
     179            3 :             }
     180           36 :             .await
     181              :         }
     182              :         #[cfg(target_os = "linux")]
     183              :         crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
     184              :             use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
     185            3 :             async {
     186            3 :                 let destination_file = VirtualFile::create(dst_path)
     187            3 :                     .await
     188            3 :                     .with_context(|| format!("create a destination file for layer '{dst_path}'"))
     189            3 :                     .map_err(DownloadError::Other)?;
     190              : 
     191            6 :                 let mut download = storage.download(src_path, cancel).await?;
     192              : 
     193              :                 // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
     194              :                 // There's chunks_vectored() on the stream.
     195            3 :                 let (bytes_amount, destination_file) = async {
     196            3 :                     let size_tracking = size_tracking_writer::Writer::new(destination_file);
     197            3 :                     let mut buffered = owned_buffers_io::write::BufferedWriter::<
     198            3 :                         { super::BUFFER_SIZE },
     199            3 :                         _,
     200            3 :                     >::new(size_tracking);
     201           18 :                     while let Some(res) =
     202           21 :                         futures::StreamExt::next(&mut download.download_stream).await
     203              :                     {
     204           18 :                         let chunk = match res {
     205           18 :                             Ok(chunk) => chunk,
     206            0 :                             Err(e) => return Err(e),
     207              :                         };
     208           18 :                         buffered
     209           18 :                             .write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk))
     210            0 :                             .await?;
     211              :                     }
     212            3 :                     let size_tracking = buffered.flush_and_into_inner().await?;
     213            3 :                     Ok(size_tracking.into_inner())
     214            3 :                 }
     215           24 :                 .await?;
     216              : 
     217              :                 // not using sync_data because it can lose file size update
     218            3 :                 destination_file
     219            3 :                     .sync_all()
     220            3 :                     .await
      221            3 :                     .with_context(|| format!("failed to fsync destination file at {dst_path}"))
     222            3 :                     .map_err(DownloadError::Other)?;
     223              : 
     224            3 :                 Ok(bytes_amount)
     225            3 :             }
     226           36 :             .await
     227              :         }
     228              :     };
     229              : 
     230              :     // in case the download failed, clean up
     231            6 :     match res {
     232            6 :         Ok(bytes_amount) => Ok(bytes_amount),
     233            0 :         Err(e) => {
     234            0 :             if let Err(e) = tokio::fs::remove_file(dst_path).await {
     235            0 :                 if e.kind() != std::io::ErrorKind::NotFound {
     236            0 :                     on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
     237            0 :                 }
     238            0 :             }
     239            0 :             Err(e)
     240              :         }
     241              :     }
     242            6 : }
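
The cleanup contract above (on error, unlink the partial destination, tolerating NotFound) is a reusable pattern. A standalone, hypothetical sketch, where with_cleanup and the write_to closure are illustrative names:

    use std::io::ErrorKind;
    use std::path::Path;

    // Hypothetical sketch: run a fallible download step; on failure, remove
    // the partial destination so no truncated file is left behind. NotFound
    // from remove_file is fine: the step may have failed before creating
    // the file at all.
    async fn with_cleanup<F, Fut>(dst: &Path, write_to: F) -> std::io::Result<u64>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = std::io::Result<u64>>,
    {
        match write_to().await {
            Ok(bytes) => Ok(bytes),
            Err(e) => {
                if let Err(rm) = tokio::fs::remove_file(dst).await {
                    if rm.kind() != ErrorKind::NotFound {
                        // download_object treats this as fatal (on_fatal_io_error);
                        // logging stands in for that here.
                        eprintln!("failed to remove partial file {}: {rm}", dst.display());
                    }
                }
                Err(e)
            }
        }
    }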
     243              : 
     244              : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     245              : 
     246            0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
     247            0 :     let extension = path.extension();
     248            0 :     match extension {
     249            0 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     250            0 :         Some(_) => false,
     251            0 :         None => false,
     252              :     }
     253            0 : }
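
Only the final extension is compared, which pairs with path_with_suffix_extension appending ".temp_download" above. A quick illustration (hypothetical paths; the helper is pub(crate), so this only compiles within the crate):

    use camino::Utf8Path;

    // "<name>.temp_download" is recognized as an in-progress download;
    // a finished layer file without that suffix is not.
    assert!(is_temp_download_file(Utf8Path::new("some_layer.temp_download")));
    assert!(!is_temp_download_file(Utf8Path::new("some_layer")));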
     254              : 
     255              : /// List timelines of given tenant in remote storage
     256          108 : pub async fn list_remote_timelines(
     257          108 :     storage: &GenericRemoteStorage,
     258          108 :     tenant_shard_id: TenantShardId,
     259          108 :     cancel: CancellationToken,
     260          108 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     261          108 :     let remote_path = remote_timelines_path(&tenant_shard_id);
     262          108 : 
     263          108 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     264            0 :         anyhow::bail!("storage-sync-list-remote-timelines");
     265          108 :     });
     266              : 
     267          108 :     let listing = download_retry_forever(
     268          108 :         || {
     269          108 :             storage.list(
     270          108 :                 Some(&remote_path),
     271          108 :                 ListingMode::WithDelimiter,
     272          108 :                 None,
     273          108 :                 &cancel,
     274          108 :             )
     275          108 :         },
     276          108 :         &format!("list timelines for {tenant_shard_id}"),
     277          108 :         &cancel,
     278          108 :     )
     279           10 :     .await?;
     280              : 
     281          108 :     let mut timeline_ids = HashSet::new();
     282          108 :     let mut other_prefixes = HashSet::new();
     283              : 
     284          114 :     for timeline_remote_storage_key in listing.prefixes {
     285            6 :         let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
     286            0 :             anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
     287            6 :         })?;
     288              : 
     289            6 :         match object_name.parse::<TimelineId>() {
     290            6 :             Ok(t) => timeline_ids.insert(t),
     291            0 :             Err(_) => other_prefixes.insert(object_name.to_string()),
     292              :         };
     293              :     }
     294              : 
     295          108 :     for key in listing.keys {
     296            0 :         let object_name = key
     297            0 :             .object_name()
     298            0 :             .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
     299            0 :         other_prefixes.insert(object_name.to_string());
     300              :     }
     301              : 
     302          108 :     Ok((timeline_ids, other_prefixes))
     303          108 : }
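
The classification step can be viewed on its own: names that parse as TimelineIds are timeline directories, while everything else is surfaced so callers can notice stray objects (e.g. leftovers from interrupted deletions). A hypothetical sketch of the same loop over already-extracted object names (classify_names is an illustrative name):

    use std::collections::HashSet;
    use utils::id::TimelineId;

    // Hypothetical sketch of the classification in list_remote_timelines.
    fn classify_names<'a>(
        names: impl Iterator<Item = &'a str>,
    ) -> (HashSet<TimelineId>, HashSet<String>) {
        let mut timelines = HashSet::new();
        let mut other = HashSet::new();
        for name in names {
            match name.parse::<TimelineId>() {
                Ok(id) => {
                    timelines.insert(id);
                }
                Err(_) => {
                    other.insert(name.to_string());
                }
            }
        }
        (timelines, other)
    }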
     304              : 
     305           34 : async fn do_download_index_part(
     306           34 :     storage: &GenericRemoteStorage,
     307           34 :     tenant_shard_id: &TenantShardId,
     308           34 :     timeline_id: &TimelineId,
     309           34 :     index_generation: Generation,
     310           34 :     cancel: &CancellationToken,
     311           34 : ) -> Result<IndexPart, DownloadError> {
     312           34 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
     313              : 
     314           34 :     let index_part_bytes = download_retry_forever(
     315           34 :         || async {
     316           54 :             let download = storage.download(&remote_path, cancel).await?;
     317              : 
     318           20 :             let mut bytes = Vec::new();
     319           20 : 
     320           20 :             let stream = download.download_stream;
     321           20 :             let mut stream = StreamReader::new(stream);
     322           20 : 
     323           40 :             tokio::io::copy_buf(&mut stream, &mut bytes).await?;
     324              : 
     325           20 :             Ok(bytes)
     326           68 :         },
     327           34 :         &format!("download {remote_path:?}"),
     328           34 :         cancel,
     329           34 :     )
     330           94 :     .await?;
     331              : 
     332           20 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     333           20 :         .with_context(|| format!("deserialize index part file at {remote_path:?}"))
     334           20 :         .map_err(DownloadError::Other)?;
     335              : 
     336           20 :     Ok(index_part)
     337           34 : }
     338              : 
     339              : /// index_part.json objects are suffixed with a generation number, so we cannot
     340              : /// directly GET the latest index part without doing some probing.
     341              : ///
     342              : /// In this function we probe for the most recent index in a generation <= our current generation.
     343              : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     344           40 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     345              : pub(super) async fn download_index_part(
     346              :     storage: &GenericRemoteStorage,
     347              :     tenant_shard_id: &TenantShardId,
     348              :     timeline_id: &TimelineId,
     349              :     my_generation: Generation,
     350              :     cancel: &CancellationToken,
     351              : ) -> Result<IndexPart, DownloadError> {
     352              :     debug_assert_current_span_has_tenant_and_timeline_id();
     353              : 
     354              :     if my_generation.is_none() {
     355              :         // Operating without generations: just fetch the generation-less path
     356              :         return do_download_index_part(
     357              :             storage,
     358              :             tenant_shard_id,
     359              :             timeline_id,
     360              :             my_generation,
     361              :             cancel,
     362              :         )
     363              :         .await;
     364              :     }
     365              : 
     366              :     // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
     367              :     // index in our generation.
     368              :     //
     369              :     // This is an optimization to avoid doing the listing for the general case below.
     370              :     let res =
     371              :         do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     372              :     match res {
     373              :         Ok(index_part) => {
     374            0 :             tracing::debug!(
     375            0 :                 "Found index_part from current generation (this is a stale attachment)"
     376            0 :             );
     377              :             return Ok(index_part);
     378              :         }
     379              :         Err(DownloadError::NotFound) => {}
     380              :         Err(e) => return Err(e),
     381              :     };
     382              : 
     383              :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded
     384              :     // an index part.  We may safely start from this index without doing a listing, because:
     385              :     //  - We checked for current generation case above
     386              :     //  - generations > my_generation are to be ignored
     387              :     //  - any other indices that exist would have an older generation than `previous_gen`, and
     388              :     //    we want to find the most recent index from a previous generation.
     389              :     //
     390              :     // This is an optimization to avoid doing the listing for the general case below.
     391              :     let res = do_download_index_part(
     392              :         storage,
     393              :         tenant_shard_id,
     394              :         timeline_id,
     395              :         my_generation.previous(),
     396              :         cancel,
     397              :     )
     398              :     .await;
     399              :     match res {
     400              :         Ok(index_part) => {
     401            0 :             tracing::debug!("Found index_part from previous generation");
     402              :             return Ok(index_part);
     403              :         }
     404              :         Err(DownloadError::NotFound) => {
     405            0 :             tracing::debug!(
     406            0 :                 "No index_part found from previous generation, falling back to listing"
     407            0 :             );
     408              :         }
     409              :         Err(e) => {
     410              :             return Err(e);
     411              :         }
     412              :     }
     413              : 
     414              :     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
     415              :     // objects, and select the highest one with a generation <= my_generation.  Constructing the prefix is equivalent
     416              :     // to constructing a full index path with no generation, because the generation is a suffix.
     417              :     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
     418              : 
     419              :     let indices = download_retry(
     420           12 :         || async { storage.list_files(Some(&index_prefix), None, cancel).await },
     421              :         "list index_part files",
     422              :         cancel,
     423              :     )
     424              :     .await?;
     425              : 
     426              :     // General case logic for which index to use: the latest index whose generation
     427              :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     428              :     let max_previous_generation = indices
     429              :         .into_iter()
     430              :         .filter_map(parse_remote_index_path)
     431           12 :         .filter(|g| g <= &my_generation)
     432              :         .max();
     433              : 
     434              :     match max_previous_generation {
     435              :         Some(g) => {
     436            0 :             tracing::debug!("Found index_part in generation {g:?}");
     437              :             do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
     438              :         }
     439              :         None => {
     440              :             // Migration from legacy pre-generation state: we have a generation but no prior
     441              :             // attached pageservers did.  Try to load from a no-generation path.
     442            0 :             tracing::debug!("No index_part.json* found");
     443              :             do_download_index_part(
     444              :                 storage,
     445              :                 tenant_shard_id,
     446              :                 timeline_id,
     447              :                 Generation::none(),
     448              :                 cancel,
     449              :             )
     450              :             .await
     451              :         }
     452              :     }
     453              : }
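
The fallback's "latest index not newer than ours" rule is the heart of the generation probing. A hypothetical sketch with plain integers standing in for Generation values:

    // Hypothetical sketch: of the generations found by listing, keep only
    // those not newer than our own, and take the most recent.
    fn max_previous_generation(found: &[u32], my_generation: u32) -> Option<u32> {
        found.iter().copied().filter(|g| *g <= my_generation).max()
    }

    // found = [2, 5, 9], my_generation = 7  =>  Some(5)
    // found = [9],       my_generation = 7  =>  None, so the caller falls
    //                                           back to the legacy
    //                                           no-generation path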
     454              : 
     455            2 : pub(crate) async fn download_initdb_tar_zst(
     456            2 :     conf: &'static PageServerConf,
     457            2 :     storage: &GenericRemoteStorage,
     458            2 :     tenant_shard_id: &TenantShardId,
     459            2 :     timeline_id: &TimelineId,
     460            2 :     cancel: &CancellationToken,
     461            2 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
     462            2 :     debug_assert_current_span_has_tenant_and_timeline_id();
     463            2 : 
     464            2 :     let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     465            2 : 
     466            2 :     let remote_preserved_path =
     467            2 :         remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     468            2 : 
     469            2 :     let timeline_path = conf.timelines_path(tenant_shard_id);
     470            2 : 
     471            2 :     if !timeline_path.exists() {
     472            0 :         tokio::fs::create_dir_all(&timeline_path)
     473            0 :             .await
     474            0 :             .with_context(|| format!("timeline dir creation {timeline_path}"))
     475            0 :             .map_err(DownloadError::Other)?;
     476            2 :     }
     477            2 :     let temp_path = timeline_path.join(format!(
     478            2 :         "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
     479            2 :     ));
     480              : 
     481            2 :     let file = download_retry(
     482            2 :         || async {
     483            2 :             let file = OpenOptions::new()
     484            2 :                 .create(true)
     485            2 :                 .truncate(true)
     486            2 :                 .read(true)
     487            2 :                 .write(true)
     488            2 :                 .open(&temp_path)
     489            2 :                 .await
     490            2 :                 .with_context(|| format!("tempfile creation {temp_path}"))
     491            2 :                 .map_err(DownloadError::Other)?;
     492              : 
     493            4 :             let download = match storage.download(&remote_path, cancel).await {
     494            2 :                 Ok(dl) => dl,
     495              :                 Err(DownloadError::NotFound) => {
     496            0 :                     storage.download(&remote_preserved_path, cancel).await?
     497              :                 }
     498            0 :                 Err(other) => Err(other)?,
     499              :             };
     500            2 :             let mut download = tokio_util::io::StreamReader::new(download.download_stream);
     501            2 :             let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
     502            2 : 
     503          730 :             tokio::io::copy_buf(&mut download, &mut writer).await?;
     504              : 
     505            2 :             let mut file = writer.into_inner();
     506            2 : 
     507            2 :             file.seek(std::io::SeekFrom::Start(0))
     508            2 :                 .await
     509            2 :                 .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
     510            2 :                 .map_err(DownloadError::Other)?;
     511              : 
     512            2 :             Ok(file)
     513            4 :         },
     514            2 :         &format!("download {remote_path}"),
     515            2 :         cancel,
     516            2 :     )
     517          738 :     .await
     518            2 :     .map_err(|e| {
     519              :         // Do a best-effort attempt at deleting the temporary file upon encountering an error.
     520              :         // We don't have async here nor do we want to pile on any extra errors.
     521            0 :         if let Err(e) = std::fs::remove_file(&temp_path) {
     522            0 :             if e.kind() != std::io::ErrorKind::NotFound {
     523            0 :                 warn!("error deleting temporary file {temp_path}: {e}");
     524            0 :             }
     525            0 :         }
     526            0 :         e
     527            2 :     })?;
     528              : 
     529            2 :     Ok((temp_path, file))
     530            2 : }
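
One detail worth calling out: the handle that received the download is the same handle handed back to the caller for reading, so it must be rewound to offset 0 first. A minimal, hypothetical sketch of that write-then-rewind pattern (write_then_rewind is an illustrative name):

    use tokio::io::{AsyncSeekExt, AsyncWriteExt};

    // Hypothetical sketch: write through a read+write handle, flush, then
    // seek back to the start so the caller can read through the same handle.
    async fn write_then_rewind(
        mut file: tokio::fs::File,
        data: &[u8],
    ) -> std::io::Result<tokio::fs::File> {
        file.write_all(data).await?;
        file.flush().await?;
        file.seek(std::io::SeekFrom::Start(0)).await?;
        Ok(file)
    }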
     531              : 
     532              : /// Helper function to handle retries for a download operation.
     533              : ///
     534              : /// Remote operations can fail due to rate limits (S3), spurious network
     535              : /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
     536              : /// with backoff.
     537              : ///
     538              : /// (See similar logic for uploads in `perform_upload_task`)
     539           14 : pub(super) async fn download_retry<T, O, F>(
     540           14 :     op: O,
     541           14 :     description: &str,
     542           14 :     cancel: &CancellationToken,
     543           14 : ) -> Result<T, DownloadError>
     544           14 : where
     545           14 :     O: FnMut() -> F,
     546           14 :     F: Future<Output = Result<T, DownloadError>>,
     547           14 : {
     548           14 :     backoff::retry(
     549           14 :         op,
     550           14 :         DownloadError::is_permanent,
     551           14 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     552           14 :         FAILED_REMOTE_OP_RETRIES,
     553           14 :         description,
     554           14 :         cancel,
     555           14 :     )
     556          822 :     .await
     557           14 :     .ok_or_else(|| DownloadError::Cancelled)
     558           14 :     .and_then(|x| x)
     559           14 : }
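
For reference, a hypothetical call site, showing the closure shape backoff::retry expects: the operation builds a fresh future per attempt, so each retry re-issues the request from scratch (compare the call in download_layer_file above):

    // Hypothetical usage sketch, inside an async fn with `storage`,
    // `remote_path`, and `cancel` already in scope:
    let download = download_retry(
        || async { storage.download(&remote_path, cancel).await },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await?;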
     560              : 
     561          142 : async fn download_retry_forever<T, O, F>(
     562          142 :     op: O,
     563          142 :     description: &str,
     564          142 :     cancel: &CancellationToken,
     565          142 : ) -> Result<T, DownloadError>
     566          142 : where
     567          142 :     O: FnMut() -> F,
     568          142 :     F: Future<Output = Result<T, DownloadError>>,
     569          142 : {
     570          142 :     backoff::retry(
     571          142 :         op,
     572          142 :         DownloadError::is_permanent,
     573          142 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     574          142 :         u32::MAX,
     575          142 :         description,
     576          142 :         cancel,
     577          142 :     )
     578          104 :     .await
     579          142 :     .ok_or_else(|| DownloadError::Cancelled)
     580          142 :     .and_then(|x| x)
     581          142 : }
        

Generated by: LCOV version 2.1-beta