LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 89.1 % 322 287
Test Date: 2024-02-14 18:05:35 Functions: 61.1 % 72 44

            Line data    Source code
       1              : //! Helper functions to download files from remote storage with a RemoteStorage
       2              : //!
       3              : //! The functions in this module retry failed operations automatically, according
       4              : //! to the FAILED_DOWNLOAD_RETRIES constant.
       5              : 
       6              : use std::collections::HashSet;
       7              : use std::future::Future;
       8              : 
       9              : use anyhow::{anyhow, Context};
      10              : use camino::{Utf8Path, Utf8PathBuf};
      11              : use pageserver_api::shard::TenantShardId;
      12              : use tokio::fs::{self, File, OpenOptions};
      13              : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::warn;
      16              : use utils::timeout::timeout_cancellable;
      17              : use utils::{backoff, crashsafe};
      18              : 
      19              : use crate::config::PageServerConf;
      20              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      21              : use crate::tenant::remote_timeline_client::{
      22              :     download_cancellable, remote_layer_path, remote_timelines_path, DOWNLOAD_TIMEOUT,
      23              : };
      24              : use crate::tenant::storage_layer::LayerFileName;
      25              : use crate::tenant::Generation;
      26              : use crate::virtual_file::on_fatal_io_error;
      27              : use crate::TEMP_FILE_SUFFIX;
      28              : use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
      29              : use utils::crashsafe::path_with_suffix_extension;
      30              : use utils::id::TimelineId;
      31              : 
      32              : use super::index::{IndexPart, LayerFileMetadata};
      33              : use super::{
      34              :     parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
      35              :     remote_initdb_preserved_archive_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
      36              :     INITDB_PATH,
      37              : };
      38              : 
      39              : ///
      40              : /// If 'metadata' is given, we will validate that the downloaded file's size matches that
      41              : /// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
      42              : ///
      43              : /// Returns the size of the downloaded file.
      44        10731 : pub async fn download_layer_file<'a>(
      45        10731 :     conf: &'static PageServerConf,
      46        10731 :     storage: &'a GenericRemoteStorage,
      47        10731 :     tenant_shard_id: TenantShardId,
      48        10731 :     timeline_id: TimelineId,
      49        10731 :     layer_file_name: &'a LayerFileName,
      50        10731 :     layer_metadata: &'a LayerFileMetadata,
      51        10731 :     cancel: &CancellationToken,
      52        10731 : ) -> Result<u64, DownloadError> {
      53        10731 :     debug_assert_current_span_has_tenant_and_timeline_id();
      54        10731 : 
      55        10731 :     let local_path = conf
      56        10731 :         .timeline_path(&tenant_shard_id, &timeline_id)
      57        10731 :         .join(layer_file_name.file_name());
      58        10731 : 
      59        10731 :     let remote_path = remote_layer_path(
      60        10731 :         &tenant_shard_id.tenant_id,
      61        10731 :         &timeline_id,
      62        10731 :         layer_metadata.shard,
      63        10731 :         layer_file_name,
      64        10731 :         layer_metadata.generation,
      65        10731 :     );
      66        10731 : 
      67        10731 :     // Perform a rename inspired by durable_rename from file_utils.c.
      68        10731 :     // The sequence:
      69        10731 :     //     write(tmp)
      70        10731 :     //     fsync(tmp)
      71        10731 :     //     rename(tmp, new)
      72        10731 :     //     fsync(new)
      73        10731 :     //     fsync(parent)
      74        10731 :     // For more context about durable_rename check this email from postgres mailing list:
      75        10731 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
      76        10731 :     // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
      77        10731 :     let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
      78              : 
      79        10731 :     let (mut destination_file, bytes_amount) = download_retry(
      80        10758 :         || async {
      81        10758 :             let destination_file = tokio::fs::File::create(&temp_file_path)
      82        10291 :                 .await
      83        10758 :                 .with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
      84        10758 :                 .map_err(DownloadError::Other)?;
      85              : 
      86              :             // Cancellation safety: it is safe to cancel this future, because it isn't writing to a local
      87              :             // file: the write to local file doesn't start until after the request header is returned
      88              :             // and we start draining the body stream below
      89        10758 :             let download = download_cancellable(cancel, storage.download(&remote_path))
      90        29701 :                 .await
      91        10758 :                 .with_context(|| {
      92           30 :                     format!(
      93           30 :                     "open a download stream for layer with remote storage path '{remote_path:?}'"
      94           30 :                 )
      95        10758 :                 })
      96        10758 :                 .map_err(DownloadError::Other)?;
      97              : 
      98        10728 :             let mut destination_file =
      99        10728 :                 tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
     100        10728 : 
     101        10728 :             let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
     102              : 
     103              :             // Cancellation safety: it is safe to cancel this future because it is writing into a temporary file,
     104              :             // and we will unlink the temporary file if there is an error.  This unlink is important because we
     105              :             // are in a retry loop, and we wouldn't want to leave behind a rogue write I/O to a file that
     106              :             // we will imminiently try and write to again.
     107        10728 :             let bytes_amount: u64 = match timeout_cancellable(
     108        10728 :                 DOWNLOAD_TIMEOUT,
     109        10728 :                 cancel,
     110        10728 :                 tokio::io::copy_buf(&mut reader, &mut destination_file),
     111        10728 :             )
     112       422199 :             .await
     113        10728 :             .with_context(|| {
     114            1 :                 format!(
     115            1 :                     "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
     116            1 :                 )
     117        10728 :             })
     118        10728 :             .map_err(DownloadError::Other)?
     119              :             {
     120        10727 :                 Ok(b) => Ok(b),
     121            0 :                 Err(e) => {
     122              :                     // Remove incomplete files: on restart Timeline would do this anyway, but we must
     123              :                     // do it here for the retry case.
     124            0 :                     if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
     125            0 :                         on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
     126            0 :                     }
     127            0 :                     Err(e)
     128              :                 }
     129              :             }
     130        10727 :             .with_context(|| {
     131            0 :                 format!(
     132            0 :                     "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
     133            0 :                 )
     134        10727 :             })
     135        10727 :             .map_err(DownloadError::Other)?;
     136              : 
     137        10727 :             let destination_file = destination_file.into_inner();
     138        10727 : 
     139        10727 :             Ok((destination_file, bytes_amount))
     140        21516 :         },
     141        10731 :         &format!("download {remote_path:?}"),
     142        10731 :         cancel,
     143        10731 :     )
     144       462191 :     .await?;
     145              : 
     146              :     // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     147              :     // A file will not be closed immediately when it goes out of scope if there are any IO operations
     148              :     // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     149              :     // you should call flush before dropping it.
     150              :     //
     151              :     // From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
     152              :     // we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
     153              :     // But for additional safety lets check/wait for any pending operations.
     154        10727 :     destination_file
     155        10727 :         .flush()
     156            0 :         .await
     157        10727 :         .with_context(|| format!("flush source file at {temp_file_path}"))
     158        10727 :         .map_err(DownloadError::Other)?;
     159              : 
     160        10727 :     let expected = layer_metadata.file_size();
     161        10727 :     if expected != bytes_amount {
     162            0 :         return Err(DownloadError::Other(anyhow!(
     163            0 :             "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
     164            0 :         )));
     165        10727 :     }
     166        10727 : 
     167        10727 :     // not using sync_data because it can lose file size update
     168        10727 :     destination_file
     169        10727 :         .sync_all()
     170        10724 :         .await
     171        10726 :         .with_context(|| format!("failed to fsync source file at {temp_file_path}"))
     172        10726 :         .map_err(DownloadError::Other)?;
     173        10726 :     drop(destination_file);
     174        10726 : 
     175        10726 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
     176            6 :         Err(DownloadError::Other(anyhow!(
     177            6 :             "remote-storage-download-pre-rename failpoint triggered"
     178            6 :         )))
     179        10726 :     });
     180              : 
     181        10720 :     fs::rename(&temp_file_path, &local_path)
     182        10311 :         .await
     183        10720 :         .with_context(|| format!("rename download layer file to {local_path}"))
     184        10720 :         .map_err(DownloadError::Other)?;
     185              : 
     186        10720 :     crashsafe::fsync_async(&local_path)
     187        20880 :         .await
     188        10719 :         .with_context(|| format!("fsync layer file {local_path}"))
     189        10719 :         .map_err(DownloadError::Other)?;
     190              : 
     191            0 :     tracing::debug!("download complete: {local_path}");
     192              : 
     193        10719 :     Ok(bytes_amount)
     194        10729 : }
     195              : 
     196              : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     197              : 
     198           51 : pub fn is_temp_download_file(path: &Utf8Path) -> bool {
     199           51 :     let extension = path.extension();
     200           51 :     match extension {
     201            4 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     202            1 :         Some(_) => false,
     203           47 :         None => false,
     204              :     }
     205           51 : }
     206              : 
     207              : /// List timelines of given tenant in remote storage
     208          872 : pub async fn list_remote_timelines(
     209          872 :     storage: &GenericRemoteStorage,
     210          872 :     tenant_shard_id: TenantShardId,
     211          872 :     cancel: CancellationToken,
     212          872 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     213          872 :     let remote_path = remote_timelines_path(&tenant_shard_id);
     214          872 : 
     215          872 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     216            6 :         anyhow::bail!("storage-sync-list-remote-timelines");
     217          872 :     });
     218              : 
     219          866 :     let listing = download_retry_forever(
     220          929 :         || {
     221          929 :             download_cancellable(
     222          929 :                 &cancel,
     223          929 :                 storage.list(Some(&remote_path), ListingMode::WithDelimiter, None),
     224          929 :             )
     225          929 :         },
     226          866 :         &format!("list timelines for {tenant_shard_id}"),
     227          866 :         &cancel,
     228          866 :     )
     229         2243 :     .await?;
     230              : 
     231          866 :     let mut timeline_ids = HashSet::new();
     232          866 :     let mut other_prefixes = HashSet::new();
     233              : 
     234         1312 :     for timeline_remote_storage_key in listing.prefixes {
     235          446 :         let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
     236            0 :             anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
     237          446 :         })?;
     238              : 
     239          446 :         match object_name.parse::<TimelineId>() {
     240          446 :             Ok(t) => timeline_ids.insert(t),
     241            0 :             Err(_) => other_prefixes.insert(object_name.to_string()),
     242              :         };
     243              :     }
     244              : 
     245          881 :     for key in listing.keys {
     246           15 :         let object_name = key
     247           15 :             .object_name()
     248           15 :             .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
     249           15 :         other_prefixes.insert(object_name.to_string());
     250              :     }
     251              : 
     252          866 :     Ok((timeline_ids, other_prefixes))
     253          872 : }
     254              : 
     255         1085 : async fn do_download_index_part(
     256         1085 :     storage: &GenericRemoteStorage,
     257         1085 :     tenant_shard_id: &TenantShardId,
     258         1085 :     timeline_id: &TimelineId,
     259         1085 :     index_generation: Generation,
     260         1085 :     cancel: &CancellationToken,
     261         1085 : ) -> Result<IndexPart, DownloadError> {
     262         1085 :     use futures::stream::StreamExt;
     263         1085 : 
     264         1085 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
     265              : 
     266         1085 :     let index_part_bytes = download_retry_forever(
     267         1139 :         || async {
     268              :             // Cancellation: if is safe to cancel this future because we're just downloading into
     269              :             // a memory buffer, not touching local disk.
     270          462 :             let index_part_download =
     271         1564 :                 download_cancellable(cancel, storage.download(&remote_path)).await?;
     272              : 
     273          462 :             let mut index_part_bytes = Vec::new();
     274          462 :             let mut stream = std::pin::pin!(index_part_download.download_stream);
     275         1319 :             while let Some(chunk) = stream.next().await {
     276          857 :                 let chunk = chunk
     277          857 :                     .with_context(|| format!("download index part at {remote_path:?}"))
     278          857 :                     .map_err(DownloadError::Other)?;
     279          857 :                 index_part_bytes.extend_from_slice(&chunk[..]);
     280              :             }
     281          462 :             Ok(index_part_bytes)
     282         2278 :         },
     283         1085 :         &format!("download {remote_path:?}"),
     284         1085 :         cancel,
     285         1085 :     )
     286         2558 :     .await?;
     287              : 
     288          462 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     289          462 :         .with_context(|| format!("deserialize index part file at {remote_path:?}"))
     290          462 :         .map_err(DownloadError::Other)?;
     291              : 
     292          462 :     Ok(index_part)
     293         1085 : }
     294              : 
     295              : /// index_part.json objects are suffixed with a generation number, so we cannot
     296              : /// directly GET the latest index part without doing some probing.
     297              : ///
     298              : /// In this function we probe for the most recent index in a generation <= our current generation.
     299              : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     300          930 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     301              : pub(super) async fn download_index_part(
     302              :     storage: &GenericRemoteStorage,
     303              :     tenant_shard_id: &TenantShardId,
     304              :     timeline_id: &TimelineId,
     305              :     my_generation: Generation,
     306              :     cancel: &CancellationToken,
     307              : ) -> Result<IndexPart, DownloadError> {
     308              :     debug_assert_current_span_has_tenant_and_timeline_id();
     309              : 
     310              :     if my_generation.is_none() {
     311              :         // Operating without generations: just fetch the generation-less path
     312              :         return do_download_index_part(
     313              :             storage,
     314              :             tenant_shard_id,
     315              :             timeline_id,
     316              :             my_generation,
     317              :             cancel,
     318              :         )
     319              :         .await;
     320              :     }
     321              : 
     322              :     // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
     323              :     // index in our generation.
     324              :     //
     325              :     // This is an optimization to avoid doing the listing for the general case below.
     326              :     let res =
     327              :         do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     328              :     match res {
     329              :         Ok(index_part) => {
     330            0 :             tracing::debug!(
     331            0 :                 "Found index_part from current generation (this is a stale attachment)"
     332            0 :             );
     333              :             return Ok(index_part);
     334              :         }
     335              :         Err(DownloadError::NotFound) => {}
     336              :         Err(e) => return Err(e),
     337              :     };
     338              : 
     339              :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded
     340              :     // and index part.  We may safely start from this index without doing a listing, because:
     341              :     //  - We checked for current generation case above
     342              :     //  - generations > my_generation are to be ignored
     343              :     //  - any other indices that exist would have an older generation than `previous_gen`, and
     344              :     //    we want to find the most recent index from a previous generation.
     345              :     //
     346              :     // This is an optimization to avoid doing the listing for the general case below.
     347              :     let res = do_download_index_part(
     348              :         storage,
     349              :         tenant_shard_id,
     350              :         timeline_id,
     351              :         my_generation.previous(),
     352              :         cancel,
     353              :     )
     354              :     .await;
     355              :     match res {
     356              :         Ok(index_part) => {
     357            0 :             tracing::debug!("Found index_part from previous generation");
     358              :             return Ok(index_part);
     359              :         }
     360              :         Err(DownloadError::NotFound) => {
     361            0 :             tracing::debug!(
     362            0 :                 "No index_part found from previous generation, falling back to listing"
     363            0 :             );
     364              :         }
     365              :         Err(e) => {
     366              :             return Err(e);
     367              :         }
     368              :     }
     369              : 
     370              :     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
     371              :     // objects, and select the highest one with a generation <= my_generation.  Constructing the prefix is equivalent
     372              :     // to constructing a full index path with no generation, because the generation is a suffix.
     373              :     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
     374              : 
     375              :     let indices = download_retry(
     376          425 :         || async { storage.list_files(Some(&index_prefix), None).await },
     377              :         "list index_part files",
     378              :         cancel,
     379              :     )
     380              :     .await?;
     381              : 
     382              :     // General case logic for which index to use: the latest index whose generation
     383              :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     384              :     let max_previous_generation = indices
     385              :         .into_iter()
     386              :         .filter_map(parse_remote_index_path)
     387          403 :         .filter(|g| g <= &my_generation)
     388              :         .max();
     389              : 
     390              :     match max_previous_generation {
     391              :         Some(g) => {
     392            0 :             tracing::debug!("Found index_part in generation {g:?}");
     393              :             do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
     394              :         }
     395              :         None => {
     396              :             // Migration from legacy pre-generation state: we have a generation but no prior
     397              :             // attached pageservers did.  Try to load from a no-generation path.
     398            0 :             tracing::debug!("No index_part.json* found");
     399              :             do_download_index_part(
     400              :                 storage,
     401              :                 tenant_shard_id,
     402              :                 timeline_id,
     403              :                 Generation::none(),
     404              :                 cancel,
     405              :             )
     406              :             .await
     407              :         }
     408              :     }
     409              : }
     410              : 
     411            4 : pub(crate) async fn download_initdb_tar_zst(
     412            4 :     conf: &'static PageServerConf,
     413            4 :     storage: &GenericRemoteStorage,
     414            4 :     tenant_shard_id: &TenantShardId,
     415            4 :     timeline_id: &TimelineId,
     416            4 :     cancel: &CancellationToken,
     417            4 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
     418            4 :     debug_assert_current_span_has_tenant_and_timeline_id();
     419            4 : 
     420            4 :     let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     421            4 : 
     422            4 :     let remote_preserved_path =
     423            4 :         remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     424            4 : 
     425            4 :     let timeline_path = conf.timelines_path(tenant_shard_id);
     426            4 : 
     427            4 :     if !timeline_path.exists() {
     428            0 :         tokio::fs::create_dir_all(&timeline_path)
     429            0 :             .await
     430            0 :             .with_context(|| format!("timeline dir creation {timeline_path}"))
     431            0 :             .map_err(DownloadError::Other)?;
     432            4 :     }
     433            4 :     let temp_path = timeline_path.join(format!(
     434            4 :         "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
     435            4 :     ));
     436              : 
     437            4 :     let file = download_retry(
     438            4 :         || async {
     439            4 :             let file = OpenOptions::new()
     440            4 :                 .create(true)
     441            4 :                 .truncate(true)
     442            4 :                 .read(true)
     443            4 :                 .write(true)
     444            4 :                 .open(&temp_path)
     445            4 :                 .await
     446            4 :                 .with_context(|| format!("tempfile creation {temp_path}"))
     447            4 :                 .map_err(DownloadError::Other)?;
     448              : 
     449            4 :             let download = match download_cancellable(cancel, storage.download(&remote_path)).await
     450              :             {
     451            2 :                 Ok(dl) => dl,
     452              :                 Err(DownloadError::NotFound) => {
     453            2 :                     download_cancellable(cancel, storage.download(&remote_preserved_path)).await?
     454              :                 }
     455            0 :                 Err(other) => Err(other)?,
     456              :             };
     457            4 :             let mut download = tokio_util::io::StreamReader::new(download.download_stream);
     458            4 :             let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
     459            4 : 
     460            4 :             // TODO: this consumption of the response body should be subject to timeout + cancellation, but
     461            4 :             // not without thinking carefully about how to recover safely from cancelling a write to
     462            4 :             // local storage (e.g. by writing into a temp file as we do in download_layer)
     463            4 :             tokio::io::copy_buf(&mut download, &mut writer)
     464         1618 :                 .await
     465            4 :                 .with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))
     466            4 :                 .map_err(DownloadError::Other)?;
     467              : 
     468            4 :             let mut file = writer.into_inner();
     469            4 : 
     470            4 :             file.seek(std::io::SeekFrom::Start(0))
     471            4 :                 .await
     472            4 :                 .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
     473            4 :                 .map_err(DownloadError::Other)?;
     474              : 
     475            4 :             Ok(file)
     476            8 :         },
     477            4 :         &format!("download {remote_path}"),
     478            4 :         cancel,
     479            4 :     )
     480         1630 :     .await
     481            4 :     .map_err(|e| {
     482              :         // Do a best-effort attempt at deleting the temporary file upon encountering an error.
     483              :         // We don't have async here nor do we want to pile on any extra errors.
     484            0 :         if let Err(e) = std::fs::remove_file(&temp_path) {
     485            0 :             if e.kind() != std::io::ErrorKind::NotFound {
     486            0 :                 warn!("error deleting temporary file {temp_path}: {e}");
     487            0 :             }
     488            0 :         }
     489            0 :         e
     490            4 :     })?;
     491              : 
     492            4 :     Ok((temp_path, file))
     493            4 : }
     494              : 
     495              : /// Helper function to handle retries for a download operation.
     496              : ///
     497              : /// Remote operations can fail due to rate limits (S3), spurious network
     498              : /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
     499              : /// with backoff.
     500              : ///
     501              : /// (See similar logic for uploads in `perform_upload_task`)
     502        11109 : pub(super) async fn download_retry<T, O, F>(
     503        11109 :     op: O,
     504        11109 :     description: &str,
     505        11109 :     cancel: &CancellationToken,
     506        11109 : ) -> Result<T, DownloadError>
     507        11109 : where
     508        11109 :     O: FnMut() -> F,
     509        11109 :     F: Future<Output = Result<T, DownloadError>>,
     510        11109 : {
     511        11109 :     backoff::retry(
     512        11109 :         op,
     513        11109 :         DownloadError::is_permanent,
     514        11109 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     515        11109 :         FAILED_REMOTE_OP_RETRIES,
     516        11109 :         description,
     517        11109 :         cancel,
     518        11109 :     )
     519       465039 :     .await
     520        11109 :     .ok_or_else(|| DownloadError::Cancelled)
     521        11109 :     .and_then(|x| x)
     522        11109 : }
     523              : 
     524         1951 : async fn download_retry_forever<T, O, F>(
     525         1951 :     op: O,
     526         1951 :     description: &str,
     527         1951 :     cancel: &CancellationToken,
     528         1951 : ) -> Result<T, DownloadError>
     529         1951 : where
     530         1951 :     O: FnMut() -> F,
     531         1951 :     F: Future<Output = Result<T, DownloadError>>,
     532         1951 : {
     533         1951 :     backoff::retry(
     534         1951 :         op,
     535         1951 :         DownloadError::is_permanent,
     536         1951 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     537         1951 :         u32::MAX,
     538         1951 :         description,
     539         1951 :         cancel,
     540         1951 :     )
     541         4801 :     .await
     542         1951 :     .ok_or_else(|| DownloadError::Cancelled)
     543         1951 :     .and_then(|x| x)
     544         1951 : }
        

Generated by: LCOV version 2.1-beta