Line data Source code
1 : //! A set of generic storage abstractions for the page server to use when backing up and restoring its state from external storage.
2 : //! No other modules from this tree are supposed to be used directly by external code.
3 : //!
4 : //! The [`RemoteStorage`] trait is a CRUD-like generic abstraction used to adapt external storages, with a few implementations:
5 : //! * [`local_fs`] uses the local file system as an external storage
6 : //! * [`s3_bucket`] uses an AWS S3 bucket as an external storage
7 : //! * [`azure_blob`] uses Azure Blob storage as an external storage
8 : //!
9 : #![deny(unsafe_code)]
10 : #![deny(clippy::undocumented_unsafe_blocks)]
11 :
12 : mod azure_blob;
13 : mod config;
14 : mod error;
15 : mod local_fs;
16 : mod metrics;
17 : mod s3_bucket;
18 : mod simulate_failures;
19 : mod support;
20 :
21 : use std::{
22 : collections::HashMap,
23 : fmt::Debug,
24 : num::NonZeroU32,
25 : ops::Bound,
26 : pin::{pin, Pin},
27 : sync::Arc,
28 : time::SystemTime,
29 : };
30 :
31 : use anyhow::Context;
32 : use camino::{Utf8Path, Utf8PathBuf};
33 :
34 : use bytes::Bytes;
35 : use futures::{stream::Stream, StreamExt};
36 : use itertools::Itertools as _;
37 : use serde::{Deserialize, Serialize};
38 : use tokio::sync::Semaphore;
39 : use tokio_util::sync::CancellationToken;
40 : use tracing::info;
41 :
42 : pub use self::{
43 : azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket,
44 : simulate_failures::UnreliableWrapper,
45 : };
46 : use s3_bucket::RequestKind;
47 :
48 : pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
49 :
50 : /// The Azure SDK's ETag type is a simple String wrapper: we use it internally instead of duplicating it here.
51 : pub use azure_core::Etag;
52 :
53 : pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
54 :
55 : /// Default concurrency limit for S3 operations
56 : ///
57 : /// Currently, sync happens with AWS S3, which has two limits on requests per second:
58 : /// ~200 RPS for IAM services
59 : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
60 : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
61 : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
62 : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
63 : /// Set this limit analogously to the S3 limit
64 : ///
65 : /// Here, a limit of max 20k concurrent connections was noted.
66 : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
67 : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
68 : /// Set this limit analogously to the S3 limit.
69 : ///
70 : /// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
71 : /// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
72 : /// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
73 : pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
74 : /// No limits on the client side, which currently means 1000 for AWS S3.
75 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
76 : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
77 :
78 : /// As defined in S3 docs
79 : ///
80 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
81 : pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
82 :
83 : /// As defined in Azure docs
84 : ///
85 : /// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
86 : pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
87 :
88 : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
89 :
90 : /// Path on the remote storage, relative to some inner prefix.
91 : /// The prefix is an implementation detail that allows representing local paths
92 : /// as remote ones, stripping the local storage prefix away.
93 : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
94 : pub struct RemotePath(Utf8PathBuf);
95 :
96 : impl Serialize for RemotePath {
97 0 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
98 0 : where
99 0 : S: serde::Serializer,
100 0 : {
101 0 : serializer.collect_str(self)
102 0 : }
103 : }
104 :
105 : impl<'de> Deserialize<'de> for RemotePath {
106 0 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
107 0 : where
108 0 : D: serde::Deserializer<'de>,
109 0 : {
110 0 : let str = String::deserialize(deserializer)?;
111 0 : Ok(Self(Utf8PathBuf::from(&str)))
112 0 : }
113 : }
114 :
115 : impl std::fmt::Display for RemotePath {
116 1136 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 1136 : std::fmt::Display::fmt(&self.0, f)
118 1136 : }
119 : }
120 :
121 : impl RemotePath {
122 10014 : pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
123 10014 : anyhow::ensure!(
124 10014 : relative_path.is_relative(),
125 4 : "Path {relative_path:?} is not relative"
126 : );
127 10010 : Ok(Self(relative_path.to_path_buf()))
128 10014 : }
129 :
130 9532 : pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
131 9532 : Self::new(Utf8Path::new(relative_path))
132 9532 : }
133 :
134 9444 : pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
135 9444 : base_path.join(&self.0)
136 9444 : }
137 :
138 24 : pub fn object_name(&self) -> Option<&str> {
139 24 : self.0.file_name()
140 24 : }
141 :
142 138 : pub fn join(&self, path: impl AsRef<Utf8Path>) -> Self {
143 138 : Self(self.0.join(path))
144 138 : }
145 :
146 1066 : pub fn get_path(&self) -> &Utf8PathBuf {
147 1066 : &self.0
148 1066 : }
149 :
150 101 : pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
151 101 : self.0.strip_prefix(&p.0)
152 101 : }
153 :
154 450 : pub fn add_trailing_slash(&self) -> Self {
155 450 : // Unwrap safety: inputs are guaranteed to be valid UTF-8
156 450 : Self(format!("{}/", self.0).try_into().unwrap())
157 450 : }
158 : }
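A small usage sketch for `RemotePath` (the paths are illustrative, not taken from the pageserver layout): construction rejects absolute paths, and `with_base` resolves the relative remote path against a local root, as the `local_fs` backend does.

```rust
use camino::Utf8Path;

// Hypothetical object key, relative to whatever prefix the storage is configured with.
let remote = RemotePath::from_string("tenants/1234/timelines/abcd/layer_0").unwrap();
assert_eq!(remote.object_name(), Some("layer_0"));

// Resolve against a local base directory, e.g. for the `local_fs` backend.
let local = remote.with_base(Utf8Path::new("/var/lib/pageserver"));
assert_eq!(
    local.as_str(),
    "/var/lib/pageserver/tenants/1234/timelines/abcd/layer_0"
);

// Absolute paths are rejected.
assert!(RemotePath::new(Utf8Path::new("/absolute/path")).is_err());
```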
159 :
160 : /// We don't need callers to be able to pass arbitrary delimiters: just control
161 : /// whether listings will use a '/' separator or not.
162 : ///
163 : /// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
164 : /// NoDelimiter mode will only populate `keys`.
165 : #[derive(Copy, Clone)]
166 : pub enum ListingMode {
167 : WithDelimiter,
168 : NoDelimiter,
169 : }
170 :
171 : #[derive(PartialEq, Eq, Debug, Clone)]
172 : pub struct ListingObject {
173 : pub key: RemotePath,
174 : pub last_modified: SystemTime,
175 : pub size: u64,
176 : }
177 :
178 : #[derive(Default)]
179 : pub struct Listing {
180 : pub prefixes: Vec<RemotePath>,
181 : pub keys: Vec<ListingObject>,
182 : }
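To illustrate the two listing modes, here is a hedged sketch that enumerates the top-level "directories" under the configured prefix with a delimited listing; it assumes a `GenericRemoteStorage` handle (defined further down in this file).

```rust
use tokio_util::sync::CancellationToken;

// With `ListingMode::WithDelimiter`, common prefixes ("directories") land in
// `prefixes` and only top-level objects land in `keys`; with `NoDelimiter`,
// every key under the prefix is returned in `keys`.
async fn top_level_prefixes(
    storage: &GenericRemoteStorage,
    cancel: &CancellationToken,
) -> Result<Vec<RemotePath>, DownloadError> {
    let listing = storage
        .list(None, ListingMode::WithDelimiter, None, cancel)
        .await?;
    Ok(listing.prefixes)
}
```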
183 :
184 : /// Options for downloads. The default value is a plain GET.
185 : pub struct DownloadOpts {
186 : /// If given, returns [`DownloadError::Unmodified`] if the object still has
187 : /// the same ETag (using If-None-Match).
188 : pub etag: Option<Etag>,
189 : /// The start of the byte range to download, or unbounded.
190 : pub byte_start: Bound<u64>,
191 : /// The end of the byte range to download, or unbounded. Must be after the
192 : /// start bound.
193 : pub byte_end: Bound<u64>,
194 : /// Indicates whether we're downloading something small or large. This indirectly controls
195 : /// timeouts: for something like an index/manifest/heatmap, we should time out faster than
196 : /// for layer files.
197 : pub kind: DownloadKind,
198 : }
199 :
200 : pub enum DownloadKind {
201 : Large,
202 : Small,
203 : }
204 :
205 : impl Default for DownloadOpts {
206 1544 : fn default() -> Self {
207 1544 : Self {
208 1544 : etag: Default::default(),
209 1544 : byte_start: Bound::Unbounded,
210 1544 : byte_end: Bound::Unbounded,
211 1544 : kind: DownloadKind::Large,
212 1544 : }
213 1544 : }
214 : }
215 :
216 : impl DownloadOpts {
217 : /// Returns the byte range with inclusive start and exclusive end, or None
218 : /// if unbounded.
219 224 : pub fn byte_range(&self) -> Option<(u64, Option<u64>)> {
220 224 : if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded {
221 128 : return None;
222 96 : }
223 96 : let start = match self.byte_start {
224 18 : Bound::Excluded(i) => i + 1,
225 60 : Bound::Included(i) => i,
226 18 : Bound::Unbounded => 0,
227 : };
228 96 : let end = match self.byte_end {
229 48 : Bound::Excluded(i) => Some(i),
230 27 : Bound::Included(i) => Some(i + 1),
231 21 : Bound::Unbounded => None,
232 : };
233 96 : if let Some(end) = end {
234 75 : assert!(start < end, "range end {end} at or before start {start}");
235 21 : }
236 87 : Some((start, end))
237 215 : }
238 :
239 : /// Returns the byte range as an RFC 2616 Range header value with inclusive
240 : /// bounds, or None if unbounded.
241 66 : pub fn byte_range_header(&self) -> Option<String> {
242 66 : self.byte_range()
243 66 : .map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive
244 66 : .map(|(start, end)| match end {
245 30 : Some(end) => format!("bytes={start}-{end}"),
246 10 : None => format!("bytes={start}-"),
247 66 : })
248 66 : }
249 : }
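A brief sketch of how a byte-range download request is expressed; the numbers are arbitrary.

```rust
use std::ops::Bound;

// Request bytes [512, 1024) of an object; everything else keeps the defaults.
let opts = DownloadOpts {
    byte_start: Bound::Included(512),
    byte_end: Bound::Excluded(1024),
    ..Default::default()
};
// Inclusive start, exclusive end.
assert_eq!(opts.byte_range(), Some((512, Some(1024))));
// The HTTP `Range` header uses inclusive bounds on both ends.
assert_eq!(opts.byte_range_header().as_deref(), Some("bytes=512-1023"));
```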
250 :
251 : /// Storage (potentially remote) API to manage its state.
252 : /// This storage tries to be unaware of any layered repository context,
253 : /// providing basic CRUD operations for storage files.
254 : #[allow(async_fn_in_trait)]
255 : pub trait RemoteStorage: Send + Sync + 'static {
256 : /// List objects in remote storage, with semantics matching AWS S3's [`ListObjectsV2`].
257 : ///
258 : /// The stream is guaranteed to return at least one element, even in the case of errors
259 : /// (in that case it's an `Err()`) or an empty `Listing`.
260 : ///
261 : /// The stream does not end if it returns an error, as long as [`is_permanent`] returns false on the error.
262 : /// The `next` call can be retried, and a future retry may succeed.
263 : ///
264 : /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
265 : /// from the absolute root of the bucket.
266 : ///
267 : /// `mode` configures whether to use a delimiter. Without a delimiter, all keys
268 : /// within the prefix are listed in the `keys` of the result. With a delimiter, any "directories" at the top level of
269 : /// the prefix are returned in the `prefixes` of the result, and keys in the top level of the prefix are
270 : /// returned in `keys`.
271 : ///
272 : /// `max_keys` controls the maximum number of keys that will be returned. If this is None, this function
273 : /// will iteratively call ListObjectsV2 until it runs out of keys. Note that this is not safe to use on
274 : /// unlimited-size buckets, as the full list of objects is allocated into a monolithic data structure.
275 : ///
276 : /// [`ListObjectsV2`]: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
277 : /// [`is_permanent`]: DownloadError::is_permanent
278 : fn list_streaming(
279 : &self,
280 : prefix: Option<&RemotePath>,
281 : mode: ListingMode,
282 : max_keys: Option<NonZeroU32>,
283 : cancel: &CancellationToken,
284 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;
285 :
286 69 : async fn list(
287 69 : &self,
288 69 : prefix: Option<&RemotePath>,
289 69 : mode: ListingMode,
290 69 : max_keys: Option<NonZeroU32>,
291 69 : cancel: &CancellationToken,
292 69 : ) -> Result<Listing, DownloadError> {
293 69 : let mut stream = pin!(self.list_streaming(prefix, mode, max_keys, cancel));
294 69 : let mut combined = stream.next().await.expect("At least one item required")?;
295 117 : while let Some(list) = stream.next().await {
296 48 : let list = list?;
297 48 : combined.keys.extend(list.keys.into_iter());
298 48 : combined.prefixes.extend_from_slice(&list.prefixes);
299 : }
300 69 : Ok(combined)
301 69 : }
302 :
303 : /// Obtain metadata information about an object.
304 : async fn head_object(
305 : &self,
306 : key: &RemotePath,
307 : cancel: &CancellationToken,
308 : ) -> Result<ListingObject, DownloadError>;
309 :
310 : /// Streams the local file contents into the remote storage entry.
311 : ///
312 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
313 : /// set to `TimeoutOrCancel`.
314 : async fn upload(
315 : &self,
316 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
317 : // S3 PUT request requires the content length to be specified,
318 : // otherwise it starts to fail with the concurrent connection count increasing.
319 : data_size_bytes: usize,
320 : to: &RemotePath,
321 : metadata: Option<StorageMetadata>,
322 : cancel: &CancellationToken,
323 : ) -> anyhow::Result<()>;
324 :
325 : /// Streams the remote storage entry contents.
326 : ///
327 : /// The returned download stream will obey initial timeout and cancellation signal by erroring
328 : /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
329 : /// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
330 : ///
331 : /// Returns the metadata, if any was stored with the file previously.
332 : async fn download(
333 : &self,
334 : from: &RemotePath,
335 : opts: &DownloadOpts,
336 : cancel: &CancellationToken,
337 : ) -> Result<Download, DownloadError>;
338 :
339 : /// Delete a single path from remote storage.
340 : ///
341 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
342 : /// set to `TimeoutOrCancel`. In such a situation it is unknown whether the deletion went through.
343 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()>;
344 :
345 : /// Delete multiple paths from remote storage.
346 : ///
347 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
348 : /// set to `TimeoutOrCancel`. In such a situation it is unknown which deletions, if any, went
349 : /// through.
350 : async fn delete_objects(
351 : &self,
352 : paths: &[RemotePath],
353 : cancel: &CancellationToken,
354 : ) -> anyhow::Result<()>;
355 :
356 : /// Returns the maximum number of keys that a call to [`Self::delete_objects`] can delete without chunking
357 : ///
358 : /// The value returned is only an optimization hint: one can pass a larger number of objects to
359 : /// `delete_objects` as well.
360 : ///
361 : /// The value is guaranteed to be >= 1.
362 : fn max_keys_per_delete(&self) -> usize;
363 :
364 : /// Deletes all objects matching the given prefix.
365 : ///
366 : /// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
367 : /// delete /a/b, /a/b/*, /a/bc, /a/bc/*, etc.
368 : ///
369 : /// If the operation fails because of timeout or cancellation, the root cause of the error will
370 : /// be set to `TimeoutOrCancel`. In such a situation it is unknown which deletions, if any, went
371 : /// through.
372 18 : async fn delete_prefix(
373 18 : &self,
374 18 : prefix: &RemotePath,
375 18 : cancel: &CancellationToken,
376 18 : ) -> anyhow::Result<()> {
377 18 : let mut stream =
378 18 : pin!(self.list_streaming(Some(prefix), ListingMode::NoDelimiter, None, cancel));
379 39 : while let Some(result) = stream.next().await {
380 21 : let keys = match result {
381 21 : Ok(listing) if listing.keys.is_empty() => continue,
382 63 : Ok(listing) => listing.keys.into_iter().map(|o| o.key).collect_vec(),
383 0 : Err(DownloadError::Cancelled) => return Err(TimeoutOrCancel::Cancel.into()),
384 0 : Err(DownloadError::Timeout) => return Err(TimeoutOrCancel::Timeout.into()),
385 0 : Err(err) => return Err(err.into()),
386 : };
387 12 : tracing::info!("Deleting {} keys from remote storage", keys.len());
388 12 : self.delete_objects(&keys, cancel).await?;
389 : }
390 18 : Ok(())
391 18 : }
392 :
393 : /// Copy a remote object inside a bucket from one path to another.
394 : async fn copy(
395 : &self,
396 : from: &RemotePath,
397 : to: &RemotePath,
398 : cancel: &CancellationToken,
399 : ) -> anyhow::Result<()>;
400 :
401 : /// Resets the content of everything with the given prefix to the given state
402 : async fn time_travel_recover(
403 : &self,
404 : prefix: Option<&RemotePath>,
405 : timestamp: SystemTime,
406 : done_if_after: SystemTime,
407 : cancel: &CancellationToken,
408 : ) -> Result<(), TimeTravelError>;
409 : }
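As a usage note on `delete_objects` and `max_keys_per_delete`, here is a hedged sketch of chunking a large deletion so that each batch stays within the backend's hint; the helper name is made up for illustration.

```rust
use tokio_util::sync::CancellationToken;

// Delete an arbitrary number of objects in batches no larger than the
// backend's `max_keys_per_delete` hint. Backends accept larger batches too,
// but chunking keeps each request within the documented S3/Azure limits.
async fn delete_in_batches(
    storage: &impl RemoteStorage,
    paths: &[RemotePath],
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    for chunk in paths.chunks(storage.max_keys_per_delete()) {
        storage.delete_objects(chunk, cancel).await?;
    }
    Ok(())
}
```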
410 :
411 : /// Data part of an ongoing [`Download`].
412 : ///
413 : /// `DownloadStream` is sensitive to the timeout and cancellation used with the original
414 : /// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
415 : /// with `tokio::io::copy_buf`.
416 : // This has 'static because safekeepers do not use cancellation tokens (yet)
417 : pub type DownloadStream =
418 : Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>>;
419 :
420 : pub struct Download {
421 : pub download_stream: DownloadStream,
422 : /// The last time the file was modified (`last-modified` HTTP header)
423 : pub last_modified: SystemTime,
424 : /// A way to identify this specific version of the resource (`etag` HTTP header)
425 : pub etag: Etag,
426 : /// Extra key-value data, associated with the current remote file.
427 : pub metadata: Option<StorageMetadata>,
428 : }
429 :
430 : impl Debug for Download {
431 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432 0 : f.debug_struct("Download")
433 0 : .field("metadata", &self.metadata)
434 0 : .finish()
435 0 : }
436 : }
437 :
438 : /// Every currently supported storage kind.
439 : /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
440 : // Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
441 : #[derive(Clone)]
442 : pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
443 : LocalFs(LocalFs),
444 : AwsS3(Arc<S3Bucket>),
445 : AzureBlob(Arc<AzureBlobStorage>),
446 : Unreliable(Other),
447 : }
448 :
449 : impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
450 : // See [`RemoteStorage::list`].
451 969 : pub async fn list(
452 969 : &self,
453 969 : prefix: Option<&RemotePath>,
454 969 : mode: ListingMode,
455 969 : max_keys: Option<NonZeroU32>,
456 969 : cancel: &CancellationToken,
457 969 : ) -> Result<Listing, DownloadError> {
458 969 : match self {
459 900 : Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await,
460 50 : Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
461 19 : Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
462 0 : Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
463 : }
464 969 : }
465 :
466 : // See [`RemoteStorage::list_streaming`].
467 3 : pub fn list_streaming<'a>(
468 3 : &'a self,
469 3 : prefix: Option<&'a RemotePath>,
470 3 : mode: ListingMode,
471 3 : max_keys: Option<NonZeroU32>,
472 3 : cancel: &'a CancellationToken,
473 3 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
474 3 : match self {
475 0 : Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
476 0 : as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
477 2 : Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
478 1 : Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
479 0 : Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
480 : }
481 3 : }
482 :
483 : // See [`RemoteStorage::head_object`].
484 9 : pub async fn head_object(
485 9 : &self,
486 9 : key: &RemotePath,
487 9 : cancel: &CancellationToken,
488 9 : ) -> Result<ListingObject, DownloadError> {
489 9 : match self {
490 0 : Self::LocalFs(s) => s.head_object(key, cancel).await,
491 6 : Self::AwsS3(s) => s.head_object(key, cancel).await,
492 3 : Self::AzureBlob(s) => s.head_object(key, cancel).await,
493 0 : Self::Unreliable(s) => s.head_object(key, cancel).await,
494 : }
495 9 : }
496 :
497 : /// See [`RemoteStorage::upload`]
498 6450 : pub async fn upload(
499 6450 : &self,
500 6450 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
501 6450 : data_size_bytes: usize,
502 6450 : to: &RemotePath,
503 6450 : metadata: Option<StorageMetadata>,
504 6450 : cancel: &CancellationToken,
505 6450 : ) -> anyhow::Result<()> {
506 6450 : match self {
507 6134 : Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
508 199 : Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
509 93 : Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
510 24 : Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
511 : }
512 6405 : }
513 :
514 : /// See [`RemoteStorage::download`]
515 1476 : pub async fn download(
516 1476 : &self,
517 1476 : from: &RemotePath,
518 1476 : opts: &DownloadOpts,
519 1476 : cancel: &CancellationToken,
520 1476 : ) -> Result<Download, DownloadError> {
521 1476 : match self {
522 1432 : Self::LocalFs(s) => s.download(from, opts, cancel).await,
523 33 : Self::AwsS3(s) => s.download(from, opts, cancel).await,
524 11 : Self::AzureBlob(s) => s.download(from, opts, cancel).await,
525 0 : Self::Unreliable(s) => s.download(from, opts, cancel).await,
526 : }
527 1476 : }
528 :
529 : /// See [`RemoteStorage::delete`]
530 838 : pub async fn delete(
531 838 : &self,
532 838 : path: &RemotePath,
533 838 : cancel: &CancellationToken,
534 838 : ) -> anyhow::Result<()> {
535 838 : match self {
536 578 : Self::LocalFs(s) => s.delete(path, cancel).await,
537 174 : Self::AwsS3(s) => s.delete(path, cancel).await,
538 86 : Self::AzureBlob(s) => s.delete(path, cancel).await,
539 0 : Self::Unreliable(s) => s.delete(path, cancel).await,
540 : }
541 831 : }
542 :
543 : /// See [`RemoteStorage::delete_objects`]
544 27 : pub async fn delete_objects(
545 27 : &self,
546 27 : paths: &[RemotePath],
547 27 : cancel: &CancellationToken,
548 27 : ) -> anyhow::Result<()> {
549 27 : match self {
550 12 : Self::LocalFs(s) => s.delete_objects(paths, cancel).await,
551 12 : Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
552 3 : Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
553 0 : Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
554 : }
555 27 : }
556 :
557 : /// [`RemoteStorage::max_keys_per_delete`]
558 16 : pub fn max_keys_per_delete(&self) -> usize {
559 16 : match self {
560 16 : Self::LocalFs(s) => s.max_keys_per_delete(),
561 0 : Self::AwsS3(s) => s.max_keys_per_delete(),
562 0 : Self::AzureBlob(s) => s.max_keys_per_delete(),
563 0 : Self::Unreliable(s) => s.max_keys_per_delete(),
564 : }
565 16 : }
566 :
567 : /// See [`RemoteStorage::delete_prefix`]
568 18 : pub async fn delete_prefix(
569 18 : &self,
570 18 : prefix: &RemotePath,
571 18 : cancel: &CancellationToken,
572 18 : ) -> anyhow::Result<()> {
573 18 : match self {
574 0 : Self::LocalFs(s) => s.delete_prefix(prefix, cancel).await,
575 12 : Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
576 6 : Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
577 0 : Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
578 : }
579 18 : }
580 :
581 : /// See [`RemoteStorage::copy`]
582 3 : pub async fn copy_object(
583 3 : &self,
584 3 : from: &RemotePath,
585 3 : to: &RemotePath,
586 3 : cancel: &CancellationToken,
587 3 : ) -> anyhow::Result<()> {
588 3 : match self {
589 0 : Self::LocalFs(s) => s.copy(from, to, cancel).await,
590 2 : Self::AwsS3(s) => s.copy(from, to, cancel).await,
591 1 : Self::AzureBlob(s) => s.copy(from, to, cancel).await,
592 0 : Self::Unreliable(s) => s.copy(from, to, cancel).await,
593 : }
594 3 : }
595 :
596 : /// See [`RemoteStorage::time_travel_recover`].
597 6 : pub async fn time_travel_recover(
598 6 : &self,
599 6 : prefix: Option<&RemotePath>,
600 6 : timestamp: SystemTime,
601 6 : done_if_after: SystemTime,
602 6 : cancel: &CancellationToken,
603 6 : ) -> Result<(), TimeTravelError> {
604 6 : match self {
605 0 : Self::LocalFs(s) => {
606 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
607 0 : .await
608 : }
609 6 : Self::AwsS3(s) => {
610 6 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
611 6 : .await
612 : }
613 0 : Self::AzureBlob(s) => {
614 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
615 0 : .await
616 : }
617 0 : Self::Unreliable(s) => {
618 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
619 0 : .await
620 : }
621 : }
622 6 : }
623 : }
624 :
625 : impl GenericRemoteStorage {
626 497 : pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
627 497 : let timeout = storage_config.timeout;
628 497 :
629 497 : // If someone overrides timeout to be small without adjusting small_timeout, adjust it automatically
630 497 : let small_timeout = std::cmp::min(storage_config.small_timeout, timeout);
631 497 :
632 497 : Ok(match &storage_config.storage {
633 461 : RemoteStorageKind::LocalFs { local_path: path } => {
634 461 : info!("Using fs root '{path}' as a remote storage");
635 461 : Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
636 : }
637 26 : RemoteStorageKind::AwsS3(s3_config) => {
638 26 : // The profile and access key id are only printed here for debugging purposes,
639 26 : // their values don't indicate the eventually taken choice for auth.
640 26 : let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
641 26 : let access_key_id =
642 26 : std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
643 26 : info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
644 : s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
645 26 : Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?))
646 : }
647 10 : RemoteStorageKind::AzureContainer(azure_config) => {
648 10 : let storage_account = azure_config
649 10 : .storage_account
650 10 : .as_deref()
651 10 : .unwrap_or("<AZURE_STORAGE_ACCOUNT>");
652 10 : info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
653 : azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
654 10 : Self::AzureBlob(Arc::new(AzureBlobStorage::new(
655 10 : azure_config,
656 10 : timeout,
657 10 : small_timeout,
658 10 : )?))
659 : }
660 : })
661 497 : }
662 :
663 2 : pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
664 2 : Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
665 2 : }
666 :
667 : /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
668 2986 : pub async fn upload_storage_object(
669 2986 : &self,
670 2986 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
671 2986 : from_size_bytes: usize,
672 2986 : to: &RemotePath,
673 2986 : cancel: &CancellationToken,
674 2986 : ) -> anyhow::Result<()> {
675 2986 : self.upload(from, from_size_bytes, to, None, cancel)
676 2986 : .await
677 2969 : .with_context(|| {
678 0 : format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
679 2969 : })
680 2969 : }
681 :
682 : /// The name of the bucket/container/etc.
683 0 : pub fn bucket_name(&self) -> Option<&str> {
684 0 : match self {
685 0 : Self::LocalFs(_s) => None,
686 0 : Self::AwsS3(s) => Some(s.bucket_name()),
687 0 : Self::AzureBlob(s) => Some(s.container_name()),
688 0 : Self::Unreliable(_s) => None,
689 : }
690 0 : }
691 : }
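A minimal end-to-end sketch using the `local_fs` backend through `GenericRemoteStorage`; the path, timeout, and helper function are illustrative only.

```rust
use std::time::Duration;

use bytes::Bytes;
use camino::Utf8PathBuf;
use tokio_util::sync::CancellationToken;

async fn roundtrip_demo() -> anyhow::Result<()> {
    let storage: GenericRemoteStorage = GenericRemoteStorage::LocalFs(LocalFs::new(
        Utf8PathBuf::from("/tmp/remote-storage-demo"),
        Duration::from_secs(30),
    )?);
    let cancel = CancellationToken::new();

    // Upload a small payload as a single-chunk stream.
    let path = RemotePath::from_string("demo/object")?;
    let body = Bytes::from_static(b"hello");
    let stream = futures::stream::once(futures::future::ready(
        Ok::<_, std::io::Error>(body.clone()),
    ));
    storage.upload(stream, body.len(), &path, None, &cancel).await?;

    // Fetch its metadata, then clean up.
    let head = storage.head_object(&path, &cancel).await?;
    assert_eq!(head.size, body.len() as u64);
    storage.delete(&path, &cancel).await?;
    Ok(())
}
```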
692 :
693 : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
694 : /// Immutable, cannot be changed once the file is created.
695 : #[derive(Debug, Clone, PartialEq, Eq)]
696 : pub struct StorageMetadata(HashMap<String, String>);
697 :
698 : impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
699 0 : fn from(arr: [(&str, &str); N]) -> Self {
700 0 : let map: HashMap<String, String> = arr
701 0 : .iter()
702 0 : .map(|(k, v)| (k.to_string(), v.to_string()))
703 0 : .collect();
704 0 : Self(map)
705 0 : }
706 : }
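For illustration, metadata can be attached to an upload like this; the keys and values are arbitrary.

```rust
// Arbitrary, immutable key-value pairs stored alongside the object.
let metadata = StorageMetadata::from([("generation", "42"), ("uploaded-by", "pageserver")]);
// e.g. storage.upload(stream, len, &path, Some(metadata), &cancel).await?;
```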
707 :
708 : struct ConcurrencyLimiter {
709 : // Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded.
710 : // The same goes for IAM, which is queried before every S3 request, if enabled. IAM has an even lower RPS threshold.
711 : // This helps to ensure we don't exceed the thresholds.
712 : write: Arc<Semaphore>,
713 : read: Arc<Semaphore>,
714 : }
715 :
716 : impl ConcurrencyLimiter {
717 731 : fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
718 731 : match kind {
719 44 : RequestKind::Get => &self.read,
720 292 : RequestKind::Put => &self.write,
721 90 : RequestKind::List => &self.read,
722 287 : RequestKind::Delete => &self.write,
723 3 : RequestKind::Copy => &self.write,
724 6 : RequestKind::TimeTravel => &self.write,
725 9 : RequestKind::Head => &self.read,
726 : }
727 731 : }
728 :
729 698 : async fn acquire(
730 698 : &self,
731 698 : kind: RequestKind,
732 698 : ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
733 698 : self.for_kind(kind).acquire().await
734 698 : }
735 :
736 33 : async fn acquire_owned(
737 33 : &self,
738 33 : kind: RequestKind,
739 33 : ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
740 33 : Arc::clone(self.for_kind(kind)).acquire_owned().await
741 33 : }
742 :
743 51 : fn new(limit: usize) -> ConcurrencyLimiter {
744 51 : Self {
745 51 : read: Arc::new(Semaphore::new(limit)),
746 51 : write: Arc::new(Semaphore::new(limit)),
747 51 : }
748 51 : }
749 : }
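An internal-usage sketch of the limiter (the surrounding function is hypothetical): a backend acquires a permit for the request kind and holds it for the duration of the call.

```rust
// Hold a permit for the duration of a PUT so that concurrent writes stay
// under the configured limit; reads and writes draw from separate semaphores.
async fn throttled_put(limiter: &ConcurrencyLimiter) -> anyhow::Result<()> {
    let _permit = limiter
        .acquire(RequestKind::Put)
        .await
        .map_err(|_| anyhow::anyhow!("concurrency limiter semaphore closed"))?;
    // ... issue the actual PUT request while `_permit` is held ...
    Ok(())
}
```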
750 :
751 : #[cfg(test)]
752 : mod tests {
753 : use super::*;
754 :
755 : /// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges
756 : /// with optional end bound, or None when unbounded.
757 : #[test]
758 3 : fn download_opts_byte_range() {
759 3 : // Consider using test_case or a similar table-driven test framework.
760 3 : let cases = [
761 3 : // (byte_start, byte_end, expected)
762 3 : (Bound::Unbounded, Bound::Unbounded, None),
763 3 : (Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))),
764 3 : (Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))),
765 3 : (Bound::Included(3), Bound::Unbounded, Some((3, None))),
766 3 : (Bound::Included(3), Bound::Included(7), Some((3, Some(8)))),
767 3 : (Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))),
768 3 : (Bound::Excluded(3), Bound::Unbounded, Some((4, None))),
769 3 : (Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))),
770 3 : (Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))),
771 3 : // 1-sized ranges are fine, 0 aren't and will panic (separate test).
772 3 : (Bound::Included(3), Bound::Included(3), Some((3, Some(4)))),
773 3 : (Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))),
774 3 : ];
775 :
776 36 : for (byte_start, byte_end, expect) in cases {
777 33 : let opts = DownloadOpts {
778 33 : byte_start,
779 33 : byte_end,
780 33 : ..Default::default()
781 33 : };
782 33 : let result = opts.byte_range();
783 33 : assert_eq!(
784 : result, expect,
785 0 : "byte_start={byte_start:?} byte_end={byte_end:?}"
786 : );
787 :
788 : // Check generated HTTP header, which uses an inclusive range.
789 33 : let expect_header = expect.map(|(start, end)| match end {
790 24 : Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end
791 6 : None => format!("bytes={start}-"),
792 33 : });
793 33 : assert_eq!(
794 33 : opts.byte_range_header(),
795 : expect_header,
796 0 : "byte_start={byte_start:?} byte_end={byte_end:?}"
797 : );
798 : }
799 3 : }
800 :
801 : /// DownloadOpts::byte_range() zero-sized byte range should panic.
802 : #[test]
803 : #[should_panic]
804 3 : fn download_opts_byte_range_zero() {
805 3 : DownloadOpts {
806 3 : byte_start: Bound::Included(3),
807 3 : byte_end: Bound::Excluded(3),
808 3 : ..Default::default()
809 3 : }
810 3 : .byte_range();
811 3 : }
812 :
813 : /// DownloadOpts::byte_range() negative byte range should panic.
814 : #[test]
815 : #[should_panic]
816 3 : fn download_opts_byte_range_negative() {
817 3 : DownloadOpts {
818 3 : byte_start: Bound::Included(3),
819 3 : byte_end: Bound::Included(2),
820 3 : ..Default::default()
821 3 : }
822 3 : .byte_range();
823 3 : }
824 :
825 : #[test]
826 3 : fn test_object_name() {
827 3 : let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
828 3 : assert_eq!(k.object_name(), Some("c"));
829 :
830 3 : let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
831 3 : assert_eq!(k.object_name(), Some("c"));
832 :
833 3 : let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
834 3 : assert_eq!(k.object_name(), Some("a"));
835 :
836 : // XXX is it impossible to have an empty key?
837 3 : let k = RemotePath::new(Utf8Path::new("")).unwrap();
838 3 : assert_eq!(k.object_name(), None);
839 3 : }
840 :
841 : #[test]
842 3 : fn remote_path_cannot_be_created_from_absolute_ones() {
843 3 : let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
844 3 : assert_eq!(err.to_string(), "Path \"/\" is not relative");
845 3 : }
846 : }