//! Helper functions to download files from remote storage with a `GenericRemoteStorage` client.
//!
//! The functions in this module retry failed operations automatically, according
//! to the `FAILED_REMOTE_OP_RETRIES` constant.

use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::time::SystemTime;

use anyhow::{Context, anyhow};
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use remote_storage::{
    DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
};
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::{info_span, warn};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};

use super::index::{IndexPart, LayerFileMetadata};
use super::manifest::TenantManifest;
use super::{
    FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
    parse_remote_tenant_manifest_path, remote_index_path, remote_initdb_archive_path,
    remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
    remote_tenant_manifest_prefix, remote_tenant_path,
};
use crate::TEMP_FILE_SUFFIX;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::span::{
    debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
};
use crate::tenant::Generation;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};

/// Download a layer file from remote storage and validate that its size matches the size
/// recorded in `layer_metadata`. (In the future, we might do more cross-checks, like CRC
/// validation.)
///
/// Returns the size of the downloaded file.
#[allow(clippy::too_many_arguments)]
pub async fn download_layer_file<'a>(
    conf: &'static PageServerConf,
    storage: &'a GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    layer_file_name: &'a LayerName,
    layer_metadata: &'a LayerFileMetadata,
    local_path: &Utf8Path,
    gate: &utils::sync::gate::Gate,
    cancel: &CancellationToken,
    ctx: &RequestContext,
) -> Result<u64, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);

    let remote_path = remote_layer_path(
        &tenant_shard_id.tenant_id,
        &timeline_id,
        layer_metadata.shard,
        layer_file_name,
        layer_metadata.generation,
    );

    // Perform a rename inspired by durable_rename from file_utils.c.
    // The sequence:
    //     write(tmp)
    //     fsync(tmp)
    //     rename(tmp, new)
    //     fsync(new)
    //     fsync(parent)
    // For more context about durable_rename, see this email from the postgres mailing list:
    // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
    // If the pageserver crashes, the temp file will be deleted on startup and re-downloaded.
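    //
    // A minimal synchronous sketch of the same sequence, assuming a hypothetical
    // "dir/file" path (illustrative only; the real flow is async and split between
    // download_object() below and the rename/fsync steps in this function):
    //
    //     std::fs::write("dir/file.tmp", b"data")?;          // write(tmp)
    //     std::fs::File::open("dir/file.tmp")?.sync_all()?;  // fsync(tmp)
    //     std::fs::rename("dir/file.tmp", "dir/file")?;      // rename(tmp, new)
    //     std::fs::File::open("dir/file")?.sync_all()?;      // fsync(new)
    //     std::fs::File::open("dir")?.sync_all()?;           // fsync(parent)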
    let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);

    let bytes_amount = download_retry(
        || async {
            download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await?;

    let expected = layer_metadata.file_size;
    if expected != bytes_amount {
        return Err(DownloadError::Other(anyhow!(
            "according to the layer file metadata, we should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
        )));
    }

    fail::fail_point!("remote-storage-download-pre-rename", |_| {
        Err(DownloadError::Other(anyhow!(
            "remote-storage-download-pre-rename failpoint triggered"
        )))
    });

    fs::rename(&temp_file_path, &local_path)
        .await
        .with_context(|| format!("rename download layer file to {local_path}"))
        .map_err(DownloadError::Other)?;

    // We use fatal_err() below because, after the rename above,
    // the in-memory state of the filesystem already has the layer file in its final place,
    // and subsequent pageserver code could think it's durable while it really isn't.
    let work = {
        let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
        async move {
            let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
                .await
                .fatal_err("VirtualFile::open for timeline dir fsync");
            timeline_dir
                .sync_all()
                .await
                .fatal_err("VirtualFile::sync_all timeline dir");
        }
    };
    crate::virtual_file::io_engine::get()
        .spawn_blocking_and_block_on_if_std(work)
        .await;

    tracing::debug!("download complete: {local_path}");

    Ok(bytes_amount)
}

/// Download the object `src_path` in the remote `storage` to the local path `dst_path`.
///
/// If Ok() is returned, the download succeeded and the inode & data have been made durable.
/// (Note that the directory entry for the inode is not made durable.)
/// The file size in bytes is returned.
///
/// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
/// The unlinking has _not_ been made durable.
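///
/// The write path is selected by the configured virtual file I/O engine: a buffered
/// `tokio::fs` writer for `StdFs`, or an owned-buffer `BufferedWriter` on top of
/// `VirtualFile` for `TokioEpollUring` (Linux only).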
async fn download_object(
    storage: &GenericRemoteStorage,
    src_path: &RemotePath,
    dst_path: &Utf8PathBuf,
    #[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
    cancel: &CancellationToken,
    #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
) -> Result<u64, DownloadError> {
    let res = match crate::virtual_file::io_engine::get() {
        crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
        crate::virtual_file::io_engine::IoEngine::StdFs => {
            async {
                let destination_file = tokio::fs::File::create(dst_path)
                    .await
                    .with_context(|| format!("create a destination file for layer '{dst_path}'"))
                    .map_err(DownloadError::Other)?;

                let download = storage
                    .download(src_path, &DownloadOpts::default(), cancel)
                    .await?;

                pausable_failpoint!("before-downloading-layer-stream-pausable");

                let mut buf_writer =
                    tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);

                let mut reader = tokio_util::io::StreamReader::new(download.download_stream);

                let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
                buf_writer.flush().await?;

                let mut destination_file = buf_writer.into_inner();

                // The tokio docs (https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html) state that:
                // "A file will not be closed immediately when it goes out of scope if there are any IO operations
                // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
                // you should call flush before dropping it."
                //
                // From the tokio code we see that it waits for pending operations to complete. There shouldn't
                // be any, because we assume the `destination_file` is fully written, i.e. there are no pending
                // `.write(...).await` operations. But for additional safety, let's check/wait for any pending
                // operations.
                destination_file
                    .flush()
                    .await
                    .maybe_fatal_err("download_object flush")
                    .with_context(|| format!("flush source file at {dst_path}"))
                    .map_err(DownloadError::Other)?;

                // Not using sync_data because it can lose the file size update.
                destination_file
                    .sync_all()
                    .await
                    .maybe_fatal_err("download_object sync_all")
                    .with_context(|| format!("failed to fsync source file at {dst_path}"))
                    .map_err(DownloadError::Other)?;

                Ok(bytes_amount)
            }
            .await
        }
        #[cfg(target_os = "linux")]
        crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
            use std::sync::Arc;

            use crate::virtual_file::{IoBufferMut, owned_buffers_io};
            async {
                let destination_file = Arc::new(
                    VirtualFile::create(dst_path, ctx)
                        .await
                        .with_context(|| {
                            format!("create a destination file for layer '{dst_path}'")
                        })
                        .map_err(DownloadError::Other)?,
                );

                let mut download = storage
                    .download(src_path, &DownloadOpts::default(), cancel)
                    .await?;

                pausable_failpoint!("before-downloading-layer-stream-pausable");

                let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
                    destination_file,
                    || IoBufferMut::with_capacity(super::BUFFER_SIZE),
                    gate.enter().map_err(|_| DownloadError::Cancelled)?,
                    ctx,
                    info_span!(parent: None, "download_object_buffered_writer", %dst_path),
                );

                // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
                // There's chunks_vectored() on the stream.
                let (bytes_amount, destination_file) = async {
                    while let Some(res) =
                        futures::StreamExt::next(&mut download.download_stream).await
                    {
                        let chunk = match res {
                            Ok(chunk) => chunk,
                            Err(e) => return Err(e),
                        };
                        buffered.write_buffered_borrowed(&chunk, ctx).await?;
                    }
                    let inner = buffered.flush_and_into_inner(ctx).await?;
                    Ok(inner)
                }
                .await?;

                // Not using sync_data because it can lose the file size update.
                destination_file
                    .sync_all()
                    .await
                    .maybe_fatal_err("download_object sync_all")
                    .with_context(|| format!("failed to fsync source file at {dst_path}"))
                    .map_err(DownloadError::Other)?;

                Ok(bytes_amount)
            }
            .await
        }
    };

    // In case the download failed, clean up.
    match res {
        Ok(bytes_amount) => Ok(bytes_amount),
        Err(e) => {
            if let Err(e) = tokio::fs::remove_file(dst_path).await {
                if e.kind() != std::io::ErrorKind::NotFound {
                    on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
                }
            }
            Err(e)
        }
    }
}

const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";

pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
    path.extension() == Some(TEMP_DOWNLOAD_EXTENSION)
}

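/// List the remote storage contents under `prefix`, partitioning the results into
/// identifiers that parse as `T` (one per common prefix found) and the remaining object
/// names that do not parse (plus any plain objects found at this level).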
async fn list_identifiers<T>(
    storage: &GenericRemoteStorage,
    prefix: RemotePath,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
where
    T: FromStr + Eq + std::hash::Hash,
{
    let listing = download_retry_forever(
        || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
        &format!("list identifiers in prefix {prefix}"),
        &cancel,
    )
    .await?;

    let mut parsed_ids = HashSet::new();
    let mut other_prefixes = HashSet::new();

    for id_remote_storage_key in listing.prefixes {
        let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
            anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
        })?;

        match object_name.parse::<T>() {
            Ok(t) => parsed_ids.insert(t),
            Err(_) => other_prefixes.insert(object_name.to_string()),
        };
    }

    for object in listing.keys {
        let object_name = object
            .key
            .object_name()
            .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
        other_prefixes.insert(object_name.to_string());
    }

    Ok((parsed_ids, other_prefixes))
}

/// List the shards of a given tenant in remote storage.
pub(crate) async fn list_remote_tenant_shards(
    storage: &GenericRemoteStorage,
    tenant_id: TenantId,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
    let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
    list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
}

/// List the timelines of a given tenant shard in remote storage.
pub async fn list_remote_timelines(
    storage: &GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
    fail::fail_point!("storage-sync-list-remote-timelines", |_| {
        anyhow::bail!("storage-sync-list-remote-timelines");
    });

    let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
    list_identifiers::<TimelineId>(storage, remote_path, cancel).await
}

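/// Download the object at `remote_path` into memory, retrying transient failures
/// indefinitely (until a permanent error or cancellation). Returns the raw bytes and the
/// object's last-modified time.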
async fn do_download_remote_path_retry_forever(
    storage: &GenericRemoteStorage,
    remote_path: &RemotePath,
    download_opts: DownloadOpts,
    cancel: &CancellationToken,
) -> Result<(Vec<u8>, SystemTime), DownloadError> {
    download_retry_forever(
        || async {
            let download = storage
                .download(remote_path, &download_opts, cancel)
                .await?;

            let mut bytes = Vec::new();

            let stream = download.download_stream;
            let mut stream = StreamReader::new(stream);

            tokio::io::copy_buf(&mut stream, &mut bytes).await?;

            Ok((bytes, download.last_modified))
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await
}

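/// Callback for [`download_generation_object`]: fetch and deserialize the tenant manifest
/// at one specific generation. The `_timeline_id` parameter is unused; it exists only to
/// match the callback signature, since tenant manifests are per-tenant rather than
/// per-timeline.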
async fn do_download_tenant_manifest(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    _timeline_id: Option<&TimelineId>,
    generation: Generation,
    cancel: &CancellationToken,
) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
    let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);

    let download_opts = DownloadOpts {
        kind: DownloadKind::Small,
        ..Default::default()
    };

    let (manifest_bytes, manifest_bytes_mtime) =
        do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;

    let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
        .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok((tenant_manifest, generation, manifest_bytes_mtime))
}

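/// Callback for [`download_generation_object`]: fetch and deserialize the
/// `index_part.json` of one timeline at one specific generation.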
async fn do_download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: Option<&TimelineId>,
    index_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
    let timeline_id =
        timeline_id.expect("A timeline ID is always provided when downloading an index");
    let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);

    let download_opts = DownloadOpts {
        kind: DownloadKind::Small,
        ..Default::default()
    };

    let (index_part_bytes, index_part_mtime) =
        do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;

    let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
        .with_context(|| format!("deserialize index part file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok((index_part, index_generation, index_part_mtime))
}

/// Metadata objects are "generationed", meaning that they include a generation suffix. This
/// function downloads the object with the highest generation <= `my_generation`.
///
/// Data objects (layer files) also include a generation in their path, but there is no equivalent
/// search process, because their reference from an index includes the generation.
///
/// An expensive object listing operation is only done if necessary: the typical fast path is to issue two
/// GET operations, one to our own generation (stale attachment case), and one to the immediately preceding
/// generation (normal case when migrating/restarting). Only if both of these return 404 do we fall back
/// to listing objects.
///
/// * `my_generation`: the value of [`crate::tenant::Tenant::generation`]
/// * `what`: for logging, what object are we downloading
/// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
/// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless
///   `cancel` has fired. This function does not do its own retries of GET operations, and relies
///   on the function passed in to do so.
/// * `parse_path`: parse a fully qualified remote storage path to get the generation of the object.
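///
/// For example, with `my_generation` = 5 the probe order is: a GET at generation 5 (the
/// stale attachment fast path), then a GET at generation 4, and only if both return 404,
/// a LIST under `prefix` followed by a GET at the highest listed generation <= 5 (or at
/// the legacy no-generation path if nothing suitable is listed).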
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all, fields(generation=?my_generation))]
pub(crate) async fn download_generation_object<'a, T, DF, DFF, PF>(
    storage: &'a GenericRemoteStorage,
    tenant_shard_id: &'a TenantShardId,
    timeline_id: Option<&'a TimelineId>,
    my_generation: Generation,
    what: &str,
    prefix: RemotePath,
    do_download: DF,
    parse_path: PF,
    cancel: &'a CancellationToken,
) -> Result<(T, Generation, SystemTime), DownloadError>
where
    DF: Fn(
        &'a GenericRemoteStorage,
        &'a TenantShardId,
        Option<&'a TimelineId>,
        Generation,
        &'a CancellationToken,
    ) -> DFF,
    DFF: Future<Output = Result<(T, Generation, SystemTime), DownloadError>>,
    PF: Fn(RemotePath) -> Option<Generation>,
    T: 'static,
{
    debug_assert_current_span_has_tenant_id();

    if my_generation.is_none() {
        // Operating without generations: just fetch the generation-less path
        return do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
    }

    // Stale case: if we were intentionally attached in a stale generation, the remote object may
    // already exist in our generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
    match res {
        Ok(decoded) => {
            tracing::debug!("Found {what} from current generation (this is a stale attachment)");
            return Ok(decoded);
        }
        Err(DownloadError::NotFound) => {}
        Err(e) => return Err(e),
    };

    // Typical case: the previous generation of this tenant was running healthily, and had uploaded the object
    // we are seeking in that generation. We may safely start from this object without doing a listing, because:
    //  - We checked for the current generation case above
    //  - generations > my_generation are to be ignored
    //  - any other objects that exist would have an older generation than `previous_gen`, and
    //    we want to find the most recent object from a previous generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download(
        storage,
        tenant_shard_id,
        timeline_id,
        my_generation.previous(),
        cancel,
    )
    .await;
    match res {
        Ok(decoded) => {
            tracing::debug!("Found {what} from previous generation");
            return Ok(decoded);
        }
        Err(DownloadError::NotFound) => {
            tracing::debug!("No {what} found from previous generation, falling back to listing");
        }
        Err(e) => {
            return Err(e);
        }
    }

    // General case/fallback: if there is no object at my_generation or the previous generation, then list
    // all candidate objects and select the highest one with a generation <= my_generation. Constructing the
    // prefix is equivalent to constructing a full object path with no generation, because the generation is
    // a suffix.
    let paths = download_retry(
        || async {
            storage
                .list(Some(&prefix), ListingMode::NoDelimiter, None, cancel)
                .await
        },
        "list index_part files",
        cancel,
    )
    .await?
    .keys;

    // General case logic for which object to use: the latest one whose generation
    // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
    let max_previous_generation = paths
        .into_iter()
        .filter_map(|o| parse_path(o.key))
        .filter(|g| g <= &my_generation)
        .max();

    match max_previous_generation {
        Some(g) => {
            tracing::debug!("Found {what} in generation {g:?}");
            do_download(storage, tenant_shard_id, timeline_id, g, cancel).await
        }
        None => {
            // Migration from legacy pre-generation state: we have a generation but no prior
            // attached pageservers did. Try to load from a no-generation path.
            tracing::debug!("No {what}* found");
            do_download(
                storage,
                tenant_shard_id,
                timeline_id,
                Generation::none(),
                cancel,
            )
            .await
        }
    }
}

/// index_part.json objects are suffixed with a generation number, so we cannot
/// directly GET the latest index part without doing some probing.
///
/// In this function we probe for the most recent index in a generation <= our current generation.
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
pub(crate) async fn download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    my_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
    download_generation_object(
        storage,
        tenant_shard_id,
        Some(timeline_id),
        my_generation,
        "index_part",
        index_prefix,
        do_download_index_part,
        parse_remote_index_path,
        cancel,
    )
    .await
}

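/// Probe for the tenant manifest in the highest generation <= `my_generation`, using the
/// same generation search as [`download_index_part`].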
pub(crate) async fn download_tenant_manifest(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    my_generation: Generation,
    cancel: &CancellationToken,
) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
    let manifest_prefix = remote_tenant_manifest_prefix(tenant_shard_id);

    download_generation_object(
        storage,
        tenant_shard_id,
        None,
        my_generation,
        "tenant-manifest",
        manifest_prefix,
        do_download_tenant_manifest,
        parse_remote_tenant_manifest_path,
        cancel,
    )
    .await
}

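/// Download the initdb archive for the given timeline into a temporary file under the
/// tenant's timelines directory, falling back to the preserved copy of the archive if the
/// primary object is not found. Returns the temporary path together with an open file
/// handle that has been rewound to the start.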
pub(crate) async fn download_initdb_tar_zst(
    conf: &'static PageServerConf,
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    cancel: &CancellationToken,
) -> Result<(Utf8PathBuf, File), DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let remote_preserved_path =
        remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let timeline_path = conf.timelines_path(tenant_shard_id);

    if !timeline_path.exists() {
        tokio::fs::create_dir_all(&timeline_path)
            .await
            .with_context(|| format!("timeline dir creation {timeline_path}"))
            .map_err(DownloadError::Other)?;
    }
    let temp_path = timeline_path.join(format!(
        "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
    ));

    let file = download_retry(
        || async {
            let file = OpenOptions::new()
                .create(true)
                .truncate(true)
                .read(true)
                .write(true)
                .open(&temp_path)
                .await
                .with_context(|| format!("tempfile creation {temp_path}"))
                .map_err(DownloadError::Other)?;

            let download = match storage
                .download(&remote_path, &DownloadOpts::default(), cancel)
                .await
            {
                Ok(dl) => dl,
                Err(DownloadError::NotFound) => {
                    storage
                        .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
                        .await?
                }
                Err(other) => Err(other)?,
            };
            let mut download = tokio_util::io::StreamReader::new(download.download_stream);
            let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);

            tokio::io::copy_buf(&mut download, &mut writer).await?;

            let mut file = writer.into_inner();

            file.seek(std::io::SeekFrom::Start(0))
                .await
                .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
                .map_err(DownloadError::Other)?;

            Ok(file)
        },
        &format!("download {remote_path}"),
        cancel,
    )
    .await
    .inspect_err(|_e| {
        // Do a best-effort attempt at deleting the temporary file upon encountering an error.
        // We don't have async here nor do we want to pile on any extra errors.
        if let Err(e) = std::fs::remove_file(&temp_path) {
            if e.kind() != std::io::ErrorKind::NotFound {
                warn!("error deleting temporary file {temp_path}: {e}");
            }
        }
    })?;

    Ok((temp_path, file))
}

/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (S3), spurious network
/// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
/// with backoff.
///
/// (See similar logic for uploads in `perform_upload_task`.)
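///
/// A usage sketch (hypothetical `storage`, `prefix` and `cancel` values, mirroring the
/// call sites in this module):
///
/// ```ignore
/// let listing = download_retry(
///     || async {
///         storage
///             .list(Some(&prefix), ListingMode::NoDelimiter, None, &cancel)
///             .await
///     },
///     "list objects under prefix",
///     &cancel,
/// )
/// .await?;
/// ```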
pub(super) async fn download_retry<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        DownloadError::is_permanent,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        FAILED_REMOTE_OP_RETRIES,
        description,
        cancel,
    )
    .await
    .ok_or_else(|| DownloadError::Cancelled)
    .and_then(|x| x)
}

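/// Like [`download_retry`], but retries transient failures indefinitely rather than
/// giving up after `FAILED_REMOTE_OP_RETRIES` attempts.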
pub(crate) async fn download_retry_forever<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        DownloadError::is_permanent,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        u32::MAX,
        description,
        cancel,
    )
    .await
    .ok_or_else(|| DownloadError::Cancelled)
    .and_then(|x| x)
}