LCOV - code coverage report
Current view: top level - pageserver/src/tenant/remote_timeline_client - upload.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 93.2 % 59 55
Test Date: 2023-09-06 10:18:01 Functions: 61.5 % 13 8

            Line data    Source code
       1              : //! Helper functions to upload files to remote storage with a RemoteStorage
       2              : 
       3              : use anyhow::{bail, Context};
       4              : use fail::fail_point;
       5              : use std::{io::ErrorKind, path::Path};
       6              : use tokio::fs;
       7              : 
       8              : use super::Generation;
       9              : use crate::{
      10              :     config::PageServerConf,
      11              :     tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
      12              : };
      13              : use remote_storage::GenericRemoteStorage;
      14              : use utils::id::{TenantId, TimelineId};
      15              : 
      16              : use super::index::LayerFileMetadata;
      17              : 
      18              : use tracing::info;
      19              : 
      20              : /// Serializes and uploads the given index part data to the remote storage.
      21         5246 : pub(super) async fn upload_index_part<'a>(
      22         5246 :     storage: &'a GenericRemoteStorage,
      23         5246 :     tenant_id: &TenantId,
      24         5246 :     timeline_id: &TimelineId,
      25         5246 :     generation: Generation,
      26         5246 :     index_part: &'a IndexPart,
      27         5246 : ) -> anyhow::Result<()> {
      28            0 :     tracing::trace!("uploading new index part");
      29              : 
      30         5246 :     fail_point!("before-upload-index", |_| {
      31            7 :         bail!("failpoint before-upload-index")
      32         5246 :     });
      33              : 
      34         5239 :     let index_part_bytes =
      35         5239 :         serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
      36         5239 :     let index_part_size = index_part_bytes.len();
      37         5239 :     let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
      38         5239 : 
      39         5239 :     let remote_path = remote_index_path(tenant_id, timeline_id, generation);
      40         5239 :     storage
      41         5239 :         .upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
      42        14484 :         .await
      43         5237 :         .with_context(|| format!("upload index part for '{tenant_id} / {timeline_id}'"))
      44         5244 : }
      45              : 
      46              : /// Attempts to upload given layer files.
      47              : /// No extra checks for overlapping files are made: any files that are already present remotely will be overwritten if submitted during the upload.
      48              : ///
      49              : /// On an error, bumps the retries count and reschedules the entire task.
      50        14125 : pub(super) async fn upload_timeline_layer<'a>(
      51        14125 :     conf: &'static PageServerConf,
      52        14125 :     storage: &'a GenericRemoteStorage,
      53        14125 :     source_path: &'a Path,
      54        14125 :     known_metadata: &'a LayerFileMetadata,
      55        14125 :     generation: Generation,
      56        14125 : ) -> anyhow::Result<()> {
      57        14125 :     fail_point!("before-upload-layer", |_| {
      58            4 :         bail!("failpoint before-upload-layer")
      59        14125 :     });
      60              : 
      61        14121 :     let storage_path = remote_path(conf, source_path, generation)?;
      62        14121 :     let source_file_res = fs::File::open(&source_path).await;
      63        14120 :     let source_file = match source_file_res {
      64        14120 :         Ok(source_file) => source_file,
      65            1 :         Err(e) if e.kind() == ErrorKind::NotFound => {
      66              :             // If we encounter this arm, it wasn't intended, but it's also not
      67              :             // a big problem, if it's because the file was deleted before an
      68              :             // upload. However, a nonexistent file can also be indicative of
      69              :             // something worse, like when a file is scheduled for upload before
      70              :             // it has been written to disk yet.
      71            1 :             info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
      72            1 :             return Ok(());
      73              :         }
      74            0 :         Err(e) => {
      75            0 :             Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
      76              :         }
      77              :     };
      78              : 
      79        14120 :     let fs_size = source_file
      80        14120 :         .metadata()
      81        13998 :         .await
      82        14119 :         .with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
      83        14119 :         .len();
      84        14119 : 
      85        14119 :     let metadata_size = known_metadata.file_size();
      86        14119 :     if metadata_size != fs_size {
      87            0 :         bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
      88        14119 :     }
      89              : 
      90        14119 :     let fs_size = usize::try_from(fs_size)
      91        14119 :         .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
      92              : 
      93        14119 :     storage
      94        14119 :         .upload(source_file, fs_size, &storage_path, None)
      95      1313870 :         .await
      96        14111 :         .with_context(|| format!("upload layer from local path '{}'", source_path.display()))?;
      97              : 
      98        12204 :     Ok(())
      99        14116 : }
        

Generated by: LCOV version 2.1-beta