LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - upload.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 42.7 % 143 61
Test Date: 2024-02-29 11:57:12 Functions: 24.2 % 33 8

            Line data    Source code
       1              : //! Helper functions that upload files to remote storage (and copy or time-travel-restore remote objects) through a [`GenericRemoteStorage`].
       2              : 
       3              : use anyhow::{bail, Context};
       4              : use camino::Utf8Path;
       5              : use fail::fail_point;
       6              : use pageserver_api::shard::TenantShardId;
       7              : use std::io::{ErrorKind, SeekFrom};
       8              : use std::time::SystemTime;
       9              : use tokio::fs::{self, File};
      10              : use tokio::io::AsyncSeekExt;
      11              : use tokio_util::sync::CancellationToken;
      12              : use utils::backoff;
      13              : 
      14              : use super::Generation;
      15              : use crate::{
      16              :     config::PageServerConf,
      17              :     tenant::remote_timeline_client::{
      18              :         index::IndexPart, remote_index_path, remote_initdb_archive_path,
      19              :         remote_initdb_preserved_archive_path, remote_path,
      20              :     },
      21              : };
      22              : use remote_storage::{GenericRemoteStorage, TimeTravelError};
      23              : use utils::id::{TenantId, TimelineId};
      24              : 
      25              : use super::index::LayerFileMetadata;
      26              : 
      27              : use tracing::info;
      28              : 
      29              : /// Serializes and uploads the given index part data to the remote storage.
      30          747 : pub(crate) async fn upload_index_part<'a>(
      31          747 :     storage: &'a GenericRemoteStorage,
      32          747 :     tenant_shard_id: &TenantShardId,
      33          747 :     timeline_id: &TimelineId,
      34          747 :     generation: Generation,
      35          747 :     index_part: &'a IndexPart,
      36          747 :     cancel: &CancellationToken,
      37          747 : ) -> anyhow::Result<()> {
      38            0 :     tracing::trace!("uploading new index part");
      39              : 
      40          747 :     fail_point!("before-upload-index", |_| {
      41            0 :         bail!("failpoint before-upload-index")
      42          747 :     });
      43          747 :     pausable_failpoint!("before-upload-index-pausable");
      44              : 
      45          747 :     let index_part_bytes = index_part
      46          747 :         .to_s3_bytes()
      47          747 :         .context("serialize index part file into bytes")?;
      48          747 :     let index_part_size = index_part_bytes.len();
      49          747 :     let index_part_bytes = bytes::Bytes::from(index_part_bytes);
      50          747 : 
      51          747 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
      52          747 :     storage
      53          747 :         .upload_storage_object(
      54          747 :             futures::stream::once(futures::future::ready(Ok(index_part_bytes))),
      55          747 :             index_part_size,
      56          747 :             &remote_path,
      57          747 :             cancel,
      58          747 :         )
      59         2429 :         .await
      60          745 :         .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
      61          745 : }
      62              : 
      63              : /// Attempts to upload given layer files.
      64              : /// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
      65              : ///
      66              : /// On an error, bumps the retries count and reschedules the entire task.
      67          555 : pub(super) async fn upload_timeline_layer<'a>(
      68          555 :     conf: &'static PageServerConf,
      69          555 :     storage: &'a GenericRemoteStorage,
      70          555 :     source_path: &'a Utf8Path,
      71          555 :     known_metadata: &'a LayerFileMetadata,
      72          555 :     generation: Generation,
      73          555 :     cancel: &CancellationToken,
      74          555 : ) -> anyhow::Result<()> {
      75          555 :     fail_point!("before-upload-layer", |_| {
      76            0 :         bail!("failpoint before-upload-layer")
      77          555 :     });
      78              : 
      79          555 :     pausable_failpoint!("before-upload-layer-pausable");
      80              : 
      81          554 :     let storage_path = remote_path(conf, source_path, generation)?;
      82          554 :     let source_file_res = fs::File::open(&source_path).await;
      83          554 :     let source_file = match source_file_res {
      84          554 :         Ok(source_file) => source_file,
      85            0 :         Err(e) if e.kind() == ErrorKind::NotFound => {
      86              :             // If we encounter this arm, it wasn't intended, but it's also not
      87              :             // a big problem, if it's because the file was deleted before an
      88              :             // upload. However, a nonexistent file can also be indicative of
      89              :             // something worse, like when a file is scheduled for upload before
      90              :             // it has been written to disk yet.
      91              :             //
      92              :             // This is tested against `test_compaction_delete_before_upload`
      93            0 :             info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
      94            0 :             return Ok(());
      95              :         }
      96            0 :         Err(e) => {
      97            0 :             Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
      98              :         }
      99              :     };
     100              : 
     101          554 :     let fs_size = source_file
     102          554 :         .metadata()
     103          553 :         .await
     104          553 :         .with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
     105          553 :         .len();
     106          553 : 
     107          553 :     let metadata_size = known_metadata.file_size();
     108          553 :     if metadata_size != fs_size {
     109            0 :         bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
     110          553 :     }
     111              : 
     112          553 :     let fs_size = usize::try_from(fs_size)
     113          553 :         .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
     114              : 
     115          553 :     let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
     116          553 : 
     117          553 :     storage
     118          553 :         .upload(reader, fs_size, &storage_path, None, cancel)
     119        17231 :         .await
     120          543 :         .with_context(|| format!("upload layer from local path '{source_path}'"))
     121          543 : }
     122              : 
     123              : /// Uploads the given `initdb` data to the remote storage.
     124            0 : pub(crate) async fn upload_initdb_dir(
     125            0 :     storage: &GenericRemoteStorage,
     126            0 :     tenant_id: &TenantId,
     127            0 :     timeline_id: &TimelineId,
     128            0 :     mut initdb_tar_zst: File,
     129            0 :     size: u64,
     130            0 :     cancel: &CancellationToken,
     131            0 : ) -> anyhow::Result<()> {
     132            0 :     tracing::trace!("uploading initdb dir");
     133              : 
     134              :     // We might have read somewhat into the file already in the prior retry attempt
     135            0 :     initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
     136              : 
     137            0 :     let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
     138            0 : 
     139            0 :     let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
     140            0 :     storage
     141            0 :         .upload_storage_object(file, size as usize, &remote_path, cancel)
     142            0 :         .await
     143            0 :         .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
     144            0 : }
     145              : 
     146            0 : pub(crate) async fn preserve_initdb_archive(
     147            0 :     storage: &GenericRemoteStorage,
     148            0 :     tenant_id: &TenantId,
     149            0 :     timeline_id: &TimelineId,
     150            0 :     cancel: &CancellationToken,
     151            0 : ) -> anyhow::Result<()> {
     152            0 :     let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
     153            0 :     let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
     154            0 :     storage
     155            0 :         .copy_object(&source_path, &dest_path, cancel)
     156            0 :         .await
     157            0 :         .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
     158            0 : }
     159              : 
     160            0 : pub(crate) async fn time_travel_recover_tenant(
     161            0 :     storage: &GenericRemoteStorage,
     162            0 :     tenant_shard_id: &TenantShardId,
     163            0 :     timestamp: SystemTime,
     164            0 :     done_if_after: SystemTime,
     165            0 :     cancel: &CancellationToken,
     166            0 : ) -> Result<(), TimeTravelError> {
     167            0 :     let warn_after = 3;
     168            0 :     let max_attempts = 10;
     169            0 :     let mut prefixes = Vec::with_capacity(2);
     170            0 :     if tenant_shard_id.is_zero() {
     171            0 :         // Also recover the unsharded prefix for a shard of zero:
     172            0 :         // - if the tenant is totally unsharded, the unsharded prefix contains all the data
     173            0 :         // - if the tenant is sharded, we still want to recover the initdb data, but we only
     174            0 :         //   want to do it once, so let's do it on the 0 shard
     175            0 :         let timelines_path_unsharded =
     176            0 :             super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
     177            0 :         prefixes.push(timelines_path_unsharded);
     178            0 :     }
     179            0 :     if !tenant_shard_id.is_unsharded() {
     180            0 :         // If the tenant is sharded, we need to recover the sharded prefix
     181            0 :         let timelines_path = super::remote_timelines_path(tenant_shard_id);
     182            0 :         prefixes.push(timelines_path);
     183            0 :     }
     184            0 :     for prefix in &prefixes {
     185            0 :         backoff::retry(
     186            0 :             || async {
     187            0 :                 storage
     188            0 :                     .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
     189            0 :                     .await
     190            0 :             },
     191            0 :             |e| !matches!(e, TimeTravelError::Other(_)),
     192            0 :             warn_after,
     193            0 :             max_attempts,
     194            0 :             "time travel recovery of tenant prefix",
     195            0 :             cancel,
     196            0 :         )
     197            0 :         .await
     198            0 :         .ok_or_else(|| TimeTravelError::Cancelled)
     199            0 :         .and_then(|x| x)?;
     200              :     }
     201            0 :     Ok(())
     202            0 : }
        

Generated by: LCOV version 2.1-beta