Line data Source code
1 : //! Helper functions to download files from remote storage via a RemoteStorage abstraction.
2 : //!
3 : //! The functions in this module retry failed operations automatically, according
4 : //! to the FAILED_REMOTE_OP_RETRIES constant.
5 :
6 : use std::collections::HashSet;
7 : use std::future::Future;
8 : use std::str::FromStr;
9 :
10 : use anyhow::{anyhow, Context};
11 : use camino::{Utf8Path, Utf8PathBuf};
12 : use pageserver_api::shard::TenantShardId;
13 : use tokio::fs::{self, File, OpenOptions};
14 : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
15 : use tokio_util::io::StreamReader;
16 : use tokio_util::sync::CancellationToken;
17 : use tracing::warn;
18 : use utils::backoff;
19 :
20 : use crate::config::PageServerConf;
21 : use crate::context::RequestContext;
22 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
23 : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
24 : use crate::tenant::storage_layer::LayerName;
25 : use crate::tenant::Generation;
26 : use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
27 : use crate::TEMP_FILE_SUFFIX;
28 : use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
29 : use utils::crashsafe::path_with_suffix_extension;
30 : use utils::id::{TenantId, TimelineId};
31 :
32 : use super::index::{IndexPart, LayerFileMetadata};
33 : use super::{
34 : parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
35 : remote_initdb_preserved_archive_path, remote_tenant_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
36 : FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
37 : };
38 :
39 : /// Download a layer file from remote storage into the given `local_path`.
40 : /// We validate that the downloaded file's size matches the size recorded in
41 : /// `layer_metadata`. (In the future, we might do more cross-checks, like CRC validation)
42 : ///
43 : /// Returns the size of the downloaded file.
44 : #[allow(clippy::too_many_arguments)]
45 6 : pub async fn download_layer_file<'a>(
46 6 : conf: &'static PageServerConf,
47 6 : storage: &'a GenericRemoteStorage,
48 6 : tenant_shard_id: TenantShardId,
49 6 : timeline_id: TimelineId,
50 6 : layer_file_name: &'a LayerName,
51 6 : layer_metadata: &'a LayerFileMetadata,
52 6 : local_path: &Utf8Path,
53 6 : cancel: &CancellationToken,
54 6 : ctx: &RequestContext,
55 6 : ) -> Result<u64, DownloadError> {
56 6 : debug_assert_current_span_has_tenant_and_timeline_id();
57 6 :
58 6 : let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
59 6 :
60 6 : let remote_path = remote_layer_path(
61 6 : &tenant_shard_id.tenant_id,
62 6 : &timeline_id,
63 6 : layer_metadata.shard,
64 6 : layer_file_name,
65 6 : layer_metadata.generation,
66 6 : );
67 6 :
68 6 : // Perform a rename inspired by durable_rename from file_utils.c.
69 6 : // The sequence:
70 6 : // write(tmp)
71 6 : // fsync(tmp)
72 6 : // rename(tmp, new)
73 6 : // fsync(new)
74 6 : // fsync(parent)
75 6 : // For more context on durable_rename, see this email from the postgres mailing list:
76 6 : // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
77 6 : // If the pageserver crashes, the temp file is deleted on startup and the layer re-downloaded.
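// As a rough illustration only (the actual path below goes through download_object and
// VirtualFile, not this exact sequence), a minimal durable_rename-style sketch with plain
// tokio::fs might look like the following, with `tmp`, `dst` and `bytes` as placeholder names:
//
//     tokio::fs::write(&tmp, &bytes).await?;                                   // write(tmp)
//     tokio::fs::File::open(&tmp).await?.sync_all().await?;                    // fsync(tmp)
//     tokio::fs::rename(&tmp, &dst).await?;                                    // rename(tmp, new)
//     tokio::fs::File::open(&dst).await?.sync_all().await?;                    // fsync(new)
//     tokio::fs::File::open(dst.parent().unwrap()).await?.sync_all().await?;   // fsync(parent); opening a dir read-only works on Linux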
78 6 : let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
79 :
80 6 : let bytes_amount = download_retry(
81 72 : || async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
82 6 : &format!("download {remote_path:?}"),
83 6 : cancel,
84 6 : )
85 72 : .await?;
86 :
87 6 : let expected = layer_metadata.file_size();
88 6 : if expected != bytes_amount {
89 0 : return Err(DownloadError::Other(anyhow!(
90 0 : "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
91 0 : )));
92 6 : }
93 6 :
94 6 : fail::fail_point!("remote-storage-download-pre-rename", |_| {
95 0 : Err(DownloadError::Other(anyhow!(
96 0 : "remote-storage-download-pre-rename failpoint triggered"
97 0 : )))
98 6 : });
99 :
100 6 : fs::rename(&temp_file_path, &local_path)
101 6 : .await
102 6 : .with_context(|| format!("rename download layer file to {local_path}"))
103 6 : .map_err(DownloadError::Other)?;
104 :
105 : // We use fatal_err() below because, after the rename above,
106 : // the in-memory state of the filesystem already has the layer file in its final place,
107 : // and subsequent pageserver code could think it's durable while it really isn't.
108 6 : let work = {
109 6 : let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
110 6 : async move {
111 6 : let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
112 3 : .await
113 6 : .fatal_err("VirtualFile::open for timeline dir fsync");
114 6 : timeline_dir
115 6 : .sync_all()
116 3 : .await
117 6 : .fatal_err("VirtualFile::sync_all timeline dir");
118 6 : }
119 : };
120 6 : crate::virtual_file::io_engine::get()
121 6 : .spawn_blocking_and_block_on_if_std(work)
122 9 : .await;
123 :
124 6 : tracing::debug!("download complete: {local_path}");
125 :
126 6 : Ok(bytes_amount)
127 6 : }
128 :
129 : /// Download the object `src_path` from the remote `storage` to the local path `dst_path`.
130 : ///
131 : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
132 : /// (Note that the directory entry for the inode is not made durable.)
133 : /// The file size in bytes is returned.
134 : ///
135 : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
136 : /// The unlinking has _not_ been made durable.
137 6 : async fn download_object<'a>(
138 6 : storage: &'a GenericRemoteStorage,
139 6 : src_path: &RemotePath,
140 6 : dst_path: &Utf8PathBuf,
141 6 : cancel: &CancellationToken,
142 6 : #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
143 6 : ) -> Result<u64, DownloadError> {
144 6 : let res = match crate::virtual_file::io_engine::get() {
145 0 : crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
146 : crate::virtual_file::io_engine::IoEngine::StdFs => {
147 3 : async {
148 3 : let destination_file = tokio::fs::File::create(dst_path)
149 3 : .await
150 3 : .with_context(|| format!("create a destination file for layer '{dst_path}'"))
151 3 : .map_err(DownloadError::Other)?;
152 :
153 6 : let download = storage.download(src_path, cancel).await?;
154 :
155 3 : let mut buf_writer =
156 3 : tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
157 3 :
158 3 : let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
159 :
160 24 : let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
161 3 : buf_writer.flush().await?;
162 :
163 3 : let mut destination_file = buf_writer.into_inner();
164 3 :
165 3 : // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
166 3 : // A file will not be closed immediately when it goes out of scope if there are any IO operations
167 3 : // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
168 3 : // you should call flush before dropping it.
169 3 : //
170 3 : // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
171 3 : // we assume that the `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
172 3 : // But for additional safety, let's check/wait for any pending operations.
173 3 : destination_file
174 3 : .flush()
175 0 : .await
176 3 : .with_context(|| format!("flush destination file at {dst_path}"))
177 3 : .map_err(DownloadError::Other)?;
178 :
179 : // not using sync_data because it can lose file size update
180 3 : destination_file
181 3 : .sync_all()
182 3 : .await
183 3 : .with_context(|| format!("failed to fsync destination file at {dst_path}"))
184 3 : .map_err(DownloadError::Other)?;
185 :
186 3 : Ok(bytes_amount)
187 3 : }
188 36 : .await
189 : }
190 : #[cfg(target_os = "linux")]
191 : crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
192 : use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
193 : use bytes::BytesMut;
194 3 : async {
195 3 : let destination_file = VirtualFile::create(dst_path, ctx)
196 3 : .await
197 3 : .with_context(|| format!("create a destination file for layer '{dst_path}'"))
198 3 : .map_err(DownloadError::Other)?;
199 :
200 6 : let mut download = storage.download(src_path, cancel).await?;
201 :
202 : // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
203 : // There's chunks_vectored() on the stream.
204 3 : let (bytes_amount, destination_file) = async {
205 3 : let size_tracking = size_tracking_writer::Writer::new(destination_file);
206 3 : let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
207 3 : size_tracking,
208 3 : BytesMut::with_capacity(super::BUFFER_SIZE),
209 3 : );
210 18 : while let Some(res) =
211 21 : futures::StreamExt::next(&mut download.download_stream).await
212 : {
213 18 : let chunk = match res {
214 18 : Ok(chunk) => chunk,
215 0 : Err(e) => return Err(e),
216 : };
217 18 : buffered
218 18 : .write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk), ctx)
219 0 : .await?;
220 : }
221 3 : let size_tracking = buffered.flush_and_into_inner(ctx).await?;
222 3 : Ok(size_tracking.into_inner())
223 3 : }
224 24 : .await?;
225 :
226 : // not using sync_data because it can lose file size update
227 3 : destination_file
228 3 : .sync_all()
229 3 : .await
230 3 : .with_context(|| format!("failed to fsync destination file at {dst_path}"))
231 3 : .map_err(DownloadError::Other)?;
232 :
233 3 : Ok(bytes_amount)
234 3 : }
235 36 : .await
236 : }
237 : };
238 :
239 : // in case the download failed, clean up
240 6 : match res {
241 6 : Ok(bytes_amount) => Ok(bytes_amount),
242 0 : Err(e) => {
243 0 : if let Err(e) = tokio::fs::remove_file(dst_path).await {
244 0 : if e.kind() != std::io::ErrorKind::NotFound {
245 0 : on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
246 0 : }
247 0 : }
248 0 : Err(e)
249 : }
250 : }
251 6 : }
252 :
253 : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
254 :
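/// Whether `path` carries the temp-download suffix appended by `path_with_suffix_extension`
/// above. A small illustrative sketch (the file names here are hypothetical):
///
/// ```ignore
/// use camino::Utf8Path;
/// assert!(is_temp_download_file(Utf8Path::new("some_layer_file.temp_download")));
/// assert!(!is_temp_download_file(Utf8Path::new("some_layer_file")));
/// ```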
255 0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
256 0 : let extension = path.extension();
257 0 : match extension {
258 0 : Some(TEMP_DOWNLOAD_EXTENSION) => true,
259 0 : Some(_) => false,
260 0 : None => false,
261 : }
262 0 : }
263 :
264 130 : async fn list_identifiers<T>(
265 130 : storage: &GenericRemoteStorage,
266 130 : prefix: RemotePath,
267 130 : cancel: CancellationToken,
268 130 : ) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
269 130 : where
270 130 : T: FromStr + Eq + std::hash::Hash,
271 130 : {
272 130 : let listing = download_retry_forever(
273 130 : || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
274 130 : &format!("list identifiers in prefix {prefix}"),
275 130 : &cancel,
276 130 : )
277 504 : .await?;
278 :
279 130 : let mut parsed_ids = HashSet::new();
280 130 : let mut other_prefixes = HashSet::new();
281 :
282 136 : for id_remote_storage_key in listing.prefixes {
283 6 : let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
284 0 : anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
285 6 : })?;
286 :
287 6 : match object_name.parse::<T>() {
288 6 : Ok(t) => parsed_ids.insert(t),
289 0 : Err(_) => other_prefixes.insert(object_name.to_string()),
290 : };
291 : }
292 :
293 130 : for key in listing.keys {
294 0 : let object_name = key
295 0 : .object_name()
296 0 : .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
297 0 : other_prefixes.insert(object_name.to_string());
298 : }
299 :
300 130 : Ok((parsed_ids, other_prefixes))
301 130 : }
302 :
303 : /// List shards of given tenant in remote storage
304 0 : pub(crate) async fn list_remote_tenant_shards(
305 0 : storage: &GenericRemoteStorage,
306 0 : tenant_id: TenantId,
307 0 : cancel: CancellationToken,
308 0 : ) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
309 0 : let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
310 0 : list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
311 0 : }
312 :
313 : /// List timelines of given tenant shard in remote storage
314 130 : pub async fn list_remote_timelines(
315 130 : storage: &GenericRemoteStorage,
316 130 : tenant_shard_id: TenantShardId,
317 130 : cancel: CancellationToken,
318 130 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
319 130 : fail::fail_point!("storage-sync-list-remote-timelines", |_| {
320 0 : anyhow::bail!("storage-sync-list-remote-timelines");
321 130 : });
322 :
323 130 : let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
324 504 : list_identifiers::<TimelineId>(storage, remote_path, cancel).await
325 130 : }
326 :
327 34 : async fn do_download_index_part(
328 34 : storage: &GenericRemoteStorage,
329 34 : tenant_shard_id: &TenantShardId,
330 34 : timeline_id: &TimelineId,
331 34 : index_generation: Generation,
332 34 : cancel: &CancellationToken,
333 34 : ) -> Result<(IndexPart, Generation), DownloadError> {
334 34 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
335 :
336 34 : let index_part_bytes = download_retry_forever(
337 34 : || async {
338 54 : let download = storage.download(&remote_path, cancel).await?;
339 34 :
340 34 : let mut bytes = Vec::new();
341 20 :
342 20 : let stream = download.download_stream;
343 20 : let mut stream = StreamReader::new(stream);
344 20 :
345 40 : tokio::io::copy_buf(&mut stream, &mut bytes).await?;
346 34 :
347 34 : Ok(bytes)
348 34 : },
349 34 : &format!("download {remote_path:?}"),
350 34 : cancel,
351 34 : )
352 94 : .await?;
353 :
354 20 : let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
355 20 : .with_context(|| format!("deserialize index part file at {remote_path:?}"))
356 20 : .map_err(DownloadError::Other)?;
357 :
358 20 : Ok((index_part, index_generation))
359 34 : }
360 :
361 : /// index_part.json objects are suffixed with a generation number, so we cannot
362 : /// directly GET the latest index part without doing some probing.
363 : ///
364 : /// In this function we probe for the most recent index in a generation <= our current generation.
365 : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
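/// As a worked example (the generation numbers and the suffix encoding shown here are
/// illustrative only), with `my_generation` = 5 the probe order is:
///
/// ```text
/// index_part.json-<gen 5>     // fast path: a stale attachment already wrote our own generation
/// index_part.json-<gen 4>     // typical case: the previous generation's index
/// list index_part.json*       // fallback: pick the highest generation <= 5
/// index_part.json             // legacy path with no generation suffix, if nothing else matched
/// ```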
366 40 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
367 : pub(crate) async fn download_index_part(
368 : storage: &GenericRemoteStorage,
369 : tenant_shard_id: &TenantShardId,
370 : timeline_id: &TimelineId,
371 : my_generation: Generation,
372 : cancel: &CancellationToken,
373 : ) -> Result<(IndexPart, Generation), DownloadError> {
374 : debug_assert_current_span_has_tenant_and_timeline_id();
375 :
376 : if my_generation.is_none() {
377 : // Operating without generations: just fetch the generation-less path
378 : return do_download_index_part(
379 : storage,
380 : tenant_shard_id,
381 : timeline_id,
382 : my_generation,
383 : cancel,
384 : )
385 : .await;
386 : }
387 :
388 : // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
389 : // index in our generation.
390 : //
391 : // This is an optimization to avoid doing the listing for the general case below.
392 : let res =
393 : do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
394 : match res {
395 : Ok(index_part) => {
396 : tracing::debug!(
397 : "Found index_part from current generation (this is a stale attachment)"
398 : );
399 : return Ok(index_part);
400 : }
401 : Err(DownloadError::NotFound) => {}
402 : Err(e) => return Err(e),
403 : };
404 :
405 : // Typical case: the previous generation of this tenant was running healthily, and had uploaded
406 : // an index part. We may safely start from this index without doing a listing, because:
407 : // - We checked for current generation case above
408 : // - generations > my_generation are to be ignored
409 : // - any other indices that exist would have an older generation than `previous_gen`, and
410 : // we want to find the most recent index from a previous generation.
411 : //
412 : // This is an optimization to avoid doing the listing for the general case below.
413 : let res = do_download_index_part(
414 : storage,
415 : tenant_shard_id,
416 : timeline_id,
417 : my_generation.previous(),
418 : cancel,
419 : )
420 : .await;
421 : match res {
422 : Ok(index_part) => {
423 : tracing::debug!("Found index_part from previous generation");
424 : return Ok(index_part);
425 : }
426 : Err(DownloadError::NotFound) => {
427 : tracing::debug!(
428 : "No index_part found from previous generation, falling back to listing"
429 : );
430 : }
431 : Err(e) => {
432 : return Err(e);
433 : }
434 : }
435 :
436 : // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
437 : // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
438 : // to constructing a full index path with no generation, because the generation is a suffix.
439 : let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
440 :
441 : let indices = download_retry(
442 6 : || async {
443 6 : storage
444 6 : .list(Some(&index_prefix), ListingMode::NoDelimiter, None, cancel)
445 6 : .await
446 6 : },
447 : "list index_part files",
448 : cancel,
449 : )
450 : .await?
451 : .keys;
452 :
453 : // General case logic for which index to use: the latest index whose generation
454 : // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
455 : let max_previous_generation = indices
456 : .into_iter()
457 : .filter_map(parse_remote_index_path)
458 12 : .filter(|g| g <= &my_generation)
459 : .max();
460 :
461 : match max_previous_generation {
462 : Some(g) => {
463 : tracing::debug!("Found index_part in generation {g:?}");
464 : do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
465 : }
466 : None => {
467 : // Migration from legacy pre-generation state: we have a generation, but previously
468 : // attached pageservers did not. Try to load from a no-generation path.
469 : tracing::debug!("No index_part.json* found");
470 : do_download_index_part(
471 : storage,
472 : tenant_shard_id,
473 : timeline_id,
474 : Generation::none(),
475 : cancel,
476 : )
477 : .await
478 : }
479 : }
480 : }
481 :
482 2 : pub(crate) async fn download_initdb_tar_zst(
483 2 : conf: &'static PageServerConf,
484 2 : storage: &GenericRemoteStorage,
485 2 : tenant_shard_id: &TenantShardId,
486 2 : timeline_id: &TimelineId,
487 2 : cancel: &CancellationToken,
488 2 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
489 2 : debug_assert_current_span_has_tenant_and_timeline_id();
490 2 :
491 2 : let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
492 2 :
493 2 : let remote_preserved_path =
494 2 : remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
495 2 :
496 2 : let timeline_path = conf.timelines_path(tenant_shard_id);
497 2 :
498 2 : if !timeline_path.exists() {
499 0 : tokio::fs::create_dir_all(&timeline_path)
500 0 : .await
501 0 : .with_context(|| format!("timeline dir creation {timeline_path}"))
502 0 : .map_err(DownloadError::Other)?;
503 2 : }
504 2 : let temp_path = timeline_path.join(format!(
505 2 : "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
506 2 : ));
507 :
508 2 : let file = download_retry(
509 2 : || async {
510 2 : let file = OpenOptions::new()
511 2 : .create(true)
512 2 : .truncate(true)
513 2 : .read(true)
514 2 : .write(true)
515 2 : .open(&temp_path)
516 2 : .await
517 2 : .with_context(|| format!("tempfile creation {temp_path}"))
518 2 : .map_err(DownloadError::Other)?;
519 2 :
520 4 : let download = match storage.download(&remote_path, cancel).await {
521 2 : Ok(dl) => dl,
522 2 : Err(DownloadError::NotFound) => {
523 2 : storage.download(&remote_preserved_path, cancel).await?
524 2 : }
525 2 : Err(other) => Err(other)?,
526 2 : };
527 2 : let mut download = tokio_util::io::StreamReader::new(download.download_stream);
528 2 : let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
529 2 :
530 727 : tokio::io::copy_buf(&mut download, &mut writer).await?;
531 2 :
532 2 : let mut file = writer.into_inner();
533 2 :
534 2 : file.seek(std::io::SeekFrom::Start(0))
535 2 : .await
536 2 : .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
537 2 : .map_err(DownloadError::Other)?;
538 2 :
539 2 : Ok(file)
540 2 : },
541 2 : &format!("download {remote_path}"),
542 2 : cancel,
543 2 : )
544 735 : .await
545 2 : .map_err(|e| {
546 : // Do a best-effort attempt at deleting the temporary file upon encountering an error.
547 : // We don't have async here nor do we want to pile on any extra errors.
548 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
549 0 : if e.kind() != std::io::ErrorKind::NotFound {
550 0 : warn!("error deleting temporary file {temp_path}: {e}");
551 0 : }
552 0 : }
553 0 : e
554 2 : })?;
555 :
556 2 : Ok((temp_path, file))
557 2 : }
558 :
559 : /// Helper function to handle retries for a download operation.
560 : ///
561 : /// Remote operations can fail due to rate limits (S3), spurious network
562 : /// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
563 : /// with backoff.
564 : ///
565 : /// (See similar logic for uploads in `perform_upload_task`)
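/// A minimal usage sketch (the closure body and variable names are illustrative only):
///
/// ```ignore
/// let download = download_retry(
///     || async { storage.download(&remote_path, cancel).await },
///     &format!("download {remote_path:?}"),
///     cancel,
/// )
/// .await?;
/// ```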
566 14 : pub(super) async fn download_retry<T, O, F>(
567 14 : op: O,
568 14 : description: &str,
569 14 : cancel: &CancellationToken,
570 14 : ) -> Result<T, DownloadError>
571 14 : where
572 14 : O: FnMut() -> F,
573 14 : F: Future<Output = Result<T, DownloadError>>,
574 14 : {
575 14 : backoff::retry(
576 14 : op,
577 14 : DownloadError::is_permanent,
578 14 : FAILED_DOWNLOAD_WARN_THRESHOLD,
579 14 : FAILED_REMOTE_OP_RETRIES,
580 14 : description,
581 14 : cancel,
582 14 : )
583 813 : .await
584 14 : .ok_or_else(|| DownloadError::Cancelled)
585 14 : .and_then(|x| x)
586 14 : }
587 :
588 164 : async fn download_retry_forever<T, O, F>(
589 164 : op: O,
590 164 : description: &str,
591 164 : cancel: &CancellationToken,
592 164 : ) -> Result<T, DownloadError>
593 164 : where
594 164 : O: FnMut() -> F,
595 164 : F: Future<Output = Result<T, DownloadError>>,
596 164 : {
597 164 : backoff::retry(
598 164 : op,
599 164 : DownloadError::is_permanent,
600 164 : FAILED_DOWNLOAD_WARN_THRESHOLD,
601 164 : u32::MAX,
602 164 : description,
603 164 : cancel,
604 164 : )
605 598 : .await
606 164 : .ok_or_else(|| DownloadError::Cancelled)
607 164 : .and_then(|x| x)
608 164 : }