Line data Source code
1 : //! Helper functions to download files from remote storage with a RemoteStorage
2 : //!
//! The functions in this module retry failed operations automatically: either up to
//! FAILED_REMOTE_OP_RETRIES times (`download_retry`) or indefinitely (`download_retry_forever`).
5 :
6 : use std::collections::HashSet;
7 : use std::future::Future;
8 : use std::str::FromStr;
9 : use std::time::SystemTime;
10 :
11 : use anyhow::{anyhow, Context};
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use pageserver_api::shard::TenantShardId;
14 : use tokio::fs::{self, File, OpenOptions};
15 : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
16 : use tokio_util::io::StreamReader;
17 : use tokio_util::sync::CancellationToken;
18 : use tracing::warn;
19 : use utils::backoff;
20 :
21 : use crate::config::PageServerConf;
22 : use crate::context::RequestContext;
23 : use crate::span::{
24 : debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
25 : };
26 : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
27 : use crate::tenant::storage_layer::LayerName;
28 : use crate::tenant::Generation;
29 : use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
30 : use crate::TEMP_FILE_SUFFIX;
31 : use remote_storage::{
32 : DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
33 : };
34 : use utils::crashsafe::path_with_suffix_extension;
35 : use utils::id::{TenantId, TimelineId};
36 : use utils::pausable_failpoint;
37 :
38 : use super::index::{IndexPart, LayerFileMetadata};
39 : use super::manifest::TenantManifest;
40 : use super::{
41 : parse_remote_index_path, parse_remote_tenant_manifest_path, remote_index_path,
42 : remote_initdb_archive_path, remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
43 : remote_tenant_manifest_prefix, remote_tenant_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
44 : FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
45 : };
46 :
47 : ///
48 : /// If 'metadata' is given, we will validate that the downloaded file's size matches that
49 : /// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
50 : ///
51 : /// Returns the size of the downloaded file.
52 : #[allow(clippy::too_many_arguments)]
53 29 : pub async fn download_layer_file<'a>(
54 29 : conf: &'static PageServerConf,
55 29 : storage: &'a GenericRemoteStorage,
56 29 : tenant_shard_id: TenantShardId,
57 29 : timeline_id: TimelineId,
58 29 : layer_file_name: &'a LayerName,
59 29 : layer_metadata: &'a LayerFileMetadata,
60 29 : local_path: &Utf8Path,
61 29 : gate: &utils::sync::gate::Gate,
62 29 : cancel: &CancellationToken,
63 29 : ctx: &RequestContext,
64 29 : ) -> Result<u64, DownloadError> {
65 29 : debug_assert_current_span_has_tenant_and_timeline_id();
66 29 :
67 29 : let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
68 29 :
69 29 : let remote_path = remote_layer_path(
70 29 : &tenant_shard_id.tenant_id,
71 29 : &timeline_id,
72 29 : layer_metadata.shard,
73 29 : layer_file_name,
74 29 : layer_metadata.generation,
75 29 : );
76 29 :
77 29 : // Perform a rename inspired by durable_rename from file_utils.c.
78 29 : // The sequence:
79 29 : // write(tmp)
80 29 : // fsync(tmp)
81 29 : // rename(tmp, new)
82 29 : // fsync(new)
83 29 : // fsync(parent)
84 29 : // For more context about durable_rename check this email from postgres mailing list:
85 29 : // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
86 29 : // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
87 29 : let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
88 :
89 29 : let bytes_amount = download_retry(
90 29 : || async {
91 29 : download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
92 58 : },
93 29 : &format!("download {remote_path:?}"),
94 29 : cancel,
95 29 : )
96 29 : .await?;
97 :
98 29 : let expected = layer_metadata.file_size;
99 29 : if expected != bytes_amount {
100 0 : return Err(DownloadError::Other(anyhow!(
101 0 : "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
102 0 : )));
103 29 : }
104 29 :
105 29 : fail::fail_point!("remote-storage-download-pre-rename", |_| {
106 0 : Err(DownloadError::Other(anyhow!(
107 0 : "remote-storage-download-pre-rename failpoint triggered"
108 0 : )))
109 29 : });
110 :
111 29 : fs::rename(&temp_file_path, &local_path)
112 29 : .await
113 29 : .with_context(|| format!("rename download layer file to {local_path}"))
114 29 : .map_err(DownloadError::Other)?;
115 :
116 : // We use fatal_err() below because the after the rename above,
117 : // the in-memory state of the filesystem already has the layer file in its final place,
118 : // and subsequent pageserver code could think it's durable while it really isn't.
119 29 : let work = {
120 29 : let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
121 29 : async move {
122 29 : let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
123 29 : .await
124 29 : .fatal_err("VirtualFile::open for timeline dir fsync");
125 29 : timeline_dir
126 29 : .sync_all()
127 29 : .await
128 29 : .fatal_err("VirtualFile::sync_all timeline dir");
129 29 : }
130 : };
131 29 : crate::virtual_file::io_engine::get()
132 29 : .spawn_blocking_and_block_on_if_std(work)
133 29 : .await;
134 :
135 29 : tracing::debug!("download complete: {local_path}");
136 :
137 29 : Ok(bytes_amount)
138 29 : }
139 :
140 : /// Download the object `src_path` in the remote `storage` to local path `dst_path`.
141 : ///
142 : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
143 : /// (Note that the directory entry for the inode is not made durable.)
144 : /// The file size in bytes is returned.
145 : ///
146 : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
147 : /// The unlinking has _not_ been made durable.
148 29 : async fn download_object(
149 29 : storage: &GenericRemoteStorage,
150 29 : src_path: &RemotePath,
151 29 : dst_path: &Utf8PathBuf,
152 29 : #[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
153 29 : cancel: &CancellationToken,
154 29 : #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
155 29 : ) -> Result<u64, DownloadError> {
156 29 : let res = match crate::virtual_file::io_engine::get() {
157 0 : crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
158 : crate::virtual_file::io_engine::IoEngine::StdFs => {
159 15 : async {
160 15 : let destination_file = tokio::fs::File::create(dst_path)
161 15 : .await
162 15 : .with_context(|| format!("create a destination file for layer '{dst_path}'"))
163 15 : .map_err(DownloadError::Other)?;
164 :
165 15 : let download = storage
166 15 : .download(src_path, &DownloadOpts::default(), cancel)
167 15 : .await?;
168 :
169 15 : pausable_failpoint!("before-downloading-layer-stream-pausable");
170 :
171 15 : let mut buf_writer =
172 15 : tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
173 15 :
174 15 : let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
175 :
176 15 : let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
177 15 : buf_writer.flush().await?;
178 :
179 15 : let mut destination_file = buf_writer.into_inner();
180 15 :
181 15 : // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
182 15 : // A file will not be closed immediately when it goes out of scope if there are any IO operations
183 15 : // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
184 15 : // you should call flush before dropping it.
185 15 : //
186 15 : // From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
187 15 : // we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
188 15 : // But for additional safety lets check/wait for any pending operations.
189 15 : destination_file
190 15 : .flush()
191 15 : .await
192 15 : .maybe_fatal_err("download_object sync_all")
193 15 : .with_context(|| format!("flush source file at {dst_path}"))
194 15 : .map_err(DownloadError::Other)?;
195 :
196 : // not using sync_data because it can lose file size update
197 15 : destination_file
198 15 : .sync_all()
199 15 : .await
200 15 : .maybe_fatal_err("download_object sync_all")
201 15 : .with_context(|| format!("failed to fsync source file at {dst_path}"))
202 15 : .map_err(DownloadError::Other)?;
203 :
204 15 : Ok(bytes_amount)
205 15 : }
206 15 : .await
207 : }
208 : #[cfg(target_os = "linux")]
209 : crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
210 : use crate::virtual_file::owned_buffers_io;
211 : use crate::virtual_file::IoBufferMut;
212 : use std::sync::Arc;
213 14 : async {
214 14 : let destination_file = Arc::new(
215 14 : VirtualFile::create(dst_path, ctx)
216 14 : .await
217 14 : .with_context(|| {
218 0 : format!("create a destination file for layer '{dst_path}'")
219 14 : })
220 14 : .map_err(DownloadError::Other)?,
221 : );
222 :
223 14 : let mut download = storage
224 14 : .download(src_path, &DownloadOpts::default(), cancel)
225 14 : .await?;
226 :
227 14 : pausable_failpoint!("before-downloading-layer-stream-pausable");
228 :
229 14 : let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
230 14 : destination_file,
231 28 : || IoBufferMut::with_capacity(super::BUFFER_SIZE),
232 14 : gate.enter().map_err(|_| DownloadError::Cancelled)?,
233 14 : ctx,
234 : );
235 :
236 : // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
237 : // There's chunks_vectored() on the stream.
238 14 : let (bytes_amount, destination_file) = async {
239 84 : while let Some(res) =
240 98 : futures::StreamExt::next(&mut download.download_stream).await
241 : {
242 84 : let chunk = match res {
243 84 : Ok(chunk) => chunk,
244 0 : Err(e) => return Err(e),
245 : };
246 84 : buffered.write_buffered_borrowed(&chunk, ctx).await?;
247 : }
248 14 : let inner = buffered.flush_and_into_inner(ctx).await?;
249 14 : Ok(inner)
250 14 : }
251 14 : .await?;
252 :
253 : // not using sync_data because it can lose file size update
254 14 : destination_file
255 14 : .sync_all()
256 14 : .await
257 14 : .maybe_fatal_err("download_object sync_all")
258 14 : .with_context(|| format!("failed to fsync source file at {dst_path}"))
259 14 : .map_err(DownloadError::Other)?;
260 :
261 14 : Ok(bytes_amount)
262 14 : }
263 14 : .await
264 : }
265 : };
266 :
267 : // in case the download failed, clean up
268 29 : match res {
269 29 : Ok(bytes_amount) => Ok(bytes_amount),
270 0 : Err(e) => {
271 0 : if let Err(e) = tokio::fs::remove_file(dst_path).await {
272 0 : if e.kind() != std::io::ErrorKind::NotFound {
273 0 : on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
274 0 : }
275 0 : }
276 0 : Err(e)
277 : }
278 : }
279 29 : }
280 :
281 : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
282 :
283 0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
284 0 : let extension = path.extension();
285 0 : match extension {
286 0 : Some(TEMP_DOWNLOAD_EXTENSION) => true,
287 0 : Some(_) => false,
288 0 : None => false,
289 : }
290 0 : }
291 :
292 445 : async fn list_identifiers<T>(
293 445 : storage: &GenericRemoteStorage,
294 445 : prefix: RemotePath,
295 445 : cancel: CancellationToken,
296 445 : ) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
297 445 : where
298 445 : T: FromStr + Eq + std::hash::Hash,
299 445 : {
300 445 : let listing = download_retry_forever(
301 445 : || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
302 445 : &format!("list identifiers in prefix {prefix}"),
303 445 : &cancel,
304 445 : )
305 445 : .await?;
306 :
307 445 : let mut parsed_ids = HashSet::new();
308 445 : let mut other_prefixes = HashSet::new();
309 :
310 457 : for id_remote_storage_key in listing.prefixes {
311 12 : let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
312 0 : anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
313 12 : })?;
314 :
315 12 : match object_name.parse::<T>() {
316 12 : Ok(t) => parsed_ids.insert(t),
317 0 : Err(_) => other_prefixes.insert(object_name.to_string()),
318 : };
319 : }
320 :
321 445 : for object in listing.keys {
322 0 : let object_name = object
323 0 : .key
324 0 : .object_name()
325 0 : .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
326 0 : other_prefixes.insert(object_name.to_string());
327 : }
328 :
329 445 : Ok((parsed_ids, other_prefixes))
330 445 : }
331 :
332 : /// List shards of given tenant in remote storage
333 0 : pub(crate) async fn list_remote_tenant_shards(
334 0 : storage: &GenericRemoteStorage,
335 0 : tenant_id: TenantId,
336 0 : cancel: CancellationToken,
337 0 : ) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
338 0 : let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
339 0 : list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
340 0 : }
341 :
342 : /// List timelines of given tenant shard in remote storage
343 445 : pub async fn list_remote_timelines(
344 445 : storage: &GenericRemoteStorage,
345 445 : tenant_shard_id: TenantShardId,
346 445 : cancel: CancellationToken,
347 445 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
348 445 : fail::fail_point!("storage-sync-list-remote-timelines", |_| {
349 0 : anyhow::bail!("storage-sync-list-remote-timelines");
350 445 : });
351 :
352 445 : let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
353 445 : list_identifiers::<TimelineId>(storage, remote_path, cancel).await
354 445 : }
355 :
356 1403 : async fn do_download_remote_path_retry_forever(
357 1403 : storage: &GenericRemoteStorage,
358 1403 : remote_path: &RemotePath,
359 1403 : download_opts: DownloadOpts,
360 1403 : cancel: &CancellationToken,
361 1403 : ) -> Result<(Vec<u8>, SystemTime), DownloadError> {
362 1403 : download_retry_forever(
363 1403 : || async {
364 1403 : let download = storage
365 1403 : .download(remote_path, &download_opts, cancel)
366 1403 : .await?;
367 :
368 40 : let mut bytes = Vec::new();
369 40 :
370 40 : let stream = download.download_stream;
371 40 : let mut stream = StreamReader::new(stream);
372 40 :
373 40 : tokio::io::copy_buf(&mut stream, &mut bytes).await?;
374 :
375 40 : Ok((bytes, download.last_modified))
376 2806 : },
377 1403 : &format!("download {remote_path:?}"),
378 1403 : cancel,
379 1403 : )
380 1403 : .await
381 1403 : }
382 :
383 1335 : async fn do_download_tenant_manifest(
384 1335 : storage: &GenericRemoteStorage,
385 1335 : tenant_shard_id: &TenantShardId,
386 1335 : _timeline_id: Option<&TimelineId>,
387 1335 : generation: Generation,
388 1335 : cancel: &CancellationToken,
389 1335 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
390 1335 : let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
391 1335 :
392 1335 : let download_opts = DownloadOpts {
393 1335 : kind: DownloadKind::Small,
394 1335 : ..Default::default()
395 1335 : };
396 :
397 0 : let (manifest_bytes, manifest_bytes_mtime) =
398 1335 : do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
399 :
400 0 : let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
401 0 : .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
402 0 : .map_err(DownloadError::Other)?;
403 :
404 0 : Ok((tenant_manifest, generation, manifest_bytes_mtime))
405 1335 : }
406 :
407 68 : async fn do_download_index_part(
408 68 : storage: &GenericRemoteStorage,
409 68 : tenant_shard_id: &TenantShardId,
410 68 : timeline_id: Option<&TimelineId>,
411 68 : index_generation: Generation,
412 68 : cancel: &CancellationToken,
413 68 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
414 68 : let timeline_id =
415 68 : timeline_id.expect("A timeline ID is always provided when downloading an index");
416 68 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
417 68 :
418 68 : let download_opts = DownloadOpts {
419 68 : kind: DownloadKind::Small,
420 68 : ..Default::default()
421 68 : };
422 :
423 40 : let (index_part_bytes, index_part_mtime) =
424 68 : do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
425 :
426 40 : let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
427 40 : .with_context(|| format!("deserialize index part file at {remote_path:?}"))
428 40 : .map_err(DownloadError::Other)?;
429 :
430 40 : Ok((index_part, index_generation, index_part_mtime))
431 68 : }
432 :
433 : /// Metadata objects are "generationed", meaning that they include a generation suffix. This
434 : /// function downloads the object with the highest generation <= `my_generation`.
435 : ///
436 : /// Data objects (layer files) also include a generation in their path, but there is no equivalent
437 : /// search process, because their reference from an index includes the generation.
438 : ///
439 : /// An expensive object listing operation is only done if necessary: the typical fast path is to issue two
440 : /// GET operations, one to our own generation (stale attachment case), and one to the immediately preceding
441 : /// generation (normal case when migrating/restarting). Only if both of these return 404 do we fall back
442 : /// to listing objects.
443 : ///
444 : /// * `my_generation`: the value of `[crate::tenant::Tenant::generation]`
445 : /// * `what`: for logging, what object are we downloading
446 : /// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
447 : /// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless
448 : /// `cancel`` has fired. This function does not do its own retries of GET operations, and relies
449 : /// on the function passed in to do so.
450 : /// * `parse_path`: parse a fully qualified remote storage path to get the generation of the object.
451 : #[allow(clippy::too_many_arguments)]
452 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
453 : pub(crate) async fn download_generation_object<'a, T, DF, DFF, PF>(
454 : storage: &'a GenericRemoteStorage,
455 : tenant_shard_id: &'a TenantShardId,
456 : timeline_id: Option<&'a TimelineId>,
457 : my_generation: Generation,
458 : what: &str,
459 : prefix: RemotePath,
460 : do_download: DF,
461 : parse_path: PF,
462 : cancel: &'a CancellationToken,
463 : ) -> Result<(T, Generation, SystemTime), DownloadError>
464 : where
465 : DF: Fn(
466 : &'a GenericRemoteStorage,
467 : &'a TenantShardId,
468 : Option<&'a TimelineId>,
469 : Generation,
470 : &'a CancellationToken,
471 : ) -> DFF,
472 : DFF: Future<Output = Result<(T, Generation, SystemTime), DownloadError>>,
473 : PF: Fn(RemotePath) -> Option<Generation>,
474 : T: 'static,
475 : {
476 : debug_assert_current_span_has_tenant_id();
477 :
478 : if my_generation.is_none() {
479 : // Operating without generations: just fetch the generation-less path
480 : return do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
481 : }
482 :
483 : // Stale case: If we were intentionally attached in a stale generation, the remote object may already
484 : // exist in our generation.
485 : //
486 : // This is an optimization to avoid doing the listing for the general case below.
487 : let res = do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
488 : match res {
489 : Ok(decoded) => {
490 : tracing::debug!("Found {what} from current generation (this is a stale attachment)");
491 : return Ok(decoded);
492 : }
493 : Err(DownloadError::NotFound) => {}
494 : Err(e) => return Err(e),
495 : };
496 :
497 : // Typical case: the previous generation of this tenant was running healthily, and had uploaded the object
498 : // we are seeking in that generation. We may safely start from this index without doing a listing, because:
499 : // - We checked for current generation case above
500 : // - generations > my_generation are to be ignored
501 : // - any other objects that exist would have an older generation than `previous_gen`, and
502 : // we want to find the most recent object from a previous generation.
503 : //
504 : // This is an optimization to avoid doing the listing for the general case below.
505 : let res = do_download(
506 : storage,
507 : tenant_shard_id,
508 : timeline_id,
509 : my_generation.previous(),
510 : cancel,
511 : )
512 : .await;
513 : match res {
514 : Ok(decoded) => {
515 : tracing::debug!("Found {what} from previous generation");
516 : return Ok(decoded);
517 : }
518 : Err(DownloadError::NotFound) => {
519 : tracing::debug!("No {what} found from previous generation, falling back to listing");
520 : }
521 : Err(e) => {
522 : return Err(e);
523 : }
524 : }
525 :
526 : // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
527 : // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
528 : // to constructing a full index path with no generation, because the generation is a suffix.
529 : let paths = download_retry(
530 457 : || async {
531 457 : storage
532 457 : .list(Some(&prefix), ListingMode::NoDelimiter, None, cancel)
533 457 : .await
534 914 : },
535 : "list index_part files",
536 : cancel,
537 : )
538 : .await?
539 : .keys;
540 :
541 : // General case logic for which index to use: the latest index whose generation
542 : // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
543 : let max_previous_generation = paths
544 : .into_iter()
545 36 : .filter_map(|o| parse_path(o.key))
546 24 : .filter(|g| g <= &my_generation)
547 : .max();
548 :
549 : match max_previous_generation {
550 : Some(g) => {
551 : tracing::debug!("Found {what} in generation {g:?}");
552 : do_download(storage, tenant_shard_id, timeline_id, g, cancel).await
553 : }
554 : None => {
555 : // Migration from legacy pre-generation state: we have a generation but no prior
556 : // attached pageservers did. Try to load from a no-generation path.
557 : tracing::debug!("No {what}* found");
558 : do_download(
559 : storage,
560 : tenant_shard_id,
561 : timeline_id,
562 : Generation::none(),
563 : cancel,
564 : )
565 : .await
566 : }
567 : }
568 : }
569 :
570 : /// index_part.json objects are suffixed with a generation number, so we cannot
571 : /// directly GET the latest index part without doing some probing.
572 : ///
573 : /// In this function we probe for the most recent index in a generation <= our current generation.
574 : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
575 40 : pub(crate) async fn download_index_part(
576 40 : storage: &GenericRemoteStorage,
577 40 : tenant_shard_id: &TenantShardId,
578 40 : timeline_id: &TimelineId,
579 40 : my_generation: Generation,
580 40 : cancel: &CancellationToken,
581 40 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
582 40 : debug_assert_current_span_has_tenant_and_timeline_id();
583 40 :
584 40 : let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
585 40 : download_generation_object(
586 40 : storage,
587 40 : tenant_shard_id,
588 40 : Some(timeline_id),
589 40 : my_generation,
590 40 : "index_part",
591 40 : index_prefix,
592 40 : do_download_index_part,
593 40 : parse_remote_index_path,
594 40 : cancel,
595 40 : )
596 40 : .await
597 40 : }
598 :
599 445 : pub(crate) async fn download_tenant_manifest(
600 445 : storage: &GenericRemoteStorage,
601 445 : tenant_shard_id: &TenantShardId,
602 445 : my_generation: Generation,
603 445 : cancel: &CancellationToken,
604 445 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
605 445 : let manifest_prefix = remote_tenant_manifest_prefix(tenant_shard_id);
606 445 :
607 445 : download_generation_object(
608 445 : storage,
609 445 : tenant_shard_id,
610 445 : None,
611 445 : my_generation,
612 445 : "tenant-manifest",
613 445 : manifest_prefix,
614 445 : do_download_tenant_manifest,
615 445 : parse_remote_tenant_manifest_path,
616 445 : cancel,
617 445 : )
618 445 : .await
619 445 : }
620 :
621 4 : pub(crate) async fn download_initdb_tar_zst(
622 4 : conf: &'static PageServerConf,
623 4 : storage: &GenericRemoteStorage,
624 4 : tenant_shard_id: &TenantShardId,
625 4 : timeline_id: &TimelineId,
626 4 : cancel: &CancellationToken,
627 4 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
628 4 : debug_assert_current_span_has_tenant_and_timeline_id();
629 4 :
630 4 : let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
631 4 :
632 4 : let remote_preserved_path =
633 4 : remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
634 4 :
635 4 : let timeline_path = conf.timelines_path(tenant_shard_id);
636 4 :
637 4 : if !timeline_path.exists() {
638 0 : tokio::fs::create_dir_all(&timeline_path)
639 0 : .await
640 0 : .with_context(|| format!("timeline dir creation {timeline_path}"))
641 0 : .map_err(DownloadError::Other)?;
642 4 : }
643 4 : let temp_path = timeline_path.join(format!(
644 4 : "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
645 4 : ));
646 :
647 4 : let file = download_retry(
648 4 : || async {
649 4 : let file = OpenOptions::new()
650 4 : .create(true)
651 4 : .truncate(true)
652 4 : .read(true)
653 4 : .write(true)
654 4 : .open(&temp_path)
655 4 : .await
656 4 : .with_context(|| format!("tempfile creation {temp_path}"))
657 4 : .map_err(DownloadError::Other)?;
658 :
659 4 : let download = match storage
660 4 : .download(&remote_path, &DownloadOpts::default(), cancel)
661 4 : .await
662 : {
663 4 : Ok(dl) => dl,
664 : Err(DownloadError::NotFound) => {
665 0 : storage
666 0 : .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
667 0 : .await?
668 : }
669 0 : Err(other) => Err(other)?,
670 : };
671 4 : let mut download = tokio_util::io::StreamReader::new(download.download_stream);
672 4 : let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
673 4 :
674 4 : tokio::io::copy_buf(&mut download, &mut writer).await?;
675 :
676 4 : let mut file = writer.into_inner();
677 4 :
678 4 : file.seek(std::io::SeekFrom::Start(0))
679 4 : .await
680 4 : .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
681 4 : .map_err(DownloadError::Other)?;
682 :
683 4 : Ok(file)
684 8 : },
685 4 : &format!("download {remote_path}"),
686 4 : cancel,
687 4 : )
688 4 : .await
689 4 : .inspect_err(|_e| {
690 : // Do a best-effort attempt at deleting the temporary file upon encountering an error.
691 : // We don't have async here nor do we want to pile on any extra errors.
692 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
693 0 : if e.kind() != std::io::ErrorKind::NotFound {
694 0 : warn!("error deleting temporary file {temp_path}: {e}");
695 0 : }
696 0 : }
697 4 : })?;
698 :
699 4 : Ok((temp_path, file))
700 4 : }
701 :
702 : /// Helper function to handle retries for a download operation.
703 : ///
704 : /// Remote operations can fail due to rate limits (S3), spurious network
705 : /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
706 : /// with backoff.
707 : ///
708 : /// (See similar logic for uploads in `perform_upload_task`)
709 490 : pub(super) async fn download_retry<T, O, F>(
710 490 : op: O,
711 490 : description: &str,
712 490 : cancel: &CancellationToken,
713 490 : ) -> Result<T, DownloadError>
714 490 : where
715 490 : O: FnMut() -> F,
716 490 : F: Future<Output = Result<T, DownloadError>>,
717 490 : {
718 490 : backoff::retry(
719 490 : op,
720 490 : DownloadError::is_permanent,
721 490 : FAILED_DOWNLOAD_WARN_THRESHOLD,
722 490 : FAILED_REMOTE_OP_RETRIES,
723 490 : description,
724 490 : cancel,
725 490 : )
726 490 : .await
727 490 : .ok_or_else(|| DownloadError::Cancelled)
728 490 : .and_then(|x| x)
729 490 : }
730 :
731 1848 : pub(crate) async fn download_retry_forever<T, O, F>(
732 1848 : op: O,
733 1848 : description: &str,
734 1848 : cancel: &CancellationToken,
735 1848 : ) -> Result<T, DownloadError>
736 1848 : where
737 1848 : O: FnMut() -> F,
738 1848 : F: Future<Output = Result<T, DownloadError>>,
739 1848 : {
740 1848 : backoff::retry(
741 1848 : op,
742 1848 : DownloadError::is_permanent,
743 1848 : FAILED_DOWNLOAD_WARN_THRESHOLD,
744 1848 : u32::MAX,
745 1848 : description,
746 1848 : cancel,
747 1848 : )
748 1848 : .await
749 1848 : .ok_or_else(|| DownloadError::Cancelled)
750 1848 : .and_then(|x| x)
751 1848 : }
|