Line data Source code
1 : //! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage.
2 : //! No other modules from this tree are supposed to be used directly by the external code.
3 : //!
4 : //! The [`RemoteStorage`] trait is a CRUD-like generic abstraction used to adapt external storages, with a few implementations:
5 : //! * [`local_fs`] allows using the local file system as an external storage
6 : //! * [`s3_bucket`] uses an AWS S3 bucket as an external storage
7 : //! * [`azure_blob`] allows using Azure Blob storage as an external storage
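//!
//! A minimal usage sketch (hedged: the `remote_storage` crate name, the local directory and the key are illustrative assumptions, not taken from this file):
//!
//! ```ignore
//! use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind};
//!
//! async fn upload_one_object() -> anyhow::Result<()> {
//!     // Back the generic client with the local-filesystem implementation.
//!     let config = RemoteStorageConfig {
//!         storage: RemoteStorageKind::LocalFs(camino::Utf8PathBuf::from("/tmp/remote-storage")),
//!     };
//!     let storage = GenericRemoteStorage::from_config(&config)?;
//!
//!     // Uploads take a byte stream plus an explicit length (S3 PUT requires it).
//!     let payload = bytes::Bytes::from_static(b"hello");
//!     let len = payload.len();
//!     let stream = futures::stream::once(futures::future::ready(Ok::<_, std::io::Error>(payload)));
//!
//!     storage
//!         .upload_storage_object(stream, len, &RemotePath::from_string("tenant/timeline/object")?)
//!         .await
//! }
//! ```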
8 : //!
9 : #![deny(unsafe_code)]
10 : #![deny(clippy::undocumented_unsafe_blocks)]
11 :
12 : mod azure_blob;
13 : mod local_fs;
14 : mod s3_bucket;
15 : mod simulate_failures;
16 :
17 : use std::{
18 : collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc, time::SystemTime,
19 : };
20 :
21 : use anyhow::{bail, Context};
22 : use camino::{Utf8Path, Utf8PathBuf};
23 :
24 : use bytes::Bytes;
25 : use futures::stream::Stream;
26 : use serde::{Deserialize, Serialize};
27 : use tokio::sync::Semaphore;
28 : use tokio_util::sync::CancellationToken;
29 : use toml_edit::Item;
30 : use tracing::info;
31 :
32 : pub use self::{
33 : azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket,
34 : simulate_failures::UnreliableWrapper,
35 : };
36 : use s3_bucket::RequestKind;
37 :
38 : /// Currently, sync happens with AWS S3, which has two limits on requests per second:
39 : /// ~200 RPS for IAM services
40 : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
41 : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
42 : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
43 : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
44 : /// We set this somewhat low, as we currently buffer the entire file into RAM.
45 : ///
46 : /// A limit of at most 20k concurrent connections was noted here:
47 : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
48 : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 30;
49 : /// No limit on the client side, which currently means 1000 for AWS S3.
50 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
51 : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
52 :
53 : /// As defined in S3 docs
54 : pub const MAX_KEYS_PER_DELETE: usize = 1000;
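//
// A hedged sketch (the helper below is not part of this crate) of how a caller can stay
// within the per-request limit above when deleting a large set of objects:
//
//     async fn delete_in_batches(
//         storage: &GenericRemoteStorage,
//         paths: &[RemotePath],
//     ) -> anyhow::Result<()> {
//         for chunk in paths.chunks(MAX_KEYS_PER_DELETE) {
//             storage.delete_objects(chunk).await?;
//         }
//         Ok(())
//     }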
55 :
56 : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
57 :
58 : /// Path on the remote storage, relative to some inner prefix.
59 : /// The prefix is an implementation detail that allows representing local paths
60 : /// as remote ones, stripping the local storage prefix away.
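///
/// A hedged doc sketch (the paths below are made-up examples):
///
/// ```ignore
/// use camino::{Utf8Path, Utf8PathBuf};
///
/// let path = RemotePath::new(Utf8Path::new("tenants/123/timelines/456/layer")).unwrap();
/// assert_eq!(path.object_name(), Some("layer"));
/// // Re-attach a local base to turn the remote path back into a local one.
/// assert_eq!(
///     path.with_base(Utf8Path::new("/srv/pageserver")),
///     Utf8PathBuf::from("/srv/pageserver/tenants/123/timelines/456/layer"),
/// );
/// ```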
61 17866 : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
62 : pub struct RemotePath(Utf8PathBuf);
63 :
64 : impl Serialize for RemotePath {
65 0 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
66 0 : where
67 0 : S: serde::Serializer,
68 0 : {
69 0 : serializer.collect_str(self)
70 0 : }
71 : }
72 :
73 : impl<'de> Deserialize<'de> for RemotePath {
74 0 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
75 0 : where
76 0 : D: serde::Deserializer<'de>,
77 0 : {
78 0 : let str = String::deserialize(deserializer)?;
79 0 : Ok(Self(Utf8PathBuf::from(&str)))
80 0 : }
81 : }
82 :
83 : impl std::fmt::Display for RemotePath {
84 1036 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 1036 : std::fmt::Display::fmt(&self.0, f)
86 1036 : }
87 : }
88 :
89 : impl RemotePath {
90 57447 : pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
91 57447 : anyhow::ensure!(
92 57447 : relative_path.is_relative(),
93 2 : "Path {relative_path:?} is not relative"
94 : );
95 57445 : Ok(Self(relative_path.to_path_buf()))
96 57447 : }
97 :
98 33443 : pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
99 33443 : Self::new(Utf8Path::new(relative_path))
100 33443 : }
101 :
102 16329 : pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
103 16329 : base_path.join(&self.0)
104 16329 : }
105 :
106 6562 : pub fn object_name(&self) -> Option<&str> {
107 6562 : self.0.file_name()
108 6562 : }
109 :
110 5601 : pub fn join(&self, segment: &Utf8Path) -> Self {
111 5601 : Self(self.0.join(segment))
112 5601 : }
113 :
114 35188 : pub fn get_path(&self) -> &Utf8PathBuf {
115 35188 : &self.0
116 35188 : }
117 :
118 0 : pub fn extension(&self) -> Option<&str> {
119 0 : self.0.extension()
120 0 : }
121 :
122 3696 : pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
123 3696 : self.0.strip_prefix(&p.0)
124 3696 : }
125 : }
126 :
127 : /// We don't need callers to be able to pass arbitrary delimiters: just control
128 : /// whether listings will use a '/' separator or not.
129 : ///
130 : /// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
131 : /// NoDelimiter mode will only populate `keys`.
132 : pub enum ListingMode {
133 : WithDelimiter,
134 : NoDelimiter,
135 : }
136 :
137 1276 : #[derive(Default)]
138 : pub struct Listing {
139 : pub prefixes: Vec<RemotePath>,
140 : pub keys: Vec<RemotePath>,
141 : }
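// A hedged sketch of the two listing modes over the same prefix (identifiers other than
// the ones defined above are assumptions):
//
//     // With a delimiter: immediate "subdirectories" land in `prefixes`.
//     let listing = storage.list(Some(&prefix), ListingMode::WithDelimiter).await?;
//     let subdirs: Vec<RemotePath> = listing.prefixes;
//
//     // Without a delimiter: every object under the prefix lands in `keys`.
//     let listing = storage.list(Some(&prefix), ListingMode::NoDelimiter).await?;
//     let all_objects: Vec<RemotePath> = listing.keys;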
142 :
143 : /// Storage (potentially remote) API to manage its state.
144 : /// This storage tries to be unaware of any layered repository context,
145 : /// providing basic CRUD operations for storage files.
146 : #[allow(async_fn_in_trait)]
147 : pub trait RemoteStorage: Send + Sync + 'static {
148 : /// Lists all top-level subdirectories for a given prefix.
149 : /// Note: here we assume that if a prefix is passed, it was obtained via remote_object_id,
150 : /// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFs),
151 : /// so this method doesn't need to.
152 9 : async fn list_prefixes(
153 9 : &self,
154 9 : prefix: Option<&RemotePath>,
155 9 : ) -> Result<Vec<RemotePath>, DownloadError> {
156 0 : let result = self
157 0 : .list(prefix, ListingMode::WithDelimiter)
158 0 : .await?
159 : .prefixes;
160 0 : Ok(result)
161 0 : }
162 : /// Lists all files in the directory "recursively"
163 : /// (not really recursively, because AWS has a flat namespace).
164 : /// Note: This is subtly different from list_prefixes,
165 : /// because it lists files instead of listing
166 : /// names sharing common prefixes.
167 : /// For example,
168 : /// list_files("foo/bar") = ["foo/bar/cat123.txt",
169 : /// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"]
170 : /// whereas,
171 : /// list_prefixes("foo/bar/") = ["cat", "dog"]
172 : /// See `test_real_s3.rs` for more details.
173 448 : async fn list_files(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
174 1427 : let result = self.list(prefix, ListingMode::NoDelimiter).await?.keys;
175 430 : Ok(result)
176 430 : }
177 :
178 : async fn list(
179 : &self,
180 : prefix: Option<&RemotePath>,
181 : _mode: ListingMode,
182 : ) -> anyhow::Result<Listing, DownloadError>;
183 :
184 : /// Streams the local file contents into the remote storage entry.
185 : async fn upload(
186 : &self,
187 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
188 : // S3 PUT request requires the content length to be specified,
189 : // otherwise it starts to fail with the concurrent connection count increasing.
190 : data_size_bytes: usize,
191 : to: &RemotePath,
192 : metadata: Option<StorageMetadata>,
193 : ) -> anyhow::Result<()>;
194 :
195 : /// Streams the remote storage entry contents.
196 : /// Returns the metadata, if any was stored with the file previously.
197 : async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError>;
198 :
199 : /// Streams a given byte range of the remote storage entry contents.
200 : /// Returns the metadata, if any was stored with the file previously.
201 : async fn download_byte_range(
202 : &self,
203 : from: &RemotePath,
204 : start_inclusive: u64,
205 : end_exclusive: Option<u64>,
206 : ) -> Result<Download, DownloadError>;
207 :
208 : async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
209 :
210 : async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
211 :
212 : /// Copy a remote object inside a bucket from one path to another.
213 : async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>;
214 :
215 : /// Resets the content of everything with the given prefix to the given state
216 : async fn time_travel_recover(
217 : &self,
218 : prefix: Option<&RemotePath>,
219 : timestamp: SystemTime,
220 : done_if_after: SystemTime,
221 : cancel: &CancellationToken,
222 : ) -> Result<(), TimeTravelError>;
223 : }
224 :
225 : pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
226 : pub struct Download {
227 : pub download_stream: DownloadStream,
228 : /// The last time the file was modified (`last-modified` HTTP header)
229 : pub last_modified: Option<SystemTime>,
230 : /// A way to identify this specific version of the resource (`etag` HTTP header)
231 : pub etag: Option<String>,
232 : /// Extra key-value data, associated with the current remote file.
233 : pub metadata: Option<StorageMetadata>,
234 : }
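// A hedged sketch of collecting a `Download` into memory (the storage handle and path
// are assumptions; error handling is elided):
//
//     use futures::TryStreamExt;
//
//     let mut download = storage.download(&path).await?;
//     let mut body = Vec::new();
//     while let Some(chunk) = download.download_stream.try_next().await? {
//         body.extend_from_slice(&chunk);
//     }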
235 :
236 : impl Debug for Download {
237 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 0 : f.debug_struct("Download")
239 0 : .field("metadata", &self.metadata)
240 0 : .finish()
241 0 : }
242 : }
243 :
244 0 : #[derive(Debug)]
245 : pub enum DownloadError {
246 : /// Validation or other error happened due to user input.
247 : BadInput(anyhow::Error),
248 : /// The file was not found in the remote storage.
249 : NotFound,
250 : /// A cancellation token aborted the download, typically during
251 : /// tenant detach or process shutdown.
252 : Cancelled,
253 : /// The file was found in the remote storage, but the download failed.
254 : Other(anyhow::Error),
255 : }
256 :
257 : impl std::fmt::Display for DownloadError {
258 3096 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 3096 : match self {
260 0 : DownloadError::BadInput(e) => {
261 0 : write!(f, "Failed to download a remote file due to user input: {e}")
262 : }
263 12 : DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
264 0 : DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
265 3084 : DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
266 : }
267 3096 : }
268 : }
269 :
270 : impl std::error::Error for DownloadError {}
271 :
272 0 : #[derive(Debug)]
273 : pub enum TimeTravelError {
274 : /// Validation or other error happened due to user input.
275 : BadInput(anyhow::Error),
276 : /// The used remote storage does not have time travel recovery implemented
277 : Unimplemented,
278 : /// The number of versions/deletion markers is above our limit.
279 : TooManyVersions,
280 : /// A cancellation token aborted the process, typically during
281 : /// request closure or process shutdown.
282 : Cancelled,
283 : /// Other errors
284 : Other(anyhow::Error),
285 : }
286 :
287 : impl std::fmt::Display for TimeTravelError {
288 1 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289 1 : match self {
290 0 : TimeTravelError::BadInput(e) => {
291 0 : write!(
292 0 : f,
293 0 : "Failed to time travel recover a prefix due to user input: {e}"
294 0 : )
295 : }
296 0 : TimeTravelError::Unimplemented => write!(
297 0 : f,
298 0 : "time travel recovery is not implemented for the current storage backend"
299 0 : ),
300 0 : TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"),
301 : TimeTravelError::TooManyVersions => {
302 0 : write!(f, "Number of versions/delete markers above limit")
303 : }
304 1 : TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"),
305 : }
306 1 : }
307 : }
308 :
309 : impl std::error::Error for TimeTravelError {}
310 :
311 : /// Every storage backend currently supported.
312 : /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
313 7411 : #[derive(Clone)]
314 : // Require Clone for `Other` due to https://github.com/rust-lang/rust/issues/26925
315 : pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
316 : LocalFs(LocalFs),
317 : AwsS3(Arc<S3Bucket>),
318 : AzureBlob(Arc<AzureBlobStorage>),
319 : Unreliable(Other),
320 : }
321 :
322 : impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
323 966 : pub async fn list(
324 966 : &self,
325 966 : prefix: Option<&RemotePath>,
326 966 : mode: ListingMode,
327 966 : ) -> anyhow::Result<Listing, DownloadError> {
328 966 : match self {
329 572 : Self::LocalFs(s) => s.list(prefix, mode).await,
330 1552 : Self::AwsS3(s) => s.list(prefix, mode).await,
331 0 : Self::AzureBlob(s) => s.list(prefix, mode).await,
332 221 : Self::Unreliable(s) => s.list(prefix, mode).await,
333 : }
334 966 : }
335 :
336 : // A function for listing all the files in a "directory"
337 : // Example:
338 : // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
339 558 : pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
340 558 : match self {
341 336 : Self::LocalFs(s) => s.list_files(folder).await,
342 1091 : Self::AwsS3(s) => s.list_files(folder).await,
343 0 : Self::AzureBlob(s) => s.list_files(folder).await,
344 262 : Self::Unreliable(s) => s.list_files(folder).await,
345 : }
346 558 : }
347 :
348 : // Lists the common *prefixes* of files, if any.
349 : // Example:
350 : // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
351 0 : pub async fn list_prefixes(
352 0 : &self,
353 0 : prefix: Option<&RemotePath>,
354 0 : ) -> Result<Vec<RemotePath>, DownloadError> {
355 0 : match self {
356 0 : Self::LocalFs(s) => s.list_prefixes(prefix).await,
357 0 : Self::AwsS3(s) => s.list_prefixes(prefix).await,
358 0 : Self::AzureBlob(s) => s.list_prefixes(prefix).await,
359 0 : Self::Unreliable(s) => s.list_prefixes(prefix).await,
360 : }
361 0 : }
362 :
363 33624 : pub async fn upload(
364 33624 : &self,
365 33624 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
366 33624 : data_size_bytes: usize,
367 33624 : to: &RemotePath,
368 33624 : metadata: Option<StorageMetadata>,
369 33624 : ) -> anyhow::Result<()> {
370 33624 : match self {
371 1047820 : Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
372 56135 : Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
373 0 : Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await,
374 19068 : Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
375 : }
376 33618 : }
377 :
378 12284 : pub async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
379 12284 : match self {
380 1411 : Self::LocalFs(s) => s.download(from).await,
381 31469 : Self::AwsS3(s) => s.download(from).await,
382 0 : Self::AzureBlob(s) => s.download(from).await,
383 186 : Self::Unreliable(s) => s.download(from).await,
384 : }
385 12279 : }
386 :
387 38 : pub async fn download_byte_range(
388 38 : &self,
389 38 : from: &RemotePath,
390 38 : start_inclusive: u64,
391 38 : end_exclusive: Option<u64>,
392 38 : ) -> Result<Download, DownloadError> {
393 38 : match self {
394 2 : Self::LocalFs(s) => {
395 2 : s.download_byte_range(from, start_inclusive, end_exclusive)
396 4 : .await
397 : }
398 36 : Self::AwsS3(s) => {
399 36 : s.download_byte_range(from, start_inclusive, end_exclusive)
400 107 : .await
401 : }
402 0 : Self::AzureBlob(s) => {
403 0 : s.download_byte_range(from, start_inclusive, end_exclusive)
404 0 : .await
405 : }
406 0 : Self::Unreliable(s) => {
407 0 : s.download_byte_range(from, start_inclusive, end_exclusive)
408 0 : .await
409 : }
410 : }
411 38 : }
412 :
413 2100 : pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
414 2100 : match self {
415 16 : Self::LocalFs(s) => s.delete(path).await,
416 8257 : Self::AwsS3(s) => s.delete(path).await,
417 0 : Self::AzureBlob(s) => s.delete(path).await,
418 140 : Self::Unreliable(s) => s.delete(path).await,
419 : }
420 2100 : }
421 :
422 538 : pub async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
423 538 : match self {
424 1538 : Self::LocalFs(s) => s.delete_objects(paths).await,
425 1021 : Self::AwsS3(s) => s.delete_objects(paths).await,
426 0 : Self::AzureBlob(s) => s.delete_objects(paths).await,
427 7962 : Self::Unreliable(s) => s.delete_objects(paths).await,
428 : }
429 538 : }
430 :
431 14 : pub async fn copy_object(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
432 14 : match self {
433 2 : Self::LocalFs(s) => s.copy(from, to).await,
434 48 : Self::AwsS3(s) => s.copy(from, to).await,
435 0 : Self::AzureBlob(s) => s.copy(from, to).await,
436 0 : Self::Unreliable(s) => s.copy(from, to).await,
437 : }
438 14 : }
439 :
440 1 : pub async fn time_travel_recover(
441 1 : &self,
442 1 : prefix: Option<&RemotePath>,
443 1 : timestamp: SystemTime,
444 1 : done_if_after: SystemTime,
445 1 : cancel: &CancellationToken,
446 1 : ) -> Result<(), TimeTravelError> {
447 1 : match self {
448 0 : Self::LocalFs(s) => {
449 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
450 0 : .await
451 : }
452 1 : Self::AwsS3(s) => {
453 1 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
454 131 : .await
455 : }
456 0 : Self::AzureBlob(s) => {
457 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
458 0 : .await
459 : }
460 0 : Self::Unreliable(s) => {
461 0 : s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
462 0 : .await
463 : }
464 : }
465 1 : }
466 : }
467 :
468 : impl GenericRemoteStorage {
469 747 : pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
470 747 : Ok(match &storage_config.storage {
471 476 : RemoteStorageKind::LocalFs(root) => {
472 476 : info!("Using fs root '{root}' as a remote storage");
473 476 : Self::LocalFs(LocalFs::new(root.clone())?)
474 : }
475 265 : RemoteStorageKind::AwsS3(s3_config) => {
476 265 : // The profile and access key id are only printed here for debugging purposes,
477 265 : // their values don't indicate the eventually taken choice for auth.
478 265 : let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
479 265 : let access_key_id =
480 265 : std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
481 265 : info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
482 265 : s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
483 265 : Self::AwsS3(Arc::new(S3Bucket::new(s3_config)?))
484 : }
485 6 : RemoteStorageKind::AzureContainer(azure_config) => {
486 6 : info!("Using azure container '{}' in region '{}' as a remote storage, prefix in container: '{:?}'",
487 6 : azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
488 6 : Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?))
489 : }
490 : })
491 747 : }
492 :
493 67 : pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
494 67 : Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
495 67 : }
496 :
497 : /// Takes storage object contents and its size and uploads to remote storage,
498 : /// mapping `from_path` to the corresponding remote object id in the storage.
499 : ///
500 : /// The storage object does not have to be present on the `from_path`,
501 : /// this path is used for the remote object id conversion only.
502 7600 : pub async fn upload_storage_object(
503 7600 : &self,
504 7600 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
505 7600 : from_size_bytes: usize,
506 7600 : to: &RemotePath,
507 7600 : ) -> anyhow::Result<()> {
508 7600 : self.upload(from, from_size_bytes, to, None)
509 44790 : .await
510 7595 : .with_context(|| {
511 772 : format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
512 7595 : })
513 7595 : }
514 :
515 : /// Downloads the storage object from the `from` path provided.
516 : /// `byte_range` can be specified to download only a part of the file, if needed.
517 38 : pub async fn download_storage_object(
518 38 : &self,
519 38 : byte_range: Option<(u64, Option<u64>)>,
520 38 : from: &RemotePath,
521 38 : ) -> Result<Download, DownloadError> {
522 38 : match byte_range {
523 111 : Some((start, end)) => self.download_byte_range(from, start, end).await,
524 0 : None => self.download(from).await,
525 : }
526 38 : }
527 : }
528 :
529 : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
530 : /// Immutable, cannot be changed once the file is created.
531 4 : #[derive(Debug, Clone, PartialEq, Eq)]
532 : pub struct StorageMetadata(HashMap<String, String>);
533 :
534 : /// External backup storage configuration, enough for creating a client for that storage.
535 793 : #[derive(Debug, Clone, PartialEq, Eq)]
536 : pub struct RemoteStorageConfig {
537 : /// The storage connection configuration.
538 : pub storage: RemoteStorageKind,
539 : }
540 :
541 : /// A kind of a remote storage to connect to, with its connection configuration.
542 793 : #[derive(Debug, Clone, PartialEq, Eq)]
543 : pub enum RemoteStorageKind {
544 : /// Storage based on local file system.
545 : /// Specify a root folder to place all stored files into.
546 : LocalFs(Utf8PathBuf),
547 : /// AWS S3 based storage, storing all files in the S3 bucket
548 : /// specified by the config
549 : AwsS3(S3Config),
550 : /// Azure Blob based storage, storing all files in the container
551 : /// specified by the config
552 : AzureContainer(AzureConfig),
553 : }
554 :
555 : /// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
556 685 : #[derive(Clone, PartialEq, Eq)]
557 : pub struct S3Config {
558 : /// Name of the bucket to connect to.
559 : pub bucket_name: String,
560 : /// The region where the bucket is located.
561 : pub bucket_region: String,
562 : /// A "subfolder" in the bucket that allows multiple remote storage users to share the same bucket.
563 : pub prefix_in_bucket: Option<String>,
564 : /// A base URL to send S3 requests to.
565 : /// By default, the endpoint is derived from the region name, assuming it is
566 : /// an AWS S3 region name, and erroring out on an invalid region name.
567 : /// Endpoint provides a way to support other S3 flavors and their regions.
568 : ///
569 : /// Example: `http://127.0.0.1:5000`
570 : pub endpoint: Option<String>,
571 : /// AWS S3 has various limits on its API calls; we must not exceed them.
572 : /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
573 : pub concurrency_limit: NonZeroUsize,
574 : pub max_keys_per_list_response: Option<i32>,
575 : }
576 :
577 : impl Debug for S3Config {
578 19 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
579 19 : f.debug_struct("S3Config")
580 19 : .field("bucket_name", &self.bucket_name)
581 19 : .field("bucket_region", &self.bucket_region)
582 19 : .field("prefix_in_bucket", &self.prefix_in_bucket)
583 19 : .field("concurrency_limit", &self.concurrency_limit)
584 19 : .field(
585 19 : "max_keys_per_list_response",
586 19 : &self.max_keys_per_list_response,
587 19 : )
588 19 : .finish()
589 19 : }
590 : }
591 :
592 : /// Azure container coordinates and access credentials to manage the container contents (read and write).
593 0 : #[derive(Clone, PartialEq, Eq)]
594 : pub struct AzureConfig {
595 : /// Name of the container to connect to.
596 : pub container_name: String,
597 : /// The region where the container is located.
598 : pub container_region: String,
599 : /// A "subfolder" in the container that allows multiple remote storage users to share the same container.
600 : pub prefix_in_container: Option<String>,
601 : /// Azure has various limits on its API calls; we must not exceed them.
602 : /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
603 : pub concurrency_limit: NonZeroUsize,
604 : pub max_keys_per_list_response: Option<i32>,
605 : }
606 :
607 : impl Debug for AzureConfig {
608 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
609 0 : f.debug_struct("AzureConfig")
610 0 : .field("container_name", &self.container_name)
611 0 : .field("container_region", &self.container_region)
612 0 : .field("prefix_in_container", &self.prefix_in_container)
613 0 : .field("concurrency_limit", &self.concurrency_limit)
614 0 : .field(
615 0 : "max_keys_per_list_response",
616 0 : &self.max_keys_per_list_response,
617 0 : )
618 0 : .finish()
619 0 : }
620 : }
621 :
622 : impl RemoteStorageConfig {
623 1085 : pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
624 1085 : let local_path = toml.get("local_path");
625 1085 : let bucket_name = toml.get("bucket_name");
626 1085 : let bucket_region = toml.get("bucket_region");
627 1085 : let container_name = toml.get("container_name");
628 1085 : let container_region = toml.get("container_region");
629 :
630 1085 : let use_azure = container_name.is_some() && container_region.is_some();
631 :
632 1085 : let default_concurrency_limit = if use_azure {
633 0 : DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
634 : } else {
635 1085 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
636 : };
637 1085 : let concurrency_limit = NonZeroUsize::new(
638 1085 : parse_optional_integer("concurrency_limit", toml)?.unwrap_or(default_concurrency_limit),
639 1085 : )
640 1085 : .context("Failed to parse 'concurrency_limit' as a positive integer")?;
641 :
642 1085 : let max_keys_per_list_response =
643 1085 : parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
644 1085 : .context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
645 1085 : .or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
646 :
647 1085 : let endpoint = toml
648 1085 : .get("endpoint")
649 1085 : .map(|endpoint| parse_toml_string("endpoint", endpoint))
650 1085 : .transpose()?;
651 :
652 1027 : let storage = match (
653 1085 : local_path,
654 1085 : bucket_name,
655 1085 : bucket_region,
656 1085 : container_name,
657 1085 : container_region,
658 : ) {
659 : // no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
660 58 : (None, None, None, None, None) => return Ok(None),
661 : (_, Some(_), None, ..) => {
662 0 : bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
663 : }
664 : (_, None, Some(_), ..) => {
665 0 : bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
666 : }
667 392 : (None, Some(bucket_name), Some(bucket_region), ..) => {
668 392 : RemoteStorageKind::AwsS3(S3Config {
669 392 : bucket_name: parse_toml_string("bucket_name", bucket_name)?,
670 392 : bucket_region: parse_toml_string("bucket_region", bucket_region)?,
671 392 : prefix_in_bucket: toml
672 392 : .get("prefix_in_bucket")
673 392 : .map(|prefix_in_bucket| {
674 392 : parse_toml_string("prefix_in_bucket", prefix_in_bucket)
675 392 : })
676 392 : .transpose()?,
677 392 : endpoint,
678 392 : concurrency_limit,
679 392 : max_keys_per_list_response,
680 : })
681 : }
682 : (_, _, _, Some(_), None) => {
683 0 : bail!("'container_name' option is mandatory if 'container_region' is given ")
684 : }
685 : (_, _, _, None, Some(_)) => {
686 0 : bail!("'container_name' option is mandatory if 'container_region' is given ")
687 : }
688 0 : (None, None, None, Some(container_name), Some(container_region)) => {
689 0 : RemoteStorageKind::AzureContainer(AzureConfig {
690 0 : container_name: parse_toml_string("container_name", container_name)?,
691 0 : container_region: parse_toml_string("container_region", container_region)?,
692 0 : prefix_in_container: toml
693 0 : .get("prefix_in_container")
694 0 : .map(|prefix_in_container| {
695 0 : parse_toml_string("prefix_in_container", prefix_in_container)
696 0 : })
697 0 : .transpose()?,
698 0 : concurrency_limit,
699 0 : max_keys_per_list_response,
700 : })
701 : }
702 635 : (Some(local_path), None, None, None, None) => RemoteStorageKind::LocalFs(
703 635 : Utf8PathBuf::from(parse_toml_string("local_path", local_path)?),
704 : ),
705 : (Some(_), Some(_), ..) => {
706 0 : bail!("'local_path' and 'bucket_name' are mutually exclusive")
707 : }
708 : (Some(_), _, _, Some(_), Some(_)) => {
709 0 : bail!("'local_path' and 'container_name' are mutually exclusive")
710 : }
711 : };
712 :
713 1027 : Ok(Some(RemoteStorageConfig { storage }))
714 1085 : }
715 : }
716 :
717 : // Helper functions to parse a toml Item
718 2170 : fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
719 2170 : where
720 2170 : I: TryFrom<i64, Error = E>,
721 2170 : E: std::error::Error + Send + Sync + 'static,
722 2170 : {
723 2170 : let toml_integer = match item.get(name) {
724 4 : Some(item) => item
725 4 : .as_integer()
726 4 : .with_context(|| format!("configure option {name} is not an integer"))?,
727 2166 : None => return Ok(None),
728 : };
729 :
730 4 : I::try_from(toml_integer)
731 4 : .map(Some)
732 4 : .with_context(|| format!("configure option {name} is too large"))
733 2170 : }
734 :
735 2024 : fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
736 2024 : let s = item
737 2024 : .as_str()
738 2024 : .with_context(|| format!("configure option {name} is not a string"))?;
739 2024 : Ok(s.to_string())
740 2024 : }
741 :
742 : struct ConcurrencyLimiter {
743 : // Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded.
744 : // The same goes for IAM, which is queried before every S3 request, if enabled. IAM has an even lower RPS threshold.
745 : // This helps ensure we don't exceed those thresholds.
746 : write: Arc<Semaphore>,
747 : read: Arc<Semaphore>,
748 : }
749 :
750 : impl ConcurrencyLimiter {
751 29917 : fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
752 29917 : match kind {
753 10771 : RequestKind::Get => &self.read,
754 16194 : RequestKind::Put => &self.write,
755 542 : RequestKind::List => &self.read,
756 2388 : RequestKind::Delete => &self.write,
757 15 : RequestKind::Copy => &self.write,
758 7 : RequestKind::TimeTravel => &self.write,
759 : }
760 29917 : }
761 :
762 19153 : async fn acquire(
763 19153 : &self,
764 19153 : kind: RequestKind,
765 19153 : ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
766 18810 : self.for_kind(kind).acquire().await
767 18810 : }
768 :
769 10764 : async fn acquire_owned(
770 10764 : &self,
771 10764 : kind: RequestKind,
772 10764 : ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
773 10744 : Arc::clone(self.for_kind(kind)).acquire_owned().await
774 10744 : }
775 :
776 281 : fn new(limit: usize) -> ConcurrencyLimiter {
777 281 : Self {
778 281 : read: Arc::new(Semaphore::new(limit)),
779 281 : write: Arc::new(Semaphore::new(limit)),
780 281 : }
781 281 : }
782 : }
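// A hedged usage sketch (the surrounding request-issuing code is hypothetical; only the
// permit handling mirrors the limiter above):
//
//     let limiter = ConcurrencyLimiter::new(DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT);
//     let _permit = limiter.acquire(RequestKind::Get).await?;
//     // ... issue the S3 GET while holding the permit ...
//     // The permit is released once `_permit` is dropped.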
783 :
784 : #[cfg(test)]
785 : mod tests {
786 : use super::*;
787 :
788 2 : #[test]
789 2 : fn test_object_name() {
790 2 : let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
791 2 : assert_eq!(k.object_name(), Some("c"));
792 :
793 2 : let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
794 2 : assert_eq!(k.object_name(), Some("c"));
795 :
796 2 : let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
797 2 : assert_eq!(k.object_name(), Some("a"));
798 :
799 : // XXX is it impossible to have an empty key?
800 2 : let k = RemotePath::new(Utf8Path::new("")).unwrap();
801 2 : assert_eq!(k.object_name(), None);
802 2 : }
803 :
804 2 : #[test]
805 2 : fn remote_path_cannot_be_created_from_absolute_ones() {
806 2 : let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
807 2 : assert_eq!(err.to_string(), "Path \"/\" is not relative");
808 2 : }
809 : }
|