LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - upload.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 91.2 % 91 83 8 83
Current Date: 2024-01-09 02:06:09 Functions: 65.2 % 23 15 8 15
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Helper functions to upload files to remote storage via a `GenericRemoteStorage`.
       2                 : 
       3                 : use anyhow::{bail, Context};
       4                 : use camino::Utf8Path;
       5                 : use fail::fail_point;
       6                 : use pageserver_api::shard::TenantShardId;
       7                 : use std::io::{ErrorKind, SeekFrom};
       8                 : use tokio::fs::{self, File};
       9                 : use tokio::io::AsyncSeekExt;
      10                 : use tokio_util::sync::CancellationToken;
      11                 : 
      12                 : use super::Generation;
      13                 : use crate::{
      14                 :     config::PageServerConf,
      15                 :     tenant::remote_timeline_client::{
      16                 :         index::IndexPart, remote_index_path, remote_initdb_archive_path, remote_path,
      17                 :         upload_cancellable,
      18                 :     },
      19                 : };
      20                 : use remote_storage::GenericRemoteStorage;
      21                 : use utils::id::{TenantId, TimelineId};
      22                 : 
      23                 : use super::index::LayerFileMetadata;
      24                 : 
      25                 : use tracing::info;
      26                 : 
      27                 : /// Serializes `index_part` with `to_s3_bytes` and uploads the bytes to the path returned by `remote_index_path(tenant_shard_id, timeline_id, generation)`. Returns an error if the `before-upload-index` failpoint fires, if serialization fails, or if `upload_cancellable` returns an error.
      28 CBC        5959 : pub(super) async fn upload_index_part<'a>(
      29            5959 :     storage: &'a GenericRemoteStorage,
      30            5959 :     tenant_shard_id: &TenantShardId,
      31            5959 :     timeline_id: &TimelineId,
      32            5959 :     generation: Generation,
      33            5959 :     index_part: &'a IndexPart,
      34            5959 :     cancel: &CancellationToken,
      35            5959 : ) -> anyhow::Result<()> {
      36 UBC           0 :     tracing::trace!("uploading new index part");
      37                 : 
      38 CBC        5959 :     fail_point!("before-upload-index", |_| {
      39               4 :         bail!("failpoint before-upload-index")
      40            5959 :     });
      41            5955 :     pausable_failpoint!("before-upload-index-pausable"); // NOTE(review): presumably lets tests pause execution here; the macro is defined outside this file, confirm
      42                 : 
      43            5952 :     let index_part_bytes = index_part
      44            5952 :         .to_s3_bytes()
      45            5952 :         .context("serialize index part file into bytes")?;
      46            5952 :     let index_part_size = index_part_bytes.len(); // length taken before the conversion to `Bytes`; passed alongside the stream below
      47            5952 :     let index_part_bytes = bytes::Bytes::from(index_part_bytes);
      48            5952 : 
      49            5952 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
      50            5952 :     upload_cancellable(
      51            5952 :         cancel,
      52            5952 :         storage.upload_storage_object(
      53            5952 :             futures::stream::once(futures::future::ready(Ok(index_part_bytes))), // single-item stream: the whole serialized index is yielded at once
      54            5952 :             index_part_size,
      55            5952 :             &remote_path,
      56            5952 :         ),
      57            5952 :     )
      58           13563 :     .await
      59            5951 :     .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
      60            5955 : }
      61                 : 
      62                 : /// Attempts to upload the layer file at `source_path` to the remote path computed by `remote_path(conf, source_path, generation)`.
      63                 : /// No extra checks for overlapping files are made; any file that is already present remotely will be overwritten if submitted during the upload.
      64                 : /// Returns `Ok(())` without uploading when `source_path` does not exist, and fails when the file's on-disk size differs from `known_metadata.file_size()`.
      65                 : /// On an error, the error is returned. NOTE(review): bumping the retries count and rescheduling the entire task is not done in this function; presumably the caller does it, confirm.
      66           21499 : pub(super) async fn upload_timeline_layer<'a>(
      67           21499 :     conf: &'static PageServerConf,
      68           21499 :     storage: &'a GenericRemoteStorage,
      69           21499 :     source_path: &'a Utf8Path,
      70           21499 :     known_metadata: &'a LayerFileMetadata,
      71           21499 :     generation: Generation,
      72           21499 :     cancel: &CancellationToken,
      73           21499 : ) -> anyhow::Result<()> {
      74           21499 :     fail_point!("before-upload-layer", |_| {
      75               5 :         bail!("failpoint before-upload-layer")
      76           21499 :     });
      77                 : 
      78           21494 :     pausable_failpoint!("before-upload-layer-pausable");
      79                 : 
      80           21493 :     let storage_path = remote_path(conf, source_path, generation)?;
      81           21493 :     let source_file_res = fs::File::open(&source_path).await;
      82           21493 :     let source_file = match source_file_res {
      83           21493 :         Ok(source_file) => source_file,
      84 UBC           0 :         Err(e) if e.kind() == ErrorKind::NotFound => {
      85                 :             // Reaching this arm is unintended, but it is not a big problem if
      86                 :             // the file was simply deleted before its upload ran. However, a
      87                 :             // nonexistent file can also be indicative of something worse, like
      88                 :             // a file being scheduled for upload before it has been written to
      89                 :             // disk.
      90                 :             //
      91                 :             // This is tested against `test_compaction_delete_before_upload`
      92               0 :             info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
      93               0 :             return Ok(());
      94                 :         }
      95               0 :         Err(e) => {
      96               0 :             Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))? // always `Err`, so `?` returns from the function here
      97                 :         }
      98                 :     };
      99                 : 
     100 CBC       21493 :     let fs_size = source_file
     101           21493 :         .metadata()
     102           20892 :         .await
     103           21493 :         .with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
     104           21493 :         .len();
     105           21493 : 
     106           21493 :     let metadata_size = known_metadata.file_size();
     107           21493 :     if metadata_size != fs_size {
     108 UBC           0 :         bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
     109 CBC       21493 :     }
     110                 : 
     111           21493 :     let fs_size = usize::try_from(fs_size) // checked conversion: fails with context instead of truncating
     112           21493 :         .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
     113                 : 
     114           21493 :     let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE); // wraps the open file as a stream, created with capacity `super::BUFFER_SIZE`
     115           21493 : 
     116           21493 :     upload_cancellable(cancel, storage.upload(reader, fs_size, &storage_path, None))
     117          784671 :         .await
     118           21490 :         .with_context(|| format!("upload layer from local path '{source_path}'"))?;
     119                 : 
     120           19646 :     Ok(())
     121           21495 : }
     122                 : 
     123                 : /// Uploads the `initdb_tar_zst` file, read from its start, to the path returned by `remote_initdb_archive_path(tenant_id, timeline_id)`; `size` is passed to the upload call cast to `usize`. Returns an error if the seek or the upload fails.
     124             589 : pub(crate) async fn upload_initdb_dir(
     125             589 :     storage: &GenericRemoteStorage,
     126             589 :     tenant_id: &TenantId,
     127             589 :     timeline_id: &TimelineId,
     128             589 :     mut initdb_tar_zst: File,
     129             589 :     size: u64,
     130             589 :     cancel: &CancellationToken,
     131             589 : ) -> anyhow::Result<()> {
     132 UBC           0 :     tracing::trace!("uploading initdb dir");
     133                 : 
     134                 :     // A prior retry attempt may have already read part of the file, so rewind to the start
     135 CBC         589 :     initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
     136                 : 
     137             589 :     let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
     138             589 : 
     139             589 :     let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
     140             589 :     upload_cancellable(
     141             589 :         cancel,
     142             589 :         storage.upload_storage_object(file, size as usize, &remote_path), // NOTE(review): `as` truncates silently where `usize` is narrower than `u64`; `upload_timeline_layer` uses `usize::try_from` for the same conversion
     143             589 :     )
     144           23154 :     .await
     145             589 :     .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
     146             589 : }
        

Generated by: LCOV version 2.1-beta