LCOV - code coverage report
Current view: top level - libs/remote_storage/src - lib.rs (source / functions)
Test:         1e20c4f2b28aa592527961bb32170ebbd2c9172f.info
Test Date:    2025-07-16 12:29:03

              Coverage   Total   Hit
  Lines:        78.4 %     430   337
  Functions:    57.7 %     130    75

            Line data    Source code
       1              : //! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage.
       2              : //! No other modules from this tree are supposed to be used directly by external code.
       3              : //!
       4              : //! The [`RemoteStorage`] trait is a CRUD-like generic abstraction for adapting external storages, with a few implementations:
       5              : //!   * [`local_fs`] uses the local file system as an external storage
       6              : //!   * [`s3_bucket`] uses an AWS S3 bucket as an external storage
       7              : //!   * [`azure_blob`] uses Azure Blob storage as an external storage
       8              : //!
       9              : #![deny(unsafe_code)]
      10              : #![deny(clippy::undocumented_unsafe_blocks)]
      11              : 
      12              : mod azure_blob;
      13              : mod config;
      14              : mod error;
      15              : mod local_fs;
      16              : mod metrics;
      17              : mod s3_bucket;
      18              : mod simulate_failures;
      19              : mod support;
      20              : 
      21              : use std::collections::HashMap;
      22              : use std::fmt::Debug;
      23              : use std::num::NonZeroU32;
      24              : use std::ops::Bound;
      25              : use std::pin::{Pin, pin};
      26              : use std::sync::Arc;
      27              : use std::time::SystemTime;
      28              : 
      29              : use anyhow::Context;
      30              : /// Azure SDK's ETag type is a simple String wrapper: we re-use it internally instead of duplicating it here.
      31              : pub use azure_core::Etag;
      32              : use bytes::Bytes;
      33              : use camino::{Utf8Path, Utf8PathBuf};
      34              : pub use config::TypedRemoteStorageKind;
      35              : pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
      36              : use futures::StreamExt;
      37              : use futures::stream::Stream;
      38              : use itertools::Itertools as _;
      39              : use s3_bucket::RequestKind;
      40              : use serde::{Deserialize, Serialize};
      41              : use tokio::sync::Semaphore;
      42              : use tokio_util::sync::CancellationToken;
      43              : use tracing::info;
      44              : 
      45              : pub use self::azure_blob::AzureBlobStorage;
      46              : pub use self::local_fs::LocalFs;
      47              : pub use self::s3_bucket::S3Bucket;
      48              : pub use self::simulate_failures::UnreliableWrapper;
      49              : pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
      50              : 
      51              : /// Default concurrency limit for S3 operations
      52              : ///
      53              : /// Currently, sync happens with AWS S3, which has two limits on requests per second:
      54              : /// ~200 RPS for IAM services
      55              : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
      56              : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
      57              : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
      58              : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
      59              : /// Set this limit analogously to the S3 limit.
      60              : ///
      61              : /// A limit of at most 20k concurrent connections was noted here:
      62              : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
      63              : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
      64              : /// Set this limit analogously to the S3 limit.
      65              : ///
      66              : /// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
      67              : /// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
      68              : /// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
      69              : pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
      70              : /// No limit is set on the client side, which currently means 1000 for AWS S3.
      71              : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
      72              : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
      73              : 
      74              : /// As defined in S3 docs
      75              : ///
      76              : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
      77              : pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
      78              : 
      79              : /// As defined in Azure docs
      80              : ///
      81              : /// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
      82              : pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
      83              : 
      84              : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
      85              : 
      86              : /// Path on the remote storage, relative to some inner prefix.
      87              : /// The prefix is an implementation detail that allows representing local paths
      88              : /// as the remote ones, stripping the local storage prefix away.
      89              : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
      90              : pub struct RemotePath(Utf8PathBuf);
      91              : 
      92              : impl Serialize for RemotePath {
      93            0 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
      94            0 :     where
      95            0 :         S: serde::Serializer,
      96              :     {
      97            0 :         serializer.collect_str(self)
      98            0 :     }
      99              : }
     100              : 
     101              : impl<'de> Deserialize<'de> for RemotePath {
     102            0 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     103            0 :     where
     104            0 :         D: serde::Deserializer<'de>,
     105              :     {
     106            0 :         let str = String::deserialize(deserializer)?;
     107            0 :         Ok(Self(Utf8PathBuf::from(&str)))
     108            0 :     }
     109              : }
     110              : 
     111              : impl std::fmt::Display for RemotePath {
     112          531 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     113          531 :         std::fmt::Display::fmt(&self.0, f)
     114          531 :     }
     115              : }
     116              : 
     117              : impl RemotePath {
     118         3288 :     pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
     119         3288 :         anyhow::ensure!(
     120         3288 :             relative_path.is_relative(),
     121            4 :             "Path {relative_path:?} is not relative"
     122              :         );
     123         3284 :         Ok(Self(relative_path.to_path_buf()))
     124         3288 :     }
     125              : 
     126         2786 :     pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
     127         2786 :         Self::new(Utf8Path::new(relative_path))
     128         2786 :     }
     129              : 
     130         3068 :     pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
     131         3068 :         base_path.join(&self.0)
     132         3068 :     }
     133              : 
     134           15 :     pub fn object_name(&self) -> Option<&str> {
     135           15 :         self.0.file_name()
     136           15 :     }
     137              : 
     138           84 :     pub fn join(&self, path: impl AsRef<Utf8Path>) -> Self {
     139           84 :         Self(self.0.join(path))
     140            0 :     }
     141              : 
     142         1017 :     pub fn get_path(&self) -> &Utf8PathBuf {
     143         1017 :         &self.0
     144         1017 :     }
     145              : 
     146           50 :     pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
     147           50 :         self.0.strip_prefix(&p.0)
     148           50 :     }
     149              : 
     150          125 :     pub fn add_trailing_slash(&self) -> Self {
     151              :         // Unwrap safety: inputs are guaranteed to be valid UTF-8.
     152          125 :         Self(format!("{}/", self.0).try_into().unwrap())
     153          125 :     }
     154              : }
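
A minimal usage sketch of `RemotePath` (the paths are illustrative, not taken from the codebase): construction rejects absolute paths, and `with_base` re-anchors a remote path under a local directory.

    use camino::Utf8Path;

    let remote = RemotePath::from_string("tenants/123/timelines/456").unwrap();
    assert_eq!(remote.object_name(), Some("456"));
    // Re-anchor under a local working directory.
    let local = remote.with_base(Utf8Path::new("/data/pageserver"));
    assert_eq!(local.as_str(), "/data/pageserver/tenants/123/timelines/456");
    // Absolute inputs are rejected.
    assert!(RemotePath::new(Utf8Path::new("/absolute")).is_err());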
     155              : 
     156              : /// We don't need callers to be able to pass arbitrary delimiters: just control
     157              : /// whether listings will use a '/' separator or not.
     158              : ///
     159              : /// The WithDelimiter mode will populate `prefixes` and `keys` in the result.  The
     160              : /// NoDelimiter mode will only populate `keys`.
     161              : #[derive(Copy, Clone)]
     162              : pub enum ListingMode {
     163              :     WithDelimiter,
     164              :     NoDelimiter,
     165              : }
     166              : 
     167              : #[derive(PartialEq, Eq, Debug, Clone)]
     168              : pub struct ListingObject {
     169              :     pub key: RemotePath,
     170              :     pub last_modified: SystemTime,
     171              :     pub size: u64,
     172              : }
     173              : 
     174              : #[derive(Default)]
     175              : pub struct Listing {
     176              :     pub prefixes: Vec<RemotePath>,
     177              :     pub keys: Vec<ListingObject>,
     178              : }
     179              : 
     180              : #[derive(Default)]
     181              : pub struct VersionListing {
     182              :     pub versions: Vec<Version>,
     183              : }
     184              : 
     185              : pub struct Version {
     186              :     pub key: RemotePath,
     187              :     pub last_modified: SystemTime,
     188              :     pub kind: VersionKind,
     189              : }
     190              : 
     191              : impl Version {
     192           36 :     pub fn version_id(&self) -> Option<&VersionId> {
     193           36 :         match &self.kind {
     194           28 :             VersionKind::Version(id) => Some(id),
     195            8 :             VersionKind::DeletionMarker => None,
     196              :         }
     197           36 :     }
     198              : }
     199              : 
     200              : #[derive(Debug)]
     201              : pub enum VersionKind {
     202              :     DeletionMarker,
     203              :     Version(VersionId),
     204              : }
     205              : 
     206              : /// Options for downloads. The default value is a plain GET.
     207              : pub struct DownloadOpts {
     208              :     /// If given, returns [`DownloadError::Unmodified`] if the object still has
     209              :     /// the same ETag (using If-None-Match).
     210              :     pub etag: Option<Etag>,
     211              :     /// The start of the byte range to download, or unbounded.
     212              :     pub byte_start: Bound<u64>,
     213              :     /// The end of the byte range to download, or unbounded. Must be after the
     214              :     /// start bound.
     215              :     pub byte_end: Bound<u64>,
     216              :     /// Optionally request a specific version of a key
     217              :     pub version_id: Option<VersionId>,
     218              :     /// Indicates whether we're downloading something small or large: this indirectly controls
     219              :     /// timeouts. For something like an index/manifest/heatmap, we should time out faster than
     220              :     /// for layer files.
     221              :     pub kind: DownloadKind,
     222              : }
     223              : 
     224              : pub enum DownloadKind {
     225              :     Large,
     226              :     Small,
     227              : }
     228              : 
     229              : #[derive(Debug, Clone)]
     230              : pub struct VersionId(pub String);
     231              : 
     232              : impl Default for DownloadOpts {
     233          544 :     fn default() -> Self {
     234          544 :         Self {
     235          544 :             etag: Default::default(),
     236          544 :             byte_start: Bound::Unbounded,
     237          544 :             byte_end: Bound::Unbounded,
     238          544 :             version_id: None,
     239          544 :             kind: DownloadKind::Large,
     240          544 :         }
     241          544 :     }
     242              : }
     243              : 
     244              : impl DownloadOpts {
     245              :     /// Returns the byte range with inclusive start and exclusive end, or None
     246              :     /// if unbounded.
     247          214 :     pub fn byte_range(&self) -> Option<(u64, Option<u64>)> {
     248          214 :         if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded {
     249          118 :             return None;
     250           96 :         }
     251           96 :         let start = match self.byte_start {
     252           18 :             Bound::Excluded(i) => i + 1,
     253           60 :             Bound::Included(i) => i,
     254           18 :             Bound::Unbounded => 0,
     255              :         };
     256           96 :         let end = match self.byte_end {
     257           48 :             Bound::Excluded(i) => Some(i),
     258           27 :             Bound::Included(i) => Some(i + 1),
     259           21 :             Bound::Unbounded => None,
     260              :         };
     261           96 :         if let Some(end) = end {
     262           75 :             assert!(start < end, "range end {end} at or before start {start}");
     263           21 :         }
     264           87 :         Some((start, end))
     265          205 :     }
     266              : 
     267              :     /// Returns the byte range as an RFC 2616 Range header value with inclusive
     268              :     /// bounds, or None if unbounded.
     269           66 :     pub fn byte_range_header(&self) -> Option<String> {
     270           66 :         self.byte_range()
     271           66 :             .map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive
     272           66 :             .map(|(start, end)| match end {
     273           30 :                 Some(end) => format!("bytes={start}-{end}"),
     274           10 :                 None => format!("bytes={start}-"),
     275           40 :             })
     276           66 :     }
     277              : }
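
A sketch of a ranged, conditional download request (`cached_etag` is a hypothetical `Option<Etag>` kept from an earlier download): bytes [0, 4096) of a small object, returning [`DownloadError::Unmodified`] if the ETag still matches.

    let opts = DownloadOpts {
        etag: cached_etag,
        byte_start: Bound::Included(0),
        byte_end: Bound::Excluded(4096),
        kind: DownloadKind::Small,
        ..Default::default()
    };
    assert_eq!(opts.byte_range(), Some((0, Some(4096))));
    assert_eq!(opts.byte_range_header().as_deref(), Some("bytes=0-4095"));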
     278              : 
     279              : /// Storage (potentially remote) API to manage its state.
     280              : /// This storage tries to be unaware of any layered repository context,
     281              : /// providing basic CRUD operations for storage files.
     282              : #[allow(async_fn_in_trait)]
     283              : pub trait RemoteStorage: Send + Sync + 'static {
     284              :     /// List objects in remote storage, with semantics matching AWS S3's [`ListObjectsV2`].
     285              :     ///
     286              :     /// The stream is guaranteed to yield at least one element, even in the case of an error
     287              :     /// (in which case it is an `Err()`) or of an empty `Listing`.
     288              :     ///
     289              :     /// The stream does not end when it returns an error, as long as [`is_permanent`] returns false for that error.
     290              :     /// The `next` call can be retried, and a later retry may succeed.
     291              :     ///
     292              :     /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
     293              :     /// to the absolute root of the bucket.
     294              :     ///
     295              :     /// `mode` configures whether to use a delimiter.  Without a delimiter, all keys
     296              :     /// within the prefix are listed in the `keys` of the result.  With a delimiter, any "directories" at the top level of
     297              :     /// the prefix are returned in the `prefixes` of the result, and keys in the top level of the prefix are
     298              :     /// returned in `keys` ().
     299              :     /// returned in `keys`.
     300              :     /// `max_keys` controls the maximum number of keys that will be returned.  If this is None, this function
     301              :     /// will iteratively call listobjects until it runs out of keys.  Note that this is not safe to use on
     302              :     /// will iteratively call ListObjectsV2 until it runs out of keys.  Note that this is not safe to use on
     303              :     /// unlimited-size buckets, as the full list of objects is allocated into a monolithic data structure.
     304              :     /// [`ListObjectsV2`]: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
     305              :     /// [`is_permanent`]: DownloadError::is_permanent
     306              :     fn list_streaming(
     307              :         &self,
     308              :         prefix: Option<&RemotePath>,
     309              :         mode: ListingMode,
     310              :         max_keys: Option<NonZeroU32>,
     311              :         cancel: &CancellationToken,
     312              :     ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;
     313              : 
     314           69 :     async fn list(
     315           69 :         &self,
     316           69 :         prefix: Option<&RemotePath>,
     317           69 :         mode: ListingMode,
     318           69 :         max_keys: Option<NonZeroU32>,
     319           69 :         cancel: &CancellationToken,
     320           69 :     ) -> Result<Listing, DownloadError> {
     321           69 :         let mut stream = pin!(self.list_streaming(prefix, mode, max_keys, cancel));
     322           69 :         let mut combined = stream.next().await.expect("At least one item required")?;
     323          117 :         while let Some(list) = stream.next().await {
     324           48 :             let list = list?;
     325           48 :             combined.keys.extend(list.keys.into_iter());
     326           48 :             combined.prefixes.extend_from_slice(&list.prefixes);
     327              :         }
     328           69 :         Ok(combined)
     329           69 :     }
     330              : 
     331              :     async fn list_versions(
     332              :         &self,
     333              :         prefix: Option<&RemotePath>,
     334              :         mode: ListingMode,
     335              :         max_keys: Option<NonZeroU32>,
     336              :         cancel: &CancellationToken,
     337              :     ) -> Result<VersionListing, DownloadError>;
     338              : 
     339              :     /// Obtain metadata information about an object.
     340              :     async fn head_object(
     341              :         &self,
     342              :         key: &RemotePath,
     343              :         cancel: &CancellationToken,
     344              :     ) -> Result<ListingObject, DownloadError>;
     345              : 
     346              :     /// Streams the local file contents into the remote storage entry.
     347              :     ///
     348              :     /// If the operation fails because of timeout or cancellation, the root cause of the error will be
     349              :     /// set to `TimeoutOrCancel`.
     350              :     async fn upload(
     351              :         &self,
     352              :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     353              :         // An S3 PUT request requires the content length to be specified,
     354              :         // otherwise it starts to fail as the concurrent connection count increases.
     355              :         data_size_bytes: usize,
     356              :         to: &RemotePath,
     357              :         metadata: Option<StorageMetadata>,
     358              :         cancel: &CancellationToken,
     359              :     ) -> anyhow::Result<()>;
     360              : 
     361              :     /// Streams the remote storage entry contents.
     362              :     ///
     363              :     /// The returned download stream will obey the initial timeout and the cancellation signal, erroring
     364              :     /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
     365              :     /// enough for `tokio::io::copy_buf` usage. If needed, the error can be filtered out.
     366              :     ///
     367              :     /// Returns the metadata, if any was stored with the file previously.
     368              :     async fn download(
     369              :         &self,
     370              :         from: &RemotePath,
     371              :         opts: &DownloadOpts,
     372              :         cancel: &CancellationToken,
     373              :     ) -> Result<Download, DownloadError>;
     374              : 
     375              :     /// Delete a single path from remote storage.
     376              :     ///
     377              :     /// If the operation fails because of timeout or cancellation, the root cause of the error will be
     378              :     /// set to `TimeoutOrCancel`. In such a situation it is unknown whether the deletion went through.
     379              :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()>;
     380              : 
     381              :     /// Delete multiple paths from remote storage.
     382              :     ///
     383              :     /// If the operation fails because of timeout or cancellation, the root cause of the error will be
     384              :     /// set to `TimeoutOrCancel`. In such a situation it is unknown which deletions, if any, went
     385              :     /// through.
     386              :     async fn delete_objects(
     387              :         &self,
     388              :         paths: &[RemotePath],
     389              :         cancel: &CancellationToken,
     390              :     ) -> anyhow::Result<()>;
     391              : 
     392              :     /// Returns the maximum number of keys that a call to [`Self::delete_objects`] can delete without chunking.
     393              :     ///
     394              :     /// The value returned is only an optimization hint; one can pass a larger number of objects to
     395              :     /// `delete_objects` as well.
     396              :     ///
     397              :     /// The value is guaranteed to be >= 1.
     398              :     fn max_keys_per_delete(&self) -> usize;
     399              : 
     400              :     /// Deletes all objects matching the given prefix.
     401              :     ///
     402              :     /// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
     403              :     /// delete /a/b, /a/b/*, /a/bc, /a/bc/*, etc.
     404              :     ///
     405              :     /// If the operation fails because of timeout or cancellation, the root cause of the error will
     406              :     /// be set to `TimeoutOrCancel`. In such a situation it is unknown which deletions, if any, went
     407              :     /// through.
     408           24 :     async fn delete_prefix(
     409           24 :         &self,
     410           24 :         prefix: &RemotePath,
     411           24 :         cancel: &CancellationToken,
     412           24 :     ) -> anyhow::Result<()> {
     413           24 :         let mut stream =
     414           24 :             pin!(self.list_streaming(Some(prefix), ListingMode::NoDelimiter, None, cancel));
     415           51 :         while let Some(result) = stream.next().await {
     416           27 :             let keys = match result {
     417           27 :                 Ok(listing) if listing.keys.is_empty() => continue,
     418           18 :                 Ok(listing) => listing.keys.into_iter().map(|o| o.key).collect_vec(),
     419            0 :                 Err(DownloadError::Cancelled) => return Err(TimeoutOrCancel::Cancel.into()),
     420            0 :                 Err(DownloadError::Timeout) => return Err(TimeoutOrCancel::Timeout.into()),
     421            0 :                 Err(err) => return Err(err.into()),
     422              :             };
     423           18 :             tracing::info!("Deleting {} keys from remote storage", keys.len());
     424           18 :             self.delete_objects(&keys, cancel).await?;
     425              :         }
     426           24 :         Ok(())
     427            0 :     }
     428              : 
     429              :     /// Copy a remote object inside a bucket from one path to another.
     430              :     async fn copy(
     431              :         &self,
     432              :         from: &RemotePath,
     433              :         to: &RemotePath,
     434              :         cancel: &CancellationToken,
     435              :     ) -> anyhow::Result<()>;
     436              : 
     437              :     /// Resets the content of everything with the given prefix to the given state.
     438              :     async fn time_travel_recover(
     439              :         &self,
     440              :         prefix: Option<&RemotePath>,
     441              :         timestamp: SystemTime,
     442              :         done_if_after: SystemTime,
     443              :         cancel: &CancellationToken,
     444              :         complexity_limit: Option<NonZeroU32>,
     445              :     ) -> Result<(), TimeTravelError>;
     446              : }
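
A sketch of caller-side batching against this trait (`delete_all` is a hypothetical helper, not part of this crate): it chunks a key list by `max_keys_per_delete` so each `delete_objects` call stays within the backend's native batch size (1000 on S3, 256 on Azure, per the constants above).

    async fn delete_all<S: RemoteStorage>(
        storage: &S,
        keys: &[RemotePath],
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        // `max_keys_per_delete` is guaranteed to be >= 1, so `chunks` cannot panic.
        for chunk in keys.chunks(storage.max_keys_per_delete()) {
            storage.delete_objects(chunk, cancel).await?;
        }
        Ok(())
    }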
     447              : 
     448              : /// Data part of an ongoing [`Download`].
     449              : ///
     450              : /// `DownloadStream` is sensitive to the timeout and cancellation used with the original
     451              : /// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
     452              : /// with `tokio::io::copy_buf`.
     453              : // This has 'static because safekeepers do not use cancellation tokens (yet)
     454              : pub type DownloadStream =
     455              :     Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>>;
     456              : 
     457              : pub struct Download {
     458              :     pub download_stream: DownloadStream,
     459              :     /// The last time the file was modified (`last-modified` HTTP header)
     460              :     pub last_modified: SystemTime,
     461              :     /// A way to identify this specific version of the resource (`etag` HTTP header)
     462              :     pub etag: Etag,
     463              :     /// Extra key-value data, associated with the current remote file.
     464              :     pub metadata: Option<StorageMetadata>,
     465              : }
     466              : 
     467              : impl Debug for Download {
     468            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     469            0 :         f.debug_struct("Download")
     470            0 :             .field("metadata", &self.metadata)
     471            0 :             .finish()
     472            0 :     }
     473              : }
     474              : 
     475              : /// Every storage currently supported.
     476              : /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
     477              : // Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
     478              : #[derive(Clone)]
     479              : pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
     480              :     LocalFs(LocalFs),
     481              :     AwsS3(Arc<S3Bucket>),
     482              :     AzureBlob(Arc<AzureBlobStorage>),
     483              :     Unreliable(Other),
     484              : }
     485              : 
     486              : impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
     487              :     /// See [`RemoteStorage::list`].
     488          307 :     pub async fn list(
     489          307 :         &self,
     490          307 :         prefix: Option<&RemotePath>,
     491          307 :         mode: ListingMode,
     492          307 :         max_keys: Option<NonZeroU32>,
     493          307 :         cancel: &CancellationToken,
     494          307 :     ) -> Result<Listing, DownloadError> {
     495          307 :         match self {
     496          238 :             Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await,
     497           50 :             Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
     498           19 :             Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
     499            0 :             Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
     500              :         }
     501            0 :     }
     502              : 
     503              :     /// See [`RemoteStorage::list_streaming`].
     504            3 :     pub fn list_streaming<'a>(
     505            3 :         &'a self,
     506            3 :         prefix: Option<&'a RemotePath>,
     507            3 :         mode: ListingMode,
     508            3 :         max_keys: Option<NonZeroU32>,
     509            3 :         cancel: &'a CancellationToken,
     510            3 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
     511            3 :         match self {
     512            0 :             Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
     513            0 :                 as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
     514            2 :             Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
     515            1 :             Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
     516            0 :             Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
     517              :         }
     518            0 :     }
     519              : 
     520              :     /// See [`RemoteStorage::list_versions`].
     521            0 :     pub async fn list_versions<'a>(
     522            0 :         &'a self,
     523            0 :         prefix: Option<&'a RemotePath>,
     524            0 :         mode: ListingMode,
     525            0 :         max_keys: Option<NonZeroU32>,
     526            0 :         cancel: &'a CancellationToken,
     527            0 :     ) -> Result<VersionListing, DownloadError> {
     528            0 :         match self {
     529            0 :             Self::LocalFs(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
     530            0 :             Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
     531            0 :             Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
     532            0 :             Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
     533              :         }
     534            0 :     }
     535              : 
     536              :     /// See [`RemoteStorage::head_object`].
     537            9 :     pub async fn head_object(
     538            9 :         &self,
     539            9 :         key: &RemotePath,
     540            9 :         cancel: &CancellationToken,
     541            9 :     ) -> Result<ListingObject, DownloadError> {
     542            9 :         match self {
     543            0 :             Self::LocalFs(s) => s.head_object(key, cancel).await,
     544            6 :             Self::AwsS3(s) => s.head_object(key, cancel).await,
     545            3 :             Self::AzureBlob(s) => s.head_object(key, cancel).await,
     546            0 :             Self::Unreliable(s) => s.head_object(key, cancel).await,
     547              :         }
     548            0 :     }
     549              : 
     550              :     /// See [`RemoteStorage::upload`]
     551         2170 :     pub async fn upload(
     552         2170 :         &self,
     553         2170 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     554         2170 :         data_size_bytes: usize,
     555         2170 :         to: &RemotePath,
     556         2170 :         metadata: Option<StorageMetadata>,
     557         2170 :         cancel: &CancellationToken,
     558         2170 :     ) -> anyhow::Result<()> {
     559         2170 :         match self {
     560         1855 :             Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
     561          198 :             Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
     562           93 :             Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
     563           24 :             Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
     564              :         }
     565            0 :     }
     566              : 
     567              :     /// See [`RemoteStorage::download`]
     568          476 :     pub async fn download(
     569          476 :         &self,
     570          476 :         from: &RemotePath,
     571          476 :         opts: &DownloadOpts,
     572          476 :         cancel: &CancellationToken,
     573          476 :     ) -> Result<Download, DownloadError> {
     574          476 :         match self {
     575          432 :             Self::LocalFs(s) => s.download(from, opts, cancel).await,
     576           33 :             Self::AwsS3(s) => s.download(from, opts, cancel).await,
     577           11 :             Self::AzureBlob(s) => s.download(from, opts, cancel).await,
     578            0 :             Self::Unreliable(s) => s.download(from, opts, cancel).await,
     579              :         }
     580            0 :     }
     581              : 
     582              :     /// See [`RemoteStorage::delete`]
     583          471 :     pub async fn delete(
     584          471 :         &self,
     585          471 :         path: &RemotePath,
     586          471 :         cancel: &CancellationToken,
     587          471 :     ) -> anyhow::Result<()> {
     588          471 :         match self {
     589          211 :             Self::LocalFs(s) => s.delete(path, cancel).await,
     590          174 :             Self::AwsS3(s) => s.delete(path, cancel).await,
     591           86 :             Self::AzureBlob(s) => s.delete(path, cancel).await,
     592            0 :             Self::Unreliable(s) => s.delete(path, cancel).await,
     593              :         }
     594            0 :     }
     595              : 
     596              :     /// See [`RemoteStorage::delete_objects`]
     597           18 :     pub async fn delete_objects(
     598           18 :         &self,
     599           18 :         paths: &[RemotePath],
     600           18 :         cancel: &CancellationToken,
     601           18 :     ) -> anyhow::Result<()> {
     602           18 :         match self {
     603            3 :             Self::LocalFs(s) => s.delete_objects(paths, cancel).await,
     604           12 :             Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
     605            3 :             Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
     606            0 :             Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
     607              :         }
     608            0 :     }
     609              : 
     610              :     /// See [`RemoteStorage::max_keys_per_delete`].
     611            4 :     pub fn max_keys_per_delete(&self) -> usize {
     612            4 :         match self {
     613            4 :             Self::LocalFs(s) => s.max_keys_per_delete(),
     614            0 :             Self::AwsS3(s) => s.max_keys_per_delete(),
     615            0 :             Self::AzureBlob(s) => s.max_keys_per_delete(),
     616            0 :             Self::Unreliable(s) => s.max_keys_per_delete(),
     617              :         }
     618            0 :     }
     619              : 
     620              :     /// See [`RemoteStorage::delete_prefix`]
     621           24 :     pub async fn delete_prefix(
     622           24 :         &self,
     623           24 :         prefix: &RemotePath,
     624           24 :         cancel: &CancellationToken,
     625           24 :     ) -> anyhow::Result<()> {
     626           24 :         match self {
     627            6 :             Self::LocalFs(s) => s.delete_prefix(prefix, cancel).await,
     628           12 :             Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
     629            6 :             Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
     630            0 :             Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
     631              :         }
     632            0 :     }
     633              : 
     634              :     /// See [`RemoteStorage::copy`]
     635            3 :     pub async fn copy_object(
     636            3 :         &self,
     637            3 :         from: &RemotePath,
     638            3 :         to: &RemotePath,
     639            3 :         cancel: &CancellationToken,
     640            3 :     ) -> anyhow::Result<()> {
     641            3 :         match self {
     642            0 :             Self::LocalFs(s) => s.copy(from, to, cancel).await,
     643            2 :             Self::AwsS3(s) => s.copy(from, to, cancel).await,
     644            1 :             Self::AzureBlob(s) => s.copy(from, to, cancel).await,
     645            0 :             Self::Unreliable(s) => s.copy(from, to, cancel).await,
     646              :         }
     647            0 :     }
     648              : 
     649              :     /// See [`RemoteStorage::time_travel_recover`].
     650            6 :     pub async fn time_travel_recover(
     651            6 :         &self,
     652            6 :         prefix: Option<&RemotePath>,
     653            6 :         timestamp: SystemTime,
     654            6 :         done_if_after: SystemTime,
     655            6 :         cancel: &CancellationToken,
     656            6 :         complexity_limit: Option<NonZeroU32>,
     657            6 :     ) -> Result<(), TimeTravelError> {
     658            6 :         match self {
     659            0 :             Self::LocalFs(s) => {
     660            0 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
     661            0 :                     .await
     662              :             }
     663            6 :             Self::AwsS3(s) => {
     664            6 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
     665            6 :                     .await
     666              :             }
     667            0 :             Self::AzureBlob(s) => {
     668            0 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
     669            0 :                     .await
     670              :             }
     671            0 :             Self::Unreliable(s) => {
     672            0 :                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
     673            0 :                     .await
     674              :             }
     675              :         }
     676            0 :     }
     677              : }
     678              : 
     679              : impl GenericRemoteStorage {
     680            0 :     pub async fn from_storage_kind(kind: TypedRemoteStorageKind) -> anyhow::Result<Self> {
     681            0 :         Self::from_config(&RemoteStorageConfig {
     682            0 :             storage: kind.into(),
     683            0 :             timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     684            0 :             small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
     685            0 :         })
     686            0 :         .await
     687            0 :     }
     688              : 
     689          163 :     pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
     690          163 :         let timeout = storage_config.timeout;
     691              : 
     692              :         // If someone overrides the timeout to be small without adjusting small_timeout, adjust it automatically
     693          163 :         let small_timeout = std::cmp::min(storage_config.small_timeout, timeout);
     694              : 
     695          163 :         Ok(match &storage_config.storage {
     696          127 :             RemoteStorageKind::LocalFs { local_path: path } => {
     697          127 :                 info!("Using fs root '{path}' as a remote storage");
     698          127 :                 Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
     699              :             }
     700           26 :             RemoteStorageKind::AwsS3(s3_config) => {
     701              :                 // The profile and access key id are only printed here for debugging purposes,
     702              :                 // their values don't indicate which auth method is eventually chosen.
     703           26 :                 let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
     704           26 :                 let access_key_id =
     705           26 :                     std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
     706           26 :                 info!(
     707            0 :                     "Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
     708              :                     s3_config.bucket_name,
     709              :                     s3_config.bucket_region,
     710              :                     s3_config.prefix_in_bucket,
     711              :                     s3_config.endpoint
     712              :                 );
     713           26 :                 Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?))
     714              :             }
     715           10 :             RemoteStorageKind::AzureContainer(azure_config) => {
     716           10 :                 let storage_account = azure_config
     717           10 :                     .storage_account
     718           10 :                     .as_deref()
     719           10 :                     .unwrap_or("<AZURE_STORAGE_ACCOUNT>");
     720           10 :                 info!(
     721            0 :                     "Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
     722              :                     azure_config.container_name,
     723              :                     azure_config.container_region,
     724              :                     azure_config.prefix_in_container
     725              :                 );
     726           10 :                 Self::AzureBlob(Arc::new(AzureBlobStorage::new(
     727           10 :                     azure_config,
     728           10 :                     timeout,
     729           10 :                     small_timeout,
     730            0 :                 )?))
     731              :             }
     732              :         })
     733          163 :     }
     734              : 
     735              :     /* BEGIN_HADRON */
     736            2 :     pub fn unreliable_wrapper(s: Self, fail_first: u64, fail_probability: u64) -> Self {
     737            2 :         Self::Unreliable(Arc::new(UnreliableWrapper::new(
     738            2 :             s,
     739            2 :             fail_first,
     740            2 :             fail_probability,
     741            2 :         )))
     742            2 :     }
     743              :     /* END_HADRON */
     744              : 
     745              :     /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
     746          891 :     pub async fn upload_storage_object(
     747          891 :         &self,
     748          891 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     749          891 :         from_size_bytes: usize,
     750          891 :         to: &RemotePath,
     751          891 :         cancel: &CancellationToken,
     752          891 :     ) -> anyhow::Result<()> {
     753          891 :         self.upload(from, from_size_bytes, to, None, cancel)
     754          891 :             .await
     755          887 :             .with_context(|| {
     756            0 :                 format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
     757            0 :             })
     758            0 :     }
     759              : 
     760              :     /// The name of the bucket/container/etc.
     761            0 :     pub fn bucket_name(&self) -> Option<&str> {
     762            0 :         match self {
     763            0 :             Self::LocalFs(_s) => None,
     764            0 :             Self::AwsS3(s) => Some(s.bucket_name()),
     765            0 :             Self::AzureBlob(s) => Some(s.container_name()),
     766            0 :             Self::Unreliable(_s) => None,
     767              :         }
     768            0 :     }
     769              : }
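
A sketch of end-to-end use from an async context (the `config` value and object path are illustrative): build a client from a `RemoteStorageConfig`, upload a small object, then download it back.

    let storage = GenericRemoteStorage::from_config(&config).await?;
    let cancel = CancellationToken::new();
    let path = RemotePath::from_string("demo/hello.txt")?;
    let body = Bytes::from_static(b"hello");
    let len = body.len();
    // `upload` takes a stream plus an explicit content length (required by S3 PUT).
    let data = futures::stream::once(futures::future::ready(Ok::<_, std::io::Error>(body)));
    storage.upload(data, len, &path, None, &cancel).await?;
    let download = storage
        .download(&path, &DownloadOpts::default(), &cancel)
        .await?;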
     770              : 
     771              : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
     772              : /// Immutable, cannot be changed once the file is created.
     773              : #[derive(Debug, Clone, PartialEq, Eq)]
     774              : pub struct StorageMetadata(HashMap<String, String>);
     775              : 
     776              : impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
     777          114 :     fn from(arr: [(&str, &str); N]) -> Self {
     778          114 :         let map: HashMap<String, String> = arr
     779          114 :             .iter()
     780          114 :             .map(|(k, v)| (k.to_string(), v.to_string()))
     781          114 :             .collect();
     782          114 :         Self(map)
     783            0 :     }
     784              : }
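
Construction goes through the `From` impl above; a one-line sketch with illustrative keys and values:

    let metadata = StorageMetadata::from([("content-type", "application/octet-stream")]);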
     785              : 
     786              : struct ConcurrencyLimiter {
     787              :     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
     788              :     // The same goes for IAM, which is queried before every S3 request, if enabled. IAM has an even lower RPS threshold.
     789              :     // This helps ensure we don't exceed those thresholds.
     790              :     write: Arc<Semaphore>,
     791              :     read: Arc<Semaphore>,
     792              : }
     793              : 
     794              : impl ConcurrencyLimiter {
     795          730 :     fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
     796          730 :         match kind {
     797           44 :             RequestKind::Get => &self.read,
     798          291 :             RequestKind::Put => &self.write,
     799           64 :             RequestKind::List => &self.read,
     800          287 :             RequestKind::Delete => &self.write,
     801            3 :             RequestKind::Copy => &self.write,
     802            6 :             RequestKind::TimeTravel => &self.write,
     803            9 :             RequestKind::Head => &self.read,
     804           26 :             RequestKind::ListVersions => &self.read,
     805              :         }
     806          730 :     }
     807              : 
     808          697 :     async fn acquire(
     809          697 :         &self,
     810          697 :         kind: RequestKind,
     811          697 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
     812          697 :         self.for_kind(kind).acquire().await
     813          697 :     }
     814              : 
     815           33 :     async fn acquire_owned(
     816           33 :         &self,
     817           33 :         kind: RequestKind,
     818           33 :     ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
     819           33 :         Arc::clone(self.for_kind(kind)).acquire_owned().await
     820           33 :     }
     821              : 
     822           51 :     fn new(limit: usize) -> ConcurrencyLimiter {
     823           51 :         Self {
     824           51 :             read: Arc::new(Semaphore::new(limit)),
     825           51 :             write: Arc::new(Semaphore::new(limit)),
     826           51 :         }
     827           51 :     }
     828              : }
     829              : 
     830              : #[cfg(test)]
     831              : mod tests {
     832              :     use super::*;
     833              : 
     834              :     /// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges
     835              :     /// with optional end bound, or None when unbounded.
     836              :     #[test]
     837            3 :     fn download_opts_byte_range() {
     838              :         // Consider using test_case or a similar table-driven test framework.
     839            3 :         let cases = [
     840            3 :             // (byte_start, byte_end, expected)
     841            3 :             (Bound::Unbounded, Bound::Unbounded, None),
     842            3 :             (Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))),
     843            3 :             (Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))),
     844            3 :             (Bound::Included(3), Bound::Unbounded, Some((3, None))),
     845            3 :             (Bound::Included(3), Bound::Included(7), Some((3, Some(8)))),
     846            3 :             (Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))),
     847            3 :             (Bound::Excluded(3), Bound::Unbounded, Some((4, None))),
     848            3 :             (Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))),
     849            3 :             (Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))),
     850            3 :             // 1-sized ranges are fine, 0 aren't and will panic (separate test).
     851            3 :             (Bound::Included(3), Bound::Included(3), Some((3, Some(4)))),
     852            3 :             (Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))),
     853            3 :         ];
     854              : 
     855           36 :         for (byte_start, byte_end, expect) in cases {
     856           33 :             let opts = DownloadOpts {
     857           33 :                 byte_start,
     858           33 :                 byte_end,
     859           33 :                 ..Default::default()
     860           33 :             };
     861           33 :             let result = opts.byte_range();
     862           33 :             assert_eq!(
     863              :                 result, expect,
     864            0 :                 "byte_start={byte_start:?} byte_end={byte_end:?}"
     865              :             );
     866              : 
     867              :             // Check generated HTTP header, which uses an inclusive range.
     868           33 :             let expect_header = expect.map(|(start, end)| match end {
     869           24 :                 Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end
     870            6 :                 None => format!("bytes={start}-"),
     871           30 :             });
     872           33 :             assert_eq!(
     873           33 :                 opts.byte_range_header(),
     874              :                 expect_header,
     875            0 :                 "byte_start={byte_start:?} byte_end={byte_end:?}"
     876              :             );
     877              :         }
     878            3 :     }
     879              : 
     880              :     /// DownloadOpts::byte_range() zero-sized byte range should panic.
     881              :     #[test]
     882              :     #[should_panic]
     883            3 :     fn download_opts_byte_range_zero() {
     884            3 :         DownloadOpts {
     885            3 :             byte_start: Bound::Included(3),
     886            3 :             byte_end: Bound::Excluded(3),
     887            3 :             ..Default::default()
     888            3 :         }
     889            3 :         .byte_range();
     890            3 :     }
     891              : 
     892              :     /// DownloadOpts::byte_range() negative byte range should panic.
     893              :     #[test]
     894              :     #[should_panic]
     895            3 :     fn download_opts_byte_range_negative() {
     896            3 :         DownloadOpts {
     897            3 :             byte_start: Bound::Included(3),
     898            3 :             byte_end: Bound::Included(2),
     899            3 :             ..Default::default()
     900            3 :         }
     901            3 :         .byte_range();
     902            3 :     }
     903              : 
     904              :     #[test]
     905            3 :     fn test_object_name() {
     906            3 :         let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
     907            3 :         assert_eq!(k.object_name(), Some("c"));
     908              : 
     909            3 :         let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
     910            3 :         assert_eq!(k.object_name(), Some("c"));
     911              : 
     912            3 :         let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
     913            3 :         assert_eq!(k.object_name(), Some("a"));
     914              : 
     915              :         // XXX is it impossible to have an empty key?
     916            3 :         let k = RemotePath::new(Utf8Path::new("")).unwrap();
     917            3 :         assert_eq!(k.object_name(), None);
     918            3 :     }
     919              : 
     920              :     #[test]
     921            3 :     fn remote_path_cannot_be_created_from_absolute_ones() {
     922            3 :         let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
     923            3 :         assert_eq!(err.to_string(), "Path \"/\" is not relative");
     924            3 :     }
     925              : }
        

Generated by: LCOV version 2.1-beta