LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions)
Current:  f6946e90941b557c917ac98cd5a7e9506d180f3e.info (2023-10-19 02:04:12)
Baseline: c8637f37369098875162f194f92736355783b050.info (2023-10-18 20:25:20)

            Coverage    Total    Hit    UBC    CBC
Lines:      88.6 %      201      178    23     178
Functions:  61.5 %      52       32     20     32

           TLA  Line data    Source code
       1                 : //! Helper functions to download files from remote storage, using a RemoteStorage implementation
       2                 : //!
       3                 : //! The functions in this module retry failed operations automatically, according
       4                 : //! to the FAILED_REMOTE_OP_RETRIES constant.
       5                 : 
       6                 : use std::collections::HashSet;
       7                 : use std::future::Future;
       8                 : use std::time::Duration;
       9                 : 
      10                 : use anyhow::{anyhow, Context};
      11                 : use camino::Utf8Path;
      12                 : use tokio::fs;
      13                 : use tokio::io::AsyncWriteExt;
      14                 : use tokio_util::sync::CancellationToken;
      15                 : use utils::{backoff, crashsafe};
      16                 : 
      17                 : use crate::config::PageServerConf;
      18                 : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
      19                 : use crate::tenant::storage_layer::LayerFileName;
      20                 : use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
      21                 : use crate::tenant::Generation;
      22                 : use remote_storage::{DownloadError, GenericRemoteStorage};
      23                 : use utils::crashsafe::path_with_suffix_extension;
      24                 : use utils::id::{TenantId, TimelineId};
      25                 : 
      26                 : use super::index::{IndexPart, LayerFileMetadata};
      27                 : use super::{
      28                 :     parse_remote_index_path, remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
      29                 :     FAILED_REMOTE_OP_RETRIES,
      30                 : };
      31                 : 
      32                 : static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120);
      33                 : 
      34                 : /// Download a layer file from remote storage into the local timeline directory.
      35                 : /// Validates that the downloaded file's size matches the size recorded in
      36                 : /// 'layer_metadata'. (In the future, we might do more cross-checks, like CRC validation.)
      37                 : ///
      38                 : /// Returns the size of the downloaded file.
      39 CBC        1379 : pub async fn download_layer_file<'a>(
      40            1379 :     conf: &'static PageServerConf,
      41            1379 :     storage: &'a GenericRemoteStorage,
      42            1379 :     tenant_id: TenantId,
      43            1379 :     timeline_id: TimelineId,
      44            1379 :     layer_file_name: &'a LayerFileName,
      45            1379 :     layer_metadata: &'a LayerFileMetadata,
      46            1379 : ) -> Result<u64, DownloadError> {
      47            1379 :     debug_assert_current_span_has_tenant_and_timeline_id();
      48            1379 : 
      49            1379 :     let local_path = conf
      50            1379 :         .timeline_path(&tenant_id, &timeline_id)
      51            1379 :         .join(layer_file_name.file_name());
      52            1379 : 
      53            1379 :     let remote_path = remote_layer_path(
      54            1379 :         &tenant_id,
      55            1379 :         &timeline_id,
      56            1379 :         layer_file_name,
      57            1379 :         layer_metadata.generation,
      58            1379 :     );
      59            1379 : 
      60            1379 :     // Perform a rename inspired by durable_rename from file_utils.c.
      61            1379 :     // The sequence:
      62            1379 :     //     write(tmp)
      63            1379 :     //     fsync(tmp)
      64            1379 :     //     rename(tmp, new)
      65            1379 :     //     fsync(new)
      66            1379 :     //     fsync(parent)
      67            1379 :     // For more context about durable_rename check this email from postgres mailing list:
      68            1379 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
      69            1379 :     // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
      70            1379 :     let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
      71                 : 
      72            1379 :     let (mut destination_file, bytes_amount) = download_retry(
      73            1426 :         || async {
      74                 :             // TODO: this doesn't use the cached fd for some reason?
      75            1426 :             let mut destination_file = fs::File::create(&temp_file_path)
      76            1389 :                 .await
      77            1426 :                 .with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
      78            1426 :                 .map_err(DownloadError::Other)?;
      79            1426 :             let mut download = storage
      80            1426 :                 .download(&remote_path)
      81            2330 :                 .await
      82            1426 :                 .with_context(|| {
      83              47 :                     format!(
      84              47 :                     "open a download stream for layer with remote storage path '{remote_path:?}'"
      85              47 :                 )
      86            1426 :                 })
      87            1426 :                 .map_err(DownloadError::Other)?;
      88                 : 
      89            1379 :             let bytes_amount = tokio::time::timeout(
      90            1379 :                 MAX_DOWNLOAD_DURATION,
      91            1379 :                 tokio::io::copy(&mut download.download_stream, &mut destination_file),
      92            1379 :             )
      93          450593 :             .await
      94            1377 :             .map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))?
      95            1377 :             .with_context(|| {
      96 UBC           0 :                 format!(
      97               0 :                     "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
      98               0 :                 )
      99 CBC        1377 :             })
     100            1377 :             .map_err(DownloadError::Other)?;
     101                 : 
     102            1377 :             Ok((destination_file, bytes_amount))
     103            1424 :         },
     104            1379 :         &format!("download {remote_path:?}"),
     105            1379 :     )
     106          454312 :     .await?;
     107                 : 
     108                 :     // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
     109                 :     // A file will not be closed immediately when it goes out of scope if there are any IO operations
     110                 :     // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     111                 :     // you should call flush before dropping it.
     112                 :     //
     113                 :     // From the tokio code it appears that it waits for pending operations to complete. There shouldn't
     114                 :     // be any, because we assume `destination_file` is fully written, i.e. there are no pending
     115                 :     // .write(...).await operations. But for additional safety, let's wait for any that remain.
     116            1377 :     destination_file
     117            1377 :         .flush()
     118 UBC           0 :         .await
     119 CBC        1377 :         .with_context(|| format!("flush destination file at {temp_file_path}"))
     120            1377 :         .map_err(DownloadError::Other)?;
     121                 : 
     122            1377 :     let expected = layer_metadata.file_size();
     123            1377 :     if expected != bytes_amount {
     124 UBC           0 :         return Err(DownloadError::Other(anyhow!(
     125               0 :             "According to the layer file metadata we should have downloaded {expected} bytes, but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
     126               0 :         )));
     127 CBC        1377 :     }
     128            1377 : 
     129            1377 :     // not using sync_data because it can lose the file size update
     130            1377 :     destination_file
     131            1377 :         .sync_all()
     132            1598 :         .await
     133            1376 :         .with_context(|| format!("fsync destination file at {temp_file_path}"))
     134            1376 :         .map_err(DownloadError::Other)?;
     135            1376 :     drop(destination_file);
     136            1376 : 
     137            1376 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
     138               6 :         Err(DownloadError::Other(anyhow!(
     139               6 :             "remote-storage-download-pre-rename failpoint triggered"
     140               6 :         )))
     141            1376 :     });
     142                 : 
     143            1370 :     fs::rename(&temp_file_path, &local_path)
     144            1344 :         .await
     145            1370 :         .with_context(|| format!("rename downloaded layer file to {local_path}"))
     146            1370 :         .map_err(DownloadError::Other)?;
     147                 : 
     148            1370 :     crashsafe::fsync_async(&local_path)
     149            2709 :         .await
     150            1370 :         .with_context(|| format!("fsync layer file {local_path}"))
     151            1370 :         .map_err(DownloadError::Other)?;
     152                 : 
     153 UBC           0 :     tracing::debug!("download complete: {local_path}");
     154                 : 
     155 CBC        1370 :     Ok(bytes_amount)
     156            1376 : }
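
Aside: the write/fsync/rename/fsync sequence that the comments in download_layer_file
describe can be condensed into a minimal, self-contained sketch. This is not code from
this file: durable_write is a hypothetical helper, errors are collapsed into io::Result,
the temp-file suffix is handled with with_extension rather than the real code's
path_with_suffix_extension, and the real function additionally layers retries, a
download timeout, and the size check against layer_metadata on top of this skeleton.

    use std::path::Path;
    use tokio::io::AsyncWriteExt;

    // Hypothetical helper illustrating the durable_rename sequence.
    async fn durable_write(final_path: &Path, data: &[u8]) -> std::io::Result<()> {
        let tmp_path = final_path.with_extension("temp_download");

        // write(tmp) + fsync(tmp)
        let mut tmp = tokio::fs::File::create(&tmp_path).await?;
        tmp.write_all(data).await?;
        tmp.sync_all().await?;
        drop(tmp);

        // rename(tmp, new) + fsync(new)
        tokio::fs::rename(&tmp_path, final_path).await?;
        tokio::fs::File::open(final_path).await?.sync_all().await?;

        // fsync(parent), so the rename itself survives a crash
        // (works on Unix, where a directory can be opened read-only).
        if let Some(parent) = final_path.parent() {
            tokio::fs::File::open(parent).await?.sync_all().await?;
        }
        Ok(())
    }
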
     157                 : 
     158                 : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     159                 : 
     160              85 : pub fn is_temp_download_file(path: &Utf8Path) -> bool {
     161              85 :     let extension = path.extension();
     162              85 :     match extension {
     163               8 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     164               8 :         Some(_) => false,
     165              77 :         None => false,
     166                 :     }
     167              85 : }
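
For context, the durable_rename comment above notes that a temp file left behind by a
crash is deleted on pageserver startup and then re-downloaded. A sketch of what such a
cleanup pass could look like (cleanup_temp_downloads is a hypothetical helper, not the
crate's actual startup code):

    use camino::{Utf8Path, Utf8PathBuf};

    // Hypothetical startup sweep: remove leftover *.temp_download files
    // from a timeline directory so interrupted downloads can be redone.
    async fn cleanup_temp_downloads(timeline_dir: &Utf8Path) -> anyhow::Result<()> {
        let mut entries = tokio::fs::read_dir(timeline_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let path = Utf8PathBuf::from_path_buf(entry.path())
                .map_err(|p| anyhow::anyhow!("non-UTF-8 path: {p:?}"))?;
            if is_temp_download_file(&path) {
                tokio::fs::remove_file(&path).await?;
            }
        }
        Ok(())
    }
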
     168                 : 
     169                 : /// List timelines of the given tenant in remote storage.
     170              48 : pub async fn list_remote_timelines(
     171              48 :     storage: &GenericRemoteStorage,
     172              48 :     tenant_id: TenantId,
     173              48 : ) -> anyhow::Result<HashSet<TimelineId>> {
     174              48 :     let remote_path = remote_timelines_path(&tenant_id);
     175              48 : 
     176              48 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     177               6 :         anyhow::bail!("storage-sync-list-remote-timelines");
     178              48 :     });
     179                 : 
     180              42 :     let timelines = download_retry(
     181              51 :         || storage.list_prefixes(Some(&remote_path)),
     182              42 :         &format!("list prefixes for {tenant_id}"),
     183              42 :     )
     184             137 :     .await?;
     185                 : 
     186              42 :     if timelines.is_empty() {
     187 UBC           0 :         anyhow::bail!("no timelines found on the remote storage")
     188 CBC          42 :     }
     189              42 : 
     190              42 :     let mut timeline_ids = HashSet::new();
     191                 : 
     192             100 :     for timeline_remote_storage_key in timelines {
     193              58 :         let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
     194 UBC           0 :             anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
     195 CBC          58 :         })?;
     196                 : 
     197              58 :         let timeline_id: TimelineId = object_name
     198              58 :             .parse()
     199              58 :             .with_context(|| format!("parse object name into timeline id '{object_name}'"))?;
     200                 : 
     201                 :         // list_prefixes is assumed to return unique names. Ensure this here.
     202                 :         // NB: it's safer to bail out than warn-log this because the pageserver
     203                 :         //     needs to absolutely know about _all_ timelines that exist, so that
     204                 :         //     GC knows all the branchpoints. If we skipped over a timeline instead,
     205                 :         //     GC could delete a layer that's still needed by that timeline.
     206              58 :         anyhow::ensure!(
     207              58 :             !timeline_ids.contains(&timeline_id),
     208 UBC           0 :             "list_prefixes contains duplicate timeline id {timeline_id}"
     209                 :         );
     210 CBC          58 :         timeline_ids.insert(timeline_id);
     211                 :     }
     212                 : 
     213              42 :     Ok(timeline_ids)
     214              48 : }
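
A side note on the duplicate check: HashSet::insert returns false when the value was
already present, so the contains-then-insert pair above could be collapsed into a single
call. A small equivalent sketch (insert_unique is a hypothetical helper, with u64
standing in for TimelineId to keep it self-contained):

    use std::collections::HashSet;

    // Equivalent duplicate detection via the return value of HashSet::insert.
    fn insert_unique(ids: &mut HashSet<u64>, id: u64) -> anyhow::Result<()> {
        anyhow::ensure!(ids.insert(id), "duplicate timeline id {id}");
        Ok(())
    }
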
     215                 : 
     216             378 : async fn do_download_index_part(
     217             378 :     storage: &GenericRemoteStorage,
     218             378 :     tenant_id: &TenantId,
     219             378 :     timeline_id: &TimelineId,
     220             378 :     index_generation: Generation,
     221             378 : ) -> Result<IndexPart, DownloadError> {
     222             378 :     let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
     223                 : 
     224             378 :     let index_part_bytes = download_retry(
     225             410 :         || async {
     226             799 :             let mut index_part_download = storage.download(&remote_path).await?;
     227                 : 
     228             342 :             let mut index_part_bytes = Vec::new();
     229             342 :             tokio::io::copy(
     230             342 :                 &mut index_part_download.download_stream,
     231             342 :                 &mut index_part_bytes,
     232             342 :             )
     233             550 :             .await
     234             342 :             .with_context(|| format!("download index part at {remote_path:?}"))
     235             342 :             .map_err(DownloadError::Other)?;
     236             342 :             Ok(index_part_bytes)
     237             410 :         },
     238             378 :         &format!("download {remote_path:?}"),
     239             378 :     )
     240            1349 :     .await?;
     241                 : 
     242             342 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     243             342 :         .with_context(|| format!("download index part file at {remote_path:?}"))
     244             342 :         .map_err(DownloadError::Other)?;
     245                 : 
     246             342 :     Ok(index_part)
     247             378 : }
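
do_download_index_part buffers the whole object into memory before parsing, which is
fine because index_part.json is small. The buffer-then-deserialize pattern in isolation
(read_json is a hypothetical generic helper, not part of this module):

    use serde::de::DeserializeOwned;
    use tokio::io::{AsyncRead, AsyncReadExt};

    // Read a small object fully into memory, then parse it as JSON.
    // Only suitable for objects known to be small, like index_part.json.
    async fn read_json<T: DeserializeOwned, R: AsyncRead + Unpin>(
        mut stream: R,
    ) -> anyhow::Result<T> {
        let mut buf = Vec::new();
        stream.read_to_end(&mut buf).await?;
        Ok(serde_json::from_slice(&buf)?)
    }
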
     248                 : 
     249                 : /// index_part.json objects are suffixed with a generation number, so we cannot
     250                 : /// directly GET the latest index part without doing some probing.
     251                 : ///
     252                 : /// In this function we probe for the most recent index in a generation <= our current generation.
     253                 : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     254            1059 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
     255                 : pub(super) async fn download_index_part(
     256                 :     storage: &GenericRemoteStorage,
     257                 :     tenant_id: &TenantId,
     258                 :     timeline_id: &TimelineId,
     259                 :     my_generation: Generation,
     260                 : ) -> Result<IndexPart, DownloadError> {
     261                 :     debug_assert_current_span_has_tenant_and_timeline_id();
     262                 : 
     263                 :     if my_generation.is_none() {
     264                 :         // Operating without generations: just fetch the generation-less path
     265                 :         return do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
     266                 :     }
     267                 : 
     268                 :     // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
     269                 :     // index in our generation.
     270                 :     //
     271                 :     // This is an optimization to avoid doing the listing for the general case below.
     272                 :     let res = do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
     273                 :     match res {
     274                 :         Ok(index_part) => {
     275 UBC           0 :             tracing::debug!(
     276               0 :                 "Found index_part from current generation (this is a stale attachment)"
     277               0 :             );
     278                 :             return Ok(index_part);
     279                 :         }
     280                 :         Err(DownloadError::NotFound) => {}
     281                 :         Err(e) => return Err(e),
     282                 :     };
     283                 : 
     284                 :     // Typical case: the previous generation of this tenant was running healthily, and had uploaded
     285                 :     // an index part.  We may safely start from this index without doing a listing, because:
     286                 :     //  - We checked for current generation case above
     287                 :     //  - generations > my_generation are to be ignored
     288                 :     //  - any other indices that exist would have an older generation than `previous_gen`, and
     289                 :     //    we want to find the most recent index from a previous generation.
     290                 :     //
     291                 :     // This is an optimization to avoid doing the listing for the general case below.
     292                 :     let res =
     293                 :         do_download_index_part(storage, tenant_id, timeline_id, my_generation.previous()).await;
     294                 :     match res {
     295                 :         Ok(index_part) => {
     296               0 :             tracing::debug!("Found index_part from previous generation");
     297                 :             return Ok(index_part);
     298                 :         }
     299                 :         Err(DownloadError::NotFound) => {
     300               0 :             tracing::debug!(
     301               0 :                 "No index_part found from previous generation, falling back to listing"
     302               0 :             );
     303                 :         }
     304                 :         Err(e) => {
     305                 :             return Err(e);
     306                 :         }
     307                 :     }
     308                 : 
     309                 :     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
     310                 :     // objects, and select the highest one with a generation <= my_generation.
     311                 :     let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
     312                 :     let indices = backoff::retry(
     313 CBC          23 :         || async { storage.list_files(Some(&index_prefix)).await },
     314               3 :         |_| false,
     315                 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     316                 :         FAILED_REMOTE_OP_RETRIES,
     317                 :         "listing index_part files",
     318                 :         // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
     319 UBC           0 :         backoff::Cancel::new(CancellationToken::new(), || -> anyhow::Error {
     320               0 :             unreachable!()
     321               0 :         }),
     322                 :     )
     323                 :     .await
     324                 :     .map_err(DownloadError::Other)?;
     325                 : 
     326                 :     // General case logic for which index to use: the latest index whose generation
     327                 :     // is <= our own.  See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
     328                 :     let max_previous_generation = indices
     329                 :         .into_iter()
     330                 :         .filter_map(parse_remote_index_path)
     331 CBC          11 :         .filter(|g| g <= &my_generation)
     332                 :         .max();
     333                 : 
     334                 :     match max_previous_generation {
     335                 :         Some(g) => {
     336 UBC           0 :             tracing::debug!("Found index_part in generation {g:?}");
     337                 :             do_download_index_part(storage, tenant_id, timeline_id, g).await
     338                 :         }
     339                 :         None => {
     340                 :             // Migration from legacy pre-generation state: we have a generation but no prior
     341                 :             // attached pageservers did.  Try to load from a no-generation path.
     342 CBC           2 :             tracing::info!("No index_part.json* found");
     343                 :             do_download_index_part(storage, tenant_id, timeline_id, Generation::none()).await
     344                 :         }
     345                 :     }
     346                 : }
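
All three probes above (my_generation, then my_generation.previous(), then the listing
fallback) implement the same rule from the RFC: load the newest index whose generation
does not exceed ours. The selection step in isolation, with generations modeled as
plain integers purely for illustration:

    // Fallback selection rule; generations are modeled as u32 for clarity.
    fn select_index_generation(listed: &[u32], my_generation: u32) -> Option<u32> {
        listed.iter().copied().filter(|g| *g <= my_generation).max()
    }

    #[test]
    fn picks_newest_generation_not_newer_than_ours() {
        // Indices exist for generations 3, 5 and 9; we are attached in
        // generation 7, so the index from generation 5 is the one to load.
        assert_eq!(select_index_generation(&[3, 5, 9], 7), Some(5));
        // Nothing usable at all: fall back to the legacy, generation-less path.
        assert_eq!(select_index_generation(&[9], 7), None);
    }
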
     347                 : 
     348                 : /// Helper function to handle retries for a download operation.
     349                 : ///
     350                 : /// Remote operations can fail due to rate limits (IAM, S3), spurious network
     351                 : /// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
     352                 : /// with backoff.
     353                 : ///
     354                 : /// (See similar logic for uploads in `perform_upload_task`)
     355            1799 : async fn download_retry<T, O, F>(op: O, description: &str) -> Result<T, DownloadError>
     356            1799 : where
     357            1799 :     O: FnMut() -> F,
     358            1799 :     F: Future<Output = Result<T, DownloadError>>,
     359            1799 : {
     360            1799 :     backoff::retry(
     361            1799 :         op,
     362            1799 :         |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
     363            1799 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     364            1799 :         FAILED_REMOTE_OP_RETRIES,
     365            1799 :         description,
     366            1799 :         // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
     367            1799 :         backoff::Cancel::new(CancellationToken::new(), || -> DownloadError {
     368 UBC           0 :             unreachable!()
     369 CBC        1799 :         }),
     370            1799 :     )
     371          455798 :     .await
     372            1797 : }
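
A sketch of a typical call site, assuming remote_storage's RemotePath and Download
types; this mirrors how the functions above wrap each remote read, so that transient
failures are retried with backoff while BadInput/NotFound errors fail fast
(open_download_stream is a hypothetical wrapper, not part of this module):

    use remote_storage::{Download, DownloadError, GenericRemoteStorage, RemotePath};

    // Hypothetical call site: open a download stream with retries.
    async fn open_download_stream(
        storage: &GenericRemoteStorage,
        remote_path: &RemotePath,
    ) -> Result<Download, DownloadError> {
        download_retry(
            || storage.download(remote_path),
            &format!("download {remote_path:?}"),
        )
        .await
    }
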
        

Generated by: LCOV version 2.1-beta