LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - upload.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 92.4 % 145 134
Test Date: 2024-02-07 07:37:29 Functions: 65.6 % 32 21

            Line data    Source code
       1              : //! Helper functions to upload files to remote storage with a RemoteStorage
       2              : 
       3              : use anyhow::{bail, Context};
       4              : use camino::Utf8Path;
       5              : use fail::fail_point;
       6              : use pageserver_api::shard::TenantShardId;
       7              : use std::io::{ErrorKind, SeekFrom};
       8              : use std::time::SystemTime;
       9              : use tokio::fs::{self, File};
      10              : use tokio::io::AsyncSeekExt;
      11              : use tokio_util::sync::CancellationToken;
      12              : use utils::backoff;
      13              : 
      14              : use super::Generation;
      15              : use crate::{
      16              :     config::PageServerConf,
      17              :     tenant::remote_timeline_client::{
      18              :         index::IndexPart, remote_index_path, remote_initdb_archive_path,
      19              :         remote_initdb_preserved_archive_path, remote_path, upload_cancellable,
      20              :     },
      21              : };
      22              : use remote_storage::{GenericRemoteStorage, TimeTravelError};
      23              : use utils::id::{TenantId, TimelineId};
      24              : 
      25              : use super::index::LayerFileMetadata;
      26              : 
      27              : use tracing::info;
      28              : 
      29              : /// Serializes `index_part` with `to_s3_bytes` and uploads the bytes to the path returned by `remote_index_path(tenant_shard_id, timeline_id, generation)`; fails early when the `before-upload-index` failpoint is active, and attaches `anyhow` context to serialization and upload errors.
      30         6955 : pub(super) async fn upload_index_part<'a>(
      31         6955 :     storage: &'a GenericRemoteStorage,
      32         6955 :     tenant_shard_id: &TenantShardId,
      33         6955 :     timeline_id: &TimelineId,
      34         6955 :     generation: Generation,
      35         6955 :     index_part: &'a IndexPart,
      36         6955 :     cancel: &CancellationToken,
      37         6955 : ) -> anyhow::Result<()> {
      38            0 :     tracing::trace!("uploading new index part");
      39              :     // Test hooks: the first failpoint makes this call bail out; the second one presumably lets tests pause the upload here (macro defined elsewhere — confirm).
      40         6955 :     fail_point!("before-upload-index", |_| {
      41            4 :         bail!("failpoint before-upload-index")
      42         6955 :     });
      43         6951 :     pausable_failpoint!("before-upload-index-pausable");
      44              :     // Serialize up front so the exact byte length can be handed to the storage call together with a single-item stream of the bytes.
      45         6948 :     let index_part_bytes = index_part
      46         6948 :         .to_s3_bytes()
      47         6948 :         .context("serialize index part file into bytes")?;
      48         6948 :     let index_part_size = index_part_bytes.len();
      49         6948 :     let index_part_bytes = bytes::Bytes::from(index_part_bytes);
      50         6948 : 
      51         6948 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
      52         6948 :     upload_cancellable(
      53         6948 :         cancel,
      54         6948 :         storage.upload_storage_object(
      55         6948 :             futures::stream::once(futures::future::ready(Ok(index_part_bytes))),
      56         6948 :             index_part_size,
      57         6948 :             &remote_path,
      58         6948 :         ),
      59         6948 :     )
      60        16334 :     .await
      61         6943 :     .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
      62         6947 : }
      63              : 
      64              : /// Uploads one layer file from `source_path` to the remote key computed by `remote_path(conf, source_path, generation)`.
      65              : /// No extra checks for overlapping files are made, and any file that is already present remotely will be overwritten if it is submitted for upload.
      66              : /// A local file that no longer exists is logged and treated as success; a mismatch between the on-disk size and `known_metadata.file_size()` is an error.
      67              : /// On error this function only returns the `anyhow::Error`. NOTE(review): bumping the retries count and rescheduling the task presumably happen in the caller — confirm.
      68        23207 : pub(super) async fn upload_timeline_layer<'a>(
      69        23207 :     conf: &'static PageServerConf,
      70        23207 :     storage: &'a GenericRemoteStorage,
      71        23207 :     source_path: &'a Utf8Path,
      72        23207 :     known_metadata: &'a LayerFileMetadata,
      73        23207 :     generation: Generation,
      74        23207 :     cancel: &CancellationToken,
      75        23207 : ) -> anyhow::Result<()> {
      76        23207 :     fail_point!("before-upload-layer", |_| {
      77            3 :         bail!("failpoint before-upload-layer")
      78        23207 :     });
      79              : 
      80        23204 :     pausable_failpoint!("before-upload-layer-pausable");
      81              :     // Compute the remote key first, then open the local file; only a `NotFound` open error is tolerated (see below).
      82        23204 :     let storage_path = remote_path(conf, source_path, generation)?;
      83        23204 :     let source_file_res = fs::File::open(&source_path).await;
      84        23204 :     let source_file = match source_file_res {
      85        23204 :         Ok(source_file) => source_file,
      86            0 :         Err(e) if e.kind() == ErrorKind::NotFound => {
      87              :             // If we encounter this arm, it wasn't intended, but it's also not
      88              :             // a big problem, if it's because the file was deleted before an
      89              :             // upload. However, a nonexistent file can also be indicative of
      90              :             // something worse, like when a file is scheduled for upload before
      91              :             // it has been written to disk yet.
      92              :             //
      93              :             // This is tested against `test_compaction_delete_before_upload`
      94            0 :             info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
      95            0 :             return Ok(());
      96              :         }
      97            0 :         Err(e) => {
      98            0 :             Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
      99              :         }
     100              :     };
     101              :     // Cross-check the size currently on disk against the size recorded in `known_metadata` before uploading anything.
     102        23204 :     let fs_size = source_file
     103        23204 :         .metadata()
     104        22614 :         .await
     105        23204 :         .with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
     106        23204 :         .len();
     107        23204 : 
     108        23204 :     let metadata_size = known_metadata.file_size();
     109        23204 :     if metadata_size != fs_size {
     110            0 :         bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
     111        23204 :     }
     112              :     // The upload call below takes the length as `usize`, so convert with a checked `try_from` rather than a cast.
     113        23204 :     let fs_size = usize::try_from(fs_size)
     114        23204 :         .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
     115              :     // Wrap the open file in a `ReaderStream` created with capacity `super::BUFFER_SIZE` and pass that stream to the storage upload.
     116        23204 :     let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
     117        23204 : 
     118        23204 :     upload_cancellable(cancel, storage.upload(reader, fs_size, &storage_path, None))
     119      1058775 :         .await
     120        23203 :         .with_context(|| format!("upload layer from local path '{source_path}'"))?;
     121              : 
     122        21333 :     Ok(())
     123        23206 : }
     124              : 
     125              : /// Uploads the `initdb` archive file `initdb_tar_zst` to `remote_initdb_archive_path(tenant_id, timeline_id)`, passing the caller-supplied `size` as the object length; the file is rewound to offset 0 first so the same handle can be used again after a failed attempt.
     126          632 : pub(crate) async fn upload_initdb_dir(
     127          632 :     storage: &GenericRemoteStorage,
     128          632 :     tenant_id: &TenantId,
     129          632 :     timeline_id: &TimelineId,
     130          632 :     mut initdb_tar_zst: File,
     131          632 :     size: u64,
     132          632 :     cancel: &CancellationToken,
     133          632 : ) -> anyhow::Result<()> {
     134            0 :     tracing::trace!("uploading initdb dir");
     135              : 
     136              :     // A prior retry attempt may already have read part of the file, so seek back to the start.
     137          632 :     initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
     138              :     // Wrap the file in a `ReaderStream` created with capacity `super::BUFFER_SIZE`.
     139          632 :     let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
     140          632 :     // NOTE(review): `size as usize` below silently truncates where `usize` is narrower than `u64`; `upload_timeline_layer` uses `usize::try_from` for the same conversion.
     141          632 :     let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
     142          632 :     upload_cancellable(
     143          632 :         cancel,
     144          632 :         storage.upload_storage_object(file, size as usize, &remote_path),
     145          632 :     )
     146        26802 :     .await
     147          632 :     .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
     148          632 : }
     149              : /// Copies the remote object at `remote_initdb_archive_path(tenant_id, timeline_id)` to `remote_initdb_preserved_archive_path(tenant_id, timeline_id)` with `storage.copy_object`, run through `upload_cancellable` with `cancel`; a failure is returned with "backing up initdb archive" context.
     150            2 : pub(crate) async fn preserve_initdb_archive(
     151            2 :     storage: &GenericRemoteStorage,
     152            2 :     tenant_id: &TenantId,
     153            2 :     timeline_id: &TimelineId,
     154            2 :     cancel: &CancellationToken,
     155            2 : ) -> anyhow::Result<()> {
     156            2 :     let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
     157            2 :     let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
     158            2 :     upload_cancellable(cancel, storage.copy_object(&source_path, &dest_path))
     159            2 :         .await
     160            2 :         .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
     161            2 : }
     162              : /// Calls `storage.time_travel_recover` for each remote timelines prefix of this tenant shard (the unsharded prefix when `is_zero()`, plus the sharded prefix when not `is_unsharded()`), each wrapped in `backoff::retry`; returns `TimeTravelError::Cancelled` when the retry yields `None`, otherwise the first error encountered.
     163            1 : pub(crate) async fn time_travel_recover_tenant(
     164            1 :     storage: &GenericRemoteStorage,
     165            1 :     tenant_shard_id: &TenantShardId,
     166            1 :     timestamp: SystemTime,
     167            1 :     done_if_after: SystemTime,
     168            1 :     cancel: &CancellationToken,
     169            1 : ) -> Result<(), TimeTravelError> {
     170            1 :     let warn_after = 3;
     171            1 :     let max_attempts = 10;
     172            1 :     let mut prefixes = Vec::with_capacity(2);
     173            1 :     if tenant_shard_id.is_zero() {
     174            1 :         // Also recover the unsharded prefix for a shard of zero:
     175            1 :         // - if the tenant is totally unsharded, the unsharded prefix contains all the data
     176            1 :         // - if the tenant is sharded, we still want to recover the initdb data, but we only
     177            1 :         //   want to do it once, so let's do it on the 0 shard
     178            1 :         let timelines_path_unsharded =
     179            1 :             super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
     180            1 :         prefixes.push(timelines_path_unsharded);
     181            1 :     }
     182            1 :     if !tenant_shard_id.is_unsharded() {
     183            0 :         // If the tenant is sharded, we need to recover the sharded prefix
     184            0 :         let timelines_path = super::remote_timelines_path(tenant_shard_id);
     185            0 :         prefixes.push(timelines_path);
     186            1 :     }
     187            2 :     for prefix in &prefixes {
     188            1 :         backoff::retry(
     189            1 :             || async {
     190            1 :                 storage
     191            1 :                     .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
     192          131 :                     .await
     193            1 :             },
     194            1 :             |e| !matches!(e, TimeTravelError::Other(_)), // NOTE(review): presumably the "is permanent" predicate, i.e. only `Other` errors are retried — confirm against `backoff::retry`
     195            1 :             warn_after,
     196            1 :             max_attempts,
     197            1 :             "time travel recovery of tenant prefix",
     198            1 :             cancel,
     199            1 :         )
     200          131 :         .await
     201            1 :         .ok_or_else(|| TimeTravelError::Cancelled) // `None` from the retry is reported as `Cancelled`
     202            1 :         .and_then(|x| x)?; // flatten the nested `Result` and propagate the first failure
     203              :     }
     204            1 :     Ok(())
     205            1 : }
        

Generated by: LCOV version 2.1-beta