LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - download.rs (source / functions)
Test:      8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date: 2023-09-06 10:18:01

            Coverage    Total    Hit
Lines:      89.7 %      194      174
Functions:  62.5 %      40       25

            Line data    Source code
       1              : //! Helper functions to download files from remote storage via a RemoteStorage implementation.
       2              : //!
       3              : //! The functions in this module retry failed operations automatically, according
       4              : //! to the FAILED_REMOTE_OP_RETRIES constant.
       5              : 
       6              : use std::collections::HashSet;
       7              : use std::future::Future;
       8              : use std::path::Path;
       9              : use std::time::Duration;
      10              : 
      11              : use anyhow::{anyhow, Context};
      12              : use tokio::fs;
      13              : use tokio::io::AsyncWriteExt;
      14              : use tokio_util::sync::CancellationToken;
      15              : use utils::{backoff, crashsafe};
      16              : 
      17              : use crate::config::PageServerConf;
      18              : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
      19              : use crate::tenant::storage_layer::LayerFileName;
      20              : use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
      21              : use crate::tenant::Generation;
      22              : use remote_storage::{DownloadError, GenericRemoteStorage};
      23              : use utils::crashsafe::path_with_suffix_extension;
      24              : use utils::id::{TenantId, TimelineId};
      25              : 
      26              : use super::index::{IndexPart, LayerFileMetadata};
      27              : use super::{remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES};
      28              : 
      29              : static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120);
      30              : 
      31              : ///
      32              : /// Validates that the downloaded file's size matches the size given in
      33              : /// 'layer_metadata'. (In the future, we might do more cross-checks, like CRC validation.)
      34              : ///
      35              : /// Returns the size of the downloaded file.
      36         1043 : pub async fn download_layer_file<'a>(
      37         1043 :     conf: &'static PageServerConf,
      38         1043 :     storage: &'a GenericRemoteStorage,
      39         1043 :     tenant_id: TenantId,
      40         1043 :     timeline_id: TimelineId,
      41         1043 :     layer_file_name: &'a LayerFileName,
      42         1043 :     layer_metadata: &'a LayerFileMetadata,
      43         1043 : ) -> Result<u64, DownloadError> {
      44         1043 :     debug_assert_current_span_has_tenant_and_timeline_id();
      45         1043 : 
      46         1043 :     let local_path = conf
      47         1043 :         .timeline_path(&tenant_id, &timeline_id)
      48         1043 :         .join(layer_file_name.file_name());
      49         1043 : 
      50         1043 :     let remote_path = remote_layer_path(&tenant_id, &timeline_id, layer_file_name, layer_metadata);
      51         1043 : 
      52         1043 :     // Perform a rename inspired by durable_rename from file_utils.c.
      53         1043 :     // The sequence (sketched standalone after this function):
      54         1043 :     //     write(tmp)
      55         1043 :     //     fsync(tmp)
      56         1043 :     //     rename(tmp, new)
      57         1043 :     //     fsync(new)
      58         1043 :     //     fsync(parent)
      59         1043 :     // For more context about durable_rename check this email from postgres mailing list:
      60         1043 :     // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
      61         1043 :     // If the pageserver crashes, the temp file will be deleted on startup and re-downloaded.
      62         1043 :     let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
      63              : 
      64         1043 :     let (mut destination_file, bytes_amount) = download_retry(
      65         1061 :         || async {
      66              :             // TODO: this doesn't use the cached fd for some reason?
      67         1061 :             let mut destination_file = fs::File::create(&temp_file_path)
      68         1029 :                 .await
      69         1061 :                 .with_context(|| {
      70            0 :                     format!(
      71            0 :                         "create a destination file for layer '{}'",
      72            0 :                         temp_file_path.display()
      73            0 :                     )
      74         1061 :                 })
      75         1061 :                 .map_err(DownloadError::Other)?;
      76         1061 :             let mut download = storage
      77         1061 :                 .download(&remote_path)
      78         1393 :                 .await
      79         1061 :                 .with_context(|| {
      80           18 :                     format!(
      81           18 :                     "open a download stream for layer with remote storage path '{remote_path:?}'"
      82           18 :                 )
      83         1061 :                 })
      84         1061 :                 .map_err(DownloadError::Other)?;
      85              : 
      86         1043 :             let bytes_amount = tokio::time::timeout(
      87         1043 :                 MAX_DOWNLOAD_DURATION,
      88         1043 :                 tokio::io::copy(&mut download.download_stream, &mut destination_file),
      89         1043 :             )
      90       353872 :             .await
      91         1041 :             .map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))?
      92         1041 :             .with_context(|| {
      93            0 :                 format!(
      94            0 :                     "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
      95            0 :                 )
      96         1041 :             })
      97         1041 :             .map_err(DownloadError::Other)?;
      98              : 
      99         1041 :             Ok((destination_file, bytes_amount))
     100         1059 :         },
     101         1043 :         &format!("download {remote_path:?}"),
     102         1043 :     )
     103       356294 :     .await?;
     104              : 
     105              :     // The tokio docs (https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html) state that:
     106              :     // A file will not be closed immediately when it goes out of scope if there are any IO operations
     107              :     // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
     108              :     // you should call flush before dropping it.
     109              :     //
     110              :     // From the tokio code I see that it waits for pending operations to complete. There shouldn't be
     111              :     // any, because we assume that `destination_file` is fully written, i.e. there are no pending
     112              :     // .write(...).await operations. But for additional safety, let's wait for any pending operations.
     113         1041 :     destination_file
     114         1041 :         .flush()
     115            0 :         .await
     116         1041 :         .with_context(|| format!("flush source file at {}", temp_file_path.display()))
     117         1041 :         .map_err(DownloadError::Other)?;
     118              : 
     119         1041 :     let expected = layer_metadata.file_size();
     120         1041 :     if expected != bytes_amount {
     121            0 :         return Err(DownloadError::Other(anyhow!(
     122            0 :             "according to the layer file metadata, {expected} bytes should have been downloaded, but {bytes_amount} bytes were downloaded into file {temp_file_path:?}",
     123            0 :         )));
     124         1041 :     }
     125         1041 : 
     126         1041 :     // Not using sync_data because it can lose the file size update.
     127         1041 :     destination_file
     128         1041 :         .sync_all()
     129         1253 :         .await
     130         1040 :         .with_context(|| {
     131            0 :             format!(
     132            0 :                 "failed to fsync source file at {}",
     133            0 :                 temp_file_path.display()
     134            0 :             )
     135         1040 :         })
     136         1040 :         .map_err(DownloadError::Other)?;
     137         1040 :     drop(destination_file);
     138         1040 : 
     139         1040 :     fail::fail_point!("remote-storage-download-pre-rename", |_| {
     140           32 :         Err(DownloadError::Other(anyhow!(
     141         1008 :         .with_context(|| format!("rename downloaded layer file to {}", local_path.display(),))
     142           32 :         )))
     143         1040 :     });
     144              : 
     145         1008 :     fs::rename(&temp_file_path, &local_path)
     146          985 :         .await
     147         1008 :         .with_context(|| format!("rename download layer file to {}", local_path.display(),))
     148         1008 :         .map_err(DownloadError::Other)?;
     149              : 
     150         1008 :     crashsafe::fsync_async(&local_path)
     151         1996 :         .await
     152         1008 :         .with_context(|| format!("fsync layer file {}", local_path.display(),))
     153         1008 :         .map_err(DownloadError::Other)?;
     154              : 
     155            0 :     tracing::debug!("download complete: {}", local_path.display());
     156              : 
     157         1008 :     Ok(bytes_amount)
     158         1040 : }
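
For illustration, here is the durable_rename sequence from the comment in
download_layer_file, sketched standalone. This is a minimal sketch, not
pageserver code: `durable_write`, `tmp`, and `new` are hypothetical names, and
the final step assumes a Unix filesystem where a directory can be opened and
fsynced.

    use std::path::Path;
    use tokio::{fs, io::AsyncWriteExt};

    // Hypothetical helper mirroring durable_rename:
    //     write(tmp); fsync(tmp); rename(tmp, new); fsync(new); fsync(parent)
    async fn durable_write(tmp: &Path, new: &Path, data: &[u8]) -> std::io::Result<()> {
        let mut f = fs::File::create(tmp).await?;
        f.write_all(data).await?;
        f.sync_all().await?; // fsync(tmp): the contents are on disk
        drop(f);
        fs::rename(tmp, new).await?; // rename(tmp, new)
        fs::File::open(new).await?.sync_all().await?; // fsync(new)
        if let Some(parent) = new.parent() {
            // fsync(parent): on Unix, a directory can be opened read-only and
            // fsynced, persisting the rename in the directory entry itself.
            fs::File::open(parent).await?.sync_all().await?;
        }
        Ok(())
    }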
     159              : 
     160              : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
     161              : 
     162           88 : pub fn is_temp_download_file(path: &Path) -> bool {
     163           88 :     let extension = path.extension().map(|pname| {
     164            5 :         pname
     165            5 :             .to_str()
     166            5 :             .expect("paths passed to this function must be valid Rust strings")
     167           88 :     });
     168           88 :     match extension {
     169            5 :         Some(TEMP_DOWNLOAD_EXTENSION) => true,
     170            5 :         Some(_) => false,
     171           83 :         None => false,
     172              :     }
     173           88 : }
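
As the comment in download_layer_file notes, leftover temp files are deleted on
pageserver startup so the layer can be re-downloaded. A minimal sketch of such
a cleanup pass, assuming a hypothetical `cleanup_temp_downloads` helper that
scans a single directory:

    use std::path::Path;

    // Hypothetical cleanup: remove *.temp_download leftovers from a crashed
    // download, relying on is_temp_download_file to recognize them.
    async fn cleanup_temp_downloads(dir: &Path) -> anyhow::Result<()> {
        let mut entries = tokio::fs::read_dir(dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if is_temp_download_file(&path) {
                tokio::fs::remove_file(&path).await?;
            }
        }
        Ok(())
    }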
     174              : 
     175              : /// List the timelines of the given tenant in remote storage.
     176           42 : pub async fn list_remote_timelines(
     177           42 :     storage: &GenericRemoteStorage,
     178           42 :     tenant_id: TenantId,
     179           42 : ) -> anyhow::Result<HashSet<TimelineId>> {
     180           42 :     let remote_path = remote_timelines_path(&tenant_id);
     181           42 : 
     182           42 :     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
     183            3 :         anyhow::bail!("storage-sync-list-remote-timelines");
     184           42 :     });
     185              : 
     186           39 :     let timelines = download_retry(
     187           45 :         || storage.list_prefixes(Some(&remote_path)),
     188           39 :         &format!("list prefixes for {tenant_id}"),
     189           39 :     )
     190          132 :     .await?;
     191              : 
     192           39 :     if timelines.is_empty() {
     193            0 :         anyhow::bail!("no timelines found on the remote storage")
     194           39 :     }
     195           39 : 
     196           39 :     let mut timeline_ids = HashSet::new();
     197              : 
     198           94 :     for timeline_remote_storage_key in timelines {
     199           55 :         let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
     200            0 :             anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
     201           55 :         })?;
     202              : 
     203           55 :         let timeline_id: TimelineId = object_name
     204           55 :             .parse()
     205           55 :             .with_context(|| format!("parse object name into timeline id '{object_name}'"))?;
     206              : 
     207              :         // list_prefixes is assumed to return unique names. Ensure this here.
     208              :         // NB: it's safer to bail out than warn-log this because the pageserver
     209              :         //     needs to absolutely know about _all_ timelines that exist, so that
     210              :         //     GC knows all the branchpoints. If we skipped over a timeline instead,
     211              :         //     GC could delete a layer that's still needed by that timeline.
     212           55 :         anyhow::ensure!(
     213           55 :             !timeline_ids.contains(&timeline_id),
     214            0 :             "list_prefixes contains duplicate timeline id {timeline_id}"
     215              :         );
     216           55 :         timeline_ids.insert(timeline_id);
     217              :     }
     218              : 
     219           39 :     Ok(timeline_ids)
     220           42 : }
     221              : 
     222          204 : pub(super) async fn download_index_part(
     223          204 :     storage: &GenericRemoteStorage,
     224          204 :     tenant_id: &TenantId,
     225          204 :     timeline_id: &TimelineId,
     226          204 :     generation: Generation,
     227          204 : ) -> Result<IndexPart, DownloadError> {
     228          204 :     let remote_path = remote_index_path(tenant_id, timeline_id, generation);
     229              : 
     230          204 :     let index_part_bytes = download_retry(
     231          226 :         || async {
     232          408 :             let mut index_part_download = storage.download(&remote_path).await?;
     233              : 
     234          201 :             let mut index_part_bytes = Vec::new();
     235          201 :             tokio::io::copy(
     236          201 :                 &mut index_part_download.download_stream,
     237          201 :                 &mut index_part_bytes,
     238          201 :             )
     239          259 :             .await
     240          201 :             .with_context(|| format!("download index part at {remote_path:?}"))
     241          201 :             .map_err(DownloadError::Other)?;
     242          201 :             Ok(index_part_bytes)
     243          226 :         },
     244          204 :         &format!("download {remote_path:?}"),
     245          204 :     )
     246          667 :     .await?;
     247              : 
     248          201 :     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
     249          201 :         .with_context(|| format!("download index part file at {remote_path:?}"))
     250          201 :         .map_err(DownloadError::Other)?;
     251              : 
     252          201 :     Ok(index_part)
     253          204 : }
     254              : 
     255              : /// Helper function to handle retries for a download operation.
     256              : ///
     257              : /// Remote operations can fail due to rate limits (IAM, S3), spurious network
     258              : /// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
     259              : /// with backoff.
     260              : ///
     261              : /// (See similar logic for uploads in `perform_upload_task`)
     262         1286 : async fn download_retry<T, O, F>(op: O, description: &str) -> Result<T, DownloadError>
     263         1286 : where
     264         1286 :     O: FnMut() -> F,
     265         1286 :     F: Future<Output = Result<T, DownloadError>>,
     266         1286 : {
     267         1286 :     backoff::retry(
     268         1286 :         op,
     269         1286 :         |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
     270         1286 :         FAILED_DOWNLOAD_WARN_THRESHOLD,
     271         1286 :         FAILED_REMOTE_OP_RETRIES,
     272         1286 :         description,
     273         1286 :         // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
     274         1286 :         backoff::Cancel::new(CancellationToken::new(), || -> DownloadError {
     275            0 :             unreachable!()
     276         1286 :         }),
     277         1286 :     )
     278       357093 :     .await
     279         1284 : }
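
For reference, a hedged usage sketch of this helper: the call sites above pass
an async closure performing a single attempt, and download_retry re-invokes it
with backoff. `fetch_once` below is hypothetical; the real closures live in
download_layer_file, list_remote_timelines, and download_index_part.

    // Illustrative only: retry a single-attempt async operation with backoff,
    // while BadInput/NotFound errors fail fast (see the predicate above).
    // Assumes a hypothetical
    //     async fn fetch_once(&GenericRemoteStorage) -> Result<Vec<u8>, DownloadError>.
    async fn example(storage: &GenericRemoteStorage) -> Result<Vec<u8>, DownloadError> {
        download_retry(
            || fetch_once(storage), // one download attempt per invocation
            "example download",
        )
        .await
    }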
        

Generated by: LCOV version 2.1-beta