Line data Source code
1 : use std::{fmt::Debug, num::NonZeroUsize, str::FromStr, time::Duration};
2 :
3 : use aws_sdk_s3::types::StorageClass;
4 : use camino::Utf8PathBuf;
5 :
6 : use serde::{Deserialize, Serialize};
7 :
8 : use crate::{
9 : DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
10 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
11 : };
12 :
13 : /// External backup storage configuration, enough for creating a client for that storage.
14 37 : #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
15 : pub struct RemoteStorageConfig {
16 : /// The storage connection configuration.
17 : #[serde(flatten)]
18 : pub storage: RemoteStorageKind,
19 : /// A common timeout enforced for all requests after concurrency limiter permit has been
20 : /// acquired.
21 : #[serde(
22 : with = "humantime_serde",
23 : default = "default_timeout",
24 : skip_serializing_if = "is_default_timeout"
25 : )]
26 : pub timeout: Duration,
27 : /// Alternative timeout used for metadata objects which are expected to be small
28 : #[serde(
29 : with = "humantime_serde",
30 : default = "default_small_timeout",
31 : skip_serializing_if = "is_default_small_timeout"
32 : )]
33 : pub small_timeout: Duration,
34 : }
35 :
36 : impl RemoteStorageKind {
37 0 : pub fn bucket_name(&self) -> Option<&str> {
38 0 : match self {
39 0 : RemoteStorageKind::LocalFs { .. } => None,
40 0 : RemoteStorageKind::AwsS3(config) => Some(&config.bucket_name),
41 0 : RemoteStorageKind::AzureContainer(config) => Some(&config.container_name),
42 : }
43 0 : }
44 : }
45 :
46 1 : fn default_timeout() -> Duration {
47 1 : RemoteStorageConfig::DEFAULT_TIMEOUT
48 1 : }
49 :
50 10 : fn default_small_timeout() -> Duration {
51 10 : RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
52 10 : }
53 :
54 0 : fn is_default_timeout(d: &Duration) -> bool {
55 0 : *d == RemoteStorageConfig::DEFAULT_TIMEOUT
56 0 : }
57 :
58 0 : fn is_default_small_timeout(d: &Duration) -> bool {
59 0 : *d == RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
60 0 : }
61 :
62 : /// A kind of a remote storage to connect to, with its connection configuration.
63 28 : #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
64 : #[serde(untagged)]
65 : pub enum RemoteStorageKind {
66 : /// Storage based on local file system.
67 : /// Specify a root folder to place all stored files into.
68 : LocalFs { local_path: Utf8PathBuf },
69 : /// AWS S3 based storage, storing all files in the S3 bucket
70 : /// specified by the config
71 : AwsS3(S3Config),
72 : /// Azure Blob based storage, storing all files in the container
73 : /// specified by the config
74 : AzureContainer(AzureConfig),
75 : }
76 :
77 : /// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
78 25 : #[derive(Clone, PartialEq, Eq, Deserialize, Serialize)]
79 : pub struct S3Config {
80 : /// Name of the bucket to connect to.
81 : pub bucket_name: String,
82 : /// The region where the bucket is located at.
83 : pub bucket_region: String,
84 : /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
85 : pub prefix_in_bucket: Option<String>,
86 : /// A base URL to send S3 requests to.
87 : /// By default, the endpoint is derived from a region name, assuming it's
88 : /// an AWS S3 region name, erroring on wrong region name.
89 : /// Endpoint provides a way to support other S3 flavors and their regions.
90 : ///
91 : /// Example: `http://127.0.0.1:5000`
92 : pub endpoint: Option<String>,
93 : /// AWS S3 has various limits on its API calls, we need not to exceed those.
94 : /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details.
95 : #[serde(default = "default_remote_storage_s3_concurrency_limit")]
96 : pub concurrency_limit: NonZeroUsize,
97 : #[serde(default = "default_max_keys_per_list_response")]
98 : pub max_keys_per_list_response: Option<i32>,
99 : #[serde(
100 : deserialize_with = "deserialize_storage_class",
101 : serialize_with = "serialize_storage_class",
102 : default
103 : )]
104 : pub upload_storage_class: Option<StorageClass>,
105 : }
106 :
107 7 : fn default_remote_storage_s3_concurrency_limit() -> NonZeroUsize {
108 7 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
109 7 : .try_into()
110 7 : .unwrap()
111 7 : }
112 :
113 7 : fn default_max_keys_per_list_response() -> Option<i32> {
114 7 : DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
115 7 : }
116 :
/// Serde `default` provider for [`AzureConfig::conn_pool_size`].
fn default_azure_conn_pool_size() -> usize {
    // Out of the box the Azure SDK does no connection pooling, because of historic reports
    // of hard-to-reproduce issues (https://github.com/hyperium/hyper/issues/2312).
    //
    // Pooling still matters: without it, huge numbers of requests can exhaust client ports
    // (https://github.com/neondatabase/cloud/issues/20971).
    //
    // Hence a modest pool size is enabled by default. It may be configured to zero if
    // issues like the alleged upstream hyper issue appear.
    const MODEST_POOL_SIZE: usize = 8;

    MODEST_POOL_SIZE
}
128 :
129 : impl Debug for S3Config {
130 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 0 : f.debug_struct("S3Config")
132 0 : .field("bucket_name", &self.bucket_name)
133 0 : .field("bucket_region", &self.bucket_region)
134 0 : .field("prefix_in_bucket", &self.prefix_in_bucket)
135 0 : .field("concurrency_limit", &self.concurrency_limit)
136 0 : .field(
137 0 : "max_keys_per_list_response",
138 0 : &self.max_keys_per_list_response,
139 0 : )
140 0 : .finish()
141 0 : }
142 : }
143 :
144 : /// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
145 12 : #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
146 : pub struct AzureConfig {
147 : /// Name of the container to connect to.
148 : pub container_name: String,
149 : /// Name of the storage account the container is inside of
150 : pub storage_account: Option<String>,
151 : /// The region where the bucket is located at.
152 : pub container_region: String,
153 : /// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
154 : pub prefix_in_container: Option<String>,
155 : /// Azure has various limits on its API calls, we need not to exceed those.
156 : /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
157 : #[serde(default = "default_remote_storage_azure_concurrency_limit")]
158 : pub concurrency_limit: NonZeroUsize,
159 : #[serde(default = "default_max_keys_per_list_response")]
160 : pub max_keys_per_list_response: Option<i32>,
161 : #[serde(default = "default_azure_conn_pool_size")]
162 : pub conn_pool_size: usize,
163 : }
164 :
165 6 : fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
166 6 : NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
167 6 : }
168 :
169 : impl Debug for AzureConfig {
170 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 0 : f.debug_struct("AzureConfig")
172 0 : .field("bucket_name", &self.container_name)
173 0 : .field("storage_account", &self.storage_account)
174 0 : .field("bucket_region", &self.container_region)
175 0 : .field("prefix_in_container", &self.prefix_in_container)
176 0 : .field("concurrency_limit", &self.concurrency_limit)
177 0 : .field(
178 0 : "max_keys_per_list_response",
179 0 : &self.max_keys_per_list_response,
180 0 : )
181 0 : .finish()
182 0 : }
183 : }
184 :
185 15 : fn deserialize_storage_class<'de, D: serde::Deserializer<'de>>(
186 15 : deserializer: D,
187 15 : ) -> Result<Option<StorageClass>, D::Error> {
188 15 : Option::<String>::deserialize(deserializer).and_then(|s| {
189 15 : if let Some(s) = s {
190 : use serde::de::Error;
191 12 : let storage_class = StorageClass::from_str(&s).expect("infallible");
192 : #[allow(deprecated)]
193 12 : if matches!(storage_class, StorageClass::Unknown(_)) {
194 0 : return Err(D::Error::custom(format!(
195 0 : "Specified storage class unknown to SDK: '{s}'. Allowed values: {:?}",
196 0 : StorageClass::values()
197 0 : )));
198 12 : }
199 12 : Ok(Some(storage_class))
200 : } else {
201 3 : Ok(None)
202 : }
203 15 : })
204 15 : }
205 :
206 9 : fn serialize_storage_class<S: serde::Serializer>(
207 9 : val: &Option<StorageClass>,
208 9 : serializer: S,
209 9 : ) -> Result<S::Ok, S::Error> {
210 9 : let val = val.as_ref().map(StorageClass::as_str);
211 9 : Option::<&str>::serialize(&val, serializer)
212 9 : }
213 :
214 : impl RemoteStorageConfig {
215 : pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
216 : pub const DEFAULT_SMALL_TIMEOUT: Duration = std::time::Duration::from_secs(30);
217 :
218 10 : pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
219 10 : Ok(utils::toml_edit_ext::deserialize_item(toml)?)
220 10 : }
221 :
222 9 : pub fn from_toml_str(input: &str) -> anyhow::Result<RemoteStorageConfig> {
223 9 : let toml_document = toml_edit::DocumentMut::from_str(input)?;
224 9 : if let Some(item) = toml_document.get("remote_storage") {
225 0 : return Self::from_toml(item);
226 9 : }
227 9 : Self::from_toml(toml_document.as_item())
228 9 : }
229 : }
230 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand used by all parsing tests.
    fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
        RemoteStorageConfig::from_toml_str(input)
    }

    #[test]
    fn parse_localfs_config_with_timeout() {
        let input = "local_path = '.'
timeout = '5s'";

        let config = parse(input).unwrap();

        assert_eq!(
            config,
            RemoteStorageConfig {
                storage: RemoteStorageKind::LocalFs {
                    local_path: Utf8PathBuf::from(".")
                },
                timeout: Duration::from_secs(5),
                small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
            }
        );
    }

    /// The configuration may also live under a top-level `remote_storage` table.
    #[test]
    fn parse_remote_storage_section() {
        let input = "\
[remote_storage]
local_path = '.'
timeout = '5s'
";

        let config = parse(input).unwrap();

        assert_eq!(
            config,
            RemoteStorageConfig {
                storage: RemoteStorageKind::LocalFs {
                    local_path: Utf8PathBuf::from(".")
                },
                timeout: Duration::from_secs(5),
                small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
            }
        );
    }

    #[test]
    fn test_s3_parsing() {
        let toml = "\
bucket_name = 'foo-bar'
bucket_region = 'eu-central-1'
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
";

        let config = parse(toml).unwrap();

        assert_eq!(
            config,
            RemoteStorageConfig {
                storage: RemoteStorageKind::AwsS3(S3Config {
                    bucket_name: "foo-bar".into(),
                    bucket_region: "eu-central-1".into(),
                    prefix_in_bucket: None,
                    endpoint: None,
                    concurrency_limit: default_remote_storage_s3_concurrency_limit(),
                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
                    upload_storage_class: Some(StorageClass::IntelligentTiering),
                }),
                timeout: Duration::from_secs(7),
                small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
            }
        );
    }

    /// A storage class name the SDK does not know must fail parsing instead of being
    /// accepted as an unknown class.
    #[test]
    fn test_s3_unknown_storage_class_is_rejected() {
        let toml = "\
bucket_name = 'foo-bar'
bucket_region = 'eu-central-1'
upload_storage_class = 'NOT_A_STORAGE_CLASS'
";

        assert!(parse(toml).is_err());
    }

    #[test]
    fn test_storage_class_serde_roundtrip() {
        let classes = [
            None,
            Some(StorageClass::Standard),
            Some(StorageClass::IntelligentTiering),
        ];
        for class in classes {
            #[derive(Serialize, Deserialize)]
            struct Wrapper {
                #[serde(
                    deserialize_with = "deserialize_storage_class",
                    serialize_with = "serialize_storage_class"
                )]
                class: Option<StorageClass>,
            }
            let wrapped = Wrapper {
                class: class.clone(),
            };
            let serialized = serde_json::to_string(&wrapped).unwrap();
            let deserialized: Wrapper = serde_json::from_str(&serialized).unwrap();
            assert_eq!(class, deserialized.class);
        }
    }

    #[test]
    fn test_azure_parsing() {
        // Only keys that `AzureConfig` actually has: the S3-only `upload_storage_class`
        // key that used to be here was a copy-paste leftover.
        let toml = "\
container_name = 'foo-bar'
container_region = 'westeurope'
timeout = '7s'
conn_pool_size = 8
";

        let config = parse(toml).unwrap();

        assert_eq!(
            config,
            RemoteStorageConfig {
                storage: RemoteStorageKind::AzureContainer(AzureConfig {
                    container_name: "foo-bar".into(),
                    storage_account: None,
                    container_region: "westeurope".into(),
                    prefix_in_container: None,
                    concurrency_limit: default_remote_storage_azure_concurrency_limit(),
                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
                    conn_pool_size: 8,
                }),
                timeout: Duration::from_secs(7),
                small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
            }
        );
    }

    /// Omitted optional keys fall back to their serde defaults.
    #[test]
    fn test_azure_parsing_defaults() {
        let toml = "\
container_name = 'foo-bar'
container_region = 'westeurope'
";

        let config = parse(toml).unwrap();

        assert_eq!(
            config,
            RemoteStorageConfig {
                storage: RemoteStorageKind::AzureContainer(AzureConfig {
                    container_name: "foo-bar".into(),
                    storage_account: None,
                    container_region: "westeurope".into(),
                    prefix_in_container: None,
                    concurrency_limit: default_remote_storage_azure_concurrency_limit(),
                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
                    conn_pool_size: default_azure_conn_pool_size(),
                }),
                timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
                small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
            }
        );
    }
}
|