TLA Line data Source code
1 : //! A set of generic storage abstractions for the pageserver to use when backing up and restoring its state from external storage.
2 : //! No other modules from this tree are supposed to be used directly by external code.
3 : //!
4 : //! The [`RemoteStorage`] trait is a CRUD-like generic abstraction for adapting external storages, with a few implementations:
5 : //! * [`local_fs`] uses the local file system as the external storage
6 : //! * [`s3_bucket`] uses an AWS S3 bucket as the external storage
7 : //! * [`azure_blob`] uses Azure Blob Storage as the external storage
8 : //!
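//!
//! A minimal end-to-end sketch of the intended usage. This is illustrative only: the
//! crate name in the `use` lines, the example paths, and the payload are assumptions,
//! not something prescribed by this module.
//!
//! ```ignore
//! use camino::Utf8Path;
//! use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig};
//!
//! async fn roundtrip(config: &RemoteStorageConfig) -> anyhow::Result<()> {
//!     // Pick the concrete backend (LocalFs / S3 / Azure) from the parsed config.
//!     let storage = GenericRemoteStorage::from_config(config)?;
//!
//!     // Upload a small in-memory blob; the size must be passed explicitly.
//!     let data = b"hello".to_vec();
//!     let path = RemotePath::new(Utf8Path::new("tenants/example/object"))?;
//!     storage.upload(std::io::Cursor::new(data.clone()), data.len(), &path, None).await?;
//!
//!     // Download it back as a stream and drain it into memory.
//!     let mut download = storage.download(&path).await?;
//!     let mut buf = Vec::new();
//!     tokio::io::AsyncReadExt::read_to_end(&mut download.download_stream, &mut buf).await?;
//!     assert_eq!(buf, data);
//!     Ok(())
//! }
//! ```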
9 :
10 : mod azure_blob;
11 : mod local_fs;
12 : mod s3_bucket;
13 : mod simulate_failures;
14 :
15 : use std::{
16 : collections::HashMap,
17 : fmt::Debug,
18 : num::{NonZeroU32, NonZeroUsize},
19 : pin::Pin,
20 : sync::Arc,
21 : };
22 :
23 : use anyhow::{bail, Context};
24 : use camino::{Utf8Path, Utf8PathBuf};
25 :
26 : use serde::{Deserialize, Serialize};
27 : use tokio::{io, sync::Semaphore};
28 : use toml_edit::Item;
29 : use tracing::info;
30 :
31 : pub use self::{
32 : azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket,
33 : simulate_failures::UnreliableWrapper,
34 : };
35 : use s3_bucket::RequestKind;
36 :
37 : /// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
38 : /// During regular work, the pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
39 : /// during startup (where local and remote timelines are compared and initial sync tasks are scheduled) and timeline attach.
40 : /// Both cases may trigger a timeline download, which can fetch a lot of layers. If needed, this concurrency is limited internally by the clients.
41 : pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS: usize = 50;
42 : pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
43 : /// Currently, sync happens with AWS S3, which has two limits on requests per second:
44 : /// ~200 RPS for IAM services
45 : /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
46 : /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
47 : /// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
48 : pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
49 : /// We set this a little low, as we currently buffer the entire file into RAM.
50 : ///
51 : /// Here, a limit of at most 20k concurrent connections was noted:
52 : /// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
53 : pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 30;
54 : /// No limit on the client side, which currently means 1000 for AWS S3.
55 : /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
56 : pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
57 :
58 : /// As defined in S3 docs
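///
/// Callers deleting more objects than this in one request are expected to chunk their
/// key list accordingly; a hedged sketch (the `storage` handle and `paths` slice are
/// assumed to exist in the caller):
///
/// ```ignore
/// for chunk in paths.chunks(MAX_KEYS_PER_DELETE) {
///     storage.delete_objects(chunk).await?;
/// }
/// ```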
59 : pub const MAX_KEYS_PER_DELETE: usize = 1000;
60 :
61 : const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
62 :
63 : /// A path in the remote storage, relative to some inner prefix.
64 : /// The prefix is an implementation detail that allows representing local paths
65 : /// as remote ones, with the local storage prefix stripped away.
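///
/// A small illustrative sketch (the crate name in the `use` line and the example
/// paths are assumptions):
///
/// ```ignore
/// use camino::Utf8Path;
/// use remote_storage::RemotePath;
///
/// let path = RemotePath::new(Utf8Path::new("tenants/t1/objects/layer_1"))?;
/// assert_eq!(path.object_name(), Some("layer_1"));
/// // Re-anchor the relative path under a local storage root.
/// assert_eq!(
///     path.with_base(Utf8Path::new("/var/lib/pageserver")),
///     Utf8Path::new("/var/lib/pageserver/tenants/t1/objects/layer_1"),
/// );
/// ```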
66 CBC 10941 : #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
67 : pub struct RemotePath(Utf8PathBuf);
68 :
69 : impl Serialize for RemotePath {
70 UBC 0 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
71 0 : where
72 0 : S: serde::Serializer,
73 0 : {
74 0 : serializer.collect_str(self)
75 0 : }
76 : }
77 :
78 : impl<'de> Deserialize<'de> for RemotePath {
79 0 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
80 0 : where
81 0 : D: serde::Deserializer<'de>,
82 0 : {
83 0 : let str = String::deserialize(deserializer)?;
84 0 : Ok(Self(Utf8PathBuf::from(&str)))
85 0 : }
86 : }
87 :
88 : impl std::fmt::Display for RemotePath {
89 CBC 1445 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 1445 : std::fmt::Display::fmt(&self.0, f)
91 1445 : }
92 : }
93 :
94 : impl RemotePath {
95 40739 : pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
96 40739 : anyhow::ensure!(
97 40739 : relative_path.is_relative(),
98 1 : "Path {relative_path:?} is not relative"
99 : );
100 40738 : Ok(Self(relative_path.to_path_buf()))
101 40739 : }
102 :
103 19454 : pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
104 19454 : Self::new(Utf8Path::new(relative_path))
105 19454 : }
106 :
107 14318 : pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
108 14318 : base_path.join(&self.0)
109 14318 : }
110 :
111 2448 : pub fn object_name(&self) -> Option<&str> {
112 2448 : self.0.file_name()
113 2448 : }
114 :
115 2756 : pub fn join(&self, segment: &Utf8Path) -> Self {
116 2756 : Self(self.0.join(segment))
117 2756 : }
118 :
119 24414 : pub fn get_path(&self) -> &Utf8PathBuf {
120 24414 : &self.0
121 24414 : }
122 :
123 UBC 0 : pub fn extension(&self) -> Option<&str> {
124 0 : self.0.extension()
125 0 : }
126 :
127 CBC 1672 : pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
128 1672 : self.0.strip_prefix(&p.0)
129 1672 : }
130 : }
131 :
132 : /// Storage (potentially remote) API that the pageserver uses to manage its state.
133 : /// This storage tries to be unaware of any layered repository context,
134 : /// providing basic CRUD operations for storage files.
135 : #[async_trait::async_trait]
136 : pub trait RemoteStorage: Send + Sync + 'static {
137 : /// Lists all top-level subdirectories for a given prefix.
138 : /// Note: here we assume that if a prefix is passed, it was obtained via remote_object_id,
139 : /// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS),
140 : /// so this method doesn't need to.
141 : async fn list_prefixes(
142 : &self,
143 : prefix: Option<&RemotePath>,
144 : ) -> Result<Vec<RemotePath>, DownloadError>;
145 :
146 : /// Lists all files in a directory "recursively"
147 : /// (not really recursively, because AWS has a flat namespace).
148 : /// Note: this is subtly different from list_prefixes,
149 : /// because it lists files instead of listing
150 : /// names sharing common prefixes.
151 : /// For example,
152 : /// list_files("foo/bar") = ["foo/bar/cat123.txt",
153 : /// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"]
154 : /// whereas,
155 : /// list_prefixes("foo/bar/") = ["cat", "dog"]
156 : /// See `test_real_s3.rs` for more details.
157 : async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>;
158 :
159 : /// Streams the local file contents into the remote storage entry.
160 : async fn upload(
161 : &self,
162 : from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
163 : // The S3 PUT request requires the content length to be specified;
164 : // otherwise uploads start to fail as the concurrent connection count increases.
165 : data_size_bytes: usize,
166 : to: &RemotePath,
167 : metadata: Option<StorageMetadata>,
168 : ) -> anyhow::Result<()>;
169 :
170 : /// Returns a stream of the remote storage entry contents,
171 : /// along with the metadata, if any was stored with the file previously.
172 : async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError>;
173 :
174 : /// Returns a stream over the given byte range of the remote storage entry contents,
175 : /// along with the metadata, if any was stored with the file previously.
176 : async fn download_byte_range(
177 : &self,
178 : from: &RemotePath,
179 : start_inclusive: u64,
180 : end_exclusive: Option<u64>,
181 : ) -> Result<Download, DownloadError>;
182 :
183 : async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
184 :
185 : async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
186 : }
187 :
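/// The result of a successful download: a byte stream plus optional metadata.
///
/// Draining the stream into memory might look like this (illustrative sketch;
/// the `download` value is assumed to come from [`RemoteStorage::download`]):
///
/// ```ignore
/// use tokio::io::AsyncReadExt;
///
/// let mut bytes = Vec::new();
/// download.download_stream.read_to_end(&mut bytes).await?;
/// ```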
188 : pub struct Download {
189 : pub download_stream: Pin<Box<dyn io::AsyncRead + Unpin + Send + Sync>>,
190 : /// Extra key-value data, associated with the current remote file.
191 : pub metadata: Option<StorageMetadata>,
192 : }
193 :
194 : impl Debug for Download {
195 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196 0 : f.debug_struct("Download")
197 0 : .field("metadata", &self.metadata)
198 0 : .finish()
199 0 : }
200 : }
201 :
202 0 : #[derive(Debug)]
203 : pub enum DownloadError {
204 : /// Validation or other error happened due to user input.
205 : BadInput(anyhow::Error),
206 : /// The file was not found in the remote storage.
207 : NotFound,
208 : /// The file was found in the remote storage, but the download failed.
209 : Other(anyhow::Error),
210 : }
211 :
212 : impl std::fmt::Display for DownloadError {
213 CBC 3192 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 3192 : match self {
215 UBC 0 : DownloadError::BadInput(e) => {
216 0 : write!(f, "Failed to download a remote file due to user input: {e}")
217 : }
218 0 : DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
219 CBC 3192 : DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
220 : }
221 3192 : }
222 : }
223 :
224 : impl std::error::Error for DownloadError {}
225 :
226 : /// Every storage kind currently supported.
227 : /// Serves as a simple way to pass around a [`RemoteStorage`] implementation without dealing with generics.
228 5619 : #[derive(Clone)]
229 : pub enum GenericRemoteStorage {
230 : LocalFs(LocalFs),
231 : AwsS3(Arc<S3Bucket>),
232 : AzureBlob(Arc<AzureBlobStorage>),
233 : Unreliable(Arc<UnreliableWrapper>),
234 : }
235 :
236 : impl GenericRemoteStorage {
237 : // A function for listing all the files in a "directory"
238 : // Example:
239 : // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
240 357 : pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
241 357 : match self {
242 30 : Self::LocalFs(s) => s.list_files(folder).await,
243 817 : Self::AwsS3(s) => s.list_files(folder).await,
244 UBC 0 : Self::AzureBlob(s) => s.list_files(folder).await,
245 CBC 272 : Self::Unreliable(s) => s.list_files(folder).await,
246 : }
247 357 : }
248 :
249 : // Lists common *prefixes*, if any, of files.
250 : // Example:
251 : // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
252 60 : pub async fn list_prefixes(
253 60 : &self,
254 60 : prefix: Option<&RemotePath>,
255 60 : ) -> Result<Vec<RemotePath>, DownloadError> {
256 60 : match self {
257 70 : Self::LocalFs(s) => s.list_prefixes(prefix).await,
258 67 : Self::AwsS3(s) => s.list_prefixes(prefix).await,
259 UBC 0 : Self::AzureBlob(s) => s.list_prefixes(prefix).await,
260 CBC 27 : Self::Unreliable(s) => s.list_prefixes(prefix).await,
261 : }
262 60 : }
263 :
264 30077 : pub async fn upload(
265 30077 : &self,
266 30077 : from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
267 30077 : data_size_bytes: usize,
268 30077 : to: &RemotePath,
269 30077 : metadata: Option<StorageMetadata>,
270 30077 : ) -> anyhow::Result<()> {
271 30077 : match self {
272 3192105 : Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
273 50671 : Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
274 UBC 0 : Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await,
275 CBC 43379 : Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
276 : }
277 30001 : }
278 :
279 2731 : pub async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
280 2731 : match self {
281 1251 : Self::LocalFs(s) => s.download(from).await,
282 4060 : Self::AwsS3(s) => s.download(from).await,
283 UBC 0 : Self::AzureBlob(s) => s.download(from).await,
284 CBC 430 : Self::Unreliable(s) => s.download(from).await,
285 : }
286 2731 : }
287 :
288 6 : pub async fn download_byte_range(
289 6 : &self,
290 6 : from: &RemotePath,
291 6 : start_inclusive: u64,
292 6 : end_exclusive: Option<u64>,
293 6 : ) -> Result<Download, DownloadError> {
294 6 : match self {
295 2 : Self::LocalFs(s) => {
296 2 : s.download_byte_range(from, start_inclusive, end_exclusive)
297 4 : .await
298 : }
299 4 : Self::AwsS3(s) => {
300 4 : s.download_byte_range(from, start_inclusive, end_exclusive)
301 19 : .await
302 : }
303 UBC 0 : Self::AzureBlob(s) => {
304 0 : s.download_byte_range(from, start_inclusive, end_exclusive)
305 0 : .await
306 : }
307 0 : Self::Unreliable(s) => {
308 0 : s.download_byte_range(from, start_inclusive, end_exclusive)
309 0 : .await
310 : }
311 : }
312 CBC 6 : }
313 :
314 2046 : pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
315 2046 : match self {
316 43 : Self::LocalFs(s) => s.delete(path).await,
317 7911 : Self::AwsS3(s) => s.delete(path).await,
318 UBC 0 : Self::AzureBlob(s) => s.delete(path).await,
319 CBC 146 : Self::Unreliable(s) => s.delete(path).await,
320 : }
321 2046 : }
322 :
323 764 : pub async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
324 764 : match self {
325 2623 : Self::LocalFs(s) => s.delete_objects(paths).await,
326 1270 : Self::AwsS3(s) => s.delete_objects(paths).await,
327 UBC 0 : Self::AzureBlob(s) => s.delete_objects(paths).await,
328 CBC 7631 : Self::Unreliable(s) => s.delete_objects(paths).await,
329 : }
330 764 : }
331 : }
332 :
333 : impl GenericRemoteStorage {
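/// Constructs the concrete storage client described by the config.
///
/// A hedged sketch of the `LocalFs` case (the root path is a placeholder):
///
/// ```ignore
/// let config = RemoteStorageConfig {
///     max_concurrent_syncs: NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS).unwrap(),
///     max_sync_errors: NonZeroU32::new(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS).unwrap(),
///     storage: RemoteStorageKind::LocalFs(Utf8PathBuf::from("/tmp/remote_storage")),
/// };
/// let storage = GenericRemoteStorage::from_config(&config)?;
/// ```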
334 640 : pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
335 640 : Ok(match &storage_config.storage {
336 375 : RemoteStorageKind::LocalFs(root) => {
337 375 : info!("Using fs root '{root}' as a remote storage");
338 375 : Self::LocalFs(LocalFs::new(root.clone())?)
339 : }
340 265 : RemoteStorageKind::AwsS3(s3_config) => {
341 265 : info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}'",
342 265 : s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
343 265 : Self::AwsS3(Arc::new(S3Bucket::new(s3_config)?))
344 : }
345 UBC 0 : RemoteStorageKind::AzureContainer(azure_config) => {
346 0 : info!("Using azure container '{}' in region '{}' as a remote storage, prefix in container: '{:?}'",
347 0 : azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
348 0 : Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?))
349 : }
350 : })
351 CBC 640 : }
352 :
353 65 : pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
354 65 : Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
355 65 : }
356 :
357 : /// Takes the storage object contents and their size and uploads them to the remote storage,
358 : /// mapping the `to` path to the corresponding remote object id in the storage.
359 : ///
360 : /// The storage object does not have to be present locally at the `to` path;
361 : /// that path is used for the remote object id conversion only.
362 6987 : pub async fn upload_storage_object(
363 6987 : &self,
364 6987 : from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
365 6987 : from_size_bytes: usize,
366 6987 : to: &RemotePath,
367 6987 : ) -> anyhow::Result<()> {
368 6987 : self.upload(from, from_size_bytes, to, None)
369 36564 : .await
370 6986 : .with_context(|| {
371 783 : format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
372 6986 : })
373 6986 : }
374 :
375 : /// Downloads the storage object at the `from` path provided.
376 : /// `byte_range` can be specified to download only a part of the file, if needed.
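///
/// For example, fetching only the first KiB might look like this (illustrative sketch):
///
/// ```ignore
/// // (start_inclusive, Some(end_exclusive)): bytes 0..1024
/// let download = storage.download_storage_object(Some((0, Some(1024))), &path).await?;
/// ```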
377 6 : pub async fn download_storage_object(
378 6 : &self,
379 6 : byte_range: Option<(u64, Option<u64>)>,
380 6 : from: &RemotePath,
381 6 : ) -> Result<Download, DownloadError> {
382 6 : match byte_range {
383 23 : Some((start, end)) => self.download_byte_range(from, start, end).await,
384 UBC 0 : None => self.download(from).await,
385 : }
386 CBC 6 : }
387 : }
388 :
389 : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
390 : /// Immutable; it cannot be changed once the file is created.
391 2 : #[derive(Debug, Clone, PartialEq, Eq)]
392 : pub struct StorageMetadata(HashMap<String, String>);
393 :
394 : /// External backup storage configuration, enough for creating a client for that storage.
395 670 : #[derive(Debug, Clone, PartialEq, Eq)]
396 : pub struct RemoteStorageConfig {
397 : /// Max allowed number of concurrent sync operations between the API user and the remote storage.
398 : pub max_concurrent_syncs: NonZeroUsize,
399 : /// Max allowed errors before the sync task is considered failed and evicted.
400 : pub max_sync_errors: NonZeroU32,
401 : /// The storage connection configuration.
402 : pub storage: RemoteStorageKind,
403 : }
404 :
405 : /// A kind of a remote storage to connect to, with its connection configuration.
406 670 : #[derive(Debug, Clone, PartialEq, Eq)]
407 : pub enum RemoteStorageKind {
408 : /// Storage based on local file system.
409 : /// Specify a root folder to place all stored files into.
410 : LocalFs(Utf8PathBuf),
411 : /// AWS S3 based storage, storing all files in the S3 bucket
412 : /// specified by the config
413 : AwsS3(S3Config),
414 : /// Azure Blob based storage, storing all files in the container
415 : /// specified by the config
416 : AzureContainer(AzureConfig),
417 : }
418 :
419 : /// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
420 446 : #[derive(Clone, PartialEq, Eq)]
421 : pub struct S3Config {
422 : /// Name of the bucket to connect to.
423 : pub bucket_name: String,
424 : /// The region where the bucket is located at.
425 : pub bucket_region: String,
426 : /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
427 : pub prefix_in_bucket: Option<String>,
428 : /// A base URL to send S3 requests to.
429 : /// By default, the endpoint is derived from a region name, assuming it's
430 : /// an AWS S3 region name, erroring on wrong region name.
431 : /// Endpoint provides a way to support other S3 flavors and their regions.
432 : ///
433 : /// Example: `http://127.0.0.1:5000`
434 : pub endpoint: Option<String>,
435 : /// AWS S3 has various limits on its API calls, we need not to exceed those.
436 : /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
437 : pub concurrency_limit: NonZeroUsize,
438 : pub max_keys_per_list_response: Option<i32>,
439 : }
440 :
441 : impl Debug for S3Config {
442 24 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
443 24 : f.debug_struct("S3Config")
444 24 : .field("bucket_name", &self.bucket_name)
445 24 : .field("bucket_region", &self.bucket_region)
446 24 : .field("prefix_in_bucket", &self.prefix_in_bucket)
447 24 : .field("concurrency_limit", &self.concurrency_limit)
448 24 : .field(
449 24 : "max_keys_per_list_response",
450 24 : &self.max_keys_per_list_response,
451 24 : )
452 24 : .finish()
453 24 : }
454 : }
455 :
456 : /// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
457 UBC 0 : #[derive(Clone, PartialEq, Eq)]
458 : pub struct AzureConfig {
459 : /// Name of the container to connect to.
460 : pub container_name: String,
461 : /// The region where the bucket is located at.
462 : pub container_region: String,
463 : /// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
464 : pub prefix_in_container: Option<String>,
465 : /// Azure has various limits on its API calls, we need not to exceed those.
466 : /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
467 : pub concurrency_limit: NonZeroUsize,
468 : pub max_keys_per_list_response: Option<i32>,
469 : }
470 :
471 : impl Debug for AzureConfig {
472 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
473 0 : f.debug_struct("AzureConfig")
474 0 : .field("container_name", &self.container_name)
475 0 : .field("container_region", &self.container_region)
476 0 : .field("prefix_in_container", &self.prefix_in_container)
477 0 : .field("concurrency_limit", &self.concurrency_limit)
478 0 : .field(
479 0 : "max_keys_per_list_response",
480 0 : &self.max_keys_per_list_response,
481 0 : )
482 0 : .finish()
483 0 : }
484 : }
485 :
486 : impl RemoteStorageConfig {
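/// Parses the remote storage configuration from a TOML item.
///
/// The accepted keys follow the match arms below; an illustrative S3 layout
/// (all values are placeholders):
///
/// ```text
/// bucket_name = "some-bucket"
/// bucket_region = "eu-central-1"
/// prefix_in_bucket = "pageserver/"
/// # Optional overrides:
/// # endpoint = "http://127.0.0.1:5000"
/// # concurrency_limit = 100
/// # max_keys_per_list_response = 1000
/// ```
///
/// Exactly one of `local_path`, the `bucket_*` pair, or the `container_*` pair must be
/// present; if none is given, remote storage is considered disabled and `None` is returned.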
487 CBC 951 : pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
488 951 : let local_path = toml.get("local_path");
489 951 : let bucket_name = toml.get("bucket_name");
490 951 : let bucket_region = toml.get("bucket_region");
491 951 : let container_name = toml.get("container_name");
492 951 : let container_region = toml.get("container_region");
493 :
494 951 : let use_azure = container_name.is_some() && container_region.is_some();
495 :
496 951 : let max_concurrent_syncs = NonZeroUsize::new(
497 951 : parse_optional_integer("max_concurrent_syncs", toml)?
498 951 : .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS),
499 951 : )
500 951 : .context("Failed to parse 'max_concurrent_syncs' as a positive integer")?;
501 :
502 951 : let max_sync_errors = NonZeroU32::new(
503 951 : parse_optional_integer("max_sync_errors", toml)?
504 951 : .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS),
505 951 : )
506 951 : .context("Failed to parse 'max_sync_errors' as a positive integer")?;
507 :
508 951 : let default_concurrency_limit = if use_azure {
509 UBC 0 : DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
510 : } else {
511 CBC 951 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
512 : };
513 951 : let concurrency_limit = NonZeroUsize::new(
514 951 : parse_optional_integer("concurrency_limit", toml)?.unwrap_or(default_concurrency_limit),
515 951 : )
516 951 : .context("Failed to parse 'concurrency_limit' as a positive integer")?;
517 :
518 951 : let max_keys_per_list_response =
519 951 : parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
520 951 : .context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
521 951 : .or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
522 :
523 951 : let endpoint = toml
524 951 : .get("endpoint")
525 951 : .map(|endpoint| parse_toml_string("endpoint", endpoint))
526 951 : .transpose()?;
527 :
528 949 : let storage = match (
529 951 : local_path,
530 951 : bucket_name,
531 951 : bucket_region,
532 951 : container_name,
533 951 : container_region,
534 : ) {
535 : // no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
536 2 : (None, None, None, None, None) => return Ok(None),
537 : (_, Some(_), None, ..) => {
538 UBC 0 : bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
539 : }
540 : (_, None, Some(_), ..) => {
541 0 : bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
542 : }
543 CBC 406 : (None, Some(bucket_name), Some(bucket_region), ..) => {
544 406 : RemoteStorageKind::AwsS3(S3Config {
545 406 : bucket_name: parse_toml_string("bucket_name", bucket_name)?,
546 406 : bucket_region: parse_toml_string("bucket_region", bucket_region)?,
547 406 : prefix_in_bucket: toml
548 406 : .get("prefix_in_bucket")
549 406 : .map(|prefix_in_bucket| {
550 406 : parse_toml_string("prefix_in_bucket", prefix_in_bucket)
551 406 : })
552 406 : .transpose()?,
553 406 : endpoint,
554 406 : concurrency_limit,
555 406 : max_keys_per_list_response,
556 : })
557 : }
558 : (_, _, _, Some(_), None) => {
559 UBC 0 : bail!("'container_region' option is mandatory if 'container_name' is given ")
560 : }
561 : (_, _, _, None, Some(_)) => {
562 0 : bail!("'container_name' option is mandatory if 'container_region' is given ")
563 : }
564 0 : (None, None, None, Some(container_name), Some(container_region)) => {
565 0 : RemoteStorageKind::AzureContainer(AzureConfig {
566 0 : container_name: parse_toml_string("container_name", container_name)?,
567 0 : container_region: parse_toml_string("container_region", container_region)?,
568 0 : prefix_in_container: toml
569 0 : .get("prefix_in_container")
570 0 : .map(|prefix_in_container| {
571 0 : parse_toml_string("prefix_in_container", prefix_in_container)
572 0 : })
573 0 : .transpose()?,
574 0 : concurrency_limit,
575 0 : max_keys_per_list_response,
576 : })
577 : }
578 CBC 543 : (Some(local_path), None, None, None, None) => RemoteStorageKind::LocalFs(
579 543 : Utf8PathBuf::from(parse_toml_string("local_path", local_path)?),
580 : ),
581 : (Some(_), Some(_), ..) => {
582 UBC 0 : bail!("'local_path' and 'bucket_name' are mutually exclusive")
583 : }
584 : (Some(_), _, _, Some(_), Some(_)) => {
585 0 : bail!("'local_path' and 'container_name' are mutually exclusive")
586 : }
587 : };
588 :
589 CBC 949 : Ok(Some(RemoteStorageConfig {
590 949 : max_concurrent_syncs,
591 949 : max_sync_errors,
592 949 : storage,
593 949 : }))
594 951 : }
595 : }
596 :
597 : // Helper functions to parse a toml Item
598 3804 : fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
599 3804 : where
600 3804 : I: TryFrom<i64, Error = E>,
601 3804 : E: std::error::Error + Send + Sync + 'static,
602 3804 : {
603 3804 : let toml_integer = match item.get(name) {
604 6 : Some(item) => item
605 6 : .as_integer()
606 6 : .with_context(|| format!("configure option {name} is not an integer"))?,
607 3798 : None => return Ok(None),
608 : };
609 :
610 6 : I::try_from(toml_integer)
611 6 : .map(Some)
612 6 : .with_context(|| format!("configure option {name} is too large"))
613 3804 : }
614 :
615 1976 : fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
616 1976 : let s = item
617 1976 : .as_str()
618 1976 : .with_context(|| format!("configure option {name} is not a string"))?;
619 1976 : Ok(s.to_string())
620 1976 : }
621 :
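/// Limits the number of in-flight read and write requests to the remote storage.
///
/// Intended usage (illustrative sketch): hold a permit for the duration of a request.
///
/// ```ignore
/// let limiter = ConcurrencyLimiter::new(DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT);
/// let _permit = limiter.acquire(RequestKind::Get).await?;
/// // ... issue the GET request while the permit is held ...
/// ```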
622 : struct ConcurrencyLimiter {
623 : // Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded.
624 : // The same goes for IAM, which is queried before every S3 request, if enabled. IAM has an even lower RPS threshold.
625 : // This helps to ensure we don't exceed those thresholds.
626 : write: Arc<Semaphore>,
627 : read: Arc<Semaphore>,
628 : }
629 :
630 : impl ConcurrencyLimiter {
631 18141 : fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
632 18141 : match kind {
633 1198 : RequestKind::Get => &self.read,
634 14483 : RequestKind::Put => &self.write,
635 215 : RequestKind::List => &self.read,
636 2245 : RequestKind::Delete => &self.write,
637 : }
638 18141 : }
639 :
640 16943 : async fn acquire(
641 16943 : &self,
642 16943 : kind: RequestKind,
643 16943 : ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
644 16943 : self.for_kind(kind).acquire().await
645 16943 : }
646 :
647 1198 : async fn acquire_owned(
648 1198 : &self,
649 1198 : kind: RequestKind,
650 1198 : ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
651 1198 : Arc::clone(self.for_kind(kind)).acquire_owned().await
652 1198 : }
653 :
654 270 : fn new(limit: usize) -> ConcurrencyLimiter {
655 270 : Self {
656 270 : read: Arc::new(Semaphore::new(limit)),
657 270 : write: Arc::new(Semaphore::new(limit)),
658 270 : }
659 270 : }
660 : }
661 :
662 : #[cfg(test)]
663 : mod tests {
664 : use super::*;
665 :
666 1 : #[test]
667 1 : fn test_object_name() {
668 1 : let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
669 1 : assert_eq!(k.object_name(), Some("c"));
670 :
671 1 : let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
672 1 : assert_eq!(k.object_name(), Some("c"));
673 :
674 1 : let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
675 1 : assert_eq!(k.object_name(), Some("a"));
676 :
677 : // XXX is it impossible to have an empty key?
678 1 : let k = RemotePath::new(Utf8Path::new("")).unwrap();
679 1 : assert_eq!(k.object_name(), None);
680 1 : }
681 :
682 1 : #[test]
683 1 : fn remote_path_cannot_be_created_from_absolute_ones() {
684 1 : let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
685 1 : assert_eq!(err.to_string(), "Path \"/\" is not relative");
686 1 : }
687 : }
|