LCOV - code coverage report
Current view: top level - libs/remote_storage/src - lib.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 73.6 % 417 307
Test Date: 2024-02-14 18:05:35 Functions: 61.8 % 157 97

            Line data    Source code
       1              : //! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage.
       2              : //! No other modules from this tree are supposed to be used directly by the external code.
       3              : //!
       4              : //! [`RemoteStorage`] trait a CRUD-like generic abstraction to use for adapting external storages with a few implementations:
       5              : //!   * [`local_fs`] allows to use local file system as an external storage
       6              : //!   * [`s3_bucket`] uses AWS S3 bucket as an external storage
       7              : //!   * [`azure_blob`] allows to use Azure Blob storage as an external storage
       8              : //!
       9              : #![deny(unsafe_code)]
      10              : #![deny(clippy::undocumented_unsafe_blocks)]
      11              : 
      12              : mod azure_blob;
      13              : mod local_fs;
      14              : mod s3_bucket;
      15              : mod simulate_failures;
      16              : mod support;
      17              : 
      18              : use std::{
      19              :     collections::HashMap,
      20              :     fmt::Debug,
      21              :     num::{NonZeroU32, NonZeroUsize},
      22              :     pin::Pin,
      23              :     sync::Arc,
      24              :     time::SystemTime,
      25              : };
      26              : 
      27              : use anyhow::{bail, Context};
      28              : use camino::{Utf8Path, Utf8PathBuf};
      29              : 
      30              : use bytes::Bytes;
      31              : use futures::stream::Stream;
      32              : use serde::{Deserialize, Serialize};
      33              : use tokio::sync::Semaphore;
      34              : use tokio_util::sync::CancellationToken;
      35              : use toml_edit::Item;
      36              : use tracing::info;
      37              : 
      38              : pub use self::{
      39              :     azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket,
      40              :     simulate_failures::UnreliableWrapper,
      41              : };
      42              : use s3_bucket::RequestKind;
      43              : 
      44              : /// Currently, sync happens with AWS S3, that has two limits on requests per second:
      45              : /// ~200 RPS for IAM services
      46              : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
      47              : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
      48              : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
      49              : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
      50              : /// We set this a little bit low as we currently buffer the entire file into RAM
      51              : ///
      52              : /// Here, a limit of max 20k concurrent connections was noted.
      53              : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
      54              : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 30;
      55              : /// No limits on the client side, which currenltly means 1000 for AWS S3.
      56              : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
      57              : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
      58              : 
      59              : /// As defined in S3 docs
      60              : pub const MAX_KEYS_PER_DELETE: usize = 1000;
      61              : 
      62              : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
      63              : 
      64              : /// Path on the remote storage, relative to some inner prefix.
      65              : /// The prefix is an implementation detail, that allows representing local paths
      66              : /// as the remote ones, stripping the local storage prefix away.
      67        17591 : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
      68              : pub struct RemotePath(Utf8PathBuf);
      69              : 
      70              : impl Serialize for RemotePath {
      71            0 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
      72            0 :     where
      73            0 :         S: serde::Serializer,
      74            0 :     {
      75            0 :         serializer.collect_str(self)
      76            0 :     }
      77              : }
      78              : 
      79              : impl<'de> Deserialize<'de> for RemotePath {
      80            0 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
      81            0 :     where
      82            0 :         D: serde::Deserializer<'de>,
      83            0 :     {
      84            0 :         let str = String::deserialize(deserializer)?;
      85            0 :         Ok(Self(Utf8PathBuf::from(&str)))
      86            0 :     }
      87              : }
      88              : 
      89              : impl std::fmt::Display for RemotePath {
      90         1032 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      91         1032 :         std::fmt::Display::fmt(&self.0, f)
      92         1032 :     }
      93              : }
      94              : 
      95              : impl RemotePath {
      96        57576 :     pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
      97        57576 :         anyhow::ensure!(
      98        57576 :             relative_path.is_relative(),
      99            2 :             "Path {relative_path:?} is not relative"
     100              :         );
     101        57574 :         Ok(Self(relative_path.to_path_buf()))
     102        57576 :     }
     103              : 
     104        33172 :     pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
     105        33172 :         Self::new(Utf8Path::new(relative_path))
     106        33172 :     }
     107              : 
     108        16055 :     pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
     109        16055 :         base_path.join(&self.0)
     110        16055 :     }
     111              : 
     112         6349 :     pub fn object_name(&self) -> Option<&str> {
     113         6349 :         self.0.file_name()
     114         6349 :     }
     115              : 
     116         5245 :     pub fn join(&self, segment: &Utf8Path) -> Self {
     117         5245 :         Self(self.0.join(segment))
     118         5245 :     }
     119              : 
     120        35128 :     pub fn get_path(&self) -> &Utf8PathBuf {
     121        35128 :         &self.0
     122        35128 :     }
     123              : 
     124            0 :     pub fn extension(&self) -> Option<&str> {
     125            0 :         self.0.extension()
     126            0 :     }
     127              : 
     128         3675 :     pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
     129         3675 :         self.0.strip_prefix(&p.0)
     130         3675 :     }
     131              : }
     132              : 
     133              : /// We don't need callers to be able to pass arbitrary delimiters: just control
     134              : /// whether listings will use a '/' separator or not.
     135              : ///
     136              : /// The WithDelimiter mode will populate `prefixes` and `keys` in the result.  The
     137              : /// NoDelimiter mode will only populate `keys`.
     138              : pub enum ListingMode {
     139              :     WithDelimiter,
     140              :     NoDelimiter,
     141              : }
     142              : 
     143         1298 : #[derive(Default)]
     144              : pub struct Listing {
     145              :     pub prefixes: Vec<RemotePath>,
     146              :     pub keys: Vec<RemotePath>,
     147              : }
     148              : 
     149              : /// Storage (potentially remote) API to manage its state.
     150              : /// This storage tries to be unaware of any layered repository context,
     151              : /// providing basic CRUD operations for storage files.
     152              : #[allow(async_fn_in_trait)]
     153              : pub trait RemoteStorage: Send + Sync + 'static {
     154              :     /// Lists all top level subdirectories for a given prefix
     155              :     /// Note: here we assume that if the prefix is passed it was obtained via remote_object_id
     156              :     /// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS)
     157              :     /// so this method doesnt need to.
     158            9 :     async fn list_prefixes(
     159            9 :         &self,
     160            9 :         prefix: Option<&RemotePath>,
     161            9 :     ) -> Result<Vec<RemotePath>, DownloadError> {
     162            0 :         let result = self
     163            0 :             .list(prefix, ListingMode::WithDelimiter, None)
     164            0 :             .await?
     165              :             .prefixes;
     166            0 :         Ok(result)
     167            0 :     }
     168              :     /// Lists all files in directory "recursively"
     169              :     /// (not really recursively, because AWS has a flat namespace)
     170              :     /// Note: This is subtely different than list_prefixes,
     171              :     /// because it is for listing files instead of listing
     172              :     /// names sharing common prefixes.
     173              :     /// For example,
     174              :     /// list_files("foo/bar") = ["foo/bar/cat123.txt",
     175              :     /// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"]
     176              :     /// whereas,
     177              :     /// list_prefixes("foo/bar/") = ["cat", "dog"]
     178              :     /// See `test_real_s3.rs` for more details.
     179              :     ///
     180              :     /// max_keys limits max number of keys returned; None means unlimited.
     181          447 :     async fn list_files(
     182          447 :         &self,
     183          447 :         prefix: Option<&RemotePath>,
     184          447 :         max_keys: Option<NonZeroU32>,
     185          447 :     ) -> Result<Vec<RemotePath>, DownloadError> {
     186          426 :         let result = self
     187          426 :             .list(prefix, ListingMode::NoDelimiter, max_keys)
     188         1440 :             .await?
     189              :             .keys;
     190          426 :         Ok(result)
     191          426 :     }
     192              : 
     193              :     async fn list(
     194              :         &self,
     195              :         prefix: Option<&RemotePath>,
     196              :         _mode: ListingMode,
     197              :         max_keys: Option<NonZeroU32>,
     198              :     ) -> Result<Listing, DownloadError>;
     199              : 
     200              :     /// Streams the local file contents into remote into the remote storage entry.
     201              :     async fn upload(
     202              :         &self,
     203              :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     204              :         // S3 PUT request requires the content length to be specified,
     205              :         // otherwise it starts to fail with the concurrent connection count increasing.
     206              :         data_size_bytes: usize,
     207              :         to: &RemotePath,
     208              :         metadata: Option<StorageMetadata>,
     209              :     ) -> anyhow::Result<()>;
     210              : 
     211              :     /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
     212              :     /// Returns the metadata, if any was stored with the file previously.
     213              :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError>;
     214              : 
     215              :     /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
     216              :     /// Returns the metadata, if any was stored with the file previously.
     217              :     async fn download_byte_range(
     218              :         &self,
     219              :         from: &RemotePath,
     220              :         start_inclusive: u64,
     221              :         end_exclusive: Option<u64>,
     222              :     ) -> Result<Download, DownloadError>;
     223              : 
     224              :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
     225              : 
     226              :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
     227              : 
     228              :     /// Copy a remote object inside a bucket from one path to another.
     229              :     async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>;
     230              : 
     231              :     /// Resets the content of everything with the given prefix to the given state
     232              :     async fn time_travel_recover(
     233              :         &self,
     234              :         prefix: Option<&RemotePath>,
     235              :         timestamp: SystemTime,
     236              :         done_if_after: SystemTime,
     237              :         cancel: &CancellationToken,
     238              :     ) -> Result<(), TimeTravelError>;
     239              : }
     240              : 
     241              : pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
     242              : pub struct Download {
     243              :     pub download_stream: DownloadStream,
     244              :     /// The last time the file was modified (`last-modified` HTTP header)
     245              :     pub last_modified: Option<SystemTime>,
     246              :     /// A way to identify this specific version of the resource (`etag` HTTP header)
     247              :     pub etag: Option<String>,
     248              :     /// Extra key-value data, associated with the current remote file.
     249              :     pub metadata: Option<StorageMetadata>,
     250              : }
     251              : 
     252              : impl Debug for Download {
     253            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     254            0 :         f.debug_struct("Download")
     255            0 :             .field("metadata", &self.metadata)
     256            0 :             .finish()
     257            0 :     }
     258              : }
     259              : 
     260            0 : #[derive(Debug)]
     261              : pub enum DownloadError {
     262              :     /// Validation or other error happened due to user input.
     263              :     BadInput(anyhow::Error),
     264              :     /// The file was not found in the remote storage.
     265              :     NotFound,
     266              :     /// A cancellation token aborted the download, typically during
     267              :     /// tenant detach or process shutdown.
     268              :     Cancelled,
     269              :     /// The file was found in the remote storage, but the download failed.
     270              :     Other(anyhow::Error),
     271              : }
     272              : 
     273              : impl std::fmt::Display for DownloadError {
     274          258 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     275          258 :         match self {
     276            0 :             DownloadError::BadInput(e) => {
     277            0 :                 write!(f, "Failed to download a remote file due to user input: {e}")
     278              :             }
     279            7 :             DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
     280            0 :             DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
     281          251 :             DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
     282              :         }
     283          258 :     }
     284              : }
     285              : 
     286              : impl std::error::Error for DownloadError {}
     287              : 
     288              : impl DownloadError {
     289              :     /// Returns true if the error should not be retried with backoff
     290          835 :     pub fn is_permanent(&self) -> bool {
     291          835 :         use DownloadError::*;
     292          835 :         match self {
     293            0 :             BadInput(_) => true,
     294          623 :             NotFound => true,
     295            0 :             Cancelled => true,
     296          212 :             Other(_) => false,
     297              :         }
     298          835 :     }
     299              : }
     300              : 
     301            0 : #[derive(Debug)]
     302              : pub enum TimeTravelError {
     303              :     /// Validation or other error happened due to user input.
     304              :     BadInput(anyhow::Error),
     305              :     /// The used remote storage does not have time travel recovery implemented
     306              :     Unimplemented,
     307              :     /// The number of versions/deletion markers is above our limit.
     308              :     TooManyVersions,
     309              :     /// A cancellation token aborted the process, typically during
     310              :     /// request closure or process shutdown.
     311              :     Cancelled,
     312              :     /// Other errors
     313              :     Other(anyhow::Error),
     314              : }
     315              : 
     316              : impl std::fmt::Display for TimeTravelError {
     317            1 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     318            1 :         match self {
     319            0 :             TimeTravelError::BadInput(e) => {
     320            0 :                 write!(
     321            0 :                     f,
     322            0 :                     "Failed to time travel recover a prefix due to user input: {e}"
     323            0 :                 )
     324              :             }
     325            0 :             TimeTravelError::Unimplemented => write!(
     326            0 :                 f,
     327            0 :                 "time travel recovery is not implemented for the current storage backend"
     328            0 :             ),
     329            0 :             TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"),
     330              :             TimeTravelError::TooManyVersions => {
     331            0 :                 write!(f, "Number of versions/delete markers above limit")
     332              :             }
     333            1 :             TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"),
     334              :         }
     335            1 :     }
     336              : }
     337              : 
     338              : impl std::error::Error for TimeTravelError {}
     339              : 
     340              : /// Every storage, currently supported.
     341              : /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
     342         7631 : #[derive(Clone)]
     343              : // Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
     344              : pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
     345              :     LocalFs(LocalFs),
     346              :     AwsS3(Arc<S3Bucket>),
     347              :     AzureBlob(Arc<AzureBlobStorage>),
     348              :     Unreliable(Other),
     349              : }
     350              : 
     351              : impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
     352          992 :     pub async fn list(
     353          992 :         &self,
     354          992 :         prefix: Option<&RemotePath>,
     355          992 :         mode: ListingMode,
     356          992 :         max_keys: Option<NonZeroU32>,
     357          992 :     ) -> anyhow::Result<Listing, DownloadError> {
     358          992 :         match self {
     359          573 :             Self::LocalFs(s) => s.list(prefix, mode, max_keys).await,
     360         1768 :             Self::AwsS3(s) => s.list(prefix, mode, max_keys).await,
     361            0 :             Self::AzureBlob(s) => s.list(prefix, mode, max_keys).await,
     362          224 :             Self::Unreliable(s) => s.list(prefix, mode, max_keys).await,
     363              :         }
     364          992 :     }
     365              : 
     366              :     // A function for listing all the files in a "directory"
     367              :     // Example:
     368              :     // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
     369              :     //
     370              :     // max_keys limits max number of keys returned; None means unlimited.
     371          554 :     pub async fn list_files(
     372          554 :         &self,
     373          554 :         folder: Option<&RemotePath>,
     374          554 :         max_keys: Option<NonZeroU32>,
     375          554 :     ) -> Result<Vec<RemotePath>, DownloadError> {
     376          554 :         match self {
     377          330 :             Self::LocalFs(s) => s.list_files(folder, max_keys).await,
     378         1110 :             Self::AwsS3(s) => s.list_files(folder, max_keys).await,
     379            0 :             Self::AzureBlob(s) => s.list_files(folder, max_keys).await,
     380          262 :             Self::Unreliable(s) => s.list_files(folder, max_keys).await,
     381              :         }
     382          554 :     }
     383              : 
     384              :     // lists common *prefixes*, if any of files
     385              :     // Example:
     386              :     // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
     387            0 :     pub async fn list_prefixes(
     388            0 :         &self,
     389            0 :         prefix: Option<&RemotePath>,
     390            0 :     ) -> Result<Vec<RemotePath>, DownloadError> {
     391            0 :         match self {
     392            0 :             Self::LocalFs(s) => s.list_prefixes(prefix).await,
     393            0 :             Self::AwsS3(s) => s.list_prefixes(prefix).await,
     394            0 :             Self::AzureBlob(s) => s.list_prefixes(prefix).await,
     395            0 :             Self::Unreliable(s) => s.list_prefixes(prefix).await,
     396              :         }
     397            0 :     }
     398              : 
     399        34039 :     pub async fn upload(
     400        34039 :         &self,
     401        34039 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     402        34039 :         data_size_bytes: usize,
     403        34039 :         to: &RemotePath,
     404        34039 :         metadata: Option<StorageMetadata>,
     405        34039 :     ) -> anyhow::Result<()> {
     406        34039 :         match self {
     407       944335 :             Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
     408        56585 :             Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
     409            0 :             Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await,
     410        20183 :             Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
     411              :         }
     412        34030 :     }
     413              : 
     414        11992 :     pub async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     415        11992 :         match self {
     416         1403 :             Self::LocalFs(s) => s.download(from).await,
     417        30321 :             Self::AwsS3(s) => s.download(from).await,
     418            0 :             Self::AzureBlob(s) => s.download(from).await,
     419          184 :             Self::Unreliable(s) => s.download(from).await,
     420              :         }
     421        11989 :     }
     422              : 
     423           38 :     pub async fn download_byte_range(
     424           38 :         &self,
     425           38 :         from: &RemotePath,
     426           38 :         start_inclusive: u64,
     427           38 :         end_exclusive: Option<u64>,
     428           38 :     ) -> Result<Download, DownloadError> {
     429           38 :         match self {
     430            2 :             Self::LocalFs(s) => {
     431            2 :                 s.download_byte_range(from, start_inclusive, end_exclusive)
     432            6 :                     .await
     433              :             }
     434           36 :             Self::AwsS3(s) => {
     435           36 :                 s.download_byte_range(from, start_inclusive, end_exclusive)
     436          106 :                     .await
     437              :             }
     438            0 :             Self::AzureBlob(s) => {
     439            0 :                 s.download_byte_range(from, start_inclusive, end_exclusive)
     440            0 :                     .await
     441              :             }
     442            0 :             Self::Unreliable(s) => {
     443            0 :                 s.download_byte_range(from, start_inclusive, end_exclusive)
     444            0 :                     .await
     445              :             }
     446              :         }
     447           38 :     }
     448              : 
     449         2100 :     pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     450         2100 :         match self {
     451           16 :             Self::LocalFs(s) => s.delete(path).await,
     452         8284 :             Self::AwsS3(s) => s.delete(path).await,
     453            0 :             Self::AzureBlob(s) => s.delete(path).await,
     454          142 :             Self::Unreliable(s) => s.delete(path).await,
     455              :         }
     456         2100 :     }
     457              : 
     458          532 :     pub async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     459          532 :         match self {
     460         1133 :             Self::LocalFs(s) => s.delete_objects(paths).await,
     461         1035 :             Self::AwsS3(s) => s.delete_objects(paths).await,
     462            0 :             Self::AzureBlob(s) => s.delete_objects(paths).await,
     463         7981 :             Self::Unreliable(s) => s.delete_objects(paths).await,
     464              :         }
     465          532 :     }
     466              : 
     467           14 :     pub async fn copy_object(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
     468           14 :         match self {
     469            2 :             Self::LocalFs(s) => s.copy(from, to).await,
     470           48 :             Self::AwsS3(s) => s.copy(from, to).await,
     471            0 :             Self::AzureBlob(s) => s.copy(from, to).await,
     472            0 :             Self::Unreliable(s) => s.copy(from, to).await,
     473              :         }
     474           14 :     }
     475              : 
     476            1 :     pub async fn time_travel_recover(
     477            1 :         &self,
     478            1 :         prefix: Option<&RemotePath>,
     479            1 :         timestamp: SystemTime,
     480            1 :         done_if_after: SystemTime,
     481            1 :         cancel: &CancellationToken,
     482            1 :     ) -> Result<(), TimeTravelError> {
     483            1 :         match self {
     484            0 :             Self::LocalFs(s) => {
     485            0 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
     486            0 :                     .await
     487              :             }
     488            1 :             Self::AwsS3(s) => {
     489            1 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
     490          151 :                     .await
     491              :             }
     492            0 :             Self::AzureBlob(s) => {
     493            0 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
     494            0 :                     .await
     495              :             }
     496            0 :             Self::Unreliable(s) => {
     497            0 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
     498            0 :                     .await
     499              :             }
     500              :         }
     501            1 :     }
     502              : }
     503              : 
     504              : impl GenericRemoteStorage {
     505          770 :     pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
     506          770 :         Ok(match &storage_config.storage {
     507          480 :             RemoteStorageKind::LocalFs(root) => {
     508          480 :                 info!("Using fs root '{root}' as a remote storage");
     509          480 :                 Self::LocalFs(LocalFs::new(root.clone())?)
     510              :             }
     511          284 :             RemoteStorageKind::AwsS3(s3_config) => {
     512          284 :                 // The profile and access key id are only printed here for debugging purposes,
     513          284 :                 // their values don't indicate the eventually taken choice for auth.
     514          284 :                 let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
     515          284 :                 let access_key_id =
     516          284 :                     std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
     517          284 :                 info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
     518          284 :                       s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
     519          284 :                 Self::AwsS3(Arc::new(S3Bucket::new(s3_config)?))
     520              :             }
     521            6 :             RemoteStorageKind::AzureContainer(azure_config) => {
     522            6 :                 info!("Using azure container '{}' in region '{}' as a remote storage, prefix in container: '{:?}'",
     523            6 :                       azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
     524            6 :                 Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?))
     525              :             }
     526              :         })
     527          770 :     }
     528              : 
     529           67 :     pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
     530           67 :         Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
     531           67 :     }
     532              : 
     533              :     /// Takes storage object contents and its size and uploads to remote storage,
     534              :     /// mapping `from_path` to the corresponding remote object id in the storage.
     535              :     ///
     536              :     /// The storage object does not have to be present on the `from_path`,
     537              :     /// this path is used for the remote object id conversion only.
     538         7549 :     pub async fn upload_storage_object(
     539         7549 :         &self,
     540         7549 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     541         7549 :         from_size_bytes: usize,
     542         7549 :         to: &RemotePath,
     543         7549 :     ) -> anyhow::Result<()> {
     544         7549 :         self.upload(from, from_size_bytes, to, None)
     545        44615 :             .await
     546         7542 :             .with_context(|| {
     547          783 :                 format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
     548         7542 :             })
     549         7542 :     }
     550              : 
     551              :     /// Downloads the storage object into the `to_path` provided.
     552              :     /// `byte_range` could be specified to dowload only a part of the file, if needed.
     553           38 :     pub async fn download_storage_object(
     554           38 :         &self,
     555           38 :         byte_range: Option<(u64, Option<u64>)>,
     556           38 :         from: &RemotePath,
     557           38 :     ) -> Result<Download, DownloadError> {
     558           38 :         match byte_range {
     559          112 :             Some((start, end)) => self.download_byte_range(from, start, end).await,
     560            0 :             None => self.download(from).await,
     561              :         }
     562           38 :     }
     563              : }
     564              : 
     565              : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
     566              : /// Immutable, cannot be changed once the file is created.
     567            4 : #[derive(Debug, Clone, PartialEq, Eq)]
     568              : pub struct StorageMetadata(HashMap<String, String>);
     569              : 
     570              : /// External backup storage configuration, enough for creating a client for that storage.
     571          797 : #[derive(Debug, Clone, PartialEq, Eq)]
     572              : pub struct RemoteStorageConfig {
     573              :     /// The storage connection configuration.
     574              :     pub storage: RemoteStorageKind,
     575              : }
     576              : 
     577              : /// A kind of a remote storage to connect to, with its connection configuration.
     578          797 : #[derive(Debug, Clone, PartialEq, Eq)]
     579              : pub enum RemoteStorageKind {
     580              :     /// Storage based on local file system.
     581              :     /// Specify a root folder to place all stored files into.
     582              :     LocalFs(Utf8PathBuf),
     583              :     /// AWS S3 based storage, storing all files in the S3 bucket
     584              :     /// specified by the config
     585              :     AwsS3(S3Config),
     586              :     /// Azure Blob based storage, storing all files in the container
     587              :     /// specified by the config
     588              :     AzureContainer(AzureConfig),
     589              : }
     590              : 
     591              : /// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
     592          688 : #[derive(Clone, PartialEq, Eq)]
     593              : pub struct S3Config {
     594              :     /// Name of the bucket to connect to.
     595              :     pub bucket_name: String,
     596              :     /// The region where the bucket is located at.
     597              :     pub bucket_region: String,
     598              :     /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
     599              :     pub prefix_in_bucket: Option<String>,
     600              :     /// A base URL to send S3 requests to.
     601              :     /// By default, the endpoint is derived from a region name, assuming it's
     602              :     /// an AWS S3 region name, erroring on wrong region name.
     603              :     /// Endpoint provides a way to support other S3 flavors and their regions.
     604              :     ///
     605              :     /// Example: `http://127.0.0.1:5000`
     606              :     pub endpoint: Option<String>,
     607              :     /// AWS S3 has various limits on its API calls, we need not to exceed those.
     608              :     /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
     609              :     pub concurrency_limit: NonZeroUsize,
     610              :     pub max_keys_per_list_response: Option<i32>,
     611              : }
     612              : 
     613              : impl Debug for S3Config {
     614           19 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     615           19 :         f.debug_struct("S3Config")
     616           19 :             .field("bucket_name", &self.bucket_name)
     617           19 :             .field("bucket_region", &self.bucket_region)
     618           19 :             .field("prefix_in_bucket", &self.prefix_in_bucket)
     619           19 :             .field("concurrency_limit", &self.concurrency_limit)
     620           19 :             .field(
     621           19 :                 "max_keys_per_list_response",
     622           19 :                 &self.max_keys_per_list_response,
     623           19 :             )
     624           19 :             .finish()
     625           19 :     }
     626              : }
     627              : 
     628              : /// Azure  bucket coordinates and access credentials to manage the bucket contents (read and write).
     629            0 : #[derive(Clone, PartialEq, Eq)]
     630              : pub struct AzureConfig {
     631              :     /// Name of the container to connect to.
     632              :     pub container_name: String,
     633              :     /// The region where the bucket is located at.
     634              :     pub container_region: String,
     635              :     /// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
     636              :     pub prefix_in_container: Option<String>,
     637              :     /// Azure has various limits on its API calls, we need not to exceed those.
     638              :     /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
     639              :     pub concurrency_limit: NonZeroUsize,
     640              :     pub max_keys_per_list_response: Option<i32>,
     641              : }
     642              : 
     643              : impl Debug for AzureConfig {
     644            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     645            0 :         f.debug_struct("AzureConfig")
     646            0 :             .field("bucket_name", &self.container_name)
     647            0 :             .field("bucket_region", &self.container_region)
     648            0 :             .field("prefix_in_bucket", &self.prefix_in_container)
     649            0 :             .field("concurrency_limit", &self.concurrency_limit)
     650            0 :             .field(
     651            0 :                 "max_keys_per_list_response",
     652            0 :                 &self.max_keys_per_list_response,
     653            0 :             )
     654            0 :             .finish()
     655            0 :     }
     656              : }
     657              : 
     658              : impl RemoteStorageConfig {
     659         1122 :     pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
     660         1122 :         let local_path = toml.get("local_path");
     661         1122 :         let bucket_name = toml.get("bucket_name");
     662         1122 :         let bucket_region = toml.get("bucket_region");
     663         1122 :         let container_name = toml.get("container_name");
     664         1122 :         let container_region = toml.get("container_region");
     665              : 
     666         1122 :         let use_azure = container_name.is_some() && container_region.is_some();
     667              : 
     668         1122 :         let default_concurrency_limit = if use_azure {
     669            0 :             DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
     670              :         } else {
     671         1122 :             DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
     672              :         };
     673         1122 :         let concurrency_limit = NonZeroUsize::new(
     674         1122 :             parse_optional_integer("concurrency_limit", toml)?.unwrap_or(default_concurrency_limit),
     675         1122 :         )
     676         1122 :         .context("Failed to parse 'concurrency_limit' as a positive integer")?;
     677              : 
     678         1122 :         let max_keys_per_list_response =
     679         1122 :             parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
     680         1122 :                 .context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
     681         1122 :                 .or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
     682              : 
     683         1122 :         let endpoint = toml
     684         1122 :             .get("endpoint")
     685         1122 :             .map(|endpoint| parse_toml_string("endpoint", endpoint))
     686         1122 :             .transpose()?;
     687              : 
     688         1060 :         let storage = match (
     689         1122 :             local_path,
     690         1122 :             bucket_name,
     691         1122 :             bucket_region,
     692         1122 :             container_name,
     693         1122 :             container_region,
     694              :         ) {
     695              :             // no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
     696           62 :             (None, None, None, None, None) => return Ok(None),
     697              :             (_, Some(_), None, ..) => {
     698            0 :                 bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
     699              :             }
     700              :             (_, None, Some(_), ..) => {
     701            0 :                 bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
     702              :             }
     703          420 :             (None, Some(bucket_name), Some(bucket_region), ..) => {
     704          420 :                 RemoteStorageKind::AwsS3(S3Config {
     705          420 :                     bucket_name: parse_toml_string("bucket_name", bucket_name)?,
     706          420 :                     bucket_region: parse_toml_string("bucket_region", bucket_region)?,
     707          420 :                     prefix_in_bucket: toml
     708          420 :                         .get("prefix_in_bucket")
     709          420 :                         .map(|prefix_in_bucket| {
     710          420 :                             parse_toml_string("prefix_in_bucket", prefix_in_bucket)
     711          420 :                         })
     712          420 :                         .transpose()?,
     713          420 :                     endpoint,
     714          420 :                     concurrency_limit,
     715          420 :                     max_keys_per_list_response,
     716              :                 })
     717              :             }
     718              :             (_, _, _, Some(_), None) => {
     719            0 :                 bail!("'container_name' option is mandatory if 'container_region' is given ")
     720              :             }
     721              :             (_, _, _, None, Some(_)) => {
     722            0 :                 bail!("'container_name' option is mandatory if 'container_region' is given ")
     723              :             }
     724            0 :             (None, None, None, Some(container_name), Some(container_region)) => {
     725            0 :                 RemoteStorageKind::AzureContainer(AzureConfig {
     726            0 :                     container_name: parse_toml_string("container_name", container_name)?,
     727            0 :                     container_region: parse_toml_string("container_region", container_region)?,
     728            0 :                     prefix_in_container: toml
     729            0 :                         .get("prefix_in_container")
     730            0 :                         .map(|prefix_in_container| {
     731            0 :                             parse_toml_string("prefix_in_container", prefix_in_container)
     732            0 :                         })
     733            0 :                         .transpose()?,
     734            0 :                     concurrency_limit,
     735            0 :                     max_keys_per_list_response,
     736              :                 })
     737              :             }
     738          640 :             (Some(local_path), None, None, None, None) => RemoteStorageKind::LocalFs(
     739          640 :                 Utf8PathBuf::from(parse_toml_string("local_path", local_path)?),
     740              :             ),
     741              :             (Some(_), Some(_), ..) => {
     742            0 :                 bail!("'local_path' and 'bucket_name' are mutually exclusive")
     743              :             }
     744              :             (Some(_), _, _, Some(_), Some(_)) => {
     745            0 :                 bail!("local_path and 'container_name' are mutually exclusive")
     746              :             }
     747              :         };
     748              : 
     749         1060 :         Ok(Some(RemoteStorageConfig { storage }))
     750         1122 :     }
     751              : }
     752              : 
     753              : // Helper functions to parse a toml Item
     754         2244 : fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
     755         2244 : where
     756         2244 :     I: TryFrom<i64, Error = E>,
     757         2244 :     E: std::error::Error + Send + Sync + 'static,
     758         2244 : {
     759         2244 :     let toml_integer = match item.get(name) {
     760            4 :         Some(item) => item
     761            4 :             .as_integer()
     762            4 :             .with_context(|| format!("configure option {name} is not an integer"))?,
     763         2240 :         None => return Ok(None),
     764              :     };
     765              : 
     766            4 :     I::try_from(toml_integer)
     767            4 :         .map(Some)
     768            4 :         .with_context(|| format!("configure option {name} is too large"))
     769         2244 : }
     770              : 
     771         2117 : fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
     772         2117 :     let s = item
     773         2117 :         .as_str()
     774         2117 :         .with_context(|| format!("configure option {name} is not a string"))?;
     775         2117 :     Ok(s.to_string())
     776         2117 : }
     777              : 
     778              : struct ConcurrencyLimiter {
     779              :     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
     780              :     // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
     781              :     // The helps to ensure we don't exceed the thresholds.
     782              :     write: Arc<Semaphore>,
     783              :     read: Arc<Semaphore>,
     784              : }
     785              : 
     786              : impl ConcurrencyLimiter {
     787        29837 :     fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
     788        29837 :         match kind {
     789        10492 :             RequestKind::Get => &self.read,
     790        16366 :             RequestKind::Put => &self.write,
     791          569 :             RequestKind::List => &self.read,
     792         2388 :             RequestKind::Delete => &self.write,
     793           15 :             RequestKind::Copy => &self.write,
     794            7 :             RequestKind::TimeTravel => &self.write,
     795              :         }
     796        29837 :     }
     797              : 
     798        19352 :     async fn acquire(
     799        19352 :         &self,
     800        19352 :         kind: RequestKind,
     801        19352 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
     802        19006 :         self.for_kind(kind).acquire().await
     803        19006 :     }
     804              : 
     805        10485 :     async fn acquire_owned(
     806        10485 :         &self,
     807        10485 :         kind: RequestKind,
     808        10485 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
     809        10463 :         Arc::clone(self.for_kind(kind)).acquire_owned().await
     810        10463 :     }
     811              : 
     812          300 :     fn new(limit: usize) -> ConcurrencyLimiter {
     813          300 :         Self {
     814          300 :             read: Arc::new(Semaphore::new(limit)),
     815          300 :             write: Arc::new(Semaphore::new(limit)),
     816          300 :         }
     817          300 :     }
     818              : }
     819              : 
     820              : #[cfg(test)]
     821              : mod tests {
     822              :     use super::*;
     823              : 
     824            2 :     #[test]
     825            2 :     fn test_object_name() {
     826            2 :         let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
     827            2 :         assert_eq!(k.object_name(), Some("c"));
     828              : 
     829            2 :         let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
     830            2 :         assert_eq!(k.object_name(), Some("c"));
     831              : 
     832            2 :         let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
     833            2 :         assert_eq!(k.object_name(), Some("a"));
     834              : 
     835              :         // XXX is it impossible to have an empty key?
     836            2 :         let k = RemotePath::new(Utf8Path::new("")).unwrap();
     837            2 :         assert_eq!(k.object_name(), None);
     838            2 :     }
     839              : 
     840            2 :     #[test]
     841            2 :     fn rempte_path_cannot_be_created_from_absolute_ones() {
     842            2 :         let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
     843            2 :         assert_eq!(err.to_string(), "Path \"/\" is not relative");
     844            2 :     }
     845              : }
        

Generated by: LCOV version 2.1-beta