//! Helper functions to download files from remote storage with a RemoteStorage
//!
//! The functions in this module retry failed operations automatically, according
//! to the FAILED_REMOTE_OP_RETRIES constant.
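//!
//! A minimal sketch of the retry pattern these helpers share (it mirrors
//! `download_retry` at the bottom of this file; `storage`, `path`, and `cancel`
//! are placeholders):
//!
//! ```ignore
//! let result = backoff::retry(
//!     || async { storage.download(&path, &DownloadOpts::default(), &cancel).await },
//!     DownloadError::is_permanent,    // permanent errors are never retried
//!     FAILED_DOWNLOAD_WARN_THRESHOLD, // start warning after this many attempts
//!     FAILED_REMOTE_OP_RETRIES,       // give up after this many attempts
//!     "download object",
//!     &cancel,
//! )
//! .await
//! .ok_or_else(|| DownloadError::Cancelled) // `None` means we were cancelled
//! .and_then(|x| x);
//! ```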

use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::time::SystemTime;

use anyhow::{anyhow, Context};
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::backoff;

use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::span::{
    debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
};
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::tenant::Generation;
#[cfg_attr(target_os = "macos", allow(unused_imports))]
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use utils::pausable_failpoint;

use super::index::{IndexPart, LayerFileMetadata};
use super::manifest::TenantManifest;
use super::{
    parse_remote_index_path, parse_remote_tenant_manifest_path, remote_index_path,
    remote_initdb_archive_path, remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
    remote_tenant_manifest_prefix, remote_tenant_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
    FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
};

/// Download a layer file from remote storage into the local filesystem.
///
/// The downloaded file's size is validated against `layer_metadata.file_size`.
/// (In the future, we might do more cross-checks, like CRC validation.)
///
/// Returns the size of the downloaded file.
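///
/// A hypothetical call site, with all bindings as placeholders:
///
/// ```ignore
/// let bytes_amount = download_layer_file(
///     conf,
///     &storage,
///     tenant_shard_id,
///     timeline_id,
///     &layer_file_name,
///     &layer_metadata,
///     &local_path,
///     &cancel,
///     &ctx,
/// )
/// .await?;
/// // bytes_amount equals layer_metadata.file_size; mismatches are rejected internally.
/// ```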
#[allow(clippy::too_many_arguments)]
pub async fn download_layer_file<'a>(
    conf: &'static PageServerConf,
    storage: &'a GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    layer_file_name: &'a LayerName,
    layer_metadata: &'a LayerFileMetadata,
    local_path: &Utf8Path,
    cancel: &CancellationToken,
    ctx: &RequestContext,
) -> Result<u64, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);

    let remote_path = remote_layer_path(
        &tenant_shard_id.tenant_id,
        &timeline_id,
        layer_metadata.shard,
        layer_file_name,
        layer_metadata.generation,
    );

    // Perform a rename inspired by durable_rename from file_utils.c.
    // The sequence:
    //     write(tmp)
    //     fsync(tmp)
    //     rename(tmp, new)
    //     fsync(new)
    //     fsync(parent)
    // For more context about durable_rename, see this email from the postgres mailing list:
    // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
    // If the pageserver crashes, the temp file will be deleted on startup and re-downloaded.
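    // In this function, the sequence maps onto the helpers below: download_object()
    // performs write(tmp) and fsync(tmp) (the same inode that rename(tmp, new) then
    // moves into place), and the sync_all() on the timeline directory at the end is
    // the fsync(parent) step that makes the rename itself durable.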
    let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);

    let bytes_amount = download_retry(
        || async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await?;

    let expected = layer_metadata.file_size;
    if expected != bytes_amount {
        return Err(DownloadError::Other(anyhow!(
            "according to the layer file metadata, {expected} bytes should have been downloaded, but {bytes_amount} bytes were downloaded into file {temp_file_path:?}",
        )));
    }

    fail::fail_point!("remote-storage-download-pre-rename", |_| {
        Err(DownloadError::Other(anyhow!(
            "remote-storage-download-pre-rename failpoint triggered"
        )))
    });

    fs::rename(&temp_file_path, &local_path)
        .await
        .with_context(|| format!("rename download layer file to {local_path}"))
        .map_err(DownloadError::Other)?;

    // We use fatal_err() below because after the rename above,
    // the in-memory state of the filesystem already has the layer file in its final place,
    // and subsequent pageserver code could think it's durable while it really isn't.
    let work = {
        let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
        async move {
            let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
                .await
                .fatal_err("VirtualFile::open for timeline dir fsync");
            timeline_dir
                .sync_all()
                .await
                .fatal_err("VirtualFile::sync_all timeline dir");
        }
    };
    crate::virtual_file::io_engine::get()
        .spawn_blocking_and_block_on_if_std(work)
        .await;

    tracing::debug!("download complete: {local_path}");

    Ok(bytes_amount)
}

/// Download the object at `src_path` in the remote `storage` to the local path `dst_path`.
///
/// If Ok() is returned, the download succeeded and the inode & data have been made durable.
/// (Note that the directory entry for the inode is not made durable.)
/// The file size in bytes is returned.
///
/// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
/// The unlinking has _not_ been made durable.
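///
/// The implementation dispatches on the configured virtual file IO engine: a buffered
/// `tokio::fs` path for `StdFs`, and a `tokio-epoll-uring` path on Linux.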
async fn download_object<'a>(
    storage: &'a GenericRemoteStorage,
    src_path: &RemotePath,
    dst_path: &Utf8PathBuf,
    cancel: &CancellationToken,
    #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
) -> Result<u64, DownloadError> {
    let res = match crate::virtual_file::io_engine::get() {
        crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
        crate::virtual_file::io_engine::IoEngine::StdFs => {
            async {
                let destination_file = tokio::fs::File::create(dst_path)
                    .await
                    .with_context(|| format!("create a destination file for layer '{dst_path}'"))
                    .map_err(DownloadError::Other)?;

                let download = storage
                    .download(src_path, &DownloadOpts::default(), cancel)
                    .await?;

                pausable_failpoint!("before-downloading-layer-stream-pausable");

                let mut buf_writer =
                    tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);

                let mut reader = tokio_util::io::StreamReader::new(download.download_stream);

                let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
                buf_writer.flush().await?;

                let mut destination_file = buf_writer.into_inner();

                // The tokio docs (https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html) state:
                // a file will not be closed immediately when it goes out of scope if there are
                // any IO operations that have not yet completed. To ensure that a file is closed
                // immediately when it is dropped, you should call flush before dropping it.
                //
                // From the tokio code, flush waits for pending operations to complete. There
                // shouldn't be any, because we assume `destination_file` is fully written, i.e.
                // there are no pending `.write(...).await` operations. But for additional
                // safety, let's check/wait for any pending operations.
                destination_file
                    .flush()
                    .await
                    .maybe_fatal_err("download_object flush")
                    .with_context(|| format!("flush destination file at {dst_path}"))
                    .map_err(DownloadError::Other)?;

                // not using sync_data because it can lose the file size update
                destination_file
                    .sync_all()
                    .await
                    .maybe_fatal_err("download_object sync_all")
                    .with_context(|| format!("failed to fsync destination file at {dst_path}"))
                    .map_err(DownloadError::Other)?;

                Ok(bytes_amount)
            }
            .await
        }
        #[cfg(target_os = "linux")]
        crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
            use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
            use bytes::BytesMut;
            async {
                let destination_file = VirtualFile::create(dst_path, ctx)
                    .await
                    .with_context(|| format!("create a destination file for layer '{dst_path}'"))
                    .map_err(DownloadError::Other)?;

                let mut download = storage
                    .download(src_path, &DownloadOpts::default(), cancel)
                    .await?;

                pausable_failpoint!("before-downloading-layer-stream-pausable");

                // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
                // There's chunks_vectored() on the stream.
                let (bytes_amount, destination_file) = async {
                    let size_tracking = size_tracking_writer::Writer::new(destination_file);
                    let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
                        size_tracking,
                        BytesMut::with_capacity(super::BUFFER_SIZE),
                    );
                    while let Some(res) =
                        futures::StreamExt::next(&mut download.download_stream).await
                    {
                        let chunk = match res {
                            Ok(chunk) => chunk,
                            Err(e) => return Err(e),
                        };
                        buffered.write_buffered(chunk.slice_len(), ctx).await?;
                    }
                    let size_tracking = buffered.flush_and_into_inner(ctx).await?;
                    Ok(size_tracking.into_inner())
                }
                .await?;

                // not using sync_data because it can lose the file size update
                destination_file
                    .sync_all()
                    .await
                    .maybe_fatal_err("download_object sync_all")
                    .with_context(|| format!("failed to fsync destination file at {dst_path}"))
                    .map_err(DownloadError::Other)?;

                Ok(bytes_amount)
            }
            .await
        }
    };

    // In case the download failed, clean up.
    match res {
        Ok(bytes_amount) => Ok(bytes_amount),
        Err(e) => {
            if let Err(e) = tokio::fs::remove_file(dst_path).await {
                if e.kind() != std::io::ErrorKind::NotFound {
                    on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
                }
            }
            Err(e)
        }
    }
}

const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";

pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
    path.extension() == Some(TEMP_DOWNLOAD_EXTENSION)
}

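/// List objects under `prefix`, parsing each immediate sub-prefix as a `T` (e.g. a
/// tenant or timeline ID). Unparseable prefixes, and the names of any plain objects
/// found under the prefix, are returned in the second set.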
async fn list_identifiers<T>(
    storage: &GenericRemoteStorage,
    prefix: RemotePath,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
where
    T: FromStr + Eq + std::hash::Hash,
{
    let listing = download_retry_forever(
        || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
        &format!("list identifiers in prefix {prefix}"),
        &cancel,
    )
    .await?;

    let mut parsed_ids = HashSet::new();
    let mut other_prefixes = HashSet::new();

    for id_remote_storage_key in listing.prefixes {
        let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
            anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
        })?;

        match object_name.parse::<T>() {
            Ok(t) => parsed_ids.insert(t),
            Err(_) => other_prefixes.insert(object_name.to_string()),
        };
    }

    for object in listing.keys {
        let object_name = object
            .key
            .object_name()
            .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
        other_prefixes.insert(object_name.to_string());
    }

    Ok((parsed_ids, other_prefixes))
}

/// List the shards of the given tenant in remote storage.
pub(crate) async fn list_remote_tenant_shards(
    storage: &GenericRemoteStorage,
    tenant_id: TenantId,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
    let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
    list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
}

/// List the timelines of the given tenant shard in remote storage.
pub async fn list_remote_timelines(
    storage: &GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
    fail::fail_point!("storage-sync-list-remote-timelines", |_| {
        anyhow::bail!("storage-sync-list-remote-timelines");
    });

    let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
    list_identifiers::<TimelineId>(storage, remote_path, cancel).await
}

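/// Download the object at `remote_path` fully into memory, retrying forever until
/// success, a permanent error, or cancellation. Returns the bytes and the object's
/// last-modified time.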
async fn do_download_remote_path_retry_forever(
    storage: &GenericRemoteStorage,
    remote_path: &RemotePath,
    cancel: &CancellationToken,
) -> Result<(Vec<u8>, SystemTime), DownloadError> {
    download_retry_forever(
        || async {
            let download = storage
                .download(remote_path, &DownloadOpts::default(), cancel)
                .await?;

            let mut bytes = Vec::new();

            let stream = download.download_stream;
            let mut stream = StreamReader::new(stream);

            tokio::io::copy_buf(&mut stream, &mut bytes).await?;

            Ok((bytes, download.last_modified))
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await
}

async fn do_download_tenant_manifest(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    _timeline_id: Option<&TimelineId>,
    generation: Generation,
    cancel: &CancellationToken,
) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
    let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);

    let (manifest_bytes, manifest_bytes_mtime) =
        do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;

    let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
        .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok((tenant_manifest, generation, manifest_bytes_mtime))
}

async fn do_download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: Option<&TimelineId>,
    index_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
    let timeline_id =
        timeline_id.expect("A timeline ID is always provided when downloading an index");
    let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);

    let (index_part_bytes, index_part_mtime) =
        do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;

    let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
        .with_context(|| format!("deserialize index part file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok((index_part, index_generation, index_part_mtime))
}

/// Metadata objects are "generationed", meaning that they include a generation suffix. This
/// function downloads the object with the highest generation <= `my_generation`.
///
/// Data objects (layer files) also include a generation in their path, but there is no equivalent
/// search process, because their reference from an index includes the generation.
///
/// An expensive object listing operation is only done if necessary: the typical fast path is to
/// issue two GET operations, one for our own generation (stale attachment case), and one for the
/// immediately preceding generation (normal case when migrating/restarting). Only if both of these
/// return 404 do we fall back to listing objects.
///
/// * `my_generation`: the value of [`crate::tenant::Tenant::generation`]
/// * `what`: for logging, what object are we downloading
/// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
/// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless
///   `cancel` has fired. This function does not do its own retries of GET operations, and relies
///   on the function passed in to do so.
/// * `parse_path`: parse a fully qualified remote storage path to get the generation of the object.
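///
/// Illustrative probe order for `my_generation` = N (a sketch of the logic below,
/// not additional behavior):
///
/// ```ignore
/// // 1. GET at generation N       -- stale attachment fast path
/// // 2. GET at generation N-1     -- normal migration/restart fast path
/// // 3. LIST `prefix`, pick the highest generation <= N, GET at that generation
/// // 4. GET at the generation-less (legacy) path if the listing found nothing
/// ```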
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all, fields(generation=?my_generation))]
pub(crate) async fn download_generation_object<'a, T, DF, DFF, PF>(
    storage: &'a GenericRemoteStorage,
    tenant_shard_id: &'a TenantShardId,
    timeline_id: Option<&'a TimelineId>,
    my_generation: Generation,
    what: &str,
    prefix: RemotePath,
    do_download: DF,
    parse_path: PF,
    cancel: &'a CancellationToken,
) -> Result<(T, Generation, SystemTime), DownloadError>
where
    DF: Fn(
        &'a GenericRemoteStorage,
        &'a TenantShardId,
        Option<&'a TimelineId>,
        Generation,
        &'a CancellationToken,
    ) -> DFF,
    DFF: Future<Output = Result<(T, Generation, SystemTime), DownloadError>>,
    PF: Fn(RemotePath) -> Option<Generation>,
    T: 'static,
{
    debug_assert_current_span_has_tenant_id();

    if my_generation.is_none() {
        // Operating without generations: just fetch the generation-less path
        return do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
    }

    // Stale case: If we were intentionally attached in a stale generation, the remote object may already
    // exist in our generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
    match res {
        Ok(decoded) => {
            tracing::debug!("Found {what} from current generation (this is a stale attachment)");
            return Ok(decoded);
        }
        Err(DownloadError::NotFound) => {}
        Err(e) => return Err(e),
    };

    // Typical case: the previous generation of this tenant was running healthily, and had uploaded the object
    // we are seeking in that generation. We may safely start from this index without doing a listing, because:
    //  - We checked for current generation case above
    //  - generations > my_generation are to be ignored
    //  - any other objects that exist would have an older generation than `previous_gen`, and
    //    we want to find the most recent object from a previous generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download(
        storage,
        tenant_shard_id,
        timeline_id,
        my_generation.previous(),
        cancel,
    )
    .await;
    match res {
        Ok(decoded) => {
            tracing::debug!("Found {what} from previous generation");
            return Ok(decoded);
        }
        Err(DownloadError::NotFound) => {
            tracing::debug!("No {what} found from previous generation, falling back to listing");
        }
        Err(e) => {
            return Err(e);
        }
    }

    // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
    // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
    // to constructing a full index path with no generation, because the generation is a suffix.
    let paths = download_retry(
        || async {
            storage
                .list(Some(&prefix), ListingMode::NoDelimiter, None, cancel)
                .await
        },
        "list index_part files",
        cancel,
    )
    .await?
    .keys;

    // General case logic for which index to use: the latest index whose generation
    // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
    let max_previous_generation = paths
        .into_iter()
        .filter_map(|o| parse_path(o.key))
        .filter(|g| g <= &my_generation)
        .max();

    match max_previous_generation {
        Some(g) => {
            tracing::debug!("Found {what} in generation {g:?}");
            do_download(storage, tenant_shard_id, timeline_id, g, cancel).await
        }
        None => {
            // Migration from legacy pre-generation state: we have a generation but no prior
            // attached pageservers did. Try to load from a no-generation path.
            tracing::debug!("No {what}* found");
            do_download(
                storage,
                tenant_shard_id,
                timeline_id,
                Generation::none(),
                cancel,
            )
            .await
        }
    }
}

/// index_part.json objects are suffixed with a generation number, so we cannot
/// directly GET the latest index part without doing some probing.
///
/// In this function we probe for the most recent index in a generation <= our current generation.
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
pub(crate) async fn download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    my_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
    download_generation_object(
        storage,
        tenant_shard_id,
        Some(timeline_id),
        my_generation,
        "index_part",
        index_prefix,
        do_download_index_part,
        parse_remote_index_path,
        cancel,
    )
    .await
}

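/// Like [`download_index_part`], but probes for the most recent tenant manifest in a
/// generation <= our current generation.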
pub(crate) async fn download_tenant_manifest(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    my_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
    let manifest_prefix = remote_tenant_manifest_prefix(tenant_shard_id);

    download_generation_object(
        storage,
        tenant_shard_id,
        None,
        my_generation,
        "tenant-manifest",
        manifest_prefix,
        do_download_tenant_manifest,
        parse_remote_tenant_manifest_path,
        cancel,
    )
    .await
}

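/// Download `initdb.tar.zst` into a temporary file under the tenant's timelines
/// directory, falling back to the preserved copy if the primary archive is missing.
/// Returns the temporary path together with the open file, rewound to the start.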
pub(crate) async fn download_initdb_tar_zst(
    conf: &'static PageServerConf,
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    cancel: &CancellationToken,
) -> Result<(Utf8PathBuf, File), DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let remote_preserved_path =
        remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let timeline_path = conf.timelines_path(tenant_shard_id);

    if !timeline_path.exists() {
        tokio::fs::create_dir_all(&timeline_path)
            .await
            .with_context(|| format!("timeline dir creation {timeline_path}"))
            .map_err(DownloadError::Other)?;
    }
    let temp_path = timeline_path.join(format!(
        "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
    ));

    let file = download_retry(
        || async {
            let file = OpenOptions::new()
                .create(true)
                .truncate(true)
                .read(true)
                .write(true)
                .open(&temp_path)
                .await
                .with_context(|| format!("tempfile creation {temp_path}"))
                .map_err(DownloadError::Other)?;

            let download = match storage
                .download(&remote_path, &DownloadOpts::default(), cancel)
                .await
            {
                Ok(dl) => dl,
                Err(DownloadError::NotFound) => {
                    storage
                        .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
                        .await?
                }
                Err(other) => Err(other)?,
            };
            let mut download = tokio_util::io::StreamReader::new(download.download_stream);
            let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);

            tokio::io::copy_buf(&mut download, &mut writer).await?;

            let mut file = writer.into_inner();

            file.seek(std::io::SeekFrom::Start(0))
                .await
                .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
                .map_err(DownloadError::Other)?;

            Ok(file)
        },
        &format!("download {remote_path}"),
        cancel,
    )
    .await
    .inspect_err(|_e| {
        // Do a best-effort attempt at deleting the temporary file upon encountering an error.
        // We don't have async here, nor do we want to pile on any extra errors.
        if let Err(e) = std::fs::remove_file(&temp_path) {
            if e.kind() != std::io::ErrorKind::NotFound {
                warn!("error deleting temporary file {temp_path}: {e}");
            }
        }
    })?;

    Ok((temp_path, file))
}

/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (S3), spurious network
/// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
/// with backoff.
///
/// (See similar logic for uploads in `perform_upload_task`.)
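///
/// A typical call site (taken from `download_layer_file` above):
///
/// ```ignore
/// let bytes_amount = download_retry(
///     || async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
///     &format!("download {remote_path:?}"),
///     cancel,
/// )
/// .await?;
/// ```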
pub(super) async fn download_retry<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        DownloadError::is_permanent,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        FAILED_REMOTE_OP_RETRIES,
        description,
        cancel,
    )
    .await
    .ok_or_else(|| DownloadError::Cancelled)
    .and_then(|x| x)
}

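/// Like [`download_retry`], but retries effectively forever (`u32::MAX` attempts):
/// it only gives up on success, a permanent error, or when `cancel` fires.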
async fn download_retry_forever<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        DownloadError::is_permanent,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        u32::MAX,
        description,
        cancel,
    )
    .await
    .ok_or_else(|| DownloadError::Cancelled)
    .and_then(|x| x)
}