LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - upload.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 45.2 % 188 85
Test Date: 2024-11-25 17:48:16 Functions: 18.8 % 32 6

            Line data    Source code
       1              : //! Helper functions to upload files to remote storage with a RemoteStorage
       2              : 
       3              : use anyhow::{bail, Context};
       4              : use bytes::Bytes;
       5              : use camino::Utf8Path;
       6              : use fail::fail_point;
       7              : use pageserver_api::shard::TenantShardId;
       8              : use std::io::{ErrorKind, SeekFrom};
       9              : use std::time::SystemTime;
      10              : use tokio::fs::{self, File};
      11              : use tokio::io::AsyncSeekExt;
      12              : use tokio_util::sync::CancellationToken;
      13              : use utils::{backoff, pausable_failpoint};
      14              : 
      15              : use super::index::IndexPart;
      16              : use super::manifest::TenantManifest;
      17              : use super::Generation;
      18              : use crate::tenant::remote_timeline_client::{
      19              :     remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
      20              :     remote_tenant_manifest_path,
      21              : };
      22              : use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
      23              : use utils::id::{TenantId, TimelineId};
      24              : 
      25              : use tracing::info;
      26              : 
      27              : /// Serializes `index_part` with `to_json_bytes()` and uploads the bytes to the remote path computed by `remote_index_path(tenant_shard_id, timeline_id, generation)`. A serialization error is returned as-is; an upload error is returned with the tenant shard and timeline ids attached as context.
      28         1414 : pub(crate) async fn upload_index_part<'a>(
      29         1414 :     storage: &'a GenericRemoteStorage,
      30         1414 :     tenant_shard_id: &TenantShardId,
      31         1414 :     timeline_id: &TimelineId,
      32         1414 :     generation: Generation,
      33         1414 :     index_part: &IndexPart,
      34         1414 :     cancel: &CancellationToken,
      35         1414 : ) -> anyhow::Result<()> {
      36         1414 :     tracing::trace!("uploading new index part");
      37              : 
      38         1414 :     fail_point!("before-upload-index", |_| { // failpoint hook: when this closure runs, the function returns the error below
      39            0 :         bail!("failpoint before-upload-index")
      40         1414 :     });
      41         1414 :     pausable_failpoint!("before-upload-index-pausable"); // NOTE(review): presumably lets tests pause execution here - confirm against utils::pausable_failpoint
      42              : 
      43              :     // FIXME: this error comes too late
      44         1414 :     let serialized = index_part.to_json_bytes()?;
      45         1414 :     let serialized = Bytes::from(serialized);
      46         1414 : 
      47         1414 :     let index_part_size = serialized.len(); // byte length of the serialized index, passed to the upload call below
      48         1414 : 
      49         1414 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
      50         1414 :     storage
      51         1414 :         .upload_storage_object(
      52         1414 :             futures::stream::once(futures::future::ready(Ok(serialized))), // one-item stream yielding the whole serialized buffer
      53         1414 :             index_part_size,
      54         1414 :             &remote_path,
      55         1414 :             cancel,
      56         1414 :         )
      57         3917 :         .await
      58         1410 :         .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
      59         1410 : }
      60              : /// Serializes `tenant_manifest` with `to_json_bytes()` and uploads the bytes to the remote path computed by `remote_tenant_manifest_path(tenant_shard_id, generation)`. A serialization error is returned as-is; an upload error is returned with the tenant shard id attached as context.
      61            2 : pub(crate) async fn upload_tenant_manifest(
      62            2 :     storage: &GenericRemoteStorage,
      63            2 :     tenant_shard_id: &TenantShardId,
      64            2 :     generation: Generation,
      65            2 :     tenant_manifest: &TenantManifest,
      66            2 :     cancel: &CancellationToken,
      67            2 : ) -> anyhow::Result<()> {
      68            2 :     tracing::trace!("uploading new tenant manifest");
      69              : 
      70            2 :     fail_point!("before-upload-manifest", |_| { // failpoint hook: when this closure runs, the function returns the error below
      71            0 :         bail!("failpoint before-upload-manifest")
      72            2 :     });
      73            2 :     pausable_failpoint!("before-upload-manifest-pausable"); // NOTE(review): presumably lets tests pause execution here - confirm against utils::pausable_failpoint
      74              : 
      75            2 :     let serialized = tenant_manifest.to_json_bytes()?;
      76            2 :     let serialized = Bytes::from(serialized);
      77            2 : 
      78            2 :     let tenant_manifest_size = serialized.len(); // byte length of the serialized manifest, passed to the upload call below
      79            2 : 
      80            2 :     let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
      81            2 :     storage
      82            2 :         .upload_storage_object(
      83            2 :             futures::stream::once(futures::future::ready(Ok(serialized))), // one-item stream yielding the whole serialized buffer
      84            2 :             tenant_manifest_size,
      85            2 :             &remote_path,
      86            2 :             cancel,
      87            2 :         )
      88            3 :         .await
      89            2 :         .with_context(|| format!("upload tenant manifest for '{tenant_shard_id}'"))
      90            2 : }
      91              : 
      92              : /// Attempts to upload the layer file at `local_path` to `remote_path`; returns `Ok(())` without uploading if the local file does not exist, and fails if its size on disk differs from `metadata_size`.
      93              : /// No extra checks for overlapping files are made and any files that are already present remotely will be overwritten, if submitted during the upload.
      94              : ///
      95              : /// On an error, bumps the retries count and reschedules the entire task.
      96         1436 : pub(super) async fn upload_timeline_layer<'a>(
      97         1436 :     storage: &'a GenericRemoteStorage,
      98         1436 :     local_path: &'a Utf8Path,
      99         1436 :     remote_path: &'a RemotePath,
     100         1436 :     metadata_size: u64,
     101         1436 :     cancel: &CancellationToken,
     102         1436 : ) -> anyhow::Result<()> {
     103         1436 :     fail_point!("before-upload-layer", |_| { // failpoint hook: when this closure runs, the function returns the error below
     104            0 :         bail!("failpoint before-upload-layer")
     105         1436 :     });
     106              : 
     107         1436 :     pausable_failpoint!("before-upload-layer-pausable");
     108              : 
     109         1378 :     let source_file_res = fs::File::open(&local_path).await;
     110         1372 :     let source_file = match source_file_res {
     111         1372 :         Ok(source_file) => source_file,
     112            0 :         Err(e) if e.kind() == ErrorKind::NotFound => {
     113            0 :             // If we encounter this arm, it wasn't intended, but it's also not
     114            0 :             // a big problem, if it's because the file was deleted before an
     115            0 :             // upload. However, a nonexistent file can also be indicative of
     116            0 :             // something worse, like when a file is scheduled for upload before
     117            0 :             // it has been written to disk yet.
     118            0 :             //
     119            0 :             // This is tested against `test_compaction_delete_before_upload`
     120            0 :             info!(path = %local_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
     121            0 :             return Ok(());
     122              :         }
     123            0 :         Err(e) => Err(e).with_context(|| format!("open a source file for layer {local_path:?}"))?, // any other open error is propagated with the path as context
     124              :     };
     125              : 
     126         1372 :     let fs_size = source_file
     127         1372 :         .metadata()
     128         1258 :         .await
     129         1372 :         .with_context(|| format!("get the source file metadata for layer {local_path:?}"))?
     130         1372 :         .len();
     131         1372 : 
     132         1372 :     if metadata_size != fs_size { // refuse to upload when the on-disk size no longer matches the size passed in as `metadata_size`
     133            0 :         bail!("File {local_path:?} has its current FS size {fs_size} different from initially determined {metadata_size}");
     134         1372 :     }
     135              : 
     136         1372 :     let fs_size = usize::try_from(fs_size) // checked u64 -> usize conversion; the upload call takes a usize length
     137         1372 :         .with_context(|| format!("convert {local_path:?} size {fs_size} to usize"))?;
     138              : 
     139         1372 :     let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE); // stream the opened file using a buffer of capacity `super::BUFFER_SIZE`
     140         1372 : 
     141         1372 :     storage
     142         1372 :         .upload(reader, fs_size, remote_path, None, cancel)
     143        35172 :         .await
     144         1342 :         .with_context(|| format!("upload layer from local path '{local_path}'"))
     145         1342 : }
     146              : /// Copies the remote object at `source_path` to `target_path` using `copy_object`; a copy error is returned with both paths attached as context.
     147            0 : pub(super) async fn copy_timeline_layer(
     148            0 :     storage: &GenericRemoteStorage,
     149            0 :     source_path: &RemotePath,
     150            0 :     target_path: &RemotePath,
     151            0 :     cancel: &CancellationToken,
     152            0 : ) -> anyhow::Result<()> {
     153            0 :     fail_point!("before-copy-layer", |_| { // failpoint hook: when this closure runs, the function returns the error below
     154            0 :         bail!("failpoint before-copy-layer")
     155            0 :     });
     156              : 
     157            0 :     pausable_failpoint!("before-copy-layer-pausable"); // NOTE(review): presumably lets tests pause execution here - confirm against utils::pausable_failpoint
     158              : 
     159            0 :     storage
     160            0 :         .copy_object(source_path, target_path, cancel)
     161            0 :         .await
     162            0 :         .with_context(|| format!("copy layer {source_path} to {target_path}"))
     163            0 : }
     164              : 
     165              : /// Uploads the given `initdb` data to the remote storage: rewinds `initdb_tar_zst` to its start, then streams it to the path from `remote_initdb_archive_path(tenant_id, timeline_id)`. Fails if `size` does not fit in `usize`, or if the seek or the upload fails.
     166            0 : pub(crate) async fn upload_initdb_dir(
     167            0 :     storage: &GenericRemoteStorage,
     168            0 :     tenant_id: &TenantId,
     169            0 :     timeline_id: &TimelineId,
     170            0 :     mut initdb_tar_zst: File,
     171            0 :     size: u64,
     172            0 :     cancel: &CancellationToken,
     173            0 : ) -> anyhow::Result<()> {
     174            0 :     tracing::trace!("uploading initdb dir");
     175              : 
     176              :     // We might have read somewhat into the file already in the prior retry attempt
     177            0 :     initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
     178              : 
     179            0 :     let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
     180            0 :     let size = usize::try_from(size).with_context(|| format!("convert initdb archive size {size} to usize"))?; // checked conversion instead of a silently truncating `as` cast
     181            0 :     let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
     182            0 :     storage
     183            0 :         .upload_storage_object(file, size, &remote_path, cancel)
     184            0 :         .await
     185            0 :         .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
     186            0 : }
     187              : /// Copies the remote object at `remote_initdb_archive_path(tenant_id, timeline_id)` to `remote_initdb_preserved_archive_path(tenant_id, timeline_id)` using `copy_object`; a copy error is returned with the tenant and timeline ids attached as context.
     188            0 : pub(crate) async fn preserve_initdb_archive(
     189            0 :     storage: &GenericRemoteStorage,
     190            0 :     tenant_id: &TenantId,
     191            0 :     timeline_id: &TimelineId,
     192            0 :     cancel: &CancellationToken,
     193            0 : ) -> anyhow::Result<()> {
     194            0 :     let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
     195            0 :     let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
     196            0 :     storage
     197            0 :         .copy_object(&source_path, &dest_path, cancel)
     198            0 :         .await
     199            0 :         .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
     200            0 : }
     201              : /// Calls `time_travel_recover` for each of the tenant's remote timeline prefixes, wrapping every call in `backoff::retry` (warn_after = 3, max_attempts = 10). Shard zero recovers the unsharded prefix and any sharded tenant recovers its sharded prefix. Returns `TimeTravelError::Cancelled` when the retry helper yields `None`, otherwise the first error a prefix ends with.
     202            0 : pub(crate) async fn time_travel_recover_tenant(
     203            0 :     storage: &GenericRemoteStorage,
     204            0 :     tenant_shard_id: &TenantShardId,
     205            0 :     timestamp: SystemTime,
     206            0 :     done_if_after: SystemTime,
     207            0 :     cancel: &CancellationToken,
     208            0 : ) -> Result<(), TimeTravelError> {
     209            0 :     let warn_after = 3;
     210            0 :     let max_attempts = 10;
     211            0 :     let mut prefixes = Vec::with_capacity(2); // at most two prefixes are pushed below
     212            0 :     if tenant_shard_id.is_shard_zero() {
     213            0 :         // Also recover the unsharded prefix for a shard of zero:
     214            0 :         // - if the tenant is totally unsharded, the unsharded prefix contains all the data
     215            0 :         // - if the tenant is sharded, we still want to recover the initdb data, but we only
     216            0 :         //   want to do it once, so let's do it on the 0 shard
     217            0 :         let timelines_path_unsharded =
     218            0 :             super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
     219            0 :         prefixes.push(timelines_path_unsharded);
     220            0 :     }
     221            0 :     if !tenant_shard_id.is_unsharded() {
     222            0 :         // If the tenant is sharded, we need to recover the sharded prefix
     223            0 :         let timelines_path = super::remote_timelines_path(tenant_shard_id);
     224            0 :         prefixes.push(timelines_path);
     225            0 :     }
     226            0 :     for prefix in &prefixes {
     227            0 :         backoff::retry(
     228            0 :             || async {
     229            0 :                 storage
     230            0 :                     .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
     231            0 :                     .await
     232            0 :             },
     233            0 :             |e| !matches!(e, TimeTravelError::Other(_)), // NOTE(review): presumably the "permanent error" predicate, i.e. only `Other` is retried - confirm against backoff::retry
     234            0 :             warn_after,
     235            0 :             max_attempts,
     236            0 :             "time travel recovery of tenant prefix",
     237            0 :             cancel,
     238            0 :         )
     239            0 :         .await
     240            0 :         .ok_or(TimeTravelError::Cancelled) // `retry` yielded None: report it as a cancellation
     241            0 :         .and_then(|x| x)?; // flatten the nested Result and stop at the first failing prefix
     242              :     }
     243            0 :     Ok(())
     244            0 : }
        

Generated by: LCOV version 2.1-beta