Line data Source code
1 : //! AWS S3 storage wrapper around the AWS SDK for Rust (`aws-sdk-s3` crate).
2 : //!
3 : //! Respects `prefix_in_bucket` property from [`S3Config`],
4 : //! allowing multiple api users to independently work with the same S3 bucket, if
5 : //! their bucket prefixes are both specified and different.
6 :
7 : use std::{
8 : borrow::Cow,
9 : collections::HashMap,
10 : num::NonZeroU32,
11 : pin::Pin,
12 : sync::Arc,
13 : task::{Context, Poll},
14 : time::{Duration, SystemTime},
15 : };
16 :
17 : use anyhow::{anyhow, Context as _};
18 : use aws_config::{
19 : environment::credentials::EnvironmentVariableCredentialsProvider,
20 : imds::credentials::ImdsCredentialsProvider,
21 : meta::credentials::CredentialsProviderChain,
22 : profile::ProfileFileCredentialsProvider,
23 : provider_config::ProviderConfig,
24 : retry::{RetryConfigBuilder, RetryMode},
25 : web_identity_token::WebIdentityTokenCredentialsProvider,
26 : BehaviorVersion,
27 : };
28 : use aws_credential_types::provider::SharedCredentialsProvider;
29 : use aws_sdk_s3::{
30 : config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
31 : error::SdkError,
32 : operation::get_object::GetObjectError,
33 : types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
34 : Client,
35 : };
36 : use aws_smithy_async::rt::sleep::TokioSleep;
37 :
38 : use aws_smithy_types::{body::SdkBody, DateTime};
39 : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
40 : use bytes::Bytes;
41 : use futures::stream::Stream;
42 : use hyper::Body;
43 : use scopeguard::ScopeGuard;
44 : use tokio_util::sync::CancellationToken;
45 : use utils::backoff;
46 :
47 : use super::StorageMetadata;
48 : use crate::{
49 : error::Cancelled,
50 : metrics::{start_counting_cancelled_wait, start_measuring_requests},
51 : support::PermitCarrying,
52 : ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
53 : S3Config, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
54 : REMOTE_STORAGE_PREFIX_SEPARATOR,
55 : };
56 :
57 : use crate::metrics::AttemptOutcome;
58 : pub(super) use crate::metrics::RequestKind;
59 :
60 : /// AWS S3 storage.
61 : pub struct S3Bucket {
62 : client: Client,
63 : bucket_name: String,
64 : prefix_in_bucket: Option<String>,
65 : max_keys_per_list_response: Option<i32>,
66 : upload_storage_class: Option<StorageClass>,
67 : concurrency_limiter: ConcurrencyLimiter,
68 : // Per-request timeout. Accessible for tests.
69 : pub timeout: Duration,
70 : }
71 :
/// Parameters of a single `GetObject` call, consumed by `download_object`.
struct GetObjectRequest {
    /// Bucket to read from.
    bucket: String,
    /// Full object key within the bucket.
    key: String,
    /// Optional HTTP `Range` header value (e.g. `bytes=0-99`); `None` fetches
    /// the whole object.
    range: Option<String>,
}
77 : impl S3Bucket {
78 : /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
79 38 : pub fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
80 38 : tracing::debug!(
81 0 : "Creating s3 remote storage for S3 bucket {}",
82 : remote_storage_config.bucket_name
83 : );
84 :
85 38 : let region = Some(Region::new(remote_storage_config.bucket_region.clone()));
86 38 :
87 38 : let provider_conf = ProviderConfig::without_region().with_region(region.clone());
88 38 :
89 38 : let credentials_provider = {
90 38 : // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
91 38 : CredentialsProviderChain::first_try(
92 38 : "env",
93 38 : EnvironmentVariableCredentialsProvider::new(),
94 38 : )
95 38 : // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
96 38 : .or_else(
97 38 : "profile-sso",
98 38 : ProfileFileCredentialsProvider::builder()
99 38 : .configure(&provider_conf)
100 38 : .build(),
101 38 : )
102 38 : // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
103 38 : // needed to access remote extensions bucket
104 38 : .or_else(
105 38 : "token",
106 38 : WebIdentityTokenCredentialsProvider::builder()
107 38 : .configure(&provider_conf)
108 38 : .build(),
109 38 : )
110 38 : // uses imds v2
111 38 : .or_else("imds", ImdsCredentialsProvider::builder().build())
112 38 : };
113 38 :
114 38 : // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
115 38 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
116 38 :
117 38 : let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
118 38 : #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
119 38 : BehaviorVersion::v2023_11_09(),
120 38 : )
121 38 : .region(region)
122 38 : .identity_cache(IdentityCache::lazy().build())
123 38 : .credentials_provider(SharedCredentialsProvider::new(credentials_provider))
124 38 : .sleep_impl(SharedAsyncSleep::from(sleep_impl));
125 38 :
126 38 : let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
127 38 : s.spawn(|| {
128 38 : // TODO: make this function async.
129 38 : tokio::runtime::Builder::new_current_thread()
130 38 : .enable_all()
131 38 : .build()
132 38 : .unwrap()
133 38 : .block_on(sdk_config_loader.load())
134 38 : })
135 38 : .join()
136 38 : .unwrap()
137 38 : });
138 38 :
139 38 : let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
140 :
141 : // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
142 : // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
143 38 : if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
144 0 : s3_config_builder = s3_config_builder
145 0 : .endpoint_url(custom_endpoint)
146 0 : .force_path_style(true);
147 38 : }
148 :
149 : // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
150 : // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config. We set it to use at most one
151 : // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
152 38 : let mut retry_config = RetryConfigBuilder::new();
153 38 : retry_config
154 38 : .set_max_attempts(Some(1))
155 38 : .set_mode(Some(RetryMode::Adaptive));
156 38 : s3_config_builder = s3_config_builder.retry_config(retry_config.build());
157 38 :
158 38 : let s3_config = s3_config_builder.build();
159 38 : let client = aws_sdk_s3::Client::from_conf(s3_config);
160 38 :
161 38 : let prefix_in_bucket = remote_storage_config
162 38 : .prefix_in_bucket
163 38 : .as_deref()
164 38 : .map(|prefix| {
165 34 : let mut prefix = prefix;
166 38 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
167 4 : prefix = &prefix[1..]
168 : }
169 :
170 34 : let mut prefix = prefix.to_string();
171 60 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
172 26 : prefix.pop();
173 26 : }
174 34 : prefix
175 38 : });
176 38 :
177 38 : Ok(Self {
178 38 : client,
179 38 : bucket_name: remote_storage_config.bucket_name.clone(),
180 38 : max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
181 38 : prefix_in_bucket,
182 38 : concurrency_limiter: ConcurrencyLimiter::new(
183 38 : remote_storage_config.concurrency_limit.get(),
184 38 : ),
185 38 : upload_storage_class: remote_storage_config.upload_storage_class.clone(),
186 38 : timeout,
187 38 : })
188 38 : }
189 :
190 126 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
191 126 : let relative_path =
192 126 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
193 126 : Some(stripped) => stripped,
194 : // we rely on AWS to return properly prefixed paths
195 : // for requests with a certain prefix
196 0 : None => panic!(
197 0 : "Key {} does not start with bucket prefix {:?}",
198 0 : key, self.prefix_in_bucket
199 0 : ),
200 : };
201 126 : RemotePath(
202 126 : relative_path
203 126 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
204 126 : .collect(),
205 126 : )
206 126 : }
207 :
208 308 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
209 308 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
210 308 : let path_string = path.get_path().as_str();
211 308 : match &self.prefix_in_bucket {
212 296 : Some(prefix) => prefix.clone() + "/" + path_string,
213 12 : None => path_string.to_string(),
214 : }
215 308 : }
216 :
217 240 : async fn permit(
218 240 : &self,
219 240 : kind: RequestKind,
220 240 : cancel: &CancellationToken,
221 240 : ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
222 240 : let started_at = start_counting_cancelled_wait(kind);
223 240 : let acquire = self.concurrency_limiter.acquire(kind);
224 :
225 240 : let permit = tokio::select! {
226 : permit = acquire => permit.expect("semaphore is never closed"),
227 : _ = cancel.cancelled() => return Err(Cancelled),
228 : };
229 :
230 240 : let started_at = ScopeGuard::into_inner(started_at);
231 240 : crate::metrics::BUCKET_METRICS
232 240 : .wait_seconds
233 240 : .observe_elapsed(kind, started_at);
234 240 :
235 240 : Ok(permit)
236 240 : }
237 :
238 24 : async fn owned_permit(
239 24 : &self,
240 24 : kind: RequestKind,
241 24 : cancel: &CancellationToken,
242 24 : ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
243 24 : let started_at = start_counting_cancelled_wait(kind);
244 24 : let acquire = self.concurrency_limiter.acquire_owned(kind);
245 :
246 24 : let permit = tokio::select! {
247 : permit = acquire => permit.expect("semaphore is never closed"),
248 : _ = cancel.cancelled() => return Err(Cancelled),
249 : };
250 :
251 24 : let started_at = ScopeGuard::into_inner(started_at);
252 24 : crate::metrics::BUCKET_METRICS
253 24 : .wait_seconds
254 24 : .observe_elapsed(kind, started_at);
255 24 : Ok(permit)
256 24 : }
257 :
258 24 : async fn download_object(
259 24 : &self,
260 24 : request: GetObjectRequest,
261 24 : cancel: &CancellationToken,
262 24 : ) -> Result<Download, DownloadError> {
263 24 : let kind = RequestKind::Get;
264 :
265 24 : let permit = self.owned_permit(kind, cancel).await?;
266 :
267 24 : let started_at = start_measuring_requests(kind);
268 24 :
269 24 : let get_object = self
270 24 : .client
271 24 : .get_object()
272 24 : .bucket(request.bucket)
273 24 : .key(request.key)
274 24 : .set_range(request.range)
275 24 : .send();
276 :
277 24 : let get_object = tokio::select! {
278 : res = get_object => res,
279 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
280 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
281 : };
282 :
283 24 : let started_at = ScopeGuard::into_inner(started_at);
284 :
285 24 : let object_output = match get_object {
286 24 : Ok(object_output) => object_output,
287 0 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
288 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
289 : // an error: we expect to sometimes fetch an object and find it missing,
290 : // e.g. when probing for timeline indices.
291 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
292 0 : kind,
293 0 : AttemptOutcome::Ok,
294 0 : started_at,
295 0 : );
296 0 : return Err(DownloadError::NotFound);
297 : }
298 0 : Err(e) => {
299 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
300 0 : kind,
301 0 : AttemptOutcome::Err,
302 0 : started_at,
303 0 : );
304 0 :
305 0 : return Err(DownloadError::Other(
306 0 : anyhow::Error::new(e).context("download s3 object"),
307 0 : ));
308 : }
309 : };
310 :
311 : // even if we would have no timeout left, continue anyways. the caller can decide to ignore
312 : // the errors considering timeouts and cancellation.
313 24 : let remaining = self.timeout.saturating_sub(started_at.elapsed());
314 24 :
315 24 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
316 24 : let etag = object_output
317 24 : .e_tag
318 24 : .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
319 24 : .into();
320 24 : let last_modified = object_output
321 24 : .last_modified
322 24 : .ok_or(DownloadError::Other(anyhow::anyhow!(
323 24 : "Missing LastModified header"
324 24 : )))?
325 24 : .try_into()
326 24 : .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
327 :
328 24 : let body = object_output.body;
329 24 : let body = ByteStreamAsStream::from(body);
330 24 : let body = PermitCarrying::new(permit, body);
331 24 : let body = TimedDownload::new(started_at, body);
332 24 :
333 24 : let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
334 24 : let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
335 24 :
336 24 : Ok(Download {
337 24 : metadata,
338 24 : etag,
339 24 : last_modified,
340 24 : download_stream: Box::pin(body),
341 24 : })
342 24 : }
343 :
344 106 : async fn delete_oids(
345 106 : &self,
346 106 : _permit: &tokio::sync::SemaphorePermit<'_>,
347 106 : delete_objects: &[ObjectIdentifier],
348 106 : cancel: &CancellationToken,
349 106 : ) -> anyhow::Result<()> {
350 106 : let kind = RequestKind::Delete;
351 106 : let mut cancel = std::pin::pin!(cancel.cancelled());
352 :
353 106 : for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
354 106 : let started_at = start_measuring_requests(kind);
355 :
356 106 : let req = self
357 106 : .client
358 106 : .delete_objects()
359 106 : .bucket(self.bucket_name.clone())
360 106 : .delete(
361 106 : Delete::builder()
362 106 : .set_objects(Some(chunk.to_vec()))
363 106 : .build()
364 106 : .context("build request")?,
365 : )
366 106 : .send();
367 :
368 106 : let resp = tokio::select! {
369 : resp = req => resp,
370 : _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
371 : _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
372 : };
373 :
374 106 : let started_at = ScopeGuard::into_inner(started_at);
375 106 : crate::metrics::BUCKET_METRICS
376 106 : .req_seconds
377 106 : .observe_elapsed(kind, &resp, started_at);
378 :
379 106 : let resp = resp.context("request deletion")?;
380 106 : crate::metrics::BUCKET_METRICS
381 106 : .deleted_objects_total
382 106 : .inc_by(chunk.len() as u64);
383 :
384 106 : if let Some(errors) = resp.errors {
385 : // Log a bounded number of the errors within the response:
386 : // these requests can carry 1000 keys so logging each one
387 : // would be too verbose, especially as errors may lead us
388 : // to retry repeatedly.
389 : const LOG_UP_TO_N_ERRORS: usize = 10;
390 0 : for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
391 0 : tracing::warn!(
392 0 : "DeleteObjects key {} failed: {}: {}",
393 0 : e.key.as_ref().map(Cow::from).unwrap_or("".into()),
394 0 : e.code.as_ref().map(Cow::from).unwrap_or("".into()),
395 0 : e.message.as_ref().map(Cow::from).unwrap_or("".into())
396 : );
397 : }
398 :
399 0 : return Err(anyhow::anyhow!(
400 0 : "Failed to delete {}/{} objects",
401 0 : errors.len(),
402 0 : chunk.len(),
403 0 : ));
404 106 : }
405 : }
406 106 : Ok(())
407 106 : }
408 : }
409 :
410 : pin_project_lite::pin_project! {
411 : struct ByteStreamAsStream {
412 : #[pin]
413 : inner: aws_smithy_types::byte_stream::ByteStream
414 : }
415 : }
416 :
417 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
418 24 : fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
419 24 : ByteStreamAsStream { inner }
420 24 : }
421 : }
422 :
423 : impl Stream for ByteStreamAsStream {
424 : type Item = std::io::Result<Bytes>;
425 :
426 48 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
427 48 : // this does the std::io::ErrorKind::Other conversion
428 48 : self.project().inner.poll_next(cx).map_err(|x| x.into())
429 48 : }
430 :
431 : // cannot implement size_hint because inner.size_hint is remaining size in bytes, which makes
432 : // sense and Stream::size_hint does not really
433 : }
434 :
435 : pin_project_lite::pin_project! {
436 : /// Times and tracks the outcome of the request.
437 : struct TimedDownload<S> {
438 : started_at: std::time::Instant,
439 : outcome: AttemptOutcome,
440 : #[pin]
441 : inner: S
442 : }
443 :
444 : impl<S> PinnedDrop for TimedDownload<S> {
445 : fn drop(mut this: Pin<&mut Self>) {
446 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
447 : }
448 : }
449 : }
450 :
451 : impl<S> TimedDownload<S> {
452 24 : fn new(started_at: std::time::Instant, inner: S) -> Self {
453 24 : TimedDownload {
454 24 : started_at,
455 24 : outcome: AttemptOutcome::Cancelled,
456 24 : inner,
457 24 : }
458 24 : }
459 : }
460 :
461 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
462 : type Item = <S as Stream>::Item;
463 :
464 48 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
465 48 : use std::task::ready;
466 48 :
467 48 : let this = self.project();
468 :
469 48 : let res = ready!(this.inner.poll_next(cx));
470 22 : match &res {
471 22 : Some(Ok(_)) => {}
472 0 : Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
473 18 : None => *this.outcome = AttemptOutcome::Ok,
474 : }
475 :
476 40 : Poll::Ready(res)
477 48 : }
478 :
479 0 : fn size_hint(&self) -> (usize, Option<usize>) {
480 0 : self.inner.size_hint()
481 0 : }
482 : }
483 :
484 : impl RemoteStorage for S3Bucket {
485 24 : async fn list(
486 24 : &self,
487 24 : prefix: Option<&RemotePath>,
488 24 : mode: ListingMode,
489 24 : max_keys: Option<NonZeroU32>,
490 24 : cancel: &CancellationToken,
491 24 : ) -> Result<Listing, DownloadError> {
492 24 : let kind = RequestKind::List;
493 24 : // s3 sdk wants i32
494 24 : let mut max_keys = max_keys.map(|mk| mk.get() as i32);
495 24 : let mut result = Listing::default();
496 24 :
497 24 : // get the passed prefix or if it is not set use prefix_in_bucket value
498 24 : let list_prefix = prefix
499 24 : .map(|p| self.relative_path_to_s3_object(p))
500 24 : .or_else(|| {
501 20 : self.prefix_in_bucket.clone().map(|mut s| {
502 20 : s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
503 20 : s
504 20 : })
505 24 : });
506 :
507 24 : let _permit = self.permit(kind, cancel).await?;
508 :
509 24 : let mut continuation_token = None;
510 :
511 : loop {
512 32 : let started_at = start_measuring_requests(kind);
513 32 :
514 32 : // min of two Options, returning Some if one is value and another is
515 32 : // None (None is smaller than anything, so plain min doesn't work).
516 32 : let request_max_keys = self
517 32 : .max_keys_per_list_response
518 32 : .into_iter()
519 32 : .chain(max_keys.into_iter())
520 32 : .min();
521 32 : let mut request = self
522 32 : .client
523 32 : .list_objects_v2()
524 32 : .bucket(self.bucket_name.clone())
525 32 : .set_prefix(list_prefix.clone())
526 32 : .set_continuation_token(continuation_token)
527 32 : .set_max_keys(request_max_keys);
528 32 :
529 32 : if let ListingMode::WithDelimiter = mode {
530 10 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
531 22 : }
532 :
533 32 : let request = request.send();
534 :
535 32 : let response = tokio::select! {
536 : res = request => res,
537 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
538 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
539 : };
540 :
541 32 : let response = response
542 32 : .context("Failed to list S3 prefixes")
543 32 : .map_err(DownloadError::Other);
544 32 :
545 32 : let started_at = ScopeGuard::into_inner(started_at);
546 32 :
547 32 : crate::metrics::BUCKET_METRICS
548 32 : .req_seconds
549 32 : .observe_elapsed(kind, &response, started_at);
550 :
551 32 : let response = response?;
552 :
553 32 : let keys = response.contents();
554 32 : let empty = Vec::new();
555 32 : let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
556 32 :
557 32 : tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
558 :
559 110 : for object in keys {
560 80 : let object_path = object.key().expect("response does not contain a key");
561 80 : let remote_path = self.s3_object_to_relative_path(object_path);
562 80 : result.keys.push(remote_path);
563 80 : if let Some(mut mk) = max_keys {
564 4 : assert!(mk > 0);
565 4 : mk -= 1;
566 4 : if mk == 0 {
567 2 : return Ok(result); // limit reached
568 2 : }
569 2 : max_keys = Some(mk);
570 76 : }
571 : }
572 :
573 : // S3 gives us prefixes like "foo/", we return them like "foo"
574 46 : result.prefixes.extend(prefixes.iter().filter_map(|o| {
575 46 : Some(
576 46 : self.s3_object_to_relative_path(
577 46 : o.prefix()?
578 46 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
579 : ),
580 : )
581 46 : }));
582 :
583 30 : continuation_token = match response.next_continuation_token {
584 8 : Some(new_token) => Some(new_token),
585 22 : None => break,
586 22 : };
587 22 : }
588 22 :
589 22 : Ok(result)
590 24 : }
591 :
592 106 : async fn upload(
593 106 : &self,
594 106 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
595 106 : from_size_bytes: usize,
596 106 : to: &RemotePath,
597 106 : metadata: Option<StorageMetadata>,
598 106 : cancel: &CancellationToken,
599 106 : ) -> anyhow::Result<()> {
600 106 : let kind = RequestKind::Put;
601 106 : let _permit = self.permit(kind, cancel).await?;
602 :
603 106 : let started_at = start_measuring_requests(kind);
604 106 :
605 106 : let body = Body::wrap_stream(from);
606 106 : let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
607 :
608 106 : let upload = self
609 106 : .client
610 106 : .put_object()
611 106 : .bucket(self.bucket_name.clone())
612 106 : .key(self.relative_path_to_s3_object(to))
613 106 : .set_metadata(metadata.map(|m| m.0))
614 106 : .set_storage_class(self.upload_storage_class.clone())
615 106 : .content_length(from_size_bytes.try_into()?)
616 106 : .body(bytes_stream)
617 106 : .send();
618 106 :
619 106 : let upload = tokio::time::timeout(self.timeout, upload);
620 :
621 106 : let res = tokio::select! {
622 : res = upload => res,
623 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
624 : };
625 :
626 106 : if let Ok(inner) = &res {
627 106 : // do not incl. timeouts as errors in metrics but cancellations
628 106 : let started_at = ScopeGuard::into_inner(started_at);
629 106 : crate::metrics::BUCKET_METRICS
630 106 : .req_seconds
631 106 : .observe_elapsed(kind, inner, started_at);
632 106 : }
633 :
634 106 : match res {
635 106 : Ok(Ok(_put)) => Ok(()),
636 0 : Ok(Err(sdk)) => Err(sdk.into()),
637 0 : Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
638 : }
639 106 : }
640 :
641 2 : async fn copy(
642 2 : &self,
643 2 : from: &RemotePath,
644 2 : to: &RemotePath,
645 2 : cancel: &CancellationToken,
646 2 : ) -> anyhow::Result<()> {
647 2 : let kind = RequestKind::Copy;
648 2 : let _permit = self.permit(kind, cancel).await?;
649 :
650 2 : let timeout = tokio::time::sleep(self.timeout);
651 2 :
652 2 : let started_at = start_measuring_requests(kind);
653 2 :
654 2 : // we need to specify bucket_name as a prefix
655 2 : let copy_source = format!(
656 2 : "{}/{}",
657 2 : self.bucket_name,
658 2 : self.relative_path_to_s3_object(from)
659 2 : );
660 2 :
661 2 : let op = self
662 2 : .client
663 2 : .copy_object()
664 2 : .bucket(self.bucket_name.clone())
665 2 : .key(self.relative_path_to_s3_object(to))
666 2 : .set_storage_class(self.upload_storage_class.clone())
667 2 : .copy_source(copy_source)
668 2 : .send();
669 :
670 2 : let res = tokio::select! {
671 : res = op => res,
672 : _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
673 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
674 : };
675 :
676 2 : let started_at = ScopeGuard::into_inner(started_at);
677 2 : crate::metrics::BUCKET_METRICS
678 2 : .req_seconds
679 2 : .observe_elapsed(kind, &res, started_at);
680 2 :
681 2 : res?;
682 :
683 2 : Ok(())
684 2 : }
685 :
686 14 : async fn download(
687 14 : &self,
688 14 : from: &RemotePath,
689 14 : cancel: &CancellationToken,
690 14 : ) -> Result<Download, DownloadError> {
691 14 : // if prefix is not none then download file `prefix/from`
692 14 : // if prefix is none then download file `from`
693 14 : self.download_object(
694 14 : GetObjectRequest {
695 14 : bucket: self.bucket_name.clone(),
696 14 : key: self.relative_path_to_s3_object(from),
697 14 : range: None,
698 14 : },
699 14 : cancel,
700 14 : )
701 26 : .await
702 14 : }
703 :
704 10 : async fn download_byte_range(
705 10 : &self,
706 10 : from: &RemotePath,
707 10 : start_inclusive: u64,
708 10 : end_exclusive: Option<u64>,
709 10 : cancel: &CancellationToken,
710 10 : ) -> Result<Download, DownloadError> {
711 10 : // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
712 10 : // and needs both ends to be exclusive
713 10 : let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
714 10 : let range = Some(match end_inclusive {
715 6 : Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
716 4 : None => format!("bytes={start_inclusive}-"),
717 : });
718 :
719 10 : self.download_object(
720 10 : GetObjectRequest {
721 10 : bucket: self.bucket_name.clone(),
722 10 : key: self.relative_path_to_s3_object(from),
723 10 : range,
724 10 : },
725 10 : cancel,
726 10 : )
727 10 : .await
728 10 : }
729 :
730 102 : async fn delete_objects<'a>(
731 102 : &self,
732 102 : paths: &'a [RemotePath],
733 102 : cancel: &CancellationToken,
734 102 : ) -> anyhow::Result<()> {
735 102 : let kind = RequestKind::Delete;
736 102 : let permit = self.permit(kind, cancel).await?;
737 102 : let mut delete_objects = Vec::with_capacity(paths.len());
738 212 : for path in paths {
739 110 : let obj_id = ObjectIdentifier::builder()
740 110 : .set_key(Some(self.relative_path_to_s3_object(path)))
741 110 : .build()
742 110 : .context("convert path to oid")?;
743 110 : delete_objects.push(obj_id);
744 : }
745 :
746 345 : self.delete_oids(&permit, &delete_objects, cancel).await
747 102 : }
748 :
749 90 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
750 90 : let paths = std::array::from_ref(path);
751 290 : self.delete_objects(paths, cancel).await
752 90 : }
753 :
754 6 : async fn time_travel_recover(
755 6 : &self,
756 6 : prefix: Option<&RemotePath>,
757 6 : timestamp: SystemTime,
758 6 : done_if_after: SystemTime,
759 6 : cancel: &CancellationToken,
760 6 : ) -> Result<(), TimeTravelError> {
761 6 : let kind = RequestKind::TimeTravel;
762 6 : let permit = self.permit(kind, cancel).await?;
763 :
764 6 : let timestamp = DateTime::from(timestamp);
765 6 : let done_if_after = DateTime::from(done_if_after);
766 6 :
767 6 : tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
768 :
769 : // get the passed prefix or if it is not set use prefix_in_bucket value
770 6 : let prefix = prefix
771 6 : .map(|p| self.relative_path_to_s3_object(p))
772 6 : .or_else(|| self.prefix_in_bucket.clone());
773 6 :
774 6 : let warn_threshold = 3;
775 6 : let max_retries = 10;
776 6 : let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
777 :
778 6 : let mut key_marker = None;
779 6 : let mut version_id_marker = None;
780 6 : let mut versions_and_deletes = Vec::new();
781 :
782 : loop {
783 6 : let response = backoff::retry(
784 7 : || async {
785 7 : let op = self
786 7 : .client
787 7 : .list_object_versions()
788 7 : .bucket(self.bucket_name.clone())
789 7 : .set_prefix(prefix.clone())
790 7 : .set_key_marker(key_marker.clone())
791 7 : .set_version_id_marker(version_id_marker.clone())
792 7 : .send();
793 7 :
794 7 : tokio::select! {
795 7 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
796 7 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
797 7 : }
798 7 : },
799 6 : is_permanent,
800 6 : warn_threshold,
801 6 : max_retries,
802 6 : "listing object versions for time_travel_recover",
803 6 : cancel,
804 6 : )
805 45 : .await
806 6 : .ok_or_else(|| TimeTravelError::Cancelled)
807 6 : .and_then(|x| x)?;
808 :
809 6 : tracing::trace!(
810 0 : " Got List response version_id_marker={:?}, key_marker={:?}",
811 : response.version_id_marker,
812 : response.key_marker
813 : );
814 6 : let versions = response
815 6 : .versions
816 6 : .unwrap_or_default()
817 6 : .into_iter()
818 6 : .map(VerOrDelete::from_version);
819 6 : let deletes = response
820 6 : .delete_markers
821 6 : .unwrap_or_default()
822 6 : .into_iter()
823 6 : .map(VerOrDelete::from_delete_marker);
824 6 : itertools::process_results(versions.chain(deletes), |n_vds| {
825 6 : versions_and_deletes.extend(n_vds)
826 6 : })
827 6 : .map_err(TimeTravelError::Other)?;
828 12 : fn none_if_empty(v: Option<String>) -> Option<String> {
829 12 : v.filter(|v| !v.is_empty())
830 12 : }
831 6 : version_id_marker = none_if_empty(response.next_version_id_marker);
832 6 : key_marker = none_if_empty(response.next_key_marker);
833 6 : if version_id_marker.is_none() {
834 : // The final response is not supposed to be truncated
835 6 : if response.is_truncated.unwrap_or_default() {
836 0 : return Err(TimeTravelError::Other(anyhow::anyhow!(
837 0 : "Received truncated ListObjectVersions response for prefix={prefix:?}"
838 0 : )));
839 6 : }
840 6 : break;
841 0 : }
842 0 : // Limit the number of versions deletions, mostly so that we don't
843 0 : // keep requesting forever if the list is too long, as we'd put the
844 0 : // list in RAM.
845 0 : // Building a list of 100k entries that reaches the limit roughly takes
846 0 : // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
847 0 : const COMPLEXITY_LIMIT: usize = 100_000;
848 0 : if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
849 0 : return Err(TimeTravelError::TooManyVersions);
850 0 : }
851 : }
852 :
853 6 : tracing::info!(
854 0 : "Built list for time travel with {} versions and deletions",
855 0 : versions_and_deletes.len()
856 : );
857 :
858 : // Work on the list of references instead of the objects directly,
859 : // otherwise we get lifetime errors in the sort_by_key call below.
860 6 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
861 6 :
862 124 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
863 6 :
864 6 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
865 :
866 42 : for vd in &versions_and_deletes {
867 : let VerOrDelete {
868 36 : version_id, key, ..
869 36 : } = &vd;
870 36 : if version_id == "null" {
871 0 : return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
872 0 : indicating either disabled versioning, or legacy objects with null version id values")));
873 36 : }
874 36 : tracing::trace!(
875 0 : "Parsing version key={key} version_id={version_id} kind={:?}",
876 : vd.kind
877 : );
878 :
879 36 : vds_for_key.entry(key).or_default().push(vd);
880 : }
881 24 : for (key, versions) in vds_for_key {
882 18 : let last_vd = versions.last().unwrap();
883 18 : if last_vd.last_modified > done_if_after {
884 0 : tracing::trace!("Key {key} has version later than done_if_after, skipping");
885 0 : continue;
886 18 : }
887 : // the version we want to restore to.
888 18 : let version_to_restore_to =
889 28 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
890 0 : Ok(v) => v,
891 18 : Err(e) => e,
892 : };
893 18 : if version_to_restore_to == versions.len() {
894 6 : tracing::trace!("Key {key} has no changes since timestamp, skipping");
895 6 : continue;
896 12 : }
897 12 : let mut do_delete = false;
898 12 : if version_to_restore_to == 0 {
899 : // All versions more recent, so the key didn't exist at the specified time point.
900 6 : tracing::trace!(
901 0 : "All {} versions more recent for {key}, deleting",
902 0 : versions.len()
903 : );
904 6 : do_delete = true;
905 : } else {
906 6 : match &versions[version_to_restore_to - 1] {
907 : VerOrDelete {
908 : kind: VerOrDeleteKind::Version,
909 6 : version_id,
910 6 : ..
911 6 : } => {
912 6 : tracing::trace!("Copying old version {version_id} for {key}...");
913 : // Restore the state to the last version by copying
914 6 : let source_id =
915 6 : format!("{}/{key}?versionId={version_id}", self.bucket_name);
916 6 :
917 6 : backoff::retry(
918 6 : || async {
919 6 : let op = self
920 6 : .client
921 6 : .copy_object()
922 6 : .bucket(self.bucket_name.clone())
923 6 : .key(key)
924 6 : .set_storage_class(self.upload_storage_class.clone())
925 6 : .copy_source(&source_id)
926 6 : .send();
927 6 :
928 6 : tokio::select! {
929 6 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
930 6 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
931 6 : }
932 6 : },
933 6 : is_permanent,
934 6 : warn_threshold,
935 6 : max_retries,
936 6 : "copying object version for time_travel_recover",
937 6 : cancel,
938 6 : )
939 19 : .await
940 6 : .ok_or_else(|| TimeTravelError::Cancelled)
941 6 : .and_then(|x| x)?;
942 6 : tracing::info!(%version_id, %key, "Copied old version in S3");
943 : }
944 : VerOrDelete {
945 : kind: VerOrDeleteKind::DeleteMarker,
946 : ..
947 0 : } => {
948 0 : do_delete = true;
949 0 : }
950 : }
951 : };
952 12 : if do_delete {
953 6 : if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
954 : // Key has since been deleted (but there was some history), no need to do anything
955 2 : tracing::trace!("Key {key} already deleted, skipping.");
956 : } else {
957 4 : tracing::trace!("Deleting {key}...");
958 :
959 4 : let oid = ObjectIdentifier::builder()
960 4 : .key(key.to_owned())
961 4 : .build()
962 4 : .map_err(|e| TimeTravelError::Other(e.into()))?;
963 :
964 4 : self.delete_oids(&permit, &[oid], cancel)
965 8 : .await
966 4 : .map_err(|e| {
967 0 : // delete_oid0 will use TimeoutOrCancel
968 0 : if TimeoutOrCancel::caused_by_cancel(&e) {
969 0 : TimeTravelError::Cancelled
970 : } else {
971 0 : TimeTravelError::Other(e)
972 : }
973 4 : })?;
974 : }
975 6 : }
976 : }
977 6 : Ok(())
978 6 : }
979 : }
980 :
981 : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
982 : struct VerOrDelete {
983 : kind: VerOrDeleteKind,
984 : last_modified: DateTime,
985 : version_id: String,
986 : key: String,
987 : }
988 :
/// Distinguishes the two kinds of entries a [`VerOrDelete`] can represent.
///
/// The enum carries no data, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum VerOrDeleteKind {
    /// The entry was built from an `ObjectVersion`.
    Version,
    /// The entry was built from a `DeleteMarkerEntry`.
    DeleteMarker,
}
994 :
995 : impl VerOrDelete {
996 36 : fn with_kind(
997 36 : kind: VerOrDeleteKind,
998 36 : last_modified: Option<DateTime>,
999 36 : version_id: Option<String>,
1000 36 : key: Option<String>,
1001 36 : ) -> anyhow::Result<Self> {
1002 36 : let lvk = (last_modified, version_id, key);
1003 36 : let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
1004 0 : anyhow::bail!(
1005 0 : "One (or more) of last_modified, key, and id is None. \
1006 0 : Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
1007 0 : lvk.0,
1008 0 : lvk.1,
1009 0 : lvk.2,
1010 0 : );
1011 : };
1012 36 : Ok(Self {
1013 36 : kind,
1014 36 : last_modified,
1015 36 : version_id,
1016 36 : key,
1017 36 : })
1018 36 : }
1019 28 : fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
1020 28 : Self::with_kind(
1021 28 : VerOrDeleteKind::Version,
1022 28 : v.last_modified,
1023 28 : v.version_id,
1024 28 : v.key,
1025 28 : )
1026 28 : }
1027 8 : fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
1028 8 : Self::with_kind(
1029 8 : VerOrDeleteKind::DeleteMarker,
1030 8 : v.last_modified,
1031 8 : v.version_id,
1032 8 : v.key,
1033 8 : )
1034 8 : }
1035 : }
1036 :
#[cfg(test)]
mod tests {
    use camino::Utf8Path;
    use std::num::NonZeroUsize;

