//! A set of generic storage abstractions for the page server to use when backing up and restoring its state from external storage.
//! No other modules from this tree are supposed to be used directly by external code.
//!
//! The [`RemoteStorage`] trait is a CRUD-like generic abstraction used to adapt external storages, with a few implementations:
//! * [`local_fs`] uses the local file system as an external storage
//! * [`s3_bucket`] uses an AWS S3 bucket as an external storage
//!
mod local_fs;
mod s3_bucket;
mod simulate_failures;

use std::{
    collections::HashMap,
    fmt::Debug,
    num::{NonZeroU32, NonZeroUsize},
    path::{Path, PathBuf},
    pin::Pin,
    sync::Arc,
};

use anyhow::{bail, Context};

use tokio::io;
use toml_edit::Item;
use tracing::info;

pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket, simulate_failures::UnreliableWrapper};

/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
/// During regular work, the pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
/// during start (where local and remote timelines are compared and initial sync tasks are scheduled) and timeline attach.
/// Both cases may trigger timeline downloads, which can fetch a large number of layers. This concurrency is limited by the clients internally, if needed.
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS: usize = 50;
pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
/// Currently, sync happens with AWS S3, which has two limits on requests per second:
/// ~200 RPS for IAM services
/// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
/// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
/// <https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/>
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// No limit on the client side, which currently means 1000 for AWS S3.
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;

const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';

/// Path on the remote storage, relative to some inner prefix.
/// The prefix is an implementation detail that allows representing local paths
/// as remote ones, stripping the local storage prefix away.
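///
/// A minimal usage sketch (hypothetical paths; assumes this crate is imported as `remote_storage`):
///
/// ```ignore
/// use std::path::Path;
/// use remote_storage::RemotePath;
///
/// // Only relative paths are accepted; absolute paths are rejected.
/// let path = RemotePath::new(Path::new("tenants/some_tenant/timelines/some_timeline"))?;
/// assert_eq!(path.object_name(), Some("some_timeline"));
///
/// // Re-anchor the remote path under a local base directory.
/// let local_path = path.with_base(Path::new("/var/lib/pageserver"));
/// ```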
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemotePath(PathBuf);

impl std::fmt::Display for RemotePath {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0.display())
    }
}

impl RemotePath {
    pub fn new(relative_path: &Path) -> anyhow::Result<Self> {
        anyhow::ensure!(
            relative_path.is_relative(),
            "Path {relative_path:?} is not relative"
        );
        Ok(Self(relative_path.to_path_buf()))
    }

    pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
        Self::new(Path::new(relative_path))
    }

    pub fn with_base(&self, base_path: &Path) -> PathBuf {
        base_path.join(&self.0)
    }

    pub fn object_name(&self) -> Option<&str> {
        self.0.file_name().and_then(|os_str| os_str.to_str())
    }

    pub fn join(&self, segment: &Path) -> Self {
        Self(self.0.join(segment))
    }

    pub fn get_path(&self) -> &PathBuf {
        &self.0
    }

    pub fn extension(&self) -> Option<&str> {
        self.0.extension()?.to_str()
    }
}

/// Storage (potentially remote) API to manage its state.
/// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations for storage files.
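///
/// A sketch of generic usage (hypothetical helper; error handling elided):
///
/// ```ignore
/// use remote_storage::{RemotePath, RemoteStorage};
///
/// // Works with any backend: LocalFs, S3Bucket, or a test double.
/// async fn object_exists(storage: &impl RemoteStorage, path: &RemotePath) -> bool {
///     storage.download(path).await.is_ok()
/// }
/// ```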
#[async_trait::async_trait]
pub trait RemoteStorage: Send + Sync + 'static {
    /// Lists all top level subdirectories for a given prefix.
    /// Note: here we assume that if a prefix is passed, it was obtained via `remote_object_id`,
    /// which already takes into account any kind of global prefix (`prefix_in_bucket` for S3 or `storage_root` for `LocalFs`),
    /// so this method doesn't need to.
    async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError>;

    /// Lists all files in a directory "recursively"
    /// (not really recursively, because AWS has a flat namespace).
    /// Note: this is subtly different from `list_prefixes`,
    /// because it lists files rather than names sharing common prefixes.
    /// For example,
    /// list_files("foo/bar") = ["foo/bar/cat123.txt",
    /// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"]
    /// whereas,
    /// list_prefixes("foo/bar/") = ["cat", "dog"]
    /// See `test_real_s3.rs` for more details.
    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>;

    /// Streams the local file contents into the remote storage entry.
    async fn upload(
        &self,
        from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
        // S3 PUT request requires the content length to be specified,
        // otherwise it starts to fail with the concurrent connection count increasing.
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()>;

    /// Streams the remote storage entry contents, returning a download stream.
    /// Also returns the metadata, if any was stored with the file previously.
    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError>;

    /// Streams a given byte range of the remote storage entry contents, returning a download stream.
    /// Also returns the metadata, if any was stored with the file previously.
    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError>;

    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;

    async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
}

pub struct Download {
    pub download_stream: Pin<Box<dyn io::AsyncRead + Unpin + Send + Sync>>,
    /// Extra key-value data, associated with the current remote file.
    pub metadata: Option<StorageMetadata>,
}

impl Debug for Download {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Download")
            .field("metadata", &self.metadata)
            .finish()
    }
}

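/// Errors that can occur when downloading a remote storage entry.
///
/// A sketch of how a caller might branch on the variants (hypothetical retry policy):
///
/// ```ignore
/// use remote_storage::DownloadError;
///
/// fn is_retryable(err: &DownloadError) -> bool {
///     match err {
///         // Bad input or a missing file will not get better on retry.
///         DownloadError::BadInput(_) | DownloadError::NotFound => false,
///         // Transient failures (e.g. network errors) may be retried.
///         DownloadError::Other(_) => true,
///     }
/// }
/// ```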
#[derive(Debug)]
pub enum DownloadError {
    /// Validation or other error happened due to user input.
    BadInput(anyhow::Error),
    /// The file was not found in the remote storage.
    NotFound,
    /// The file was found in the remote storage, but the download failed.
    Other(anyhow::Error),
}

impl std::fmt::Display for DownloadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DownloadError::BadInput(e) => {
                write!(f, "Failed to download a remote file due to user input: {e}")
            }
            DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
            DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
        }
    }
}

impl std::error::Error for DownloadError {}

/// All currently supported storages.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
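///
/// A minimal end-to-end sketch (hypothetical config values; runs inside an async context):
///
/// ```ignore
/// use std::num::{NonZeroU32, NonZeroUsize};
/// use std::path::{Path, PathBuf};
/// use remote_storage::{
///     GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind,
/// };
///
/// let config = RemoteStorageConfig {
///     max_concurrent_syncs: NonZeroUsize::new(50).unwrap(),
///     max_sync_errors: NonZeroU32::new(10).unwrap(),
///     storage: RemoteStorageKind::LocalFs(PathBuf::from("/tmp/remote_storage")),
/// };
/// let storage = GenericRemoteStorage::from_config(&config)?;
///
/// // Upload an in-memory buffer; tokio implements AsyncRead for std::io::Cursor.
/// let to = RemotePath::new(Path::new("some/object"))?;
/// let bytes = b"hello".to_vec();
/// let len = bytes.len();
/// storage.upload(std::io::Cursor::new(bytes), len, &to, None).await?;
/// ```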
#[derive(Clone)]
pub enum GenericRemoteStorage {
    LocalFs(LocalFs),
    AwsS3(Arc<S3Bucket>),
    Unreliable(Arc<UnreliableWrapper>),
}

impl GenericRemoteStorage {
    // A function for listing all the files in a "directory"
    // Example:
    // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
    pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        match self {
            Self::LocalFs(s) => s.list_files(folder).await,
            Self::AwsS3(s) => s.list_files(folder).await,
            Self::Unreliable(s) => s.list_files(folder).await,
        }
    }

    // Lists the common *prefixes*, if any, of the given files
    // Example:
    // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
    pub async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        match self {
            Self::LocalFs(s) => s.list_prefixes(prefix).await,
            Self::AwsS3(s) => s.list_prefixes(prefix).await,
            Self::Unreliable(s) => s.list_prefixes(prefix).await,
        }
    }

    pub async fn upload(
        &self,
        from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
            Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
            Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
        }
    }

    pub async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        match self {
            Self::LocalFs(s) => s.download(from).await,
            Self::AwsS3(s) => s.download(from).await,
            Self::Unreliable(s) => s.download(from).await,
        }
    }

    pub async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        match self {
            Self::LocalFs(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive)
                    .await
            }
            Self::AwsS3(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive)
                    .await
            }
            Self::Unreliable(s) => {
                s.download_byte_range(from, start_inclusive, end_exclusive)
                    .await
            }
        }
    }

    pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.delete(path).await,
            Self::AwsS3(s) => s.delete(path).await,
            Self::Unreliable(s) => s.delete(path).await,
        }
    }

    pub async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
        match self {
            Self::LocalFs(s) => s.delete_objects(paths).await,
            Self::AwsS3(s) => s.delete_objects(paths).await,
            Self::Unreliable(s) => s.delete_objects(paths).await,
        }
    }
}

impl GenericRemoteStorage {
    pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
        Ok(match &storage_config.storage {
            RemoteStorageKind::LocalFs(root) => {
                info!("Using fs root '{}' as a remote storage", root.display());
                Self::LocalFs(LocalFs::new(root.clone())?)
            }
            RemoteStorageKind::AwsS3(s3_config) => {
                info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}'",
                    s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
                Self::AwsS3(Arc::new(S3Bucket::new(s3_config)?))
            }
        })
    }

    pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
        Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
    }

    /// Takes storage object contents and its size and uploads to the remote storage,
    /// mapping `from_path` to the corresponding remote object id in the storage.
    ///
    /// The storage object does not have to be present at the `from_path`:
    /// this path is used for the remote object id conversion only.
    pub async fn upload_storage_object(
        &self,
        from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
        from_size_bytes: usize,
        to: &RemotePath,
    ) -> anyhow::Result<()> {
        self.upload(from, from_size_bytes, to, None)
            .await
            .with_context(|| {
                format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
            })
    }

321 : /// `byte_range` could be specified to dowload only a part of the file, if needed.
322 6 : pub async fn download_storage_object(
323 6 : &self,
324 6 : byte_range: Option<(u64, Option<u64>)>,
325 6 : from: &RemotePath,
326 6 : ) -> Result<Download, DownloadError> {
327 6 : match byte_range {
328 21 : Some((start, end)) => self.download_byte_range(from, start, end).await,
329 0 : None => self.download(from).await,
330 : }
331 6 : }
332 : }
333 :
334 : /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
335 : /// Immutable, cannot be changed once the file is created.
336 2 : #[derive(Debug, Clone, PartialEq, Eq)]
337 : pub struct StorageMetadata(HashMap<String, String>);
338 :
339 : /// External backup storage configuration, enough for creating a client for that storage.
340 669 : #[derive(Debug, Clone, PartialEq, Eq)]
341 : pub struct RemoteStorageConfig {
342 : /// Max allowed number of concurrent sync operations between the API user and the remote storage.
343 : pub max_concurrent_syncs: NonZeroUsize,
344 : /// Max allowed errors before the sync task is considered failed and evicted.
345 : pub max_sync_errors: NonZeroU32,
346 : /// The storage connection configuration.
347 : pub storage: RemoteStorageKind,
348 : }
349 :
350 : /// A kind of a remote storage to connect to, with its connection configuration.
351 669 : #[derive(Debug, Clone, PartialEq, Eq)]
352 : pub enum RemoteStorageKind {
353 : /// Storage based on local file system.
354 : /// Specify a root folder to place all stored files into.
355 : LocalFs(PathBuf),
356 : /// AWS S3 based storage, storing all files in the S3 bucket
357 : /// specified by the config
358 : AwsS3(S3Config),
359 : }
360 :
361 : /// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
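///
/// A construction sketch (hypothetical bucket coordinates):
///
/// ```ignore
/// use std::num::NonZeroUsize;
/// use remote_storage::S3Config;
///
/// let config = S3Config {
///     bucket_name: "some-bucket".to_string(),
///     bucket_region: "eu-central-1".to_string(),
///     prefix_in_bucket: Some("pageserver/".to_string()),
///     endpoint: None, // derive the endpoint from the region
///     concurrency_limit: NonZeroUsize::new(100).unwrap(),
///     max_keys_per_list_response: None,
/// };
/// ```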
#[derive(Clone, PartialEq, Eq)]
pub struct S3Config {
    /// Name of the bucket to connect to.
    pub bucket_name: String,
    /// The region where the bucket is located.
    pub bucket_region: String,
    /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
    pub prefix_in_bucket: Option<String>,
    /// A base URL to send S3 requests to.
    /// By default, the endpoint is derived from a region name, assuming it's
    /// an AWS S3 region name, erroring on a wrong region name.
    /// The endpoint provides a way to support other S3 flavors and their regions.
    ///
    /// Example: `http://127.0.0.1:5000`
    pub endpoint: Option<String>,
    /// AWS S3 has various limits on its API calls, which we must not exceed.
    /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
    pub concurrency_limit: NonZeroUsize,
    pub max_keys_per_list_response: Option<i32>,
}

impl Debug for S3Config {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3Config")
            .field("bucket_name", &self.bucket_name)
            .field("bucket_region", &self.bucket_region)
            .field("prefix_in_bucket", &self.prefix_in_bucket)
            .field("concurrency_limit", &self.concurrency_limit)
            .field(
                "max_keys_per_list_response",
                &self.max_keys_per_list_response,
            )
            .finish()
    }
}

impl RemoteStorageConfig {
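    /// Parses the remote storage configuration from a TOML table, returning
    /// `Ok(None)` when neither `local_path` nor `bucket_name`/`bucket_region` are set.
    ///
    /// A parsing sketch (hypothetical values; assumes the caller already located the
    /// relevant TOML item, e.g. via `toml_edit::Document` indexing):
    ///
    /// ```ignore
    /// let doc: toml_edit::Document = r#"
    ///     [remote_storage]
    ///     local_path = "/tmp/remote_storage"
    /// "#
    /// .parse()?;
    /// let config = RemoteStorageConfig::from_toml(&doc["remote_storage"])?;
    /// assert!(config.is_some());
    /// ```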
    pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
        let local_path = toml.get("local_path");
        let bucket_name = toml.get("bucket_name");
        let bucket_region = toml.get("bucket_region");

        let max_concurrent_syncs = NonZeroUsize::new(
            parse_optional_integer("max_concurrent_syncs", toml)?
                .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS),
        )
        .context("Failed to parse 'max_concurrent_syncs' as a positive integer")?;

        let max_sync_errors = NonZeroU32::new(
            parse_optional_integer("max_sync_errors", toml)?
                .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS),
        )
        .context("Failed to parse 'max_sync_errors' as a positive integer")?;

        let concurrency_limit = NonZeroUsize::new(
            parse_optional_integer("concurrency_limit", toml)?
                .unwrap_or(DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT),
        )
        .context("Failed to parse 'concurrency_limit' as a positive integer")?;

        let max_keys_per_list_response =
            parse_optional_integer::<i32, _>("max_keys_per_list_response", toml)
                .context("Failed to parse 'max_keys_per_list_response' as a positive integer")?
                .or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);

        let storage = match (local_path, bucket_name, bucket_region) {
            // Neither 'local_path' nor 'bucket_name' options are provided: consider this remote storage disabled.
            (None, None, None) => return Ok(None),
            (_, Some(_), None) => {
                bail!("'bucket_region' option is mandatory if 'bucket_name' is given")
            }
            (_, None, Some(_)) => {
                bail!("'bucket_name' option is mandatory if 'bucket_region' is given")
            }
            (None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config {
                bucket_name: parse_toml_string("bucket_name", bucket_name)?,
                bucket_region: parse_toml_string("bucket_region", bucket_region)?,
                prefix_in_bucket: toml
                    .get("prefix_in_bucket")
                    .map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket))
                    .transpose()?,
                endpoint: toml
                    .get("endpoint")
                    .map(|endpoint| parse_toml_string("endpoint", endpoint))
                    .transpose()?,
                concurrency_limit,
                max_keys_per_list_response,
            }),
            (Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
                parse_toml_string("local_path", local_path)?,
            )),
            (Some(_), Some(_), _) => bail!("'local_path' and 'bucket_name' are mutually exclusive"),
        };

        Ok(Some(RemoteStorageConfig {
            max_concurrent_syncs,
            max_sync_errors,
            storage,
        }))
    }
}

// Helper functions to parse a toml Item
fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
where
    I: TryFrom<i64, Error = E>,
    E: std::error::Error + Send + Sync + 'static,
{
    let toml_integer = match item.get(name) {
        Some(item) => item
            .as_integer()
            .with_context(|| format!("configuration option {name} is not an integer"))?,
        None => return Ok(None),
    };

    I::try_from(toml_integer)
        .map(Some)
        .with_context(|| format!("configuration option {name} is too large"))
}

fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
    let s = item
        .as_str()
        .with_context(|| format!("configuration option {name} is not a string"))?;
    Ok(s.to_string())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_object_name() {
        let k = RemotePath::new(Path::new("a/b/c")).unwrap();
        assert_eq!(k.object_name(), Some("c"));

        let k = RemotePath::new(Path::new("a/b/c/")).unwrap();
        assert_eq!(k.object_name(), Some("c"));

        let k = RemotePath::new(Path::new("a/")).unwrap();
        assert_eq!(k.object_name(), Some("a"));

        // XXX is it impossible to have an empty key?
        let k = RemotePath::new(Path::new("")).unwrap();
        assert_eq!(k.object_name(), None);
    }

    #[test]
    fn remote_path_cannot_be_created_from_absolute_ones() {
        let err = RemotePath::new(Path::new("/")).expect_err("Should fail on absolute paths");
        assert_eq!(err.to_string(), "Path \"/\" is not relative");
    }
}