Line data Source code
1 : //! Helper functions to download files from remote storage with a RemoteStorage
2 : //!
3 : //! The functions in this module retry failed operations automatically: either up to
4 : //! FAILED_REMOTE_OP_RETRIES times, or with a retry limit of u32::MAX (`download_retry_forever`).
5 :
6 : use std::collections::HashSet;
7 : use std::future::Future;
8 : use std::str::FromStr;
9 :
10 : use anyhow::{anyhow, Context};
11 : use camino::{Utf8Path, Utf8PathBuf};
12 : use pageserver_api::shard::TenantShardId;
13 : use tokio::fs::{self, File, OpenOptions};
14 : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
15 : use tokio_util::io::StreamReader;
16 : use tokio_util::sync::CancellationToken;
17 : use tracing::warn;
18 : use utils::backoff;
19 :
20 : use crate::config::PageServerConf;
21 : use crate::context::RequestContext;
22 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
23 : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
24 : use crate::tenant::storage_layer::LayerName;
25 : use crate::tenant::Generation;
26 : #[cfg_attr(target_os = "macos", allow(unused_imports))]
27 : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
28 : use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
29 : use crate::TEMP_FILE_SUFFIX;
30 : use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
31 : use utils::crashsafe::path_with_suffix_extension;
32 : use utils::id::{TenantId, TimelineId};
33 : use utils::pausable_failpoint;
34 :
35 : use super::index::{IndexPart, LayerFileMetadata};
36 : use super::{
37 : parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
38 : remote_initdb_preserved_archive_path, remote_tenant_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
39 : FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
40 : };
41 :
42 : ///
43 : /// If 'metadata' is given, we will validate that the downloaded file's size matches that
44 : /// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
45 : ///
46 : /// Returns the size of the downloaded file.
47 : #[allow(clippy::too_many_arguments)]
48 18 : pub async fn download_layer_file<'a>(
49 18 : conf: &'static PageServerConf,
50 18 : storage: &'a GenericRemoteStorage,
51 18 : tenant_shard_id: TenantShardId,
52 18 : timeline_id: TimelineId,
53 18 : layer_file_name: &'a LayerName,
54 18 : layer_metadata: &'a LayerFileMetadata,
55 18 : local_path: &Utf8Path,
56 18 : cancel: &CancellationToken,
57 18 : ctx: &RequestContext,
58 18 : ) -> Result<u64, DownloadError> {
59 18 : debug_assert_current_span_has_tenant_and_timeline_id();
60 18 :
61 18 : let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
62 18 :
63 18 : let remote_path = remote_layer_path(
64 18 : &tenant_shard_id.tenant_id,
65 18 : &timeline_id,
66 18 : layer_metadata.shard,
67 18 : layer_file_name,
68 18 : layer_metadata.generation,
69 18 : );
70 18 :
71 18 : // Perform a rename inspired by durable_rename from file_utils.c.
72 18 : // The sequence:
73 18 : // write(tmp)
74 18 : // fsync(tmp)
75 18 : // rename(tmp, new)
76 18 : // fsync(new)
77 18 : // fsync(parent)
78 18 : // For more context about durable_rename check this email from postgres mailing list:
79 18 : // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
80 18 : // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
81 18 : let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
82 :
83 18 : let bytes_amount = download_retry(
84 228 : || async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
85 18 : &format!("download {remote_path:?}"),
86 18 : cancel,
87 18 : )
88 228 : .await?;
89 :
90 18 : let expected = layer_metadata.file_size;
91 18 : if expected != bytes_amount {
92 0 : return Err(DownloadError::Other(anyhow!(
93 0 : "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
94 0 : )));
95 18 : }
96 18 :
97 18 : fail::fail_point!("remote-storage-download-pre-rename", |_| {
98 0 : Err(DownloadError::Other(anyhow!(
99 0 : "remote-storage-download-pre-rename failpoint triggered"
100 0 : )))
101 18 : });
102 :
103 18 : fs::rename(&temp_file_path, &local_path)
104 18 : .await
105 18 : .with_context(|| format!("rename download layer file to {local_path}"))
106 18 : .map_err(DownloadError::Other)?;
107 :
108 : // We use fatal_err() below because the after the rename above,
109 : // the in-memory state of the filesystem already has the layer file in its final place,
110 : // and subsequent pageserver code could think it's durable while it really isn't.
111 18 : let work = {
112 18 : let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
113 18 : async move {
114 18 : let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
115 9 : .await
116 18 : .fatal_err("VirtualFile::open for timeline dir fsync");
117 18 : timeline_dir
118 18 : .sync_all()
119 9 : .await
120 18 : .fatal_err("VirtualFile::sync_all timeline dir");
121 18 : }
122 : };
123 18 : crate::virtual_file::io_engine::get()
124 18 : .spawn_blocking_and_block_on_if_std(work)
125 27 : .await;
126 :
127 18 : tracing::debug!("download complete: {local_path}");
128 :
129 18 : Ok(bytes_amount)
130 18 : }
131 :
132 : /// Download the object `src_path` in the remote `storage` to local path `dst_path`.
133 : ///
134 : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
135 : /// (Note that the directory entry for the inode is not made durable.)
136 : /// The file size in bytes is returned.
137 : ///
138 : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
139 : /// The unlinking has _not_ been made durable.
140 18 : async fn download_object<'a>(
141 18 : storage: &'a GenericRemoteStorage,
142 18 : src_path: &RemotePath,
143 18 : dst_path: &Utf8PathBuf,
144 18 : cancel: &CancellationToken,
145 18 : #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
146 18 : ) -> Result<u64, DownloadError> {
147 18 : let res = match crate::virtual_file::io_engine::get() {
148 0 : crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
149 : crate::virtual_file::io_engine::IoEngine::StdFs => {
150 9 : async {
151 9 : let destination_file = tokio::fs::File::create(dst_path)
152 8 : .await
153 9 : .with_context(|| format!("create a destination file for layer '{dst_path}'"))
154 9 : .map_err(DownloadError::Other)?;
155 :
156 18 : let download = storage.download(src_path, cancel).await?;
157 :
158 9 : pausable_failpoint!("before-downloading-layer-stream-pausable");
159 :
160 9 : let mut buf_writer =
161 9 : tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
162 9 :
163 9 : let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
164 :
165 71 : let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
166 9 : buf_writer.flush().await?;
167 :
168 9 : let mut destination_file = buf_writer.into_inner();
169 9 :
170 9 : // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
171 9 : // A file will not be closed immediately when it goes out of scope if there are any IO operations
172 9 : // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
173 9 : // you should call flush before dropping it.
174 9 : //
175 9 : // From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
176 9 : // we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
177 9 : // But for additional safety lets check/wait for any pending operations.
178 9 : destination_file
179 9 : .flush()
180 0 : .await
181 9 : .with_context(|| format!("flush source file at {dst_path}"))
182 9 : .map_err(DownloadError::Other)?;
183 :
184 : // not using sync_data because it can lose file size update
185 9 : destination_file
186 9 : .sync_all()
187 9 : .await
188 9 : .with_context(|| format!("failed to fsync source file at {dst_path}"))
189 9 : .map_err(DownloadError::Other)?;
190 :
191 9 : Ok(bytes_amount)
192 9 : }
193 115 : .await
194 : }
195 : #[cfg(target_os = "linux")]
196 : crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
197 : use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
198 : use bytes::BytesMut;
199 9 : async {
200 9 : let destination_file = VirtualFile::create(dst_path, ctx)
201 9 : .await
202 9 : .with_context(|| format!("create a destination file for layer '{dst_path}'"))
203 9 : .map_err(DownloadError::Other)?;
204 :
205 17 : let mut download = storage.download(src_path, cancel).await?;
206 :
207 9 : pausable_failpoint!("before-downloading-layer-stream-pausable");
208 :
209 : // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
210 : // There's chunks_vectored() on the stream.
211 9 : let (bytes_amount, destination_file) = async {
212 9 : let size_tracking = size_tracking_writer::Writer::new(destination_file);
213 9 : let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
214 9 : size_tracking,
215 9 : BytesMut::with_capacity(super::BUFFER_SIZE),
216 9 : );
217 54 : while let Some(res) =
218 63 : futures::StreamExt::next(&mut download.download_stream).await
219 : {
220 54 : let chunk = match res {
221 54 : Ok(chunk) => chunk,
222 0 : Err(e) => return Err(e),
223 : };
224 54 : buffered.write_buffered(chunk.slice_len(), ctx).await?;
225 : }
226 9 : let size_tracking = buffered.flush_and_into_inner(ctx).await?;
227 9 : Ok(size_tracking.into_inner())
228 9 : }
229 69 : .await?;
230 :
231 : // not using sync_data because it can lose file size update
232 9 : destination_file
233 9 : .sync_all()
234 9 : .await
235 9 : .with_context(|| format!("failed to fsync source file at {dst_path}"))
236 9 : .map_err(DownloadError::Other)?;
237 :
238 9 : Ok(bytes_amount)
239 9 : }
240 113 : .await
241 : }
242 : };
243 :
244 : // in case the download failed, clean up
245 18 : match res {
246 18 : Ok(bytes_amount) => Ok(bytes_amount),
247 0 : Err(e) => {
248 0 : if let Err(e) = tokio::fs::remove_file(dst_path).await {
249 0 : if e.kind() != std::io::ErrorKind::NotFound {
250 0 : on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
251 0 : }
252 0 : }
253 0 : Err(e)
254 : }
255 : }
256 18 : }
257 :
258 : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
259 :
260 0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
261 0 : let extension = path.extension();
262 0 : match extension {
263 0 : Some(TEMP_DOWNLOAD_EXTENSION) => true,
264 0 : Some(_) => false,
265 0 : None => false,
266 : }
267 0 : }
268 :
269 576 : async fn list_identifiers<T>(
270 576 : storage: &GenericRemoteStorage,
271 576 : prefix: RemotePath,
272 576 : cancel: CancellationToken,
273 576 : ) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
274 576 : where
275 576 : T: FromStr + Eq + std::hash::Hash,
276 576 : {
277 576 : let listing = download_retry_forever(
278 576 : || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
279 576 : &format!("list identifiers in prefix {prefix}"),
280 576 : &cancel,
281 576 : )
282 2258 : .await?;
283 :
284 576 : let mut parsed_ids = HashSet::new();
285 576 : let mut other_prefixes = HashSet::new();
286 :
287 594 : for id_remote_storage_key in listing.prefixes {
288 18 : let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
289 0 : anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
290 18 : })?;
291 :
292 18 : match object_name.parse::<T>() {
293 18 : Ok(t) => parsed_ids.insert(t),
294 0 : Err(_) => other_prefixes.insert(object_name.to_string()),
295 : };
296 : }
297 :
298 576 : for object in listing.keys {
299 0 : let object_name = object
300 0 : .key
301 0 : .object_name()
302 0 : .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
303 0 : other_prefixes.insert(object_name.to_string());
304 : }
305 :
306 576 : Ok((parsed_ids, other_prefixes))
307 576 : }
308 :
309 : /// List shards of given tenant in remote storage
310 0 : pub(crate) async fn list_remote_tenant_shards(
311 0 : storage: &GenericRemoteStorage,
312 0 : tenant_id: TenantId,
313 0 : cancel: CancellationToken,
314 0 : ) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
315 0 : let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
316 0 : list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
317 0 : }
318 :
319 : /// List timelines of given tenant shard in remote storage
320 576 : pub async fn list_remote_timelines(
321 576 : storage: &GenericRemoteStorage,
322 576 : tenant_shard_id: TenantShardId,
323 576 : cancel: CancellationToken,
324 576 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
325 576 : fail::fail_point!("storage-sync-list-remote-timelines", |_| {
326 0 : anyhow::bail!("storage-sync-list-remote-timelines");
327 576 : });
328 :
329 576 : let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
330 2258 : list_identifiers::<TimelineId>(storage, remote_path, cancel).await
331 576 : }
332 :
333 102 : async fn do_download_index_part(
334 102 : storage: &GenericRemoteStorage,
335 102 : tenant_shard_id: &TenantShardId,
336 102 : timeline_id: &TimelineId,
337 102 : index_generation: Generation,
338 102 : cancel: &CancellationToken,
339 102 : ) -> Result<(IndexPart, Generation), DownloadError> {
340 102 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
341 :
342 102 : let index_part_bytes = download_retry_forever(
343 102 : || async {
344 161 : let download = storage.download(&remote_path, cancel).await?;
345 :
346 60 : let mut bytes = Vec::new();
347 60 :
348 60 : let stream = download.download_stream;
349 60 : let mut stream = StreamReader::new(stream);
350 60 :
351 117 : tokio::io::copy_buf(&mut stream, &mut bytes).await?;
352 :
353 60 : Ok(bytes)
354 204 : },
355 102 : &format!("download {remote_path:?}"),
356 102 : cancel,
357 102 : )
358 278 : .await?;
359 :
360 60 : let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
361 60 : .with_context(|| format!("deserialize index part file at {remote_path:?}"))
362 60 : .map_err(DownloadError::Other)?;
363 :
364 60 : Ok((index_part, index_generation))
365 102 : }
366 :
367 : /// index_part.json objects are suffixed with a generation number, so we cannot
368 : /// directly GET the latest index part without doing some probing.
369 : ///
370 : /// In this function we probe for the most recent index in a generation <= our current generation.
371 : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
372 60 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
373 : pub(crate) async fn download_index_part(
374 : storage: &GenericRemoteStorage,
375 : tenant_shard_id: &TenantShardId,
376 : timeline_id: &TimelineId,
377 : my_generation: Generation,
378 : cancel: &CancellationToken,
379 : ) -> Result<(IndexPart, Generation), DownloadError> {
380 : debug_assert_current_span_has_tenant_and_timeline_id();
381 :
382 : if my_generation.is_none() {
383 : // Operating without generations: just fetch the generation-less path
384 : return do_download_index_part(
385 : storage,
386 : tenant_shard_id,
387 : timeline_id,
388 : my_generation,
389 : cancel,
390 : )
391 : .await;
392 : }
393 :
394 : // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
395 : // index in our generation.
396 : //
397 : // This is an optimization to avoid doing the listing for the general case below.
398 : let res =
399 : do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
400 : match res {
401 : Ok(index_part) => {
402 : tracing::debug!(
403 : "Found index_part from current generation (this is a stale attachment)"
404 : );
405 : return Ok(index_part);
406 : }
407 : Err(DownloadError::NotFound) => {}
408 : Err(e) => return Err(e),
409 : };
410 :
411 : // Typical case: the previous generation of this tenant was running healthily, and had uploaded
412 : // and index part. We may safely start from this index without doing a listing, because:
413 : // - We checked for current generation case above
414 : // - generations > my_generation are to be ignored
415 : // - any other indices that exist would have an older generation than `previous_gen`, and
416 : // we want to find the most recent index from a previous generation.
417 : //
418 : // This is an optimization to avoid doing the listing for the general case below.
419 : let res = do_download_index_part(
420 : storage,
421 : tenant_shard_id,
422 : timeline_id,
423 : my_generation.previous(),
424 : cancel,
425 : )
426 : .await;
427 : match res {
428 : Ok(index_part) => {
429 : tracing::debug!("Found index_part from previous generation");
430 : return Ok(index_part);
431 : }
432 : Err(DownloadError::NotFound) => {
433 : tracing::debug!(
434 : "No index_part found from previous generation, falling back to listing"
435 : );
436 : }
437 : Err(e) => {
438 : return Err(e);
439 : }
440 : }
441 :
442 : // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
443 : // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
444 : // to constructing a full index path with no generation, because the generation is a suffix.
445 : let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
446 :
447 : let indices = download_retry(
448 18 : || async {
449 18 : storage
450 18 : .list(Some(&index_prefix), ListingMode::NoDelimiter, None, cancel)
451 18 : .await
452 36 : },
453 : "list index_part files",
454 : cancel,
455 : )
456 : .await?
457 : .keys;
458 :
459 : // General case logic for which index to use: the latest index whose generation
460 : // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
461 : let max_previous_generation = indices
462 : .into_iter()
463 54 : .filter_map(|o| parse_remote_index_path(o.key))
464 36 : .filter(|g| g <= &my_generation)
465 : .max();
466 :
467 : match max_previous_generation {
468 : Some(g) => {
469 : tracing::debug!("Found index_part in generation {g:?}");
470 : do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
471 : }
472 : None => {
473 : // Migration from legacy pre-generation state: we have a generation but no prior
474 : // attached pageservers did. Try to load from a no-generation path.
475 : tracing::debug!("No index_part.json* found");
476 : do_download_index_part(
477 : storage,
478 : tenant_shard_id,
479 : timeline_id,
480 : Generation::none(),
481 : cancel,
482 : )
483 : .await
484 : }
485 : }
486 : }
487 :
488 6 : pub(crate) async fn download_initdb_tar_zst(
489 6 : conf: &'static PageServerConf,
490 6 : storage: &GenericRemoteStorage,
491 6 : tenant_shard_id: &TenantShardId,
492 6 : timeline_id: &TimelineId,
493 6 : cancel: &CancellationToken,
494 6 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
495 6 : debug_assert_current_span_has_tenant_and_timeline_id();
496 6 :
497 6 : let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
498 6 :
499 6 : let remote_preserved_path =
500 6 : remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
501 6 :
502 6 : let timeline_path = conf.timelines_path(tenant_shard_id);
503 6 :
504 6 : if !timeline_path.exists() {
505 0 : tokio::fs::create_dir_all(&timeline_path)
506 0 : .await
507 0 : .with_context(|| format!("timeline dir creation {timeline_path}"))
508 0 : .map_err(DownloadError::Other)?;
509 6 : }
510 6 : let temp_path = timeline_path.join(format!(
511 6 : "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
512 6 : ));
513 :
514 6 : let file = download_retry(
515 6 : || async {
516 6 : let file = OpenOptions::new()
517 6 : .create(true)
518 6 : .truncate(true)
519 6 : .read(true)
520 6 : .write(true)
521 6 : .open(&temp_path)
522 6 : .await
523 6 : .with_context(|| format!("tempfile creation {temp_path}"))
524 6 : .map_err(DownloadError::Other)?;
525 :
526 12 : let download = match storage.download(&remote_path, cancel).await {
527 6 : Ok(dl) => dl,
528 : Err(DownloadError::NotFound) => {
529 0 : storage.download(&remote_preserved_path, cancel).await?
530 : }
531 0 : Err(other) => Err(other)?,
532 : };
533 6 : let mut download = tokio_util::io::StreamReader::new(download.download_stream);
534 6 : let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
535 6 :
536 1991 : tokio::io::copy_buf(&mut download, &mut writer).await?;
537 :
538 6 : let mut file = writer.into_inner();
539 6 :
540 6 : file.seek(std::io::SeekFrom::Start(0))
541 5 : .await
542 6 : .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
543 6 : .map_err(DownloadError::Other)?;
544 :
545 6 : Ok(file)
546 12 : },
547 6 : &format!("download {remote_path}"),
548 6 : cancel,
549 6 : )
550 2014 : .await
551 6 : .inspect_err(|_e| {
552 : // Do a best-effort attempt at deleting the temporary file upon encountering an error.
553 : // We don't have async here nor do we want to pile on any extra errors.
554 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
555 0 : if e.kind() != std::io::ErrorKind::NotFound {
556 0 : warn!("error deleting temporary file {temp_path}: {e}");
557 0 : }
558 0 : }
559 6 : })?;
560 :
561 6 : Ok((temp_path, file))
562 6 : }
563 :
564 : /// Helper function to handle retries for a download operation.
565 : ///
566 : /// Remote operations can fail due to rate limits (S3), spurious network
567 : /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
568 : /// with backoff.
569 : ///
570 : /// (See similar logic for uploads in `perform_upload_task`)
571 42 : pub(super) async fn download_retry<T, O, F>(
572 42 : op: O,
573 42 : description: &str,
574 42 : cancel: &CancellationToken,
575 42 : ) -> Result<T, DownloadError>
576 42 : where
577 42 : O: FnMut() -> F,
578 42 : F: Future<Output = Result<T, DownloadError>>,
579 42 : {
580 42 : backoff::retry(
581 42 : op,
582 42 : DownloadError::is_permanent,
583 42 : FAILED_DOWNLOAD_WARN_THRESHOLD,
584 42 : FAILED_REMOTE_OP_RETRIES,
585 42 : description,
586 42 : cancel,
587 42 : )
588 2260 : .await
589 42 : .ok_or_else(|| DownloadError::Cancelled)
590 42 : .and_then(|x| x)
591 42 : }
592 :
593 678 : async fn download_retry_forever<T, O, F>(
594 678 : op: O,
595 678 : description: &str,
596 678 : cancel: &CancellationToken,
597 678 : ) -> Result<T, DownloadError>
598 678 : where
599 678 : O: FnMut() -> F,
600 678 : F: Future<Output = Result<T, DownloadError>>,
601 678 : {
602 678 : backoff::retry(
603 678 : op,
604 678 : DownloadError::is_permanent,
605 678 : FAILED_DOWNLOAD_WARN_THRESHOLD,
606 678 : u32::MAX,
607 678 : description,
608 678 : cancel,
609 678 : )
610 2536 : .await
611 678 : .ok_or_else(|| DownloadError::Cancelled)
612 678 : .and_then(|x| x)
613 678 : }
|