    use crate::{RemotePath, S3Bucket, S3Config};

    /// Checks the object key produced by `relative_path_to_s3_object` for every
    /// combination of `prefix_in_bucket` setting and remote path.
    #[test]
    fn relative_path() {
        let remote_paths: Vec<RemotePath> = ["", "some/path", "some/path/"]
            .iter()
            .map(|raw| RemotePath::new(Utf8Path::new(raw)).expect("bad path"))
            .collect();

        // Each case pairs a `prefix_in_bucket` value with the keys expected for
        // `remote_paths`, in the same order.
        let with_prefix = [
            "test/prefix/",
            "test/prefix/some/path",
            "test/prefix/some/path/",
        ];
        let cases: [(Option<&str>, [&str; 3]); 5] = [
            (None, ["", "some/path", "some/path/"]),
            (Some(""), ["/", "/some/path", "/some/path/"]),
            (Some("test/prefix"), with_prefix),
            (Some("test/prefix/"), with_prefix),
            (Some("/test/prefix/"), with_prefix),
        ];

        for (prefix, expected_keys) in &cases {
            let config = S3Config {
                bucket_name: "bucket".to_owned(),
                bucket_region: "region".to_owned(),
                prefix_in_bucket: prefix.map(str::to_string),
                endpoint: None,
                concurrency_limit: NonZeroUsize::new(100).unwrap(),
                max_keys_per_list_response: Some(5),
                upload_storage_class: None,
            };
            let storage =
                S3Bucket::new(&config, std::time::Duration::ZERO).expect("remote storage init");

            for (remote_path, &expected) in remote_paths.iter().zip(expected_keys) {
                let result = storage.relative_path_to_s3_object(remote_path);
                assert_eq!(result, expected);
            }
        }
    }
}
|