//! Helper functions to download files from remote storage with a `RemoteStorage`.
//!
//! The functions in this module retry failed operations automatically, according
//! to the `FAILED_REMOTE_OP_RETRIES` constant.

use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::time::SystemTime;

use anyhow::{Context, anyhow};
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use remote_storage::{
    DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
};
use tokio::fs::{self, File, OpenOptions};
use tokio::io::AsyncSeekExt;
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};

use super::index::{IndexPart, LayerFileMetadata};
use super::manifest::TenantManifest;
use super::{
    FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
    parse_remote_tenant_manifest_path, remote_index_path, remote_initdb_archive_path,
    remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
    remote_tenant_manifest_prefix, remote_tenant_path,
};
use crate::TEMP_FILE_SUFFIX;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::span::{
    debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
};
use crate::tenant::Generation;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::virtual_file;
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use crate::virtual_file::{IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{TempVirtualFile, owned_buffers_io};

/// Download a layer file from remote storage, validating that the downloaded
/// file's size matches the one given in `layer_metadata`. (In the future, we
/// might do more cross-checks, like CRC validation.)
///
/// Returns the size of the downloaded file.
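///
/// # Example
///
/// A minimal call-site sketch; the surrounding setup (config, storage handle,
/// gate, cancellation token, request context) is assumed to exist:
///
/// ```ignore
/// let bytes = download_layer_file(
///     conf, &storage, tenant_shard_id, timeline_id,
///     &layer_file_name, &layer_metadata, &local_path,
///     &gate, &cancel, &ctx,
/// )
/// .await?;
/// assert_eq!(bytes, layer_metadata.file_size);
/// ```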
#[allow(clippy::too_many_arguments)]
pub async fn download_layer_file<'a>(
    conf: &'static PageServerConf,
    storage: &'a GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    layer_file_name: &'a LayerName,
    layer_metadata: &'a LayerFileMetadata,
    local_path: &Utf8Path,
    gate: &utils::sync::gate::Gate,
    cancel: &CancellationToken,
    ctx: &RequestContext,
) -> Result<u64, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);

    let remote_path = remote_layer_path(
        &tenant_shard_id.tenant_id,
        &timeline_id,
        layer_metadata.shard,
        layer_file_name,
        layer_metadata.generation,
    );

    let (bytes_amount, temp_file) = download_retry(
        || async {
            // TempVirtualFile requires us to never reuse a filename while an old
            // instance of TempVirtualFile created with that filename is not done dropping yet.
            // So, we use a monotonic counter to disambiguate the filenames.
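            // The resulting name looks like, e.g. (illustrative),
            // `<layer file name>.1a.temp_download` for disambiguator value 0x1a.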
            static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
            let filename_disambiguator =
                NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

            let temp_file_path = path_with_suffix_extension(
                local_path,
                &format!("{filename_disambiguator:x}.{TEMP_DOWNLOAD_EXTENSION}"),
            );

            let temp_file = TempVirtualFile::new(
                VirtualFile::open_with_options_v2(
                    &temp_file_path,
                    virtual_file::OpenOptions::new()
                        .create_new(true)
                        .write(true),
                    ctx,
                )
                .await
                .with_context(|| format!("create a temp file for layer download: {temp_file_path}"))
                .map_err(DownloadError::Other)?,
                gate.enter().map_err(|_| DownloadError::Cancelled)?,
            );
            download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await?;

    let expected = layer_metadata.file_size;
    if expected != bytes_amount {
        return Err(DownloadError::Other(anyhow!(
            "according to layer file metadata, should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}",
            temp_file.path()
        )));
    }

    fail::fail_point!("remote-storage-download-pre-rename", |_| {
        Err(DownloadError::Other(anyhow!(
            "remote-storage-download-pre-rename failpoint triggered"
        )))
    });

    // Try rename before disarming the temp file.
    // That way, if rename fails for whatever reason, we clean up the temp file on the return path.

    fs::rename(temp_file.path(), &local_path)
        .await
        .with_context(|| format!("rename download layer file to {local_path}"))
        .map_err(DownloadError::Other)?;

    // The temp file's VirtualFile points to the temp_file_path which we moved above.
    // Drop it immediately, it's invalid.
    // This will get better in https://github.com/neondatabase/neon/issues/11692
    let _: VirtualFile = temp_file.disarm_into_inner();
    // NB: The gate guard that was stored in `temp_file` is dropped, but we continue
    // to operate on the file and on the parent timeline directory.
    // Those operations are safe because higher-level code is holding another gate guard:
    // - attached mode: the download task spawned by struct Layer is holding the gate guard
    // - secondary mode: TenantDownloader::download holds the gate open

    // The rename above is not durable yet.
    // It doesn't matter for crash consistency because pageserver startup deletes temp
    // files and we'll re-download on demand if necessary.

    // We use fatal_err() below because, after the rename above,
    // the in-memory state of the filesystem already has the layer file in its final place,
    // and subsequent pageserver code could think it's durable while it really isn't.
    let work = {
        let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
        async move {
            let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
                .await
                .fatal_err("VirtualFile::open for timeline dir fsync");
            timeline_dir
                .sync_all()
                .await
                .fatal_err("VirtualFile::sync_all timeline dir");
        }
    };
    crate::virtual_file::io_engine::get()
        .spawn_blocking_and_block_on_if_std(work)
        .await;

    tracing::debug!("download complete: {local_path}");

    Ok(bytes_amount)
}

/// Download the object `src_path` in the remote `storage` to the local `destination_file`.
///
/// If Ok() is returned, the download succeeded and the inode & data have been made durable.
/// (Note that the directory entry for the inode is not made durable.)
/// The file size in bytes is returned.
///
/// If Err() is returned, there was some error. The destination file has been unlinked.
/// The unlinking has _not_ been made durable.
async fn download_object(
    storage: &GenericRemoteStorage,
    src_path: &RemotePath,
    destination_file: TempVirtualFile,
    gate: &utils::sync::gate::Gate,
    cancel: &CancellationToken,
    ctx: &RequestContext,
) -> Result<(u64, TempVirtualFile), DownloadError> {
    let mut download = storage
        .download(src_path, &DownloadOpts::default(), cancel)
        .await?;

    pausable_failpoint!("before-downloading-layer-stream-pausable");

    let dst_path = destination_file.path().to_owned();
    let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
        destination_file,
        0,
        || IoBufferMut::with_capacity(super::BUFFER_SIZE),
        gate.enter().map_err(|_| DownloadError::Cancelled)?,
        cancel.child_token(),
        ctx,
        tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
    );

    // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
    // There's chunks_vectored() on the stream.
    let (bytes_amount, destination_file) = async {
        while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await {
            let chunk = match res {
                Ok(chunk) => chunk,
                Err(e) => return Err(DownloadError::from(e)),
            };
            buffered
                .write_buffered_borrowed(&chunk, ctx)
                .await
                .map_err(|e| match e {
                    FlushTaskError::Cancelled => DownloadError::Cancelled,
                })?;
        }
        buffered
            .shutdown(
                owned_buffers_io::write::BufferedWriterShutdownMode::PadThenTruncate,
                ctx,
            )
            .await
            .map_err(|e| match e {
                FlushTaskError::Cancelled => DownloadError::Cancelled,
            })
    }
    .await?;

    // Not using sync_data because it can lose the file size update.
    destination_file
        .sync_all()
        .await
        .maybe_fatal_err("download_object sync_all")
        .with_context(|| format!("failed to fsync destination file at {dst_path}"))
        .map_err(DownloadError::Other)?;

    Ok((bytes_amount, destination_file))
}

const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";

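/// Returns true for files created with the temp-download naming scheme above,
/// e.g. (illustrative) `somelayer.1a.temp_download`.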
pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
    path.extension() == Some(TEMP_DOWNLOAD_EXTENSION)
}

async fn list_identifiers<T>(
    storage: &GenericRemoteStorage,
    prefix: RemotePath,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
where
    T: FromStr + Eq + std::hash::Hash,
{
    let listing = download_retry_forever(
        || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
        &format!("list identifiers in prefix {prefix}"),
        &cancel,
    )
    .await?;

    let mut parsed_ids = HashSet::new();
    let mut other_prefixes = HashSet::new();

    for id_remote_storage_key in listing.prefixes {
        let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
            anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
        })?;

        match object_name.parse::<T>() {
            Ok(t) => parsed_ids.insert(t),
            Err(_) => other_prefixes.insert(object_name.to_string()),
        };
    }

    for object in listing.keys {
        let object_name = object
            .key
            .object_name()
            .ok_or_else(|| anyhow::anyhow!("failed to get object name for key {}", object.key))?;
        other_prefixes.insert(object_name.to_string());
    }

    Ok((parsed_ids, other_prefixes))
}

/// List the shards of the given tenant in remote storage.
pub(crate) async fn list_remote_tenant_shards(
    storage: &GenericRemoteStorage,
    tenant_id: TenantId,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
    let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
    list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
}

/// List the timelines of the given tenant shard in remote storage.
pub async fn list_remote_timelines(
    storage: &GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
    fail::fail_point!("storage-sync-list-remote-timelines", |_| {
        anyhow::bail!("storage-sync-list-remote-timelines");
    });

    let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
    list_identifiers::<TimelineId>(storage, remote_path, cancel).await
}

async fn do_download_remote_path_retry_forever(
    storage: &GenericRemoteStorage,
    remote_path: &RemotePath,
    download_opts: DownloadOpts,
    cancel: &CancellationToken,
) -> Result<(Vec<u8>, SystemTime), DownloadError> {
    download_retry_forever(
        || async {
            let download = storage
                .download(remote_path, &download_opts, cancel)
                .await?;

            let mut bytes = Vec::new();

            let stream = download.download_stream;
            let mut stream = StreamReader::new(stream);

            tokio::io::copy_buf(&mut stream, &mut bytes).await?;

            Ok((bytes, download.last_modified))
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await
}

async fn do_download_tenant_manifest(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    _timeline_id: Option<&TimelineId>,
    generation: Generation,
    cancel: &CancellationToken,
) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
    let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);

    let download_opts = DownloadOpts {
        kind: DownloadKind::Small,
        ..Default::default()
    };

    let (manifest_bytes, manifest_bytes_mtime) =
        do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;

    let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
        .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok((tenant_manifest, generation, manifest_bytes_mtime))
}

async fn do_download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: Option<&TimelineId>,
    index_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
    let timeline_id =
        timeline_id.expect("A timeline ID is always provided when downloading an index");
    let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);

    let download_opts = DownloadOpts {
        kind: DownloadKind::Small,
        ..Default::default()
    };

    let (index_part_bytes, index_part_mtime) =
        do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;

    let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
        .with_context(|| format!("deserialize index part file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok((index_part, index_generation, index_part_mtime))
}

/// Metadata objects are "generationed", meaning that they include a generation suffix. This
/// function downloads the object with the highest generation <= `my_generation`.
///
/// Data objects (layer files) also include a generation in their path, but there is no equivalent
/// search process, because their reference from an index includes the generation.
///
/// An expensive object listing operation is only done if necessary: the typical fast path is to issue two
/// GET operations, one for our own generation (stale attachment case), and one for the immediately preceding
/// generation (normal case when migrating/restarting). Only if both of these return 404 do we fall back
/// to listing objects.
///
/// * `my_generation`: the value of [`crate::tenant::TenantShard::generation`]
/// * `what`: for logging, what object are we downloading
/// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
/// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless
///   `cancel` has fired. This function does not do its own retries of GET operations, and relies
///   on the function passed in to do so.
/// * `parse_path`: parse a fully qualified remote storage path to get the generation of the object.
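///
/// Illustrative probe order for `my_generation` = 5, stopping at the first hit:
/// 1. GET the object at generation 5 (stale attachment case).
/// 2. GET the object at generation 4 (typical migration/restart case).
/// 3. LIST everything under `prefix`, parse each path's generation with
///    `parse_path`, and GET the object with the highest generation <= 5; if
///    nothing parses, fall back to the legacy, generation-less path.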
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all, fields(generation=?my_generation))]
pub(crate) async fn download_generation_object<'a, T, DF, DFF, PF>(
    storage: &'a GenericRemoteStorage,
    tenant_shard_id: &'a TenantShardId,
    timeline_id: Option<&'a TimelineId>,
    my_generation: Generation,
    what: &str,
    prefix: RemotePath,
    do_download: DF,
    parse_path: PF,
    cancel: &'a CancellationToken,
) -> Result<(T, Generation, SystemTime), DownloadError>
where
    DF: Fn(
        &'a GenericRemoteStorage,
        &'a TenantShardId,
        Option<&'a TimelineId>,
        Generation,
        &'a CancellationToken,
    ) -> DFF,
    DFF: Future<Output = Result<(T, Generation, SystemTime), DownloadError>>,
    PF: Fn(RemotePath) -> Option<Generation>,
    T: 'static,
{
    debug_assert_current_span_has_tenant_id();

    if my_generation.is_none() {
        // Operating without generations: just fetch the generation-less path
        return do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
    }

    // Stale case: If we were intentionally attached in a stale generation, the remote object may already
    // exist in our generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
    match res {
        Ok(decoded) => {
            tracing::debug!("Found {what} from current generation (this is a stale attachment)");
            return Ok(decoded);
        }
        Err(DownloadError::NotFound) => {}
        Err(e) => return Err(e),
    };

    // Typical case: the previous generation of this tenant was running healthily, and had uploaded the object
    // we are seeking in that generation. We may safely start from this generation without doing a listing, because:
    // - We checked for the current generation case above
    // - generations > my_generation are to be ignored
    // - any other objects that exist would have an older generation than `previous_gen`, and
    //   we want to find the most recent object from a previous generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download(
        storage,
        tenant_shard_id,
        timeline_id,
        my_generation.previous(),
        cancel,
    )
    .await;
    match res {
        Ok(decoded) => {
            tracing::debug!("Found {what} from previous generation");
            return Ok(decoded);
        }
        Err(DownloadError::NotFound) => {
            tracing::debug!("No {what} found from previous generation, falling back to listing");
        }
        Err(e) => {
            return Err(e);
        }
    }

    // General case/fallback: if there is no object at my_generation or the previous generation, then list all
    // matching objects and select the highest one with a generation <= my_generation. Constructing the prefix
    // is equivalent to constructing a full object path with no generation, because the generation is a suffix.
    let paths = download_retry(
        || async {
            storage
                .list(Some(&prefix), ListingMode::NoDelimiter, None, cancel)
                .await
        },
        "list index_part files",
        cancel,
    )
    .await?
    .keys;

    // General case logic for which object to use: the latest object whose generation
    // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
    let max_previous_generation = paths
        .into_iter()
        .filter_map(|o| parse_path(o.key))
        .filter(|g| g <= &my_generation)
        .max();

    match max_previous_generation {
        Some(g) => {
            tracing::debug!("Found {what} in generation {g:?}");
            do_download(storage, tenant_shard_id, timeline_id, g, cancel).await
        }
        None => {
            // Migration from legacy pre-generation state: we have a generation but no prior
            // attached pageservers did. Try to load from a no-generation path.
            tracing::debug!("No {what}* found");
            do_download(
                storage,
                tenant_shard_id,
                timeline_id,
                Generation::none(),
                cancel,
            )
            .await
        }
    }
}

/// index_part.json objects are suffixed with a generation number, so we cannot
/// directly GET the latest index part without doing some probing.
///
/// In this function we probe for the most recent index in a generation <= our current generation.
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
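///
/// For example (an illustrative sketch of the naming scheme, assuming
/// 8-hex-digit generation suffixes), a timeline prefix might contain:
///
/// ```text
/// index_part.json-00000008
/// index_part.json-0000000a
/// ```
///
/// An attachment in generation 0x0b would probe generations 0x0b and 0x0a
/// directly, and here would find `index_part.json-0000000a` on the second probe.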
pub(crate) async fn download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    my_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
    download_generation_object(
        storage,
        tenant_shard_id,
        Some(timeline_id),
        my_generation,
        "index_part",
        index_prefix,
        do_download_index_part,
        parse_remote_index_path,
        cancel,
    )
    .await
}

pub(crate) async fn download_tenant_manifest(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    my_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
    let manifest_prefix = remote_tenant_manifest_prefix(tenant_shard_id);

    download_generation_object(
        storage,
        tenant_shard_id,
        None,
        my_generation,
        "tenant-manifest",
        manifest_prefix,
        do_download_tenant_manifest,
        parse_remote_tenant_manifest_path,
        cancel,
    )
    .await
}

pub(crate) async fn download_initdb_tar_zst(
    conf: &'static PageServerConf,
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    cancel: &CancellationToken,
) -> Result<(Utf8PathBuf, File), DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let remote_preserved_path =
        remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let timeline_path = conf.timelines_path(tenant_shard_id);

    if !timeline_path.exists() {
        tokio::fs::create_dir_all(&timeline_path)
            .await
            .with_context(|| format!("timeline dir creation {timeline_path}"))
            .map_err(DownloadError::Other)?;
    }
    let temp_path = timeline_path.join(format!(
        "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
    ));

    let file = download_retry(
        || async {
            let file = OpenOptions::new()
                .create(true)
                .truncate(true)
                .read(true)
                .write(true)
                .open(&temp_path)
                .await
                .with_context(|| format!("tempfile creation {temp_path}"))
                .map_err(DownloadError::Other)?;

            let download = match storage
                .download(&remote_path, &DownloadOpts::default(), cancel)
                .await
            {
                Ok(dl) => dl,
                Err(DownloadError::NotFound) => {
                    storage
                        .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
                        .await?
                }
                Err(other) => Err(other)?,
            };
            let mut download = tokio_util::io::StreamReader::new(download.download_stream);
            let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);

            tokio::io::copy_buf(&mut download, &mut writer).await?;

            let mut file = writer.into_inner();

            file.seek(std::io::SeekFrom::Start(0))
                .await
                .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
                .map_err(DownloadError::Other)?;

            Ok(file)
        },
        &format!("download {remote_path}"),
        cancel,
    )
    .await
    .inspect_err(|_e| {
        // Do a best-effort attempt at deleting the temporary file upon encountering an error.
        // We don't have async here nor do we want to pile on any extra errors.
        if let Err(e) = std::fs::remove_file(&temp_path) {
            if e.kind() != std::io::ErrorKind::NotFound {
                warn!("error deleting temporary file {temp_path}: {e}");
            }
        }
    })?;

    Ok((temp_path, file))
}

/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (S3), spurious network
/// problems, or other external reasons. Retry `FAILED_REMOTE_OP_RETRIES` times,
/// with backoff.
///
/// (See similar logic for uploads in `perform_upload_task`.)
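///
/// # Example
///
/// A minimal sketch of wrapping a GET in retries (the storage handle, remote
/// path, and cancellation token are assumed to be in scope):
///
/// ```ignore
/// let download = download_retry(
///     || async { storage.download(&remote_path, &DownloadOpts::default(), &cancel).await },
///     &format!("download {remote_path:?}"),
///     &cancel,
/// )
/// .await?;
/// ```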
pub(super) async fn download_retry<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        DownloadError::is_permanent,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        FAILED_REMOTE_OP_RETRIES,
        description,
        cancel,
    )
    .await
    .ok_or_else(|| DownloadError::Cancelled)
    .and_then(|x| x)
}

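/// Like [`download_retry`], but retries indefinitely (`u32::MAX` attempts),
/// stopping only on success, a permanent error, or cancellation.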
pub(crate) async fn download_retry_forever<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        DownloadError::is_permanent,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        u32::MAX,
        description,
        cancel,
    )
    .await
    .ok_or_else(|| DownloadError::Cancelled)
    .and_then(|x| x)
}