Line data Source code
1 : //! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage.
2 : //! No other modules from this tree are supposed to be used directly by the external code.
3 : //!
4 : //! The [`RemoteStorage`] trait is a CRUD-like generic abstraction for adapting external storages, with a few implementations (a usage sketch follows the list):
5 : //! * [`local_fs`] uses the local file system as an external storage
6 : //! * [`s3_bucket`] uses an AWS S3 bucket as an external storage
7 : //! * [`azure_blob`] uses Azure Blob Storage as an external storage
8 : //!
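A minimal end-to-end sketch of the public surface described above, assuming the crate is imported as `remote_storage` and that a `RemoteStorageConfig` has already been parsed elsewhere; the function name and the object path are illustrative only:

```rust
use bytes::Bytes;
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig};
use tokio_util::sync::CancellationToken;

// Illustrative helper: obtaining the config is up to the caller.
async fn backup_blob(config: &RemoteStorageConfig) -> anyhow::Result<()> {
    // Picks LocalFs, S3 or Azure depending on the config.
    let storage = GenericRemoteStorage::from_config(config).await?;
    let cancel = CancellationToken::new();

    // Remote paths are relative to the prefix configured for the client.
    let path = RemotePath::from_string("tenant/timeline/checkpoint.bin")?;

    // Upload a small blob; the exact content length must be known up front.
    let body = Bytes::from_static(b"hello");
    let len = body.len();
    let stream = futures::stream::once(futures::future::ready(Ok::<_, std::io::Error>(body)));
    storage.upload_storage_object(stream, len, &path, &cancel).await?;

    // And delete it again.
    storage.delete(&path, &cancel).await?;
    Ok(())
}
```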
9 : #![deny(unsafe_code)]
10 : #![deny(clippy::undocumented_unsafe_blocks)]
11 :
12 : mod azure_blob;
13 : mod config;
14 : mod error;
15 : mod local_fs;
16 : mod metrics;
17 : mod s3_bucket;
18 : mod simulate_failures;
19 : mod support;
20 :
21 : use std::{
22 : collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime,
23 : };
24 :
25 : use anyhow::Context;
26 : use camino::{Utf8Path, Utf8PathBuf};
27 :
28 : use bytes::Bytes;
29 : use futures::{stream::Stream, StreamExt};
30 : use serde::{Deserialize, Serialize};
31 : use tokio::sync::Semaphore;
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::info;
34 :
35 : pub use self::{
36 : azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket,
37 : simulate_failures::UnreliableWrapper,
38 : };
39 : use s3_bucket::RequestKind;
40 :
41 : pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
42 :
43 : /// The Azure SDK's ETag type is a simple String wrapper: we re-export and use it instead of defining our own.
44 : pub use azure_core::Etag;
45 :
46 : pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
47 :
48 : /// Currently, sync happens with AWS S3, which has two limits on requests per second:
49 : /// ~200 RPS for IAM services
50 : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
51 : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
52 : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
53 : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
54 : /// Set this limit analogously to the S3 limit
55 : ///
56 : /// The discussion below notes a limit of at most 20k concurrent connections.
57 : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
58 : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
59 : /// No limit on the client side, which currently means 1000 for AWS S3.
60 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
61 : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
62 :
63 : /// As defined in S3 docs
64 : pub const MAX_KEYS_PER_DELETE: usize = 1000;
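The 1000-key cap mirrors the S3 `DeleteObjects` API limit. Whether a backend splits larger batches internally is an implementation detail not shown in this file; a caller-side sketch of staying under the cap (names taken from this module) could look like:

```rust
// Sketch: delete an arbitrary number of objects in S3-sized batches.
async fn delete_in_batches(
    storage: &GenericRemoteStorage,
    paths: &[RemotePath],
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    for chunk in paths.chunks(MAX_KEYS_PER_DELETE) {
        storage.delete_objects(chunk, cancel).await?;
    }
    Ok(())
}
```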
65 :
66 : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
67 :
68 : /// Path on the remote storage, relative to some inner prefix.
69 : /// The prefix is an implementation detail that allows representing local paths
70 : /// as remote ones, stripping the local storage prefix away.
71 : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
72 : pub struct RemotePath(Utf8PathBuf);
73 :
74 : impl Serialize for RemotePath {
75 0 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
76 0 : where
77 0 : S: serde::Serializer,
78 0 : {
79 0 : serializer.collect_str(self)
80 0 : }
81 : }
82 :
83 : impl<'de> Deserialize<'de> for RemotePath {
84 0 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
85 0 : where
86 0 : D: serde::Deserializer<'de>,
87 0 : {
88 0 : let str = String::deserialize(deserializer)?;
89 0 : Ok(Self(Utf8PathBuf::from(&str)))
90 0 : }
91 : }
92 :
93 : impl std::fmt::Display for RemotePath {
94 268 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 268 : std::fmt::Display::fmt(&self.0, f)
96 268 : }
97 : }
98 :
99 : impl RemotePath {
100 3575 : pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
101 3575 : anyhow::ensure!(
102 3575 : relative_path.is_relative(),
103 6 : "Path {relative_path:?} is not relative"
104 : );
105 3569 : Ok(Self(relative_path.to_path_buf()))
106 3575 : }
107 :
108 3217 : pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
109 3217 : Self::new(Utf8Path::new(relative_path))
110 3217 : }
111 :
112 3413 : pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
113 3413 : base_path.join(&self.0)
114 3413 : }
115 :
116 22 : pub fn object_name(&self) -> Option<&str> {
117 22 : self.0.file_name()
118 22 : }
119 :
120 99 : pub fn join(&self, path: impl AsRef<Utf8Path>) -> Self {
121 99 : Self(self.0.join(path))
122 99 : }
123 :
124 528 : pub fn get_path(&self) -> &Utf8PathBuf {
125 528 : &self.0
126 528 : }
127 :
128 0 : pub fn extension(&self) -> Option<&str> {
129 0 : self.0.extension()
130 0 : }
131 :
132 78 : pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
133 78 : self.0.strip_prefix(&p.0)
134 78 : }
135 :
136 180 : pub fn add_trailing_slash(&self) -> Self {
137 180 : // Unwrap safety: inputs are guaranteed to be valid UTF-8
138 180 : Self(format!("{}/", self.0).try_into().unwrap())
139 180 : }
140 : }
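A short illustration of how these helpers compose; the concrete paths are made up, but the behaviour follows directly from the methods above:

```rust
fn remote_path_example() -> anyhow::Result<()> {
    // Only relative paths are accepted; absolute ones make `new` fail.
    let timeline = RemotePath::from_string("tenants/123/timelines/456")?;
    let layer = timeline.join("layer-file");
    assert_eq!(layer.object_name(), Some("layer-file"));

    // Map the remote path back onto a local directory.
    let local = layer.with_base(Utf8Path::new("/data/pageserver"));
    assert_eq!(
        local.as_str(),
        "/data/pageserver/tenants/123/timelines/456/layer-file"
    );

    // A trailing slash matters when the path is used as a listing prefix.
    let prefix = timeline.add_trailing_slash();
    assert_eq!(prefix.get_path().as_str(), "tenants/123/timelines/456/");
    Ok(())
}
```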
141 :
142 : /// We don't need callers to be able to pass arbitrary delimiters: just control
143 : /// whether listings will use a '/' separator or not.
144 : ///
145 : /// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
146 : /// NoDelimiter mode will only populate `keys`.
147 : #[derive(Copy, Clone)]
148 : pub enum ListingMode {
149 : WithDelimiter,
150 : NoDelimiter,
151 : }
152 :
153 : #[derive(PartialEq, Eq, Debug)]
154 : pub struct ListingObject {
155 : pub key: RemotePath,
156 : pub last_modified: SystemTime,
157 : pub size: u64,
158 : }
159 :
160 : #[derive(Default)]
161 : pub struct Listing {
162 : pub prefixes: Vec<RemotePath>,
163 : pub keys: Vec<ListingObject>,
164 : }
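A hedged sketch of how the two listing modes are consumed through `GenericRemoteStorage::list` (defined further down in this file); with a delimiter, the immediate "directories" land in `prefixes`, while without one every object under the prefix lands in `keys`:

```rust
async fn inspect_prefix(
    storage: &GenericRemoteStorage,
    prefix: &RemotePath,
    cancel: &CancellationToken,
) -> Result<(), DownloadError> {
    // One "directory level": sub-prefixes plus direct children.
    let shallow = storage
        .list(Some(prefix), ListingMode::WithDelimiter, None, cancel)
        .await?;
    for p in &shallow.prefixes {
        println!("subtree: {p}");
    }

    // Everything under the prefix, flattened.
    let deep = storage
        .list(Some(prefix), ListingMode::NoDelimiter, None, cancel)
        .await?;
    for obj in &deep.keys {
        println!("object: {} ({} bytes)", obj.key, obj.size);
    }
    Ok(())
}
```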
165 :
166 : /// Storage (potentially remote) API to manage its state.
167 : /// This storage tries to be unaware of any layered repository context,
168 : /// providing basic CRUD operations for storage files.
169 : #[allow(async_fn_in_trait)]
170 : pub trait RemoteStorage: Send + Sync + 'static {
171 : /// List objects in remote storage, with semantics matching AWS S3's [`ListObjectsV2`].
172 : ///
173 : /// The stream is guaranteed to return at least one element, even in the case of an error
174 : /// (in which case it is an `Err()`) or an empty listing (in which case it is an empty `Listing`).
175 : ///
176 : /// The stream does not end when it returns an error, as long as [`is_permanent`] returns false for that error.
177 : /// The `next` call can be retried and may succeed on a later attempt.
178 : ///
179 : /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
180 : /// to the absolute root of the bucket.
181 : ///
182 : /// `mode` configures whether to use a delimiter. Without a delimiter, all keys
183 : /// within the prefix are listed in the `keys` of the result. With a delimiter, any "directories" at the top level of
184 : /// the prefix are returned in the `prefixes` of the result, and keys in the top level of the prefix are
185 : /// returned in `keys`.
186 : ///
187 : /// `max_keys` controls the maximum number of keys that will be returned. If this is None, this function
188 : /// will iteratively call ListObjects until it runs out of keys. Note that this is not safe to use on
189 : /// unlimited-size buckets, as the full list of objects is allocated into a monolithic data structure.
190 : ///
191 : /// [`ListObjectsV2`]: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
192 : /// [`is_permanent`]: DownloadError::is_permanent
193 : fn list_streaming(
194 : &self,
195 : prefix: Option<&RemotePath>,
196 : mode: ListingMode,
197 : max_keys: Option<NonZeroU32>,
198 : cancel: &CancellationToken,
199 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;
200 :
201 30 : async fn list(
202 30 : &self,
203 30 : prefix: Option<&RemotePath>,
204 30 : mode: ListingMode,
205 30 : max_keys: Option<NonZeroU32>,
206 30 : cancel: &CancellationToken,
207 30 : ) -> Result<Listing, DownloadError> {
208 30 : let mut stream = std::pin::pin!(self.list_streaming(prefix, mode, max_keys, cancel));
209 133 : let mut combined = stream.next().await.expect("At least one item required")?;
210 48 : while let Some(list) = stream.next().await {
211 12 : let list = list?;
212 12 : combined.keys.extend(list.keys.into_iter());
213 12 : combined.prefixes.extend_from_slice(&list.prefixes);
214 : }
215 30 : Ok(combined)
216 30 : }
217 :
218 : /// Streams the local file contents into the remote storage entry.
219 : ///
220 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
221 : /// set to `TimeoutOrCancel`.
222 : async fn upload(
223 : &self,
224 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
225 : // The S3 PUT request requires the content length to be specified;
226 : // otherwise it starts to fail as the concurrent connection count increases.
227 : data_size_bytes: usize,
228 : to: &RemotePath,
229 : metadata: Option<StorageMetadata>,
230 : cancel: &CancellationToken,
231 : ) -> anyhow::Result<()>;
232 :
233 : /// Streams the remote storage entry contents.
234 : ///
235 : /// The returned download stream will obey initial timeout and cancellation signal by erroring
236 : /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
237 : /// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
238 : ///
239 : /// Returns the metadata, if any was stored with the file previously.
240 : async fn download(
241 : &self,
242 : from: &RemotePath,
243 : cancel: &CancellationToken,
244 : ) -> Result<Download, DownloadError>;
245 :
246 : /// Streams a given byte range of the remote storage entry contents.
247 : ///
248 : /// The returned download stream will obey initial timeout and cancellation signal by erroring
249 : /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
250 : /// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
251 : ///
252 : /// Returns the metadata, if any was stored with the file previously.
253 : async fn download_byte_range(
254 : &self,
255 : from: &RemotePath,
256 : start_inclusive: u64,
257 : end_exclusive: Option<u64>,
258 : cancel: &CancellationToken,
259 : ) -> Result<Download, DownloadError>;
260 :
261 : /// Delete a single path from remote storage.
262 : ///
263 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
264 : /// set to `TimeoutOrCancel`. In that situation it is unknown whether the deletion went through.
265 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()>;
266 :
267 : /// Delete multiple paths from remote storage.
268 : ///
269 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
270 : /// set to `TimeoutOrCancel`. In that situation it is unknown which deletions, if any, went
271 : /// through.
272 : async fn delete_objects<'a>(
273 : &self,
274 : paths: &'a [RemotePath],
275 : cancel: &CancellationToken,
276 : ) -> anyhow::Result<()>;
277 :
278 : /// Copy a remote object inside a bucket from one path to another.
279 : async fn copy(
280 : &self,
281 : from: &RemotePath,
282 : to: &RemotePath,
283 : cancel: &CancellationToken,
284 : ) -> anyhow::Result<()>;
285 :
286 : /// Resets the content of everything under the given prefix to its state at the given timestamp.
287 : async fn time_travel_recover(
288 : &self,
289 : prefix: Option<&RemotePath>,
290 : timestamp: SystemTime,
291 : done_if_after: SystemTime,
292 : cancel: &CancellationToken,
293 : ) -> Result<(), TimeTravelError>;
294 : }
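A sketch of draining `list_streaming` while honouring the retry semantics documented above; it assumes `DownloadError::is_permanent(&self) -> bool`, which the docs reference, and retries transient errors immediately (real callers would add backoff):

```rust
use futures::StreamExt;

async fn drain_listing<S: RemoteStorage>(
    storage: &S,
    prefix: Option<&RemotePath>,
    cancel: &CancellationToken,
) -> Result<Vec<ListingObject>, DownloadError> {
    let mut stream = std::pin::pin!(storage.list_streaming(
        prefix,
        ListingMode::NoDelimiter,
        None,
        cancel
    ));
    let mut keys = Vec::new();
    while let Some(item) = stream.next().await {
        match item {
            Ok(listing) => keys.extend(listing.keys),
            // Permanent errors will not go away on retry; give up.
            Err(e) if e.is_permanent() => return Err(e),
            // Transient errors: `next()` may succeed on a later attempt.
            Err(_) => continue,
        }
    }
    Ok(keys)
}
```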
295 :
296 : /// DownloadStream is sensitive to the timeout and cancellation used with the original
297 : /// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
298 : /// with `tokio::io::copy_buf`.
299 : // This has 'static because safekeepers do not use cancellation tokens (yet)
300 : pub type DownloadStream =
301 : Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>>;
302 :
303 : pub struct Download {
304 : pub download_stream: DownloadStream,
305 : /// The last time the file was modified (`last-modified` HTTP header)
306 : pub last_modified: SystemTime,
307 : /// A way to identify this specific version of the resource (`etag` HTTP header)
308 : pub etag: Etag,
309 : /// Extra key-value data, associated with the current remote file.
310 : pub metadata: Option<StorageMetadata>,
311 : }
312 :
313 : impl Debug for Download {
314 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
315 0 : f.debug_struct("Download")
316 0 : .field("metadata", &self.metadata)
317 0 : .finish()
318 0 : }
319 : }
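The docs above point at `tokio::io::copy_buf`; a hedged sketch of draining a `Download` into a local file via `tokio_util::io::StreamReader`, which adapts a `Stream<Item = io::Result<Bytes>>` into a buffered reader (the destination path is illustrative):

```rust
async fn save_download(download: Download, to: &Utf8Path) -> std::io::Result<u64> {
    // Timeouts and cancellation of the original request surface here as io errors.
    let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
    let mut file = tokio::fs::File::create(to).await?;
    tokio::io::copy_buf(&mut reader, &mut file).await
}
```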
320 :
321 : /// Every currently supported storage backend.
322 : /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
323 : // Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
324 : #[derive(Clone)]
325 : pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
326 : LocalFs(LocalFs),
327 : AwsS3(Arc<S3Bucket>),
328 : AzureBlob(Arc<AzureBlobStorage>),
329 : Unreliable(Other),
330 : }
331 :
332 : impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
333 : // See [`RemoteStorage::list`].
334 210 : pub async fn list(
335 210 : &self,
336 210 : prefix: Option<&RemotePath>,
337 210 : mode: ListingMode,
338 210 : max_keys: Option<NonZeroU32>,
339 210 : cancel: &CancellationToken,
340 210 : ) -> Result<Listing, DownloadError> {
341 210 : match self {
342 689 : Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await,
343 118 : Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
344 63 : Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
345 0 : Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
346 : }
347 210 : }
348 :
349 : // See [`RemoteStorage::list_streaming`].
350 3 : pub fn list_streaming<'a>(
351 3 : &'a self,
352 3 : prefix: Option<&'a RemotePath>,
353 3 : mode: ListingMode,
354 3 : max_keys: Option<NonZeroU32>,
355 3 : cancel: &'a CancellationToken,
356 3 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
357 3 : match self {
358 0 : Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
359 0 : as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
360 2 : Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
361 1 : Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
362 0 : Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
363 : }
364 3 : }
365 :
366 : /// See [`RemoteStorage::upload`]
367 2986 : pub async fn upload(
368 2986 : &self,
369 2986 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
370 2986 : data_size_bytes: usize,
371 2986 : to: &RemotePath,
372 2986 : metadata: Option<StorageMetadata>,
373 2986 : cancel: &CancellationToken,
374 2986 : ) -> anyhow::Result<()> {
375 2986 : match self {
376 36155 : Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
377 636 : Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
378 268 : Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
379 76 : Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
380 : }
381 2870 : }
382 :
383 59 : pub async fn download(
384 59 : &self,
385 59 : from: &RemotePath,
386 59 : cancel: &CancellationToken,
387 59 : ) -> Result<Download, DownloadError> {
388 59 : match self {
389 67 : Self::LocalFs(s) => s.download(from, cancel).await,
390 19 : Self::AwsS3(s) => s.download(from, cancel).await,
391 10 : Self::AzureBlob(s) => s.download(from, cancel).await,
392 0 : Self::Unreliable(s) => s.download(from, cancel).await,
393 : }
394 59 : }
395 :
396 15 : pub async fn download_byte_range(
397 15 : &self,
398 15 : from: &RemotePath,
399 15 : start_inclusive: u64,
400 15 : end_exclusive: Option<u64>,
401 15 : cancel: &CancellationToken,
402 15 : ) -> Result<Download, DownloadError> {
403 15 : match self {
404 0 : Self::LocalFs(s) => {
405 0 : s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
406 0 : .await
407 : }
408 10 : Self::AwsS3(s) => {
409 10 : s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
410 10 : .await
411 : }
412 5 : Self::AzureBlob(s) => {
413 5 : s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
414 25 : .await
415 : }
416 0 : Self::Unreliable(s) => {
417 0 : s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
418 0 : .await
419 : }
420 : }
421 15 : }
422 :
423 : /// See [`RemoteStorage::delete`]
424 136 : pub async fn delete(
425 136 : &self,
426 136 : path: &RemotePath,
427 136 : cancel: &CancellationToken,
428 136 : ) -> anyhow::Result<()> {
429 136 : match self {
430 2 : Self::LocalFs(s) => s.delete(path, cancel).await,
431 317 : Self::AwsS3(s) => s.delete(path, cancel).await,
432 221 : Self::AzureBlob(s) => s.delete(path, cancel).await,
433 0 : Self::Unreliable(s) => s.delete(path, cancel).await,
434 : }
435 136 : }
436 :
437 : /// See [`RemoteStorage::delete_objects`]
438 21 : pub async fn delete_objects(
439 21 : &self,
440 21 : paths: &[RemotePath],
441 21 : cancel: &CancellationToken,
442 21 : ) -> anyhow::Result<()> {
443 21 : match self {
444 6 : Self::LocalFs(s) => s.delete_objects(paths, cancel).await,
445 60 : Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
446 25 : Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
447 0 : Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
448 : }
449 21 : }
450 :
451 : /// See [`RemoteStorage::copy`]
452 3 : pub async fn copy_object(
453 3 : &self,
454 3 : from: &RemotePath,
455 3 : to: &RemotePath,
456 3 : cancel: &CancellationToken,
457 3 : ) -> anyhow::Result<()> {
458 3 : match self {
459 0 : Self::LocalFs(s) => s.copy(from, to, cancel).await,
460 3 : Self::AwsS3(s) => s.copy(from, to, cancel).await,
461 5 : Self::AzureBlob(s) => s.copy(from, to, cancel).await,
462 0 : Self::Unreliable(s) => s.copy(from, to, cancel).await,
463 : }
464 3 : }
465 :
466 : /// See [`RemoteStorage::time_travel_recover`].
467 6 : pub async fn time_travel_recover(
468 6 : &self,
469 6 : prefix: Option<&RemotePath>,
470 6 : timestamp: SystemTime,
471 6 : done_if_after: SystemTime,
472 6 : cancel: &CancellationToken,
473 6 : ) -> Result<(), TimeTravelError> {
474 6 : match self {
475 0 : Self::LocalFs(s) => {
476 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
477 0 : .await
478 : }
479 6 : Self::AwsS3(s) => {
480 6 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
481 76 : .await
482 : }
483 0 : Self::AzureBlob(s) => {
484 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
485 0 : .await
486 : }
487 0 : Self::Unreliable(s) => {
488 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
489 0 : .await
490 : }
491 : }
492 6 : }
493 : }
494 :
495 : impl GenericRemoteStorage {
496 214 : pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
497 214 : let timeout = storage_config.timeout;
498 214 : Ok(match &storage_config.storage {
499 190 : RemoteStorageKind::LocalFs { local_path: path } => {
500 190 : info!("Using fs root '{path}' as a remote storage");
501 190 : Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
502 : }
503 18 : RemoteStorageKind::AwsS3(s3_config) => {
504 18 : // their values don't indicate which auth method is eventually chosen.
505 18 : // their values don't indicate the eventually taken choice for auth.
506 18 : let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
507 18 : let access_key_id =
508 18 : std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
509 18 : info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
510 : s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
511 18 : Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?))
512 : }
513 6 : RemoteStorageKind::AzureContainer(azure_config) => {
514 6 : let storage_account = azure_config
515 6 : .storage_account
516 6 : .as_deref()
517 6 : .unwrap_or("<AZURE_STORAGE_ACCOUNT>");
518 6 : info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
519 : azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
520 6 : Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?))
521 : }
522 : })
523 214 : }
524 :
525 4 : pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
526 4 : Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
527 4 : }
528 :
529 : /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
530 1398 : pub async fn upload_storage_object(
531 1398 : &self,
532 1398 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
533 1398 : from_size_bytes: usize,
534 1398 : to: &RemotePath,
535 1398 : cancel: &CancellationToken,
536 1398 : ) -> anyhow::Result<()> {
537 1398 : self.upload(from, from_size_bytes, to, None, cancel)
538 4290 : .await
539 1392 : .with_context(|| {
540 0 : format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
541 1392 : })
542 1392 : }
543 :
544 : /// Downloads the storage object into the `to_path` provided.
545 : /// `byte_range` can be specified to download only a part of the file, if needed.
546 0 : pub async fn download_storage_object(
547 0 : &self,
548 0 : byte_range: Option<(u64, Option<u64>)>,
549 0 : from: &RemotePath,
550 0 : cancel: &CancellationToken,
551 0 : ) -> Result<Download, DownloadError> {
552 0 : match byte_range {
553 0 : Some((start, end)) => self.download_byte_range(from, start, end, cancel).await,
554 0 : None => self.download(from, cancel).await,
555 : }
556 0 : }
557 :
558 : /// The name of the bucket/container/etc.
559 0 : pub fn bucket_name(&self) -> Option<&str> {
560 0 : match self {
561 0 : Self::LocalFs(_s) => None,
562 0 : Self::AwsS3(s) => Some(s.bucket_name()),
563 0 : Self::AzureBlob(s) => Some(s.container_name()),
564 0 : Self::Unreliable(_s) => None,
565 : }
566 0 : }
567 : }
568 :
569 : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
570 : /// Immutable, cannot be changed once the file is created.
571 : #[derive(Debug, Clone, PartialEq, Eq)]
572 : pub struct StorageMetadata(HashMap<String, String>);
573 :
574 : impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
575 0 : fn from(arr: [(&str, &str); N]) -> Self {
576 0 : let map: HashMap<String, String> = arr
577 0 : .iter()
578 0 : .map(|(k, v)| (k.to_string(), v.to_string()))
579 0 : .collect();
580 0 : Self(map)
581 0 : }
582 : }
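The array conversion above keeps attaching metadata to an upload a one-liner; a hedged sketch (the keys and values are illustrative, and the data is assumed to fit in memory):

```rust
async fn upload_with_metadata(
    storage: &GenericRemoteStorage,
    data: Bytes,
    to: &RemotePath,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    let metadata = StorageMetadata::from([("compression", "zstd"), ("upload-kind", "example")]);
    let len = data.len();
    let body = futures::stream::once(futures::future::ready(Ok::<_, std::io::Error>(data)));
    storage.upload(body, len, to, Some(metadata), cancel).await
}
```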
583 :
584 : struct ConcurrencyLimiter {
585 : // Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded.
586 : // The same goes for IAM, which is queried before every S3 request, if enabled. IAM has an even lower RPS threshold.
587 : // This limiter helps to ensure we don't exceed those thresholds.
588 : write: Arc<Semaphore>,
589 : read: Arc<Semaphore>,
590 : }
591 :
592 : impl ConcurrencyLimiter {
593 378 : fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
594 378 : match kind {
595 32 : RequestKind::Get => &self.read,
596 155 : RequestKind::Put => &self.write,
597 33 : RequestKind::List => &self.read,
598 149 : RequestKind::Delete => &self.write,
599 3 : RequestKind::Copy => &self.write,
600 6 : RequestKind::TimeTravel => &self.write,
601 : }
602 378 : }
603 :
604 353 : async fn acquire(
605 353 : &self,
606 353 : kind: RequestKind,
607 353 : ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
608 353 : self.for_kind(kind).acquire().await
609 353 : }
610 :
611 25 : async fn acquire_owned(
612 25 : &self,
613 25 : kind: RequestKind,
614 25 : ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
615 25 : Arc::clone(self.for_kind(kind)).acquire_owned().await
616 25 : }
617 :
618 44 : fn new(limit: usize) -> ConcurrencyLimiter {
619 44 : Self {
620 44 : read: Arc::new(Semaphore::new(limit)),
621 44 : write: Arc::new(Semaphore::new(limit)),
622 44 : }
623 44 : }
624 : }
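A sketch of how a backend call site is expected to use the limiter (the real call sites live in the backend modules and are not shown here): the permit is held for the duration of the remote request and released on drop. It relies on the `anyhow::Context` import already at the top of this file.

```rust
async fn limited_get(limiter: &ConcurrencyLimiter) -> anyhow::Result<()> {
    let _permit = limiter
        .acquire(RequestKind::Get)
        .await
        .context("remote storage concurrency limiter was closed")?;
    // ... perform the actual GET against the remote endpoint here ...
    Ok(())
    // `_permit` drops here, freeing a slot for the next request.
}
```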
625 :
626 : #[cfg(test)]
627 : mod tests {
628 : use super::*;
629 :
630 : #[test]
631 4 : fn test_object_name() {
632 4 : let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
633 4 : assert_eq!(k.object_name(), Some("c"));
634 :
635 4 : let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
636 4 : assert_eq!(k.object_name(), Some("c"));
637 :
638 4 : let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
639 4 : assert_eq!(k.object_name(), Some("a"));
640 :
641 : // XXX is it impossible to have an empty key?
642 4 : let k = RemotePath::new(Utf8Path::new("")).unwrap();
643 4 : assert_eq!(k.object_name(), None);
644 4 : }
645 :
646 : #[test]
647 4 : fn remote_path_cannot_be_created_from_absolute_ones() {
648 4 : let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
649 4 : assert_eq!(err.to_string(), "Path \"/\" is not relative");
650 4 : }
651 : }