LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions) Coverage Total Hit UBC GBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 89.1 % 322 287 35 3 284
Current Date: 2024-01-09 02:06:09 Functions: 52.5 % 61 32 29 1 31
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Helper functions to download files from remote storage with a RemoteStorage
       2                 : //!
       3                 : //! The functions in this module retry failed operations automatically, according
       4                 : //! to the FAILED_DOWNLOAD_RETRIES constant.
       5                 : 
       6                 : use std::collections::HashSet;
       7                 : use std::future::Future;
       8                 : 
       9                 : use anyhow::{anyhow, Context};
      10                 : use camino::{Utf8Path, Utf8PathBuf};
      11                 : use pageserver_api::shard::TenantShardId;
      12                 : use tokio::fs::{self, File, OpenOptions};
      13                 : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
      14                 : use tokio_util::sync::CancellationToken;
      15                 : use tracing::warn;
      16                 : use utils::timeout::timeout_cancellable;
      17                 : use utils::{backoff, crashsafe};
      18                 : 
      19                 : use crate::config::PageServerConf;
      20                 : use crate::tenant::remote_timeline_client::{
      21                 :     download_cancellable, remote_layer_path, remote_timelines_path, DOWNLOAD_TIMEOUT,
      22                 : };
      23                 : use crate::tenant::storage_layer::LayerFileName;
      24                 : use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
      25                 : use crate::tenant::Generation;
      26                 : use crate::virtual_file::on_fatal_io_error;
      27                 : use crate::TEMP_FILE_SUFFIX;
      28                 : use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
      29                 : use utils::crashsafe::path_with_suffix_extension;
      30                 : use utils::id::TimelineId;
      31                 : 
      32                 : use super::index::{IndexPart, LayerFileMetadata};
      33                 : use super::{
      34                 :     parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
      35                 :     FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
      36                 : };
      37                 : 
      38                 : ///
      39                 : /// If 'metadata' is given, we will validate that the downloaded file's size matches that
      40                 : /// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
      41                 : ///
      42                 : /// Returns the size of the downloaded file.
      43 CBC       10635 : pub async fn download_layer_file<'a>(
      44           10635 :     conf: &'static PageServerConf,
      45           10635 :     storage: &'a GenericRemoteStorage,
      46           10635 :     tenant_shard_id: TenantShardId,
      47           10635 :     timeline_id: TimelineId,
      48           10635 :     layer_file_name: &'a LayerFileName,
      49           10635 :     layer_metadata: &'a LayerFileMetadata,
      50           10635 :     cancel: &CancellationToken,
      51           10635 : ) -> Result<u64, DownloadError> {
      52           10635 :     debug_assert_current_span_has_tenant_and_timeline_id();
      53           10635 : 
      54           10635 :     let local_path = conf
      55           10635 :         .timeline_path(&tenant_shard_id, &timeline_id)
      56           10635 :         .join(layer_file_name.file_name());
      57           10635 : 
      58           10635 :     let remote_path = remote_layer_path(
      59           10635 :         &tenant_shard_id.tenant_id,
      60           10635 :         &timeline_id,
      61           10635 :         layer_metadata.shard,
      62           10635 :         layer_file_name,
      63           10635 :         layer_metadata.generation,
      64           10635 :     );
      65           10635 : 
      66           10635 :     // Perform a rename inspired by durable_rename from file_utils.c.
      67           10635 :     // The sequence:
      68           10635 :     //     write(tmp)
      69           10635 :     //     fsync(tmp)
      70           10635 :     //     rename(tmp, new)
      71           10635 :     //     fsync(new)
      72           10635 :     //     fsync(parent)
      73           10635 :     // For more context about durable_rename check this email from postgres mailing list:
      74           10635 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
      75           10635 :     // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
      76           10635 :     let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
      77           10635 : 
      78           10635 :     let cancel_inner = cancel.clone();
      79           10635 :     let (mut destination_file, bytes_amount) = download_retry(
      80           10675 :         || async {
      81           10675 :             let destination_file = tokio::fs::File::create(&temp_file_path)
      82           10142 :                 .await
      83           10675 :                 .with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
      84           10675 :                 .map_err(DownloadError::Other)?;
      85                 : 
      86                 :             // Cancellation safety: it is safe to cancel this future, because it isn't writing to a local
      87                 :             // file: the write to local file doesn't start until after the request header is returned
      88                 :             // and we start draining the body stream below
      89           10675 :             let download = download_cancellable(&cancel_inner, storage.download(&remote_path))
      90           29649 :                 .await
      91           10675 :                 .with_context(|| {
      92              44 :                     format!(
      93              44 :                     "open a download stream for layer with remote storage path '{remote_path:?}'"
      94              44 :                 )
      95           10675 :                 })
      96           10675 :                 .map_err(DownloadError::Other)?;
      97                 : 
      98           10631 :             let mut destination_file =
      99           10631 :                 tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
     100           10631 : 
     101           10631 :             let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
     102                 : 
     103                 :             // Cancellation safety: it is safe to cancel this future because it is writing into a temporary file,
     104                 :             // and we will unlink the temporary file if there is an error.  This unlink is important because we
     105                 :             // are in a retry loop, and we wouldn't want to leave behind a rogue write I/O to a file that
     106                 :             // we will imminiently try and write to again.
     107           10631 :             let bytes_amount: u64 = match timeout_cancellable(
     108           10631 :                 DOWNLOAD_TIMEOUT,
     109           10631 :                 &cancel_inner,
     110           10631 :                 tokio::io::copy_buf(&mut reader, &mut destination_file),
     111           10631 :             )
     112          313106 :             .await
     113           10628 :             .with_context(|| {
     114 GBC           1 :                 format!(
     115               1 :                     "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
     116               1 :                 )
     117 CBC       10628 :             })
     118           10628 :             .map_err(DownloadError::Other)?
     119                 :             {
     120           10627 :                 Ok(b) => Ok(b),
     121 UBC           0 :                 Err(e) => {
     122                 :                     // Remove incomplete files: on restart Timeline would do this anyway, but we must
     123                 :                     // do it here for the retry case.
     124               0 :                     if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
     125               0 :                         on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
     126               0 :                     }
     127               0 :                     Err(e)
     128                 :                 }
     129                 :             }
     130 CBC       10627 :             .with_context(|| {
     131 UBC           0 :                 format!(
     132               0 :                     "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
     133               0 :                 )
     134 CBC       10627 :             })
     135           10627 :             .map_err(DownloadError::Other)?;
     136                 : 
     137           10627 :             let destination_file = destination_file.into_inner();
     138           10627 : 
     139           10627 :             Ok((destination_file, bytes_amount))
     140           10672 :         },
     141           10635 :         &format!("download {remote_path:?}"),
     142           10635 :         cancel,
     143           10635 :     )
     144          352897 :     .await?;
     145                 : 
     146                 :     // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     147                 :     // A file will not be closed immediately when it goes out of scope if there are any IO operations
     148                 :     // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     149                 :     // you should call flush before dropping it.
     150                 :     //
     151                 :     // From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
     152                 :     // we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
     153                 :     // But for additional safety lets check/wait for any pending operations.
     154           10627 :     destination_file
     155           10627 :         .flush()
     156 UBC           0 :         .await
     157 CBC       10627 :         .with_context(|| format!("flush source file at {temp_file_path}"))
     158           10627 :         .map_err(DownloadError::Other)?;
     159                 : 
     160           10627 :     let expected = layer_metadata.file_size();
     161           10627 :     if expected != bytes_amount {
     162 UBC           0 :         return Err(DownloadError::Other(anyhow!(
     163               0 :             "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
     164               0 :         )));
     165 CBC       10627 :     }
     166           10627 : 
     167           10627 :     // not using sync_data because it can lose file size update
     168           10627 :     destination_file
     169           10627 :         .sync_all()
     170           10624 :         .await
     171           10627 :         .with_context(|| format!("failed to fsync source file at {temp_file_path}"))
     172           10627 :         .map_err(DownloadError::Other)?;
     173           10627 :     drop(destination_file);
     174           10627 : 
     175           10627 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
     176               6 :         Err(DownloadError::Other(anyhow!(
     177               6 :             "remote-storage-download-pre-rename failpoint triggered"
     178               6 :         )))
     179           10627 :     });
     180                 : 
     181           10621 :     fs::rename(&temp_file_path, &local_path)
     182           10249 :         .await
     183           10621 :         .with_context(|| format!("rename download layer file to {local_path}"))
     184           10621 :         .map_err(DownloadError::Other)?;
     185                 : 
     186           10621 :     crashsafe::fsync_async(&local_path)
     187           20759 :         .await
     188           10621 :         .with_context(|| format!("fsync layer file {local_path}"))
     189           10621 :         .map_err(DownloadError::Other)?;
     190                 : 
     191 UBC           0 :     tracing::debug!("download complete: {local_path}");
     192                 : 
     193 CBC       10621 :     Ok(bytes_amount)
     194           10632 : }
     195                 : 
     196                 : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     197                 : 
     198              59 : pub fn is_temp_download_file(path: &Utf8Path) -> bool {
     199              59 :     let extension = path.extension();
     200              59 :     match extension {
     201               8 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     202               6 :         Some(_) => false,
     203              51 :         None => false,
     204                 :     }
     205              59 : }
     206                 : 
     207                 : /// List timelines of given tenant in remote storage
     208             308 : pub async fn list_remote_timelines(
     209             308 :     storage: &GenericRemoteStorage,
     210             308 :     tenant_shard_id: TenantShardId,
     211             308 :     cancel: CancellationToken,
     212             308 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     213             308 :     let remote_path = remote_timelines_path(&tenant_shard_id);
     214             308 : 
     215             308 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     216               6 :         anyhow::bail!("storage-sync-list-remote-timelines");
     217             308 :     });
     218                 : 
     219             302 :     let cancel_inner = cancel.clone();
     220             302 :     let listing = download_retry_forever(
     221             324 :         || {
     222             324 :             download_cancellable(
     223             324 :                 &cancel_inner,
     224             324 :                 storage.list(Some(&remote_path), ListingMode::WithDelimiter),
     225             324 :             )
     226             324 :         },
     227             302 :         &format!("list timelines for {tenant_shard_id}"),
     228             302 :         cancel,
     229             302 :     )
     230            1055 :     .await?;
     231                 : 
     232             302 :     let mut timeline_ids = HashSet::new();
     233             302 :     let mut other_prefixes = HashSet::new();
     234                 : 
     235             696 :     for timeline_remote_storage_key in listing.prefixes {
     236             394 :         let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
     237 UBC           0 :             anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
     238 CBC         394 :         })?;
     239                 : 
     240             394 :         match object_name.parse::<TimelineId>() {
     241             394 :             Ok(t) => timeline_ids.insert(t),
     242 UBC           0 :             Err(_) => other_prefixes.insert(object_name.to_string()),
     243                 :         };
     244                 :     }
     245                 : 
     246 CBC         317 :     for key in listing.keys {
     247              15 :         let object_name = key
     248              15 :             .object_name()
     249              15 :             .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
     250              15 :         other_prefixes.insert(object_name.to_string());
     251                 :     }
     252                 : 
     253             302 :     Ok((timeline_ids, other_prefixes))
     254             308 : }
     255                 : 
     256             950 : async fn do_download_index_part(
     257             950 :     storage: &GenericRemoteStorage,
     258             950 :     tenant_shard_id: &TenantShardId,
     259             950 :     timeline_id: &TimelineId,
     260             950 :     index_generation: Generation,
     261             950 :     cancel: CancellationToken,
     262             950 : ) -> Result<IndexPart, DownloadError> {
     263             950 :     use futures::stream::StreamExt;
     264             950 : 
     265             950 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
     266             950 : 
     267             950 :     let cancel_inner = cancel.clone();
     268             950 :     let index_part_bytes = download_retry_forever(
     269            1031 :         || async {
     270                 :             // Cancellation: if is safe to cancel this future because we're just downloading into
     271                 :             // a memory buffer, not touching local disk.
     272             376 :             let index_part_download =
     273            1664 :                 download_cancellable(&cancel_inner, storage.download(&remote_path)).await?;
     274                 : 
     275             376 :             let mut index_part_bytes = Vec::new();
     276             376 :             let mut stream = std::pin::pin!(index_part_download.download_stream);
     277            1216 :             while let Some(chunk) = stream.next().await {
     278             840 :                 let chunk = chunk
     279             840 :                     .with_context(|| format!("download index part at {remote_path:?}"))
     280             840 :                     .map_err(DownloadError::Other)?;
     281             840 :                 index_part_bytes.extend_from_slice(&chunk[..]);
     282                 :             }
     283             376 :             Ok(index_part_bytes)
     284            1031 :         },
     285             950 :         &format!("download {remote_path:?}"),
     286             950 :         cancel,
     287             950 :     )
     288            2611 :     .await?;
     289                 : 
     290             376 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     291             376 :         .with_context(|| format!("download index part file at {remote_path:?}"))
     292             376 :         .map_err(DownloadError::Other)?;
     293                 : 
     294             376 :     Ok(index_part)
     295             950 : }
     296                 : 
     297                 : /// index_part.json objects are suffixed with a generation number, so we cannot
     298                 : /// directly GET the latest index part without doing some probing.
     299                 : ///
     300                 : /// In this function we probe for the most recent index in a generation <= our current generation.
     301                 : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     302             159 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     303                 : pub(super) async fn download_index_part(
     304                 :     storage: &GenericRemoteStorage,
     305                 :     tenant_shard_id: &TenantShardId,
     306                 :     timeline_id: &TimelineId,
     307                 :     my_generation: Generation,
     308                 :     cancel: CancellationToken,
     309                 : ) -> Result<IndexPart, DownloadError> {
     310                 :     debug_assert_current_span_has_tenant_and_timeline_id();
     311                 : 
     312                 :     if my_generation.is_none() {
     313                 :         // Operating without generations: just fetch the generation-less path
     314                 :         return do_download_index_part(
     315                 :             storage,
     316                 :             tenant_shard_id,
     317                 :             timeline_id,
     318                 :             my_generation,
     319                 :             cancel,
     320                 :         )
     321                 :         .await;
     322                 :     }
     323                 : 
     324                 :     // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
     325                 :     // index in our generation.
     326                 :     //
     327                 :     // This is an optimization to avoid doing the listing for the general case below.
     328                 :     let res = do_download_index_part(
     329                 :         storage,
     330                 :         tenant_shard_id,
     331                 :         timeline_id,
     332                 :         my_generation,
     333                 :         cancel.clone(),
     334                 :     )
     335                 :     .await;
     336                 :     match res {
     337                 :         Ok(index_part) => {
     338 UBC           0 :             tracing::debug!(
     339               0 :                 "Found index_part from current generation (this is a stale attachment)"
     340               0 :             );
     341                 :             return Ok(index_part);
     342                 :         }
     343                 :         Err(DownloadError::NotFound) => {}
     344                 :         Err(e) => return Err(e),
     345                 :     };
     346                 : 
     347                 :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded
     348                 :     // and index part.  We may safely start from this index without doing a listing, because:
     349                 :     //  - We checked for current generation case above
     350                 :     //  - generations > my_generation are to be ignored
     351                 :     //  - any other indices that exist would have an older generation than `previous_gen`, and
     352                 :     //    we want to find the most recent index from a previous generation.
     353                 :     //
     354                 :     // This is an optimization to avoid doing the listing for the general case below.
     355                 :     let res = do_download_index_part(
     356                 :         storage,
     357                 :         tenant_shard_id,
     358                 :         timeline_id,
     359                 :         my_generation.previous(),
     360                 :         cancel.clone(),
     361                 :     )
     362                 :     .await;
     363                 :     match res {
     364                 :         Ok(index_part) => {
     365               0 :             tracing::debug!("Found index_part from previous generation");
     366                 :             return Ok(index_part);
     367                 :         }
     368                 :         Err(DownloadError::NotFound) => {
     369               0 :             tracing::debug!(
     370               0 :                 "No index_part found from previous generation, falling back to listing"
     371               0 :             );
     372                 :         }
     373                 :         Err(e) => {
     374                 :             return Err(e);
     375                 :         }
     376                 :     }
     377                 : 
     378                 :     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
     379                 :     // objects, and select the highest one with a generation <= my_generation.  Constructing the prefix is equivalent
     380                 :     // to constructing a full index path with no generation, because the generation is a suffix.
     381                 :     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
     382                 :     let indices = backoff::retry(
     383 CBC         390 :         || async { storage.list_files(Some(&index_prefix)).await },
     384              15 :         |_| false,
     385                 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     386                 :         FAILED_REMOTE_OP_RETRIES,
     387                 :         "listing index_part files",
     388 UBC           0 :         backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")),
     389                 :     )
     390                 :     .await
     391                 :     .map_err(DownloadError::Other)?;
     392                 : 
     393                 :     // General case logic for which index to use: the latest index whose generation
     394                 :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     395                 :     let max_previous_generation = indices
     396                 :         .into_iter()
     397                 :         .filter_map(parse_remote_index_path)
     398 CBC         370 :         .filter(|g| g <= &my_generation)
     399                 :         .max();
     400                 : 
     401                 :     match max_previous_generation {
     402                 :         Some(g) => {
     403 UBC           0 :             tracing::debug!("Found index_part in generation {g:?}");
     404                 :             do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
     405                 :         }
     406                 :         None => {
     407                 :             // Migration from legacy pre-generation state: we have a generation but no prior
     408                 :             // attached pageservers did.  Try to load from a no-generation path.
     409               0 :             tracing::debug!("No index_part.json* found");
     410                 :             do_download_index_part(
     411                 :                 storage,
     412                 :                 tenant_shard_id,
     413                 :                 timeline_id,
     414                 :                 Generation::none(),
     415                 :                 cancel,
     416                 :             )
     417                 :             .await
     418                 :         }
     419                 :     }
     420                 : }
     421                 : 
     422 CBC           3 : pub(crate) async fn download_initdb_tar_zst(
     423               3 :     conf: &'static PageServerConf,
     424               3 :     storage: &GenericRemoteStorage,
     425               3 :     tenant_shard_id: &TenantShardId,
     426               3 :     timeline_id: &TimelineId,
     427               3 :     cancel: &CancellationToken,
     428               3 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
     429               3 :     debug_assert_current_span_has_tenant_and_timeline_id();
     430               3 : 
     431               3 :     let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     432               3 : 
     433               3 :     let timeline_path = conf.timelines_path(tenant_shard_id);
     434               3 : 
     435               3 :     if !timeline_path.exists() {
     436 UBC           0 :         tokio::fs::create_dir_all(&timeline_path)
     437               0 :             .await
     438               0 :             .with_context(|| format!("timeline dir creation {timeline_path}"))
     439               0 :             .map_err(DownloadError::Other)?;
     440 CBC           3 :     }
     441               3 :     let temp_path = timeline_path.join(format!(
     442               3 :         "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
     443               3 :     ));
     444               3 : 
     445               3 :     let cancel_inner = cancel.clone();
     446                 : 
     447               3 :     let file = download_retry(
     448               3 :         || async {
     449               3 :             let file = OpenOptions::new()
     450               3 :                 .create(true)
     451               3 :                 .truncate(true)
     452               3 :                 .read(true)
     453               3 :                 .write(true)
     454               3 :                 .open(&temp_path)
     455               3 :                 .await
     456               3 :                 .with_context(|| format!("tempfile creation {temp_path}"))
     457               3 :                 .map_err(DownloadError::Other)?;
     458                 : 
     459               3 :             let download =
     460               3 :                 download_cancellable(&cancel_inner, storage.download(&remote_path)).await?;
     461               3 :             let mut download = tokio_util::io::StreamReader::new(download.download_stream);
     462               3 :             let mut writer = tokio::io::BufWriter::with_capacity(8 * 1024, file);
     463               3 : 
     464               3 :             // TODO: this consumption of the response body should be subject to timeout + cancellation, but
     465               3 :             // not without thinking carefully about how to recover safely from cancelling a write to
     466               3 :             // local storage (e.g. by writing into a temp file as we do in download_layer)
     467               3 :             tokio::io::copy_buf(&mut download, &mut writer)
     468            1238 :                 .await
     469               3 :                 .with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))
     470               3 :                 .map_err(DownloadError::Other)?;
     471                 : 
     472               3 :             let mut file = writer.into_inner();
     473               3 : 
     474               3 :             file.seek(std::io::SeekFrom::Start(0))
     475               2 :                 .await
     476               3 :                 .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
     477               3 :                 .map_err(DownloadError::Other)?;
     478                 : 
     479               3 :             Ok(file)
     480               3 :         },
     481               3 :         &format!("download {remote_path}"),
     482               3 :         cancel,
     483               3 :     )
     484            1246 :     .await
     485               3 :     .map_err(|e| {
     486                 :         // Do a best-effort attempt at deleting the temporary file upon encountering an error.
     487                 :         // We don't have async here nor do we want to pile on any extra errors.
     488 UBC           0 :         if let Err(e) = std::fs::remove_file(&temp_path) {
     489               0 :             if e.kind() != std::io::ErrorKind::NotFound {
     490               0 :                 warn!("error deleting temporary file {temp_path}: {e}");
     491               0 :             }
     492               0 :         }
     493               0 :         e
     494 CBC           3 :     })?;
     495                 : 
     496               3 :     Ok((temp_path, file))
     497               3 : }
     498                 : 
     499                 : /// Helper function to handle retries for a download operation.
     500                 : ///
     501                 : /// Remote operations can fail due to rate limits (IAM, S3), spurious network
     502                 : /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
     503                 : /// with backoff.
     504                 : ///
     505                 : /// (See similar logic for uploads in `perform_upload_task`)
     506           10638 : async fn download_retry<T, O, F>(
     507           10638 :     op: O,
     508           10638 :     description: &str,
     509           10638 :     cancel: &CancellationToken,
     510           10638 : ) -> Result<T, DownloadError>
     511           10638 : where
     512           10638 :     O: FnMut() -> F,
     513           10638 :     F: Future<Output = Result<T, DownloadError>>,
     514           10638 : {
     515           10638 :     backoff::retry(
     516           10638 :         op,
     517           10638 :         |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
     518           10638 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     519           10638 :         FAILED_REMOTE_OP_RETRIES,
     520           10638 :         description,
     521           10638 :         backoff::Cancel::new(cancel.clone(), || DownloadError::Cancelled),
     522           10638 :     )
     523          354143 :     .await
     524           10635 : }
     525                 : 
     526            1252 : async fn download_retry_forever<T, O, F>(
     527            1252 :     op: O,
     528            1252 :     description: &str,
     529            1252 :     cancel: CancellationToken,
     530            1252 : ) -> Result<T, DownloadError>
     531            1252 : where
     532            1252 :     O: FnMut() -> F,
     533            1252 :     F: Future<Output = Result<T, DownloadError>>,
     534            1252 : {
     535            1252 :     backoff::retry(
     536            1252 :         op,
     537            1252 :         |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
     538            1252 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     539            1252 :         u32::MAX,
     540            1252 :         description,
     541            1252 :         backoff::Cancel::new(cancel, || DownloadError::Cancelled),
     542            1252 :     )
     543            3666 :     .await
     544            1252 : }
        

Generated by: LCOV version 2.1-beta