LCOV - code coverage report
Current view: top level - libs/remote_storage/src - lib.rs (source / functions) Coverage Total Hit
Test: 09e7485004805bd42b53a0c369170b3228136512.info Lines: 84.5 % 388 328
Test Date: 2024-11-21 18:36:18 Functions: 50.7 % 138 70

            Line data    Source code
       1              : //! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage.
       2              : //! No other modules from this tree are supposed to be used directly by the external code.
       3              : //!
       4              : //! The [`RemoteStorage`] trait is a CRUD-like generic abstraction to use for adapting external storages, with a few implementations:
       5              : //!   * [`local_fs`] allows using the local file system as an external storage
       6              : //!   * [`s3_bucket`] uses an AWS S3 bucket as an external storage
       7              : //!   * [`azure_blob`] allows using Azure Blob storage as an external storage
       8              : //!
       9              : #![deny(unsafe_code)]
      10              : #![deny(clippy::undocumented_unsafe_blocks)]
      11              : 
      12              : mod azure_blob;
      13              : mod config;
      14              : mod error;
      15              : mod local_fs;
      16              : mod metrics;
      17              : mod s3_bucket;
      18              : mod simulate_failures;
      19              : mod support;
      20              : 
      21              : use std::{
      22              :     collections::HashMap,
      23              :     fmt::Debug,
      24              :     num::NonZeroU32,
      25              :     ops::Bound,
      26              :     pin::{pin, Pin},
      27              :     sync::Arc,
      28              :     time::SystemTime,
      29              : };
      30              : 
      31              : use anyhow::Context;
      32              : use camino::{Utf8Path, Utf8PathBuf};
      33              : 
      34              : use bytes::Bytes;
      35              : use futures::{stream::Stream, StreamExt};
      36              : use itertools::Itertools as _;
      37              : use serde::{Deserialize, Serialize};
      38              : use tokio::sync::Semaphore;
      39              : use tokio_util::sync::CancellationToken;
      40              : use tracing::info;
      41              : 
      42              : pub use self::{
      43              :     azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket,
      44              :     simulate_failures::UnreliableWrapper,
      45              : };
      46              : use s3_bucket::RequestKind;
      47              : 
      48              : pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
      49              : 
      50              : /// Azure SDK's ETag type is a simple String wrapper: we use this internally instead of repeating it here.
      51              : pub use azure_core::Etag;
      52              : 
      53              : pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
      54              : 
      55              : /// Default concurrency limit for S3 operations
      56              : ///
      57              : /// Currently, sync happens with AWS S3, which has two limits on requests per second:
      58              : /// ~200 RPS for IAM services
      59              : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
      60              : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
      61              : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
      62              : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
      63              : /// Set this limit analogously to the S3 limit
      64              : ///
      65              : /// Here, a limit of max 20k concurrent connections was noted.
      66              : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
      67              : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
      68              : /// No limits on the client side, which currently means 1000 for AWS S3.
      69              : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
      70              : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
      71              : 
      72              : /// As defined in S3 docs
      73              : pub const MAX_KEYS_PER_DELETE: usize = 1000;
      74              : 
      75              : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
      76              : 
      77              : /// Path on the remote storage, relative to some inner prefix.
      78              : /// The prefix is an implementation detail, that allows representing local paths
      79              : /// as the remote ones, stripping the local storage prefix away.
      80              : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
      81              : pub struct RemotePath(Utf8PathBuf);
      82              : 
      83              : impl Serialize for RemotePath {
      84            0 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
      85            0 :     where
      86            0 :         S: serde::Serializer,
      87            0 :     {
      88            0 :         serializer.collect_str(self)
      89            0 :     }
      90              : }
      91              : 
      92              : impl<'de> Deserialize<'de> for RemotePath {
      93            0 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
      94            0 :     where
      95            0 :         D: serde::Deserializer<'de>,
      96            0 :     {
      97            0 :         let str = String::deserialize(deserializer)?;
      98            0 :         Ok(Self(Utf8PathBuf::from(&str)))
      99            0 :     }
     100              : }
     101              : 
     102              : impl std::fmt::Display for RemotePath {
     103          272 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     104          272 :         std::fmt::Display::fmt(&self.0, f)
     105          272 :     }
     106              : }
     107              : 
     108              : impl RemotePath {
     109         4519 :     pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
     110         4519 :         anyhow::ensure!(
     111         4519 :             relative_path.is_relative(),
     112            4 :             "Path {relative_path:?} is not relative"
     113              :         );
     114         4515 :         Ok(Self(relative_path.to_path_buf()))
     115         4519 :     }
     116              : 
     117         4067 :     pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
     118         4067 :         Self::new(Utf8Path::new(relative_path))
     119         4067 :     }
     120              : 
     121         4100 :     pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
     122         4100 :         base_path.join(&self.0)
     123         4100 :     }
     124              : 
     125           18 :     pub fn object_name(&self) -> Option<&str> {
     126           18 :         self.0.file_name()
     127           18 :     }
     128              : 
     129          102 :     pub fn join(&self, path: impl AsRef<Utf8Path>) -> Self {
     130          102 :         Self(self.0.join(path))
     131          102 :     }
     132              : 
     133         1022 :     pub fn get_path(&self) -> &Utf8PathBuf {
     134         1022 :         &self.0
     135         1022 :     }
     136              : 
     137           67 :     pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
     138           67 :         self.0.strip_prefix(&p.0)
     139           67 :     }
     140              : 
     141          198 :     pub fn add_trailing_slash(&self) -> Self {
     142          198 :         // Unwrap safety: inputs are guaranteed to be valid UTF-8
     143          198 :         Self(format!("{}/", self.0).try_into().unwrap())
     144          198 :     }
     145              : }
     146              : 
     147              : /// We don't need callers to be able to pass arbitrary delimiters: just control
     148              : /// whether listings will use a '/' separator or not.
     149              : ///
     150              : /// The WithDelimiter mode will populate `prefixes` and `keys` in the result.  The
     151              : /// NoDelimiter mode will only populate `keys`.
     152              : #[derive(Copy, Clone)]
     153              : pub enum ListingMode {
     154              :     WithDelimiter,
     155              :     NoDelimiter,
     156              : }
     157              : 
     158              : #[derive(PartialEq, Eq, Debug, Clone)]
     159              : pub struct ListingObject {
     160              :     pub key: RemotePath,
     161              :     pub last_modified: SystemTime,
     162              :     pub size: u64,
     163              : }
     164              : 
     165              : #[derive(Default)]
     166              : pub struct Listing {
     167              :     pub prefixes: Vec<RemotePath>,
     168              :     pub keys: Vec<ListingObject>,
     169              : }
     170              : 
     171              : /// Options for downloads. The default value is a plain GET.
     172              : pub struct DownloadOpts {
     173              :     /// If given, returns [`DownloadError::Unmodified`] if the object still has
     174              :     /// the same ETag (using If-None-Match).
     175              :     pub etag: Option<Etag>,
     176              :     /// The start of the byte range to download, or unbounded.
     177              :     pub byte_start: Bound<u64>,
     178              :     /// The end of the byte range to download, or unbounded. Must be after the
     179              :     /// start bound.
     180              :     pub byte_end: Bound<u64>,
     181              : }
     182              : 
     183              : impl Default for DownloadOpts {
     184          730 :     fn default() -> Self {
     185          730 :         Self {
     186          730 :             etag: Default::default(),
     187          730 :             byte_start: Bound::Unbounded,
     188          730 :             byte_end: Bound::Unbounded,
     189          730 :         }
     190          730 :     }
     191              : }
     192              : 
     193              : impl DownloadOpts {
     194              :     /// Returns the byte range with inclusive start and exclusive end, or None
     195              :     /// if unbounded.
     196          180 :     pub fn byte_range(&self) -> Option<(u64, Option<u64>)> {
     197          180 :         if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded {
     198           84 :             return None;
     199           96 :         }
     200           96 :         let start = match self.byte_start {
     201           18 :             Bound::Excluded(i) => i + 1,
     202           60 :             Bound::Included(i) => i,
     203           18 :             Bound::Unbounded => 0,
     204              :         };
     205           96 :         let end = match self.byte_end {
     206           48 :             Bound::Excluded(i) => Some(i),
     207           27 :             Bound::Included(i) => Some(i + 1),
     208           21 :             Bound::Unbounded => None,
     209              :         };
     210           96 :         if let Some(end) = end {
     211           75 :             assert!(start < end, "range end {end} at or before start {start}");
     212           21 :         }
     213           87 :         Some((start, end))
     214          171 :     }
     215              : 
     216              :     /// Returns the byte range as an RFC 2616 Range header value with inclusive
     217              :     /// bounds, or None if unbounded.
     218           66 :     pub fn byte_range_header(&self) -> Option<String> {
     219           66 :         self.byte_range()
     220           66 :             .map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive
     221           66 :             .map(|(start, end)| match end {
     222           30 :                 Some(end) => format!("bytes={start}-{end}"),
     223           10 :                 None => format!("bytes={start}-"),
     224           66 :             })
     225           66 :     }
     226              : }
     227              : 
     228              : /// Storage (potentially remote) API to manage its state.
     229              : /// This storage tries to be unaware of any layered repository context,
     230              : /// providing basic CRUD operations for storage files.
     231              : #[allow(async_fn_in_trait)]
     232              : pub trait RemoteStorage: Send + Sync + 'static {
     233              :     /// List objects in remote storage, with semantics matching AWS S3's [`ListObjectsV2`].
     234              :     ///
     235              :     /// The stream is guaranteed to return at least one element, even in the case of errors
     236              :     /// (in that case it's an `Err()`), or an empty `Listing`.
     237              :     ///
     238              :     /// The stream does not end when it returns an error, as long as [`is_permanent`] returns false for that error.
     239              :     /// The `next` function can be retried, and maybe in a future retry, there will be success.
     240              :     ///
     241              :     /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
     242              :     /// from the absolute root of the bucket.
     243              :     ///
     244              :     /// `mode` configures whether to use a delimiter.  Without a delimiter, all keys
     245              :     /// within the prefix are listed in the `keys` of the result.  With a delimiter, any "directories" at the top level of
     246              :     /// the prefix are returned in the `prefixes` of the result, and keys in the top level of the prefix are
     247              :     /// returned in `keys`.
     248              :     ///
     249              :     /// `max_keys` controls the maximum number of keys that will be returned.  If this is None, this function
     250              :     /// will iteratively call listobjects until it runs out of keys.  Note that this is not safe to use on
     251              :     /// unlimited size buckets, as the full list of objects is allocated into a monolithic data structure.
     252              :     ///
     253              :     /// [`ListObjectsV2`]: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
     254              :     /// [`is_permanent`]: DownloadError::is_permanent
     255              :     fn list_streaming(
     256              :         &self,
     257              :         prefix: Option<&RemotePath>,
     258              :         mode: ListingMode,
     259              :         max_keys: Option<NonZeroU32>,
     260              :         cancel: &CancellationToken,
     261              :     ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;
     262              : 
     263           69 :     async fn list(
     264           69 :         &self,
     265           69 :         prefix: Option<&RemotePath>,
     266           69 :         mode: ListingMode,
     267           69 :         max_keys: Option<NonZeroU32>,
     268           69 :         cancel: &CancellationToken,
     269           69 :     ) -> Result<Listing, DownloadError> {
     270           69 :         let mut stream = pin!(self.list_streaming(prefix, mode, max_keys, cancel));
     271          277 :         let mut combined = stream.next().await.expect("At least one item required")?;
     272          177 :         while let Some(list) = stream.next().await {
     273           48 :             let list = list?;
     274           48 :             combined.keys.extend(list.keys.into_iter());
     275           48 :             combined.prefixes.extend_from_slice(&list.prefixes);
     276              :         }
     277           69 :         Ok(combined)
     278           69 :     }
     279              : 
     280              :     /// Obtain metadata information about an object.
     281              :     async fn head_object(
     282              :         &self,
     283              :         key: &RemotePath,
     284              :         cancel: &CancellationToken,
     285              :     ) -> Result<ListingObject, DownloadError>;
     286              : 
     287              :     /// Streams the local file contents into the remote storage entry.
     288              :     ///
     289              :     /// If the operation fails because of timeout or cancellation, the root cause of the error will be
     290              :     /// set to `TimeoutOrCancel`.
     291              :     async fn upload(
     292              :         &self,
     293              :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     294              :         // S3 PUT request requires the content length to be specified,
     295              :         // otherwise it starts to fail with the concurrent connection count increasing.
     296              :         data_size_bytes: usize,
     297              :         to: &RemotePath,
     298              :         metadata: Option<StorageMetadata>,
     299              :         cancel: &CancellationToken,
     300              :     ) -> anyhow::Result<()>;
     301              : 
     302              :     /// Streams the remote storage entry contents.
     303              :     ///
     304              :     /// The returned download stream will obey initial timeout and cancellation signal by erroring
     305              :     /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
     306              :     /// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
     307              :     ///
     308              :     /// Returns the metadata, if any was stored with the file previously.
     309              :     async fn download(
     310              :         &self,
     311              :         from: &RemotePath,
     312              :         opts: &DownloadOpts,
     313              :         cancel: &CancellationToken,
     314              :     ) -> Result<Download, DownloadError>;
     315              : 
     316              :     /// Delete a single path from remote storage.
     317              :     ///
     318              :     /// If the operation fails because of timeout or cancellation, the root cause of the error will be
     319              :     /// set to `TimeoutOrCancel`. In such a situation it is unknown if the deletion went through.
     320              :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()>;
     321              : 
     322              :     /// Delete multiple paths from remote storage.
     323              :     ///
     324              :     /// If the operation fails because of timeout or cancellation, the root cause of the error will be
     325              :     /// set to `TimeoutOrCancel`. In such a situation it is unknown which deletions, if any, went
     326              :     /// through.
     327              :     async fn delete_objects<'a>(
     328              :         &self,
     329              :         paths: &'a [RemotePath],
     330              :         cancel: &CancellationToken,
     331              :     ) -> anyhow::Result<()>;
     332              : 
     333              :     /// Deletes all objects matching the given prefix.
     334              :     ///
     335              :     /// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
     336              :     /// delete /a/b, /a/b/*, /a/bc, /a/bc/*, etc.
     337              :     ///
     338              :     /// If the operation fails because of timeout or cancellation, the root cause of the error will
     339              :     /// be set to `TimeoutOrCancel`. In such a situation it is unknown which deletions, if any, went
     340              :     /// through.
     341           18 :     async fn delete_prefix(
     342           18 :         &self,
     343           18 :         prefix: &RemotePath,
     344           18 :         cancel: &CancellationToken,
     345           18 :     ) -> anyhow::Result<()> {
     346           18 :         let mut stream =
     347           18 :             pin!(self.list_streaming(Some(prefix), ListingMode::NoDelimiter, None, cancel));
     348           74 :         while let Some(result) = stream.next().await {
     349           21 :             let keys = match result {
     350           21 :                 Ok(listing) if listing.keys.is_empty() => continue,
     351           63 :                 Ok(listing) => listing.keys.into_iter().map(|o| o.key).collect_vec(),
     352            0 :                 Err(DownloadError::Cancelled) => return Err(TimeoutOrCancel::Cancel.into()),
     353            0 :                 Err(DownloadError::Timeout) => return Err(TimeoutOrCancel::Timeout.into()),
     354            0 :                 Err(err) => return Err(err.into()),
     355              :             };
     356           12 :             tracing::info!("Deleting {} keys from remote storage", keys.len());
     357          122 :             self.delete_objects(&keys, cancel).await?;
     358              :         }
     359           18 :         Ok(())
     360           18 :     }
     361              : 
     362              :     /// Copy a remote object inside a bucket from one path to another.
     363              :     async fn copy(
     364              :         &self,
     365              :         from: &RemotePath,
     366              :         to: &RemotePath,
     367              :         cancel: &CancellationToken,
     368              :     ) -> anyhow::Result<()>;
     369              : 
     370              :     /// Resets the content of everything with the given prefix to the given state
     371              :     async fn time_travel_recover(
     372              :         &self,
     373              :         prefix: Option<&RemotePath>,
     374              :         timestamp: SystemTime,
     375              :         done_if_after: SystemTime,
     376              :         cancel: &CancellationToken,
     377              :     ) -> Result<(), TimeTravelError>;
     378              : }
     379              : 
     380              : /// Data part of an ongoing [`Download`].
     381              : ///
     382              : /// `DownloadStream` is sensitive to the timeout and cancellation used with the original
     383              : /// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
     384              : /// with `tokio::io::copy_buf`.
     385              : // This has 'static because safekeepers do not use cancellation tokens (yet)
     386              : pub type DownloadStream =
     387              :     Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>>;
     388              : 
     389              : pub struct Download {
     390              :     pub download_stream: DownloadStream,
     391              :     /// The last time the file was modified (`last-modified` HTTP header)
     392              :     pub last_modified: SystemTime,
     393              :     /// A way to identify this specific version of the resource (`etag` HTTP header)
     394              :     pub etag: Etag,
     395              :     /// Extra key-value data, associated with the current remote file.
     396              :     pub metadata: Option<StorageMetadata>,
     397              : }
     398              : 
     399              : impl Debug for Download {
     400            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     401            0 :         f.debug_struct("Download")
     402            0 :             .field("metadata", &self.metadata)
     403            0 :             .finish()
     404            0 :     }
     405              : }
     406              : 
     407              : /// Every storage currently supported.
     408              : /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
     409              : // Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
     410              : #[derive(Clone)]
     411              : pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
     412              :     LocalFs(LocalFs),
     413              :     AwsS3(Arc<S3Bucket>),
     414              :     AzureBlob(Arc<AzureBlobStorage>),
     415              :     Unreliable(Other),
     416              : }
     417              : 
     418              : impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
     419              :     // See [`RemoteStorage::list`].
     420          459 :     pub async fn list(
     421          459 :         &self,
     422          459 :         prefix: Option<&RemotePath>,
     423          459 :         mode: ListingMode,
     424          459 :         max_keys: Option<NonZeroU32>,
     425          459 :         cancel: &CancellationToken,
     426          459 :     ) -> Result<Listing, DownloadError> {
     427          459 :         match self {
     428         1365 :             Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await,
     429          228 :             Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
     430          226 :             Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
     431            0 :             Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
     432              :         }
     433          459 :     }
     434              : 
     435              :     // See [`RemoteStorage::list_streaming`].
     436            3 :     pub fn list_streaming<'a>(
     437            3 :         &'a self,
     438            3 :         prefix: Option<&'a RemotePath>,
     439            3 :         mode: ListingMode,
     440            3 :         max_keys: Option<NonZeroU32>,
     441            3 :         cancel: &'a CancellationToken,
     442            3 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
     443            3 :         match self {
     444            0 :             Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
     445            0 :                 as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
     446            2 :             Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
     447            1 :             Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
     448            0 :             Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
     449              :         }
     450            3 :     }
     451              : 
     452              :     // See [`RemoteStorage::head_object`].
     453            9 :     pub async fn head_object(
     454            9 :         &self,
     455            9 :         key: &RemotePath,
     456            9 :         cancel: &CancellationToken,
     457            9 :     ) -> Result<ListingObject, DownloadError> {
     458            9 :         match self {
     459            0 :             Self::LocalFs(s) => s.head_object(key, cancel).await,
     460           19 :             Self::AwsS3(s) => s.head_object(key, cancel).await,
     461           16 :             Self::AzureBlob(s) => s.head_object(key, cancel).await,
     462            0 :             Self::Unreliable(s) => s.head_object(key, cancel).await,
     463              :         }
     464            9 :     }
     465              : 
     466              :     /// See [`RemoteStorage::upload`]
     467         3080 :     pub async fn upload(
     468         3080 :         &self,
     469         3080 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     470         3080 :         data_size_bytes: usize,
     471         3080 :         to: &RemotePath,
     472         3080 :         metadata: Option<StorageMetadata>,
     473         3080 :         cancel: &CancellationToken,
     474         3080 :     ) -> anyhow::Result<()> {
     475         3080 :         match self {
     476        39072 :             Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
     477         1007 :             Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
     478          557 :             Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
     479           38 :             Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
     480              :         }
     481         3039 :     }
     482              : 
     483              :     /// See [`RemoteStorage::download`]
     484          662 :     pub async fn download(
     485          662 :         &self,
     486          662 :         from: &RemotePath,
     487          662 :         opts: &DownloadOpts,
     488          662 :         cancel: &CancellationToken,
     489          662 :     ) -> Result<Download, DownloadError> {
     490          662 :         match self {
     491          633 :             Self::LocalFs(s) => s.download(from, opts, cancel).await,
     492           38 :             Self::AwsS3(s) => s.download(from, opts, cancel).await,
     493           56 :             Self::AzureBlob(s) => s.download(from, opts, cancel).await,
     494            0 :             Self::Unreliable(s) => s.download(from, opts, cancel).await,
     495              :         }
     496          662 :     }
     497              : 
     498              :     /// See [`RemoteStorage::delete`]
     499          262 :     pub async fn delete(
     500          262 :         &self,
     501          262 :         path: &RemotePath,
     502          262 :         cancel: &CancellationToken,
     503          262 :     ) -> anyhow::Result<()> {
     504          262 :         match self {
     505            2 :             Self::LocalFs(s) => s.delete(path, cancel).await,
     506          582 :             Self::AwsS3(s) => s.delete(path, cancel).await,
     507          431 :             Self::AzureBlob(s) => s.delete(path, cancel).await,
     508            0 :             Self::Unreliable(s) => s.delete(path, cancel).await,
     509              :         }
     510          262 :     }
     511              : 
     512              :     /// See [`RemoteStorage::delete_objects`]
     513           21 :     pub async fn delete_objects(
     514           21 :         &self,
     515           21 :         paths: &[RemotePath],
     516           21 :         cancel: &CancellationToken,
     517           21 :     ) -> anyhow::Result<()> {
     518           21 :         match self {
     519            6 :             Self::LocalFs(s) => s.delete_objects(paths, cancel).await,
     520           54 :             Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
     521           25 :             Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
     522            0 :             Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
     523              :         }
     524           21 :     }
     525              : 
     526              :     /// See [`RemoteStorage::delete_prefix`]
     527           18 :     pub async fn delete_prefix(
     528           18 :         &self,
     529           18 :         prefix: &RemotePath,
     530           18 :         cancel: &CancellationToken,
     531           18 :     ) -> anyhow::Result<()> {
     532           18 :         match self {
     533            0 :             Self::LocalFs(s) => s.delete_prefix(prefix, cancel).await,
     534           48 :             Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
     535          148 :             Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
     536            0 :             Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
     537              :         }
     538           18 :     }
     539              : 
     540              :     /// See [`RemoteStorage::copy`]
     541            3 :     pub async fn copy_object(
     542            3 :         &self,
     543            3 :         from: &RemotePath,
     544            3 :         to: &RemotePath,
     545            3 :         cancel: &CancellationToken,
     546            3 :     ) -> anyhow::Result<()> {
     547            3 :         match self {
     548            0 :             Self::LocalFs(s) => s.copy(from, to, cancel).await,
     549            3 :             Self::AwsS3(s) => s.copy(from, to, cancel).await,
     550            5 :             Self::AzureBlob(s) => s.copy(from, to, cancel).await,
     551            0 :             Self::Unreliable(s) => s.copy(from, to, cancel).await,
     552              :         }
     553            3 :     }
     554              : 
     555              :     /// See [`RemoteStorage::time_travel_recover`].
     556            6 :     pub async fn time_travel_recover(
     557            6 :         &self,
     558            6 :         prefix: Option<&RemotePath>,
     559            6 :         timestamp: SystemTime,
     560            6 :         done_if_after: SystemTime,
     561            6 :         cancel: &CancellationToken,
     562            6 :     ) -> Result<(), TimeTravelError> {
     563            6 :         match self {
     564            0 :             Self::LocalFs(s) => {
     565            0 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
     566            0 :                     .await
     567              :             }
     568            6 :             Self::AwsS3(s) => {
     569            6 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
     570           72 :                     .await
     571              :             }
     572            0 :             Self::AzureBlob(s) => {
     573            0 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
     574            0 :                     .await
     575              :             }
     576            0 :             Self::Unreliable(s) => {
     577            0 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
     578            0 :                     .await
     579              :             }
     580              :         }
     581            6 :     }
     582              : }
     583              : 
     584              : impl GenericRemoteStorage {
     585          238 :     pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
     586          238 :         let timeout = storage_config.timeout;
     587          238 :         Ok(match &storage_config.storage {
     588          202 :             RemoteStorageKind::LocalFs { local_path: path } => {
     589          202 :                 info!("Using fs root '{path}' as a remote storage");
     590          202 :                 Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
     591              :             }
     592           26 :             RemoteStorageKind::AwsS3(s3_config) => {
     593           26 :                 // The profile and access key id are only printed here for debugging purposes,
     594           26 :                 // their values don't indicate the eventually taken choice for auth.
     595           26 :                 let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
     596           26 :                 let access_key_id =
     597           26 :                     std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
     598           26 :                 info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
     599              :                       s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
     600           26 :                 Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?))
     601              :             }
     602           10 :             RemoteStorageKind::AzureContainer(azure_config) => {
     603           10 :                 let storage_account = azure_config
     604           10 :                     .storage_account
     605           10 :                     .as_deref()
     606           10 :                     .unwrap_or("<AZURE_STORAGE_ACCOUNT>");
     607           10 :                 info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
     608              :                       azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
     609           10 :                 Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?))
     610              :             }
     611              :         })
     612          238 :     }
     613              : 
     614            2 :     pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
     615            2 :         Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
     616            2 :     }
     617              : 
     618              :     /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
     619         1414 :     pub async fn upload_storage_object(
     620         1414 :         &self,
     621         1414 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     622         1414 :         from_size_bytes: usize,
     623         1414 :         to: &RemotePath,
     624         1414 :         cancel: &CancellationToken,
     625         1414 :     ) -> anyhow::Result<()> {
     626         1414 :         self.upload(from, from_size_bytes, to, None, cancel)
     627         4021 :             .await
     628         1411 :             .with_context(|| {
     629            0 :                 format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
     630         1411 :             })
     631         1411 :     }
     632              : 
     633              :     /// The name of the bucket/container/etc.
     634            0 :     pub fn bucket_name(&self) -> Option<&str> {
     635            0 :         match self {
     636            0 :             Self::LocalFs(_s) => None,
     637            0 :             Self::AwsS3(s) => Some(s.bucket_name()),
     638            0 :             Self::AzureBlob(s) => Some(s.container_name()),
     639            0 :             Self::Unreliable(_s) => None,
     640              :         }
     641            0 :     }
     642              : }
     643              : 
     644              : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
     645              : /// Immutable, cannot be changed once the file is created.
     646              : #[derive(Debug, Clone, PartialEq, Eq)]
     647              : pub struct StorageMetadata(HashMap<String, String>);
     648              : 
     649              : impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
     650            0 :     fn from(arr: [(&str, &str); N]) -> Self {
     651            0 :         let map: HashMap<String, String> = arr
     652            0 :             .iter()
     653            0 :             .map(|(k, v)| (k.to_string(), v.to_string()))
     654            0 :             .collect();
     655            0 :         Self(map)
     656            0 :     }
     657              : }
     658              : 
     659              : struct ConcurrencyLimiter {
     660              :     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
     661              :     // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
     662              :     // The helps to ensure we don't exceed the thresholds.
     663              :     write: Arc<Semaphore>,
     664              :     read: Arc<Semaphore>,
     665              : }
     666              : 
     667              : impl ConcurrencyLimiter {
     668          731 :     fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
     669          731 :         match kind {
     670           44 :             RequestKind::Get => &self.read,
     671          292 :             RequestKind::Put => &self.write,
     672           90 :             RequestKind::List => &self.read,
     673          287 :             RequestKind::Delete => &self.write,
     674            3 :             RequestKind::Copy => &self.write,
     675            6 :             RequestKind::TimeTravel => &self.write,
     676            9 :             RequestKind::Head => &self.read,
     677              :         }
     678          731 :     }
     679              : 
     680          698 :     async fn acquire(
     681          698 :         &self,
     682          698 :         kind: RequestKind,
     683          698 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
     684          698 :         self.for_kind(kind).acquire().await
     685          698 :     }
     686              : 
     687           33 :     async fn acquire_owned(
     688           33 :         &self,
     689           33 :         kind: RequestKind,
     690           33 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
     691           33 :         Arc::clone(self.for_kind(kind)).acquire_owned().await
     692           33 :     }
     693              : 
     694           51 :     fn new(limit: usize) -> ConcurrencyLimiter {
     695           51 :         Self {
     696           51 :             read: Arc::new(Semaphore::new(limit)),
     697           51 :             write: Arc::new(Semaphore::new(limit)),
     698           51 :         }
     699           51 :     }
     700              : }
     701              : 
     702              : #[cfg(test)]
     703              : mod tests {
     704              :     use super::*;
     705              : 
     706              :     /// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges
     707              :     /// with optional end bound, or None when unbounded.
     708              :     #[test]
     709            3 :     fn download_opts_byte_range() {
     710            3 :         // Consider using test_case or a similar table-driven test framework.
     711            3 :         let cases = [
     712            3 :             // (byte_start, byte_end, expected)
     713            3 :             (Bound::Unbounded, Bound::Unbounded, None),
     714            3 :             (Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))),
     715            3 :             (Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))),
     716            3 :             (Bound::Included(3), Bound::Unbounded, Some((3, None))),
     717            3 :             (Bound::Included(3), Bound::Included(7), Some((3, Some(8)))),
     718            3 :             (Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))),
     719            3 :             (Bound::Excluded(3), Bound::Unbounded, Some((4, None))),
     720            3 :             (Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))),
     721            3 :             (Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))),
     722            3 :             // 1-sized ranges are fine, 0 aren't and will panic (separate test).
     723            3 :             (Bound::Included(3), Bound::Included(3), Some((3, Some(4)))),
     724            3 :             (Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))),
     725            3 :         ];
     726              : 
     727           36 :         for (byte_start, byte_end, expect) in cases {
     728           33 :             let opts = DownloadOpts {
     729           33 :                 byte_start,
     730           33 :                 byte_end,
     731           33 :                 ..Default::default()
     732           33 :             };
     733           33 :             let result = opts.byte_range();
     734           33 :             assert_eq!(
     735              :                 result, expect,
     736            0 :                 "byte_start={byte_start:?} byte_end={byte_end:?}"
     737              :             );
     738              : 
     739              :             // Check generated HTTP header, which uses an inclusive range.
     740           33 :             let expect_header = expect.map(|(start, end)| match end {
     741           24 :                 Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end
     742            6 :                 None => format!("bytes={start}-"),
     743           33 :             });
     744           33 :             assert_eq!(
     745           33 :                 opts.byte_range_header(),
     746              :                 expect_header,
     747            0 :                 "byte_start={byte_start:?} byte_end={byte_end:?}"
     748              :             );
     749              :         }
     750            3 :     }
     751              : 
     752              :     /// DownloadOpts::byte_range() zero-sized byte range should panic.
     753              :     #[test]
     754              :     #[should_panic]
     755            3 :     fn download_opts_byte_range_zero() {
     756            3 :         DownloadOpts {
     757            3 :             byte_start: Bound::Included(3),
     758            3 :             byte_end: Bound::Excluded(3),
     759            3 :             ..Default::default()
     760            3 :         }
     761            3 :         .byte_range();
     762            3 :     }
     763              : 
     764              :     /// DownloadOpts::byte_range() negative byte range should panic.
     765              :     #[test]
     766              :     #[should_panic]
     767            3 :     fn download_opts_byte_range_negative() {
     768            3 :         DownloadOpts {
     769            3 :             byte_start: Bound::Included(3),
     770            3 :             byte_end: Bound::Included(2),
     771            3 :             ..Default::default()
     772            3 :         }
     773            3 :         .byte_range();
     774            3 :     }
     775              : 
     776              :     #[test]
     777            3 :     fn test_object_name() {
     778            3 :         let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
     779            3 :         assert_eq!(k.object_name(), Some("c"));
     780              : 
     781            3 :         let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
     782            3 :         assert_eq!(k.object_name(), Some("c"));
     783              : 
     784            3 :         let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
     785            3 :         assert_eq!(k.object_name(), Some("a"));
     786              : 
     787              :         // XXX is it impossible to have an empty key?
     788            3 :         let k = RemotePath::new(Utf8Path::new("")).unwrap();
     789            3 :         assert_eq!(k.object_name(), None);
     790            3 :     }
     791              : 
     792              :     #[test]
     793            3 :     fn rempte_path_cannot_be_created_from_absolute_ones() {
     794            3 :         let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
     795            3 :         assert_eq!(err.to_string(), "Path \"/\" is not relative");
     796            3 :     }
     797              : }
        

Generated by: LCOV version 2.1-beta