Line data Source code
1 : //! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage.
2 : //! No other modules from this tree are supposed to be used directly by the external code.
3 : //!
4 : //! [`RemoteStorage`] trait a CRUD-like generic abstraction to use for adapting external storages with a few implementations:
5 : //! * [`local_fs`] allows to use local file system as an external storage
6 : //! * [`s3_bucket`] uses AWS S3 bucket as an external storage
7 : //! * [`azure_blob`] allows to use Azure Blob storage as an external storage
8 : //!
9 : #![deny(unsafe_code)]
10 : #![deny(clippy::undocumented_unsafe_blocks)]
11 :
12 : mod azure_blob;
13 : mod config;
14 : mod error;
15 : mod local_fs;
16 : mod metrics;
17 : mod s3_bucket;
18 : mod simulate_failures;
19 : mod support;
20 :
21 : use std::{
22 : collections::HashMap,
23 : fmt::Debug,
24 : num::NonZeroU32,
25 : ops::Bound,
26 : pin::{pin, Pin},
27 : sync::Arc,
28 : time::SystemTime,
29 : };
30 :
31 : use anyhow::Context;
32 : use camino::{Utf8Path, Utf8PathBuf};
33 :
34 : use bytes::Bytes;
35 : use futures::{stream::Stream, StreamExt};
36 : use itertools::Itertools as _;
37 : use serde::{Deserialize, Serialize};
38 : use tokio::sync::Semaphore;
39 : use tokio_util::sync::CancellationToken;
40 : use tracing::info;
41 :
42 : pub use self::{
43 : azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket,
44 : simulate_failures::UnreliableWrapper,
45 : };
46 : use s3_bucket::RequestKind;
47 :
48 : pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
49 :
50 : /// Azure SDK's ETag type is a simple String wrapper: we use this internally instead of repeating it here.
51 : pub use azure_core::Etag;
52 :
53 : pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
54 :
55 : /// Default concurrency limit for S3 operations
56 : ///
57 : /// Currently, sync happens with AWS S3, that has two limits on requests per second:
58 : /// ~200 RPS for IAM services
59 : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
60 : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
61 : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
62 : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
63 : /// Set this limit analogously to the S3 limit
64 : ///
65 : /// Here, a limit of max 20k concurrent connections was noted.
66 : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
67 : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
68 : /// No limits on the client side, which currenltly means 1000 for AWS S3.
69 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
70 : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
71 :
72 : /// As defined in S3 docs
73 : pub const MAX_KEYS_PER_DELETE: usize = 1000;
74 :
75 : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
76 :
77 : /// Path on the remote storage, relative to some inner prefix.
78 : /// The prefix is an implementation detail, that allows representing local paths
79 : /// as the remote ones, stripping the local storage prefix away.
80 : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
81 : pub struct RemotePath(Utf8PathBuf);
82 :
83 : impl Serialize for RemotePath {
84 0 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
85 0 : where
86 0 : S: serde::Serializer,
87 0 : {
88 0 : serializer.collect_str(self)
89 0 : }
90 : }
91 :
92 : impl<'de> Deserialize<'de> for RemotePath {
93 0 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
94 0 : where
95 0 : D: serde::Deserializer<'de>,
96 0 : {
97 0 : let str = String::deserialize(deserializer)?;
98 0 : Ok(Self(Utf8PathBuf::from(&str)))
99 0 : }
100 : }
101 :
102 : impl std::fmt::Display for RemotePath {
103 266 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 266 : std::fmt::Display::fmt(&self.0, f)
105 266 : }
106 : }
107 :
108 : impl RemotePath {
109 3699 : pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
110 3699 : anyhow::ensure!(
111 3699 : relative_path.is_relative(),
112 4 : "Path {relative_path:?} is not relative"
113 : );
114 3695 : Ok(Self(relative_path.to_path_buf()))
115 3699 : }
116 :
117 3247 : pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
118 3247 : Self::new(Utf8Path::new(relative_path))
119 3247 : }
120 :
121 3343 : pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
122 3343 : base_path.join(&self.0)
123 3343 : }
124 :
125 18 : pub fn object_name(&self) -> Option<&str> {
126 18 : self.0.file_name()
127 18 : }
128 :
129 102 : pub fn join(&self, path: impl AsRef<Utf8Path>) -> Self {
130 102 : Self(self.0.join(path))
131 102 : }
132 :
133 1022 : pub fn get_path(&self) -> &Utf8PathBuf {
134 1022 : &self.0
135 1022 : }
136 :
137 67 : pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
138 67 : self.0.strip_prefix(&p.0)
139 67 : }
140 :
141 192 : pub fn add_trailing_slash(&self) -> Self {
142 192 : // Unwrap safety inputs are guararnteed to be valid UTF-8
143 192 : Self(format!("{}/", self.0).try_into().unwrap())
144 192 : }
145 : }
146 :
147 : /// We don't need callers to be able to pass arbitrary delimiters: just control
148 : /// whether listings will use a '/' separator or not.
149 : ///
150 : /// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
151 : /// NoDelimiter mode will only populate `keys`.
152 : #[derive(Copy, Clone)]
153 : pub enum ListingMode {
154 : WithDelimiter,
155 : NoDelimiter,
156 : }
157 :
158 : #[derive(PartialEq, Eq, Debug, Clone)]
159 : pub struct ListingObject {
160 : pub key: RemotePath,
161 : pub last_modified: SystemTime,
162 : pub size: u64,
163 : }
164 :
165 : #[derive(Default)]
166 : pub struct Listing {
167 : pub prefixes: Vec<RemotePath>,
168 : pub keys: Vec<ListingObject>,
169 : }
170 :
171 : /// Options for downloads. The default value is a plain GET.
172 : pub struct DownloadOpts {
173 : /// If given, returns [`DownloadError::Unmodified`] if the object still has
174 : /// the same ETag (using If-None-Match).
175 : pub etag: Option<Etag>,
176 : /// The start of the byte range to download, or unbounded.
177 : pub byte_start: Bound<u64>,
178 : /// The end of the byte range to download, or unbounded. Must be after the
179 : /// start bound.
180 : pub byte_end: Bound<u64>,
181 : }
182 :
183 : impl Default for DownloadOpts {
184 154 : fn default() -> Self {
185 154 : Self {
186 154 : etag: Default::default(),
187 154 : byte_start: Bound::Unbounded,
188 154 : byte_end: Bound::Unbounded,
189 154 : }
190 154 : }
191 : }
192 :
193 : impl DownloadOpts {
194 : /// Returns the byte range with inclusive start and exclusive end, or None
195 : /// if unbounded.
196 180 : pub fn byte_range(&self) -> Option<(u64, Option<u64>)> {
197 180 : if self.byte_start == Bound::Unbounded && self.byte_end == Bound::Unbounded {
198 84 : return None;
199 96 : }
200 96 : let start = match self.byte_start {
201 18 : Bound::Excluded(i) => i + 1,
202 60 : Bound::Included(i) => i,
203 18 : Bound::Unbounded => 0,
204 : };
205 96 : let end = match self.byte_end {
206 48 : Bound::Excluded(i) => Some(i),
207 27 : Bound::Included(i) => Some(i + 1),
208 21 : Bound::Unbounded => None,
209 : };
210 96 : if let Some(end) = end {
211 75 : assert!(start < end, "range end {end} at or before start {start}");
212 21 : }
213 87 : Some((start, end))
214 171 : }
215 :
216 : /// Returns the byte range as an RFC 2616 Range header value with inclusive
217 : /// bounds, or None if unbounded.
218 66 : pub fn byte_range_header(&self) -> Option<String> {
219 66 : self.byte_range()
220 66 : .map(|(start, end)| (start, end.map(|end| end - 1))) // make end inclusive
221 66 : .map(|(start, end)| match end {
222 30 : Some(end) => format!("bytes={start}-{end}"),
223 10 : None => format!("bytes={start}-"),
224 66 : })
225 66 : }
226 : }
227 :
228 : /// Storage (potentially remote) API to manage its state.
229 : /// This storage tries to be unaware of any layered repository context,
230 : /// providing basic CRUD operations for storage files.
231 : #[allow(async_fn_in_trait)]
232 : pub trait RemoteStorage: Send + Sync + 'static {
233 : /// List objects in remote storage, with semantics matching AWS S3's [`ListObjectsV2`].
234 : ///
235 : /// The stream is guaranteed to return at least one element, even in the case of errors
236 : /// (in that case it's an `Err()`), or an empty `Listing`.
237 : ///
238 : /// The stream is not ending if it returns an error, as long as [`is_permanent`] returns false on the error.
239 : /// The `next` function can be retried, and maybe in a future retry, there will be success.
240 : ///
241 : /// Note that the prefix is relative to any `prefix_in_bucket` configured for the client, not
242 : /// from the absolute root of the bucket.
243 : ///
244 : /// `mode` configures whether to use a delimiter. Without a delimiter, all keys
245 : /// within the prefix are listed in the `keys` of the result. With a delimiter, any "directories" at the top level of
246 : /// the prefix are returned in the `prefixes` of the result, and keys in the top level of the prefix are
247 : /// returned in `keys` ().
248 : ///
249 : /// `max_keys` controls the maximum number of keys that will be returned. If this is None, this function
250 : /// will iteratively call listobjects until it runs out of keys. Note that this is not safe to use on
251 : /// unlimted size buckets, as the full list of objects is allocated into a monolithic data structure.
252 : ///
253 : /// [`ListObjectsV2`]: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
254 : /// [`is_permanent`]: DownloadError::is_permanent
255 : fn list_streaming(
256 : &self,
257 : prefix: Option<&RemotePath>,
258 : mode: ListingMode,
259 : max_keys: Option<NonZeroU32>,
260 : cancel: &CancellationToken,
261 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;
262 :
263 69 : async fn list(
264 69 : &self,
265 69 : prefix: Option<&RemotePath>,
266 69 : mode: ListingMode,
267 69 : max_keys: Option<NonZeroU32>,
268 69 : cancel: &CancellationToken,
269 69 : ) -> Result<Listing, DownloadError> {
270 69 : let mut stream = pin!(self.list_streaming(prefix, mode, max_keys, cancel));
271 285 : let mut combined = stream.next().await.expect("At least one item required")?;
272 191 : while let Some(list) = stream.next().await {
273 48 : let list = list?;
274 48 : combined.keys.extend(list.keys.into_iter());
275 48 : combined.prefixes.extend_from_slice(&list.prefixes);
276 : }
277 69 : Ok(combined)
278 69 : }
279 :
280 : /// Obtain metadata information about an object.
281 : async fn head_object(
282 : &self,
283 : key: &RemotePath,
284 : cancel: &CancellationToken,
285 : ) -> Result<ListingObject, DownloadError>;
286 :
287 : /// Streams the local file contents into remote into the remote storage entry.
288 : ///
289 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
290 : /// set to `TimeoutOrCancel`.
291 : async fn upload(
292 : &self,
293 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
294 : // S3 PUT request requires the content length to be specified,
295 : // otherwise it starts to fail with the concurrent connection count increasing.
296 : data_size_bytes: usize,
297 : to: &RemotePath,
298 : metadata: Option<StorageMetadata>,
299 : cancel: &CancellationToken,
300 : ) -> anyhow::Result<()>;
301 :
302 : /// Streams the remote storage entry contents.
303 : ///
304 : /// The returned download stream will obey initial timeout and cancellation signal by erroring
305 : /// on whichever happens first. Only one of the reasons will fail the stream, which is usually
306 : /// enough for `tokio::io::copy_buf` usage. If needed the error can be filtered out.
307 : ///
308 : /// Returns the metadata, if any was stored with the file previously.
309 : async fn download(
310 : &self,
311 : from: &RemotePath,
312 : opts: &DownloadOpts,
313 : cancel: &CancellationToken,
314 : ) -> Result<Download, DownloadError>;
315 :
316 : /// Delete a single path from remote storage.
317 : ///
318 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
319 : /// set to `TimeoutOrCancel`. In such situation it is unknown if the deletion went through.
320 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()>;
321 :
322 : /// Delete a multiple paths from remote storage.
323 : ///
324 : /// If the operation fails because of timeout or cancellation, the root cause of the error will be
325 : /// set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went
326 : /// through.
327 : async fn delete_objects<'a>(
328 : &self,
329 : paths: &'a [RemotePath],
330 : cancel: &CancellationToken,
331 : ) -> anyhow::Result<()>;
332 :
333 : /// Deletes all objects matching the given prefix.
334 : ///
335 : /// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
336 : /// delete /a/b, /a/b/*, /a/bc, /a/bc/*, etc.
337 : ///
338 : /// If the operation fails because of timeout or cancellation, the root cause of the error will
339 : /// be set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went
340 : /// through.
341 18 : async fn delete_prefix(
342 18 : &self,
343 18 : prefix: &RemotePath,
344 18 : cancel: &CancellationToken,
345 18 : ) -> anyhow::Result<()> {
346 18 : let mut stream =
347 18 : pin!(self.list_streaming(Some(prefix), ListingMode::NoDelimiter, None, cancel));
348 76 : while let Some(result) = stream.next().await {
349 21 : let keys = match result {
350 21 : Ok(listing) if listing.keys.is_empty() => continue,
351 63 : Ok(listing) => listing.keys.into_iter().map(|o| o.key).collect_vec(),
352 0 : Err(DownloadError::Cancelled) => return Err(TimeoutOrCancel::Cancel.into()),
353 0 : Err(DownloadError::Timeout) => return Err(TimeoutOrCancel::Timeout.into()),
354 0 : Err(err) => return Err(err.into()),
355 : };
356 12 : tracing::info!("Deleting {} keys from remote storage", keys.len());
357 122 : self.delete_objects(&keys, cancel).await?;
358 : }
359 18 : Ok(())
360 18 : }
361 :
362 : /// Copy a remote object inside a bucket from one path to another.
363 : async fn copy(
364 : &self,
365 : from: &RemotePath,
366 : to: &RemotePath,
367 : cancel: &CancellationToken,
368 : ) -> anyhow::Result<()>;
369 :
370 : /// Resets the content of everything with the given prefix to the given state
371 : async fn time_travel_recover(
372 : &self,
373 : prefix: Option<&RemotePath>,
374 : timestamp: SystemTime,
375 : done_if_after: SystemTime,
376 : cancel: &CancellationToken,
377 : ) -> Result<(), TimeTravelError>;
378 : }
379 :
380 : /// Data part of an ongoing [`Download`].
381 : ///
382 : /// `DownloadStream` is sensitive to the timeout and cancellation used with the original
383 : /// [`RemoteStorage::download`] request. The type yields `std::io::Result<Bytes>` to be compatible
384 : /// with `tokio::io::copy_buf`.
385 : // This has 'static because safekeepers do not use cancellation tokens (yet)
386 : pub type DownloadStream =
387 : Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>>;
388 :
389 : pub struct Download {
390 : pub download_stream: DownloadStream,
391 : /// The last time the file was modified (`last-modified` HTTP header)
392 : pub last_modified: SystemTime,
393 : /// A way to identify this specific version of the resource (`etag` HTTP header)
394 : pub etag: Etag,
395 : /// Extra key-value data, associated with the current remote file.
396 : pub metadata: Option<StorageMetadata>,
397 : }
398 :
399 : impl Debug for Download {
400 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401 0 : f.debug_struct("Download")
402 0 : .field("metadata", &self.metadata)
403 0 : .finish()
404 0 : }
405 : }
406 :
407 : /// Every storage, currently supported.
408 : /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
409 : // Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
410 : #[derive(Clone)]
411 : pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
412 : LocalFs(LocalFs),
413 : AwsS3(Arc<S3Bucket>),
414 : AzureBlob(Arc<AzureBlobStorage>),
415 : Unreliable(Other),
416 : }
417 :
418 : impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
419 : // See [`RemoteStorage::list`].
420 261 : pub async fn list(
421 261 : &self,
422 261 : prefix: Option<&RemotePath>,
423 261 : mode: ListingMode,
424 261 : max_keys: Option<NonZeroU32>,
425 261 : cancel: &CancellationToken,
426 261 : ) -> Result<Listing, DownloadError> {
427 261 : match self {
428 722 : Self::LocalFs(s) => s.list(prefix, mode, max_keys, cancel).await,
429 250 : Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
430 226 : Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
431 0 : Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
432 : }
433 261 : }
434 :
435 : // See [`RemoteStorage::list_streaming`].
436 3 : pub fn list_streaming<'a>(
437 3 : &'a self,
438 3 : prefix: Option<&'a RemotePath>,
439 3 : mode: ListingMode,
440 3 : max_keys: Option<NonZeroU32>,
441 3 : cancel: &'a CancellationToken,
442 3 : ) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
443 3 : match self {
444 0 : Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
445 0 : as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
446 2 : Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
447 1 : Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
448 0 : Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
449 : }
450 3 : }
451 :
452 : // See [`RemoteStorage::head_object`].
453 9 : pub async fn head_object(
454 9 : &self,
455 9 : key: &RemotePath,
456 9 : cancel: &CancellationToken,
457 9 : ) -> Result<ListingObject, DownloadError> {
458 9 : match self {
459 0 : Self::LocalFs(s) => s.head_object(key, cancel).await,
460 20 : Self::AwsS3(s) => s.head_object(key, cancel).await,
461 17 : Self::AzureBlob(s) => s.head_object(key, cancel).await,
462 0 : Self::Unreliable(s) => s.head_object(key, cancel).await,
463 : }
464 9 : }
465 :
466 : /// See [`RemoteStorage::upload`]
467 3097 : pub async fn upload(
468 3097 : &self,
469 3097 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
470 3097 : data_size_bytes: usize,
471 3097 : to: &RemotePath,
472 3097 : metadata: Option<StorageMetadata>,
473 3097 : cancel: &CancellationToken,
474 3097 : ) -> anyhow::Result<()> {
475 3097 : match self {
476 37048 : Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
477 1094 : Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
478 557 : Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
479 38 : Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
480 : }
481 3089 : }
482 :
483 : /// See [`RemoteStorage::download`]
484 86 : pub async fn download(
485 86 : &self,
486 86 : from: &RemotePath,
487 86 : opts: &DownloadOpts,
488 86 : cancel: &CancellationToken,
489 86 : ) -> Result<Download, DownloadError> {
490 86 : match self {
491 63 : Self::LocalFs(s) => s.download(from, opts, cancel).await,
492 40 : Self::AwsS3(s) => s.download(from, opts, cancel).await,
493 56 : Self::AzureBlob(s) => s.download(from, opts, cancel).await,
494 0 : Self::Unreliable(s) => s.download(from, opts, cancel).await,
495 : }
496 86 : }
497 :
498 : /// See [`RemoteStorage::delete`]
499 262 : pub async fn delete(
500 262 : &self,
501 262 : path: &RemotePath,
502 262 : cancel: &CancellationToken,
503 262 : ) -> anyhow::Result<()> {
504 262 : match self {
505 2 : Self::LocalFs(s) => s.delete(path, cancel).await,
506 580 : Self::AwsS3(s) => s.delete(path, cancel).await,
507 437 : Self::AzureBlob(s) => s.delete(path, cancel).await,
508 0 : Self::Unreliable(s) => s.delete(path, cancel).await,
509 : }
510 262 : }
511 :
512 : /// See [`RemoteStorage::delete_objects`]
513 21 : pub async fn delete_objects(
514 21 : &self,
515 21 : paths: &[RemotePath],
516 21 : cancel: &CancellationToken,
517 21 : ) -> anyhow::Result<()> {
518 21 : match self {
519 6 : Self::LocalFs(s) => s.delete_objects(paths, cancel).await,
520 49 : Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
521 25 : Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
522 0 : Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
523 : }
524 21 : }
525 :
526 : /// See [`RemoteStorage::delete_prefix`]
527 18 : pub async fn delete_prefix(
528 18 : &self,
529 18 : prefix: &RemotePath,
530 18 : cancel: &CancellationToken,
531 18 : ) -> anyhow::Result<()> {
532 18 : match self {
533 0 : Self::LocalFs(s) => s.delete_prefix(prefix, cancel).await,
534 50 : Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
535 148 : Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
536 0 : Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
537 : }
538 18 : }
539 :
540 : /// See [`RemoteStorage::copy`]
541 3 : pub async fn copy_object(
542 3 : &self,
543 3 : from: &RemotePath,
544 3 : to: &RemotePath,
545 3 : cancel: &CancellationToken,
546 3 : ) -> anyhow::Result<()> {
547 3 : match self {
548 0 : Self::LocalFs(s) => s.copy(from, to, cancel).await,
549 2 : Self::AwsS3(s) => s.copy(from, to, cancel).await,
550 5 : Self::AzureBlob(s) => s.copy(from, to, cancel).await,
551 0 : Self::Unreliable(s) => s.copy(from, to, cancel).await,
552 : }
553 3 : }
554 :
555 : /// See [`RemoteStorage::time_travel_recover`].
556 6 : pub async fn time_travel_recover(
557 6 : &self,
558 6 : prefix: Option<&RemotePath>,
559 6 : timestamp: SystemTime,
560 6 : done_if_after: SystemTime,
561 6 : cancel: &CancellationToken,
562 6 : ) -> Result<(), TimeTravelError> {
563 6 : match self {
564 0 : Self::LocalFs(s) => {
565 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
566 0 : .await
567 : }
568 6 : Self::AwsS3(s) => {
569 6 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
570 81 : .await
571 : }
572 0 : Self::AzureBlob(s) => {
573 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
574 0 : .await
575 : }
576 0 : Self::Unreliable(s) => {
577 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
578 0 : .await
579 : }
580 : }
581 6 : }
582 : }
583 :
584 : impl GenericRemoteStorage {
585 232 : pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
586 232 : let timeout = storage_config.timeout;
587 232 : Ok(match &storage_config.storage {
588 196 : RemoteStorageKind::LocalFs { local_path: path } => {
589 196 : info!("Using fs root '{path}' as a remote storage");
590 196 : Self::LocalFs(LocalFs::new(path.clone(), timeout)?)
591 : }
592 26 : RemoteStorageKind::AwsS3(s3_config) => {
593 26 : // The profile and access key id are only printed here for debugging purposes,
594 26 : // their values don't indicate the eventually taken choice for auth.
595 26 : let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
596 26 : let access_key_id =
597 26 : std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
598 26 : info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
599 : s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
600 26 : Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?))
601 : }
602 10 : RemoteStorageKind::AzureContainer(azure_config) => {
603 10 : let storage_account = azure_config
604 10 : .storage_account
605 10 : .as_deref()
606 10 : .unwrap_or("<AZURE_STORAGE_ACCOUNT>");
607 10 : info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
608 : azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
609 10 : Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?))
610 : }
611 : })
612 232 : }
613 :
614 2 : pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
615 2 : Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
616 2 : }
617 :
618 : /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
619 1399 : pub async fn upload_storage_object(
620 1399 : &self,
621 1399 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
622 1399 : from_size_bytes: usize,
623 1399 : to: &RemotePath,
624 1399 : cancel: &CancellationToken,
625 1399 : ) -> anyhow::Result<()> {
626 1399 : self.upload(from, from_size_bytes, to, None, cancel)
627 3665 : .await
628 1396 : .with_context(|| {
629 0 : format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
630 1396 : })
631 1396 : }
632 :
633 : /// The name of the bucket/container/etc.
634 0 : pub fn bucket_name(&self) -> Option<&str> {
635 0 : match self {
636 0 : Self::LocalFs(_s) => None,
637 0 : Self::AwsS3(s) => Some(s.bucket_name()),
638 0 : Self::AzureBlob(s) => Some(s.container_name()),
639 0 : Self::Unreliable(_s) => None,
640 : }
641 0 : }
642 : }
643 :
644 : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
645 : /// Immutable, cannot be changed once the file is created.
646 : #[derive(Debug, Clone, PartialEq, Eq)]
647 : pub struct StorageMetadata(HashMap<String, String>);
648 :
649 : impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
650 0 : fn from(arr: [(&str, &str); N]) -> Self {
651 0 : let map: HashMap<String, String> = arr
652 0 : .iter()
653 0 : .map(|(k, v)| (k.to_string(), v.to_string()))
654 0 : .collect();
655 0 : Self(map)
656 0 : }
657 : }
658 :
659 : struct ConcurrencyLimiter {
660 : // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
661 : // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
662 : // The helps to ensure we don't exceed the thresholds.
663 : write: Arc<Semaphore>,
664 : read: Arc<Semaphore>,
665 : }
666 :
667 : impl ConcurrencyLimiter {
668 731 : fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
669 731 : match kind {
670 44 : RequestKind::Get => &self.read,
671 292 : RequestKind::Put => &self.write,
672 90 : RequestKind::List => &self.read,
673 287 : RequestKind::Delete => &self.write,
674 3 : RequestKind::Copy => &self.write,
675 6 : RequestKind::TimeTravel => &self.write,
676 9 : RequestKind::Head => &self.read,
677 : }
678 731 : }
679 :
680 698 : async fn acquire(
681 698 : &self,
682 698 : kind: RequestKind,
683 698 : ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
684 698 : self.for_kind(kind).acquire().await
685 698 : }
686 :
687 33 : async fn acquire_owned(
688 33 : &self,
689 33 : kind: RequestKind,
690 33 : ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
691 33 : Arc::clone(self.for_kind(kind)).acquire_owned().await
692 33 : }
693 :
694 51 : fn new(limit: usize) -> ConcurrencyLimiter {
695 51 : Self {
696 51 : read: Arc::new(Semaphore::new(limit)),
697 51 : write: Arc::new(Semaphore::new(limit)),
698 51 : }
699 51 : }
700 : }
701 :
702 : #[cfg(test)]
703 : mod tests {
704 : use super::*;
705 :
706 : /// DownloadOpts::byte_range() should generate (inclusive, exclusive) ranges
707 : /// with optional end bound, or None when unbounded.
708 : #[test]
709 3 : fn download_opts_byte_range() {
710 3 : // Consider using test_case or a similar table-driven test framework.
711 3 : let cases = [
712 3 : // (byte_start, byte_end, expected)
713 3 : (Bound::Unbounded, Bound::Unbounded, None),
714 3 : (Bound::Unbounded, Bound::Included(7), Some((0, Some(8)))),
715 3 : (Bound::Unbounded, Bound::Excluded(7), Some((0, Some(7)))),
716 3 : (Bound::Included(3), Bound::Unbounded, Some((3, None))),
717 3 : (Bound::Included(3), Bound::Included(7), Some((3, Some(8)))),
718 3 : (Bound::Included(3), Bound::Excluded(7), Some((3, Some(7)))),
719 3 : (Bound::Excluded(3), Bound::Unbounded, Some((4, None))),
720 3 : (Bound::Excluded(3), Bound::Included(7), Some((4, Some(8)))),
721 3 : (Bound::Excluded(3), Bound::Excluded(7), Some((4, Some(7)))),
722 3 : // 1-sized ranges are fine, 0 aren't and will panic (separate test).
723 3 : (Bound::Included(3), Bound::Included(3), Some((3, Some(4)))),
724 3 : (Bound::Included(3), Bound::Excluded(4), Some((3, Some(4)))),
725 3 : ];
726 :
727 36 : for (byte_start, byte_end, expect) in cases {
728 33 : let opts = DownloadOpts {
729 33 : byte_start,
730 33 : byte_end,
731 33 : ..Default::default()
732 33 : };
733 33 : let result = opts.byte_range();
734 33 : assert_eq!(
735 : result, expect,
736 0 : "byte_start={byte_start:?} byte_end={byte_end:?}"
737 : );
738 :
739 : // Check generated HTTP header, which uses an inclusive range.
740 33 : let expect_header = expect.map(|(start, end)| match end {
741 24 : Some(end) => format!("bytes={start}-{}", end - 1), // inclusive end
742 6 : None => format!("bytes={start}-"),
743 33 : });
744 33 : assert_eq!(
745 33 : opts.byte_range_header(),
746 : expect_header,
747 0 : "byte_start={byte_start:?} byte_end={byte_end:?}"
748 : );
749 : }
750 3 : }
751 :
752 : /// DownloadOpts::byte_range() zero-sized byte range should panic.
753 : #[test]
754 : #[should_panic]
755 3 : fn download_opts_byte_range_zero() {
756 3 : DownloadOpts {
757 3 : byte_start: Bound::Included(3),
758 3 : byte_end: Bound::Excluded(3),
759 3 : ..Default::default()
760 3 : }
761 3 : .byte_range();
762 3 : }
763 :
764 : /// DownloadOpts::byte_range() negative byte range should panic.
765 : #[test]
766 : #[should_panic]
767 3 : fn download_opts_byte_range_negative() {
768 3 : DownloadOpts {
769 3 : byte_start: Bound::Included(3),
770 3 : byte_end: Bound::Included(2),
771 3 : ..Default::default()
772 3 : }
773 3 : .byte_range();
774 3 : }
775 :
776 : #[test]
777 3 : fn test_object_name() {
778 3 : let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
779 3 : assert_eq!(k.object_name(), Some("c"));
780 :
781 3 : let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
782 3 : assert_eq!(k.object_name(), Some("c"));
783 :
784 3 : let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
785 3 : assert_eq!(k.object_name(), Some("a"));
786 :
787 : // XXX is it impossible to have an empty key?
788 3 : let k = RemotePath::new(Utf8Path::new("")).unwrap();
789 3 : assert_eq!(k.object_name(), None);
790 3 : }
791 :
792 : #[test]
793 3 : fn rempte_path_cannot_be_created_from_absolute_ones() {
794 3 : let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
795 3 : assert_eq!(err.to_string(), "Path \"/\" is not relative");
796 3 : }
797 : }
|