Line data Source code
1 : //! A set of generic storage abstractions for the page server to use when backing up its state to, and restoring it from, external storage.
2 : //! No other modules from this tree are supposed to be used directly by external code.
3 : //!
4 : //! The [`RemoteStorage`] trait is a CRUD-like generic abstraction for adapting external storages, with a few implementations:
5 : //! * [`local_fs`] uses the local file system as an external storage
6 : //! * [`s3_bucket`] uses an AWS S3 bucket as an external storage
7 : //! * [`azure_blob`] uses Azure Blob Storage as an external storage
8 : //!
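//! A minimal usage sketch (assuming a `RemoteStorageConfig` has already been loaded elsewhere;
//! error handling and stream consumption are elided):
//!
//! ```ignore
//! use remote_storage::{DownloadOpts, GenericRemoteStorage, RemotePath};
//! use tokio_util::sync::CancellationToken;
//!
//! async fn roundtrip(config: &remote_storage::RemoteStorageConfig) -> anyhow::Result<()> {
//!     let storage = GenericRemoteStorage::from_config(config).await?;
//!     let cancel = CancellationToken::new();
//!     let path = RemotePath::from_string("example/object")?; // illustrative key
//!
//!     // Upload a small in-memory payload: the caller provides the byte stream and its length.
//!     let body = bytes::Bytes::from_static(b"hello");
//!     let stream = futures::stream::once(futures::future::ready(Ok::<_, std::io::Error>(body.clone())));
//!     storage.upload_storage_object(stream, body.len(), &path, &cancel).await?;
//!
//!     // Download it back: `download_stream` yields `std::io::Result<Bytes>` chunks.
//!     let _download = storage.download(&path, &DownloadOpts::default(), &cancel).await?;
//!     Ok(())
//! }
//! ```
//!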
9 : #![deny(unsafe_code)]
10 : #![deny(clippy::undocumented_unsafe_blocks)]
11 :
12 : mod azure_blob;
13 : mod config;
14 : mod error;
15 : mod local_fs;
16 : mod metrics;
17 : mod s3_bucket;
18 : mod simulate_failures;
19 : mod support;
20 :
21 : use std::collections::HashMap;
22 : use std::fmt::Debug;
23 : use std::num::NonZeroU32;
24 : use std::ops::Bound;
25 : use std::pin::{Pin, pin};
26 : use std::sync::Arc;
27 : use std::time::SystemTime;
28 :
29 : use anyhow::Context;
30 : /// The Azure SDK's ETag type is a simple String wrapper: we reuse it internally instead of duplicating it here.
31 : pub use azure_core::Etag;
32 : use bytes::Bytes;
33 : use camino::{Utf8Path, Utf8PathBuf};
34 : pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
35 : use futures::StreamExt;
36 : use futures::stream::Stream;
37 : use itertools::Itertools as _;
38 : use s3_bucket::RequestKind;
39 : use serde::{Deserialize, Serialize};
40 : use tokio::sync::Semaphore;
41 : use tokio_util::sync::CancellationToken;
42 : use tracing::info;
43 :
44 : pub use self::azure_blob::AzureBlobStorage;
45 : pub use self::local_fs::LocalFs;
46 : pub use self::s3_bucket::S3Bucket;
47 : pub use self::simulate_failures::UnreliableWrapper;
48 : pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
49 :
50 : /// Default concurrency limit for S3 operations
51 : ///
52 : /// Currently, sync happens with AWS S3, which has two limits on requests per second:
53 : /// ~200 RPS for IAM services
54 : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
55 : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
56 : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
57 : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
58 : /// Set this limit analogously to the S3 limit.
59 : ///
60 : /// Here, a limit of at most 20k concurrent connections was noted.
61 : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
62 : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
63 : /// Set this limit analogously to the S3 limit.
64 : ///
65 : /// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
66 : /// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
67 : /// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
68 : pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
69 : /// No limit on the client side, which currently means 1000 for AWS S3.
70 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
71 : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
72 :
73 : /// As defined in S3 docs
74 : ///
75 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
76 : pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
77 :
78 : /// As defined in Azure docs
79 : ///
80 : /// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
81 : pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
82 :
83 : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
84 :
85 : /// Path on the remote storage, relative to some inner prefix.
86 : /// The prefix is an implementation detail that allows representing local paths
87 : /// as remote ones, with the local storage prefix stripped away.
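///
/// A small illustration of the relative-path semantics (paths are made up for the example):
///
/// ```ignore
/// let path = RemotePath::new(Utf8Path::new("tenants/123/timelines"))?;
/// assert_eq!(path.object_name(), Some("timelines"));
/// assert_eq!(path.with_base(Utf8Path::new("/data")), Utf8PathBuf::from("/data/tenants/123/timelines"));
/// // Absolute paths are rejected: the remote prefix is prepended by the storage backend.
/// assert!(RemotePath::new(Utf8Path::new("/absolute")).is_err());
/// ```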
88 : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
89 : pub struct RemotePath(Utf8PathBuf);
90 :
91 : impl Serialize for RemotePath {
92 0 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
93 0 : where
94 0 : S: serde::Serializer,
95 0 : {
96 0 : serializer.collect_str(self)
97 0 : }
98 : }
99 :
100 : impl<'de> Deserialize<'de> for RemotePath {
101 0 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
102 0 : where
103 0 : D: serde::Deserializer<'de>,
104 0 : {
105 0 : let str = String::deserialize(deserializer)?;
106 0 : Ok(Self(Utf8PathBuf::from(&str)))
107 0 : }
108 : }
109 :
110 : impl std::fmt::Display for RemotePath {
111 3563 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 3563 : std::fmt::Display::fmt(&self.0, f)
113 3563 : }
114 : }
115 :
116 : impl RemotePath {
117 30782 : pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
118 30782 : anyhow::ensure!(
119 30782 : relative_path.is_relative(),
120 4 : "Path {relative_path:?} is not relative"
121 : );
122 30778 : Ok(Self(relative_path.to_path_buf()))
123 30782 : }
124 :
125 30159 : pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
126 30159 : Self::new(Utf8Path::new(relative_path))
127 30159 : }
128 :
129 29704 : pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
130 29704 : base_path.join(&self.0)
131 29704 : }
132 :
133 48 : pub fn object_name(&self) -> Option<&str> {
134 48 : self.0.file_name()
135 48 : }
136 :
137 282 : pub fn join(&self, path: impl AsRef<Utf8Path>) -> Self {
138 282 : Self(self.0.join(path))
139 282 : }
140 :
141 1258 : pub fn get_path(&self) -> &Utf8PathBuf {
142 1258 : &self.0
143 1258 : }
144 :
145 237 : pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
146 237 : self.0.strip_prefix(&p.0)
147 237 : }
148 :
149 1398 : pub fn add_trailing_slash(&self) -> Self {
150 1398 : // Unwrap safety: the inputs are guaranteed to be valid UTF-8
151 1398 : Self(format!("{}/", self.0).try_into().unwrap())
152 1398 : }
153 : }
154 :
155 : /// We don't need callers to be able to pass arbitrary delimiters: just control
156 : /// whether listings will use a '/' separator or not.
157 : ///
158 : /// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
159 : /// NoDelimiter mode will only populate `keys`.
160 : #[derive(Copy, Clone)]
161 : pub enum ListingMode {
162 : WithDelimiter,
163 : NoDelimiter,
164 : }
165 :
166 : #[derive(PartialEq, Eq, Debug, Clone)]
167 : pub struct ListingObject {
168 : pub key: RemotePath,
169 : pub last_modified: SystemTime,
170 : pub size: u64,
171 : }
172 :
173 : #[derive(Default)]
174 : pub struct Listing {
175 : pub prefixes: Vec<RemotePath>,
176 : pub keys: Vec<ListingObject>,
177 : }
178 :
179 : #[derive(Default)]
180 : pub struct VersionListing {
181 : pub versions: Vec<Version>,
182 : }
183 :
184 : pub struct Version {
185 : pub key: RemotePath,
186 : pub last_modified: SystemTime,
187 : pub kind: VersionKind,
188 : }
189 :
190 : impl Version {
191 36 : pub fn version_id(&self) -> Option<&VersionId> {
192 36 : match &self.kind {
193 28 : VersionKind::Version(id) => Some(id),
194 8 : VersionKind::DeletionMarker => None,
195 : }
196 36 : }
197 : }
198 :
199 : #[derive(Debug)]
200 : pub enum VersionKind {
201 : DeletionMarker,
202 : Version(VersionId),
203 : }
204 :
205 : /// Options for downloads. The default value is a plain GET.
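///
/// A conditional-download sketch (assumes a previously stored `prev_etag` and a `storage` handle):
///
/// ```ignore
/// let opts = DownloadOpts { etag: Some(prev_etag), ..Default::default() };
/// match storage.download(&path, &opts, &cancel).await {
///     Err(DownloadError::Unmodified) => { /* the cached copy is still valid */ }
///     Ok(download) => { /* the object changed; consume `download.download_stream` */ }
///     Err(err) => return Err(err.into()),
/// }
/// ```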
206 : pub struct DownloadOpts {
207 : /// If given, returns [`DownloadError::Unmodified`] if the object still has
208 : /// the same ETag (using If-None-Match).
209 : pub etag: Option<Etag>,
210 : /// The start of the byte range to download, or unbounded.
211 : pub byte_start: Bound<u64>,
212 : /// The end of the byte range to download, or unbounded. Must be after the
213 : /// start bound.
214 : pub byte_end: Bound<u64>,
215 : /// Optionally request a specific version of a key
216 : pub version_id: Option<VersionId>,
217 : /// Indicate whether we're downloading something small or large: this indirectly controls
218 : /// timeouts: for something like an index/manifest/heatmap, we should time out faster than
219 : /// for layer files
220 : pub kind: DownloadKind,
221 : }
222 :
223 : pub enum DownloadKind {
224 : Large,
225 : Small,
226 : }
227 :
228 : #[derive(Debug, Clone)]
229 : pub struct VersionId(pub String);
230 :
231 : impl Default for DownloadOpts {
232 4572 : fn default() -> Self {
233 4572 : Self {
234 4572 : etag: Default::default(),
235 4572 : byte_start: Bound::Unbounded,
236 4572 : byte_end: Bound::Unbounded,
237 4572 : version_id: None,
238 4572 : kind: DownloadKind::Large,
239 4572 : }
240 4572 : }
241 : }
242 :
243 : impl DownloadOpts {
244 : /// Returns the byte range with inclusive start and exclusive end, or None
245 : /// if unbounded.
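///
/// For example (mirroring the unit tests at the bottom of this file):
///
/// ```ignore
/// let opts = DownloadOpts {
///     byte_start: Bound::Included(3),
///     byte_end: Bound::Excluded(7),
///     ..Default::default()
/// };
/// assert_eq!(opts.byte_range(), Some((3, Some(7))));
/// assert_eq!(opts.byte_range_header(), Some("bytes=3-6".to_string()));
/// ```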
246 444 : pub fn byte_range(&self) -> Option<(u64, Option<u64>)> {
247 444 : if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded {
248 348 : return None;
249 96 : }
250 96 : let start = match self.byte_start {
251 18 : Bound::Excluded(i) => i + 1,
252 60 : Bound::Included(i) => i,
253 18 : Bound::Unbounded => 0,
254 : };
255 96 : let end = match self.byte_end {
256 48 : Bound::Excluded(i) => Some(i),
257 27 : Bound::Included(i) => Some(i + 1),
258 21 : Bound::Unbounded => None,
259 : };
260 96 : if let Some(end) = end {
261 75 : assert!(start < end, "range end {end} at or before start {start}");
262 21 : }
263 87 : Some((start, end))
264 435 : }
265 :
266 : /// Returns the byte range as an RFC 2616 Range header value with inclusive
267 : /// bounds, or None if unbounded.
268 65 : pub fn byte_range_header(&self) -> Option<String> {
269 65 : self.byte_range()
270 65 : .map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive
271 65 : .map(|(start, end)| match end {
272 30 : Some(end) => format!("bytes={start}-{end}"),
273 10 : None => format!("bytes={start}-"),
274 65 : })
275 65 : }
276 : }
277 :
278 : /// Storage (potentially remote) API to manage its state.
279 : /// This storage tries to be unaware of any layered repository context,
280 : /// providing basic CRUD operations for storage files.
281 : #[allow(async_fn_in_trait)]
282 : pub trait RemoteStorage: Send + Sync + 'static {
283 : /// List objects in remote storage, with semantics matching AWS S3's [`ListObjectsV2`].
284 : ///
285 : /// The stream is guaranteed to yield at least one element, even in the case of errors
286 : /// (in which case it is an `Err()`) or of an empty `Listing`.
287 : ///
288 : /// The stream does not end when it returns an error, as long as [`is_permanent`] returns false for that error.
289 : /// The `next` call can then be retried, and a later retry may succeed.
290 : ///
291 : /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
292 : /// to the absolute root of the bucket.
293 : ///
294 : /// `mode` configures whether to use a delimiter. Without a delimiter, all keys
295 : /// within the prefix are listed in the `keys` of the result. With a delimiter, any "directories" at the top level of
296 : /// the prefix are returned in the `prefixes` of the result, and keys at the top level of the prefix are
297 : /// returned in `keys`.
298 : ///
299 : /// `max_keys` controls the maximum number of keys that will be returned. If this is None, this function
300 : /// will iteratively call ListObjects until it runs out of keys. Note that this is not safe to use on
301 : /// unlimited-size buckets, as the full list of objects is allocated into a monolithic data structure.
302 : ///
303 : /// [`ListObjectsV2`]: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
304 : /// [`is_permanent`]: DownloadError::is_permanent
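///
/// A consumption sketch (assuming `storage` is some `impl RemoteStorage` and `cancel` is a token):
///
/// ```ignore
/// let mut stream = std::pin::pin!(storage.list_streaming(None, ListingMode::NoDelimiter, None, &cancel));
/// while let Some(result) = stream.next().await {
///     for object in result?.keys {
///         tracing::info!("{} ({} bytes)", object.key, object.size);
///     }
/// }
/// ```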
305 : fn list_streaming(
306 : &self,
307 : prefix: Option<&RemotePath>,
308 : mode: ListingMode,
309 : max_keys: Option<NonZeroU32>,
310 : cancel: &CancellationToken,
311 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;
312 :
313 69 : async fn list(
314 69 : &self,
315 69 : prefix: Option<&RemotePath>,
316 69 : mode: ListingMode,
317 69 : max_keys: Option<NonZeroU32>,
318 69 : cancel: &CancellationToken,
319 69 : ) -> Result<Listing, DownloadError> {
320 69 : let mut stream = pin!(self.list_streaming(prefix, mode, max_keys, cancel));
321 69 : let mut combined = stream.next().await.expect("At least one item required")?;
322 118 : while let Some(list) = stream.next().await {
323 49 : let list = list?;
324 49 : combined.keys.extend(list.keys.into_iter());
325 49 : combined.prefixes.extend_from_slice(&list.prefixes);
326 : }
327 69 : Ok(combined)
328 69 : }
329 :
330 : async fn list_versions(
331 : &self,
332 : prefix: Option<&RemotePath>,
333 : mode: ListingMode,
334 : max_keys: Option<NonZeroU32>,
335 : cancel: &CancellationToken,
336 : ) -> Result<VersionListing, DownloadError>;
337 :
338 : /// Obtain metadata information about an object.
339 : async fn head_object(
340 : &self,
341 : key: &RemotePath,
342 : cancel: &CancellationToken,
343 : ) -> Result<ListingObject, DownloadError>;
344 :
345 : /// Streams the local file contents into the remote storage entry.
346 : ///
347 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
348 : /// set to `TimeoutOrCancel`.
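///
/// A sketch of uploading a local file (assumes the `io` feature of `tokio_util` for `ReaderStream`;
/// the path and names are illustrative):
///
/// ```ignore
/// let file = tokio::fs::File::open("/tmp/some-layer-file").await?;
/// let size = file.metadata().await?.len() as usize;
/// let stream = tokio_util::io::ReaderStream::new(file);
/// storage.upload(stream, size, &remote_path, None, &cancel).await?;
/// ```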
349 : async fn upload(
350 : &self,
351 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
352 : // S3 PUT request requires the content length to be specified,
353 : // otherwise it starts to fail with the concurrent connection count increasing.
354 : data_size_bytes: usize,
355 : to: &RemotePath,
356 : metadata: Option<StorageMetadata>,
357 : cancel: &CancellationToken,
358 : ) -> anyhow::Result<()>;
359 :
360 : /// Streams the remote storage entry contents.
361 : ///
362 : /// The returned download stream will obey the initial timeout and the cancellation signal by erroring
363 : /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
364 : /// enough for `tokio::io::copy_buf` usage. If needed, the error can be filtered out.
365 : ///
366 : /// Returns the metadata, if any was stored with the file previously.
367 : async fn download(
368 : &self,
369 : from: &RemotePath,
370 : opts: &DownloadOpts,
371 : cancel: &CancellationToken,
372 : ) -> Result<Download, DownloadError>;
373 :
374 : /// Delete a single path from remote storage.
375 : ///
376 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
377 : /// set to `TimeoutOrCancel`. In such situation it is unknown if the deletion went through.
378 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()>;
379 :
380 : /// Delete multiple paths from remote storage.
381 : ///
382 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
383 : /// set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went
384 : /// through.
385 : async fn delete_objects(
386 : &self,
387 : paths: &[RemotePath],
388 : cancel: &CancellationToken,
389 : ) -> anyhow::Result<()>;
390 :
391 : /// Returns the maximum number of keys that a call to [`Self::delete_objects`] can delete without chunking
392 : ///
393 : /// The value returned is only an optimization hint; one can pass a larger number of objects to
394 : /// `delete_objects` as well.
395 : ///
396 : /// The value is guaranteed to be >= 1.
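///
/// A chunking sketch for callers that have many paths (`paths`, `storage`, and `cancel` are assumed):
///
/// ```ignore
/// for chunk in paths.chunks(storage.max_keys_per_delete()) {
///     storage.delete_objects(chunk, &cancel).await?;
/// }
/// ```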
397 : fn max_keys_per_delete(&self) -> usize;
398 :
399 : /// Deletes all objects matching the given prefix.
400 : ///
401 : /// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
402 : /// delete /a/b, /a/b/*, /a/bc, /a/bc/*, etc.
403 : ///
404 : /// If the operation fails because of timeout or cancellation, the root cause of the error will
405 : /// be set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went
406 : /// through.
407 24 : async fn delete_prefix(
408 24 : &self,
409 24 : prefix: &RemotePath,
410 24 : cancel: &CancellationToken,
411 24 : ) -> anyhow::Result<()> {
412 24 : let mut stream =
413 24 : pin!(self.list_streaming(Some(prefix), ListingMode::NoDelimiter, None, cancel));
414 51 : while let Some(result) = stream.next().await {
415 27 : let keys = match result {
416 27 : Ok(listing) if listing.keys.is_empty() => continue,
417 73 : Ok(listing) => listing.keys.into_iter().map(|o| o.key).collect_vec(),
418 0 : Err(DownloadError::Cancelled) => return Err(TimeoutOrCancel::Cancel.into()),
419 0 : Err(DownloadError::Timeout) => return Err(TimeoutOrCancel::Timeout.into()),
420 0 : Err(err) => return Err(err.into()),
421 : };
422 18 : tracing::info!("Deleting {} keys from remote storage", keys.len());
423 18 : self.delete_objects(&keys, cancel).await?;
424 : }
425 24 : Ok(())
426 0 : }
427 :
428 : /// Copy a remote object inside a bucket from one path to another.
429 : async fn copy(
430 : &self,
431 : from: &RemotePath,
432 : to: &RemotePath,
433 : cancel: &CancellationToken,
434 : ) -> anyhow::Result<()>;
435 :
436 : /// Resets the content of everything under the given prefix to its state at the given timestamp.
437 : async fn time_travel_recover(
438 : &self,
439 : prefix: Option<&RemotePath>,
440 : timestamp: SystemTime,
441 : done_if_after: SystemTime,
442 : cancel: &CancellationToken,
443 : ) -> Result<(), TimeTravelError>;
444 : }
445 :
446 : /// Data part of an ongoing [`Download`].
447 : ///
448 : /// `DownloadStream` is sensitive to the timeout and cancellation used with the original
449 : /// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
450 : /// with `tokio::io::copy_buf`.
451 : // This has 'static because safekeepers do not use cancellation tokens (yet)
452 : pub type DownloadStream =
453 : Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>>;
454 :
455 : pub struct Download {
456 : pub download_stream: DownloadStream,
457 : /// The last time the file was modified (`last-modified` HTTP header)
458 : pub last_modified: SystemTime,
459 : /// A way to identify this specific version of the resource (`etag` HTTP header)
460 : pub etag: Etag,
461 : /// Extra key-value data, associated with the current remote file.
462 : pub metadata: Option<StorageMetadata>,
463 : }
464 :
465 : impl Debug for Download {
466 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
467 0 : f.debug_struct("Download")
468 0 : .field("metadata", &self.metadata)
469 0 : .finish()
470 0 : }
471 : }
472 :
473 : /// Every storage kind currently supported.
474 : /// Serves as a simple way to pass around a [`RemoteStorage`] without dealing with generics.
475 : // Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
476 : #[derive(Clone)]
477 : pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
478 : LocalFs(LocalFs),
479 : AwsS3(Arc<S3Bucket>),
480 : AzureBlob(Arc<AzureBlobStorage>),
481 : Unreliable(Other),
482 : }
483 :
484 : impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
485 : // See [`RemoteStorage::list`].
486 2853 : pub async fn list(
487 2853 : &self,
488 2853 : prefix: Option<&RemotePath>,
489 2853 : mode: ListingMode,
490 2853 : max_keys: Option<NonZeroU32>,
491 2853 : cancel: &CancellationToken,
492 2853 : ) -> Result<Listing, DownloadError> {
493 2853 : match self {
494 2784 : Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await,
495 50 : Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
496 19 : Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
497 0 : Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
498 : }
499 0 : }
500 :
501 : // See [`RemoteStorage::list_streaming`].
502 3 : pub fn list_streaming<'a>(
503 3 : &'a self,
504 3 : prefix: Option<&'a RemotePath>,
505 3 : mode: ListingMode,
506 3 : max_keys: Option<NonZeroU32>,
507 3 : cancel: &'a CancellationToken,
508 3 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
509 3 : match self {
510 0 : Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
511 0 : as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
512 2 : Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
513 1 : Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
514 0 : Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
515 : }
516 0 : }
517 :
518 : // See [`RemoteStorage::list_versions`].
519 0 : pub async fn list_versions<'a>(
520 0 : &'a self,
521 0 : prefix: Option<&'a RemotePath>,
522 0 : mode: ListingMode,
523 0 : max_keys: Option<NonZeroU32>,
524 0 : cancel: &'a CancellationToken,
525 0 : ) -> Result<VersionListing, DownloadError> {
526 0 : match self {
527 0 : Self::LocalFs(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
528 0 : Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
529 0 : Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
530 0 : Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
531 : }
532 0 : }
533 :
534 : // See [`RemoteStorage::head_object`].
535 9 : pub async fn head_object(
536 9 : &self,
537 9 : key: &RemotePath,
538 9 : cancel: &CancellationToken,
539 9 : ) -> Result<ListingObject, DownloadError> {
540 9 : match self {
541 0 : Self::LocalFs(s) => s.head_object(key, cancel).await,
542 6 : Self::AwsS3(s) => s.head_object(key, cancel).await,
543 3 : Self::AzureBlob(s) => s.head_object(key, cancel).await,
544 0 : Self::Unreliable(s) => s.head_object(key, cancel).await,
545 : }
546 0 : }
547 :
548 : /// See [`RemoteStorage::upload`]
549 20263 : pub async fn upload(
550 20263 : &self,
551 20263 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
552 20263 : data_size_bytes: usize,
553 20263 : to: &RemotePath,
554 20263 : metadata: Option<StorageMetadata>,
555 20263 : cancel: &CancellationToken,
556 20263 : ) -> anyhow::Result<()> {
557 20263 : match self {
558 19948 : Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
559 198 : Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
560 93 : Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
561 24 : Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
562 : }
563 0 : }
564 :
565 : /// See [`RemoteStorage::download`]
566 4503 : pub async fn download(
567 4503 : &self,
568 4503 : from: &RemotePath,
569 4503 : opts: &DownloadOpts,
570 4503 : cancel: &CancellationToken,
571 4503 : ) -> Result<Download, DownloadError> {
572 4503 : match self {
573 4460 : Self::LocalFs(s) => s.download(from, opts, cancel).await,
574 32 : Self::AwsS3(s) => s.download(from, opts, cancel).await,
575 11 : Self::AzureBlob(s) => s.download(from, opts, cancel).await,
576 0 : Self::Unreliable(s) => s.download(from, opts, cancel).await,
577 : }
578 0 : }
579 :
580 : /// See [`RemoteStorage::delete`]
581 2033 : pub async fn delete(
582 2033 : &self,
583 2033 : path: &RemotePath,
584 2033 : cancel: &CancellationToken,
585 2033 : ) -> anyhow::Result<()> {
586 2033 : match self {
587 1773 : Self::LocalFs(s) => s.delete(path, cancel).await,
588 174 : Self::AwsS3(s) => s.delete(path, cancel).await,
589 86 : Self::AzureBlob(s) => s.delete(path, cancel).await,
590 0 : Self::Unreliable(s) => s.delete(path, cancel).await,
591 : }
592 0 : }
593 :
594 : /// See [`RemoteStorage::delete_objects`]
595 51 : pub async fn delete_objects(
596 51 : &self,
597 51 : paths: &[RemotePath],
598 51 : cancel: &CancellationToken,
599 51 : ) -> anyhow::Result<()> {
600 51 : match self {
601 36 : Self::LocalFs(s) => s.delete_objects(paths, cancel).await,
602 12 : Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
603 3 : Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
604 0 : Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
605 : }
606 0 : }
607 :
608 : /// [`RemoteStorage::max_keys_per_delete`]
609 48 : pub fn max_keys_per_delete(&self) -> usize {
610 48 : match self {
611 48 : Self::LocalFs(s) => s.max_keys_per_delete(),
612 0 : Self::AwsS3(s) => s.max_keys_per_delete(),
613 0 : Self::AzureBlob(s) => s.max_keys_per_delete(),
614 0 : Self::Unreliable(s) => s.max_keys_per_delete(),
615 : }
616 0 : }
617 :
618 : /// See [`RemoteStorage::delete_prefix`]
619 24 : pub async fn delete_prefix(
620 24 : &self,
621 24 : prefix: &RemotePath,
622 24 : cancel: &CancellationToken,
623 24 : ) -> anyhow::Result<()> {
624 24 : match self {
625 6 : Self::LocalFs(s) => s.delete_prefix(prefix, cancel).await,
626 12 : Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
627 6 : Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
628 0 : Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
629 : }
630 0 : }
631 :
632 : /// See [`RemoteStorage::copy`]
633 3 : pub async fn copy_object(
634 3 : &self,
635 3 : from: &RemotePath,
636 3 : to: &RemotePath,
637 3 : cancel: &CancellationToken,
638 3 : ) -> anyhow::Result<()> {
639 3 : match self {
640 0 : Self::LocalFs(s) => s.copy(from, to, cancel).await,
641 2 : Self::AwsS3(s) => s.copy(from, to, cancel).await,
642 1 : Self::AzureBlob(s) => s.copy(from, to, cancel).await,
643 0 : Self::Unreliable(s) => s.copy(from, to, cancel).await,
644 : }
645 0 : }
646 :
647 : /// See [`RemoteStorage::time_travel_recover`].
648 6 : pub async fn time_travel_recover(
649 6 : &self,
650 6 : prefix: Option<&RemotePath>,
651 6 : timestamp: SystemTime,
652 6 : done_if_after: SystemTime,
653 6 : cancel: &CancellationToken,
654 6 : ) -> Result<(), TimeTravelError> {
655 6 : match self {
656 0 : Self::LocalFs(s) => {
657 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
658 0 : .await
659 : }
660 6 : Self::AwsS3(s) => {
661 6 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
662 6 : .await
663 : }
664 0 : Self::AzureBlob(s) => {
665 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
666 0 : .await
667 : }
668 0 : Self::Unreliable(s) => {
669 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
670 0 : .await
671 : }
672 : }
673 0 : }
674 : }
675 :
676 : impl GenericRemoteStorage {
677 1469 : pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
678 1469 : let timeout = storage_config.timeout;
679 1469 :
680 1469 : // If someone overrides timeout to be small without adjusting small_timeout, then adjust small_timeout automatically
681 1469 : let small_timeout = std::cmp::min(storage_config.small_timeout, timeout);
682 1469 :
683 1469 : Ok(match &storage_config.storage {
684 1433 : RemoteStorageKind::LocalFs { local_path: path } => {
685 1433 : info!("Using fs root '{path}' as a remote storage");
686 1433 : Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
687 : }
688 26 : RemoteStorageKind::AwsS3(s3_config) => {
689 26 : // The profile and access key id are only printed here for debugging purposes;
690 26 : // their values don't indicate which auth method is eventually used.
691 26 : let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
692 26 : let access_key_id =
693 26 : std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
694 26 : info!(
695 0 : "Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
696 : s3_config.bucket_name,
697 : s3_config.bucket_region,
698 : s3_config.prefix_in_bucket,
699 : s3_config.endpoint
700 : );
701 26 : Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?))
702 : }
703 10 : RemoteStorageKind::AzureContainer(azure_config) => {
704 10 : let storage_account = azure_config
705 10 : .storage_account
706 10 : .as_deref()
707 10 : .unwrap_or("<AZURE_STORAGE_ACCOUNT>");
708 10 : info!(
709 0 : "Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
710 : azure_config.container_name,
711 : azure_config.container_region,
712 : azure_config.prefix_in_container
713 : );
714 10 : Self::AzureBlob(Arc::new(AzureBlobStorage::new(
715 10 : azure_config,
716 10 : timeout,
717 10 : small_timeout,
718 10 : )?))
719 : }
720 : })
721 1469 : }
722 :
723 2 : pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
724 2 : Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
725 2 : }
726 :
727 : /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
728 10330 : pub async fn upload_storage_object(
729 10330 : &self,
730 10330 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
731 10330 : from_size_bytes: usize,
732 10330 : to: &RemotePath,
733 10330 : cancel: &CancellationToken,
734 10330 : ) -> anyhow::Result<()> {
735 10330 : self.upload(from, from_size_bytes, to, None, cancel)
736 10330 : .await
737 10273 : .with_context(|| {
738 0 : format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
739 10273 : })
740 10273 : }
741 :
742 : /// The name of the bucket/container/etc.
743 0 : pub fn bucket_name(&self) -> Option<&str> {
744 0 : match self {
745 0 : Self::LocalFs(_s) => None,
746 0 : Self::AwsS3(s) => Some(s.bucket_name()),
747 0 : Self::AzureBlob(s) => Some(s.container_name()),
748 0 : Self::Unreliable(_s) => None,
749 : }
750 0 : }
751 : }
752 :
753 : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
754 : /// Immutable, cannot be changed once the file is created.
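///
/// Construction sketch (the key/value pair is illustrative): `StorageMetadata::from([("compression", "zstd")])`.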
755 : #[derive(Debug, Clone, PartialEq, Eq)]
756 : pub struct StorageMetadata(HashMap<String, String>);
757 :
758 : impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
759 0 : fn from(arr: [(&str, &str); N]) -> Self {
760 0 : let map: HashMap<String, String> = arr
761 0 : .iter()
762 0 : .map(|(k, v)| (k.to_string(), v.to_string()))
763 0 : .collect();
764 0 : Self(map)
765 0 : }
766 : }
767 :
768 : struct ConcurrencyLimiter {
769 : // Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded.
770 : // The same goes for IAM, which is queried before every S3 request if enabled, and which has an even lower RPS threshold.
771 : // This limiter helps ensure we don't exceed those thresholds.
772 : write: Arc<Semaphore>,
773 : read: Arc<Semaphore>,
774 : }
775 :
776 : impl ConcurrencyLimiter {
777 729 : fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
778 729 : match kind {
779 43 : RequestKind::Get => &self.read,
780 291 : RequestKind::Put => &self.write,
781 64 : RequestKind::List => &self.read,
782 287 : RequestKind::Delete => &self.write,
783 3 : RequestKind::Copy => &self.write,
784 6 : RequestKind::TimeTravel => &self.write,
785 9 : RequestKind::Head => &self.read,
786 26 : RequestKind::ListVersions => &self.read,
787 : }
788 729 : }
789 :
790 697 : async fn acquire(
791 697 : &self,
792 697 : kind: RequestKind,
793 697 : ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
794 697 : self.for_kind(kind).acquire().await
795 697 : }
796 :
797 32 : async fn acquire_owned(
798 32 : &self,
799 32 : kind: RequestKind,
800 32 : ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
801 32 : Arc::clone(self.for_kind(kind)).acquire_owned().await
802 32 : }
803 :
804 51 : fn new(limit: usize) -> ConcurrencyLimiter {
805 51 : Self {
806 51 : read: Arc::new(Semaphore::new(limit)),
807 51 : write: Arc::new(Semaphore::new(limit)),
808 51 : }
809 51 : }
810 : }
811 :
812 : #[cfg(test)]
813 : mod tests {
814 : use super::*;
815 :
816 : /// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges
817 : /// with optional end bound, or None when unbounded.
818 : #[test]
819 3 : fn download_opts_byte_range() {
820 3 : // Consider using test_case or a similar table-driven test framework.
821 3 : let cases = [
822 3 : // (byte_start, byte_end, expected)
823 3 : (Bound::Unbounded, Bound::Unbounded, None),
824 3 : (Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))),
825 3 : (Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))),
826 3 : (Bound::Included(3), Bound::Unbounded, Some((3, None))),
827 3 : (Bound::Included(3), Bound::Included(7), Some((3, Some(8)))),
828 3 : (Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))),
829 3 : (Bound::Excluded(3), Bound::Unbounded, Some((4, None))),
830 3 : (Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))),
831 3 : (Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))),
832 3 : // 1-sized ranges are fine, 0 aren't and will panic (separate test).
833 3 : (Bound::Included(3), Bound::Included(3), Some((3, Some(4)))),
834 3 : (Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))),
835 3 : ];
836 :
837 36 : for (byte_start, byte_end, expect) in cases {
838 33 : let opts = DownloadOpts {
839 33 : byte_start,
840 33 : byte_end,
841 33 : ..Default::default()
842 33 : };
843 33 : let result = opts.byte_range();
844 33 : assert_eq!(
845 : result, expect,
846 0 : "byte_start={byte_start:?} byte_end={byte_end:?}"
847 : );
848 :
849 : // Check generated HTTP header, which uses an inclusive range.
850 33 : let expect_header = expect.map(|(start, end)| match end {
851 24 : Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end
852 6 : None => format!("bytes={start}-"),
853 33 : });
854 33 : assert_eq!(
855 33 : opts.byte_range_header(),
856 : expect_header,
857 0 : "byte_start={byte_start:?} byte_end={byte_end:?}"
858 : );
859 : }
860 3 : }
861 :
862 : /// DownloadOpts::byte_range() zero-sized byte range should panic.
863 : #[test]
864 : #[should_panic]
865 3 : fn download_opts_byte_range_zero() {
866 3 : DownloadOpts {
867 3 : byte_start: Bound::Included(3),
868 3 : byte_end: Bound::Excluded(3),
869 3 : ..Default::default()
870 3 : }
871 3 : .byte_range();
872 3 : }
873 :
874 : /// DownloadOpts::byte_range() negative byte range should panic.
875 : #[test]
876 : #[should_panic]
877 3 : fn download_opts_byte_range_negative() {
878 3 : DownloadOpts {
879 3 : byte_start: Bound::Included(3),
880 3 : byte_end: Bound::Included(2),
881 3 : ..Default::default()
882 3 : }
883 3 : .byte_range();
884 3 : }
885 :
886 : #[test]
887 3 : fn test_object_name() {
888 3 : let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
889 3 : assert_eq!(k.object_name(), Some("c"));
890 :
891 3 : let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
892 3 : assert_eq!(k.object_name(), Some("c"));
893 :
894 3 : let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
895 3 : assert_eq!(k.object_name(), Some("a"));
896 :
897 : // XXX is it impossible to have an empty key?
898 3 : let k = RemotePath::new(Utf8Path::new("")).unwrap();
899 3 : assert_eq!(k.object_name(), None);
900 3 : }
901 :
902 : #[test]
903 3 : fn remote_path_cannot_be_created_from_absolute_ones() {
904 3 : let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
905 3 : assert_eq!(err.to_string(), "Path \"/\" is not relative");
906 3 : }
907 : }