//! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage.
//! No other modules from this tree are supposed to be used directly by the external code.
//!
//! The [`RemoteStorage`] trait is a CRUD-like generic abstraction to use for adapting external storages, with a few implementations:
//! * [`local_fs`] uses the local file system as an external storage
//! * [`s3_bucket`] uses an AWS S3 bucket as an external storage
//! * [`azure_blob`] uses Azure Blob storage as an external storage
//!
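//! A sketch of the typical flow, assuming the configuration has already been
//! read into a `toml_edit` item (the `remote_storage` key below is illustrative):
//!
//! ```ignore
//! let config = RemoteStorageConfig::from_toml(&toml["remote_storage"])?
//!     .expect("remote storage is configured");
//! let storage = GenericRemoteStorage::from_config(&config)?;
//! let files = storage.list_files(None, None).await?;
//! ```
//!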
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]

mod azure_blob;
mod local_fs;
mod s3_bucket;
mod simulate_failures;
mod support;

use std::{
    collections::HashMap,
    fmt::Debug,
    num::{NonZeroU32, NonZeroUsize},
    pin::Pin,
    sync::Arc,
    time::SystemTime,
};

use anyhow::{bail, Context};
use camino::{Utf8Path, Utf8PathBuf};

use bytes::Bytes;
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use toml_edit::Item;
use tracing::info;

pub use self::{
    azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket,
    simulate_failures::UnreliableWrapper,
};
use s3_bucket::RequestKind;

/// Currently, sync happens with AWS S3, which has two limits on requests per second:
/// ~200 RPS for IAM services
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
/// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
/// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// We set this a little low because we currently buffer the entire file into RAM.
///
/// Here, a limit of at most 20k concurrent connections was noted.
/// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 30;
/// No limit on the client side, which currently means 1000 for AWS S3.
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;

/// As defined in the S3 docs.
pub const MAX_KEYS_PER_DELETE: usize = 1000;

const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';

/// Path on the remote storage, relative to some inner prefix.
/// The prefix is an implementation detail that allows representing local paths
/// as the remote ones, stripping the local storage prefix away.
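///
/// # Example
///
/// A sketch of typical usage (the concrete path segments are illustrative):
///
/// ```ignore
/// let remote = RemotePath::new(Utf8Path::new("tenant/timeline/layer"))?;
/// assert_eq!(remote.object_name(), Some("layer"));
/// // Re-anchor the remote path under a local storage root:
/// let local = remote.with_base(Utf8Path::new("/storage/root"));
/// assert_eq!(local, Utf8PathBuf::from("/storage/root/tenant/timeline/layer"));
/// ```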
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemotePath(Utf8PathBuf);

impl Serialize for RemotePath {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.collect_str(self)
    }
}

impl<'de> Deserialize<'de> for RemotePath {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let str = String::deserialize(deserializer)?;
        Ok(Self(Utf8PathBuf::from(&str)))
    }
}

impl std::fmt::Display for RemotePath {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}

impl RemotePath {
    pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
        anyhow::ensure!(
            relative_path.is_relative(),
            "Path {relative_path:?} is not relative"
        );
        Ok(Self(relative_path.to_path_buf()))
    }

    pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
        Self::new(Utf8Path::new(relative_path))
    }

    pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
        base_path.join(&self.0)
    }

    pub fn object_name(&self) -> Option<&str> {
        self.0.file_name()
    }

    pub fn join(&self, segment: &Utf8Path) -> Self {
        Self(self.0.join(segment))
    }

    pub fn get_path(&self) -> &Utf8PathBuf {
        &self.0
    }

    pub fn extension(&self) -> Option<&str> {
        self.0.extension()
    }

    pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
        self.0.strip_prefix(&p.0)
    }
}

/// We don't need callers to be able to pass arbitrary delimiters: just control
/// whether listings will use a '/' separator or not.
///
/// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
/// NoDelimiter mode will only populate `keys`.
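///
/// For example, listing under a `"tenant/"` prefix (names are illustrative):
/// `WithDelimiter` would yield `prefixes = ["tenant/timeline1", "tenant/timeline2"]`,
/// while `NoDelimiter` would yield every key under the prefix, e.g.
/// `keys = ["tenant/timeline1/layer_a", "tenant/timeline2/layer_b"]`.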
pub enum ListingMode {
    WithDelimiter,
    NoDelimiter,
}

#[derive(Default)]
pub struct Listing {
    pub prefixes: Vec<RemotePath>,
    pub keys: Vec<RemotePath>,
}

/// Storage (potentially remote) API to manage its state.
/// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations for storage files.
#[allow(async_fn_in_trait)]
pub trait RemoteStorage: Send + Sync + 'static {
    /// Lists all top level subdirectories for a given prefix.
    /// Note: here we assume that if a prefix is passed, it was obtained via `remote_object_id`,
    /// which already takes into account any kind of global prefix (`prefix_in_bucket` for S3 or `storage_root` for `LocalFs`),
    /// so this method doesn't need to.
    async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        let result = self
            .list(prefix, ListingMode::WithDelimiter, None)
            .await?
            .prefixes;
        Ok(result)
    }
    /// Lists all files in a directory "recursively"
    /// (not really recursively, because AWS has a flat namespace).
    /// Note: this is subtly different from `list_prefixes`,
    /// because it lists files instead of listing
    /// names sharing common prefixes.
    /// For example,
    /// list_files("foo/bar") = ["foo/bar/cat123.txt",
    /// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"]
    /// whereas,
    /// list_prefixes("foo/bar/") = ["cat", "dog"]
    /// See `test_real_s3.rs` for more details.
    ///
    /// `max_keys` limits the number of keys returned; `None` means unlimited.
    async fn list_files(
        &self,
        prefix: Option<&RemotePath>,
        max_keys: Option<NonZeroU32>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        let result = self
            .list(prefix, ListingMode::NoDelimiter, max_keys)
            .await?
            .keys;
        Ok(result)
    }

    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        _mode: ListingMode,
        max_keys: Option<NonZeroU32>,
    ) -> Result<Listing, DownloadError>;

    /// Streams the local file contents into the remote storage entry.
    async fn upload(
        &self,
        from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
        // S3 PUT request requires the content length to be specified,
        // otherwise it starts to fail with the concurrent connection count increasing.
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()>;

    /// Streams the remote storage entry contents.
    /// The returned download also carries the metadata, if any was stored with the file previously.
    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError>;

    /// Streams a given byte range of the remote storage entry contents.
    /// The returned download also carries the metadata, if any was stored with the file previously.
    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError>;

    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;

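    /// Deletes multiple objects.
    /// S3 caps a single delete call at [`MAX_KEYS_PER_DELETE`] keys, so a caller
    /// holding more paths than that would batch them; a sketch:
    ///
    /// ```ignore
    /// for chunk in paths.chunks(MAX_KEYS_PER_DELETE) {
    ///     storage.delete_objects(chunk).await?;
    /// }
    /// ```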
    async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;

    /// Copy a remote object inside a bucket from one path to another.
    async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>;

    /// Resets the content of everything with the given prefix to the given state.
    async fn time_travel_recover(
        &self,
        prefix: Option<&RemotePath>,
        timestamp: SystemTime,
        done_if_after: SystemTime,
        cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError>;
}

pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
pub struct Download {
    pub download_stream: DownloadStream,
    /// The last time the file was modified (`last-modified` HTTP header)
    pub last_modified: Option<SystemTime>,
    /// A way to identify this specific version of the resource (`etag` HTTP header)
    pub etag: Option<String>,
    /// Extra key-value data, associated with the current remote file.
    pub metadata: Option<StorageMetadata>,
}

impl Debug for Download {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Download")
            .field("metadata", &self.metadata)
            .finish()
    }
}

#[derive(Debug)]
pub enum DownloadError {
    /// Validation or other error happened due to user input.
    BadInput(anyhow::Error),
    /// The file was not found in the remote storage.
    NotFound,
    /// A cancellation token aborted the download, typically during
    /// tenant detach or process shutdown.
    Cancelled,
    /// The file was found in the remote storage, but the download failed.
    Other(anyhow::Error),
}

impl std::fmt::Display for DownloadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DownloadError::BadInput(e) => {
                write!(f, "Failed to download a remote file due to user input: {e}")
            }
            DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
            DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
            DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
        }
    }
}

impl std::error::Error for DownloadError {}

impl DownloadError {
    /// Returns true if the error should not be retried with backoff.
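    ///
    /// A sketch of the retry loop this enables at call sites (`backoff` is a
    /// hypothetical helper, not part of this crate):
    ///
    /// ```ignore
    /// loop {
    ///     match storage.download(&path).await {
    ///         Ok(download) => break Ok(download),
    ///         Err(e) if e.is_permanent() => break Err(e),
    ///         Err(_) => backoff.sleep().await,
    ///     }
    /// }
    /// ```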
    pub fn is_permanent(&self) -> bool {
        use DownloadError::*;
        match self {
            BadInput(_) => true,
            NotFound => true,
            Cancelled => true,
            Other(_) => false,
        }
    }
}

#[derive(Debug)]
pub enum TimeTravelError {
    /// Validation or other error happened due to user input.
    BadInput(anyhow::Error),
    /// The used remote storage does not have time travel recovery implemented.
    Unimplemented,
    /// The number of versions/deletion markers is above our limit.
    TooManyVersions,
    /// A cancellation token aborted the process, typically during
    /// request closure or process shutdown.
    Cancelled,
    /// Other errors
    Other(anyhow::Error),
}

impl std::fmt::Display for TimeTravelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TimeTravelError::BadInput(e) => {
                write!(
                    f,
                    "Failed to time travel recover a prefix due to user input: {e}"
                )
            }
            TimeTravelError::Unimplemented => write!(
                f,
                "time travel recovery is not implemented for the current storage backend"
            ),
            TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"),
            TimeTravelError::TooManyVersions => {
                write!(f, "Number of versions/delete markers above limit")
            }
            TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"),
        }
    }
}

impl std::error::Error for TimeTravelError {}

/// Every storage kind currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
#[derive(Clone)]
// Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
    LocalFs(LocalFs),
    AwsS3(Arc<S3Bucket>),
    AzureBlob(Arc<AzureBlobStorage>),
    Unreliable(Other),
}

impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
    pub async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
    ) -> anyhow::Result<Listing, DownloadError> {
        match self {
            Self::LocalFs(s) => s.list(prefix, mode, max_keys).await,
            Self::AwsS3(s) => s.list(prefix, mode, max_keys).await,
            Self::AzureBlob(s) => s.list(prefix, mode, max_keys).await,
            Self::Unreliable(s) => s.list(prefix, mode, max_keys).await,
        }
    }

    // A function for listing all the files in a "directory".
    // Example:
    // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
    //
    // max_keys limits the number of keys returned; None means unlimited.
    pub async fn list_files(
        &self,
        folder: Option<&RemotePath>,
        max_keys: Option<NonZeroU32>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        match self {
            Self::LocalFs(s) => s.list_files(folder, max_keys).await,
            Self::AwsS3(s) => s.list_files(folder, max_keys).await,
            Self::AzureBlob(s) => s.list_files(folder, max_keys).await,
            Self::Unreliable(s) => s.list_files(folder, max_keys).await,
        }
    }

    // Lists common prefixes, if any, of the given files.
    // Example:
    // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
    pub async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        match self {
            Self::LocalFs(s) => s.list_prefixes(prefix).await,
            Self::AwsS3(s) => s.list_prefixes(prefix).await,
            Self::AzureBlob(s) => s.list_prefixes(prefix).await,
            Self::Unreliable(s) => s.list_prefixes(prefix).await,
        }
    }

    pub async fn upload(
        &self,
        from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
            Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
            Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await,
            Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
        }
    }

    pub async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        match self {
            Self::LocalFs(s) => s.download(from).await,
            Self::AwsS3(s) => s.download(from).await,
            Self::AzureBlob(s) => s.download(from).await,
            Self::Unreliable(s) => s.download(from).await,
        }
    }

    pub async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        match self {
            Self::LocalFs(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive)
                    .await
            }
            Self::AwsS3(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive)
                    .await
            }
            Self::AzureBlob(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive)
                    .await
            }
            Self::Unreliable(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive)
                    .await
            }
        }
    }

    pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.delete(path).await,
            Self::AwsS3(s) => s.delete(path).await,
            Self::AzureBlob(s) => s.delete(path).await,
            Self::Unreliable(s) => s.delete(path).await,
        }
    }

    pub async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.delete_objects(paths).await,
            Self::AwsS3(s) => s.delete_objects(paths).await,
            Self::AzureBlob(s) => s.delete_objects(paths).await,
            Self::Unreliable(s) => s.delete_objects(paths).await,
        }
    }

    pub async fn copy_object(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.copy(from, to).await,
            Self::AwsS3(s) => s.copy(from, to).await,
            Self::AzureBlob(s) => s.copy(from, to).await,
            Self::Unreliable(s) => s.copy(from, to).await,
        }
    }

    pub async fn time_travel_recover(
        &self,
        prefix: Option<&RemotePath>,
        timestamp: SystemTime,
        done_if_after: SystemTime,
        cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        match self {
            Self::LocalFs(s) => {
                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
                    .await
            }
            Self::AwsS3(s) => {
                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
                    .await
            }
            Self::AzureBlob(s) => {
                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
                    .await
            }
            Self::Unreliable(s) => {
                s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
                    .await
            }
        }
    }
}

impl GenericRemoteStorage {
    pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
        Ok(match &storage_config.storage {
            RemoteStorageKind::LocalFs(root) => {
                info!("Using fs root '{root}' as a remote storage");
                Self::LocalFs(LocalFs::new(root.clone())?)
            }
            RemoteStorageKind::AwsS3(s3_config) => {
                // The profile and access key id are only printed here for debugging purposes;
                // their values don't indicate which auth option is eventually chosen.
                let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
                let access_key_id =
                    std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
                info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
                      s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
                Self::AwsS3(Arc::new(S3Bucket::new(s3_config)?))
            }
            RemoteStorageKind::AzureContainer(azure_config) => {
                info!("Using azure container '{}' in region '{}' as a remote storage, prefix in container: '{:?}'",
                      azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
                Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?))
            }
        })
    }

    pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
        Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
    }

    /// Takes storage object contents and its size and uploads to remote storage,
    /// mapping `from_path` to the corresponding remote object id in the storage.
    ///
    /// The storage object does not have to be present on the `from_path`,
    /// this path is used for the remote object id conversion only.
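    ///
    /// A sketch of building the byte stream from an in-memory buffer; this is just
    /// one way to satisfy the `Stream` bound, using the `futures` crate:
    ///
    /// ```ignore
    /// let data = bytes::Bytes::from_static(b"index contents");
    /// let len = data.len();
    /// let stream = futures::stream::once(futures::future::ready(std::io::Result::Ok(data)));
    /// storage.upload_storage_object(stream, len, &remote_path).await?;
    /// ```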
    pub async fn upload_storage_object(
        &self,
        from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
        from_size_bytes: usize,
        to: &RemotePath,
    ) -> anyhow::Result<()> {
        self.upload(from, from_size_bytes, to, None)
            .await
            .with_context(|| {
                format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
            })
    }

    /// Downloads the storage object at the `from` path provided.
    /// `byte_range` can be specified to download only a part of the file, if needed.
    pub async fn download_storage_object(
        &self,
        byte_range: Option<(u64, Option<u64>)>,
        from: &RemotePath,
    ) -> Result<Download, DownloadError> {
        match byte_range {
            Some((start, end)) => self.download_byte_range(from, start, end).await,
            None => self.download(from).await,
        }
    }
}

/// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
/// Immutable, cannot be changed once the file is created.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageMetadata(HashMap<String, String>);

/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteStorageConfig {
    /// The storage connection configuration.
    pub storage: RemoteStorageKind,
}

/// A kind of a remote storage to connect to, with its connection configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteStorageKind {
    /// Storage based on local file system.
    /// Specify a root folder to place all stored files into.
    LocalFs(Utf8PathBuf),
    /// AWS S3 based storage, storing all files in the S3 bucket
    /// specified by the config
    AwsS3(S3Config),
    /// Azure Blob based storage, storing all files in the container
    /// specified by the config
    AzureContainer(AzureConfig),
}

/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq)]
pub struct S3Config {
    /// Name of the bucket to connect to.
    pub bucket_name: String,
    /// The region where the bucket is located.
    pub bucket_region: String,
    /// A "subfolder" in the bucket, to allow multiple remote storage users to share the same bucket.
    pub prefix_in_bucket: Option<String>,
    /// A base URL to send S3 requests to.
    /// By default, the endpoint is derived from a region name, assuming it's
    /// an AWS S3 region name, erroring on wrong region name.
    /// Endpoint provides a way to support other S3 flavors and their regions.
    ///
    /// Example: `http://127.0.0.1:5000`
    pub endpoint: Option<String>,
    /// AWS S3 has various limits on its API calls, which we must not exceed.
    /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
    pub concurrency_limit: NonZeroUsize,
    pub max_keys_per_list_response: Option<i32>,
}

impl Debug for S3Config {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3Config")
            .field("bucket_name", &self.bucket_name)
            .field("bucket_region", &self.bucket_region)
            .field("prefix_in_bucket", &self.prefix_in_bucket)
            .field("concurrency_limit", &self.concurrency_limit)
            .field(
                "max_keys_per_list_response",
                &self.max_keys_per_list_response,
            )
            .finish()
    }
}

/// Azure Blob container coordinates and access credentials to manage the container contents (read and write).
#[derive(Clone, PartialEq, Eq)]
pub struct AzureConfig {
    /// Name of the container to connect to.
    pub container_name: String,
    /// The region where the container is located.
    pub container_region: String,
    /// A "subfolder" in the container, to allow multiple remote storage users to share the same container.
    pub prefix_in_container: Option<String>,
    /// Azure has various limits on its API calls, which we must not exceed.
    /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
    pub concurrency_limit: NonZeroUsize,
    pub max_keys_per_list_response: Option<i32>,
}

impl Debug for AzureConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AzureConfig")
            .field("container_name", &self.container_name)
            .field("container_region", &self.container_region)
            .field("prefix_in_container", &self.prefix_in_container)
            .field("concurrency_limit", &self.concurrency_limit)
            .field(
                "max_keys_per_list_response",
                &self.max_keys_per_list_response,
            )
            .finish()
    }
}

impl RemoteStorageConfig {
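    /// Parses a remote storage configuration from a TOML table.
    /// Exactly one backend may be configured; a sketch of the accepted shapes
    /// (all values are illustrative):
    ///
    /// ```toml
    /// # local file system
    /// local_path = '/some/local/path'
    ///
    /// # or AWS S3
    /// bucket_name = 'some-bucket'
    /// bucket_region = 'eu-north-1'
    /// prefix_in_bucket = 'pageserver'
    ///
    /// # or Azure Blob storage
    /// container_name = 'some-container'
    /// container_region = 'us-east'
    /// ```
    ///
    /// Returns `Ok(None)` when none of the backend-selecting keys are present.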
    pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
        let local_path = toml.get("local_path");
        let bucket_name = toml.get("bucket_name");
        let bucket_region = toml.get("bucket_region");
        let container_name = toml.get("container_name");
        let container_region = toml.get("container_region");

        let use_azure = container_name.is_some() && container_region.is_some();

        let default_concurrency_limit = if use_azure {
            DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
        } else {
            DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
        };
        let concurrency_limit = NonZeroUsize::new(
            parse_optional_integer("concurrency_limit", toml)?.unwrap_or(default_concurrency_limit),
        )
        .context("Failed to parse 'concurrency_limit' as a positive integer")?;

        let max_keys_per_list_response =
            parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
                .context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
                .or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);

        let endpoint = toml
            .get("endpoint")
            .map(|endpoint| parse_toml_string("endpoint", endpoint))
            .transpose()?;

        let storage = match (
            local_path,
            bucket_name,
            bucket_region,
            container_name,
            container_region,
        ) {
            // No 'local_path', 'bucket_name' or 'container_name' options are provided: consider this remote storage disabled.
            (None, None, None, None, None) => return Ok(None),
            (_, Some(_), None, ..) => {
                bail!("'bucket_region' option is mandatory if 'bucket_name' is given")
            }
            (_, None, Some(_), ..) => {
                bail!("'bucket_name' option is mandatory if 'bucket_region' is given")
            }
            (None, Some(bucket_name), Some(bucket_region), ..) => {
                RemoteStorageKind::AwsS3(S3Config {
                    bucket_name: parse_toml_string("bucket_name", bucket_name)?,
                    bucket_region: parse_toml_string("bucket_region", bucket_region)?,
                    prefix_in_bucket: toml
                        .get("prefix_in_bucket")
                        .map(|prefix_in_bucket| {
                            parse_toml_string("prefix_in_bucket", prefix_in_bucket)
                        })
                        .transpose()?,
                    endpoint,
                    concurrency_limit,
                    max_keys_per_list_response,
                })
            }
            (_, _, _, Some(_), None) => {
                bail!("'container_region' option is mandatory if 'container_name' is given")
            }
            (_, _, _, None, Some(_)) => {
                bail!("'container_name' option is mandatory if 'container_region' is given")
            }
            (None, None, None, Some(container_name), Some(container_region)) => {
                RemoteStorageKind::AzureContainer(AzureConfig {
                    container_name: parse_toml_string("container_name", container_name)?,
                    container_region: parse_toml_string("container_region", container_region)?,
                    prefix_in_container: toml
                        .get("prefix_in_container")
                        .map(|prefix_in_container| {
                            parse_toml_string("prefix_in_container", prefix_in_container)
                        })
                        .transpose()?,
                    concurrency_limit,
                    max_keys_per_list_response,
                })
            }
            (Some(local_path), None, None, None, None) => RemoteStorageKind::LocalFs(
                Utf8PathBuf::from(parse_toml_string("local_path", local_path)?),
            ),
            (Some(_), Some(_), ..) => {
                bail!("'local_path' and 'bucket_name' are mutually exclusive")
            }
            (Some(_), _, _, Some(_), Some(_)) => {
                bail!("'local_path' and 'container_name' are mutually exclusive")
            }
        };

        Ok(Some(RemoteStorageConfig { storage }))
    }
}

// Helper functions to parse a toml Item
fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
where
    I: TryFrom<i64, Error = E>,
    E: std::error::Error + Send + Sync + 'static,
{
    let toml_integer = match item.get(name) {
        Some(item) => item
            .as_integer()
            .with_context(|| format!("configure option {name} is not an integer"))?,
        None => return Ok(None),
    };

    I::try_from(toml_integer)
        .map(Some)
        .with_context(|| format!("configure option {name} is too large"))
}

fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
    let s = item
        .as_str()
        .with_context(|| format!("configure option {name} is not a string"))?;
    Ok(s.to_string())
}

struct ConcurrencyLimiter {
    // Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded.
    // The same goes for IAM, which is queried before every S3 request, if enabled. IAM has an even lower RPS threshold.
    // This helps to ensure we don't exceed the thresholds.
    write: Arc<Semaphore>,
    read: Arc<Semaphore>,
}

impl ConcurrencyLimiter {
    fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
        match kind {
            RequestKind::Get => &self.read,
            RequestKind::Put => &self.write,
            RequestKind::List => &self.read,
            RequestKind::Delete => &self.write,
            RequestKind::Copy => &self.write,
            RequestKind::TimeTravel => &self.write,
        }
    }

    async fn acquire(
        &self,
        kind: RequestKind,
    ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
        self.for_kind(kind).acquire().await
    }

    async fn acquire_owned(
        &self,
        kind: RequestKind,
    ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
        Arc::clone(self.for_kind(kind)).acquire_owned().await
    }

    fn new(limit: usize) -> ConcurrencyLimiter {
        Self {
            read: Arc::new(Semaphore::new(limit)),
            write: Arc::new(Semaphore::new(limit)),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_object_name() {
        let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
        assert_eq!(k.object_name(), Some("c"));

        let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
        assert_eq!(k.object_name(), Some("c"));

        let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
        assert_eq!(k.object_name(), Some("a"));

        // XXX is it impossible to have an empty key?
        let k = RemotePath::new(Utf8Path::new("")).unwrap();
        assert_eq!(k.object_name(), None);
    }

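    // A small sketch of how `join`, `with_base` and `strip_prefix` compose
    // (the path segments are illustrative).
    #[test]
    fn remote_path_joining_and_prefix_stripping() {
        let prefix = RemotePath::new(Utf8Path::new("tenant")).unwrap();
        let full = prefix.join(Utf8Path::new("timeline/layer"));
        assert_eq!(full.get_path(), Utf8Path::new("tenant/timeline/layer"));

        // `with_base` re-anchors the remote path under a local storage root.
        let local = full.with_base(Utf8Path::new("/storage/root"));
        assert_eq!(local, Utf8PathBuf::from("/storage/root/tenant/timeline/layer"));

        // `strip_prefix` undoes the earlier `join`.
        assert_eq!(full.strip_prefix(&prefix).unwrap(), Utf8Path::new("timeline/layer"));
    }
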
    #[test]
    fn remote_path_cannot_be_created_from_absolute_ones() {
        let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
        assert_eq!(err.to_string(), "Path \"/\" is not relative");
    }
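
    // A small sketch of `RemoteStorageConfig::from_toml`; assumes `toml_edit`'s
    // `Document::as_item` API, with an illustrative local path.
    #[test]
    fn local_fs_config_can_be_parsed_from_toml() {
        let doc = "local_path = '/some/local/path'"
            .parse::<toml_edit::Document>()
            .unwrap();
        let config = RemoteStorageConfig::from_toml(doc.as_item())
            .unwrap()
            .expect("config should not be treated as disabled");
        assert_eq!(
            config.storage,
            RemoteStorageKind::LocalFs(Utf8PathBuf::from("/some/local/path"))
        );
    }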
}