Line data Source code
1 : //! A set of generic storage abstractions for the page server to use when backing up and restoring its state from external storage.
2 : //! No other modules from this tree are supposed to be used directly by external code.
3 : //!
4 : //! The [`RemoteStorage`] trait is a CRUD-like generic abstraction for adapting external storages, with a few implementations:
5 : //! * [`local_fs`] uses the local file system as an external storage
6 : //! * [`s3_bucket`] uses an AWS S3 bucket as an external storage
7 : //! * [`azure_blob`] uses Azure Blob storage as an external storage
8 : //!
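//! A hedged usage sketch (not compiled as a doc-test): construct a local-filesystem backend
//! via [`GenericRemoteStorage::from_config`] and upload a small object. The path and payload
//! below are illustrative only.
//!
//! ```ignore
//! use bytes::Bytes;
//! use camino::Utf8PathBuf;
//! use tokio_util::sync::CancellationToken;
//!
//! async fn upload_example() -> anyhow::Result<()> {
//!     let config = RemoteStorageConfig {
//!         storage: RemoteStorageKind::LocalFs {
//!             local_path: Utf8PathBuf::from("/tmp/remote-storage"),
//!         },
//!         timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
//!         small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
//!     };
//!     let storage = GenericRemoteStorage::from_config(&config).await?;
//!
//!     // Upload a tiny object from an in-memory byte stream.
//!     let path = RemotePath::from_string("tenant/timeline/object")?;
//!     let body = Bytes::from_static(b"hello");
//!     let len = body.len();
//!     let stream = futures::stream::once(futures::future::ready(Ok::<_, std::io::Error>(body)));
//!     storage.upload(stream, len, &path, None, &CancellationToken::new()).await?;
//!     Ok(())
//! }
//! ```
//!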
9 : #![deny(unsafe_code)]
10 : #![deny(clippy::undocumented_unsafe_blocks)]
11 :
12 : mod azure_blob;
13 : mod config;
14 : mod error;
15 : mod local_fs;
16 : mod metrics;
17 : mod s3_bucket;
18 : mod simulate_failures;
19 : mod support;
20 :
21 : use std::collections::HashMap;
22 : use std::fmt::Debug;
23 : use std::num::NonZeroU32;
24 : use std::ops::Bound;
25 : use std::pin::{Pin, pin};
26 : use std::sync::Arc;
27 : use std::time::SystemTime;
28 :
29 : use anyhow::Context;
30 : /// Azure SDK's ETag type is a simple String wrapper: we re-export and use it directly rather than defining our own.
31 : pub use azure_core::Etag;
32 : use bytes::Bytes;
33 : use camino::{Utf8Path, Utf8PathBuf};
34 : pub use config::TypedRemoteStorageKind;
35 : pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
36 : use futures::StreamExt;
37 : use futures::stream::Stream;
38 : use itertools::Itertools as _;
39 : use s3_bucket::RequestKind;
40 : use serde::{Deserialize, Serialize};
41 : use tokio::sync::Semaphore;
42 : use tokio_util::sync::CancellationToken;
43 : use tracing::info;
44 :
45 : pub use self::azure_blob::AzureBlobStorage;
46 : pub use self::local_fs::LocalFs;
47 : pub use self::s3_bucket::S3Bucket;
48 : pub use self::simulate_failures::UnreliableWrapper;
49 : pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
50 :
51 : /// Default concurrency limit for S3 operations
52 : ///
53 : /// Currently, sync happens with AWS S3, which has two limits on requests per second:
54 : /// ~200 RPS for IAM services
55 : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
56 : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
57 : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
58 : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
59 : /// Set this limit analogously to the S3 limit.
60 : ///
61 : /// Here, a limit of max 20k concurrent connections was noted.
62 : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
63 : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
64 : /// Set this limit analogously to the S3 limit.
65 : ///
66 : /// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
67 : /// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
68 : /// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
69 : pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
70 : /// No limits on the client side, which currently means 1000 for AWS S3.
71 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
72 : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
73 :
74 : /// As defined in S3 docs
75 : ///
76 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
77 : pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
78 :
79 : /// As defined in Azure docs
80 : ///
81 : /// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
82 : pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
83 :
84 : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
85 :
86 : /// A path on the remote storage, relative to some inner prefix.
87 : /// The prefix is an implementation detail that allows representing local paths
88 : /// as remote ones, stripping the local storage prefix away.
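///
/// A small, hedged sketch of typical use (the paths are illustrative):
///
/// ```ignore
/// let path = RemotePath::new(Utf8Path::new("tenants/123/timelines/456/layer"))?;
/// assert_eq!(path.object_name(), Some("layer"));
/// // Turn the remote path back into a local one under some base directory.
/// let local = path.with_base(Utf8Path::new("/var/lib/pageserver"));
/// ```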
89 : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
90 : pub struct RemotePath(Utf8PathBuf);
91 :
92 : impl Serialize for RemotePath {
93 0 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
94 0 : where
95 0 : S: serde::Serializer,
96 : {
97 0 : serializer.collect_str(self)
98 0 : }
99 : }
100 :
101 : impl<'de> Deserialize<'de> for RemotePath {
102 0 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
103 0 : where
104 0 : D: serde::Deserializer<'de>,
105 : {
106 0 : let str = String::deserialize(deserializer)?;
107 0 : Ok(Self(Utf8PathBuf::from(&str)))
108 0 : }
109 : }
110 :
111 : impl std::fmt::Display for RemotePath {
112 531 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 531 : std::fmt::Display::fmt(&self.0, f)
114 531 : }
115 : }
116 :
117 : impl RemotePath {
118 3288 : pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
119 3288 : anyhow::ensure!(
120 3288 : relative_path.is_relative(),
121 4 : "Path {relative_path:?} is not relative"
122 : );
123 3284 : Ok(Self(relative_path.to_path_buf()))
124 3288 : }
125 :
126 2786 : pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
127 2786 : Self::new(Utf8Path::new(relative_path))
128 2786 : }
129 :
130 3068 : pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
131 3068 : base_path.join(&self.0)
132 3068 : }
133 :
134 15 : pub fn object_name(&self) -> Option<&str> {
135 15 : self.0.file_name()
136 15 : }
137 :
138 84 : pub fn join(&self, path: impl AsRef<Utf8Path>) -> Self {
139 84 : Self(self.0.join(path))
140 0 : }
141 :
142 1017 : pub fn get_path(&self) -> &Utf8PathBuf {
143 1017 : &self.0
144 1017 : }
145 :
146 50 : pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
147 50 : self.0.strip_prefix(&p.0)
148 50 : }
149 :
150 125 : pub fn add_trailing_slash(&self) -> Self {
151 : // Unwrap safety: inputs are guaranteed to be valid UTF-8
152 125 : Self(format!("{}/", self.0).try_into().unwrap())
153 125 : }
154 : }
155 :
156 : /// We don't need callers to be able to pass arbitrary delimiters: just control
157 : /// whether listings will use a '/' separator or not.
158 : ///
159 : /// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
160 : /// NoDelimiter mode will only populate `keys`.
161 : #[derive(Copy, Clone)]
162 : pub enum ListingMode {
163 : WithDelimiter,
164 : NoDelimiter,
165 : }
166 :
167 : #[derive(PartialEq, Eq, Debug, Clone)]
168 : pub struct ListingObject {
169 : pub key: RemotePath,
170 : pub last_modified: SystemTime,
171 : pub size: u64,
172 : }
173 :
174 : #[derive(Default)]
175 : pub struct Listing {
176 : pub prefixes: Vec<RemotePath>,
177 : pub keys: Vec<ListingObject>,
178 : }
179 :
180 : #[derive(Default)]
181 : pub struct VersionListing {
182 : pub versions: Vec<Version>,
183 : }
184 :
185 : pub struct Version {
186 : pub key: RemotePath,
187 : pub last_modified: SystemTime,
188 : pub kind: VersionKind,
189 : }
190 :
191 : impl Version {
192 36 : pub fn version_id(&self) -> Option<&VersionId> {
193 36 : match &self.kind {
194 28 : VersionKind::Version(id) => Some(id),
195 8 : VersionKind::DeletionMarker => None,
196 : }
197 36 : }
198 : }
199 :
200 : #[derive(Debug)]
201 : pub enum VersionKind {
202 : DeletionMarker,
203 : Version(VersionId),
204 : }
205 :
206 : /// Options for downloads. The default value is a plain GET.
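///
/// A hedged sketch of requesting the first KiB of an object (values illustrative):
///
/// ```ignore
/// let opts = DownloadOpts {
///     byte_start: Bound::Included(0),
///     byte_end: Bound::Excluded(1024),
///     kind: DownloadKind::Small,
///     ..Default::default()
/// };
/// // The generated Range header uses inclusive bounds.
/// assert_eq!(opts.byte_range_header().as_deref(), Some("bytes=0-1023"));
/// ```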
207 : pub struct DownloadOpts {
208 : /// If given, returns [`DownloadError::Unmodified`] if the object still has
209 : /// the same ETag (using If-None-Match).
210 : pub etag: Option<Etag>,
211 : /// The start of the byte range to download, or unbounded.
212 : pub byte_start: Bound<u64>,
213 : /// The end of the byte range to download, or unbounded. Must be after the
214 : /// start bound.
215 : pub byte_end: Bound<u64>,
216 : /// Optionally request a specific version of a key
217 : pub version_id: Option<VersionId>,
218 : /// Indicate whether we're downloading something small or large: this indirectly controls
219 : /// timeouts: for something like an index/manifest/heatmap, we should time out faster than
220 : /// for layer files
221 : pub kind: DownloadKind,
222 : }
223 :
224 : pub enum DownloadKind {
225 : Large,
226 : Small,
227 : }
228 :
229 : #[derive(Debug, Clone)]
230 : pub struct VersionId(pub String);
231 :
232 : impl Default for DownloadOpts {
233 544 : fn default() -> Self {
234 544 : Self {
235 544 : etag: Default::default(),
236 544 : byte_start: Bound::Unbounded,
237 544 : byte_end: Bound::Unbounded,
238 544 : version_id: None,
239 544 : kind: DownloadKind::Large,
240 544 : }
241 544 : }
242 : }
243 :
244 : impl DownloadOpts {
245 : /// Returns the byte range with inclusive start and exclusive end, or None
246 : /// if unbounded.
247 214 : pub fn byte_range(&self) -> Option<(u64, Option<u64>)> {
248 214 : if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded {
249 118 : return None;
250 96 : }
251 96 : let start = match self.byte_start {
252 18 : Bound::Excluded(i) => i + 1,
253 60 : Bound::Included(i) => i,
254 18 : Bound::Unbounded => 0,
255 : };
256 96 : let end = match self.byte_end {
257 48 : Bound::Excluded(i) => Some(i),
258 27 : Bound::Included(i) => Some(i + 1),
259 21 : Bound::Unbounded => None,
260 : };
261 96 : if let Some(end) = end {
262 75 : assert!(start < end, "range end {end} at or before start {start}");
263 21 : }
264 87 : Some((start, end))
265 205 : }
266 :
267 : /// Returns the byte range as an RFC 2616 Range header value with inclusive
268 : /// bounds, or None if unbounded.
269 66 : pub fn byte_range_header(&self) -> Option<String> {
270 66 : self.byte_range()
271 66 : .map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive
272 66 : .map(|(start, end)| match end {
273 30 : Some(end) => format!("bytes={start}-{end}"),
274 10 : None => format!("bytes={start}-"),
275 40 : })
276 66 : }
277 : }
278 :
279 : /// Storage (potentially remote) API that callers use to manage their persisted state.
280 : /// This storage tries to be unaware of any layered repository context,
281 : /// providing basic CRUD operations for storage files.
282 : #[allow(async_fn_in_trait)]
283 : pub trait RemoteStorage: Send + Sync + 'static {
284 : /// List objects in remote storage, with semantics matching AWS S3's [`ListObjectsV2`].
285 : ///
286 : /// The stream is guaranteed to return at least one element, even in the case of errors
287 : /// (in that case it's an `Err()`), or an empty `Listing`.
288 : ///
289 : /// The stream does not end when it yields an error, as long as [`is_permanent`] returns false on that error.
290 : /// The `next` call can be retried, and a later retry may succeed.
291 : ///
292 : /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
293 : /// from the absolute root of the bucket.
294 : ///
295 : /// `mode` configures whether to use a delimiter. Without a delimiter, all keys
296 : /// within the prefix are listed in the `keys` of the result. With a delimiter, any "directories" at the top level of
297 : /// the prefix are returned in the `prefixes` of the result, and keys in the top level of the prefix are
298 : /// returned in `keys`.
299 : ///
300 : /// `max_keys` controls the maximum number of keys that will be returned. If this is None, this function
301 : /// will iteratively call ListObjects until it runs out of keys. Note that this is not safe to use on
302 : /// unlimited-size buckets, as the full list of objects is allocated into a monolithic data structure.
303 : ///
304 : /// [`ListObjectsV2`]: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
305 : /// [`is_permanent`]: DownloadError::is_permanent
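///
/// A hedged consumption sketch (assumes `storage`, `prefix`, and `cancel` are in scope,
/// and that `futures::StreamExt` is imported for `next`):
///
/// ```ignore
/// let mut stream = std::pin::pin!(storage.list_streaming(Some(&prefix), ListingMode::NoDelimiter, None, &cancel));
/// while let Some(chunk) = stream.next().await {
///     for object in chunk?.keys {
///         println!("{} ({} bytes)", object.key, object.size);
///     }
/// }
/// ```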
306 : fn list_streaming(
307 : &self,
308 : prefix: Option<&RemotePath>,
309 : mode: ListingMode,
310 : max_keys: Option<NonZeroU32>,
311 : cancel: &CancellationToken,
312 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;
313 :
314 69 : async fn list(
315 69 : &self,
316 69 : prefix: Option<&RemotePath>,
317 69 : mode: ListingMode,
318 69 : max_keys: Option<NonZeroU32>,
319 69 : cancel: &CancellationToken,
320 69 : ) -> Result<Listing, DownloadError> {
321 69 : let mut stream = pin!(self.list_streaming(prefix, mode, max_keys, cancel));
322 69 : let mut combined = stream.next().await.expect("At least one item required")?;
323 117 : while let Some(list) = stream.next().await {
324 48 : let list = list?;
325 48 : combined.keys.extend(list.keys.into_iter());
326 48 : combined.prefixes.extend_from_slice(&list.prefixes);
327 : }
328 69 : Ok(combined)
329 69 : }
330 :
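/// Lists object versions (including deletion markers) under the given prefix.
///
/// A hedged usage sketch (assumed names in scope):
///
/// ```ignore
/// let listing = storage.list_versions(Some(&prefix), ListingMode::NoDelimiter, None, &cancel).await?;
/// for version in &listing.versions {
///     // Deletion markers have no version id.
///     if let Some(id) = version.version_id() {
///         println!("{}: version {}", version.key, id.0);
///     }
/// }
/// ```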
331 : async fn list_versions(
332 : &self,
333 : prefix: Option<&RemotePath>,
334 : mode: ListingMode,
335 : max_keys: Option<NonZeroU32>,
336 : cancel: &CancellationToken,
337 : ) -> Result<VersionListing, DownloadError>;
338 :
339 : /// Obtain metadata about an object.
340 : async fn head_object(
341 : &self,
342 : key: &RemotePath,
343 : cancel: &CancellationToken,
344 : ) -> Result<ListingObject, DownloadError>;
345 :
346 : /// Streams the local file contents into the remote storage entry.
347 : ///
348 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
349 : /// set to `TimeoutOrCancel`.
350 : async fn upload(
351 : &self,
352 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
353 : // S3 PUT request requires the content length to be specified,
354 : // otherwise it starts to fail with the concurrent connection count increasing.
355 : data_size_bytes: usize,
356 : to: &RemotePath,
357 : metadata: Option<StorageMetadata>,
358 : cancel: &CancellationToken,
359 : ) -> anyhow::Result<()>;
360 :
361 : /// Streams the remote storage entry contents.
362 : ///
363 : /// The returned download stream will obey initial timeout and cancellation signal by erroring
364 : /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
365 : /// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
366 : ///
367 : /// Returns the metadata, if any was stored with the file previously.
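///
/// A hedged sketch of a conditional (ETag-gated) download (assumed names in scope):
///
/// ```ignore
/// let opts = DownloadOpts { etag: Some(cached_etag), kind: DownloadKind::Small, ..Default::default() };
/// match storage.download(&path, &opts, &cancel).await {
///     Ok(download) => { /* object changed: consume download.download_stream */ }
///     Err(DownloadError::Unmodified) => { /* cached copy is still current */ }
///     Err(err) => return Err(err.into()),
/// }
/// ```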
368 : async fn download(
369 : &self,
370 : from: &RemotePath,
371 : opts: &DownloadOpts,
372 : cancel: &CancellationToken,
373 : ) -> Result<Download, DownloadError>;
374 :
375 : /// Delete a single path from remote storage.
376 : ///
377 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
378 : /// set to `TimeoutOrCancel`. In such situation it is unknown if the deletion went through.
379 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()>;
380 :
381 : /// Delete multiple paths from remote storage.
382 : ///
383 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
384 : /// set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went
385 : /// through.
386 : async fn delete_objects(
387 : &self,
388 : paths: &[RemotePath],
389 : cancel: &CancellationToken,
390 : ) -> anyhow::Result<()>;
391 :
392 : /// Returns the maximum number of keys that a call to [`Self::delete_objects`] can delete without chunking
393 : ///
394 : /// The value returned is only an optimization hint: one can pass a larger number of objects to
395 : /// `delete_objects` as well.
396 : ///
397 : /// The value is guaranteed to be >= 1.
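///
/// A hedged sketch of chunking a large deletion by this hint (assumed names in scope):
///
/// ```ignore
/// for chunk in paths.chunks(storage.max_keys_per_delete()) {
///     storage.delete_objects(chunk, &cancel).await?;
/// }
/// ```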
398 : fn max_keys_per_delete(&self) -> usize;
399 :
400 : /// Deletes all objects matching the given prefix.
401 : ///
402 : /// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
403 : /// delete /a/b, /a/b/*, /a/bc, /a/bc/*, etc.
404 : ///
405 : /// If the operation fails because of timeout or cancellation, the root cause of the error will
406 : /// be set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went
407 : /// through.
408 24 : async fn delete_prefix(
409 24 : &self,
410 24 : prefix: &RemotePath,
411 24 : cancel: &CancellationToken,
412 24 : ) -> anyhow::Result<()> {
413 24 : let mut stream =
414 24 : pin!(self.list_streaming(Some(prefix), ListingMode::NoDelimiter, None, cancel));
415 51 : while let Some(result) = stream.next().await {
416 27 : let keys = match result {
417 27 : Ok(listing) if listing.keys.is_empty() => continue,
418 18 : Ok(listing) => listing.keys.into_iter().map(|o| o.key).collect_vec(),
419 0 : Err(DownloadError::Cancelled) => return Err(TimeoutOrCancel::Cancel.into()),
420 0 : Err(DownloadError::Timeout) => return Err(TimeoutOrCancel::Timeout.into()),
421 0 : Err(err) => return Err(err.into()),
422 : };
423 18 : tracing::info!("Deleting {} keys from remote storage", keys.len());
424 18 : self.delete_objects(&keys, cancel).await?;
425 : }
426 24 : Ok(())
427 0 : }
428 :
429 : /// Copy a remote object inside a bucket from one path to another.
430 : async fn copy(
431 : &self,
432 : from: &RemotePath,
433 : to: &RemotePath,
434 : cancel: &CancellationToken,
435 : ) -> anyhow::Result<()>;
436 :
437 : /// Resets the content of everything under the given prefix to its state at the given timestamp.
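///
/// A hedged invocation sketch (all names assumed to be in scope):
///
/// ```ignore
/// storage
///     .time_travel_recover(Some(&prefix), restore_to, done_if_after, &cancel, None)
///     .await?;
/// ```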
438 : async fn time_travel_recover(
439 : &self,
440 : prefix: Option<&RemotePath>,
441 : timestamp: SystemTime,
442 : done_if_after: SystemTime,
443 : cancel: &CancellationToken,
444 : complexity_limit: Option<NonZeroU32>,
445 : ) -> Result<(), TimeTravelError>;
446 : }
447 :
448 : /// Data part of an ongoing [`Download`].
449 : ///
450 : /// `DownloadStream` is sensitive to the timeout and cancellation used with the original
451 : /// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
452 : /// with `tokio::io::copy_buf`.
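///
/// A hedged sketch of draining the stream into an `AsyncWrite` (assumes `download` and
/// `writer` are in scope; uses `tokio_util::io::StreamReader` to adapt the byte stream):
///
/// ```ignore
/// let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
/// tokio::io::copy_buf(&mut reader, &mut writer).await?;
/// ```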
453 : // This has 'static because safekeepers do not use cancellation tokens (yet)
454 : pub type DownloadStream =
455 : Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>>;
456 :
457 : pub struct Download {
458 : pub download_stream: DownloadStream,
459 : /// The last time the file was modified (`last-modified` HTTP header)
460 : pub last_modified: SystemTime,
461 : /// A way to identify this specific version of the resource (`etag` HTTP header)
462 : pub etag: Etag,
463 : /// Extra key-value data, associated with the current remote file.
464 : pub metadata: Option<StorageMetadata>,
465 : }
466 :
467 : impl Debug for Download {
468 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469 0 : f.debug_struct("Download")
470 0 : .field("metadata", &self.metadata)
471 0 : .finish()
472 0 : }
473 : }
474 :
475 : /// Every storage kind that is currently supported.
476 : /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
477 : // Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
478 : #[derive(Clone)]
479 : pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
480 : LocalFs(LocalFs),
481 : AwsS3(Arc<S3Bucket>),
482 : AzureBlob(Arc<AzureBlobStorage>),
483 : Unreliable(Other),
484 : }
485 :
486 : impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
487 : // See [`RemoteStorage::list`].
488 307 : pub async fn list(
489 307 : &self,
490 307 : prefix: Option<&RemotePath>,
491 307 : mode: ListingMode,
492 307 : max_keys: Option<NonZeroU32>,
493 307 : cancel: &CancellationToken,
494 307 : ) -> Result<Listing, DownloadError> {
495 307 : match self {
496 238 : Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await,
497 50 : Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
498 19 : Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
499 0 : Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
500 : }
501 0 : }
502 :
503 : // See [`RemoteStorage::list_streaming`].
504 3 : pub fn list_streaming<'a>(
505 3 : &'a self,
506 3 : prefix: Option<&'a RemotePath>,
507 3 : mode: ListingMode,
508 3 : max_keys: Option<NonZeroU32>,
509 3 : cancel: &'a CancellationToken,
510 3 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
511 3 : match self {
512 0 : Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
513 0 : as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
514 2 : Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
515 1 : Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
516 0 : Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
517 : }
518 0 : }
519 :
520 : // See [`RemoteStorage::list_versions`].
521 0 : pub async fn list_versions<'a>(
522 0 : &'a self,
523 0 : prefix: Option<&'a RemotePath>,
524 0 : mode: ListingMode,
525 0 : max_keys: Option<NonZeroU32>,
526 0 : cancel: &'a CancellationToken,
527 0 : ) -> Result<VersionListing, DownloadError> {
528 0 : match self {
529 0 : Self::LocalFs(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
530 0 : Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
531 0 : Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
532 0 : Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
533 : }
534 0 : }
535 :
536 : // See [`RemoteStorage::head_object`].
537 9 : pub async fn head_object(
538 9 : &self,
539 9 : key: &RemotePath,
540 9 : cancel: &CancellationToken,
541 9 : ) -> Result<ListingObject, DownloadError> {
542 9 : match self {
543 0 : Self::LocalFs(s) => s.head_object(key, cancel).await,
544 6 : Self::AwsS3(s) => s.head_object(key, cancel).await,
545 3 : Self::AzureBlob(s) => s.head_object(key, cancel).await,
546 0 : Self::Unreliable(s) => s.head_object(key, cancel).await,
547 : }
548 0 : }
549 :
550 : /// See [`RemoteStorage::upload`]
551 2170 : pub async fn upload(
552 2170 : &self,
553 2170 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
554 2170 : data_size_bytes: usize,
555 2170 : to: &RemotePath,
556 2170 : metadata: Option<StorageMetadata>,
557 2170 : cancel: &CancellationToken,
558 2170 : ) -> anyhow::Result<()> {
559 2170 : match self {
560 1855 : Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
561 198 : Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
562 93 : Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
563 24 : Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
564 : }
565 0 : }
566 :
567 : /// See [`RemoteStorage::download`]
568 476 : pub async fn download(
569 476 : &self,
570 476 : from: &RemotePath,
571 476 : opts: &DownloadOpts,
572 476 : cancel: &CancellationToken,
573 476 : ) -> Result<Download, DownloadError> {
574 476 : match self {
575 432 : Self::LocalFs(s) => s.download(from, opts, cancel).await,
576 33 : Self::AwsS3(s) => s.download(from, opts, cancel).await,
577 11 : Self::AzureBlob(s) => s.download(from, opts, cancel).await,
578 0 : Self::Unreliable(s) => s.download(from, opts, cancel).await,
579 : }
580 0 : }
581 :
582 : /// See [`RemoteStorage::delete`]
583 471 : pub async fn delete(
584 471 : &self,
585 471 : path: &RemotePath,
586 471 : cancel: &CancellationToken,
587 471 : ) -> anyhow::Result<()> {
588 471 : match self {
589 211 : Self::LocalFs(s) => s.delete(path, cancel).await,
590 174 : Self::AwsS3(s) => s.delete(path, cancel).await,
591 86 : Self::AzureBlob(s) => s.delete(path, cancel).await,
592 0 : Self::Unreliable(s) => s.delete(path, cancel).await,
593 : }
594 0 : }
595 :
596 : /// See [`RemoteStorage::delete_objects`]
597 18 : pub async fn delete_objects(
598 18 : &self,
599 18 : paths: &[RemotePath],
600 18 : cancel: &CancellationToken,
601 18 : ) -> anyhow::Result<()> {
602 18 : match self {
603 3 : Self::LocalFs(s) => s.delete_objects(paths, cancel).await,
604 12 : Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
605 3 : Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
606 0 : Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
607 : }
608 0 : }
609 :
610 : /// See [`RemoteStorage::max_keys_per_delete`].
611 4 : pub fn max_keys_per_delete(&self) -> usize {
612 4 : match self {
613 4 : Self::LocalFs(s) => s.max_keys_per_delete(),
614 0 : Self::AwsS3(s) => s.max_keys_per_delete(),
615 0 : Self::AzureBlob(s) => s.max_keys_per_delete(),
616 0 : Self::Unreliable(s) => s.max_keys_per_delete(),
617 : }
618 0 : }
619 :
620 : /// See [`RemoteStorage::delete_prefix`]
621 24 : pub async fn delete_prefix(
622 24 : &self,
623 24 : prefix: &RemotePath,
624 24 : cancel: &CancellationToken,
625 24 : ) -> anyhow::Result<()> {
626 24 : match self {
627 6 : Self::LocalFs(s) => s.delete_prefix(prefix, cancel).await,
628 12 : Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
629 6 : Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
630 0 : Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
631 : }
632 0 : }
633 :
634 : /// See [`RemoteStorage::copy`]
635 3 : pub async fn copy_object(
636 3 : &self,
637 3 : from: &RemotePath,
638 3 : to: &RemotePath,
639 3 : cancel: &CancellationToken,
640 3 : ) -> anyhow::Result<()> {
641 3 : match self {
642 0 : Self::LocalFs(s) => s.copy(from, to, cancel).await,
643 2 : Self::AwsS3(s) => s.copy(from, to, cancel).await,
644 1 : Self::AzureBlob(s) => s.copy(from, to, cancel).await,
645 0 : Self::Unreliable(s) => s.copy(from, to, cancel).await,
646 : }
647 0 : }
648 :
649 : /// See [`RemoteStorage::time_travel_recover`].
650 6 : pub async fn time_travel_recover(
651 6 : &self,
652 6 : prefix: Option<&RemotePath>,
653 6 : timestamp: SystemTime,
654 6 : done_if_after: SystemTime,
655 6 : cancel: &CancellationToken,
656 6 : complexity_limit: Option<NonZeroU32>,
657 6 : ) -> Result<(), TimeTravelError> {
658 6 : match self {
659 0 : Self::LocalFs(s) => {
660 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
661 0 : .await
662 : }
663 6 : Self::AwsS3(s) => {
664 6 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
665 6 : .await
666 : }
667 0 : Self::AzureBlob(s) => {
668 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
669 0 : .await
670 : }
671 0 : Self::Unreliable(s) => {
672 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
673 0 : .await
674 : }
675 : }
676 0 : }
677 : }
678 :
679 : impl GenericRemoteStorage {
680 0 : pub async fn from_storage_kind(kind: TypedRemoteStorageKind) -> anyhow::Result<Self> {
681 0 : Self::from_config(&RemoteStorageConfig {
682 0 : storage: kind.into(),
683 0 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
684 0 : small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
685 0 : })
686 0 : .await
687 0 : }
688 :
689 163 : pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
690 163 : let timeout = storage_config.timeout;
691 :
692 : // If someone overrides timeout to be small without adjusting small_timeout, adjust it automatically
693 163 : let small_timeout = std::cmp::min(storage_config.small_timeout, timeout);
694 :
695 163 : Ok(match &storage_config.storage {
696 127 : RemoteStorageKind::LocalFs { local_path: path } => {
697 127 : info!("Using fs root '{path}' as a remote storage");
698 127 : Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
699 : }
700 26 : RemoteStorageKind::AwsS3(s3_config) => {
701 : // The profile and access key id are only printed here for debugging purposes,
702 : // their values don't indicate the eventually taken choice for auth.
703 26 : let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
704 26 : let access_key_id =
705 26 : std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
706 26 : info!(
707 0 : "Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
708 : s3_config.bucket_name,
709 : s3_config.bucket_region,
710 : s3_config.prefix_in_bucket,
711 : s3_config.endpoint
712 : );
713 26 : Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?))
714 : }
715 10 : RemoteStorageKind::AzureContainer(azure_config) => {
716 10 : let storage_account = azure_config
717 10 : .storage_account
718 10 : .as_deref()
719 10 : .unwrap_or("<AZURE_STORAGE_ACCOUNT>");
720 10 : info!(
721 0 : "Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
722 : azure_config.container_name,
723 : azure_config.container_region,
724 : azure_config.prefix_in_container
725 : );
726 10 : Self::AzureBlob(Arc::new(AzureBlobStorage::new(
727 10 : azure_config,
728 10 : timeout,
729 10 : small_timeout,
730 0 : )?))
731 : }
732 : })
733 163 : }
734 :
735 : /* BEGIN_HADRON */
736 2 : pub fn unreliable_wrapper(s: Self, fail_first: u64, fail_probability: u64) -> Self {
737 2 : Self::Unreliable(Arc::new(UnreliableWrapper::new(
738 2 : s,
739 2 : fail_first,
740 2 : fail_probability,
741 2 : )))
742 2 : }
743 : /* END_HADRON */
744 :
745 : /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
746 891 : pub async fn upload_storage_object(
747 891 : &self,
748 891 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
749 891 : from_size_bytes: usize,
750 891 : to: &RemotePath,
751 891 : cancel: &CancellationToken,
752 891 : ) -> anyhow::Result<()> {
753 891 : self.upload(from, from_size_bytes, to, None, cancel)
754 891 : .await
755 887 : .with_context(|| {
756 0 : format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
757 0 : })
758 0 : }
759 :
760 : /// The name of the bucket/container/etc.
761 0 : pub fn bucket_name(&self) -> Option<&str> {
762 0 : match self {
763 0 : Self::LocalFs(_s) => None,
764 0 : Self::AwsS3(s) => Some(s.bucket_name()),
765 0 : Self::AzureBlob(s) => Some(s.container_name()),
766 0 : Self::Unreliable(_s) => None,
767 : }
768 0 : }
769 : }
770 :
771 : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
772 : /// Immutable, cannot be changed once the file is created.
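///
/// A hedged construction sketch via the array `From` impl below (key/value illustrative):
///
/// ```ignore
/// let metadata = StorageMetadata::from([("content-type", "application/octet-stream")]);
/// ```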
773 : #[derive(Debug, Clone, PartialEq, Eq)]
774 : pub struct StorageMetadata(HashMap<String, String>);
775 :
776 : impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
777 114 : fn from(arr: [(&str, &str); N]) -> Self {
778 114 : let map: HashMap<String, String> = arr
779 114 : .iter()
780 114 : .map(|(k, v)| (k.to_string(), v.to_string()))
781 114 : .collect();
782 114 : Self(map)
783 0 : }
784 : }
785 :
786 : struct ConcurrencyLimiter {
787 : // Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded.
788 : // The same goes for IAM, which is queried before every S3 request if enabled; IAM has an even lower RPS threshold.
789 : // This limiter helps ensure we don't exceed those thresholds.
790 : write: Arc<Semaphore>,
791 : read: Arc<Semaphore>,
792 : }
793 :
794 : impl ConcurrencyLimiter {
795 730 : fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
796 730 : match kind {
797 44 : RequestKind::Get => &self.read,
798 291 : RequestKind::Put => &self.write,
799 64 : RequestKind::List => &self.read,
800 287 : RequestKind::Delete => &self.write,
801 3 : RequestKind::Copy => &self.write,
802 6 : RequestKind::TimeTravel => &self.write,
803 9 : RequestKind::Head => &self.read,
804 26 : RequestKind::ListVersions => &self.read,
805 : }
806 730 : }
807 :
808 697 : async fn acquire(
809 697 : &self,
810 697 : kind: RequestKind,
811 697 : ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
812 697 : self.for_kind(kind).acquire().await
813 697 : }
814 :
815 33 : async fn acquire_owned(
816 33 : &self,
817 33 : kind: RequestKind,
818 33 : ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
819 33 : Arc::clone(self.for_kind(kind)).acquire_owned().await
820 33 : }
821 :
822 51 : fn new(limit: usize) -> ConcurrencyLimiter {
823 51 : Self {
824 51 : read: Arc::new(Semaphore::new(limit)),
825 51 : write: Arc::new(Semaphore::new(limit)),
826 51 : }
827 51 : }
828 : }
829 :
830 : #[cfg(test)]
831 : mod tests {
832 : use super::*;
833 :
834 : /// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges
835 : /// with optional end bound, or None when unbounded.
836 : #[test]
837 3 : fn download_opts_byte_range() {
838 : // Consider using test_case or a similar table-driven test framework.
839 3 : let cases = [
840 3 : // (byte_start, byte_end, expected)
841 3 : (Bound::Unbounded, Bound::Unbounded, None),
842 3 : (Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))),
843 3 : (Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))),
844 3 : (Bound::Included(3), Bound::Unbounded, Some((3, None))),
845 3 : (Bound::Included(3), Bound::Included(7), Some((3, Some(8)))),
846 3 : (Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))),
847 3 : (Bound::Excluded(3), Bound::Unbounded, Some((4, None))),
848 3 : (Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))),
849 3 : (Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))),
850 3 : // 1-sized ranges are fine, 0 aren't and will panic (separate test).
851 3 : (Bound::Included(3), Bound::Included(3), Some((3, Some(4)))),
852 3 : (Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))),
853 3 : ];
854 :
855 36 : for (byte_start, byte_end, expect) in cases {
856 33 : let opts = DownloadOpts {
857 33 : byte_start,
858 33 : byte_end,
859 33 : ..Default::default()
860 33 : };
861 33 : let result = opts.byte_range();
862 33 : assert_eq!(
863 : result, expect,
864 0 : "byte_start={byte_start:?} byte_end={byte_end:?}"
865 : );
866 :
867 : // Check generated HTTP header, which uses an inclusive range.
868 33 : let expect_header = expect.map(|(start, end)| match end {
869 24 : Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end
870 6 : None => format!("bytes={start}-"),
871 30 : });
872 33 : assert_eq!(
873 33 : opts.byte_range_header(),
874 : expect_header,
875 0 : "byte_start={byte_start:?} byte_end={byte_end:?}"
876 : );
877 : }
878 3 : }
879 :
880 : /// DownloadOpts::byte_range() zero-sized byte range should panic.
881 : #[test]
882 : #[should_panic]
883 3 : fn download_opts_byte_range_zero() {
884 3 : DownloadOpts {
885 3 : byte_start: Bound::Included(3),
886 3 : byte_end: Bound::Excluded(3),
887 3 : ..Default::default()
888 3 : }
889 3 : .byte_range();
890 3 : }
891 :
892 : /// DownloadOpts::byte_range() negative byte range should panic.
893 : #[test]
894 : #[should_panic]
895 3 : fn download_opts_byte_range_negative() {
896 3 : DownloadOpts {
897 3 : byte_start: Bound::Included(3),
898 3 : byte_end: Bound::Included(2),
899 3 : ..Default::default()
900 3 : }
901 3 : .byte_range();
902 3 : }
903 :
904 : #[test]
905 3 : fn test_object_name() {
906 3 : let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
907 3 : assert_eq!(k.object_name(), Some("c"));
908 :
909 3 : let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
910 3 : assert_eq!(k.object_name(), Some("c"));
911 :
912 3 : let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
913 3 : assert_eq!(k.object_name(), Some("a"));
914 :
915 : // XXX is it impossible to have an empty key?
916 3 : let k = RemotePath::new(Utf8Path::new("")).unwrap();
917 3 : assert_eq!(k.object_name(), None);
918 3 : }
919 :
920 : #[test]
921 3 : fn remote_path_cannot_be_created_from_absolute_ones() {
922 3 : let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
923 3 : assert_eq!(err.to_string(), "Path \"/\" is not relative");
924 3 : }
925 : }
|