LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 54.6 % 291 159
Test Date: 2024-02-29 11:57:12 Functions: 41.8 % 67 28

            Line data    Source code
       1              : //! Helper functions to download files from remote storage with a RemoteStorage
       2              : //!
       3              : //! The functions in this module retry failed operations automatically: up to
       4              : //! FAILED_REMOTE_OP_RETRIES times (`download_retry`) or with a limit of u32::MAX (`download_retry_forever`).
       5              : 
       6              : use std::collections::HashSet;
       7              : use std::future::Future;
       8              : 
       9              : use anyhow::{anyhow, Context};
      10              : use camino::{Utf8Path, Utf8PathBuf};
      11              : use pageserver_api::shard::TenantShardId;
      12              : use tokio::fs::{self, File, OpenOptions};
      13              : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
      14              : use tokio_util::io::StreamReader;
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::warn;
      17              : use utils::{backoff, crashsafe};
      18              : 
      19              : use crate::config::PageServerConf;
      20              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      21              : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
      22              : use crate::tenant::storage_layer::LayerFileName;
      23              : use crate::tenant::Generation;
      24              : use crate::virtual_file::on_fatal_io_error;
      25              : use crate::TEMP_FILE_SUFFIX;
      26              : use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
      27              : use utils::crashsafe::path_with_suffix_extension;
      28              : use utils::id::TimelineId;
      29              : 
      30              : use super::index::{IndexPart, LayerFileMetadata};
      31              : use super::{
      32              :     parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
      33              :     remote_initdb_preserved_archive_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
      34              :     INITDB_PATH,
      35              : };
      36              : 
      37              : /// Download a layer file from remote storage into the local timeline directory.
      38              : /// The downloaded file's size is validated against `layer_metadata.file_size()`; a mismatch
      39              : /// is an error. (In the future, we might do more cross-checks, like CRC validation)
      40              : ///
      41              : /// Returns the size of the downloaded file.
      42            0 : pub async fn download_layer_file<'a>(
      43            0 :     conf: &'static PageServerConf,
      44            0 :     storage: &'a GenericRemoteStorage,
      45            0 :     tenant_shard_id: TenantShardId,
      46            0 :     timeline_id: TimelineId,
      47            0 :     layer_file_name: &'a LayerFileName,
      48            0 :     layer_metadata: &'a LayerFileMetadata,
      49            0 :     cancel: &CancellationToken,
      50            0 : ) -> Result<u64, DownloadError> {
      51            0 :     debug_assert_current_span_has_tenant_and_timeline_id();
      52            0 : 
      53            0 :     let local_path = conf
      54            0 :         .timeline_path(&tenant_shard_id, &timeline_id)
      55            0 :         .join(layer_file_name.file_name());
      56            0 : 
      57            0 :     let remote_path = remote_layer_path(
      58            0 :         &tenant_shard_id.tenant_id,
      59            0 :         &timeline_id,
      60            0 :         layer_metadata.shard,
      61            0 :         layer_file_name,
      62            0 :         layer_metadata.generation,
      63            0 :     );
      64            0 : 
      65            0 :     // Perform a rename inspired by durable_rename from file_utils.c.
      66            0 :     // The sequence:
      67            0 :     //     write(tmp)
      68            0 :     //     fsync(tmp)
      69            0 :     //     rename(tmp, new)
      70            0 :     //     fsync(new)
      71            0 :     //     fsync(parent)
      72            0 :     // For more context about durable_rename check this email from postgres mailing list:
      73            0 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
      74            0 :     // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
      75            0 :     let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
      76              : 
      77            0 :     let (mut destination_file, bytes_amount) = download_retry(
      78            0 :         || async {
      79            0 :             let destination_file = tokio::fs::File::create(&temp_file_path)
      80            0 :                 .await
      81            0 :                 .with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
      82            0 :                 .map_err(DownloadError::Other)?;
      83              : 
      84            0 :             let download = storage.download(&remote_path, cancel).await?;
      85              : 
      86            0 :             let mut destination_file =
      87            0 :                 tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
      88            0 : 
      89            0 :             let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
      90              : 
      91            0 :             let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file).await;
      92              : 
      93            0 :             match bytes_amount {
      94            0 :                 Ok(bytes_amount) => {
      95            0 :                     let destination_file = destination_file.into_inner();
      96            0 :                     Ok((destination_file, bytes_amount))
      97              :                 }
      98            0 :                 Err(e) => {
      99            0 :                     if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
     100            0 :                         on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
     101            0 :                     }
     102            0 : 
     103            0 :                     Err(e.into()) // `e` is the copy error here; the removal error only shadowed it inside the `if let`
     104              :                 }
     105              :             }
     106            0 :         },
     107            0 :         &format!("download {remote_path:?}"),
     108            0 :         cancel,
     109            0 :     )
     110            0 :     .await?;
     111              : 
     112              :     // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     113              :     // A file will not be closed immediately when it goes out of scope if there are any IO operations
     114              :     // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     115              :     // you should call flush before dropping it.
     116              :     //
     117              :     // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
     118              :     // we assume that `destination_file` file is fully written. I.e. there are no pending .write(...).await operations.
     119              :     // But for additional safety let's check/wait for any pending operations.
     120            0 :     destination_file
     121            0 :         .flush()
     122            0 :         .await
     123            0 :         .with_context(|| format!("flush source file at {temp_file_path}"))
     124            0 :         .map_err(DownloadError::Other)?;
     125              : 
     126            0 :     let expected = layer_metadata.file_size();
     127            0 :     if expected != bytes_amount {
     128            0 :         return Err(DownloadError::Other(anyhow!(
     129            0 :             "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
     130            0 :         )));
     131            0 :     }
     132            0 : 
     133            0 :     // not using sync_data because it can lose file size update
     134            0 :     destination_file
     135            0 :         .sync_all()
     136            0 :         .await
     137            0 :         .with_context(|| format!("failed to fsync source file at {temp_file_path}"))
     138            0 :         .map_err(DownloadError::Other)?;
     139            0 :     drop(destination_file);
     140            0 : 
     141            0 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
     142            0 :         Err(DownloadError::Other(anyhow!(
     143            0 :             "remote-storage-download-pre-rename failpoint triggered"
     144            0 :         )))
     145            0 :     });
     146              : 
     147            0 :     fs::rename(&temp_file_path, &local_path)
     148            0 :         .await
     149            0 :         .with_context(|| format!("rename download layer file to {local_path}"))
     150            0 :         .map_err(DownloadError::Other)?;
     151              : 
     152            0 :     crashsafe::fsync_async(&local_path)
     153            0 :         .await
     154            0 :         .with_context(|| format!("fsync layer file {local_path}"))
     155            0 :         .map_err(DownloadError::Other)?;
     156              : 
     157            0 :     tracing::debug!("download complete: {local_path}");
     158              : 
     159            0 :     Ok(bytes_amount)
     160            0 : }
     161              : 
     162              : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download"; // passed to `path_with_suffix_extension` to name the temp file a layer is downloaded into
     163              : /// Returns true if the extension of `path` is exactly `TEMP_DOWNLOAD_EXTENSION`.
     164            0 : pub fn is_temp_download_file(path: &Utf8Path) -> bool {
     165            0 :     let extension = path.extension();
     166            0 :     match extension {
     167            0 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     168            0 :         Some(_) => false,
     169            0 :         None => false,
     170              :     }
     171            0 : }
     172              : 
     173              : /// List timelines of the given tenant in remote storage: returns the timeline IDs found, plus the names of all other entries.
     174           88 : pub async fn list_remote_timelines(
     175           88 :     storage: &GenericRemoteStorage,
     176           88 :     tenant_shard_id: TenantShardId,
     177           88 :     cancel: CancellationToken,
     178           88 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     179           88 :     let remote_path = remote_timelines_path(&tenant_shard_id);
     180           88 : 
     181           88 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     182            0 :         anyhow::bail!("storage-sync-list-remote-timelines");
     183           88 :     });
     184              : 
     185           88 :     let listing = download_retry_forever(
     186           88 :         || {
     187           88 :             storage.list(
     188           88 :                 Some(&remote_path),
     189           88 :                 ListingMode::WithDelimiter,
     190           88 :                 None,
     191           88 :                 &cancel,
     192           88 :             )
     193           88 :         },
     194           88 :         &format!("list timelines for {tenant_shard_id}"),
     195           88 :         &cancel,
     196           88 :     )
     197           10 :     .await?;
     198              :     // Prefixes whose name parses as a TimelineId are timelines; all other names go to `other_prefixes`.
     199           88 :     let mut timeline_ids = HashSet::new();
     200           88 :     let mut other_prefixes = HashSet::new();
     201              : 
     202           94 :     for timeline_remote_storage_key in listing.prefixes {
     203            6 :         let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
     204            0 :             anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
     205            6 :         })?;
     206              : 
     207            6 :         match object_name.parse::<TimelineId>() {
     208            6 :             Ok(t) => timeline_ids.insert(t),
     209            0 :             Err(_) => other_prefixes.insert(object_name.to_string()),
     210              :         };
     211              :     }
     212              :     // Keys in the listing are never treated as timelines: record their names with the other entries.
     213           88 :     for key in listing.keys {
     214            0 :         let object_name = key
     215            0 :             .object_name()
     216            0 :             .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
     217            0 :         other_prefixes.insert(object_name.to_string());
     218              :     }
     219              : 
     220           88 :     Ok((timeline_ids, other_prefixes))
     221           88 : }
     222              : /// Download the index part at the remote index path for `index_generation` (via `download_retry_forever`) and deserialize it from JSON.
     223           34 : async fn do_download_index_part(
     224           34 :     storage: &GenericRemoteStorage,
     225           34 :     tenant_shard_id: &TenantShardId,
     226           34 :     timeline_id: &TimelineId,
     227           34 :     index_generation: Generation,
     228           34 :     cancel: &CancellationToken,
     229           34 : ) -> Result<IndexPart, DownloadError> {
     230           34 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
     231              : 
     232           34 :     let index_part_bytes = download_retry_forever(
     233           34 :         || async {
     234           34 :             let download = storage.download(&remote_path, cancel).await?;
     235              :             // Read the whole object into memory; it is parsed as JSON once the download succeeds.
     236           20 :             let mut bytes = Vec::new();
     237           20 : 
     238           20 :             let stream = download.download_stream;
     239           20 :             let mut stream = StreamReader::new(stream);
     240           20 : 
     241           40 :             tokio::io::copy_buf(&mut stream, &mut bytes).await?;
     242              : 
     243           20 :             Ok(bytes)
     244           68 :         },
     245           34 :         &format!("download {remote_path:?}"),
     246           34 :         cancel,
     247           34 :     )
     248           60 :     .await?;
     249              : 
     250           20 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     251           20 :         .with_context(|| format!("deserialize index part file at {remote_path:?}"))
     252           20 :         .map_err(DownloadError::Other)?;
     253              : 
     254           20 :     Ok(index_part)
     255           34 : }
     256              : 
     257              : /// index_part.json objects are suffixed with a generation number, so we cannot
     258              : /// directly GET the latest index part without doing some probing.
     259              : ///
     260              : /// In this function we probe for the most recent index in a generation <= our current generation.
     261              : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     262           40 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     263              : pub(super) async fn download_index_part(
     264              :     storage: &GenericRemoteStorage,
     265              :     tenant_shard_id: &TenantShardId,
     266              :     timeline_id: &TimelineId,
     267              :     my_generation: Generation,
     268              :     cancel: &CancellationToken,
     269              : ) -> Result<IndexPart, DownloadError> {
     270              :     debug_assert_current_span_has_tenant_and_timeline_id();
     271              : 
     272              :     if my_generation.is_none() {
     273              :         // Operating without generations: just fetch the generation-less path
     274              :         return do_download_index_part(
     275              :             storage,
     276              :             tenant_shard_id,
     277              :             timeline_id,
     278              :             my_generation,
     279              :             cancel,
     280              :         )
     281              :         .await;
     282              :     }
     283              : 
     284              :     // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
     285              :     // index in our generation.
     286              :     //
     287              :     // This is an optimization to avoid doing the listing for the general case below.
     288              :     let res =
     289              :         do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
     290              :     match res {
     291              :         Ok(index_part) => {
     292            0 :             tracing::debug!(
     293            0 :                 "Found index_part from current generation (this is a stale attachment)"
     294            0 :             );
     295              :             return Ok(index_part);
     296              :         }
     297              :         Err(DownloadError::NotFound) => {}
     298              :         Err(e) => return Err(e),
     299              :     };
     300              : 
     301              :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded
     302              :     // an index part.  We may safely start from this index without doing a listing, because:
     303              :     //  - We checked for current generation case above
     304              :     //  - generations > my_generation are to be ignored
     305              :     //  - any other indices that exist would have an older generation than `previous_gen`, and
     306              :     //    we want to find the most recent index from a previous generation.
     307              :     //
     308              :     // This is an optimization to avoid doing the listing for the general case below.
     309              :     let res = do_download_index_part(
     310              :         storage,
     311              :         tenant_shard_id,
     312              :         timeline_id,
     313              :         my_generation.previous(),
     314              :         cancel,
     315              :     )
     316              :     .await;
     317              :     match res {
     318              :         Ok(index_part) => {
     319            0 :             tracing::debug!("Found index_part from previous generation");
     320              :             return Ok(index_part);
     321              :         }
     322              :         Err(DownloadError::NotFound) => {
     323            0 :             tracing::debug!(
     324            0 :                 "No index_part found from previous generation, falling back to listing"
     325            0 :             );
     326              :         }
     327              :         Err(e) => {
     328              :             return Err(e);
     329              :         }
     330              :     }
     331              : 
     332              :     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
     333              :     // objects, and select the highest one with a generation <= my_generation.  Constructing the prefix is equivalent
     334              :     // to constructing a full index path with no generation, because the generation is a suffix.
     335              :     let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
     336              :     // Note: the listing goes through the bounded `download_retry`, unlike the index downloads themselves.
     337              :     let indices = download_retry(
     338           12 :         || async { storage.list_files(Some(&index_prefix), None, cancel).await },
     339              :         "list index_part files",
     340              :         cancel,
     341              :     )
     342              :     .await?;
     343              : 
     344              :     // General case logic for which index to use: the latest index whose generation
     345              :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     346              :     let max_previous_generation = indices
     347              :         .into_iter()
     348              :         .filter_map(parse_remote_index_path)
     349           12 :         .filter(|g| g <= &my_generation)
     350              :         .max();
     351              : 
     352              :     match max_previous_generation {
     353              :         Some(g) => {
     354            0 :             tracing::debug!("Found index_part in generation {g:?}");
     355              :             do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
     356              :         }
     357              :         None => {
     358              :             // Migration from legacy pre-generation state: we have a generation but no prior
     359              :             // attached pageservers did.  Try to load from a no-generation path.
     360            0 :             tracing::debug!("No index_part.json* found");
     361              :             do_download_index_part(
     362              :                 storage,
     363              :                 tenant_shard_id,
     364              :                 timeline_id,
     365              :                 Generation::none(),
     366              :                 cancel,
     367              :             )
     368              :             .await
     369              :         }
     370              :     }
     371              : }
     372              : /// Download the initdb archive of a timeline into a temporary file; returns the temp file's path and the open file, rewound to its start.
     373            2 : pub(crate) async fn download_initdb_tar_zst(
     374            2 :     conf: &'static PageServerConf,
     375            2 :     storage: &GenericRemoteStorage,
     376            2 :     tenant_shard_id: &TenantShardId,
     377            2 :     timeline_id: &TimelineId,
     378            2 :     cancel: &CancellationToken,
     379            2 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
     380            2 :     debug_assert_current_span_has_tenant_and_timeline_id();
     381            2 : 
     382            2 :     let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     383            2 : 
     384            2 :     let remote_preserved_path =
     385            2 :         remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
     386            2 : 
     387            2 :     let timeline_path = conf.timelines_path(tenant_shard_id);
     388            2 : 
     389            2 :     if !timeline_path.exists() {
     390            0 :         tokio::fs::create_dir_all(&timeline_path)
     391            0 :             .await
     392            0 :             .with_context(|| format!("timeline dir creation {timeline_path}"))
     393            0 :             .map_err(DownloadError::Other)?;
     394            2 :     }
     395            2 :     let temp_path = timeline_path.join(format!(
     396            2 :         "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
     397            2 :     ));
     398              : 
     399            2 :     let file = download_retry(
     400            2 :         || async {
     401            2 :             let file = OpenOptions::new()
     402            2 :                 .create(true)
     403            2 :                 .truncate(true)
     404            2 :                 .read(true)
     405            2 :                 .write(true)
     406            2 :                 .open(&temp_path)
     407            2 :                 .await
     408            2 :                 .with_context(|| format!("tempfile creation {temp_path}"))
     409            2 :                 .map_err(DownloadError::Other)?;
     410              :             // Try the regular archive first; on NotFound, fall back to the preserved archive path.
     411            2 :             let download = match storage.download(&remote_path, cancel).await {
     412            2 :                 Ok(dl) => dl,
     413              :                 Err(DownloadError::NotFound) => {
     414            0 :                     storage.download(&remote_preserved_path, cancel).await?
     415              :                 }
     416            0 :                 Err(other) => Err(other)?,
     417              :             };
     418            2 :             let mut download = tokio_util::io::StreamReader::new(download.download_stream);
     419            2 :             let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
     420            2 : 
     421          708 :             tokio::io::copy_buf(&mut download, &mut writer).await?;
     422              : 
     423            2 :             let mut file = writer.into_inner();
     424            2 :             // Rewind to the start of the file before returning it.
     425            2 :             file.seek(std::io::SeekFrom::Start(0))
     426            2 :                 .await
     427            2 :                 .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
     428            2 :                 .map_err(DownloadError::Other)?;
     429              : 
     430            2 :             Ok(file)
     431            4 :         },
     432            2 :         &format!("download {remote_path}"),
     433            2 :         cancel,
     434            2 :     )
     435          714 :     .await
     436            2 :     .map_err(|e| {
     437              :         // Do a best-effort attempt at deleting the temporary file upon encountering an error.
     438              :         // We don't have async here nor do we want to pile on any extra errors.
     439            0 :         if let Err(e) = std::fs::remove_file(&temp_path) {
     440            0 :             if e.kind() != std::io::ErrorKind::NotFound {
     441            0 :                 warn!("error deleting temporary file {temp_path}: {e}");
     442            0 :             }
     443            0 :         }
     444            0 :         e
     445            2 :     })?;
     446              : 
     447            2 :     Ok((temp_path, file))
     448            2 : }
     449              : 
     450              : /// Helper function to handle retries for a download operation.
     451              : ///
     452              : /// Remote operations can fail due to rate limits (S3), spurious network
     453              : /// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
     454              : /// with backoff.
     455              : ///
     456              : /// (See similar logic for uploads in `perform_upload_task`)
     457            8 : pub(super) async fn download_retry<T, O, F>(
     458            8 :     op: O,
     459            8 :     description: &str,
     460            8 :     cancel: &CancellationToken,
     461            8 : ) -> Result<T, DownloadError>
     462            8 : where
     463            8 :     O: FnMut() -> F,
     464            8 :     F: Future<Output = Result<T, DownloadError>>,
     465            8 : {
     466            8 :     backoff::retry(
     467            8 :         op,
     468            8 :         DownloadError::is_permanent,
     469            8 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     470            8 :         FAILED_REMOTE_OP_RETRIES,
     471            8 :         description,
     472            8 :         cancel,
     473            8 :     )
     474          726 :     .await
     475            8 :     .ok_or_else(|| DownloadError::Cancelled) // a `None` from backoff::retry is reported as Cancelled
     476            8 :     .and_then(|x| x) // flatten the nested Result
     477            8 : }
     478              : /// Same as `download_retry`, but passes `u32::MAX` as the retry limit instead of FAILED_REMOTE_OP_RETRIES.
     479          122 : async fn download_retry_forever<T, O, F>(
     480          122 :     op: O,
     481          122 :     description: &str,
     482          122 :     cancel: &CancellationToken,
     483          122 : ) -> Result<T, DownloadError>
     484          122 : where
     485          122 :     O: FnMut() -> F,
     486          122 :     F: Future<Output = Result<T, DownloadError>>,
     487          122 : {
     488          122 :     backoff::retry(
     489          122 :         op,
     490          122 :         DownloadError::is_permanent,
     491          122 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     492          122 :         u32::MAX,
     493          122 :         description,
     494          122 :         cancel,
     495          122 :     )
     496           70 :     .await
     497          122 :     .ok_or_else(|| DownloadError::Cancelled) // a `None` from backoff::retry is reported as Cancelled
     498          122 :     .and_then(|x| x) // flatten the nested Result
     499          122 : }
        

Generated by: LCOV version 2.1-beta