//! Helper functions to download files from remote storage with a RemoteStorage
//!
//! The functions in this module retry failed operations automatically, according
//! to the FAILED_REMOTE_OP_RETRIES constant.

use std::collections::HashSet;
use std::future::Future;

use anyhow::{anyhow, Context};
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::timeout::timeout_cancellable;
use utils::{backoff, crashsafe};

use crate::config::PageServerConf;
use crate::tenant::remote_timeline_client::{
    download_cancellable, remote_layer_path, remote_timelines_path, DOWNLOAD_TIMEOUT,
};
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::Generation;
use crate::virtual_file::on_fatal_io_error;
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::TimelineId;

use super::index::{IndexPart, LayerFileMetadata};
use super::{
    parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
    FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
};

/// Download a layer file from remote storage and put it in the local timeline directory.
///
/// We validate that the downloaded file's size matches the size recorded in
/// `layer_metadata`. (In the future, we might do more cross-checks, like CRC validation.)
///
/// Returns the size of the downloaded file.
pub async fn download_layer_file<'a>(
    conf: &'static PageServerConf,
    storage: &'a GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    layer_file_name: &'a LayerFileName,
    layer_metadata: &'a LayerFileMetadata,
    cancel: &CancellationToken,
) -> Result<u64, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let local_path = conf
        .timeline_path(&tenant_shard_id, &timeline_id)
        .join(layer_file_name.file_name());

    let remote_path = remote_layer_path(
        &tenant_shard_id.tenant_id,
        &timeline_id,
        layer_metadata.shard,
        layer_file_name,
        layer_metadata.generation,
    );

    // Perform a rename inspired by durable_rename from file_utils.c.
    // The sequence:
    //     write(tmp)
    //     fsync(tmp)
    //     rename(tmp, new)
    //     fsync(new)
    //     fsync(parent)
    // For more context about durable_rename, see this email from the postgres mailing list:
    // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
    // If the pageserver crashes, the temp file is deleted on startup and the layer is re-downloaded.
    // (A standalone sketch of this sequence appears after this function.)
    let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);

    let cancel_inner = cancel.clone();
    let (mut destination_file, bytes_amount) = download_retry(
        || async {
            let destination_file = tokio::fs::File::create(&temp_file_path)
                .await
                .with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
                .map_err(DownloadError::Other)?;

            // Cancellation safety: it is safe to cancel this future, because it isn't writing to a
            // local file: the write to the local file doesn't start until after the request header
            // is returned and we start draining the body stream below.
            let download = download_cancellable(&cancel_inner, storage.download(&remote_path))
                .await
                .with_context(|| {
                    format!(
                        "open a download stream for layer with remote storage path '{remote_path:?}'"
                    )
                })
                .map_err(DownloadError::Other)?;

            let mut destination_file =
                tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);

            let mut reader = tokio_util::io::StreamReader::new(download.download_stream);

            // Cancellation safety: it is safe to cancel this future because it is writing into a temporary file,
            // and we will unlink the temporary file if there is an error. This unlink is important because we
            // are in a retry loop, and we wouldn't want to leave behind a rogue write I/O to a file that
            // we will imminently try to write to again.
            let bytes_amount: u64 = match timeout_cancellable(
                DOWNLOAD_TIMEOUT,
                &cancel_inner,
                tokio::io::copy_buf(&mut reader, &mut destination_file),
            )
            .await
            .with_context(|| {
                format!(
                    "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
                )
            })
            .map_err(DownloadError::Other)?
            {
                Ok(b) => Ok(b),
                Err(e) => {
                    // Remove incomplete files: on restart Timeline would do this anyway, but we must
                    // do it here for the retry case.
                    if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
                        on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
                    }
                    Err(e)
                }
            }
            .with_context(|| {
                format!(
                    "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
                )
            })
            .map_err(DownloadError::Other)?;

            let destination_file = destination_file.into_inner();

            Ok((destination_file, bytes_amount))
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await?;

    // The tokio docs (https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html) state that:
    // "A file will not be closed immediately when it goes out of scope if there are any IO operations
    // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
    // you should call flush before dropping it."
    //
    // From the tokio code we see that it waits for pending operations to complete. There shouldn't be
    // any, because we assume that the `destination_file` is fully written, i.e. there are no pending
    // `.write(...).await` operations. But for additional safety, let's wait for any pending operations.
    destination_file
        .flush()
        .await
        .with_context(|| format!("flush temporary file at {temp_file_path}"))
        .map_err(DownloadError::Other)?;

    let expected = layer_metadata.file_size();
    if expected != bytes_amount {
        return Err(DownloadError::Other(anyhow!(
            "according to the layer file metadata, we should have downloaded {expected} bytes, but we downloaded {bytes_amount} bytes into file {temp_file_path:?}",
        )));
    }

    // Not using sync_data because it can lose the file size update.
    destination_file
        .sync_all()
        .await
        .with_context(|| format!("failed to fsync temporary file at {temp_file_path}"))
        .map_err(DownloadError::Other)?;
    drop(destination_file);

    fail::fail_point!("remote-storage-download-pre-rename", |_| {
        Err(DownloadError::Other(anyhow!(
            "remote-storage-download-pre-rename failpoint triggered"
        )))
    });

    fs::rename(&temp_file_path, &local_path)
        .await
        .with_context(|| format!("rename downloaded layer file to {local_path}"))
        .map_err(DownloadError::Other)?;

    crashsafe::fsync_async(&local_path)
        .await
        .with_context(|| format!("fsync layer file {local_path}"))
        .map_err(DownloadError::Other)?;

    tracing::debug!("download complete: {local_path}");

    Ok(bytes_amount)
}
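
// The durable_rename sequence referenced above, written out as a standalone
// sketch using plain tokio primitives. Illustrative only, not called anywhere:
// the real download path above additionally threads through cancellation,
// retries, and size validation. Assumes Unix, where a directory can be opened
// read-only and fsynced.
#[allow(dead_code)]
async fn durable_rename_sketch(tmp: &Utf8Path, dst: &Utf8Path, data: &[u8]) -> std::io::Result<()> {
    // write(tmp) + fsync(tmp): the contents are durable under the temp name
    let mut f = File::create(tmp).await?;
    f.write_all(data).await?;
    f.sync_all().await?;
    // rename(tmp, new): atomically move the file into place
    fs::rename(tmp, dst).await?;
    // fsync(new): re-sync under the final name
    File::open(dst).await?.sync_all().await?;
    // fsync(parent): make the rename itself durable
    if let Some(parent) = dst.parent() {
        File::open(parent).await?.sync_all().await?;
    }
    Ok(())
}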

const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";

pub fn is_temp_download_file(path: &Utf8Path) -> bool {
    let extension = path.extension();
    match extension {
        Some(TEMP_DOWNLOAD_EXTENSION) => true,
        Some(_) => false,
        None => false,
    }
}

/// List the timelines of a given tenant in remote storage.
pub async fn list_remote_timelines(
    storage: &GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
    let remote_path = remote_timelines_path(&tenant_shard_id);

    fail::fail_point!("storage-sync-list-remote-timelines", |_| {
        anyhow::bail!("storage-sync-list-remote-timelines");
    });

    let cancel_inner = cancel.clone();
    let listing = download_retry_forever(
        || {
            download_cancellable(
                &cancel_inner,
                storage.list(Some(&remote_path), ListingMode::WithDelimiter),
            )
        },
        &format!("list timelines for {tenant_shard_id}"),
        cancel,
    )
    .await?;

    let mut timeline_ids = HashSet::new();
    let mut other_prefixes = HashSet::new();

    for timeline_remote_storage_key in listing.prefixes {
        let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
            anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
        })?;

        match object_name.parse::<TimelineId>() {
            Ok(t) => timeline_ids.insert(t),
            Err(_) => other_prefixes.insert(object_name.to_string()),
        };
    }

    for key in listing.keys {
        let object_name = key
            .object_name()
            .ok_or_else(|| anyhow::anyhow!("no object name for key {key}"))?;
        other_prefixes.insert(object_name.to_string());
    }

    Ok((timeline_ids, other_prefixes))
}

async fn do_download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    index_generation: Generation,
    cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> {
    use futures::stream::StreamExt;

    let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);

    let cancel_inner = cancel.clone();
    let index_part_bytes = download_retry_forever(
        || async {
            // Cancellation: it is safe to cancel this future because we're just downloading into
            // a memory buffer, not touching local disk.
            let index_part_download =
                download_cancellable(&cancel_inner, storage.download(&remote_path)).await?;

            let mut index_part_bytes = Vec::new();
            let mut stream = std::pin::pin!(index_part_download.download_stream);
            while let Some(chunk) = stream.next().await {
                let chunk = chunk
                    .with_context(|| format!("download index part at {remote_path:?}"))
                    .map_err(DownloadError::Other)?;
                index_part_bytes.extend_from_slice(&chunk[..]);
            }
            Ok(index_part_bytes)
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await?;

    let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
        .with_context(|| format!("deserialize index part file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok(index_part)
}

/// index_part.json objects are suffixed with a generation number, so we cannot
/// directly GET the latest index part without doing some probing.
///
/// In this function we probe for the most recent index in a generation <= our current generation.
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
#[tracing::instrument(skip_all, fields(generation=?my_generation))]
pub(super) async fn download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    my_generation: Generation,
    cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    if my_generation.is_none() {
        // Operating without generations: just fetch the generation-less path
        return do_download_index_part(
            storage,
            tenant_shard_id,
            timeline_id,
            my_generation,
            cancel,
        )
        .await;
    }

    // Stale case: if we were intentionally attached in a stale generation, there may already be a remote
    // index in our generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download_index_part(
        storage,
        tenant_shard_id,
        timeline_id,
        my_generation,
        cancel.clone(),
    )
    .await;
    match res {
        Ok(index_part) => {
            tracing::debug!(
                "Found index_part from current generation (this is a stale attachment)"
            );
            return Ok(index_part);
        }
        Err(DownloadError::NotFound) => {}
        Err(e) => return Err(e),
    };

    // Typical case: the previous generation of this tenant was running healthily, and had uploaded
    // an index part. We may safely start from this index without doing a listing, because:
    // - We checked for the current generation case above
    // - generations > my_generation are to be ignored
    // - any other indices that exist would have an older generation than `previous_gen`, and
    //   we want to find the most recent index from a previous generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download_index_part(
        storage,
        tenant_shard_id,
        timeline_id,
        my_generation.previous(),
        cancel.clone(),
    )
    .await;
    match res {
        Ok(index_part) => {
            tracing::debug!("Found index_part from previous generation");
            return Ok(index_part);
        }
        Err(DownloadError::NotFound) => {
            tracing::debug!(
                "No index_part found from previous generation, falling back to listing"
            );
        }
        Err(e) => {
            return Err(e);
        }
    }

    // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
    // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
    // to constructing a full index path with no generation, because the generation is a suffix.
    let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
    let indices = backoff::retry(
        || async { storage.list_files(Some(&index_prefix)).await },
        |_| false,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        FAILED_REMOTE_OP_RETRIES,
        "listing index_part files",
        backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")),
    )
    .await
    .map_err(DownloadError::Other)?;

    // General case logic for which index to use: the latest index whose generation
    // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
    // (This selection rule is modeled in the sketch after this function.)
    let max_previous_generation = indices
        .into_iter()
        .filter_map(parse_remote_index_path)
        .filter(|g| g <= &my_generation)
        .max();

    match max_previous_generation {
        Some(g) => {
            tracing::debug!("Found index_part in generation {g:?}");
            do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
        }
        None => {
            // Migration from legacy pre-generation state: we have a generation but no prior
            // attached pageservers did. Try to load from a no-generation path.
            tracing::debug!("No index_part.json* found");
            do_download_index_part(
                storage,
                tenant_shard_id,
                timeline_id,
                Generation::none(),
                cancel,
            )
            .await
        }
    }
}
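
// A reduced model of the listing fallback above (illustrative only, not called
// anywhere): of all the generations found in remote index names, pick the newest
// one that is not newer than ours. `Generation` is totally ordered, which is
// what makes the `<=` filter and `max()` above well-defined.
#[allow(dead_code)]
fn pick_index_generation(
    seen: impl IntoIterator<Item = Generation>,
    mine: Generation,
) -> Option<Generation> {
    seen.into_iter().filter(|g| g <= &mine).max()
}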

pub(crate) async fn download_initdb_tar_zst(
    conf: &'static PageServerConf,
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    cancel: &CancellationToken,
) -> Result<(Utf8PathBuf, File), DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let timeline_path = conf.timelines_path(tenant_shard_id);

    if !timeline_path.exists() {
        tokio::fs::create_dir_all(&timeline_path)
            .await
            .with_context(|| format!("timeline dir creation {timeline_path}"))
            .map_err(DownloadError::Other)?;
    }
    let temp_path = timeline_path.join(format!(
        "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
    ));

    let cancel_inner = cancel.clone();

    let file = download_retry(
        || async {
            let file = OpenOptions::new()
                .create(true)
                .truncate(true)
                .read(true)
                .write(true)
                .open(&temp_path)
                .await
                .with_context(|| format!("tempfile creation {temp_path}"))
                .map_err(DownloadError::Other)?;

            let download =
                download_cancellable(&cancel_inner, storage.download(&remote_path)).await?;
            let mut download = tokio_util::io::StreamReader::new(download.download_stream);
            let mut writer = tokio::io::BufWriter::with_capacity(8 * 1024, file);

            // TODO: this consumption of the response body should be subject to timeout + cancellation, but
            // not without thinking carefully about how to recover safely from cancelling a write to
            // local storage (e.g. by writing into a temp file, as we do in `download_layer_file`).
            // (A sketch of what this could look like appears after this function.)
            tokio::io::copy_buf(&mut download, &mut writer)
                .await
                .with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))
                .map_err(DownloadError::Other)?;

            let mut file = writer.into_inner();

            file.seek(std::io::SeekFrom::Start(0))
                .await
                .with_context(|| format!("rewinding initdb.tar.zst at {remote_path:?}"))
                .map_err(DownloadError::Other)?;

            Ok(file)
        },
        &format!("download {remote_path}"),
        cancel,
    )
    .await
    .map_err(|e| {
        // Do a best-effort attempt at deleting the temporary file upon encountering an error.
        // We don't have async here, nor do we want to pile on any extra errors.
        if let Err(e) = std::fs::remove_file(&temp_path) {
            if e.kind() != std::io::ErrorKind::NotFound {
                warn!("error deleting temporary file {temp_path}: {e}");
            }
        }
        e
    })?;

    Ok((temp_path, file))
}
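
// What the TODO inside `download_initdb_tar_zst` could look like, mirroring the
// body-copy step of `download_layer_file`: wrap the copy in `timeout_cancellable`
// and unlink the temp file on failure, so a cancelled write cannot race a later
// retry. A sketch only (not called anywhere); recovering safely for
// initdb.tar.zst still needs the careful thought the TODO asks for.
#[allow(dead_code)]
async fn copy_body_with_timeout_sketch(
    mut reader: impl tokio::io::AsyncBufRead + Unpin,
    mut writer: impl tokio::io::AsyncWrite + Unpin,
    temp_path: &Utf8Path,
    cancel: &CancellationToken,
) -> anyhow::Result<u64> {
    // Outer Result: timeout or cancellation. Inner Result: the copy's I/O outcome.
    let res = timeout_cancellable(
        DOWNLOAD_TIMEOUT,
        cancel,
        tokio::io::copy_buf(&mut reader, &mut writer),
    )
    .await
    .with_context(|| format!("download body into file {temp_path:?}"))?;
    match res {
        Ok(bytes) => Ok(bytes),
        Err(e) => {
            // Unlink the partial file before surfacing the error, exactly as the
            // layer download path does, so a retry starts from a clean slate.
            if let Err(e) = tokio::fs::remove_file(temp_path).await {
                on_fatal_io_error(&e, &format!("Removing temporary file {temp_path}"));
            }
            Err(e.into())
        }
    }
}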

/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (IAM, S3), spurious network
/// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
/// with backoff.
///
/// (See similar logic for uploads in `perform_upload_task`.)
async fn download_retry<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        FAILED_REMOTE_OP_RETRIES,
        description,
        backoff::Cancel::new(cancel.clone(), || DownloadError::Cancelled),
    )
    .await
}
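
// Usage sketch for `download_retry` (hypothetical helper, not called anywhere;
// assumes `RemotePath` is the remote_storage path type that the functions above
// pass to `storage.download`): collect a small object into memory, retrying
// transient failures. Per-attempt state (the buffer) lives inside the closure,
// so a failed attempt cannot leak partial data into the next one.
#[allow(dead_code)]
async fn download_small_object_sketch(
    storage: &GenericRemoteStorage,
    remote_path: &remote_storage::RemotePath,
    cancel: &CancellationToken,
) -> Result<Vec<u8>, DownloadError> {
    use futures::stream::StreamExt;
    download_retry(
        || async {
            let download =
                download_cancellable(cancel, storage.download(remote_path)).await?;
            let mut bytes = Vec::new();
            let mut stream = std::pin::pin!(download.download_stream);
            while let Some(chunk) = stream.next().await {
                let chunk = chunk
                    .with_context(|| format!("download body at {remote_path:?}"))
                    .map_err(DownloadError::Other)?;
                bytes.extend_from_slice(&chunk[..]);
            }
            Ok(bytes)
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await
}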

async fn download_retry_forever<T, O, F>(
    op: O,
    description: &str,
    cancel: CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        u32::MAX,
        description,
        backoff::Cancel::new(cancel, || DownloadError::Cancelled),
    )
    .await
}
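
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal check of the temp-file naming convention, assuming
    // `path_with_suffix_extension` appends the given suffix as an extra
    // extension (`foo` -> `foo.temp_download`), which is how the downloaded
    // layer's temp path is derived in `download_layer_file`.
    #[test]
    fn recognizes_temp_download_files() {
        let layer = Utf8Path::new("timelines/some-timeline/some-layer-file");
        let temp = path_with_suffix_extension(layer, TEMP_DOWNLOAD_EXTENSION);
        assert!(is_temp_download_file(&temp));
        assert!(!is_temp_download_file(layer));
        assert!(!is_temp_download_file(Utf8Path::new("initdb.tar.zst")));
    }
}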