TLA Line data Source code
1 : //! A set of generic storage abstractions for the page server to use when backing up and restoring its state from external storage.
2 : //! No other modules from this tree are supposed to be used directly by external code.
3 : //!
4 : //! The [`RemoteStorage`] trait is a CRUD-like generic abstraction for adapting external storages, with a few implementations:
5 : //! * [`local_fs`] uses the local file system as an external storage
6 : //! * [`s3_bucket`] uses an AWS S3 bucket as an external storage
7 : //! * [`azure_blob`] uses Azure Blob storage as an external storage
8 : //!
9 : #![deny(unsafe_code)]
10 : #![deny(clippy::undocumented_unsafe_blocks)]
11 :
12 : mod azure_blob;
13 : mod local_fs;
14 : mod s3_bucket;
15 : mod simulate_failures;
16 :
17 : use std::{
18 : collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc, time::SystemTime,
19 : };
20 :
21 : use anyhow::{bail, Context};
22 : use camino::{Utf8Path, Utf8PathBuf};
23 :
24 : use bytes::Bytes;
25 : use futures::stream::Stream;
26 : use serde::{Deserialize, Serialize};
27 : use tokio::sync::Semaphore;
28 : use toml_edit::Item;
29 : use tracing::info;
30 :
31 : pub use self::{
32 : azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket,
33 : simulate_failures::UnreliableWrapper,
34 : };
35 : use s3_bucket::RequestKind;
36 :
37 : /// Currently, sync happens with AWS S3, which has two limits on requests per second:
38 : /// ~200 RPS for IAM services
39 : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
40 : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
41 : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
42 : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
43 : /// We set this a little low because we currently buffer the entire file into RAM.
44 : ///
45 : /// Here, a limit of max 20k concurrent connections was noted.
46 : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
47 : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 30;
48 : /// No limit on the client side, which currently means 1000 for AWS S3.
49 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
50 : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
51 :
52 : /// As defined in S3 docs
53 : pub const MAX_KEYS_PER_DELETE: usize = 1000;
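// Illustrative sketch (not part of the crate API): one way for a caller to stay under the
// per-request key limit is to chunk a large key list by `MAX_KEYS_PER_DELETE`. The helper
// name below is hypothetical.
#[allow(dead_code)]
async fn delete_in_batches(
    storage: &GenericRemoteStorage,
    paths: &[RemotePath],
) -> anyhow::Result<()> {
    for chunk in paths.chunks(MAX_KEYS_PER_DELETE) {
        // Each batch stays within the S3 per-request key limit.
        storage.delete_objects(chunk).await?;
    }
    Ok(())
}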
54 :
55 : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
56 :
57 : /// Path on the remote storage, relative to some inner prefix.
58 : /// The prefix is an implementation detail that allows representing local paths
59 : /// as remote ones, stripping the local storage prefix away.
60 CBC 17154 : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
61 : pub struct RemotePath(Utf8PathBuf);
62 :
63 : impl Serialize for RemotePath {
64 UBC 0 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
65 0 : where
66 0 : S: serde::Serializer,
67 0 : {
68 0 : serializer.collect_str(self)
69 0 : }
70 : }
71 :
72 : impl<'de> Deserialize<'de> for RemotePath {
73 0 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
74 0 : where
75 0 : D: serde::Deserializer<'de>,
76 0 : {
77 0 : let str = String::deserialize(deserializer)?;
78 0 : Ok(Self(Utf8PathBuf::from(&str)))
79 0 : }
80 : }
81 :
82 : impl std::fmt::Display for RemotePath {
83 CBC 892 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 892 : std::fmt::Display::fmt(&self.0, f)
85 892 : }
86 : }
87 :
88 : impl RemotePath {
89 51333 : pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
90 51333 : anyhow::ensure!(
91 51333 : relative_path.is_relative(),
92 1 : "Path {relative_path:?} is not relative"
93 : );
94 51332 : Ok(Self(relative_path.to_path_buf()))
95 51333 : }
96 :
97 29157 : pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
98 29157 : Self::new(Utf8Path::new(relative_path))
99 29157 : }
100 :
101 13130 : pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
102 13130 : base_path.join(&self.0)
103 13130 : }
104 :
105 6376 : pub fn object_name(&self) -> Option<&str> {
106 6376 : self.0.file_name()
107 6376 : }
108 :
109 4049 : pub fn join(&self, segment: &Utf8Path) -> Self {
110 4049 : Self(self.0.join(segment))
111 4049 : }
112 :
113 33249 : pub fn get_path(&self) -> &Utf8PathBuf {
114 33249 : &self.0
115 33249 : }
116 :
117 UBC 0 : pub fn extension(&self) -> Option<&str> {
118 0 : self.0.extension()
119 0 : }
120 :
121 CBC 2797 : pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
122 2797 : self.0.strip_prefix(&p.0)
123 2797 : }
124 : }
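// Illustrative sketch (test-only): how `RemotePath` values are typically built and mapped
// back to local paths. The tenant/timeline-style path and the base directory are hypothetical.
#[cfg(test)]
mod remote_path_sketch {
    use super::*;

    #[test]
    fn remote_path_round_trip() {
        let path = RemotePath::new(Utf8Path::new("tenants/123/timelines/456")).unwrap();
        // `with_base` prepends a local root, turning the remote id into a local path.
        assert_eq!(
            path.with_base(Utf8Path::new("/tmp/pageserver")),
            Utf8PathBuf::from("/tmp/pageserver/tenants/123/timelines/456")
        );
        // `object_name` is the last path segment.
        assert_eq!(path.object_name(), Some("456"));
    }
}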
125 :
126 : /// We don't need callers to be able to pass arbitrary delimiters: just control
127 : /// whether listings will use a '/' separator or not.
128 : ///
129 : /// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
130 : /// NoDelimiter mode will only populate `keys`.
131 : pub enum ListingMode {
132 : WithDelimiter,
133 : NoDelimiter,
134 : }
135 :
136 698 : #[derive(Default)]
137 : pub struct Listing {
138 : pub prefixes: Vec<RemotePath>,
139 : pub keys: Vec<RemotePath>,
140 : }
141 :
142 : /// Storage (potentially remote) API to manage its state.
143 : /// This storage tries to be unaware of any layered repository context,
144 : /// providing basic CRUD operations for storage files.
145 : #[async_trait::async_trait]
146 : pub trait RemoteStorage: Send + Sync + 'static {
147 : /// Lists all top-level subdirectories for a given prefix.
148 : /// Note: we assume that if a prefix is passed, it was obtained via remote_object_id,
149 : /// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS),
150 : /// so this method doesn't need to.
151 9 : async fn list_prefixes(
152 9 : &self,
153 9 : prefix: Option<&RemotePath>,
154 9 : ) -> Result<Vec<RemotePath>, DownloadError> {
155 9 : let result = self
156 9 : .list(prefix, ListingMode::WithDelimiter)
157 57 : .await?
158 : .prefixes;
159 9 : Ok(result)
160 18 : }
161 : /// Lists all files in directory "recursively"
162 : /// (not really recursively, because AWS has a flat namespace)
163 : /// Note: This is subtly different from list_prefixes,
164 : /// because it is for listing files instead of listing
165 : /// names sharing common prefixes.
166 : /// For example,
167 : /// list_files("foo/bar") = ["foo/bar/cat123.txt",
168 : /// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"]
169 : /// whereas,
170 : /// list_prefixes("foo/bar/") = ["cat", "dog"]
171 : /// See `test_real_s3.rs` for more details.
172 384 : async fn list_files(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
173 1360 : let result = self.list(prefix, ListingMode::NoDelimiter).await?.keys;
174 384 : Ok(result)
175 768 : }
176 :
177 : async fn list(
178 : &self,
179 : prefix: Option<&RemotePath>,
180 : _mode: ListingMode,
181 : ) -> anyhow::Result<Listing, DownloadError>;
182 :
183 : /// Streams the local file contents into the remote storage entry.
184 : async fn upload(
185 : &self,
186 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
187 : // An S3 PUT request requires the content length to be specified;
188 : // otherwise uploads start to fail as the concurrent connection count increases.
189 : data_size_bytes: usize,
190 : to: &RemotePath,
191 : metadata: Option<StorageMetadata>,
192 : ) -> anyhow::Result<()>;
193 :
194 : /// Streams the remote storage entry contents.
195 : /// Returns the metadata, if any was stored with the file previously.
196 : async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError>;
197 :
198 : /// Streams a given byte range of the remote storage entry contents.
199 : /// Returns the metadata, if any was stored with the file previously.
200 : async fn download_byte_range(
201 : &self,
202 : from: &RemotePath,
203 : start_inclusive: u64,
204 : end_exclusive: Option<u64>,
205 : ) -> Result<Download, DownloadError>;
206 :
207 : async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
208 :
209 : async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
210 :
211 : /// Copy a remote object inside a bucket from one path to another.
212 : async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>;
213 : }
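// Sketch (illustrative only) of feeding an in-memory buffer to [`RemoteStorage::upload`]:
// the trait takes a byte stream plus an explicit length, so a single `Bytes` chunk can be
// wrapped in a one-item stream. The destination path and helper name are hypothetical.
#[allow(dead_code)]
async fn upload_bytes_sketch(storage: &impl RemoteStorage, data: Bytes) -> anyhow::Result<()> {
    let len = data.len();
    // A one-item stream yielding `std::io::Result<Bytes>`, as the trait expects.
    let body = futures::stream::once(futures::future::ready(Ok::<Bytes, std::io::Error>(data)));
    let to = RemotePath::from_string("sketch/example.bin")?;
    storage.upload(body, len, &to, None).await
}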
214 :
215 : pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
216 : pub struct Download {
217 : pub download_stream: DownloadStream,
218 : /// The last time the file was modified (`last-modified` HTTP header)
219 : pub last_modified: Option<SystemTime>,
220 : /// A way to identify this specific version of the resource (`etag` HTTP header)
221 : pub etag: Option<String>,
222 : /// Extra key-value data, associated with the current remote file.
223 : pub metadata: Option<StorageMetadata>,
224 : }
225 :
226 : impl Debug for Download {
227 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 0 : f.debug_struct("Download")
229 0 : .field("metadata", &self.metadata)
230 0 : .finish()
231 0 : }
232 : }
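// Sketch (illustrative only): draining a `Download` into memory with `futures::StreamExt`.
// Real callers usually stream to disk instead of buffering the whole object.
#[allow(dead_code)]
async fn read_download_to_vec(mut download: Download) -> std::io::Result<Vec<u8>> {
    use futures::StreamExt;
    let mut buf = Vec::new();
    while let Some(chunk) = download.download_stream.next().await {
        // Each item is an `std::io::Result<Bytes>`; propagate I/O errors to the caller.
        buf.extend_from_slice(&chunk?);
    }
    Ok(buf)
}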
233 :
234 0 : #[derive(Debug)]
235 : pub enum DownloadError {
236 : /// Validation or other error happened due to user input.
237 : BadInput(anyhow::Error),
238 : /// The file was not found in the remote storage.
239 : NotFound,
240 : /// A cancellation token aborted the download, typically during
241 : /// tenant detach or process shutdown.
242 : Cancelled,
243 : /// The file was found in the remote storage, but the download failed.
244 : Other(anyhow::Error),
245 : }
246 :
247 : impl std::fmt::Display for DownloadError {
248 CBC 3053 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 3053 : match self {
250 UBC 0 : DownloadError::BadInput(e) => {
251 0 : write!(f, "Failed to download a remote file due to user input: {e}")
252 : }
253 CBC 8 : DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
254 UBC 0 : DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
255 CBC 3045 : DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
256 : }
257 3053 : }
258 : }
259 :
260 : impl std::error::Error for DownloadError {}
261 :
262 : /// Every storage kind currently supported.
263 : /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
264 6521 : #[derive(Clone)]
265 : pub enum GenericRemoteStorage {
266 : LocalFs(LocalFs),
267 : AwsS3(Arc<S3Bucket>),
268 : AzureBlob(Arc<AzureBlobStorage>),
269 : Unreliable(Arc<UnreliableWrapper>),
270 : }
271 :
272 : impl GenericRemoteStorage {
273 346 : pub async fn list(
274 346 : &self,
275 346 : prefix: Option<&RemotePath>,
276 346 : mode: ListingMode,
277 346 : ) -> anyhow::Result<Listing, DownloadError> {
278 346 : match self {
279 374 : Self::LocalFs(s) => s.list(prefix, mode).await,
280 681 : Self::AwsS3(s) => s.list(prefix, mode).await,
281 UBC 0 : Self::AzureBlob(s) => s.list(prefix, mode).await,
282 CBC 87 : Self::Unreliable(s) => s.list(prefix, mode).await,
283 : }
284 346 : }
285 :
286 : // A function for listing all the files in a "directory"
287 : // Example:
288 : // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
289 530 : pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
290 530 : match self {
291 218 : Self::LocalFs(s) => s.list_files(folder).await,
292 1134 : Self::AwsS3(s) => s.list_files(folder).await,
293 8 : Self::AzureBlob(s) => s.list_files(folder).await,
294 295 : Self::Unreliable(s) => s.list_files(folder).await,
295 : }
296 530 : }
297 :
298 : // Lists the common *prefixes*, if any, of the files.
299 : // Example:
300 : // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
301 9 : pub async fn list_prefixes(
302 9 : &self,
303 9 : prefix: Option<&RemotePath>,
304 9 : ) -> Result<Vec<RemotePath>, DownloadError> {
305 9 : match self {
306 UBC 0 : Self::LocalFs(s) => s.list_prefixes(prefix).await,
307 CBC 47 : Self::AwsS3(s) => s.list_prefixes(prefix).await,
308 10 : Self::AzureBlob(s) => s.list_prefixes(prefix).await,
309 UBC 0 : Self::Unreliable(s) => s.list_prefixes(prefix).await,
310 : }
311 CBC 9 : }
312 :
313 30585 : pub async fn upload(
314 30585 : &self,
315 30585 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
316 30585 : data_size_bytes: usize,
317 30585 : to: &RemotePath,
318 30585 : metadata: Option<StorageMetadata>,
319 30585 : ) -> anyhow::Result<()> {
320 30585 : match self {
321 772166 : Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
322 54274 : Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
323 UBC 0 : Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await,
324 CBC 17966 : Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
325 : }
326 30580 : }
327 :
328 11841 : pub async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
329 11841 : match self {
330 1185 : Self::LocalFs(s) => s.download(from).await,
331 30489 : Self::AwsS3(s) => s.download(from).await,
332 1 : Self::AzureBlob(s) => s.download(from).await,
333 327 : Self::Unreliable(s) => s.download(from).await,
334 : }
335 11838 : }
336 :
337 53 : pub async fn download_byte_range(
338 53 : &self,
339 53 : from: &RemotePath,
340 53 : start_inclusive: u64,
341 53 : end_exclusive: Option<u64>,
342 53 : ) -> Result<Download, DownloadError> {
343 53 : match self {
344 2 : Self::LocalFs(s) => {
345 2 : s.download_byte_range(from, start_inclusive, end_exclusive)
346 4 : .await
347 : }
348 46 : Self::AwsS3(s) => {
349 46 : s.download_byte_range(from, start_inclusive, end_exclusive)
350 116 : .await
351 : }
352 5 : Self::AzureBlob(s) => {
353 5 : s.download_byte_range(from, start_inclusive, end_exclusive)
354 5 : .await
355 : }
356 UBC 0 : Self::Unreliable(s) => {
357 0 : s.download_byte_range(from, start_inclusive, end_exclusive)
358 0 : .await
359 : }
360 : }
361 CBC 53 : }
362 :
363 2118 : pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
364 2118 : match self {
365 3 : Self::LocalFs(s) => s.delete(path).await,
366 8101 : Self::AwsS3(s) => s.delete(path).await,
367 50 : Self::AzureBlob(s) => s.delete(path).await,
368 140 : Self::Unreliable(s) => s.delete(path).await,
369 : }
370 2118 : }
371 :
372 479 : pub async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
373 473 : match self {
374 1184 : Self::LocalFs(s) => s.delete_objects(paths).await,
375 897 : Self::AwsS3(s) => s.delete_objects(paths).await,
376 UBC 0 : Self::AzureBlob(s) => s.delete_objects(paths).await,
377 CBC 7532 : Self::Unreliable(s) => s.delete_objects(paths).await,
378 : }
379 473 : }
380 :
381 12 : pub async fn copy_object(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
382 12 : match self {
383 UBC 0 : Self::LocalFs(s) => s.copy(from, to).await,
384 CBC 48 : Self::AwsS3(s) => s.copy(from, to).await,
385 UBC 0 : Self::AzureBlob(s) => s.copy(from, to).await,
386 0 : Self::Unreliable(s) => s.copy(from, to).await,
387 : }
388 CBC 12 : }
389 : }
390 :
391 : impl GenericRemoteStorage {
392 645 : pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
393 645 : Ok(match &storage_config.storage {
394 406 : RemoteStorageKind::LocalFs(root) => {
395 406 : info!("Using fs root '{root}' as a remote storage");
396 406 : Self::LocalFs(LocalFs::new(root.clone())?)
397 : }
398 234 : RemoteStorageKind::AwsS3(s3_config) => {
399 234 : info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}'",
400 234 : s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
401 234 : Self::AwsS3(Arc::new(S3Bucket::new(s3_config)?))
402 : }
403 5 : RemoteStorageKind::AzureContainer(azure_config) => {
404 5 : info!("Using azure container '{}' in region '{}' as a remote storage, prefix in container: '{:?}'",
405 5 : azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
406 5 : Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?))
407 : }
408 : })
409 645 : }
410 :
411 65 : pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
412 65 : Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
413 65 : }
414 :
415 : /// Takes the storage object contents and their size, and uploads them to remote storage
416 : /// under the remote object id given by `to`.
417 : ///
418 : /// The object does not have to exist as a local file;
419 : /// its contents are read from the provided stream.
420 6561 : pub async fn upload_storage_object(
421 6561 : &self,
422 6561 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
423 6561 : from_size_bytes: usize,
424 6561 : to: &RemotePath,
425 6561 : ) -> anyhow::Result<()> {
426 6561 : self.upload(from, from_size_bytes, to, None)
427 41516 : .await
428 6559 : .with_context(|| {
429 746 : format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
430 6559 : })
431 6559 : }
432 :
433 : /// Downloads the storage object from the `from` path provided.
434 : /// A `byte_range` can be specified to download only a part of the file, if needed.
435 38 : pub async fn download_storage_object(
436 38 : &self,
437 38 : byte_range: Option<(u64, Option<u64>)>,
438 38 : from: &RemotePath,
439 38 : ) -> Result<Download, DownloadError> {
440 38 : match byte_range {
441 110 : Some((start, end)) => self.download_byte_range(from, start, end).await,
442 UBC 0 : None => self.download(from).await,
443 : }
444 CBC 38 : }
445 : }
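// End-to-end sketch (illustrative only): build a `GenericRemoteStorage` from a local-fs
// config and push/pull one object through it. The root directory, object path, and helper
// name below are hypothetical.
#[allow(dead_code)]
async fn local_fs_round_trip_sketch() -> anyhow::Result<()> {
    let config = RemoteStorageConfig {
        storage: RemoteStorageKind::LocalFs(Utf8PathBuf::from("/tmp/remote_storage_sketch")),
    };
    let storage = GenericRemoteStorage::from_config(&config)?;

    let to = RemotePath::from_string("example/object.bin")?;
    let payload = Bytes::from_static(b"hello");
    let body =
        futures::stream::once(futures::future::ready(Ok::<Bytes, std::io::Error>(payload.clone())));
    storage
        .upload_storage_object(body, payload.len(), &to)
        .await?;

    // Fetch the whole object back (no byte range requested).
    let _download = storage.download_storage_object(None, &to).await?;
    Ok(())
}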
446 :
447 : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
448 : /// Immutable, cannot be changed once the file is created.
449 2 : #[derive(Debug, Clone, PartialEq, Eq)]
450 : pub struct StorageMetadata(HashMap<String, String>);
451 :
452 : /// External backup storage configuration, enough for creating a client for that storage.
453 780 : #[derive(Debug, Clone, PartialEq, Eq)]
454 : pub struct RemoteStorageConfig {
455 : /// The storage connection configuration.
456 : pub storage: RemoteStorageKind,
457 : }
458 :
459 : /// A kind of a remote storage to connect to, with its connection configuration.
460 780 : #[derive(Debug, Clone, PartialEq, Eq)]
461 : pub enum RemoteStorageKind {
462 : /// Storage based on local file system.
463 : /// Specify a root folder to place all stored files into.
464 : LocalFs(Utf8PathBuf),
465 : /// AWS S3 based storage, storing all files in the S3 bucket
466 : /// specified by the config
467 : AwsS3(S3Config),
468 : /// Azure Blob based storage, storing all files in the container
469 : /// specified by the config
470 : AzureContainer(AzureConfig),
471 : }
472 :
473 : /// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
474 569 : #[derive(Clone, PartialEq, Eq)]
475 : pub struct S3Config {
476 : /// Name of the bucket to connect to.
477 : pub bucket_name: String,
478 : /// The region where the bucket is located.
479 : pub bucket_region: String,
480 : /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
481 : pub prefix_in_bucket: Option<String>,
482 : /// A base URL to send S3 requests to.
483 : /// By default, the endpoint is derived from the region name, assuming it is
484 : /// an AWS S3 region name, and erroring on an unknown region name.
485 : /// The endpoint provides a way to support other S3 flavors and their regions.
486 : ///
487 : /// Example: `http://127.0.0.1:5000`
488 : pub endpoint: Option<String>,
489 : /// AWS S3 has various limits on its API calls, and we must not exceed them.
490 : /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
491 : pub concurrency_limit: NonZeroUsize,
492 : pub max_keys_per_list_response: Option<i32>,
493 : }
494 :
495 : impl Debug for S3Config {
496 12 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497 12 : f.debug_struct("S3Config")
498 12 : .field("bucket_name", &self.bucket_name)
499 12 : .field("bucket_region", &self.bucket_region)
500 12 : .field("prefix_in_bucket", &self.prefix_in_bucket)
501 12 : .field("concurrency_limit", &self.concurrency_limit)
502 12 : .field(
503 12 : "max_keys_per_list_response",
504 12 : &self.max_keys_per_list_response,
505 12 : )
506 12 : .finish()
507 12 : }
508 : }
509 :
510 : /// Azure container coordinates and access credentials to manage the container contents (read and write).
511 UBC 0 : #[derive(Clone, PartialEq, Eq)]
512 : pub struct AzureConfig {
513 : /// Name of the container to connect to.
514 : pub container_name: String,
515 : /// The region where the container is located.
516 : pub container_region: String,
517 : /// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
518 : pub prefix_in_container: Option<String>,
519 : /// Azure has various limits on its API calls, and we must not exceed them.
520 : /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
521 : pub concurrency_limit: NonZeroUsize,
522 : pub max_keys_per_list_response: Option<i32>,
523 : }
524 :
525 : impl Debug for AzureConfig {
526 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
527 0 : f.debug_struct("AzureConfig")
528 0 : .field("bucket_name", &self.container_name)
529 0 : .field("bucket_region", &self.container_region)
530 0 : .field("prefix_in_bucket", &self.prefix_in_container)
531 0 : .field("concurrency_limit", &self.concurrency_limit)
532 0 : .field(
533 0 : "max_keys_per_list_response",
534 0 : &self.max_keys_per_list_response,
535 0 : )
536 0 : .finish()
537 0 : }
538 : }
539 :
540 : impl RemoteStorageConfig {
541 CBC 986 : pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
542 986 : let local_path = toml.get("local_path");
543 986 : let bucket_name = toml.get("bucket_name");
544 986 : let bucket_region = toml.get("bucket_region");
545 986 : let container_name = toml.get("container_name");
546 986 : let container_region = toml.get("container_region");
547 :
548 986 : let use_azure = container_name.is_some() && container_region.is_some();
549 :
550 986 : let default_concurrency_limit = if use_azure {
551 UBC 0 : DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
552 : } else {
553 CBC 986 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
554 : };
555 986 : let concurrency_limit = NonZeroUsize::new(
556 986 : parse_optional_integer("concurrency_limit", toml)?.unwrap_or(default_concurrency_limit),
557 986 : )
558 986 : .context("Failed to parse 'concurrency_limit' as a positive integer")?;
559 :
560 986 : let max_keys_per_list_response =
561 986 : parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
562 986 : .context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
563 986 : .or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
564 :
565 986 : let endpoint = toml
566 986 : .get("endpoint")
567 986 : .map(|endpoint| parse_toml_string("endpoint", endpoint))
568 986 : .transpose()?;
569 :
570 935 : let storage = match (
571 986 : local_path,
572 986 : bucket_name,
573 986 : bucket_region,
574 986 : container_name,
575 986 : container_region,
576 : ) {
577 : // no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
578 51 : (None, None, None, None, None) => return Ok(None),
579 : (_, Some(_), None, ..) => {
580 UBC 0 : bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
581 : }
582 : (_, None, Some(_), ..) => {
583 0 : bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
584 : }
585 CBC 341 : (None, Some(bucket_name), Some(bucket_region), ..) => {
586 341 : RemoteStorageKind::AwsS3(S3Config {
587 341 : bucket_name: parse_toml_string("bucket_name", bucket_name)?,
588 341 : bucket_region: parse_toml_string("bucket_region", bucket_region)?,
589 341 : prefix_in_bucket: toml
590 341 : .get("prefix_in_bucket")
591 341 : .map(|prefix_in_bucket| {
592 341 : parse_toml_string("prefix_in_bucket", prefix_in_bucket)
593 341 : })
594 341 : .transpose()?,
595 341 : endpoint,
596 341 : concurrency_limit,
597 341 : max_keys_per_list_response,
598 : })
599 : }
600 : (_, _, _, Some(_), None) => {
601 UBC 0 : bail!("'container_region' option is mandatory if 'container_name' is given")
602 : }
603 : (_, _, _, None, Some(_)) => {
604 0 : bail!("'container_name' option is mandatory if 'container_region' is given ")
605 : }
606 0 : (None, None, None, Some(container_name), Some(container_region)) => {
607 0 : RemoteStorageKind::AzureContainer(AzureConfig {
608 0 : container_name: parse_toml_string("container_name", container_name)?,
609 0 : container_region: parse_toml_string("container_region", container_region)?,
610 0 : prefix_in_container: toml
611 0 : .get("prefix_in_container")
612 0 : .map(|prefix_in_container| {
613 0 : parse_toml_string("prefix_in_container", prefix_in_container)
614 0 : })
615 0 : .transpose()?,
616 0 : concurrency_limit,
617 0 : max_keys_per_list_response,
618 : })
619 : }
620 CBC 594 : (Some(local_path), None, None, None, None) => RemoteStorageKind::LocalFs(
621 594 : Utf8PathBuf::from(parse_toml_string("local_path", local_path)?),
622 : ),
623 : (Some(_), Some(_), ..) => {
624 UBC 0 : bail!("'local_path' and 'bucket_name' are mutually exclusive")
625 : }
626 : (Some(_), _, _, Some(_), Some(_)) => {
627 0 : bail!("local_path and 'container_name' are mutually exclusive")
628 : }
629 : };
630 :
631 CBC 935 : Ok(Some(RemoteStorageConfig { storage }))
632 986 : }
633 : }
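// Sketch (illustrative only) of feeding a TOML table to `RemoteStorageConfig::from_toml`,
// assuming the `toml_edit::Document` parsing API; the bucket name, region, and prefix are
// hypothetical placeholders.
#[allow(dead_code)]
fn parse_s3_config_sketch() -> anyhow::Result<Option<RemoteStorageConfig>> {
    let doc: toml_edit::Document = r#"
        bucket_name = "some-bucket"
        bucket_region = "eu-central-1"
        prefix_in_bucket = "pageserver/"
        concurrency_limit = 100
    "#
    .parse()?;
    // `from_toml` inspects the table's keys to decide between local-fs, S3, and Azure.
    RemoteStorageConfig::from_toml(doc.as_item())
}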
634 :
635 : // Helper functions to parse a toml Item
636 1972 : fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
637 1972 : where
638 1972 : I: TryFrom<i64, Error = E>,
639 1972 : E: std::error::Error + Send + Sync + 'static,
640 1972 : {
641 1972 : let toml_integer = match item.get(name) {
642 2 : Some(item) => item
643 2 : .as_integer()
644 2 : .with_context(|| format!("configure option {name} is not an integer"))?,
645 1970 : None => return Ok(None),
646 : };
647 :
648 2 : I::try_from(toml_integer)
649 2 : .map(Some)
650 2 : .with_context(|| format!("configure option {name} is too large"))
651 1972 : }
652 :
653 1826 : fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
654 1826 : let s = item
655 1826 : .as_str()
656 1826 : .with_context(|| format!("configure option {name} is not a string"))?;
657 1826 : Ok(s.to_string())
658 1826 : }
659 :
660 : struct ConcurrencyLimiter {
661 : // Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded.
662 : // The same goes for IAM, which is queried before every S3 request, if enabled. IAM has an even lower RPS threshold.
663 : // This helps ensure we don't exceed those thresholds.
664 : write: Arc<Semaphore>,
665 : read: Arc<Semaphore>,
666 : }
667 :
668 : impl ConcurrencyLimiter {
669 28593 : fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
670 28593 : match kind {
671 10463 : RequestKind::Get => &self.read,
672 15453 : RequestKind::Put => &self.write,
673 414 : RequestKind::List => &self.read,
674 2251 : RequestKind::Delete => &self.write,
675 12 : RequestKind::Copy => &self.write,
676 : }
677 28593 : }
678 :
679 18136 : async fn acquire(
680 18136 : &self,
681 18136 : kind: RequestKind,
682 18136 : ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
683 18136 : self.for_kind(kind).acquire().await
684 18136 : }
685 :
686 10457 : async fn acquire_owned(
687 10457 : &self,
688 10457 : kind: RequestKind,
689 10457 : ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
690 10457 : Arc::clone(self.for_kind(kind)).acquire_owned().await
691 10457 : }
692 :
693 244 : fn new(limit: usize) -> ConcurrencyLimiter {
694 244 : Self {
695 244 : read: Arc::new(Semaphore::new(limit)),
696 244 : write: Arc::new(Semaphore::new(limit)),
697 244 : }
698 244 : }
699 : }
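// Sketch (illustrative only) of how a storage backend gates a request on the limiter; the
// commented-out step is a hypothetical stand-in for the actual S3/Azure call.
#[allow(dead_code)]
async fn limited_get_sketch(limiter: &ConcurrencyLimiter) -> anyhow::Result<()> {
    let _permit = limiter
        .acquire(RequestKind::Get)
        .await
        .context("concurrency limiter semaphore was closed")?;
    // ... issue the GET request here, while `_permit` is held ...
    Ok(())
}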
700 :
701 : #[cfg(test)]
702 : mod tests {
703 : use super::*;
704 :
705 1 : #[test]
706 1 : fn test_object_name() {
707 1 : let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
708 1 : assert_eq!(k.object_name(), Some("c"));
709 :
710 1 : let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
711 1 : assert_eq!(k.object_name(), Some("c"));
712 :
713 1 : let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
714 1 : assert_eq!(k.object_name(), Some("a"));
715 :
716 : // XXX is it impossible to have an empty key?
717 1 : let k = RemotePath::new(Utf8Path::new("")).unwrap();
718 1 : assert_eq!(k.object_name(), None);
719 1 : }
720 :
721 1 : #[test]
722 1 : fn remote_path_cannot_be_created_from_absolute_ones() {
723 1 : let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
724 1 : assert_eq!(err.to_string(), "Path \"/\" is not relative");
725 1 : }
726 : }