LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - upload.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 40.9 % 193 79
Test Date: 2025-07-16 12:29:03 Functions: 19.4 % 31 6

            Line data    Source code
       1              : //! Helper functions to upload files to remote storage with a RemoteStorage
       2              : 
       3              : use std::io::{ErrorKind, SeekFrom};
       4              : use std::num::NonZeroU32;
       5              : use std::time::SystemTime;
       6              : 
       7              : use anyhow::{Context, bail};
       8              : use bytes::Bytes;
       9              : use camino::Utf8Path;
      10              : use fail::fail_point;
      11              : use pageserver_api::shard::TenantShardId;
      12              : use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
      13              : use tokio::fs::{self, File};
      14              : use tokio::io::AsyncSeekExt;
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::info;
      17              : use utils::id::{TenantId, TimelineId};
      18              : use utils::{backoff, pausable_failpoint};
      19              : 
      20              : use super::Generation;
      21              : use super::index::IndexPart;
      22              : use super::manifest::TenantManifest;
      23              : use crate::tenant::remote_timeline_client::{
      24              :     remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
      25              :     remote_tenant_manifest_path,
      26              : };
      27              : 
      28              : /// Serializes and uploads the given index part data to the remote storage.
      29          774 : pub(crate) async fn upload_index_part(
      30          774 :     storage: &GenericRemoteStorage,
      31          774 :     tenant_shard_id: &TenantShardId,
      32          774 :     timeline_id: &TimelineId,
      33          774 :     generation: Generation,
      34          774 :     index_part: &IndexPart,
      35          774 :     cancel: &CancellationToken,
      36          774 : ) -> anyhow::Result<()> {
      37          774 :     tracing::trace!("uploading new index part");
      38              : 
      39          774 :     fail_point!("before-upload-index", |_| {
      40            0 :         bail!("failpoint before-upload-index")
      41            0 :     });
      42          774 :     pausable_failpoint!("before-upload-index-pausable");
      43              : 
      44              :     // Safety: refuse to persist invalid index metadata, to mitigate the impact of any bug that produces this
      45              :     // (this should never happen)
      46          774 :     index_part.validate().map_err(|e| anyhow::anyhow!(e))?;
      47              : 
      48              :     // FIXME: this error comes too late
      49          774 :     let serialized = index_part.to_json_bytes()?;
      50          774 :     let serialized = Bytes::from(serialized);
      51              : 
      52          774 :     let index_part_size = serialized.len();
      53              : 
      54          774 :     let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
      55          774 :     storage
      56          774 :         .upload_storage_object(
      57          774 :             futures::stream::once(futures::future::ready(Ok(serialized))),
      58          774 :             index_part_size,
      59          774 :             &remote_path,
      60          774 :             cancel,
      61          774 :         )
      62          774 :         .await
      63          770 :         .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
      64          770 : }
      65              : 
      66              : /// Serializes and uploads the given tenant manifest data to the remote storage.
      67          117 : pub(crate) async fn upload_tenant_manifest(
      68          117 :     storage: &GenericRemoteStorage,
      69          117 :     tenant_shard_id: &TenantShardId,
      70          117 :     generation: Generation,
      71          117 :     tenant_manifest: &TenantManifest,
      72          117 :     cancel: &CancellationToken,
      73          117 : ) -> anyhow::Result<()> {
      74          117 :     tracing::trace!("uploading new tenant manifest");
      75              : 
      76          117 :     fail_point!("before-upload-manifest", |_| {
      77            0 :         bail!("failpoint before-upload-manifest")
      78            0 :     });
      79          117 :     pausable_failpoint!("before-upload-manifest-pausable");
      80              : 
      81          117 :     let serialized = Bytes::from(tenant_manifest.to_json_bytes()?);
      82          117 :     let tenant_manifest_size = serialized.len();
      83          117 :     let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
      84              : 
      85          117 :     storage
      86          117 :         .upload_storage_object(
      87          117 :             futures::stream::once(futures::future::ready(Ok(serialized))),
      88          117 :             tenant_manifest_size,
      89          117 :             &remote_path,
      90          117 :             cancel,
      91          117 :         )
      92          117 :         .await
      93          117 :         .with_context(|| format!("upload tenant manifest for '{tenant_shard_id}'"))
      94          117 : }
      95              : 
      96              : /// Attempts to upload given layer files.
      97              : /// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
      98              : ///
      99              : /// On an error, bumps the retries count and reschedules the entire task.
     100          890 : pub(super) async fn upload_timeline_layer<'a>(
     101          890 :     storage: &'a GenericRemoteStorage,
     102          890 :     local_path: &'a Utf8Path,
     103          890 :     remote_path: &'a RemotePath,
     104          890 :     metadata_size: u64,
     105          890 :     cancel: &CancellationToken,
     106          890 : ) -> anyhow::Result<()> {
     107          890 :     fail_point!("before-upload-layer", |_| {
     108            0 :         bail!("failpoint before-upload-layer")
     109            0 :     });
     110              : 
     111          890 :     pausable_failpoint!("before-upload-layer-pausable");
     112              : 
     113          889 :     let source_file_res = fs::File::open(&local_path).await;
     114          888 :     let source_file = match source_file_res {
     115          888 :         Ok(source_file) => source_file,
     116            0 :         Err(e) if e.kind() == ErrorKind::NotFound => {
     117              :             // If we encounter this arm, it wasn't intended, but it's also not
     118              :             // a big problem, if it's because the file was deleted before an
     119              :             // upload. However, a nonexistent file can also be indicative of
     120              :             // something worse, like when a file is scheduled for upload before
     121              :             // it has been written to disk yet.
     122              :             //
     123              :             // This is tested against `test_compaction_delete_before_upload`
     124            0 :             info!(path = %local_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
     125            0 :             return Ok(());
     126              :         }
     127            0 :         Err(e) => Err(e).with_context(|| format!("open a source file for layer {local_path:?}"))?,
     128              :     };
     129              : 
     130          888 :     let fs_size = source_file
     131          888 :         .metadata()
     132          888 :         .await
     133          888 :         .with_context(|| format!("get the source file metadata for layer {local_path:?}"))?
     134          888 :         .len();
     135              : 
     136          888 :     if metadata_size != fs_size {
     137            0 :         bail!(
     138            0 :             "File {local_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}"
     139              :         );
     140          888 :     }
     141              : 
     142          888 :     let fs_size = usize::try_from(fs_size)
     143          888 :         .with_context(|| format!("convert {local_path:?} size {fs_size} usize"))?;
     144              :     /* BEGIN_HADRON */
     145          888 :     let mut metadata = None;
     146          888 :     match storage {
     147              :         // Pass the file path as a storage metadata to minimize changes to neon.
     148              :         // Otherwise, we need to change the upload interface.
     149            0 :         GenericRemoteStorage::AzureBlob(s) => {
     150            0 :             let block_size_mb = s.put_block_size_mb.unwrap_or(0);
     151            0 :             if block_size_mb > 0 && fs_size > block_size_mb * 1024 * 1024 {
     152            0 :                 metadata = Some(remote_storage::StorageMetadata::from([(
     153            0 :                     "databricks_azure_put_block",
     154            0 :                     local_path.as_str(),
     155            0 :                 )]));
     156            0 :             }
     157              :         }
     158          888 :         GenericRemoteStorage::LocalFs(_) => {}
     159            0 :         GenericRemoteStorage::AwsS3(_) => {}
     160            0 :         GenericRemoteStorage::Unreliable(_) => {}
     161              :     };
     162              :     /* END_HADRON */
     163          888 :     let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
     164              : 
     165          888 :     storage
     166          888 :         .upload(reader, fs_size, remote_path, metadata, cancel)
     167          888 :         .await
     168          880 :         .with_context(|| format!("upload layer from local path '{local_path}'"))
     169          880 : }
     170              : 
     171            0 : pub(super) async fn copy_timeline_layer(
     172            0 :     storage: &GenericRemoteStorage,
     173            0 :     source_path: &RemotePath,
     174            0 :     target_path: &RemotePath,
     175            0 :     cancel: &CancellationToken,
     176            0 : ) -> anyhow::Result<()> {
     177            0 :     fail_point!("before-copy-layer", |_| {
     178            0 :         bail!("failpoint before-copy-layer")
     179            0 :     });
     180              : 
     181            0 :     pausable_failpoint!("before-copy-layer-pausable");
     182              : 
     183            0 :     storage
     184            0 :         .copy_object(source_path, target_path, cancel)
     185            0 :         .await
     186            0 :         .with_context(|| format!("copy layer {source_path} to {target_path}"))
     187            0 : }
     188              : 
     189              : /// Uploads the given `initdb` data to the remote storage.
     190            0 : pub(crate) async fn upload_initdb_dir(
     191            0 :     storage: &GenericRemoteStorage,
     192            0 :     tenant_id: &TenantId,
     193            0 :     timeline_id: &TimelineId,
     194            0 :     mut initdb_tar_zst: File,
     195            0 :     size: u64,
     196            0 :     cancel: &CancellationToken,
     197            0 : ) -> anyhow::Result<()> {
     198            0 :     tracing::trace!("uploading initdb dir");
     199              : 
     200              :     // We might have read somewhat into the file already in the prior retry attempt
     201            0 :     initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
     202              : 
     203            0 :     let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
     204              : 
     205            0 :     let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
     206            0 :     storage
     207            0 :         .upload_storage_object(file, size as usize, &remote_path, cancel)
     208            0 :         .await
     209            0 :         .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
     210            0 : }
     211              : 
     212            0 : pub(crate) async fn preserve_initdb_archive(
     213            0 :     storage: &GenericRemoteStorage,
     214            0 :     tenant_id: &TenantId,
     215            0 :     timeline_id: &TimelineId,
     216            0 :     cancel: &CancellationToken,
     217            0 : ) -> anyhow::Result<()> {
     218            0 :     let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
     219            0 :     let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
     220            0 :     storage
     221            0 :         .copy_object(&source_path, &dest_path, cancel)
     222            0 :         .await
     223            0 :         .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
     224            0 : }
     225              : 
     226            0 : pub(crate) async fn time_travel_recover_tenant(
     227            0 :     storage: &GenericRemoteStorage,
     228            0 :     tenant_shard_id: &TenantShardId,
     229            0 :     timestamp: SystemTime,
     230            0 :     done_if_after: SystemTime,
     231            0 :     cancel: &CancellationToken,
     232            0 : ) -> Result<(), TimeTravelError> {
     233            0 :     let warn_after = 3;
     234            0 :     let max_attempts = 10;
     235            0 :     let mut prefixes = Vec::with_capacity(2);
     236            0 :     if tenant_shard_id.is_shard_zero() {
     237            0 :         // Also recover the unsharded prefix for a shard of zero:
     238            0 :         // - if the tenant is totally unsharded, the unsharded prefix contains all the data
     239            0 :         // - if the tenant is sharded, we still want to recover the initdb data, but we only
     240            0 :         //   want to do it once, so let's do it on the 0 shard
     241            0 :         let timelines_path_unsharded =
     242            0 :             super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
     243            0 :         prefixes.push(timelines_path_unsharded);
     244            0 :     }
     245            0 :     if !tenant_shard_id.is_unsharded() {
     246            0 :         // If the tenant is sharded, we need to recover the sharded prefix
     247            0 :         let timelines_path = super::remote_timelines_path(tenant_shard_id);
     248            0 :         prefixes.push(timelines_path);
     249            0 :     }
     250              : 
     251              :     // Limit the number of versions deletions, mostly so that we don't
     252              :     // keep requesting forever if the list is too long, as we'd put the
     253              :     // list in RAM.
     254              :     // Building a list of 100k entries that reaches the limit roughly takes
     255              :     // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
     256              :     const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
     257              : 
     258            0 :     for prefix in &prefixes {
     259            0 :         backoff::retry(
     260            0 :             || async {
     261            0 :                 storage
     262            0 :                     .time_travel_recover(
     263            0 :                         Some(prefix),
     264            0 :                         timestamp,
     265            0 :                         done_if_after,
     266            0 :                         cancel,
     267            0 :                         COMPLEXITY_LIMIT,
     268            0 :                     )
     269            0 :                     .await
     270            0 :             },
     271            0 :             |e| !matches!(e, TimeTravelError::Other(_)),
     272            0 :             warn_after,
     273            0 :             max_attempts,
     274            0 :             "time travel recovery of tenant prefix",
     275            0 :             cancel,
     276              :         )
     277            0 :         .await
     278            0 :         .ok_or_else(|| TimeTravelError::Cancelled)
     279            0 :         .and_then(|x| x)?;
     280              :     }
     281            0 :     Ok(())
     282            0 : }
        

Generated by: LCOV version 2.1-beta