//! A set of generic storage abstractions for the page server to use when backing up and restoring its state from external storage.
//! No other modules from this tree are supposed to be used directly by external code.
//!
//! The [`RemoteStorage`] trait is a CRUD-like generic abstraction for adapting external storages, with a few implementations:
//! * [`local_fs`] uses the local file system as external storage
//! * [`s3_bucket`] uses an AWS S3 bucket as external storage
//! * [`azure_blob`] uses Azure Blob Storage as external storage
//!
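//! A minimal usage sketch (a non-authoritative illustration; assumes the caller already
//! holds a [`RemoteStorageConfig`] and runs inside a tokio runtime):
//! ```ignore
//! let storage = GenericRemoteStorage::from_config(&config).await?;
//! let cancel = CancellationToken::new();
//! // List everything under the configured prefix.
//! let listing = storage.list(None, ListingMode::NoDelimiter, None, &cancel).await?;
//! ```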
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]

mod azure_blob;
mod config;
mod error;
mod local_fs;
mod metrics;
mod s3_bucket;
mod simulate_failures;
mod support;

use std::{
    collections::HashMap, fmt::Debug, num::NonZeroU32, pin::Pin, sync::Arc, time::SystemTime,
};

use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};

use bytes::Bytes;
use futures::{stream::Stream, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tracing::info;

pub use self::{
    azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket,
    simulate_failures::UnreliableWrapper,
};
use s3_bucket::RequestKind;

pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};

/// Azure SDK's ETag type is a simple String wrapper: we use this internally instead of repeating it here.
pub use azure_core::Etag;

pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};

/// Default concurrency limit for S3 operations
///
/// Currently, sync happens with AWS S3, which has two limits on requests per second:
/// ~200 RPS for IAM services
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
/// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
/// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// Set this limit analogously to the S3 limit
///
/// Here, a limit of at most 20k concurrent connections was noted.
/// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
/// No limits on the client side, which currently means 1000 for AWS S3.
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;

/// The maximum number of keys in a single DeleteObjects request, as defined in the S3 docs.
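///
/// A sketch of splitting a larger batch accordingly (`storage` and `cancel` are assumed to
/// come from the caller's context; backends may also chunk internally):
/// ```ignore
/// for chunk in paths.chunks(MAX_KEYS_PER_DELETE) {
///     storage.delete_objects(chunk, &cancel).await?;
/// }
/// ```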
pub const MAX_KEYS_PER_DELETE: usize = 1000;

const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';

/// A path on the remote storage, relative to some inner prefix.
/// The prefix is an implementation detail that allows representing local paths
/// as remote ones, stripping the local storage prefix away.
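///
/// A short example (assuming this crate is importable as `remote_storage`):
/// ```
/// # use camino::Utf8Path;
/// # use remote_storage::RemotePath;
/// let path = RemotePath::new(Utf8Path::new("tenant/timeline/layer")).unwrap();
/// assert_eq!(path.object_name(), Some("layer"));
/// assert_eq!(
///     path.with_base(Utf8Path::new("/local/root")).as_str(),
///     "/local/root/tenant/timeline/layer"
/// );
/// // Absolute paths are rejected:
/// assert!(RemotePath::new(Utf8Path::new("/absolute")).is_err());
/// ```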
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemotePath(Utf8PathBuf);

impl Serialize for RemotePath {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.collect_str(self)
    }
}

impl<'de> Deserialize<'de> for RemotePath {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let str = String::deserialize(deserializer)?;
        Ok(Self(Utf8PathBuf::from(&str)))
    }
}

impl std::fmt::Display for RemotePath {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}

impl RemotePath {
    pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
        anyhow::ensure!(
            relative_path.is_relative(),
            "Path {relative_path:?} is not relative"
        );
        Ok(Self(relative_path.to_path_buf()))
    }

    pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
        Self::new(Utf8Path::new(relative_path))
    }

    pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
        base_path.join(&self.0)
    }

    pub fn object_name(&self) -> Option<&str> {
        self.0.file_name()
    }

    pub fn join(&self, path: impl AsRef<Utf8Path>) -> Self {
        Self(self.0.join(path))
    }

    pub fn get_path(&self) -> &Utf8PathBuf {
        &self.0
    }

    pub fn extension(&self) -> Option<&str> {
        self.0.extension()
    }

    pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
        self.0.strip_prefix(&p.0)
    }

    pub fn add_trailing_slash(&self) -> Self {
        // Unwrap safety: inputs are guaranteed to be valid UTF-8
        Self(format!("{}/", self.0).try_into().unwrap())
    }
}

/// We don't need callers to be able to pass arbitrary delimiters: just control
/// whether listings will use a '/' separator or not.
///
/// The `WithDelimiter` mode will populate `prefixes` and `keys` in the result. The
/// `NoDelimiter` mode will only populate `keys`.
#[derive(Copy, Clone)]
pub enum ListingMode {
    WithDelimiter,
    NoDelimiter,
}

#[derive(PartialEq, Eq, Debug, Clone)]
pub struct ListingObject {
    pub key: RemotePath,
    pub last_modified: SystemTime,
    pub size: u64,
}

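/// The result of a listing operation: "directories" below the listed prefix (populated only
/// in [`ListingMode::WithDelimiter`]) and the objects themselves.
///
/// A sketch of how the two modes differ (`storage`, `prefix`, and `cancel` are assumed to
/// come from the caller's context):
/// ```ignore
/// // With a delimiter, top-level "directories" under `prefix` land in `prefixes`:
/// let listing = storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel).await?;
/// // Without a delimiter, every key under `prefix` lands in `keys`:
/// let listing = storage.list(Some(&prefix), ListingMode::NoDelimiter, None, &cancel).await?;
/// ```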
#[derive(Default)]
pub struct Listing {
    pub prefixes: Vec<RemotePath>,
    pub keys: Vec<ListingObject>,
}

/// Storage (potentially remote) API to manage its state.
/// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations for storage files.
#[allow(async_fn_in_trait)]
pub trait RemoteStorage: Send + Sync + 'static {
    /// List objects in remote storage, with semantics matching AWS S3's [`ListObjectsV2`].
    ///
    /// The stream is guaranteed to yield at least one element, even in the case of errors
    /// (in which case that element is an `Err()`) or an empty `Listing`.
    ///
    /// The stream does not end when it yields an error, as long as [`is_permanent`] returns
    /// false on that error: calling `next` again acts as a retry and may succeed on a later attempt.
    ///
    /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
    /// to the absolute root of the bucket.
    ///
    /// `mode` configures whether to use a delimiter. Without a delimiter, all keys
    /// within the prefix are listed in the `keys` of the result. With a delimiter, any "directories" at the top level of
    /// the prefix are returned in the `prefixes` of the result, and keys in the top level of the prefix are
    /// returned in `keys`.
    ///
    /// `max_keys` controls the maximum number of keys that will be returned. If this is None, this function
    /// will iteratively call ListObjectsV2 until it runs out of keys. Note that this is not safe to use on
    /// buckets of unlimited size, as the full list of objects is allocated into a monolithic data structure.
    ///
    /// [`ListObjectsV2`]: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
    /// [`is_permanent`]: DownloadError::is_permanent
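    ///
    /// A consumption sketch with retry on transient errors (illustrative only; `process` is a
    /// hypothetical callback):
    /// ```ignore
    /// let mut stream = std::pin::pin!(storage.list_streaming(Some(&prefix), mode, None, &cancel));
    /// while let Some(result) = stream.next().await {
    ///     match result {
    ///         Ok(listing) => process(listing),
    ///         // Transient failure: polling `next` again retries the operation.
    ///         Err(e) if !e.is_permanent() => continue,
    ///         Err(e) => return Err(e),
    ///     }
    /// }
    /// ```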
    fn list_streaming(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;

    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> Result<Listing, DownloadError> {
        let mut stream = std::pin::pin!(self.list_streaming(prefix, mode, max_keys, cancel));
        let mut combined = stream.next().await.expect("At least one item required")?;
        while let Some(list) = stream.next().await {
            let list = list?;
            combined.keys.extend(list.keys.into_iter());
            combined.prefixes.extend_from_slice(&list.prefixes);
        }
        Ok(combined)
    }

    /// Obtain metadata information about an object.
    async fn head_object(
        &self,
        key: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<ListingObject, DownloadError>;

    /// Streams the local file contents into the remote storage entry.
    ///
    /// If the operation fails because of timeout or cancellation, the root cause of the error will be
    /// set to `TimeoutOrCancel`.
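    ///
    /// A hypothetical single-chunk upload sketch (`storage`, `remote_path`, and `cancel` are
    /// assumed to exist in the caller's context):
    /// ```ignore
    /// let data = bytes::Bytes::from_static(b"hello");
    /// let len = data.len();
    /// // Wrap the bytes in a one-item stream of `std::io::Result<Bytes>`.
    /// let stream = futures::stream::once(futures::future::ready(std::io::Result::Ok(data)));
    /// storage.upload(stream, len, &remote_path, None, &cancel).await?;
    /// ```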
    async fn upload(
        &self,
        from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
        // An S3 PUT request requires the content length to be specified,
        // otherwise it starts to fail as the concurrent connection count increases.
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()>;

    /// Streams the remote storage entry contents.
    ///
    /// The returned download stream will obey the initial timeout and the cancellation signal by erroring
    /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
    /// enough for `tokio::io::copy_buf` usage. If needed, the error can be filtered out.
    ///
    /// Returns the metadata, if any was stored with the file previously.
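    ///
    /// A consumption sketch (assuming the stream is adapted to an `AsyncBufRead` via
    /// `tokio_util::io::StreamReader`):
    /// ```ignore
    /// let download = storage.download(&path, &cancel).await?;
    /// let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
    /// let mut buf = Vec::new();
    /// tokio::io::copy_buf(&mut reader, &mut buf).await?;
    /// ```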
    async fn download(
        &self,
        from: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError>;

    /// Streams a given byte range of the remote storage entry contents.
    ///
    /// The returned download stream will obey the initial timeout and the cancellation signal by erroring
    /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
    /// enough for `tokio::io::copy_buf` usage. If needed, the error can be filtered out.
    ///
    /// Returns the metadata, if any was stored with the file previously.
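    ///
    /// E.g. fetching only the first kilobyte of an object (a sketch; `storage`, `path`, and
    /// `cancel` come from the caller's context):
    /// ```ignore
    /// let download = storage.download_byte_range(&path, 0, Some(1024), &cancel).await?;
    /// ```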
    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError>;

    /// Delete a single path from remote storage.
    ///
    /// If the operation fails because of timeout or cancellation, the root cause of the error will be
    /// set to `TimeoutOrCancel`. In such a situation it is unknown if the deletion went through.
    async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()>;

    /// Delete multiple paths from remote storage.
    ///
    /// If the operation fails because of timeout or cancellation, the root cause of the error will be
    /// set to `TimeoutOrCancel`. In such a situation it is unknown which deletions, if any, went
    /// through.
    async fn delete_objects<'a>(
        &self,
        paths: &'a [RemotePath],
        cancel: &CancellationToken,
    ) -> anyhow::Result<()>;

    /// Copy a remote object inside a bucket from one path to another.
    async fn copy(
        &self,
        from: &RemotePath,
        to: &RemotePath,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()>;

    /// Resets the content of everything with the given prefix to the given state.
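    ///
    /// A call sketch (`storage`, `prefix`, and `cancel` come from the caller's context; the
    /// choice of `timestamp` and `done_if_after` is up to the caller):
    /// ```ignore
    /// storage.time_travel_recover(Some(&prefix), timestamp, done_if_after, &cancel).await?;
    /// ```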
    async fn time_travel_recover(
        &self,
        prefix: Option<&RemotePath>,
        timestamp: SystemTime,
        done_if_after: SystemTime,
        cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError>;
}

/// Data part of an ongoing [`Download`].
///
/// `DownloadStream` is sensitive to the timeout and cancellation used with the original
/// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
/// with `tokio::io::copy_buf`.
// This has 'static because safekeepers do not use cancellation tokens (yet)
pub type DownloadStream =
    Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>>;

pub struct Download {
    pub download_stream: DownloadStream,
    /// The last time the file was modified (`last-modified` HTTP header)
    pub last_modified: SystemTime,
    /// A way to identify this specific version of the resource (`etag` HTTP header)
    pub etag: Etag,
    /// Extra key-value data, associated with the current remote file.
    pub metadata: Option<StorageMetadata>,
}

impl Debug for Download {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Download")
            .field("metadata", &self.metadata)
            .finish()
    }
}

/// Every storage currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
// Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
#[derive(Clone)]
pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
    LocalFs(LocalFs),
    AwsS3(Arc<S3Bucket>),
    AzureBlob(Arc<AzureBlobStorage>),
    Unreliable(Other),
}

impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
    // See [`RemoteStorage::list`].
    pub async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> Result<Listing, DownloadError> {
        match self {
            Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await,
            Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
            Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
            Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
        }
    }

    // See [`RemoteStorage::list_streaming`].
    pub fn list_streaming<'a>(
        &'a self,
        prefix: Option<&'a RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &'a CancellationToken,
    ) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
        match self {
            Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
                as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
            Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
            Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
            Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
        }
    }

    // See [`RemoteStorage::head_object`].
    pub async fn head_object(
        &self,
        key: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<ListingObject, DownloadError> {
        match self {
            Self::LocalFs(s) => s.head_object(key, cancel).await,
            Self::AwsS3(s) => s.head_object(key, cancel).await,
            Self::AzureBlob(s) => s.head_object(key, cancel).await,
            Self::Unreliable(s) => s.head_object(key, cancel).await,
        }
    }

    /// See [`RemoteStorage::upload`]
    pub async fn upload(
        &self,
        from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
            Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
            Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
            Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
        }
    }

    pub async fn download(
        &self,
        from: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        match self {
            Self::LocalFs(s) => s.download(from, cancel).await,
            Self::AwsS3(s) => s.download(from, cancel).await,
            Self::AzureBlob(s) => s.download(from, cancel).await,
            Self::Unreliable(s) => s.download(from, cancel).await,
        }
    }

    pub async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        match self {
            Self::LocalFs(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
                    .await
            }
            Self::AwsS3(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
                    .await
            }
            Self::AzureBlob(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
                    .await
            }
            Self::Unreliable(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive, cancel)
                    .await
            }
        }
    }

    /// See [`RemoteStorage::delete`]
    pub async fn delete(
        &self,
        path: &RemotePath,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.delete(path, cancel).await,
            Self::AwsS3(s) => s.delete(path, cancel).await,
            Self::AzureBlob(s) => s.delete(path, cancel).await,
            Self::Unreliable(s) => s.delete(path, cancel).await,
        }
    }

    /// See [`RemoteStorage::delete_objects`]
    pub async fn delete_objects(
        &self,
        paths: &[RemotePath],
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.delete_objects(paths, cancel).await,
            Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
            Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
            Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
        }
    }

    /// See [`RemoteStorage::copy`]
    pub async fn copy_object(
        &self,
        from: &RemotePath,
        to: &RemotePath,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.copy(from, to, cancel).await,
            Self::AwsS3(s) => s.copy(from, to, cancel).await,
            Self::AzureBlob(s) => s.copy(from, to, cancel).await,
            Self::Unreliable(s) => s.copy(from, to, cancel).await,
        }
    }

    /// See [`RemoteStorage::time_travel_recover`].
    pub async fn time_travel_recover(
        &self,
        prefix: Option<&RemotePath>,
        timestamp: SystemTime,
        done_if_after: SystemTime,
        cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        match self {
            Self::LocalFs(s) => {
                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
                    .await
            }
            Self::AwsS3(s) => {
                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
                    .await
            }
            Self::AzureBlob(s) => {
                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
                    .await
            }
            Self::Unreliable(s) => {
                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
                    .await
            }
        }
    }
}

impl GenericRemoteStorage {
    pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
        let timeout = storage_config.timeout;
        Ok(match &storage_config.storage {
            RemoteStorageKind::LocalFs { local_path: path } => {
                info!("Using fs root '{path}' as a remote storage");
                Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
            }
            RemoteStorageKind::AwsS3(s3_config) => {
                // The profile and access key id are only printed here for debugging purposes,
                // their values don't indicate the eventually taken choice for auth.
                let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
                let access_key_id =
                    std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
                info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
                      s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
                Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?))
            }
            RemoteStorageKind::AzureContainer(azure_config) => {
                let storage_account = azure_config
                    .storage_account
                    .as_deref()
                    .unwrap_or("<AZURE_STORAGE_ACCOUNT>");
                info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
                      azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
                Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?))
            }
        })
    }

    pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
        Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
    }

    /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
    pub async fn upload_storage_object(
        &self,
        from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
        from_size_bytes: usize,
        to: &RemotePath,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        self.upload(from, from_size_bytes, to, None, cancel)
            .await
            .with_context(|| {
                format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
            })
    }

    /// Downloads the storage object.
    /// `byte_range` can be specified to download only a part of the file, if needed.
    pub async fn download_storage_object(
        &self,
        byte_range: Option<(u64, Option<u64>)>,
        from: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        match byte_range {
            Some((start, end)) => self.download_byte_range(from, start, end, cancel).await,
            None => self.download(from, cancel).await,
        }
    }

    /// The name of the bucket/container/etc.
    pub fn bucket_name(&self) -> Option<&str> {
        match self {
            Self::LocalFs(_s) => None,
            Self::AwsS3(s) => Some(s.bucket_name()),
            Self::AzureBlob(s) => Some(s.container_name()),
            Self::Unreliable(_s) => None,
        }
    }
}

/// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
/// Immutable, cannot be changed once the file is created.
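///
/// Example (illustrative key-value pairs; assumes this crate is importable as `remote_storage`):
/// ```
/// # use remote_storage::StorageMetadata;
/// let metadata = StorageMetadata::from([("generation", "7"), ("compression", "zstd")]);
/// ```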
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageMetadata(HashMap<String, String>);

impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
    fn from(arr: [(&str, &str); N]) -> Self {
        let map: HashMap<String, String> = arr
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        Self(map)
    }
}

struct ConcurrencyLimiter {
    // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
    // Same goes for IAM, which is queried before every S3 request, if enabled. IAM has an even lower RPS threshold.
    // This helps to ensure we don't exceed the thresholds.
    write: Arc<Semaphore>,
    read: Arc<Semaphore>,
}

impl ConcurrencyLimiter {
    fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
        match kind {
            RequestKind::Get => &self.read,
            RequestKind::Put => &self.write,
            RequestKind::List => &self.read,
            RequestKind::Delete => &self.write,
            RequestKind::Copy => &self.write,
            RequestKind::TimeTravel => &self.write,
            RequestKind::Head => &self.read,
        }
    }

    async fn acquire(
        &self,
        kind: RequestKind,
    ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
        self.for_kind(kind).acquire().await
    }

    async fn acquire_owned(
        &self,
        kind: RequestKind,
    ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
        Arc::clone(self.for_kind(kind)).acquire_owned().await
    }

    fn new(limit: usize) -> ConcurrencyLimiter {
        Self {
            read: Arc::new(Semaphore::new(limit)),
            write: Arc::new(Semaphore::new(limit)),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_object_name() {
        let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
        assert_eq!(k.object_name(), Some("c"));

        let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
        assert_eq!(k.object_name(), Some("c"));

        let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
        assert_eq!(k.object_name(), Some("a"));

        // XXX is it impossible to have an empty key?
        let k = RemotePath::new(Utf8Path::new("")).unwrap();
        assert_eq!(k.object_name(), None);
    }

    #[test]
    fn remote_path_cannot_be_created_from_absolute_ones() {
        let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
        assert_eq!(err.to_string(), "Path \"/\" is not relative");
    }
}