Line data Source code
1 : //! Helper functions to download files from remote storage with a RemoteStorage
2 : //!
3 : //! The functions in this module retry failed operations automatically, according
4 : //! to the FAILED_REMOTE_OP_RETRIES constant.
5 :
6 : use std::collections::HashSet;
7 : use std::future::Future;
8 : use std::str::FromStr;
9 : use std::sync::atomic::AtomicU64;
10 : use std::time::SystemTime;
11 :
12 : use anyhow::{Context, anyhow};
13 : use camino::{Utf8Path, Utf8PathBuf};
14 : use pageserver_api::shard::TenantShardId;
15 : use remote_storage::{
16 : DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
17 : };
18 : use tokio::fs::{self, File, OpenOptions};
19 : use tokio::io::AsyncSeekExt;
20 : use tokio_util::io::StreamReader;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::warn;
23 : use utils::crashsafe::path_with_suffix_extension;
24 : use utils::id::{TenantId, TimelineId};
25 : use utils::{backoff, pausable_failpoint};
26 :
27 : use super::index::{IndexPart, LayerFileMetadata};
28 : use super::manifest::TenantManifest;
29 : use super::{
30 : FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
31 : parse_remote_tenant_manifest_path, remote_index_path, remote_initdb_archive_path,
32 : remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
33 : remote_tenant_manifest_prefix, remote_tenant_path,
34 : };
35 : use crate::TEMP_FILE_SUFFIX;
36 : use crate::config::PageServerConf;
37 : use crate::context::RequestContext;
38 : use crate::span::{
39 : debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
40 : };
41 : use crate::tenant::Generation;
42 : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
43 : use crate::tenant::storage_layer::LayerName;
44 : use crate::virtual_file;
45 : use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
46 : use crate::virtual_file::{IoBufferMut, MaybeFatalIo, VirtualFile};
47 : use crate::virtual_file::{TempVirtualFile, owned_buffers_io};
48 :
49 : ///
50 : /// If 'metadata' is given, we will validate that the downloaded file's size matches that
51 : /// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
52 : ///
53 : /// Returns the size of the downloaded file.
54 : #[allow(clippy::too_many_arguments)]
55 7 : pub async fn download_layer_file<'a>(
56 7 : conf: &'static PageServerConf,
57 7 : storage: &'a GenericRemoteStorage,
58 7 : tenant_shard_id: TenantShardId,
59 7 : timeline_id: TimelineId,
60 7 : layer_file_name: &'a LayerName,
61 7 : layer_metadata: &'a LayerFileMetadata,
62 7 : local_path: &Utf8Path,
63 7 : gate: &utils::sync::gate::Gate,
64 7 : cancel: &CancellationToken,
65 7 : ctx: &RequestContext,
66 7 : ) -> Result<u64, DownloadError> {
67 7 : debug_assert_current_span_has_tenant_and_timeline_id();
68 :
69 7 : let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
70 :
71 7 : let remote_path = remote_layer_path(
72 7 : &tenant_shard_id.tenant_id,
73 7 : &timeline_id,
74 7 : layer_metadata.shard,
75 7 : layer_file_name,
76 7 : layer_metadata.generation,
77 : );
78 :
79 7 : let (bytes_amount, temp_file) = download_retry(
80 7 : || async {
81 : // TempVirtualFile requires us to never reuse a filename while an old
82 : // instance of TempVirtualFile created with that filename is not done dropping yet.
83 : // So, we use a monotonic counter to disambiguate the filenames.
84 : static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
85 7 : let filename_disambiguator =
86 7 : NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
87 :
88 7 : let temp_file_path = path_with_suffix_extension(
89 7 : local_path,
90 7 : &format!("{filename_disambiguator:x}.{TEMP_DOWNLOAD_EXTENSION}"),
91 : );
92 :
93 7 : let temp_file = TempVirtualFile::new(
94 7 : VirtualFile::open_with_options_v2(
95 7 : &temp_file_path,
96 7 : virtual_file::OpenOptions::new()
97 7 : .create_new(true)
98 7 : .write(true),
99 7 : ctx,
100 7 : )
101 7 : .await
102 7 : .with_context(|| format!("create a temp file for layer download: {temp_file_path}"))
103 7 : .map_err(DownloadError::Other)?,
104 7 : gate.enter().map_err(|_| DownloadError::Cancelled)?,
105 : );
106 7 : download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await
107 14 : },
108 7 : &format!("download {remote_path:?}"),
109 7 : cancel,
110 : )
111 7 : .await?;
112 :
113 7 : let expected = layer_metadata.file_size;
114 7 : if expected != bytes_amount {
115 0 : return Err(DownloadError::Other(anyhow!(
116 0 : "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}",
117 0 : temp_file.path()
118 0 : )));
119 7 : }
120 :
121 7 : fail::fail_point!("remote-storage-download-pre-rename", |_| {
122 0 : Err(DownloadError::Other(anyhow!(
123 0 : "remote-storage-download-pre-rename failpoint triggered"
124 0 : )))
125 0 : });
126 :
127 : // Try rename before disarming the temp file.
128 : // That way, if rename fails for whatever reason, we clean up the temp file on the return path.
129 :
130 7 : fs::rename(temp_file.path(), &local_path)
131 7 : .await
132 7 : .with_context(|| format!("rename download layer file to {local_path}"))
133 7 : .map_err(DownloadError::Other)?;
134 :
135 : // The temp file's VirtualFile points to the temp_file_path which we moved above.
136 : // Drop it immediately, it's invalid.
137 : // This will get better in https://github.com/neondatabase/neon/issues/11692
138 7 : let _: VirtualFile = temp_file.disarm_into_inner();
139 : // NB: The gate guard that was stored in `temp_file` is dropped but we continue
140 : // to operate on it and on the parent timeline directory.
141 : // Those operations are safe to do because higher-level code is holding another gate guard:
142 : // - attached mode: the download task spawned by struct Layer is holding the gate guard
143 : // - secondary mode: The TenantDownloader::download holds the gate open
144 :
145 : // The rename above is not durable yet.
146 : // It doesn't matter for crash consistency because pageserver startup deletes temp
147 : // files and we'll re-download on demand if necessary.
148 :
149 : // We use fatal_err() below because the after the rename above,
150 : // the in-memory state of the filesystem already has the layer file in its final place,
151 : // and subsequent pageserver code could think it's durable while it really isn't.
152 7 : let work = {
153 7 : let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
154 7 : async move {
155 7 : let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
156 7 : .await
157 7 : .fatal_err("VirtualFile::open for timeline dir fsync");
158 7 : timeline_dir
159 7 : .sync_all()
160 7 : .await
161 7 : .fatal_err("VirtualFile::sync_all timeline dir");
162 7 : }
163 : };
164 7 : crate::virtual_file::io_engine::get()
165 7 : .spawn_blocking_and_block_on_if_std(work)
166 7 : .await;
167 :
168 7 : tracing::debug!("download complete: {local_path}");
169 :
170 7 : Ok(bytes_amount)
171 7 : }
172 :
173 : /// Download the object `src_path` in the remote `storage` to local path `dst_path`.
174 : ///
175 : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
176 : /// (Note that the directory entry for the inode is not made durable.)
177 : /// The file size in bytes is returned.
178 : ///
179 : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
180 : /// The unlinking has _not_ been made durable.
181 7 : async fn download_object(
182 7 : storage: &GenericRemoteStorage,
183 7 : src_path: &RemotePath,
184 7 : destination_file: TempVirtualFile,
185 7 : gate: &utils::sync::gate::Gate,
186 7 : cancel: &CancellationToken,
187 7 : ctx: &RequestContext,
188 7 : ) -> Result<(u64, TempVirtualFile), DownloadError> {
189 7 : let mut download = storage
190 7 : .download(src_path, &DownloadOpts::default(), cancel)
191 7 : .await?;
192 :
193 7 : pausable_failpoint!("before-downloading-layer-stream-pausable");
194 :
195 7 : let dst_path = destination_file.path().to_owned();
196 7 : let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
197 7 : destination_file,
198 : 0,
199 14 : || IoBufferMut::with_capacity(super::BUFFER_SIZE),
200 7 : gate.enter().map_err(|_| DownloadError::Cancelled)?,
201 7 : cancel.child_token(),
202 7 : ctx,
203 7 : tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
204 : );
205 :
206 : // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
207 : // There's chunks_vectored() on the stream.
208 7 : let (bytes_amount, destination_file) = async {
209 49 : while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await {
210 42 : let chunk = match res {
211 42 : Ok(chunk) => chunk,
212 0 : Err(e) => return Err(DownloadError::from(e)),
213 : };
214 42 : buffered
215 42 : .write_buffered_borrowed(&chunk, ctx)
216 42 : .await
217 42 : .map_err(|e| match e {
218 0 : FlushTaskError::Cancelled => DownloadError::Cancelled,
219 0 : })?;
220 : }
221 7 : buffered
222 7 : .shutdown(
223 7 : owned_buffers_io::write::BufferedWriterShutdownMode::PadThenTruncate,
224 7 : ctx,
225 7 : )
226 7 : .await
227 7 : .map_err(|e| match e {
228 0 : FlushTaskError::Cancelled => DownloadError::Cancelled,
229 0 : })
230 7 : }
231 7 : .await?;
232 :
233 : // not using sync_data because it can lose file size update
234 7 : destination_file
235 7 : .sync_all()
236 7 : .await
237 7 : .maybe_fatal_err("download_object sync_all")
238 7 : .with_context(|| format!("failed to fsync source file at {dst_path}"))
239 7 : .map_err(DownloadError::Other)?;
240 :
241 7 : Ok((bytes_amount, destination_file))
242 7 : }
243 :
244 : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
245 :
246 0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
247 0 : let extension = path.extension();
248 0 : match extension {
249 0 : Some(TEMP_DOWNLOAD_EXTENSION) => true,
250 0 : Some(_) => false,
251 0 : None => false,
252 : }
253 0 : }
254 :
255 119 : async fn list_identifiers<T>(
256 119 : storage: &GenericRemoteStorage,
257 119 : prefix: RemotePath,
258 119 : cancel: CancellationToken,
259 119 : ) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
260 119 : where
261 119 : T: FromStr + Eq + std::hash::Hash,
262 119 : {
263 119 : let listing = download_retry_forever(
264 119 : || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
265 119 : &format!("list identifiers in prefix {prefix}"),
266 119 : &cancel,
267 : )
268 119 : .await?;
269 :
270 119 : let mut parsed_ids = HashSet::new();
271 119 : let mut other_prefixes = HashSet::new();
272 :
273 122 : for id_remote_storage_key in listing.prefixes {
274 3 : let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
275 0 : anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
276 0 : })?;
277 :
278 3 : match object_name.parse::<T>() {
279 3 : Ok(t) => parsed_ids.insert(t),
280 0 : Err(_) => other_prefixes.insert(object_name.to_string()),
281 : };
282 : }
283 :
284 119 : for object in listing.keys {
285 0 : let object_name = object
286 0 : .key
287 0 : .object_name()
288 0 : .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
289 0 : other_prefixes.insert(object_name.to_string());
290 : }
291 :
292 119 : Ok((parsed_ids, other_prefixes))
293 119 : }
294 :
295 : /// List shards of given tenant in remote storage
296 0 : pub(crate) async fn list_remote_tenant_shards(
297 0 : storage: &GenericRemoteStorage,
298 0 : tenant_id: TenantId,
299 0 : cancel: CancellationToken,
300 0 : ) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
301 0 : let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
302 0 : list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
303 0 : }
304 :
305 : /// List timelines of given tenant shard in remote storage
306 119 : pub async fn list_remote_timelines(
307 119 : storage: &GenericRemoteStorage,
308 119 : tenant_shard_id: TenantShardId,
309 119 : cancel: CancellationToken,
310 119 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
311 119 : fail::fail_point!("storage-sync-list-remote-timelines", |_| {
312 0 : anyhow::bail!("storage-sync-list-remote-timelines");
313 0 : });
314 :
315 119 : let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
316 119 : list_identifiers::<TimelineId>(storage, remote_path, cancel).await
317 119 : }
318 :
319 368 : async fn do_download_remote_path_retry_forever(
320 368 : storage: &GenericRemoteStorage,
321 368 : remote_path: &RemotePath,
322 368 : download_opts: DownloadOpts,
323 368 : cancel: &CancellationToken,
324 368 : ) -> Result<(Vec<u8>, SystemTime), DownloadError> {
325 368 : download_retry_forever(
326 368 : || async {
327 368 : let download = storage
328 368 : .download(remote_path, &download_opts, cancel)
329 368 : .await?;
330 :
331 13 : let mut bytes = Vec::new();
332 :
333 13 : let stream = download.download_stream;
334 13 : let mut stream = StreamReader::new(stream);
335 :
336 13 : tokio::io::copy_buf(&mut stream, &mut bytes).await?;
337 :
338 13 : Ok((bytes, download.last_modified))
339 736 : },
340 368 : &format!("download {remote_path:?}"),
341 368 : cancel,
342 : )
343 368 : .await
344 368 : }
345 :
346 351 : async fn do_download_tenant_manifest(
347 351 : storage: &GenericRemoteStorage,
348 351 : tenant_shard_id: &TenantShardId,
349 351 : _timeline_id: Option<&TimelineId>,
350 351 : generation: Generation,
351 351 : cancel: &CancellationToken,
352 351 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
353 351 : let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
354 :
355 351 : let download_opts = DownloadOpts {
356 351 : kind: DownloadKind::Small,
357 351 : ..Default::default()
358 351 : };
359 :
360 3 : let (manifest_bytes, manifest_bytes_mtime) =
361 351 : do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
362 :
363 3 : let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
364 3 : .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
365 3 : .map_err(DownloadError::Other)?;
366 :
367 3 : Ok((tenant_manifest, generation, manifest_bytes_mtime))
368 351 : }
369 :
370 17 : async fn do_download_index_part(
371 17 : storage: &GenericRemoteStorage,
372 17 : tenant_shard_id: &TenantShardId,
373 17 : timeline_id: Option<&TimelineId>,
374 17 : index_generation: Generation,
375 17 : cancel: &CancellationToken,
376 17 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
377 17 : let timeline_id =
378 17 : timeline_id.expect("A timeline ID is always provided when downloading an index");
379 17 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
380 :
381 17 : let download_opts = DownloadOpts {
382 17 : kind: DownloadKind::Small,
383 17 : ..Default::default()
384 17 : };
385 :
386 10 : let (index_part_bytes, index_part_mtime) =
387 17 : do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
388 :
389 10 : let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
390 10 : .with_context(|| format!("deserialize index part file at {remote_path:?}"))
391 10 : .map_err(DownloadError::Other)?;
392 :
393 10 : Ok((index_part, index_generation, index_part_mtime))
394 17 : }
395 :
396 : /// Metadata objects are "generationed", meaning that they include a generation suffix. This
397 : /// function downloads the object with the highest generation <= `my_generation`.
398 : ///
399 : /// Data objects (layer files) also include a generation in their path, but there is no equivalent
400 : /// search process, because their reference from an index includes the generation.
401 : ///
402 : /// An expensive object listing operation is only done if necessary: the typical fast path is to issue two
403 : /// GET operations, one to our own generation (stale attachment case), and one to the immediately preceding
404 : /// generation (normal case when migrating/restarting). Only if both of these return 404 do we fall back
405 : /// to listing objects.
406 : ///
407 : /// * `my_generation`: the value of `[crate::tenant::TenantShard::generation]`
408 : /// * `what`: for logging, what object are we downloading
409 : /// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
410 : /// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless
411 : /// `cancel`` has fired. This function does not do its own retries of GET operations, and relies
412 : /// on the function passed in to do so.
413 : /// * `parse_path`: parse a fully qualified remote storage path to get the generation of the object.
414 : #[allow(clippy::too_many_arguments)]
415 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
416 : pub(crate) async fn download_generation_object<'a, T, DF, DFF, PF>(
417 : storage: &'a GenericRemoteStorage,
418 : tenant_shard_id: &'a TenantShardId,
419 : timeline_id: Option<&'a TimelineId>,
420 : my_generation: Generation,
421 : what: &str,
422 : prefix: RemotePath,
423 : do_download: DF,
424 : parse_path: PF,
425 : cancel: &'a CancellationToken,
426 : ) -> Result<(T, Generation, SystemTime), DownloadError>
427 : where
428 : DF: Fn(
429 : &'a GenericRemoteStorage,
430 : &'a TenantShardId,
431 : Option<&'a TimelineId>,
432 : Generation,
433 : &'a CancellationToken,
434 : ) -> DFF,
435 : DFF: Future<Output = Result<(T, Generation, SystemTime), DownloadError>>,
436 : PF: Fn(RemotePath) -> Option<Generation>,
437 : T: 'static,
438 : {
439 : debug_assert_current_span_has_tenant_id();
440 :
441 : if my_generation.is_none() {
442 : // Operating without generations: just fetch the generation-less path
443 : return do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
444 : }
445 :
446 : // Stale case: If we were intentionally attached in a stale generation, the remote object may already
447 : // exist in our generation.
448 : //
449 : // This is an optimization to avoid doing the listing for the general case below.
450 : let res = do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
451 : match res {
452 : Ok(decoded) => {
453 : tracing::debug!("Found {what} from current generation (this is a stale attachment)");
454 : return Ok(decoded);
455 : }
456 : Err(DownloadError::NotFound) => {}
457 : Err(e) => return Err(e),
458 : };
459 :
460 : // Typical case: the previous generation of this tenant was running healthily, and had uploaded the object
461 : // we are seeking in that generation. We may safely start from this index without doing a listing, because:
462 : // - We checked for current generation case above
463 : // - generations > my_generation are to be ignored
464 : // - any other objects that exist would have an older generation than `previous_gen`, and
465 : // we want to find the most recent object from a previous generation.
466 : //
467 : // This is an optimization to avoid doing the listing for the general case below.
468 : let res = do_download(
469 : storage,
470 : tenant_shard_id,
471 : timeline_id,
472 : my_generation.previous(),
473 : cancel,
474 : )
475 : .await;
476 : match res {
477 : Ok(decoded) => {
478 : tracing::debug!("Found {what} from previous generation");
479 : return Ok(decoded);
480 : }
481 : Err(DownloadError::NotFound) => {
482 : tracing::debug!("No {what} found from previous generation, falling back to listing");
483 : }
484 : Err(e) => {
485 : return Err(e);
486 : }
487 : }
488 :
489 : // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
490 : // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
491 : // to constructing a full index path with no generation, because the generation is a suffix.
492 : let paths = download_retry(
493 119 : || async {
494 119 : storage
495 119 : .list(Some(&prefix), ListingMode::NoDelimiter, None, cancel)
496 119 : .await
497 238 : },
498 : "list index_part files",
499 : cancel,
500 : )
501 : .await?
502 : .keys;
503 :
504 : // General case logic for which index to use: the latest index whose generation
505 : // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
506 : let max_previous_generation = paths
507 : .into_iter()
508 9 : .filter_map(|o| parse_path(o.key))
509 6 : .filter(|g| g <= &my_generation)
510 : .max();
511 :
512 : match max_previous_generation {
513 : Some(g) => {
514 : tracing::debug!("Found {what} in generation {g:?}");
515 : do_download(storage, tenant_shard_id, timeline_id, g, cancel).await
516 : }
517 : None => {
518 : // Migration from legacy pre-generation state: we have a generation but no prior
519 : // attached pageservers did. Try to load from a no-generation path.
520 : tracing::debug!("No {what}* found");
521 : do_download(
522 : storage,
523 : tenant_shard_id,
524 : timeline_id,
525 : Generation::none(),
526 : cancel,
527 : )
528 : .await
529 : }
530 : }
531 : }
532 :
533 : /// index_part.json objects are suffixed with a generation number, so we cannot
534 : /// directly GET the latest index part without doing some probing.
535 : ///
536 : /// In this function we probe for the most recent index in a generation <= our current generation.
537 : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
538 10 : pub(crate) async fn download_index_part(
539 10 : storage: &GenericRemoteStorage,
540 10 : tenant_shard_id: &TenantShardId,
541 10 : timeline_id: &TimelineId,
542 10 : my_generation: Generation,
543 10 : cancel: &CancellationToken,
544 10 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
545 10 : debug_assert_current_span_has_tenant_and_timeline_id();
546 :
547 10 : let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
548 10 : download_generation_object(
549 10 : storage,
550 10 : tenant_shard_id,
551 10 : Some(timeline_id),
552 10 : my_generation,
553 10 : "index_part",
554 10 : index_prefix,
555 10 : do_download_index_part,
556 10 : parse_remote_index_path,
557 10 : cancel,
558 10 : )
559 10 : .await
560 10 : }
561 :
562 119 : pub(crate) async fn download_tenant_manifest(
563 119 : storage: &GenericRemoteStorage,
564 119 : tenant_shard_id: &TenantShardId,
565 119 : my_generation: Generation,
566 119 : cancel: &CancellationToken,
567 119 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
568 119 : let manifest_prefix = remote_tenant_manifest_prefix(tenant_shard_id);
569 :
570 119 : download_generation_object(
571 119 : storage,
572 119 : tenant_shard_id,
573 119 : None,
574 119 : my_generation,
575 119 : "tenant-manifest",
576 119 : manifest_prefix,
577 119 : do_download_tenant_manifest,
578 119 : parse_remote_tenant_manifest_path,
579 119 : cancel,
580 119 : )
581 119 : .await
582 119 : }
583 :
584 1 : pub(crate) async fn download_initdb_tar_zst(
585 1 : conf: &'static PageServerConf,
586 1 : storage: &GenericRemoteStorage,
587 1 : tenant_shard_id: &TenantShardId,
588 1 : timeline_id: &TimelineId,
589 1 : cancel: &CancellationToken,
590 1 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
591 1 : debug_assert_current_span_has_tenant_and_timeline_id();
592 :
593 1 : let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
594 :
595 1 : let remote_preserved_path =
596 1 : remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
597 :
598 1 : let timeline_path = conf.timelines_path(tenant_shard_id);
599 :
600 1 : if !timeline_path.exists() {
601 0 : tokio::fs::create_dir_all(&timeline_path)
602 0 : .await
603 0 : .with_context(|| format!("timeline dir creation {timeline_path}"))
604 0 : .map_err(DownloadError::Other)?;
605 1 : }
606 1 : let temp_path = timeline_path.join(format!(
607 1 : "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
608 1 : ));
609 :
610 1 : let file = download_retry(
611 1 : || async {
612 1 : let file = OpenOptions::new()
613 1 : .create(true)
614 1 : .truncate(true)
615 1 : .read(true)
616 1 : .write(true)
617 1 : .open(&temp_path)
618 1 : .await
619 1 : .with_context(|| format!("tempfile creation {temp_path}"))
620 1 : .map_err(DownloadError::Other)?;
621 :
622 1 : let download = match storage
623 1 : .download(&remote_path, &DownloadOpts::default(), cancel)
624 1 : .await
625 : {
626 1 : Ok(dl) => dl,
627 : Err(DownloadError::NotFound) => {
628 0 : storage
629 0 : .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
630 0 : .await?
631 : }
632 0 : Err(other) => Err(other)?,
633 : };
634 1 : let mut download = tokio_util::io::StreamReader::new(download.download_stream);
635 1 : let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
636 :
637 1 : tokio::io::copy_buf(&mut download, &mut writer).await?;
638 :
639 1 : let mut file = writer.into_inner();
640 :
641 1 : file.seek(std::io::SeekFrom::Start(0))
642 1 : .await
643 1 : .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
644 1 : .map_err(DownloadError::Other)?;
645 :
646 1 : Ok(file)
647 2 : },
648 1 : &format!("download {remote_path}"),
649 1 : cancel,
650 : )
651 1 : .await
652 1 : .inspect_err(|_e| {
653 : // Do a best-effort attempt at deleting the temporary file upon encountering an error.
654 : // We don't have async here nor do we want to pile on any extra errors.
655 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
656 0 : if e.kind() != std::io::ErrorKind::NotFound {
657 0 : warn!("error deleting temporary file {temp_path}: {e}");
658 0 : }
659 0 : }
660 0 : })?;
661 :
662 1 : Ok((temp_path, file))
663 1 : }
664 :
665 : /// Helper function to handle retries for a download operation.
666 : ///
667 : /// Remote operations can fail due to rate limits (S3), spurious network
668 : /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
669 : /// with backoff.
670 : ///
671 : /// (See similar logic for uploads in `perform_upload_task`)
672 127 : pub(super) async fn download_retry<T, O, F>(
673 127 : op: O,
674 127 : description: &str,
675 127 : cancel: &CancellationToken,
676 127 : ) -> Result<T, DownloadError>
677 127 : where
678 127 : O: FnMut() -> F,
679 127 : F: Future<Output = Result<T, DownloadError>>,
680 127 : {
681 127 : backoff::retry(
682 127 : op,
683 127 : DownloadError::is_permanent,
684 127 : FAILED_DOWNLOAD_WARN_THRESHOLD,
685 127 : FAILED_REMOTE_OP_RETRIES,
686 127 : description,
687 127 : cancel,
688 127 : )
689 127 : .await
690 127 : .ok_or_else(|| DownloadError::Cancelled)
691 127 : .and_then(|x| x)
692 127 : }
693 :
694 487 : pub(crate) async fn download_retry_forever<T, O, F>(
695 487 : op: O,
696 487 : description: &str,
697 487 : cancel: &CancellationToken,
698 487 : ) -> Result<T, DownloadError>
699 487 : where
700 487 : O: FnMut() -> F,
701 487 : F: Future<Output = Result<T, DownloadError>>,
702 487 : {
703 487 : backoff::retry(
704 487 : op,
705 487 : DownloadError::is_permanent,
706 487 : FAILED_DOWNLOAD_WARN_THRESHOLD,
707 487 : u32::MAX,
708 487 : description,
709 487 : cancel,
710 487 : )
711 487 : .await
712 487 : .ok_or_else(|| DownloadError::Cancelled)
713 487 : .and_then(|x| x)
714 487 : }
|