//! Helper functions to download files from remote storage using a `GenericRemoteStorage`.
//!
//! The functions in this module retry failed operations automatically, according
//! to the `FAILED_REMOTE_OP_RETRIES` constant.

use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::time::SystemTime;

use anyhow::{anyhow, Context};
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::backoff;

use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::tenant::Generation;
#[cfg_attr(target_os = "macos", allow(unused_imports))]
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use utils::pausable_failpoint;

use super::index::{IndexPart, LayerFileMetadata};
use super::manifest::TenantManifest;
use super::{
    parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
    remote_initdb_preserved_archive_path, remote_tenant_manifest_path, remote_tenant_path,
    FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
};

/// Download a layer file from remote storage into the local filesystem.
///
/// We validate that the downloaded file's size matches the size recorded in
/// `layer_metadata`. (In the future, we might do more cross-checks, like CRC validation.)
///
/// Returns the size of the downloaded file.
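///
/// A minimal call sketch (hypothetical bindings, assumed to be in scope, with
/// a tenant/timeline tracing span already entered):
///
/// ```ignore
/// let size = download_layer_file(
///     conf, storage, tenant_shard_id, timeline_id,
///     &layer_name, &layer_metadata, &local_path, &cancel, &ctx,
/// )
/// .await?;
/// assert_eq!(size, layer_metadata.file_size);
/// ```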
#[allow(clippy::too_many_arguments)]
pub async fn download_layer_file<'a>(
    conf: &'static PageServerConf,
    storage: &'a GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    layer_file_name: &'a LayerName,
    layer_metadata: &'a LayerFileMetadata,
    local_path: &Utf8Path,
    cancel: &CancellationToken,
    ctx: &RequestContext,
) -> Result<u64, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);

    let remote_path = remote_layer_path(
        &tenant_shard_id.tenant_id,
        &timeline_id,
        layer_metadata.shard,
        layer_file_name,
        layer_metadata.generation,
    );

    // Perform a rename inspired by durable_rename from file_utils.c.
    // The sequence:
    //     write(tmp)
    //     fsync(tmp)
    //     rename(tmp, new)
    //     fsync(new)
    //     fsync(parent)
    // For more context about durable_rename, see this email from the postgres mailing list:
    // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
    // If the pageserver crashes, the temp file will be deleted on startup and re-downloaded.
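    //
    // A minimal sketch of that sequence with std::fs, for illustration only
    // (the code below uses tokio and VirtualFile instead; the helper name is
    // hypothetical and Unix semantics are assumed):
    //
    //     fn durable_rename(tmp: &std::path::Path, new: &std::path::Path) -> std::io::Result<()> {
    //         // the contents of `tmp` were already written
    //         std::fs::File::open(tmp)?.sync_all()?; // fsync(tmp)
    //         std::fs::rename(tmp, new)?; // rename(tmp, new)
    //         std::fs::File::open(new)?.sync_all()?; // fsync(new)
    //         std::fs::File::open(new.parent().unwrap())?.sync_all()?; // fsync(parent)
    //         Ok(())
    //     }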
    let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);

    let bytes_amount = download_retry(
        || async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await?;

    let expected = layer_metadata.file_size;
    if expected != bytes_amount {
        return Err(DownloadError::Other(anyhow!(
            "according to layer file metadata, expected {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
        )));
    }

    fail::fail_point!("remote-storage-download-pre-rename", |_| {
        Err(DownloadError::Other(anyhow!(
            "remote-storage-download-pre-rename failpoint triggered"
        )))
    });

    fs::rename(&temp_file_path, &local_path)
        .await
        .with_context(|| format!("rename downloaded layer file to {local_path}"))
        .map_err(DownloadError::Other)?;

    // We use fatal_err() below because, after the rename above,
    // the in-memory state of the filesystem already has the layer file in its final place,
    // and subsequent pageserver code could think it's durable while it really isn't.
    let work = {
        let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
        async move {
            let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
                .await
                .fatal_err("VirtualFile::open for timeline dir fsync");
            timeline_dir
                .sync_all()
                .await
                .fatal_err("VirtualFile::sync_all timeline dir");
        }
    };
    crate::virtual_file::io_engine::get()
        .spawn_blocking_and_block_on_if_std(work)
        .await;

    tracing::debug!("download complete: {local_path}");

    Ok(bytes_amount)
}

/// Download the object `src_path` in the remote `storage` to local path `dst_path`.
///
/// If Ok() is returned, the download succeeded and the inode & data have been made durable.
/// (Note that the directory entry for the inode is not made durable.)
/// The file size in bytes is returned.
///
/// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
/// The unlinking has _not_ been made durable.
async fn download_object<'a>(
    storage: &'a GenericRemoteStorage,
    src_path: &RemotePath,
    dst_path: &Utf8PathBuf,
    cancel: &CancellationToken,
    #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
) -> Result<u64, DownloadError> {
    let res = match crate::virtual_file::io_engine::get() {
        crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
        crate::virtual_file::io_engine::IoEngine::StdFs => {
            async {
                let destination_file = tokio::fs::File::create(dst_path)
                    .await
                    .with_context(|| format!("create a destination file for layer '{dst_path}'"))
                    .map_err(DownloadError::Other)?;

                let download = storage
                    .download(src_path, &DownloadOpts::default(), cancel)
                    .await?;

                pausable_failpoint!("before-downloading-layer-stream-pausable");

                let mut buf_writer =
                    tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);

                let mut reader = tokio_util::io::StreamReader::new(download.download_stream);

                let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
                buf_writer.flush().await?;

                let mut destination_file = buf_writer.into_inner();

                // The tokio docs (https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html) state that:
                // A file will not be closed immediately when it goes out of scope if there are any IO operations
                // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
                // you should call flush before dropping it.
                //
                // From the tokio code, it waits for pending operations to complete. There shouldn't be any, because
                // we assume that the `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
                // But for additional safety, let's check/wait for any pending operations.
                destination_file
                    .flush()
                    .await
                    .maybe_fatal_err("download_object flush")
                    .with_context(|| format!("flush destination file at {dst_path}"))
                    .map_err(DownloadError::Other)?;

                // Not using sync_data because it can lose the file size update.
                destination_file
                    .sync_all()
                    .await
                    .maybe_fatal_err("download_object sync_all")
                    .with_context(|| format!("failed to fsync destination file at {dst_path}"))
                    .map_err(DownloadError::Other)?;

                Ok(bytes_amount)
            }
            .await
        }
        #[cfg(target_os = "linux")]
        crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
            use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
            use bytes::BytesMut;
            async {
                let destination_file = VirtualFile::create(dst_path, ctx)
                    .await
                    .with_context(|| format!("create a destination file for layer '{dst_path}'"))
                    .map_err(DownloadError::Other)?;

                let mut download = storage
                    .download(src_path, &DownloadOpts::default(), cancel)
                    .await?;

                pausable_failpoint!("before-downloading-layer-stream-pausable");

                // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
                // There's chunks_vectored() on the stream.
                let (bytes_amount, destination_file) = async {
                    let size_tracking = size_tracking_writer::Writer::new(destination_file);
                    let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
                        size_tracking,
                        BytesMut::with_capacity(super::BUFFER_SIZE),
                    );
                    while let Some(res) =
                        futures::StreamExt::next(&mut download.download_stream).await
                    {
                        let chunk = match res {
                            Ok(chunk) => chunk,
                            Err(e) => return Err(e),
                        };
                        buffered.write_buffered(chunk.slice_len(), ctx).await?;
                    }
                    let size_tracking = buffered.flush_and_into_inner(ctx).await?;
                    Ok(size_tracking.into_inner())
                }
                .await?;

                // Not using sync_data because it can lose the file size update.
                destination_file
                    .sync_all()
                    .await
                    .maybe_fatal_err("download_object sync_all")
                    .with_context(|| format!("failed to fsync destination file at {dst_path}"))
                    .map_err(DownloadError::Other)?;

                Ok(bytes_amount)
            }
            .await
        }
    };

    // In case the download failed, clean up.
    match res {
        Ok(bytes_amount) => Ok(bytes_amount),
        Err(e) => {
            if let Err(e) = tokio::fs::remove_file(dst_path).await {
                if e.kind() != std::io::ErrorKind::NotFound {
                    on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
                }
            }
            Err(e)
        }
    }
}

const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";

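/// Returns true if `path` ends in [`TEMP_DOWNLOAD_EXTENSION`], i.e. it is a
/// partially downloaded file left behind by a crashed or interrupted download
/// (a hypothetical example: `some_layer_file.temp_download`). Such files are
/// deleted on startup and re-downloaded.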
pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
    let extension = path.extension();
    match extension {
        Some(TEMP_DOWNLOAD_EXTENSION) => true,
        Some(_) => false,
        None => false,
    }
}

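/// Lists the objects directly under `prefix`, splitting the returned names
/// into those that parse as a `T` (e.g. a tenant shard or timeline ID at the
/// call sites below) and the remaining, unrecognized names.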
async fn list_identifiers<T>(
    storage: &GenericRemoteStorage,
    prefix: RemotePath,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
where
    T: FromStr + Eq + std::hash::Hash,
{
    let listing = download_retry_forever(
        || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
        &format!("list identifiers in prefix {prefix}"),
        &cancel,
    )
    .await?;

    let mut parsed_ids = HashSet::new();
    let mut other_prefixes = HashSet::new();

    for id_remote_storage_key in listing.prefixes {
        let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
            anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
        })?;

        match object_name.parse::<T>() {
            Ok(t) => parsed_ids.insert(t),
            Err(_) => other_prefixes.insert(object_name.to_string()),
        };
    }

    for object in listing.keys {
        let object_name = object
            .key
            .object_name()
            .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
        other_prefixes.insert(object_name.to_string());
    }

    Ok((parsed_ids, other_prefixes))
}

/// List the shards of the given tenant in remote storage.
pub(crate) async fn list_remote_tenant_shards(
    storage: &GenericRemoteStorage,
    tenant_id: TenantId,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
    let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
    list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
}

/// List the timelines of the given tenant shard in remote storage.
pub async fn list_remote_timelines(
    storage: &GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
    fail::fail_point!("storage-sync-list-remote-timelines", |_| {
        anyhow::bail!("storage-sync-list-remote-timelines");
    });

    let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
    list_identifiers::<TimelineId>(storage, remote_path, cancel).await
}

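/// Downloads the object at `remote_path` into memory, retrying forever (until
/// success, a permanent error, or cancellation). Returns the raw bytes and the
/// object's last-modified time.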
async fn do_download_remote_path_retry_forever(
    storage: &GenericRemoteStorage,
    remote_path: &RemotePath,
    cancel: &CancellationToken,
) -> Result<(Vec<u8>, SystemTime), DownloadError> {
    download_retry_forever(
        || async {
            let download = storage
                .download(remote_path, &DownloadOpts::default(), cancel)
                .await?;

            let mut bytes = Vec::new();

            let stream = download.download_stream;
            let mut stream = StreamReader::new(stream);

            tokio::io::copy_buf(&mut stream, &mut bytes).await?;

            Ok((bytes, download.last_modified))
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await
}

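/// Downloads and deserializes the tenant manifest for the given tenant shard,
/// returning it together with the generation it was read from (currently
/// pinned to `TENANT_MANIFEST_GENERATION`; see the TODO below).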
pub async fn do_download_tenant_manifest(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    cancel: &CancellationToken,
) -> Result<(TenantManifest, Generation), DownloadError> {
    // TODO: generation support
    let generation = super::TENANT_MANIFEST_GENERATION;
    let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);

    let (manifest_bytes, _manifest_bytes_mtime) =
        do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;

    let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
        .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok((tenant_manifest, generation))
}

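/// Downloads and deserializes the `index_part.json` object at exactly
/// `index_generation`, returning the index together with that generation and
/// the object's last-modified time.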
async fn do_download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    index_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
    let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);

    let (index_part_bytes, index_part_mtime) =
        do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;

    let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
        .with_context(|| format!("deserialize index part file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok((index_part, index_generation, index_part_mtime))
}

/// index_part.json objects are suffixed with a generation number, so we cannot
/// directly GET the latest index part without doing some probing.
///
/// In this function we probe for the most recent index in a generation <= our current generation.
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
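///
/// As a sketch of the probe order, assuming (hypothetically) that we are
/// attached in generation 5:
///
/// 1. Try the index at generation 5 (stale-attachment fast path).
/// 2. Try the index at generation 4 (typical fast path: the previous
///    attachment uploaded an index).
/// 3. Otherwise, list all index_part.json* objects and download the one with
///    the highest generation <= 5, falling back to the legacy generation-less
///    path if no suffixed index exists.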
#[tracing::instrument(skip_all, fields(generation=?my_generation))]
pub(crate) async fn download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    my_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    if my_generation.is_none() {
        // Operating without generations: just fetch the generation-less path
        return do_download_index_part(
            storage,
            tenant_shard_id,
            timeline_id,
            my_generation,
            cancel,
        )
        .await;
    }

    // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
    // index in our generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res =
        do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
    match res {
        Ok(index_part) => {
            tracing::debug!(
                "Found index_part from current generation (this is a stale attachment)"
            );
            return Ok(index_part);
        }
        Err(DownloadError::NotFound) => {}
        Err(e) => return Err(e),
    };

    // Typical case: the previous generation of this tenant was running healthily and had uploaded
    // an index part. We may safely start from this index without doing a listing, because:
    //  - We checked for the current-generation case above
    //  - Generations > my_generation are to be ignored
    //  - Any other indices that exist would have an older generation than `previous_gen`, and
    //    we want to find the most recent index from a previous generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download_index_part(
        storage,
        tenant_shard_id,
        timeline_id,
        my_generation.previous(),
        cancel,
    )
    .await;
    match res {
        Ok(index_part) => {
            tracing::debug!("Found index_part from previous generation");
            return Ok(index_part);
        }
        Err(DownloadError::NotFound) => {
            tracing::debug!(
                "No index_part found from previous generation, falling back to listing"
            );
        }
        Err(e) => {
            return Err(e);
        }
    }

    // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
    // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
    // to constructing a full index path with no generation, because the generation is a suffix.
    let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());

    let indices = download_retry(
        || async {
            storage
                .list(Some(&index_prefix), ListingMode::NoDelimiter, None, cancel)
                .await
        },
        "list index_part files",
        cancel,
    )
    .await?
    .keys;

    // General case logic for which index to use: the latest index whose generation
    // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
    let max_previous_generation = indices
        .into_iter()
        .filter_map(|o| parse_remote_index_path(o.key))
        .filter(|g| g <= &my_generation)
        .max();

    match max_previous_generation {
        Some(g) => {
            tracing::debug!("Found index_part in generation {g:?}");
            do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
        }
        None => {
            // Migration from legacy pre-generation state: we have a generation but no prior
            // attached pageservers did. Try to load from a no-generation path.
            tracing::debug!("No index_part.json* found");
            do_download_index_part(
                storage,
                tenant_shard_id,
                timeline_id,
                Generation::none(),
                cancel,
            )
            .await
        }
    }
}

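/// Downloads the timeline's `initdb.tar.zst` into a temporary file under the
/// tenant's timelines directory, falling back to the preserved archive path if
/// the primary object is not found. Returns the temporary path and the opened
/// file, rewound to the start.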
pub(crate) async fn download_initdb_tar_zst(
    conf: &'static PageServerConf,
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    cancel: &CancellationToken,
) -> Result<(Utf8PathBuf, File), DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let remote_preserved_path =
        remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let timeline_path = conf.timelines_path(tenant_shard_id);

    if !timeline_path.exists() {
        tokio::fs::create_dir_all(&timeline_path)
            .await
            .with_context(|| format!("timeline dir creation {timeline_path}"))
            .map_err(DownloadError::Other)?;
    }
    let temp_path = timeline_path.join(format!(
        "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
    ));

    let file = download_retry(
        || async {
            let file = OpenOptions::new()
                .create(true)
                .truncate(true)
                .read(true)
                .write(true)
                .open(&temp_path)
                .await
                .with_context(|| format!("tempfile creation {temp_path}"))
                .map_err(DownloadError::Other)?;

            let download = match storage
                .download(&remote_path, &DownloadOpts::default(), cancel)
                .await
            {
                Ok(dl) => dl,
                Err(DownloadError::NotFound) => {
                    storage
                        .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
                        .await?
                }
                Err(other) => Err(other)?,
            };
            let mut download = tokio_util::io::StreamReader::new(download.download_stream);
            let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);

            tokio::io::copy_buf(&mut download, &mut writer).await?;

            let mut file = writer.into_inner();

            file.seek(std::io::SeekFrom::Start(0))
                .await
                .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
                .map_err(DownloadError::Other)?;

            Ok(file)
        },
        &format!("download {remote_path}"),
        cancel,
    )
    .await
    .inspect_err(|_e| {
        // Do a best-effort attempt at deleting the temporary file upon encountering an error.
        // We don't have async here nor do we want to pile on any extra errors.
        if let Err(e) = std::fs::remove_file(&temp_path) {
            if e.kind() != std::io::ErrorKind::NotFound {
                warn!("error deleting temporary file {temp_path}: {e}");
            }
        }
    })?;

    Ok((temp_path, file))
}

/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (S3), spurious network
/// problems, or other external reasons. Retry `FAILED_REMOTE_OP_RETRIES` times,
/// with backoff.
///
/// (See similar logic for uploads in `perform_upload_task`.)
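///
/// A minimal usage sketch, mirroring the listing callsite elsewhere in this
/// file (`storage`, `index_prefix`, and `cancel` are assumed to be in scope):
///
/// ```ignore
/// let listing = download_retry(
///     || async {
///         storage
///             .list(Some(&index_prefix), ListingMode::NoDelimiter, None, cancel)
///             .await
///     },
///     "list index_part files",
///     cancel,
/// )
/// .await?;
/// ```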
pub(super) async fn download_retry<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        DownloadError::is_permanent,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        FAILED_REMOTE_OP_RETRIES,
        description,
        cancel,
    )
    .await
    .ok_or_else(|| DownloadError::Cancelled)
    .and_then(|x| x)
}

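/// Like [`download_retry`], but retries effectively forever (`u32::MAX`
/// attempts) until the operation succeeds, fails permanently, or is cancelled.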
async fn download_retry_forever<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        DownloadError::is_permanent,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        u32::MAX,
        description,
        cancel,
    )
    .await
    .ok_or_else(|| DownloadError::Cancelled)
    .and_then(|x| x)
}