Line data Source code
1 : //! Helper functions to download files from remote storage with a RemoteStorage abstraction.
2 : //!
3 : //! The functions in this module retry failed operations automatically, according
4 : //! to the FAILED_REMOTE_OP_RETRIES constant.
5 :
6 : use std::collections::HashSet;
7 : use std::future::Future;
8 :
9 : use anyhow::{anyhow, Context};
10 : use camino::{Utf8Path, Utf8PathBuf};
11 : use pageserver_api::shard::TenantShardId;
12 : use tokio::fs::{self, File, OpenOptions};
13 : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
14 : use tokio_util::sync::CancellationToken;
15 : use tracing::warn;
16 : use utils::timeout::timeout_cancellable;
17 : use utils::{backoff, crashsafe};
18 :
19 : use crate::config::PageServerConf;
20 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
21 : use crate::tenant::remote_timeline_client::{
22 : download_cancellable, remote_layer_path, remote_timelines_path, DOWNLOAD_TIMEOUT,
23 : };
24 : use crate::tenant::storage_layer::LayerFileName;
25 : use crate::tenant::Generation;
26 : use crate::virtual_file::on_fatal_io_error;
27 : use crate::TEMP_FILE_SUFFIX;
28 : use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
29 : use utils::crashsafe::path_with_suffix_extension;
30 : use utils::id::TimelineId;
31 :
32 : use super::index::{IndexPart, LayerFileMetadata};
33 : use super::{
34 : parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
35 : remote_initdb_preserved_archive_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
36 : INITDB_PATH,
37 : };
38 :
39 : ///
40 : /// We validate that the downloaded file's size matches the size recorded
41 : /// in 'layer_metadata'. (In the future, we might do more cross-checks, like CRC validation.)
42 : ///
43 : /// Returns the size of the downloaded file.
44 10731 : pub async fn download_layer_file<'a>(
45 10731 : conf: &'static PageServerConf,
46 10731 : storage: &'a GenericRemoteStorage,
47 10731 : tenant_shard_id: TenantShardId,
48 10731 : timeline_id: TimelineId,
49 10731 : layer_file_name: &'a LayerFileName,
50 10731 : layer_metadata: &'a LayerFileMetadata,
51 10731 : cancel: &CancellationToken,
52 10731 : ) -> Result<u64, DownloadError> {
53 10731 : debug_assert_current_span_has_tenant_and_timeline_id();
54 10731 :
55 10731 : let local_path = conf
56 10731 : .timeline_path(&tenant_shard_id, &timeline_id)
57 10731 : .join(layer_file_name.file_name());
58 10731 :
59 10731 : let remote_path = remote_layer_path(
60 10731 : &tenant_shard_id.tenant_id,
61 10731 : &timeline_id,
62 10731 : layer_metadata.shard,
63 10731 : layer_file_name,
64 10731 : layer_metadata.generation,
65 10731 : );
66 10731 :
67 10731 : // Perform a rename inspired by durable_rename from file_utils.c.
68 10731 : // The sequence:
69 10731 : // write(tmp)
70 10731 : // fsync(tmp)
71 10731 : // rename(tmp, new)
72 10731 : // fsync(new)
73 10731 : // fsync(parent)
74 10731 : // For more context about durable_rename check this email from postgres mailing list:
75 10731 : // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
76 10731 : // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
77 10731 : let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
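// ----------------------------------------------------------------------
// Editor's sketch (not part of the original source): the durable_rename
// sequence above written out as a free-standing helper, assuming Unix
// semantics where a directory can be opened read-only and fsynced. The name
// `durable_rename_sketch` is hypothetical; the real code below interleaves
// these steps with the download itself and reuses the already-open handle.
async fn durable_rename_sketch(
    tmp: &camino::Utf8Path,
    dst: &camino::Utf8Path,
) -> std::io::Result<()> {
    // write(tmp) is assumed to have happened already.
    tokio::fs::File::open(tmp).await?.sync_all().await?; // fsync(tmp)
    tokio::fs::rename(tmp, dst).await?; // rename(tmp, new)
    tokio::fs::File::open(dst).await?.sync_all().await?; // fsync(new)
    let parent = dst.parent().expect("destination has a parent directory");
    tokio::fs::File::open(parent).await?.sync_all().await?; // fsync(parent)
    Ok(())
}
// ----------------------------------------------------------------------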
78 :
79 10731 : let (mut destination_file, bytes_amount) = download_retry(
80 10758 : || async {
81 10758 : let destination_file = tokio::fs::File::create(&temp_file_path)
82 10291 : .await
83 10758 : .with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
84 10758 : .map_err(DownloadError::Other)?;
85 :
86 : // Cancellation safety: it is safe to cancel this future, because it isn't writing to a local
87 : // file: the write to the local file doesn't start until after the request header is returned
88 : // and we start draining the body stream below
89 10758 : let download = download_cancellable(cancel, storage.download(&remote_path))
90 29701 : .await
91 10758 : .with_context(|| {
92 30 : format!(
93 30 : "open a download stream for layer with remote storage path '{remote_path:?}'"
94 30 : )
95 10758 : })
96 10758 : .map_err(DownloadError::Other)?;
97 :
98 10728 : let mut destination_file =
99 10728 : tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
100 10728 :
101 10728 : let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
102 :
103 : // Cancellation safety: it is safe to cancel this future because it is writing into a temporary file,
104 : // and we will unlink the temporary file if there is an error. This unlink is important because we
105 : // are in a retry loop, and we wouldn't want to leave behind a rogue write I/O to a file that
106 : // we will imminently try to write to again.
107 10728 : let bytes_amount: u64 = match timeout_cancellable(
108 10728 : DOWNLOAD_TIMEOUT,
109 10728 : cancel,
110 10728 : tokio::io::copy_buf(&mut reader, &mut destination_file),
111 10728 : )
112 422199 : .await
113 10728 : .with_context(|| {
114 1 : format!(
115 1 : "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
116 1 : )
117 10728 : })
118 10728 : .map_err(DownloadError::Other)?
119 : {
120 10727 : Ok(b) => Ok(b),
121 0 : Err(e) => {
122 : // Remove incomplete files: on restart Timeline would do this anyway, but we must
123 : // do it here for the retry case.
124 0 : if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
125 0 : on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
126 0 : }
127 0 : Err(e)
128 : }
129 : }
130 10727 : .with_context(|| {
131 0 : format!(
132 0 : "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
133 0 : )
134 10727 : })
135 10727 : .map_err(DownloadError::Other)?;
136 :
137 10727 : let destination_file = destination_file.into_inner();
138 10727 :
139 10727 : Ok((destination_file, bytes_amount))
140 21516 : },
141 10731 : &format!("download {remote_path:?}"),
142 10731 : cancel,
143 10731 : )
144 462191 : .await?;
145 :
146 : // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
147 : // A file will not be closed immediately when it goes out of scope if there are any IO operations
148 : // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
149 : // you should call flush before dropping it.
150 : //
151 : // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any, because
152 : // we assume that `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
153 : // But for additional safety, let's check/wait for any pending operations.
154 10727 : destination_file
155 10727 : .flush()
156 0 : .await
157 10727 : .with_context(|| format!("flush destination file at {temp_file_path}"))
158 10727 : .map_err(DownloadError::Other)?;
159 :
160 10727 : let expected = layer_metadata.file_size();
161 10727 : if expected != bytes_amount {
162 0 : return Err(DownloadError::Other(anyhow!(
163 0 : "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
164 0 : )));
165 10727 : }
166 10727 :
167 10727 : // not using sync_data because it can lose the file size update
168 10727 : destination_file
169 10727 : .sync_all()
170 10724 : .await
171 10726 : .with_context(|| format!("fsync destination file at {temp_file_path}"))
172 10726 : .map_err(DownloadError::Other)?;
173 10726 : drop(destination_file);
174 10726 :
175 10726 : fail::fail_point!("remote-storage-download-pre-rename", |_| {
176 6 : Err(DownloadError::Other(anyhow!(
177 6 : "remote-storage-download-pre-rename failpoint triggered"
178 6 : )))
179 10726 : });
180 :
181 10720 : fs::rename(&temp_file_path, &local_path)
182 10311 : .await
183 10720 : .with_context(|| format!("rename downloaded layer file to {local_path}"))
184 10720 : .map_err(DownloadError::Other)?;
185 :
186 10720 : crashsafe::fsync_async(&local_path)
187 20880 : .await
188 10719 : .with_context(|| format!("fsync layer file {local_path}"))
189 10719 : .map_err(DownloadError::Other)?;
190 :
191 0 : tracing::debug!("download complete: {local_path}");
192 :
193 10719 : Ok(bytes_amount)
194 10729 : }
195 :
196 : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
197 :
198 51 : pub fn is_temp_download_file(path: &Utf8Path) -> bool {
199 51 : let extension = path.extension();
200 51 : match extension {
201 4 : Some(TEMP_DOWNLOAD_EXTENSION) => true,
202 1 : Some(_) => false,
203 47 : None => false,
204 : }
205 51 : }
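// Editor's note: an equivalent, more idiomatic form of the match above
// (behavior unchanged), since Utf8Path::extension returns Option<&str>:
//
//     path.extension() == Some(TEMP_DOWNLOAD_EXTENSION)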
206 :
207 : /// List the timelines of a given tenant in remote storage
208 872 : pub async fn list_remote_timelines(
209 872 : storage: &GenericRemoteStorage,
210 872 : tenant_shard_id: TenantShardId,
211 872 : cancel: CancellationToken,
212 872 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
213 872 : let remote_path = remote_timelines_path(&tenant_shard_id);
214 872 :
215 872 : fail::fail_point!("storage-sync-list-remote-timelines", |_| {
216 6 : anyhow::bail!("storage-sync-list-remote-timelines");
217 872 : });
218 :
219 866 : let listing = download_retry_forever(
220 929 : || {
221 929 : download_cancellable(
222 929 : &cancel,
223 929 : storage.list(Some(&remote_path), ListingMode::WithDelimiter, None),
224 929 : )
225 929 : },
226 866 : &format!("list timelines for {tenant_shard_id}"),
227 866 : &cancel,
228 866 : )
229 2243 : .await?;
230 :
231 866 : let mut timeline_ids = HashSet::new();
232 866 : let mut other_prefixes = HashSet::new();
233 :
234 1312 : for timeline_remote_storage_key in listing.prefixes {
235 446 : let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
236 0 : anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
237 446 : })?;
238 :
239 446 : match object_name.parse::<TimelineId>() {
240 446 : Ok(t) => timeline_ids.insert(t),
241 0 : Err(_) => other_prefixes.insert(object_name.to_string()),
242 : };
243 : }
244 :
245 881 : for key in listing.keys {
246 15 : let object_name = key
247 15 : .object_name()
248 15 : .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
249 15 : other_prefixes.insert(object_name.to_string());
250 : }
251 :
252 866 : Ok((timeline_ids, other_prefixes))
253 872 : }
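// Editor's sketch (hypothetical call site, not from this file): consuming the
// two sets returned above. The function name `log_remote_timelines` is
// invented; `storage`, `tenant_shard_id` and `cancel` use the types above.
async fn log_remote_timelines(
    storage: &GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    let (timeline_ids, other_prefixes) =
        list_remote_timelines(storage, tenant_shard_id, cancel).await?;
    for timeline_id in &timeline_ids {
        tracing::info!("found remote timeline {timeline_id}");
    }
    if !other_prefixes.is_empty() {
        tracing::warn!("unrecognized keys under the timelines prefix: {other_prefixes:?}");
    }
    Ok(())
}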
254 :
255 1085 : async fn do_download_index_part(
256 1085 : storage: &GenericRemoteStorage,
257 1085 : tenant_shard_id: &TenantShardId,
258 1085 : timeline_id: &TimelineId,
259 1085 : index_generation: Generation,
260 1085 : cancel: &CancellationToken,
261 1085 : ) -> Result<IndexPart, DownloadError> {
262 1085 : use futures::stream::StreamExt;
263 1085 :
264 1085 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
265 :
266 1085 : let index_part_bytes = download_retry_forever(
267 1139 : || async {
268 : // Cancellation: it is safe to cancel this future because we're just downloading into
269 : // a memory buffer, not touching local disk.
270 462 : let index_part_download =
271 1564 : download_cancellable(cancel, storage.download(&remote_path)).await?;
272 :
273 462 : let mut index_part_bytes = Vec::new();
274 462 : let mut stream = std::pin::pin!(index_part_download.download_stream);
275 1319 : while let Some(chunk) = stream.next().await {
276 857 : let chunk = chunk
277 857 : .with_context(|| format!("download index part at {remote_path:?}"))
278 857 : .map_err(DownloadError::Other)?;
279 857 : index_part_bytes.extend_from_slice(&chunk[..]);
280 : }
281 462 : Ok(index_part_bytes)
282 2278 : },
283 1085 : &format!("download {remote_path:?}"),
284 1085 : cancel,
285 1085 : )
286 2558 : .await?;
287 :
288 462 : let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
289 462 : .with_context(|| format!("deserialize index part file at {remote_path:?}"))
290 462 : .map_err(DownloadError::Other)?;
291 :
292 462 : Ok(index_part)
293 1085 : }
294 :
295 : /// index_part.json objects are suffixed with a generation number, so we cannot
296 : /// directly GET the latest index part without doing some probing.
297 : ///
298 : /// In this function we probe for the most recent index in a generation <= our current generation.
299 : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
300 930 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
301 : pub(super) async fn download_index_part(
302 : storage: &GenericRemoteStorage,
303 : tenant_shard_id: &TenantShardId,
304 : timeline_id: &TimelineId,
305 : my_generation: Generation,
306 : cancel: &CancellationToken,
307 : ) -> Result<IndexPart, DownloadError> {
308 : debug_assert_current_span_has_tenant_and_timeline_id();
309 :
310 : if my_generation.is_none() {
311 : // Operating without generations: just fetch the generation-less path
312 : return do_download_index_part(
313 : storage,
314 : tenant_shard_id,
315 : timeline_id,
316 : my_generation,
317 : cancel,
318 : )
319 : .await;
320 : }
321 :
322 : // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
323 : // index in our generation.
324 : //
325 : // This is an optimization to avoid doing the listing for the general case below.
326 : let res =
327 : do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
328 : match res {
329 : Ok(index_part) => {
330 0 : tracing::debug!(
331 0 : "Found index_part from current generation (this is a stale attachment)"
332 0 : );
333 : return Ok(index_part);
334 : }
335 : Err(DownloadError::NotFound) => {}
336 : Err(e) => return Err(e),
337 : };
338 :
339 : // Typical case: the previous generation of this tenant was running healthily, and had uploaded
340 : // an index part. We may safely start from this index without doing a listing, because:
341 : // - We checked for current generation case above
342 : // - generations > my_generation are to be ignored
343 : // - any other indices that exist would have an older generation than `previous_gen`, and
344 : // we want to find the most recent index from a previous generation.
345 : //
346 : // This is an optimization to avoid doing the listing for the general case below.
347 : let res = do_download_index_part(
348 : storage,
349 : tenant_shard_id,
350 : timeline_id,
351 : my_generation.previous(),
352 : cancel,
353 : )
354 : .await;
355 : match res {
356 : Ok(index_part) => {
357 0 : tracing::debug!("Found index_part from previous generation");
358 : return Ok(index_part);
359 : }
360 : Err(DownloadError::NotFound) => {
361 0 : tracing::debug!(
362 0 : "No index_part found from previous generation, falling back to listing"
363 0 : );
364 : }
365 : Err(e) => {
366 : return Err(e);
367 : }
368 : }
369 :
370 : // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
371 : // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
372 : // to constructing a full index path with no generation, because the generation is a suffix.
373 : let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
374 :
375 : let indices = download_retry(
376 425 : || async { storage.list_files(Some(&index_prefix), None).await },
377 : "list index_part files",
378 : cancel,
379 : )
380 : .await?;
381 :
382 : // General case logic for which index to use: the latest index whose generation
383 : // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
384 : let max_previous_generation = indices
385 : .into_iter()
386 : .filter_map(parse_remote_index_path)
387 403 : .filter(|g| g <= &my_generation)
388 : .max();
389 :
390 : match max_previous_generation {
391 : Some(g) => {
392 0 : tracing::debug!("Found index_part in generation {g:?}");
393 : do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
394 : }
395 : None => {
396 : // Migration from legacy pre-generation state: we have a generation but no prior
397 : // attached pageservers did. Try to load from a no-generation path.
398 0 : tracing::debug!("No index_part.json* found");
399 : do_download_index_part(
400 : storage,
401 : tenant_shard_id,
402 : timeline_id,
403 : Generation::none(),
404 : cancel,
405 : )
406 : .await
407 : }
408 : }
409 : }
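// Editor's sketch (simplified pseudo-Rust, not from this file): the probing
// ladder above in one place. `get(gen)` stands in for do_download_index_part
// and `list()` for the list_files + parse_remote_index_path step; neither
// exists under these names in the real code.
//
//     if my_generation.is_none() {
//         return get(Generation::none()).await;         // generations not in use
//     }
//     match get(my_generation).await {                  // stale-attachment fast path
//         Err(DownloadError::NotFound) => {}
//         other => return other,
//     }
//     match get(my_generation.previous()).await {       // typical healthy case
//         Err(DownloadError::NotFound) => {}
//         other => return other,
//     }
//     match list().await?.into_iter().filter(|g| g <= &my_generation).max() {
//         Some(g) => get(g).await,                      // newest index <= our generation
//         None => get(Generation::none()).await,        // pre-generation migration
//     }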
410 :
411 4 : pub(crate) async fn download_initdb_tar_zst(
412 4 : conf: &'static PageServerConf,
413 4 : storage: &GenericRemoteStorage,
414 4 : tenant_shard_id: &TenantShardId,
415 4 : timeline_id: &TimelineId,
416 4 : cancel: &CancellationToken,
417 4 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
418 4 : debug_assert_current_span_has_tenant_and_timeline_id();
419 4 :
420 4 : let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
421 4 :
422 4 : let remote_preserved_path =
423 4 : remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
424 4 :
425 4 : let timeline_path = conf.timelines_path(tenant_shard_id);
426 4 :
427 4 : if !timeline_path.exists() {
428 0 : tokio::fs::create_dir_all(&timeline_path)
429 0 : .await
430 0 : .with_context(|| format!("timeline dir creation {timeline_path}"))
431 0 : .map_err(DownloadError::Other)?;
432 4 : }
433 4 : let temp_path = timeline_path.join(format!(
434 4 : "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
435 4 : ));
436 :
437 4 : let file = download_retry(
438 4 : || async {
439 4 : let file = OpenOptions::new()
440 4 : .create(true)
441 4 : .truncate(true)
442 4 : .read(true)
443 4 : .write(true)
444 4 : .open(&temp_path)
445 4 : .await
446 4 : .with_context(|| format!("tempfile creation {temp_path}"))
447 4 : .map_err(DownloadError::Other)?;
448 :
449 4 : let download = match download_cancellable(cancel, storage.download(&remote_path)).await
450 : {
451 2 : Ok(dl) => dl,
452 : Err(DownloadError::NotFound) => {
453 2 : download_cancellable(cancel, storage.download(&remote_preserved_path)).await?
454 : }
455 0 : Err(other) => Err(other)?,
456 : };
457 4 : let mut download = tokio_util::io::StreamReader::new(download.download_stream);
458 4 : let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
459 4 :
460 4 : // TODO: this consumption of the response body should be subject to timeout + cancellation, but
461 4 : // not without thinking carefully about how to recover safely from cancelling a write to
462 4 : // local storage (e.g. by writing into a temp file as we do in download_layer)
463 4 : tokio::io::copy_buf(&mut download, &mut writer)
464 1618 : .await
465 4 : .with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))
466 4 : .map_err(DownloadError::Other)?;
467 :
468 4 : let mut file = writer.into_inner();
469 4 :
470 4 : file.seek(std::io::SeekFrom::Start(0))
471 4 : .await
472 4 : .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
473 4 : .map_err(DownloadError::Other)?;
474 :
475 4 : Ok(file)
476 8 : },
477 4 : &format!("download {remote_path}"),
478 4 : cancel,
479 4 : )
480 1630 : .await
481 4 : .map_err(|e| {
482 : // Do a best-effort attempt at deleting the temporary file upon encountering an error.
483 : // We don't have async here nor do we want to pile on any extra errors.
484 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
485 0 : if e.kind() != std::io::ErrorKind::NotFound {
486 0 : warn!("error deleting temporary file {temp_path}: {e}");
487 0 : }
488 0 : }
489 0 : e
490 4 : })?;
491 :
492 4 : Ok((temp_path, file))
493 4 : }
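// Editor's note: the helper above tries the regular initdb archive path first
// and falls back to the preserved copy only on NotFound, so the preserved
// archive is consulted only when the primary one is gone.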
494 :
495 : /// Helper function to handle retries for a download operation.
496 : ///
497 : /// Remote operations can fail due to rate limits (S3), spurious network
498 : /// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
499 : /// with backoff.
500 : ///
501 : /// (See similar logic for uploads in `perform_upload_task`)
502 11109 : pub(super) async fn download_retry<T, O, F>(
503 11109 : op: O,
504 11109 : description: &str,
505 11109 : cancel: &CancellationToken,
506 11109 : ) -> Result<T, DownloadError>
507 11109 : where
508 11109 : O: FnMut() -> F,
509 11109 : F: Future<Output = Result<T, DownloadError>>,
510 11109 : {
511 11109 : backoff::retry(
512 11109 : op,
513 11109 : DownloadError::is_permanent,
514 11109 : FAILED_DOWNLOAD_WARN_THRESHOLD,
515 11109 : FAILED_REMOTE_OP_RETRIES,
516 11109 : description,
517 11109 : cancel,
518 11109 : )
519 465039 : .await
520 11109 : .ok_or_else(|| DownloadError::Cancelled)
521 11109 : .and_then(|x| x)
522 11109 : }
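// Editor's sketch (hypothetical usage, not from this file): retrying an
// arbitrary download with the helper above; `fetch_bytes` is an invented
// async operation returning Result<Vec<u8>, DownloadError>.
//
//     let bytes = download_retry(
//         || async { fetch_bytes().await },
//         "download example object",
//         &cancel,
//     )
//     .await?;
//
// Note (inferred from the code above): backoff::retry resolves to None when
// `cancel` fires first and to Some(result) otherwise, hence the
// ok_or_else(Cancelled) followed by and_then to flatten the nested Result.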
523 :
524 1951 : async fn download_retry_forever<T, O, F>(
525 1951 : op: O,
526 1951 : description: &str,
527 1951 : cancel: &CancellationToken,
528 1951 : ) -> Result<T, DownloadError>
529 1951 : where
530 1951 : O: FnMut() -> F,
531 1951 : F: Future<Output = Result<T, DownloadError>>,
532 1951 : {
533 1951 : backoff::retry(
534 1951 : op,
535 1951 : DownloadError::is_permanent,
536 1951 : FAILED_DOWNLOAD_WARN_THRESHOLD,
537 1951 : u32::MAX,
538 1951 : description,
539 1951 : cancel,
540 1951 : )
541 4801 : .await
542 1951 : .ok_or_else(|| DownloadError::Cancelled)
543 1951 : .and_then(|x| x)
544 1951 : }