LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 89.2 % 332 296
Test Date: 2024-02-07 07:37:29 Functions: 56.1 % 66 37

            Line data    Source code
       1              : //! Helper functions to download files from remote storage with a RemoteStorage
       2              : //!
       3              : //! The functions in this module retry failed operations automatically, according
       4              : //! to the FAILED_REMOTE_OP_RETRIES constant.
       5              : 
       6              : use std::collections::HashSet;
       7              : use std::future::Future;
       8              : 
       9              : use anyhow::{anyhow, Context};
      10              : use camino::{Utf8Path, Utf8PathBuf};
      11              : use pageserver_api::shard::TenantShardId;
      12              : use tokio::fs::{self, File, OpenOptions};
      13              : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::warn;
      16              : use utils::timeout::timeout_cancellable;
      17              : use utils::{backoff, crashsafe};
      18              : 
      19              : use crate::config::PageServerConf;
      20              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      21              : use crate::tenant::remote_timeline_client::{
      22              :     download_cancellable, remote_layer_path, remote_timelines_path, DOWNLOAD_TIMEOUT,
      23              : };
      24              : use crate::tenant::storage_layer::LayerFileName;
      25              : use crate::tenant::Generation;
      26              : use crate::virtual_file::on_fatal_io_error;
      27              : use crate::TEMP_FILE_SUFFIX;
      28              : use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
      29              : use utils::crashsafe::path_with_suffix_extension;
      30              : use utils::id::TimelineId;
      31              : 
      32              : use super::index::{IndexPart, LayerFileMetadata};
      33              : use super::{
      34              :     parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
      35              :     remote_initdb_preserved_archive_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
      36              :     INITDB_PATH,
      37              : };
      38              : 
      39              : ///
      40              : /// Validates that the downloaded file's size matches the size recorded in
      41              : /// `layer_metadata`. (In the future, we might do more cross-checks, like CRC validation)
      42              : ///
      43              : /// Returns the size of the downloaded file.
      44        11047 : pub async fn download_layer_file<'a>(
      45        11047 :     conf: &'static PageServerConf,
      46        11047 :     storage: &'a GenericRemoteStorage,
      47        11047 :     tenant_shard_id: TenantShardId,
      48        11047 :     timeline_id: TimelineId,
      49        11047 :     layer_file_name: &'a LayerFileName,
      50        11047 :     layer_metadata: &'a LayerFileMetadata,
      51        11047 :     cancel: &CancellationToken,
      52        11047 : ) -> Result<u64, DownloadError> {
      53        11047 :     debug_assert_current_span_has_tenant_and_timeline_id();
      54        11047 : 
      55        11047 :     let local_path = conf
      56        11047 :         .timeline_path(&tenant_shard_id, &timeline_id)
      57        11047 :         .join(layer_file_name.file_name());
      58        11047 : 
      59        11047 :     let remote_path = remote_layer_path(
      60        11047 :         &tenant_shard_id.tenant_id,
      61        11047 :         &timeline_id,
      62        11047 :         layer_metadata.shard,
      63        11047 :         layer_file_name,
      64        11047 :         layer_metadata.generation,
      65        11047 :     );
      66        11047 : 
      67        11047 :     // Perform a rename inspired by durable_rename from file_utils.c.
      68        11047 :     // The sequence:
      69        11047 :     //     write(tmp)
      70        11047 :     //     fsync(tmp)
      71        11047 :     //     rename(tmp, new)
      72        11047 :     //     fsync(new)
      73        11047 :     //     fsync(parent)
      74        11047 :     // For more context about durable_rename check this email from postgres mailing list:
      75        11047 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
      76        11047 :     // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
      77        11047 :     let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
      78              : 
      79        11047 :     let (mut destination_file, bytes_amount) = download_retry(
      80        11075 :         || async {
      81        11075 :             let destination_file = tokio::fs::File::create(&temp_file_path)
      82        10687 :                 .await
      83        11075 :                 .with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
      84        11075 :                 .map_err(DownloadError::Other)?;
      85              : 
      86              :             // Cancellation safety: it is safe to cancel this future, because it isn't writing to a local
      87              :             // file: the write to local file doesn't start until after the request header is returned
      88              :             // and we start draining the body stream below
      89        11075 :             let download = download_cancellable(cancel, storage.download(&remote_path))
      90        30883 :                 .await
      91        11075 :                 .with_context(|| {
      92           34 :                     format!(
      93           34 :                     "open a download stream for layer with remote storage path '{remote_path:?}'"
      94           34 :                 )
      95        11075 :                 })
      96        11075 :                 .map_err(DownloadError::Other)?;
      97              : 
      98        11041 :             let mut destination_file =
      99        11041 :                 tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
     100        11041 : 
     101        11041 :             let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
     102              : 
     103              :             // Cancellation safety: it is safe to cancel this future because it is writing into a temporary file,
     104              :             // and we will unlink the temporary file if there is an error.  This unlink is important because we
     105              :             // are in a retry loop, and we wouldn't want to leave behind a rogue write I/O to a file that
     106              :             // we will imminently try and write to again.
     107        11041 :             let bytes_amount: u64 = match timeout_cancellable(
     108        11041 :                 DOWNLOAD_TIMEOUT,
     109        11041 :                 cancel,
     110        11041 :                 tokio::io::copy_buf(&mut reader, &mut destination_file),
     111        11041 :             )
     112       386207 :             .await
     113        11041 :             .with_context(|| {
     114            2 :                 format!(
     115            2 :                     "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
     116            2 :                 )
     117        11041 :             })
     118        11041 :             .map_err(DownloadError::Other)?
     119              :             {
     120        11039 :                 Ok(b) => Ok(b),
     121            0 :                 Err(e) => {
     122              :                     // Remove incomplete files: on restart Timeline would do this anyway, but we must
     123              :                     // do it here for the retry case.
     124            0 :                     if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
     125            0 :                         on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
     126            0 :                     }
     127            0 :                     Err(e)
     128              :                 }
     129              :             }
     130        11039 :             .with_context(|| {
     131            0 :                 format!(
     132            0 :                     "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
     133            0 :                 )
     134        11039 :             })
     135        11039 :             .map_err(DownloadError::Other)?;
     136              : 
     137        11039 :             let destination_file = destination_file.into_inner();
     138        11039 : 
     139        11039 :             Ok((destination_file, bytes_amount))
     140        11075 :         },
     141        11047 :         &format!("download {remote_path:?}"),
     142        11047 :         cancel,
     143        11047 :     )
     144       427777 :     .await?;
     145              : 
     146              :     // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     147              :     // A file will not be closed immediately when it goes out of scope if there are any IO operations
     148              :     // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     149              :     // you should call flush before dropping it.
     150              :     //
     151              :     // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
     152              :     // we assume that `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
     153              :     // But for additional safety let's check/wait for any pending operations.
     154        11039 :     destination_file
     155        11039 :         .flush()
     156            0 :         .await
     157        11039 :         .with_context(|| format!("flush source file at {temp_file_path}"))
     158        11039 :         .map_err(DownloadError::Other)?;
     159              : 
     160        11039 :     let expected = layer_metadata.file_size();
     161        11039 :     if expected != bytes_amount {
     162            0 :         return Err(DownloadError::Other(anyhow!(
     163            0 :             "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
     164            0 :         )));
     165        11039 :     }
     166        11039 : 
     167        11039 :     // not using sync_data because it can lose file size update
     168        11039 :     destination_file
     169        11039 :         .sync_all()
     170        11037 :         .await
     171        11039 :         .with_context(|| format!("failed to fsync source file at {temp_file_path}"))
     172        11039 :         .map_err(DownloadError::Other)?;
     173        11039 :     drop(destination_file);
     174        11039 : 
     175        11039 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
     176            6 :         Err(DownloadError::Other(anyhow!(
     177            6 :             "remote-storage-download-pre-rename failpoint triggered"
     178            6 :         )))
     179        11039 :     });
     180              : 
     181        11033 :     fs::rename(&temp_file_path, &local_path)
     182        10734 :         .await
     183        11033 :         .with_context(|| format!("rename download layer file to {local_path}"))
     184        11033 :         .map_err(DownloadError::Other)?;
     185              : 
     186        11033 :     crashsafe::fsync_async(&local_path)
     187        21637 :         .await
     188        11032 :         .with_context(|| format!("fsync layer file {local_path}"))
     189        11032 :         .map_err(DownloadError::Other)?;
     190              : 
     191            0 :     tracing::debug!("download complete: {local_path}");
     192              : 
     193        11032 :     Ok(bytes_amount)
     194        11046 : }
     195              : 
     196              : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     197              : 
     198           51 : pub fn is_temp_download_file(path: &Utf8Path) -> bool {
     199           51 :     let extension = path.extension();
     200           51 :     match extension {
     201            4 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     202            1 :         Some(_) => false,
     203           47 :         None => false,
     204              :     }
     205           51 : }
     206              : 
     207              : /// List timelines of given tenant in remote storage
     208          846 : pub async fn list_remote_timelines(
     209          846 :     storage: &GenericRemoteStorage,
     210          846 :     tenant_shard_id: TenantShardId,
     211          846 :     cancel: CancellationToken,
     212          846 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     213          846 :     let remote_path = remote_timelines_path(&tenant_shard_id);
     214          846 : 
     215          846 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     216            6 :         anyhow::bail!("storage-sync-list-remote-timelines");
     217          846 :     });
     218              : 
     219          840 :     let cancel_inner = cancel.clone();
     220          840 :     let listing = download_retry_forever(
     221          903 :         || {
     222          903 :             download_cancellable(
     223          903 :                 &cancel_inner,
     224          903 :                 storage.list(Some(&remote_path), ListingMode::WithDelimiter),
     225          903 :             )
     226          903 :         },
     227          840 :         &format!("list timelines for {tenant_shard_id}"),
     228          840 :         cancel,
     229          840 :     )
     230         2031 :     .await?;
     231              : 
     232          840 :     let mut timeline_ids = HashSet::new();
     233          840 :     let mut other_prefixes = HashSet::new();
     234              : 
     235         1267 :     for timeline_remote_storage_key in listing.prefixes {
     236          427 :         let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
     237            0 :             anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
     238          427 :         })?;
     239              : 
     240          427 :         match object_name.parse::<TimelineId>() {
     241          427 :             Ok(t) => timeline_ids.insert(t),
     242            0 :             Err(_) => other_prefixes.insert(object_name.to_string()),
     243              :         };
     244              :     }
     245              : 
     246          855 :     for key in listing.keys {
     247           15 :         let object_name = key
     248           15 :             .object_name()
     249           15 :             .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
     250           15 :         other_prefixes.insert(object_name.to_string());
     251              :     }
     252              : 
     253          840 :     Ok((timeline_ids, other_prefixes))
     254          846 : }
     255              : 
     256         1057 : async fn do_download_index_part(
     257         1057 :     storage: &GenericRemoteStorage,
     258         1057 :     tenant_shard_id: &TenantShardId,
     259         1057 :     timeline_id: &TimelineId,
     260         1057 :     index_generation: Generation,
     261         1057 :     cancel: CancellationToken,
     262         1057 : ) -> Result<IndexPart, DownloadError> {
     263         1057 :     use futures::stream::StreamExt;
     264         1057 : 
     265         1057 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
     266         1057 : 
     267         1057 :     let cancel_inner = cancel.clone();
     268         1057 :     let index_part_bytes = download_retry_forever(
     269         1111 :         || async {
     270              :             // Cancellation: it is safe to cancel this future because we're just downloading into
     271              :             // a memory buffer, not touching local disk.
     272          438 :             let index_part_download =
     273         1507 :                 download_cancellable(&cancel_inner, storage.download(&remote_path)).await?;
     274              : 
     275          438 :             let mut index_part_bytes = Vec::new();
     276          438 :             let mut stream = std::pin::pin!(index_part_download.download_stream);
     277         1310 :             while let Some(chunk) = stream.next().await {
     278          872 :                 let chunk = chunk
     279          872 :                     .with_context(|| format!("download index part at {remote_path:?}"))
     280          872 :                     .map_err(DownloadError::Other)?;
     281          872 :                 index_part_bytes.extend_from_slice(&chunk[..]);
     282              :             }
     283          438 :             Ok(index_part_bytes)
     284         1111 :         },
     285         1057 :         &format!("download {remote_path:?}"),
     286         1057 :         cancel,
     287         1057 :     )
     288         2544 :     .await?;
     289              : 
     290          438 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     291          438 :         .with_context(|| format!("download index part file at {remote_path:?}"))
     292          438 :         .map_err(DownloadError::Other)?;
     293              : 
     294          438 :     Ok(index_part)
     295         1057 : }
     296              : 
     297              : /// index_part.json objects are suffixed with a generation number, so we cannot
     298              : /// directly GET the latest index part without doing some probing.
     299              : ///
     300              : /// In this function we probe for the most recent index in a generation <= our current generation.
     301              : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     302          190 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     303              : pub(super) async fn download_index_part(
     304              :     storage: &GenericRemoteStorage,
     305              :     tenant_shard_id: &TenantShardId,
     306              :     timeline_id: &TimelineId,
     307              :     my_generation: Generation,
     308              :     cancel: CancellationToken,
     309              : ) -> Result<IndexPart, DownloadError> {
     310              :     debug_assert_current_span_has_tenant_and_timeline_id();
     311              : 
     312              :     if my_generation.is_none() {
     313              :         // Operating without generations: just fetch the generation-less path
     314              :         return do_download_index_part(
     315              :             storage,
     316              :             tenant_shard_id,
     317              :             timeline_id,
     318              :             my_generation,
     319              :             cancel,
     320              :         )
     321              :         .await;
     322              :     }
     323              : 
     324              :     // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
     325              :     // index in our generation.
     326              :     //
     327              :     // This is an optimization to avoid doing the listing for the general case below.
     328              :     let res = do_download_index_part(
     329              :         storage,
     330              :         tenant_shard_id,
     331              :         timeline_id,
     332              :         my_generation,
     333              :         cancel.clone(),
     334              :     )
     335              :     .await;
     336              :     match res {
     337              :         Ok(index_part) => {
     338            0 :             tracing::debug!(
     339            0 :                 "Found index_part from current generation (this is a stale attachment)"
     340            0 :             );
     341              :             return Ok(index_part);
     342              :         }
     343              :         Err(DownloadError::NotFound) => {}
     344              :         Err(e) => return Err(e),
     345              :     };
     346              : 
     347              :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded
     348              :     // an index part.  We may safely start from this index without doing a listing, because:
     349              :     //  - We checked for current generation case above
     350              :     //  - generations > my_generation are to be ignored
     351              :     //  - any other indices that exist would have an older generation than `previous_gen`, and
     352              :     //    we want to find the most recent index from a previous generation.
     353              :     //
     354              :     // This is an optimization to avoid doing the listing for the general case below.
     355              :     let res = do_download_index_part(
     356              :         storage,
     357              :         tenant_shard_id,
     358              :         timeline_id,
     359              :         my_generation.previous(),
     360              :         cancel.clone(),
     361              :     )
     362              :     .await;
     363              :     match res {
     364              :         Ok(index_part) => {
     365            0 :             tracing::debug!("Found index_part from previous generation");
     366              :             return Ok(index_part);
     367              :         }
     368              :         Err(DownloadError::NotFound) => {
     369            0 :             tracing::debug!(
     370            0 :                 "No index_part found from previous generation, falling back to listing"
     371            0 :             );
     372              :         }
     373              :         Err(e) => {
     374              :             return Err(e);
     375              :         }
     376              :     }
     377              : 
     378              :     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
     379              :     // objects, and select the highest one with a generation <= my_generation.  Constructing the prefix is equivalent
     380              :     // to constructing a full index path with no generation, because the generation is a suffix.
     381              :     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
     382              :     let indices = backoff::retry(
     383          441 :         || async { storage.list_files(Some(&index_prefix)).await },
     384            6 :         |_| false,
     385              :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     386              :         FAILED_REMOTE_OP_RETRIES,
     387              :         "listing index_part files",
     388              :         &cancel,
     389              :     )
     390              :     .await
     391            0 :     .ok_or_else(|| anyhow::anyhow!("Cancelled"))
     392          190 :     .and_then(|x| x)
     393              :     .map_err(DownloadError::Other)?;
     394              : 
     395              :     // General case logic for which index to use: the latest index whose generation
     396              :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     397              :     let max_previous_generation = indices
     398              :         .into_iter()
     399              :         .filter_map(parse_remote_index_path)
     400          447 :         .filter(|g| g <= &my_generation)
     401              :         .max();
     402              : 
     403              :     match max_previous_generation {
     404              :         Some(g) => {
     405            0 :             tracing::debug!("Found index_part in generation {g:?}");
     406              :             do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
     407              :         }
     408              :         None => {
     409              :             // Migration from legacy pre-generation state: we have a generation but no prior
     410              :             // attached pageservers did.  Try to load from a no-generation path.
     411            0 :             tracing::debug!("No index_part.json* found");
     412              :             do_download_index_part(
     413              :                 storage,
     414              :                 tenant_shard_id,
     415              :                 timeline_id,
     416              :                 Generation::none(),
     417              :                 cancel,
     418              :             )
     419              :             .await
     420              :         }
     421              :     }
     422              : }
     423              : 
     424            4 : pub(crate) async fn download_initdb_tar_zst(
     425            4 :     conf: &'static PageServerConf,
     426            4 :     storage: &GenericRemoteStorage,
     427            4 :     tenant_shard_id: &TenantShardId,
     428            4 :     timeline_id: &TimelineId,
     429            4 :     cancel: &CancellationToken,
     430            4 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
     431            4 :     debug_assert_current_span_has_tenant_and_timeline_id();
     432            4 : 
     433            4 :     let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     434            4 : 
     435            4 :     let remote_preserved_path =
     436            4 :         remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     437            4 : 
     438            4 :     let timeline_path = conf.timelines_path(tenant_shard_id);
     439            4 : 
     440            4 :     if !timeline_path.exists() {
     441            0 :         tokio::fs::create_dir_all(&timeline_path)
     442            0 :             .await
     443            0 :             .with_context(|| format!("timeline dir creation {timeline_path}"))
     444            0 :             .map_err(DownloadError::Other)?;
     445            4 :     }
     446            4 :     let temp_path = timeline_path.join(format!(
     447            4 :         "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
     448            4 :     ));
     449            4 : 
     450            4 :     let cancel_inner = cancel.clone();
     451              : 
     452            4 :     let file = download_retry(
     453            4 :         || async {
     454            4 :             let file = OpenOptions::new()
     455            4 :                 .create(true)
     456            4 :                 .truncate(true)
     457            4 :                 .read(true)
     458            4 :                 .write(true)
     459            4 :                 .open(&temp_path)
     460            4 :                 .await
     461            4 :                 .with_context(|| format!("tempfile creation {temp_path}"))
     462            4 :                 .map_err(DownloadError::Other)?;
     463              : 
     464            4 :             let download = match download_cancellable(&cancel_inner, storage.download(&remote_path))
     465            2 :                 .await
     466              :             {
     467            2 :                 Ok(dl) => dl,
     468              :                 Err(DownloadError::NotFound) => {
     469            2 :                     download_cancellable(&cancel_inner, storage.download(&remote_preserved_path))
     470            2 :                         .await?
     471              :                 }
     472            0 :                 Err(other) => Err(other)?,
     473              :             };
     474            4 :             let mut download = tokio_util::io::StreamReader::new(download.download_stream);
     475            4 :             let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
     476            4 : 
     477            4 :             // TODO: this consumption of the response body should be subject to timeout + cancellation, but
     478            4 :             // not without thinking carefully about how to recover safely from cancelling a write to
     479            4 :             // local storage (e.g. by writing into a temp file as we do in download_layer_file)
     480            4 :             tokio::io::copy_buf(&mut download, &mut writer)
     481         1460 :                 .await
     482            4 :                 .with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))
     483            4 :                 .map_err(DownloadError::Other)?;
     484              : 
     485            4 :             let mut file = writer.into_inner();
     486            4 : 
     487            4 :             file.seek(std::io::SeekFrom::Start(0))
     488            3 :                 .await
     489            4 :                 .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
     490            4 :                 .map_err(DownloadError::Other)?;
     491              : 
     492            4 :             Ok(file)
     493            4 :         },
     494            4 :         &format!("download {remote_path}"),
     495            4 :         cancel,
     496            4 :     )
     497         1471 :     .await
     498            4 :     .map_err(|e| {
     499              :         // Do a best-effort attempt at deleting the temporary file upon encountering an error.
     500              :         // We don't have async here nor do we want to pile on any extra errors.
     501            0 :         if let Err(e) = std::fs::remove_file(&temp_path) {
     502            0 :             if e.kind() != std::io::ErrorKind::NotFound {
     503            0 :                 warn!("error deleting temporary file {temp_path}: {e}");
     504            0 :             }
     505            0 :         }
     506            0 :         e
     507            4 :     })?;
     508              : 
     509            4 :     Ok((temp_path, file))
     510            4 : }
     511              : 
     512              : /// Helper function to handle retries for a download operation.
     513              : ///
     514              : /// Remote operations can fail due to rate limits (S3), spurious network
     515              : /// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
     516              : /// with backoff.
     517              : ///
     518              : /// (See similar logic for uploads in `perform_upload_task`)
     519        11051 : async fn download_retry<T, O, F>(
     520        11051 :     op: O,
     521        11051 :     description: &str,
     522        11051 :     cancel: &CancellationToken,
     523        11051 : ) -> Result<T, DownloadError>
     524        11051 : where
     525        11051 :     O: FnMut() -> F,
     526        11051 :     F: Future<Output = Result<T, DownloadError>>,
     527        11051 : {
     528        11051 :     backoff::retry(
     529        11051 :         op,
     530        11051 :         |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
     531        11051 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     532        11051 :         FAILED_REMOTE_OP_RETRIES,
     533        11051 :         description,
     534        11051 :         cancel,
     535        11051 :     )
     536       429248 :     .await
     537        11051 :     .ok_or_else(|| DownloadError::Cancelled)
     538        11051 :     .and_then(|x| x)
     539        11051 : }
     540              : 
     541         1897 : async fn download_retry_forever<T, O, F>(
     542         1897 :     op: O,
     543         1897 :     description: &str,
     544         1897 :     cancel: CancellationToken,
     545         1897 : ) -> Result<T, DownloadError>
     546         1897 : where
     547         1897 :     O: FnMut() -> F,
     548         1897 :     F: Future<Output = Result<T, DownloadError>>,
     549         1897 : {
     550         1897 :     backoff::retry(
     551         1897 :         op,
     552         1897 :         |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
     553         1897 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     554         1897 :         u32::MAX,
     555         1897 :         description,
     556         1897 :         &cancel,
     557         1897 :     )
     558         4575 :     .await
     559         1897 :     .ok_or_else(|| DownloadError::Cancelled)
     560         1897 :     .and_then(|x| x)
     561         1897 : }
        

Generated by: LCOV version 2.1-beta