Line data Source code
1 : //! Helper functions to download files from remote storage via a `RemoteStorage` implementation.
2 : //!
3 : //! The functions in this module retry failed operations automatically, according
4 : //! to the FAILED_REMOTE_OP_RETRIES constant.
5 :
6 : use std::collections::HashSet;
7 : use std::future::Future;
8 : use std::str::FromStr;
9 : use std::time::SystemTime;
10 :
11 : use anyhow::{Context, anyhow};
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use pageserver_api::shard::TenantShardId;
14 : use remote_storage::{
15 : DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
16 : };
17 : use tokio::fs::{self, File, OpenOptions};
18 : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
19 : use tokio_util::io::StreamReader;
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::warn;
22 : use utils::crashsafe::path_with_suffix_extension;
23 : use utils::id::{TenantId, TimelineId};
24 : use utils::{backoff, pausable_failpoint};
25 :
26 : use super::index::{IndexPart, LayerFileMetadata};
27 : use super::manifest::TenantManifest;
28 : use super::{
29 : FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
30 : parse_remote_tenant_manifest_path, remote_index_path, remote_initdb_archive_path,
31 : remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
32 : remote_tenant_manifest_prefix, remote_tenant_path,
33 : };
34 : use crate::TEMP_FILE_SUFFIX;
35 : use crate::config::PageServerConf;
36 : use crate::context::RequestContext;
37 : use crate::span::{
38 : debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
39 : };
40 : use crate::tenant::Generation;
41 : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
42 : use crate::tenant::storage_layer::LayerName;
43 : use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
44 :
45 : /// Download a layer file from remote storage into `local_path`.
46 : ///
47 : /// Validates that the downloaded file's size matches the size recorded in
48 : /// `layer_metadata`. (In the future, we might do more cross-checks, like CRC validation.)
49 : /// Returns the size of the downloaded file.
50 : #[allow(clippy::too_many_arguments)]
51 84 : pub async fn download_layer_file<'a>(
52 84 : conf: &'static PageServerConf,
53 84 : storage: &'a GenericRemoteStorage,
54 84 : tenant_shard_id: TenantShardId,
55 84 : timeline_id: TimelineId,
56 84 : layer_file_name: &'a LayerName,
57 84 : layer_metadata: &'a LayerFileMetadata,
58 84 : local_path: &Utf8Path,
59 84 : gate: &utils::sync::gate::Gate,
60 84 : cancel: &CancellationToken,
61 84 : ctx: &RequestContext,
62 84 : ) -> Result<u64, DownloadError> {
63 84 : debug_assert_current_span_has_tenant_and_timeline_id();
64 84 :
65 84 : let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
66 84 :
67 84 : let remote_path = remote_layer_path(
68 84 : &tenant_shard_id.tenant_id,
69 84 : &timeline_id,
70 84 : layer_metadata.shard,
71 84 : layer_file_name,
72 84 : layer_metadata.generation,
73 84 : );
74 84 :
75 84 : // Perform a rename inspired by durable_rename from file_utils.c.
76 84 : // The sequence:
77 84 : // write(tmp)
78 84 : // fsync(tmp)
79 84 : // rename(tmp, new)
80 84 : // fsync(new)
81 84 : // fsync(parent)
82 84 : // For more context about durable_rename check this email from postgres mailing list:
83 84 : // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
84 84 : // If the pageserver crashes, the temp file is deleted on startup and the layer re-downloaded.
85 84 : let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
86 :
87 84 : let bytes_amount = download_retry(
88 84 : || async {
89 84 : download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
90 168 : },
91 84 : &format!("download {remote_path:?}"),
92 84 : cancel,
93 84 : )
94 84 : .await?;
95 :
96 84 : let expected = layer_metadata.file_size;
97 84 : if expected != bytes_amount {
98 0 : return Err(DownloadError::Other(anyhow!(
99 0 : "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
100 0 : )));
101 84 : }
102 84 :
103 84 : fail::fail_point!("remote-storage-download-pre-rename", |_| {
104 0 : Err(DownloadError::Other(anyhow!(
105 0 : "remote-storage-download-pre-rename failpoint triggered"
106 0 : )))
107 84 : });
108 :
109 84 : fs::rename(&temp_file_path, &local_path)
110 84 : .await
111 84 : .with_context(|| format!("rename download layer file to {local_path}"))
112 84 : .map_err(DownloadError::Other)?;
113 :
114 : // We use fatal_err() below because, after the rename above,
115 : // the in-memory state of the filesystem already has the layer file in its final place,
116 : // and subsequent pageserver code could think it's durable while it really isn't.
117 84 : let work = {
118 84 : let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
119 84 : async move {
120 84 : let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
121 84 : .await
122 84 : .fatal_err("VirtualFile::open for timeline dir fsync");
123 84 : timeline_dir
124 84 : .sync_all()
125 84 : .await
126 84 : .fatal_err("VirtualFile::sync_all timeline dir");
127 84 : }
128 : };
129 84 : crate::virtual_file::io_engine::get()
130 84 : .spawn_blocking_and_block_on_if_std(work)
131 84 : .await;
132 :
133 84 : tracing::debug!("download complete: {local_path}");
134 :
135 84 : Ok(bytes_amount)
136 84 : }
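// A minimal standalone sketch of the durable_rename-style sequence described in
// the comment inside `download_layer_file` above. It uses plain std::fs instead
// of the pageserver's VirtualFile/io_engine machinery; the function name is made
// up, and fsync on a directory handle is assumed to work (it does on Linux):
//
//     use std::fs::{self, File, OpenOptions};
//     use std::io::Write;
//     use std::path::Path;
//
//     fn durable_write_and_rename(tmp: &Path, dst: &Path, data: &[u8]) -> std::io::Result<()> {
//         let mut f = OpenOptions::new().create(true).write(true).truncate(true).open(tmp)?;
//         f.write_all(data)?;    // write(tmp)
//         f.sync_all()?;         // fsync(tmp)
//         fs::rename(tmp, dst)?; // rename(tmp, new)
//         File::open(dst)?.sync_all()?; // fsync(new)
//         // fsync(parent), so the rename itself survives a crash
//         File::open(dst.parent().expect("dst has a parent"))?.sync_all()?;
//         Ok(())
//     }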
137 :
138 : /// Download the object `src_path` from the remote `storage` to the local path `dst_path`.
139 : ///
140 : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
141 : /// (Note that the directory entry for the inode is not made durable.)
142 : /// The file size in bytes is returned.
143 : ///
144 : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
145 : /// The unlinking has _not_ been made durable.
146 84 : async fn download_object(
147 84 : storage: &GenericRemoteStorage,
148 84 : src_path: &RemotePath,
149 84 : dst_path: &Utf8PathBuf,
150 84 : #[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
151 84 : cancel: &CancellationToken,
152 84 : #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
153 84 : ) -> Result<u64, DownloadError> {
154 84 : let res = match crate::virtual_file::io_engine::get() {
155 0 : crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
156 : crate::virtual_file::io_engine::IoEngine::StdFs => {
157 42 : async {
158 42 : let destination_file = tokio::fs::File::create(dst_path)
159 42 : .await
160 42 : .with_context(|| format!("create a destination file for layer '{dst_path}'"))
161 42 : .map_err(DownloadError::Other)?;
162 :
163 42 : let download = storage
164 42 : .download(src_path, &DownloadOpts::default(), cancel)
165 42 : .await?;
166 :
167 42 : pausable_failpoint!("before-downloading-layer-stream-pausable");
168 :
169 42 : let mut buf_writer =
170 42 : tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
171 42 :
172 42 : let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
173 :
174 42 : let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
175 42 : buf_writer.flush().await?;
176 :
177 42 : let mut destination_file = buf_writer.into_inner();
178 42 :
179 42 : // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
180 42 : // A file will not be closed immediately when it goes out of scope if there are any IO operations
181 42 : // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
182 42 : // you should call flush before dropping it.
183 42 : //
184 42 : // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any,
185 42 : // because we assume that `destination_file` is fully written, i.e. there are no pending
186 42 : // .write(...).await operations. But for additional safety, let's check/wait for any pending operations.
187 42 : destination_file
188 42 : .flush()
189 42 : .await
190 42 : .maybe_fatal_err("download_object flush")
191 42 : .with_context(|| format!("flush destination file at {dst_path}"))
192 42 : .map_err(DownloadError::Other)?;
193 :
194 : // Not using sync_data because it can lose the file size update.
195 42 : destination_file
196 42 : .sync_all()
197 42 : .await
198 42 : .maybe_fatal_err("download_object sync_all")
199 42 : .with_context(|| format!("failed to fsync source file at {dst_path}"))
200 42 : .map_err(DownloadError::Other)?;
201 :
202 42 : Ok(bytes_amount)
203 42 : }
204 42 : .await
205 : }
206 : #[cfg(target_os = "linux")]
207 : crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
208 : use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
209 : use std::sync::Arc;
210 :
211 : use crate::virtual_file::{IoBufferMut, owned_buffers_io};
212 42 : async {
213 42 : let destination_file = Arc::new(
214 42 : VirtualFile::create(dst_path, ctx)
215 42 : .await
216 42 : .with_context(|| {
217 0 : format!("create a destination file for layer '{dst_path}'")
218 42 : })
219 42 : .map_err(DownloadError::Other)?,
220 : );
221 :
222 42 : let mut download = storage
223 42 : .download(src_path, &DownloadOpts::default(), cancel)
224 42 : .await?;
225 :
226 42 : pausable_failpoint!("before-downloading-layer-stream-pausable");
227 :
228 42 : let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
229 42 : destination_file,
230 84 : || IoBufferMut::with_capacity(super::BUFFER_SIZE),
231 42 : gate.enter().map_err(|_| DownloadError::Cancelled)?,
232 42 : cancel.child_token(),
233 42 : ctx,
234 42 : tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
235 : );
236 :
237 : // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
238 : // There's chunks_vectored() on the stream.
239 42 : let (bytes_amount, destination_file) = async {
240 252 : while let Some(res) =
241 294 : futures::StreamExt::next(&mut download.download_stream).await
242 : {
243 252 : let chunk = match res {
244 252 : Ok(chunk) => chunk,
245 0 : Err(e) => return Err(DownloadError::from(e)),
246 : };
247 252 : buffered
248 252 : .write_buffered_borrowed(&chunk, ctx)
249 252 : .await
250 252 : .map_err(|e| match e {
251 0 : FlushTaskError::Cancelled => DownloadError::Cancelled,
252 252 : })?;
253 : }
254 42 : let inner = buffered
255 42 : .flush_and_into_inner(ctx)
256 42 : .await
257 42 : .map_err(|e| match e {
258 0 : FlushTaskError::Cancelled => DownloadError::Cancelled,
259 42 : })?;
260 42 : Ok(inner)
261 42 : }
262 42 : .await?;
263 :
264 : // Not using sync_data because it can lose the file size update.
265 42 : destination_file
266 42 : .sync_all()
267 42 : .await
268 42 : .maybe_fatal_err("download_object sync_all")
269 42 : .with_context(|| format!("failed to fsync source file at {dst_path}"))
270 42 : .map_err(DownloadError::Other)?;
271 :
272 42 : Ok(bytes_amount)
273 42 : }
274 42 : .await
275 : }
276 : };
277 :
278 : // in case the download failed, clean up
279 84 : match res {
280 84 : Ok(bytes_amount) => Ok(bytes_amount),
281 0 : Err(e) => {
282 0 : if let Err(e) = tokio::fs::remove_file(dst_path).await {
283 0 : if e.kind() != std::io::ErrorKind::NotFound {
284 0 : on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
285 0 : }
286 0 : }
287 0 : Err(e)
288 : }
289 : }
290 84 : }
291 :
292 : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
293 :
294 0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
295 0 : let extension = path.extension();
296 0 : match extension {
297 0 : Some(TEMP_DOWNLOAD_EXTENSION) => true,
298 0 : Some(_) => false,
299 0 : None => false,
300 : }
301 0 : }
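// A usage sketch of the predicate above, assuming `path_with_suffix_extension`
// appends `.<suffix>` to the path (as its use in `download_layer_file` suggests);
// the layer path here is made up:
//
//     let local = camino::Utf8PathBuf::from("tenants/t/timelines/tl/some-layer-file");
//     let tmp = path_with_suffix_extension(&local, TEMP_DOWNLOAD_EXTENSION);
//     assert!(is_temp_download_file(&tmp));    // "some-layer-file.temp_download"
//     assert!(!is_temp_download_file(&local)); // the final name has no temp extension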
302 :
303 1392 : async fn list_identifiers<T>(
304 1392 : storage: &GenericRemoteStorage,
305 1392 : prefix: RemotePath,
306 1392 : cancel: CancellationToken,
307 1392 : ) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
308 1392 : where
309 1392 : T: FromStr + Eq + std::hash::Hash,
310 1392 : {
311 1392 : let listing = download_retry_forever(
312 1392 : || storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
313 1392 : &format!("list identifiers in prefix {prefix}"),
314 1392 : &cancel,
315 1392 : )
316 1392 : .await?;
317 :
318 1392 : let mut parsed_ids = HashSet::new();
319 1392 : let mut other_prefixes = HashSet::new();
320 :
321 1428 : for id_remote_storage_key in listing.prefixes {
322 36 : let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
323 0 : anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
324 36 : })?;
325 :
326 36 : match object_name.parse::<T>() {
327 36 : Ok(t) => parsed_ids.insert(t),
328 0 : Err(_) => other_prefixes.insert(object_name.to_string()),
329 : };
330 : }
331 :
332 1392 : for object in listing.keys {
333 0 : let object_name = object
334 0 : .key
335 0 : .object_name()
336 0 : .ok_or_else(|| anyhow::anyhow!("object name for key {}", object.key))?;
337 0 : other_prefixes.insert(object_name.to_string());
338 : }
339 :
340 1392 : Ok((parsed_ids, other_prefixes))
341 1392 : }
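// Usage sketch: `list_identifiers` is generic over `T: FromStr`, so the same
// helper serves both the shard and timeline listings below. With `T = TimelineId`,
// every listed prefix that parses as a timeline ID lands in the first set and
// everything else in the second:
//
//     let (timelines, other_prefixes): (HashSet<TimelineId>, HashSet<String>) =
//         list_identifiers::<TimelineId>(storage, remote_path, cancel).await?;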
342 :
343 : /// List shards of the given tenant in remote storage
344 0 : pub(crate) async fn list_remote_tenant_shards(
345 0 : storage: &GenericRemoteStorage,
346 0 : tenant_id: TenantId,
347 0 : cancel: CancellationToken,
348 0 : ) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
349 0 : let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
350 0 : list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
351 0 : }
352 :
353 : /// List timelines of the given tenant shard in remote storage
354 1392 : pub async fn list_remote_timelines(
355 1392 : storage: &GenericRemoteStorage,
356 1392 : tenant_shard_id: TenantShardId,
357 1392 : cancel: CancellationToken,
358 1392 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
359 1392 : fail::fail_point!("storage-sync-list-remote-timelines", |_| {
360 0 : anyhow::bail!("storage-sync-list-remote-timelines");
361 1392 : });
362 :
363 1392 : let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
364 1392 : list_identifiers::<TimelineId>(storage, remote_path, cancel).await
365 1392 : }
366 :
367 4308 : async fn do_download_remote_path_retry_forever(
368 4308 : storage: &GenericRemoteStorage,
369 4308 : remote_path: &RemotePath,
370 4308 : download_opts: DownloadOpts,
371 4308 : cancel: &CancellationToken,
372 4308 : ) -> Result<(Vec<u8>, SystemTime), DownloadError> {
373 4308 : download_retry_forever(
374 4308 : || async {
375 4308 : let download = storage
376 4308 : .download(remote_path, &download_opts, cancel)
377 4308 : .await?;
378 :
379 156 : let mut bytes = Vec::new();
380 156 :
381 156 : let stream = download.download_stream;
382 156 : let mut stream = StreamReader::new(stream);
383 156 :
384 156 : tokio::io::copy_buf(&mut stream, &mut bytes).await?;
385 :
386 156 : Ok((bytes, download.last_modified))
387 8616 : },
388 4308 : &format!("download {remote_path:?}"),
389 4308 : cancel,
390 4308 : )
391 4308 : .await
392 4308 : }
393 :
394 4104 : async fn do_download_tenant_manifest(
395 4104 : storage: &GenericRemoteStorage,
396 4104 : tenant_shard_id: &TenantShardId,
397 4104 : _timeline_id: Option<&TimelineId>,
398 4104 : generation: Generation,
399 4104 : cancel: &CancellationToken,
400 4104 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
401 4104 : let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
402 4104 :
403 4104 : let download_opts = DownloadOpts {
404 4104 : kind: DownloadKind::Small,
405 4104 : ..Default::default()
406 4104 : };
407 :
408 36 : let (manifest_bytes, manifest_bytes_mtime) =
409 4104 : do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
410 :
411 36 : let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
412 36 : .with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
413 36 : .map_err(DownloadError::Other)?;
414 :
415 36 : Ok((tenant_manifest, generation, manifest_bytes_mtime))
416 4104 : }
417 :
418 204 : async fn do_download_index_part(
419 204 : storage: &GenericRemoteStorage,
420 204 : tenant_shard_id: &TenantShardId,
421 204 : timeline_id: Option<&TimelineId>,
422 204 : index_generation: Generation,
423 204 : cancel: &CancellationToken,
424 204 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
425 204 : let timeline_id =
426 204 : timeline_id.expect("A timeline ID is always provided when downloading an index");
427 204 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
428 204 :
429 204 : let download_opts = DownloadOpts {
430 204 : kind: DownloadKind::Small,
431 204 : ..Default::default()
432 204 : };
433 :
434 120 : let (index_part_bytes, index_part_mtime) =
435 204 : do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
436 :
437 120 : let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
438 120 : .with_context(|| format!("deserialize index part file at {remote_path:?}"))
439 120 : .map_err(DownloadError::Other)?;
440 :
441 120 : Ok((index_part, index_generation, index_part_mtime))
442 204 : }
443 :
444 : /// Metadata objects are "generationed", meaning that they include a generation suffix. This
445 : /// function downloads the object with the highest generation <= `my_generation`.
446 : ///
447 : /// Data objects (layer files) also include a generation in their path, but there is no equivalent
448 : /// search process, because their reference from an index includes the generation.
449 : ///
450 : /// An expensive object listing operation is only done if necessary: the typical fast path is to issue two
451 : /// GET operations, one to our own generation (stale attachment case), and one to the immediately preceding
452 : /// generation (normal case when migrating/restarting). Only if both of these return 404 do we fall back
453 : /// to listing objects.
454 : ///
455 : /// * `my_generation`: the value of [`crate::tenant::TenantShard::generation`]
456 : /// * `what`: for logging, what object are we downloading
457 : /// * `prefix`: when listing objects, use this prefix (i.e. the part of the object path before the generation)
458 : /// * `do_download`: a GET of the object in a particular generation, which should **retry indefinitely** unless
459 : /// `cancel` has fired. This function does not do its own retries of GET operations, and relies
460 : /// on the function passed in to do so.
461 : /// * `parse_path`: parse a fully qualified remote storage path to get the generation of the object.
462 : #[allow(clippy::too_many_arguments)]
463 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
464 : pub(crate) async fn download_generation_object<'a, T, DF, DFF, PF>(
465 : storage: &'a GenericRemoteStorage,
466 : tenant_shard_id: &'a TenantShardId,
467 : timeline_id: Option<&'a TimelineId>,
468 : my_generation: Generation,
469 : what: &str,
470 : prefix: RemotePath,
471 : do_download: DF,
472 : parse_path: PF,
473 : cancel: &'a CancellationToken,
474 : ) -> Result<(T, Generation, SystemTime), DownloadError>
475 : where
476 : DF: Fn(
477 : &'a GenericRemoteStorage,
478 : &'a TenantShardId,
479 : Option<&'a TimelineId>,
480 : Generation,
481 : &'a CancellationToken,
482 : ) -> DFF,
483 : DFF: Future<Output = Result<(T, Generation, SystemTime), DownloadError>>,
484 : PF: Fn(RemotePath) -> Option<Generation>,
485 : T: 'static,
486 : {
487 : debug_assert_current_span_has_tenant_id();
488 :
489 : if my_generation.is_none() {
490 : // Operating without generations: just fetch the generation-less path
491 : return do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
492 : }
493 :
494 : // Stale case: If we were intentionally attached in a stale generation, the remote object may already
495 : // exist in our generation.
496 : //
497 : // This is an optimization to avoid doing the listing for the general case below.
498 : let res = do_download(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
499 : match res {
500 : Ok(decoded) => {
501 : tracing::debug!("Found {what} from current generation (this is a stale attachment)");
502 : return Ok(decoded);
503 : }
504 : Err(DownloadError::NotFound) => {}
505 : Err(e) => return Err(e),
506 : };
507 :
508 : // Typical case: the previous generation of this tenant was running healthily, and had uploaded the object
509 : // we are seeking in that generation. We may safely start from this index without doing a listing, because:
510 : // - We checked for current generation case above
511 : // - generations > my_generation are to be ignored
512 : // - any other objects that exist would have an older generation than `previous_gen`, and
513 : // we want to find the most recent object from a previous generation.
514 : //
515 : // This is an optimization to avoid doing the listing for the general case below.
516 : let res = do_download(
517 : storage,
518 : tenant_shard_id,
519 : timeline_id,
520 : my_generation.previous(),
521 : cancel,
522 : )
523 : .await;
524 : match res {
525 : Ok(decoded) => {
526 : tracing::debug!("Found {what} from previous generation");
527 : return Ok(decoded);
528 : }
529 : Err(DownloadError::NotFound) => {
530 : tracing::debug!("No {what} found from previous generation, falling back to listing");
531 : }
532 : Err(e) => {
533 : return Err(e);
534 : }
535 : }
536 :
537 : // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
538 : // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
539 : // to constructing a full index path with no generation, because the generation is a suffix.
540 : let paths = download_retry(
541 1392 : || async {
542 1392 : storage
543 1392 : .list(Some(&prefix), ListingMode::NoDelimiter, None, cancel)
544 1392 : .await
545 2784 : },
546 : "list index_part files",
547 : cancel,
548 : )
549 : .await?
550 : .keys;
551 :
552 : // General case logic for which index to use: the latest index whose generation
553 : // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
554 : let max_previous_generation = paths
555 : .into_iter()
556 108 : .filter_map(|o| parse_path(o.key))
557 72 : .filter(|g| g <= &my_generation)
558 : .max();
559 :
560 : match max_previous_generation {
561 : Some(g) => {
562 : tracing::debug!("Found {what} in generation {g:?}");
563 : do_download(storage, tenant_shard_id, timeline_id, g, cancel).await
564 : }
565 : None => {
566 : // Migration from legacy pre-generation state: we have a generation but no prior
567 : // attached pageservers did. Try to load from a no-generation path.
568 : tracing::debug!("No {what}* found");
569 : do_download(
570 : storage,
571 : tenant_shard_id,
572 : timeline_id,
573 : Generation::none(),
574 : cancel,
575 : )
576 : .await
577 : }
578 : }
579 : }
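// Condensed view of the probe order implemented above:
//   1. GET at `my_generation`            -> a hit means this is a stale attachment
//   2. GET at `my_generation.previous()` -> the common restart/migration path
//   3. LIST `prefix`, parse each key's generation, take the max g with g <= my_generation
//   4. Nothing parsed -> fall back to the legacy, generation-less path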
580 :
581 : /// index_part.json objects are suffixed with a generation number, so we cannot
582 : /// directly GET the latest index part without doing some probing.
583 : ///
584 : /// In this function we probe for the most recent index in a generation <= our current generation.
585 : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
586 120 : pub(crate) async fn download_index_part(
587 120 : storage: &GenericRemoteStorage,
588 120 : tenant_shard_id: &TenantShardId,
589 120 : timeline_id: &TimelineId,
590 120 : my_generation: Generation,
591 120 : cancel: &CancellationToken,
592 120 : ) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
593 120 : debug_assert_current_span_has_tenant_and_timeline_id();
594 120 :
595 120 : let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
596 120 : download_generation_object(
597 120 : storage,
598 120 : tenant_shard_id,
599 120 : Some(timeline_id),
600 120 : my_generation,
601 120 : "index_part",
602 120 : index_prefix,
603 120 : do_download_index_part,
604 120 : parse_remote_index_path,
605 120 : cancel,
606 120 : )
607 120 : .await
608 120 : }
609 :
610 1392 : pub(crate) async fn download_tenant_manifest(
611 1392 : storage: &GenericRemoteStorage,
612 1392 : tenant_shard_id: &TenantShardId,
613 1392 : my_generation: Generation,
614 1392 : cancel: &CancellationToken,
615 1392 : ) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
616 1392 : let manifest_prefix = remote_tenant_manifest_prefix(tenant_shard_id);
617 1392 :
618 1392 : download_generation_object(
619 1392 : storage,
620 1392 : tenant_shard_id,
621 1392 : None,
622 1392 : my_generation,
623 1392 : "tenant-manifest",
624 1392 : manifest_prefix,
625 1392 : do_download_tenant_manifest,
626 1392 : parse_remote_tenant_manifest_path,
627 1392 : cancel,
628 1392 : )
629 1392 : .await
630 1392 : }
631 :
632 12 : pub(crate) async fn download_initdb_tar_zst(
633 12 : conf: &'static PageServerConf,
634 12 : storage: &GenericRemoteStorage,
635 12 : tenant_shard_id: &TenantShardId,
636 12 : timeline_id: &TimelineId,
637 12 : cancel: &CancellationToken,
638 12 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
639 12 : debug_assert_current_span_has_tenant_and_timeline_id();
640 12 :
641 12 : let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
642 12 :
643 12 : let remote_preserved_path =
644 12 : remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
645 12 :
646 12 : let timeline_path = conf.timelines_path(tenant_shard_id);
647 12 :
648 12 : if !timeline_path.exists() {
649 0 : tokio::fs::create_dir_all(&timeline_path)
650 0 : .await
651 0 : .with_context(|| format!("timeline dir creation {timeline_path}"))
652 0 : .map_err(DownloadError::Other)?;
653 12 : }
654 12 : let temp_path = timeline_path.join(format!(
655 12 : "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
656 12 : ));
657 :
658 12 : let file = download_retry(
659 12 : || async {
660 12 : let file = OpenOptions::new()
661 12 : .create(true)
662 12 : .truncate(true)
663 12 : .read(true)
664 12 : .write(true)
665 12 : .open(&temp_path)
666 12 : .await
667 12 : .with_context(|| format!("tempfile creation {temp_path}"))
668 12 : .map_err(DownloadError::Other)?;
669 :
670 12 : let download = match storage
671 12 : .download(&remote_path, &DownloadOpts::default(), cancel)
672 12 : .await
673 : {
674 12 : Ok(dl) => dl,
675 : Err(DownloadError::NotFound) => {
676 0 : storage
677 0 : .download(&remote_preserved_path, &DownloadOpts::default(), cancel)
678 0 : .await?
679 : }
680 0 : Err(other) => Err(other)?,
681 : };
682 12 : let mut download = tokio_util::io::StreamReader::new(download.download_stream);
683 12 : let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
684 12 :
685 12 : tokio::io::copy_buf(&mut download, &mut writer).await?;
686 :
687 12 : let mut file = writer.into_inner();
688 12 :
689 12 : file.seek(std::io::SeekFrom::Start(0))
690 12 : .await
691 12 : .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
692 12 : .map_err(DownloadError::Other)?;
693 :
694 12 : Ok(file)
695 24 : },
696 12 : &format!("download {remote_path}"),
697 12 : cancel,
698 12 : )
699 12 : .await
700 12 : .inspect_err(|_e| {
701 : // Do a best-effort attempt at deleting the temporary file upon encountering an error.
702 : // We don't have async here nor do we want to pile on any extra errors.
703 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
704 0 : if e.kind() != std::io::ErrorKind::NotFound {
705 0 : warn!("error deleting temporary file {temp_path}: {e}");
706 0 : }
707 0 : }
708 12 : })?;
709 :
710 12 : Ok((temp_path, file))
711 12 : }
712 :
713 : /// Helper function to handle retries for a download operation.
714 : ///
715 : /// Remote operations can fail due to rate limits (S3), spurious network
716 : /// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
717 : /// with backoff.
718 : ///
719 : /// (See similar logic for uploads in `perform_upload_task`)
720 1488 : pub(super) async fn download_retry<T, O, F>(
721 1488 : op: O,
722 1488 : description: &str,
723 1488 : cancel: &CancellationToken,
724 1488 : ) -> Result<T, DownloadError>
725 1488 : where
726 1488 : O: FnMut() -> F,
727 1488 : F: Future<Output = Result<T, DownloadError>>,
728 1488 : {
729 1488 : backoff::retry(
730 1488 : op,
731 1488 : DownloadError::is_permanent,
732 1488 : FAILED_DOWNLOAD_WARN_THRESHOLD,
733 1488 : FAILED_REMOTE_OP_RETRIES,
734 1488 : description,
735 1488 : cancel,
736 1488 : )
737 1488 : .await
738 1488 : .ok_or_else(|| DownloadError::Cancelled)
739 1488 : .and_then(|x| x)
740 1488 : }
741 :
742 5700 : pub(crate) async fn download_retry_forever<T, O, F>(
743 5700 : op: O,
744 5700 : description: &str,
745 5700 : cancel: &CancellationToken,
746 5700 : ) -> Result<T, DownloadError>
747 5700 : where
748 5700 : O: FnMut() -> F,
749 5700 : F: Future<Output = Result<T, DownloadError>>,
750 5700 : {
751 5700 : backoff::retry(
752 5700 : op,
753 5700 : DownloadError::is_permanent,
754 5700 : FAILED_DOWNLOAD_WARN_THRESHOLD,
755 5700 : u32::MAX,
756 5700 : description,
757 5700 : cancel,
758 5700 : )
759 5700 : .await
760 5700 : .ok_or_else(|| DownloadError::Cancelled)
761 5700 : .and_then(|x| x)
762 5700 : }
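// Usage sketch for the retry helpers: `op` is an `FnMut` that produces a fresh
// future per attempt, so every retry re-issues the request. The prefix and the
// description string here are illustrative:
//
//     let listing = download_retry(
//         || async { storage.list(Some(&prefix), ListingMode::NoDelimiter, None, &cancel).await },
//         "list example prefix",
//         &cancel,
//     )
//     .await?;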