Line data Source code
1 : //! AWS S3 storage wrapper around the `aws-sdk-s3` library.
2 : //!
3 : //! Respects the `prefix_in_bucket` field of [`S3Config`],
4 : //! allowing multiple API users to work with the same S3 bucket independently,
5 : //! as long as their prefixes are specified and distinct.
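//!
//! A rough construction sketch (field values are illustrative and the crate path is an
//! assumption; the exact set of [`S3Config`] fields is shown in the tests at the bottom
//! of this file):
//!
//! ```ignore
//! use std::{num::NonZeroUsize, time::Duration};
//! use remote_storage::{S3Bucket, S3Config};
//!
//! # async fn example() -> anyhow::Result<()> {
//! let config = S3Config {
//!     bucket_name: "my-bucket".to_owned(),
//!     bucket_region: "eu-central-1".to_owned(),
//!     prefix_in_bucket: Some("pageserver/".to_owned()),
//!     endpoint: None,
//!     concurrency_limit: NonZeroUsize::new(100).unwrap(),
//!     max_keys_per_list_response: None,
//!     upload_storage_class: None,
//! };
//! let storage = S3Bucket::new(&config, Duration::from_secs(120)).await?;
//! # Ok(())
//! # }
//! ```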
6 :
7 : use std::{
8 : borrow::Cow,
9 : collections::HashMap,
10 : num::NonZeroU32,
11 : pin::Pin,
12 : sync::Arc,
13 : task::{Context, Poll},
14 : time::{Duration, SystemTime},
15 : };
16 :
17 : use anyhow::{anyhow, Context as _};
18 : use aws_config::{
19 : default_provider::credentials::DefaultCredentialsChain,
20 : retry::{RetryConfigBuilder, RetryMode},
21 : BehaviorVersion,
22 : };
23 : use aws_sdk_s3::{
24 : config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
25 : error::SdkError,
26 : operation::get_object::GetObjectError,
27 : types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
28 : Client,
29 : };
30 : use aws_smithy_async::rt::sleep::TokioSleep;
31 :
32 : use aws_smithy_types::{body::SdkBody, DateTime};
33 : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
34 : use bytes::Bytes;
35 : use futures::stream::Stream;
36 : use hyper::Body;
37 : use scopeguard::ScopeGuard;
38 : use tokio_util::sync::CancellationToken;
39 : use utils::backoff;
40 :
41 : use super::StorageMetadata;
42 : use crate::{
43 : config::S3Config,
44 : error::Cancelled,
45 : metrics::{start_counting_cancelled_wait, start_measuring_requests},
46 : support::PermitCarrying,
47 : ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
48 : TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
49 : };
50 :
51 : use crate::metrics::AttemptOutcome;
52 : pub(super) use crate::metrics::RequestKind;
53 :
54 : /// AWS S3 storage.
55 : pub struct S3Bucket {
56 : client: Client,
57 : bucket_name: String,
58 : prefix_in_bucket: Option<String>,
59 : max_keys_per_list_response: Option<i32>,
60 : upload_storage_class: Option<StorageClass>,
61 : concurrency_limiter: ConcurrencyLimiter,
62 : // Per-request timeout. Accessible for tests.
63 : pub timeout: Duration,
64 : }
65 :
66 : struct GetObjectRequest {
67 : bucket: String,
68 : key: String,
69 : range: Option<String>,
70 : }
71 : impl S3Bucket {
72 : /// Creates the S3 storage client; errors if an incorrect AWS S3 configuration is provided.
73 38 : pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
74 38 : tracing::debug!(
75 0 : "Creating s3 remote storage for S3 bucket {}",
76 : remote_storage_config.bucket_name
77 : );
78 :
79 38 : let region = Region::new(remote_storage_config.bucket_region.clone());
80 38 : let region_opt = Some(region.clone());
81 :
82 : // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
83 : // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
84 : // Incomplete list of auth methods used by this:
85 : // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
86 : // * "AWS_PROFILE" / `aws sso login --profile <profile>`
87 : // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
88 : // * HTTP (ECS/EKS) container credentials
89 : // * IMDSv2
90 38 : let credentials_provider = DefaultCredentialsChain::builder()
91 38 : .region(region)
92 38 : .build()
93 0 : .await;
94 :
95 : // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
96 38 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
97 38 :
98 38 : let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
99 38 : #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
100 38 : BehaviorVersion::v2023_11_09(),
101 38 : )
102 38 : .region(region_opt)
103 38 : .identity_cache(IdentityCache::lazy().build())
104 38 : .credentials_provider(credentials_provider)
105 38 : .sleep_impl(SharedAsyncSleep::from(sleep_impl));
106 38 :
107 38 : let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
108 38 : s.spawn(|| {
109 38 : // TODO: make this function async.
110 38 : tokio::runtime::Builder::new_current_thread()
111 38 : .enable_all()
112 38 : .build()
113 38 : .unwrap()
114 38 : .block_on(sdk_config_loader.load())
115 38 : })
116 38 : .join()
117 38 : .unwrap()
118 38 : });
119 38 :
120 38 : let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
121 :
122 : // The `remote_storage_config.endpoint` field only applies to S3 interactions, so we set it on the
123 : // S3 config builder rather than on `sdk_config` (in case we ever re-use `sdk_config` for more than just the S3 client).
124 38 : if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
125 0 : s3_config_builder = s3_config_builder
126 0 : .endpoint_url(custom_endpoint)
127 0 : .force_path_style(true);
128 38 : }
129 :
130 : // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable client-side rate
131 : // limiting in response to throttling (e.g. 429 on too many ListObjectsV2 requests), we must provide a
132 : // retry config. We set it to at most one attempt and enable 'Adaptive' mode, which turns the rate limiter on.
133 38 : let mut retry_config = RetryConfigBuilder::new();
134 38 : retry_config
135 38 : .set_max_attempts(Some(1))
136 38 : .set_mode(Some(RetryMode::Adaptive));
137 38 : s3_config_builder = s3_config_builder.retry_config(retry_config.build());
138 38 :
139 38 : let s3_config = s3_config_builder.build();
140 38 : let client = aws_sdk_s3::Client::from_conf(s3_config);
141 38 :
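        // Normalize the configured prefix: strip any leading and trailing prefix separators so that,
        // for example, a configured "/foo/bar/" (illustrative value) is stored as "foo/bar" and later
        // re-joined with a single separator when building object keys.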
142 38 : let prefix_in_bucket = remote_storage_config
143 38 : .prefix_in_bucket
144 38 : .as_deref()
145 38 : .map(|prefix| {
146 34 : let mut prefix = prefix;
147 38 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
148 4 : prefix = &prefix[1..]
149 : }
150 :
151 34 : let mut prefix = prefix.to_string();
152 60 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
153 26 : prefix.pop();
154 26 : }
155 34 : prefix
156 38 : });
157 38 :
158 38 : Ok(Self {
159 38 : client,
160 38 : bucket_name: remote_storage_config.bucket_name.clone(),
161 38 : max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
162 38 : prefix_in_bucket,
163 38 : concurrency_limiter: ConcurrencyLimiter::new(
164 38 : remote_storage_config.concurrency_limit.get(),
165 38 : ),
166 38 : upload_storage_class: remote_storage_config.upload_storage_class.clone(),
167 38 : timeout,
168 38 : })
169 38 : }
170 :
171 126 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
172 126 : let relative_path =
173 126 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
174 126 : Some(stripped) => stripped,
175 : // we rely on AWS to return properly prefixed paths
176 : // for requests with a certain prefix
177 0 : None => panic!(
178 0 : "Key {} does not start with bucket prefix {:?}",
179 0 : key, self.prefix_in_bucket
180 0 : ),
181 : };
182 126 : RemotePath(
183 126 : relative_path
184 126 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
185 126 : .collect(),
186 126 : )
187 126 : }
188 :
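    /// Maps a [`RemotePath`] to the full S3 object key, prepending `prefix_in_bucket` when it is
    /// set; e.g. prefix `test/prefix` plus path `some/path` yields `test/prefix/some/path`
    /// (see the tests at the bottom of this file).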
189 309 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
190 309 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
191 309 : let path_string = path.get_path().as_str();
192 309 : match &self.prefix_in_bucket {
193 297 : Some(prefix) => prefix.clone() + "/" + path_string,
194 12 : None => path_string.to_string(),
195 : }
196 309 : }
197 :
198 241 : async fn permit(
199 241 : &self,
200 241 : kind: RequestKind,
201 241 : cancel: &CancellationToken,
202 241 : ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
203 241 : let started_at = start_counting_cancelled_wait(kind);
204 241 : let acquire = self.concurrency_limiter.acquire(kind);
205 :
206 241 : let permit = tokio::select! {
207 : permit = acquire => permit.expect("semaphore is never closed"),
208 : _ = cancel.cancelled() => return Err(Cancelled),
209 : };
210 :
211 241 : let started_at = ScopeGuard::into_inner(started_at);
212 241 : crate::metrics::BUCKET_METRICS
213 241 : .wait_seconds
214 241 : .observe_elapsed(kind, started_at);
215 241 :
216 241 : Ok(permit)
217 241 : }
218 :
219 24 : async fn owned_permit(
220 24 : &self,
221 24 : kind: RequestKind,
222 24 : cancel: &CancellationToken,
223 24 : ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
224 24 : let started_at = start_counting_cancelled_wait(kind);
225 24 : let acquire = self.concurrency_limiter.acquire_owned(kind);
226 :
227 24 : let permit = tokio::select! {
228 : permit = acquire => permit.expect("semaphore is never closed"),
229 : _ = cancel.cancelled() => return Err(Cancelled),
230 : };
231 :
232 24 : let started_at = ScopeGuard::into_inner(started_at);
233 24 : crate::metrics::BUCKET_METRICS
234 24 : .wait_seconds
235 24 : .observe_elapsed(kind, started_at);
236 24 : Ok(permit)
237 24 : }
238 :
239 24 : async fn download_object(
240 24 : &self,
241 24 : request: GetObjectRequest,
242 24 : cancel: &CancellationToken,
243 24 : ) -> Result<Download, DownloadError> {
244 24 : let kind = RequestKind::Get;
245 :
246 24 : let permit = self.owned_permit(kind, cancel).await?;
247 :
248 24 : let started_at = start_measuring_requests(kind);
249 24 :
250 24 : let get_object = self
251 24 : .client
252 24 : .get_object()
253 24 : .bucket(request.bucket)
254 24 : .key(request.key)
255 24 : .set_range(request.range)
256 24 : .send();
257 :
258 24 : let get_object = tokio::select! {
259 : res = get_object => res,
260 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
261 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
262 : };
263 :
264 24 : let started_at = ScopeGuard::into_inner(started_at);
265 :
266 24 : let object_output = match get_object {
267 24 : Ok(object_output) => object_output,
268 0 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
269 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
270 : // an error: we expect to sometimes fetch an object and find it missing,
271 : // e.g. when probing for timeline indices.
272 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
273 0 : kind,
274 0 : AttemptOutcome::Ok,
275 0 : started_at,
276 0 : );
277 0 : return Err(DownloadError::NotFound);
278 : }
279 0 : Err(e) => {
280 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
281 0 : kind,
282 0 : AttemptOutcome::Err,
283 0 : started_at,
284 0 : );
285 0 :
286 0 : return Err(DownloadError::Other(
287 0 : anyhow::Error::new(e).context("download s3 object"),
288 0 : ));
289 : }
290 : };
291 :
292 : // Even if no timeout is left, continue anyway: the caller can decide to ignore
293 : // errors stemming from timeouts and cancellation.
294 24 : let remaining = self.timeout.saturating_sub(started_at.elapsed());
295 24 :
296 24 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
297 24 : let etag = object_output
298 24 : .e_tag
299 24 : .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
300 24 : .into();
301 24 : let last_modified = object_output
302 24 : .last_modified
303 24 : .ok_or(DownloadError::Other(anyhow::anyhow!(
304 24 : "Missing LastModified header"
305 24 : )))?
306 24 : .try_into()
307 24 : .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
308 :
309 24 : let body = object_output.body;
310 24 : let body = ByteStreamAsStream::from(body);
311 24 : let body = PermitCarrying::new(permit, body);
312 24 : let body = TimedDownload::new(started_at, body);
313 24 :
314 24 : let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
315 24 : let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
316 24 :
317 24 : Ok(Download {
318 24 : metadata,
319 24 : etag,
320 24 : last_modified,
321 24 : download_stream: Box::pin(body),
322 24 : })
323 24 : }
324 :
325 106 : async fn delete_oids(
326 106 : &self,
327 106 : _permit: &tokio::sync::SemaphorePermit<'_>,
328 106 : delete_objects: &[ObjectIdentifier],
329 106 : cancel: &CancellationToken,
330 106 : ) -> anyhow::Result<()> {
331 106 : let kind = RequestKind::Delete;
332 106 : let mut cancel = std::pin::pin!(cancel.cancelled());
333 :
334 106 : for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
335 106 : let started_at = start_measuring_requests(kind);
336 :
337 106 : let req = self
338 106 : .client
339 106 : .delete_objects()
340 106 : .bucket(self.bucket_name.clone())
341 106 : .delete(
342 106 : Delete::builder()
343 106 : .set_objects(Some(chunk.to_vec()))
344 106 : .build()
345 106 : .context("build request")?,
346 : )
347 106 : .send();
348 :
349 106 : let resp = tokio::select! {
350 : resp = req => resp,
351 : _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
352 : _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
353 : };
354 :
355 106 : let started_at = ScopeGuard::into_inner(started_at);
356 106 : crate::metrics::BUCKET_METRICS
357 106 : .req_seconds
358 106 : .observe_elapsed(kind, &resp, started_at);
359 :
360 106 : let resp = resp.context("request deletion")?;
361 106 : crate::metrics::BUCKET_METRICS
362 106 : .deleted_objects_total
363 106 : .inc_by(chunk.len() as u64);
364 :
365 106 : if let Some(errors) = resp.errors {
366 : // Log a bounded number of the errors within the response:
367 : // these requests can carry 1000 keys so logging each one
368 : // would be too verbose, especially as errors may lead us
369 : // to retry repeatedly.
370 : const LOG_UP_TO_N_ERRORS: usize = 10;
371 0 : for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
372 0 : tracing::warn!(
373 0 : "DeleteObjects key {} failed: {}: {}",
374 0 : e.key.as_ref().map(Cow::from).unwrap_or("".into()),
375 0 : e.code.as_ref().map(Cow::from).unwrap_or("".into()),
376 0 : e.message.as_ref().map(Cow::from).unwrap_or("".into())
377 : );
378 : }
379 :
380 0 : return Err(anyhow::anyhow!(
381 0 : "Failed to delete {}/{} objects",
382 0 : errors.len(),
383 0 : chunk.len(),
384 0 : ));
385 106 : }
386 : }
387 106 : Ok(())
388 106 : }
389 : }
390 :
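// Wrapper that implements `futures::Stream` for the SDK's `ByteStream` and converts its
// errors into `std::io::Error`, so the body can be composed with the other stream adapters below.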
391 : pin_project_lite::pin_project! {
392 : struct ByteStreamAsStream {
393 : #[pin]
394 : inner: aws_smithy_types::byte_stream::ByteStream
395 : }
396 : }
397 :
398 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
399 24 : fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
400 24 : ByteStreamAsStream { inner }
401 24 : }
402 : }
403 :
404 : impl Stream for ByteStreamAsStream {
405 : type Item = std::io::Result<Bytes>;
406 :
407 50 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
408 50 : // this does the std::io::ErrorKind::Other conversion
409 50 : self.project().inner.poll_next(cx).map_err(|x| x.into())
410 50 : }
411 :
412 : // We cannot implement size_hint: inner.size_hint() reports the remaining size in bytes (which makes
413 : // sense for a byte stream), whereas Stream::size_hint() counts items, which does not really fit here.
414 : }
415 :
416 : pin_project_lite::pin_project! {
417 : /// Times and tracks the outcome of the request.
418 : struct TimedDownload<S> {
419 : started_at: std::time::Instant,
420 : outcome: AttemptOutcome,
421 : #[pin]
422 : inner: S
423 : }
424 :
425 : impl<S> PinnedDrop for TimedDownload<S> {
426 : fn drop(mut this: Pin<&mut Self>) {
427 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
428 : }
429 : }
430 : }
431 :
432 : impl<S> TimedDownload<S> {
433 24 : fn new(started_at: std::time::Instant, inner: S) -> Self {
434 24 : TimedDownload {
435 24 : started_at,
436 24 : outcome: AttemptOutcome::Cancelled,
437 24 : inner,
438 24 : }
439 24 : }
440 : }
441 :
442 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
443 : type Item = <S as Stream>::Item;
444 :
445 50 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
446 50 : use std::task::ready;
447 50 :
448 50 : let this = self.project();
449 :
450 50 : let res = ready!(this.inner.poll_next(cx));
451 22 : match &res {
452 22 : Some(Ok(_)) => {}
453 0 : Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
454 18 : None => *this.outcome = AttemptOutcome::Ok,
455 : }
456 :
457 40 : Poll::Ready(res)
458 50 : }
459 :
460 0 : fn size_hint(&self) -> (usize, Option<usize>) {
461 0 : self.inner.size_hint()
462 0 : }
463 : }
464 :
465 : impl RemoteStorage for S3Bucket {
466 24 : async fn list(
467 24 : &self,
468 24 : prefix: Option<&RemotePath>,
469 24 : mode: ListingMode,
470 24 : max_keys: Option<NonZeroU32>,
471 24 : cancel: &CancellationToken,
472 24 : ) -> Result<Listing, DownloadError> {
473 24 : let kind = RequestKind::List;
474 24 : // s3 sdk wants i32
475 24 : let mut max_keys = max_keys.map(|mk| mk.get() as i32);
476 24 : let mut result = Listing::default();
477 24 :
478 24 : // get the passed prefix or if it is not set use prefix_in_bucket value
479 24 : let list_prefix = prefix
480 24 : .map(|p| self.relative_path_to_s3_object(p))
481 24 : .or_else(|| {
482 20 : self.prefix_in_bucket.clone().map(|mut s| {
483 20 : s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
484 20 : s
485 20 : })
486 24 : });
487 :
488 24 : let _permit = self.permit(kind, cancel).await?;
489 :
490 24 : let mut continuation_token = None;
491 :
492 : loop {
493 32 : let started_at = start_measuring_requests(kind);
494 32 :
495 32 : // Take the min of two Options, returning Some if one is a value and the other is
496 32 : // None (None is smaller than anything, so a plain min doesn't work).
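            // For example (illustrative values): (Some(1000), None) -> Some(1000),
            // (Some(1000), Some(5)) -> Some(5), (None, None) -> None.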
497 32 : let request_max_keys = self
498 32 : .max_keys_per_list_response
499 32 : .into_iter()
500 32 : .chain(max_keys.into_iter())
501 32 : .min();
502 32 : let mut request = self
503 32 : .client
504 32 : .list_objects_v2()
505 32 : .bucket(self.bucket_name.clone())
506 32 : .set_prefix(list_prefix.clone())
507 32 : .set_continuation_token(continuation_token)
508 32 : .set_max_keys(request_max_keys);
509 32 :
510 32 : if let ListingMode::WithDelimiter = mode {
511 10 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
512 22 : }
513 :
514 32 : let request = request.send();
515 :
516 32 : let response = tokio::select! {
517 : res = request => res,
518 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
519 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
520 : };
521 :
522 32 : let response = response
523 32 : .context("Failed to list S3 prefixes")
524 32 : .map_err(DownloadError::Other);
525 32 :
526 32 : let started_at = ScopeGuard::into_inner(started_at);
527 32 :
528 32 : crate::metrics::BUCKET_METRICS
529 32 : .req_seconds
530 32 : .observe_elapsed(kind, &response, started_at);
531 :
532 32 : let response = response?;
533 :
534 32 : let keys = response.contents();
535 32 : let empty = Vec::new();
536 32 : let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
537 32 :
538 32 : tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
539 :
540 110 : for object in keys {
541 80 : let object_path = object.key().expect("response does not contain a key");
542 80 : let remote_path = self.s3_object_to_relative_path(object_path);
543 80 : result.keys.push(remote_path);
544 80 : if let Some(mut mk) = max_keys {
545 4 : assert!(mk > 0);
546 4 : mk -= 1;
547 4 : if mk == 0 {
548 2 : return Ok(result); // limit reached
549 2 : }
550 2 : max_keys = Some(mk);
551 76 : }
552 : }
553 :
554 : // S3 gives us prefixes like "foo/"; we return them like "foo".
555 46 : result.prefixes.extend(prefixes.iter().filter_map(|o| {
556 46 : Some(
557 46 : self.s3_object_to_relative_path(
558 46 : o.prefix()?
559 46 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
560 : ),
561 : )
562 46 : }));
563 :
564 30 : continuation_token = match response.next_continuation_token {
565 8 : Some(new_token) => Some(new_token),
566 22 : None => break,
567 22 : };
568 22 : }
569 22 :
570 22 : Ok(result)
571 24 : }
572 :
573 107 : async fn upload(
574 107 : &self,
575 107 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
576 107 : from_size_bytes: usize,
577 107 : to: &RemotePath,
578 107 : metadata: Option<StorageMetadata>,
579 107 : cancel: &CancellationToken,
580 107 : ) -> anyhow::Result<()> {
581 107 : let kind = RequestKind::Put;
582 107 : let _permit = self.permit(kind, cancel).await?;
583 :
584 107 : let started_at = start_measuring_requests(kind);
585 107 :
586 107 : let body = Body::wrap_stream(from);
587 107 : let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
588 :
589 107 : let upload = self
590 107 : .client
591 107 : .put_object()
592 107 : .bucket(self.bucket_name.clone())
593 107 : .key(self.relative_path_to_s3_object(to))
594 107 : .set_metadata(metadata.map(|m| m.0))
595 107 : .set_storage_class(self.upload_storage_class.clone())
596 107 : .content_length(from_size_bytes.try_into()?)
597 107 : .body(bytes_stream)
598 107 : .send();
599 107 :
600 107 : let upload = tokio::time::timeout(self.timeout, upload);
601 :
602 107 : let res = tokio::select! {
603 : res = upload => res,
604 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
605 : };
606 :
607 107 : if let Ok(inner) = &res {
608 107 : // Do not include timeouts as errors in the metrics, but do include cancellations.
609 107 : let started_at = ScopeGuard::into_inner(started_at);
610 107 : crate::metrics::BUCKET_METRICS
611 107 : .req_seconds
612 107 : .observe_elapsed(kind, inner, started_at);
613 107 : }
614 :
615 107 : match res {
616 106 : Ok(Ok(_put)) => Ok(()),
617 1 : Ok(Err(sdk)) => Err(sdk.into()),
618 0 : Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
619 : }
620 107 : }
621 :
622 2 : async fn copy(
623 2 : &self,
624 2 : from: &RemotePath,
625 2 : to: &RemotePath,
626 2 : cancel: &CancellationToken,
627 2 : ) -> anyhow::Result<()> {
628 2 : let kind = RequestKind::Copy;
629 2 : let _permit = self.permit(kind, cancel).await?;
630 :
631 2 : let timeout = tokio::time::sleep(self.timeout);
632 2 :
633 2 : let started_at = start_measuring_requests(kind);
634 2 :
635 2 : // The copy source must include the bucket name as a prefix.
636 2 : let copy_source = format!(
637 2 : "{}/{}",
638 2 : self.bucket_name,
639 2 : self.relative_path_to_s3_object(from)
640 2 : );
641 2 :
642 2 : let op = self
643 2 : .client
644 2 : .copy_object()
645 2 : .bucket(self.bucket_name.clone())
646 2 : .key(self.relative_path_to_s3_object(to))
647 2 : .set_storage_class(self.upload_storage_class.clone())
648 2 : .copy_source(copy_source)
649 2 : .send();
650 :
651 2 : let res = tokio::select! {
652 : res = op => res,
653 : _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
654 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
655 : };
656 :
657 2 : let started_at = ScopeGuard::into_inner(started_at);
658 2 : crate::metrics::BUCKET_METRICS
659 2 : .req_seconds
660 2 : .observe_elapsed(kind, &res, started_at);
661 2 :
662 2 : res?;
663 :
664 2 : Ok(())
665 2 : }
666 :
667 14 : async fn download(
668 14 : &self,
669 14 : from: &RemotePath,
670 14 : cancel: &CancellationToken,
671 14 : ) -> Result<Download, DownloadError> {
672 14 : // If a prefix is configured, this downloads the object at `prefix/from`;
673 14 : // otherwise it downloads `from` directly.
674 14 : self.download_object(
675 14 : GetObjectRequest {
676 14 : bucket: self.bucket_name.clone(),
677 14 : key: self.relative_path_to_s3_object(from),
678 14 : range: None,
679 14 : },
680 14 : cancel,
681 14 : )
682 14 : .await
683 14 : }
684 :
685 10 : async fn download_byte_range(
686 10 : &self,
687 10 : from: &RemotePath,
688 10 : start_inclusive: u64,
689 10 : end_exclusive: Option<u64>,
690 10 : cancel: &CancellationToken,
691 10 : ) -> Result<Download, DownloadError> {
692 10 : // S3 accepts ranges as per https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35,
693 10 : // which requires both ends to be inclusive.
694 10 : let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
695 10 : let range = Some(match end_inclusive {
696 6 : Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
697 4 : None => format!("bytes={start_inclusive}-"),
698 : });
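        // For example, start_inclusive=0 with end_exclusive=Some(10) yields "bytes=0-9",
        // and start_inclusive=100 with end_exclusive=None yields "bytes=100-".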
699 :
700 10 : self.download_object(
701 10 : GetObjectRequest {
702 10 : bucket: self.bucket_name.clone(),
703 10 : key: self.relative_path_to_s3_object(from),
704 10 : range,
705 10 : },
706 10 : cancel,
707 10 : )
708 10 : .await
709 10 : }
710 :
711 102 : async fn delete_objects<'a>(
712 102 : &self,
713 102 : paths: &'a [RemotePath],
714 102 : cancel: &CancellationToken,
715 102 : ) -> anyhow::Result<()> {
716 102 : let kind = RequestKind::Delete;
717 102 : let permit = self.permit(kind, cancel).await?;
718 102 : let mut delete_objects = Vec::with_capacity(paths.len());
719 212 : for path in paths {
720 110 : let obj_id = ObjectIdentifier::builder()
721 110 : .set_key(Some(self.relative_path_to_s3_object(path)))
722 110 : .build()
723 110 : .context("convert path to oid")?;
724 110 : delete_objects.push(obj_id);
725 : }
726 :
727 350 : self.delete_oids(&permit, &delete_objects, cancel).await
728 102 : }
729 :
730 90 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
731 90 : let paths = std::array::from_ref(path);
732 294 : self.delete_objects(paths, cancel).await
733 90 : }
734 :
735 6 : async fn time_travel_recover(
736 6 : &self,
737 6 : prefix: Option<&RemotePath>,
738 6 : timestamp: SystemTime,
739 6 : done_if_after: SystemTime,
740 6 : cancel: &CancellationToken,
741 6 : ) -> Result<(), TimeTravelError> {
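        // Outline: list every object version and delete marker under `prefix`, group them by key,
        // and for each key restore the newest version recorded before `timestamp` by copying it over
        // the current object, or delete the key if it did not exist at that point. Keys that already
        // have an entry newer than `done_if_after` are left untouched.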
742 6 : let kind = RequestKind::TimeTravel;
743 6 : let permit = self.permit(kind, cancel).await?;
744 :
745 6 : let timestamp = DateTime::from(timestamp);
746 6 : let done_if_after = DateTime::from(done_if_after);
747 6 :
748 6 : tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
749 :
750 : // Use the passed prefix, or fall back to the `prefix_in_bucket` value if none was given.
751 6 : let prefix = prefix
752 6 : .map(|p| self.relative_path_to_s3_object(p))
753 6 : .or_else(|| self.prefix_in_bucket.clone());
754 6 :
755 6 : let warn_threshold = 3;
756 6 : let max_retries = 10;
757 6 : let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
758 :
759 6 : let mut key_marker = None;
760 6 : let mut version_id_marker = None;
761 6 : let mut versions_and_deletes = Vec::new();
762 :
763 : loop {
764 6 : let response = backoff::retry(
765 7 : || async {
766 7 : let op = self
767 7 : .client
768 7 : .list_object_versions()
769 7 : .bucket(self.bucket_name.clone())
770 7 : .set_prefix(prefix.clone())
771 7 : .set_key_marker(key_marker.clone())
772 7 : .set_version_id_marker(version_id_marker.clone())
773 7 : .send();
774 7 :
775 7 : tokio::select! {
776 7 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
777 7 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
778 7 : }
779 7 : },
780 6 : is_permanent,
781 6 : warn_threshold,
782 6 : max_retries,
783 6 : "listing object versions for time_travel_recover",
784 6 : cancel,
785 6 : )
786 49 : .await
787 6 : .ok_or_else(|| TimeTravelError::Cancelled)
788 6 : .and_then(|x| x)?;
789 :
790 6 : tracing::trace!(
791 0 : " Got List response version_id_marker={:?}, key_marker={:?}",
792 : response.version_id_marker,
793 : response.key_marker
794 : );
795 6 : let versions = response
796 6 : .versions
797 6 : .unwrap_or_default()
798 6 : .into_iter()
799 6 : .map(VerOrDelete::from_version);
800 6 : let deletes = response
801 6 : .delete_markers
802 6 : .unwrap_or_default()
803 6 : .into_iter()
804 6 : .map(VerOrDelete::from_delete_marker);
805 6 : itertools::process_results(versions.chain(deletes), |n_vds| {
806 6 : versions_and_deletes.extend(n_vds)
807 6 : })
808 6 : .map_err(TimeTravelError::Other)?;
809 12 : fn none_if_empty(v: Option<String>) -> Option<String> {
810 12 : v.filter(|v| !v.is_empty())
811 12 : }
812 6 : version_id_marker = none_if_empty(response.next_version_id_marker);
813 6 : key_marker = none_if_empty(response.next_key_marker);
814 6 : if version_id_marker.is_none() {
815 : // The final response is not supposed to be truncated
816 6 : if response.is_truncated.unwrap_or_default() {
817 0 : return Err(TimeTravelError::Other(anyhow::anyhow!(
818 0 : "Received truncated ListObjectVersions response for prefix={prefix:?}"
819 0 : )));
820 6 : }
821 6 : break;
822 0 : }
823 0 : // Limit the number of versions and deletions, mostly so that we don't
824 0 : // keep requesting forever if the list is too long, since we hold the
825 0 : // whole list in RAM.
826 0 : // Building a list of 100k entries that reaches this limit takes roughly
827 0 : // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
828 0 : const COMPLEXITY_LIMIT: usize = 100_000;
829 0 : if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
830 0 : return Err(TimeTravelError::TooManyVersions);
831 0 : }
832 : }
833 :
834 6 : tracing::info!(
835 0 : "Built list for time travel with {} versions and deletions",
836 0 : versions_and_deletes.len()
837 : );
838 :
839 : // Work on the list of references instead of the objects directly,
840 : // otherwise we get lifetime errors in the sort_by_key call below.
841 6 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
842 6 :
843 124 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
844 6 :
845 6 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
846 :
847 42 : for vd in &versions_and_deletes {
848 : let VerOrDelete {
849 36 : version_id, key, ..
850 36 : } = &vd;
851 36 : if version_id == "null" {
852 0 : return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
853 0 : indicating either disabled versioning, or legacy objects with null version id values")));
854 36 : }
855 36 : tracing::trace!(
856 0 : "Parsing version key={key} version_id={version_id} kind={:?}",
857 : vd.kind
858 : );
859 :
860 36 : vds_for_key.entry(key).or_default().push(vd);
861 : }
862 24 : for (key, versions) in vds_for_key {
863 18 : let last_vd = versions.last().unwrap();
864 18 : if last_vd.last_modified > done_if_after {
865 0 : tracing::trace!("Key {key} has version later than done_if_after, skipping");
866 0 : continue;
867 18 : }
868 : // the version we want to restore to.
869 18 : let version_to_restore_to =
870 28 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
871 0 : Ok(v) => v,
872 18 : Err(e) => e,
873 : };
874 18 : if version_to_restore_to == versions.len() {
875 6 : tracing::trace!("Key {key} has no changes since timestamp, skipping");
876 6 : continue;
877 12 : }
878 12 : let mut do_delete = false;
879 12 : if version_to_restore_to == 0 {
880 : // All versions more recent, so the key didn't exist at the specified time point.
881 6 : tracing::trace!(
882 0 : "All {} versions more recent for {key}, deleting",
883 0 : versions.len()
884 : );
885 6 : do_delete = true;
886 : } else {
887 6 : match &versions[version_to_restore_to - 1] {
888 : VerOrDelete {
889 : kind: VerOrDeleteKind::Version,
890 6 : version_id,
891 6 : ..
892 6 : } => {
893 6 : tracing::trace!("Copying old version {version_id} for {key}...");
894 : // Restore the state to the last version by copying
895 6 : let source_id =
896 6 : format!("{}/{key}?versionId={version_id}", self.bucket_name);
897 6 :
898 6 : backoff::retry(
899 6 : || async {
900 6 : let op = self
901 6 : .client
902 6 : .copy_object()
903 6 : .bucket(self.bucket_name.clone())
904 6 : .key(key)
905 6 : .set_storage_class(self.upload_storage_class.clone())
906 6 : .copy_source(&source_id)
907 6 : .send();
908 6 :
909 6 : tokio::select! {
910 6 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
911 6 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
912 6 : }
913 6 : },
914 6 : is_permanent,
915 6 : warn_threshold,
916 6 : max_retries,
917 6 : "copying object version for time_travel_recover",
918 6 : cancel,
919 6 : )
920 12 : .await
921 6 : .ok_or_else(|| TimeTravelError::Cancelled)
922 6 : .and_then(|x| x)?;
923 6 : tracing::info!(%version_id, %key, "Copied old version in S3");
924 : }
925 : VerOrDelete {
926 : kind: VerOrDeleteKind::DeleteMarker,
927 : ..
928 0 : } => {
929 0 : do_delete = true;
930 0 : }
931 : }
932 : };
933 12 : if do_delete {
934 6 : if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
935 : // The key has since been deleted (but there was some history); no need to do anything.
936 2 : tracing::trace!("Key {key} already deleted, skipping.");
937 : } else {
938 4 : tracing::trace!("Deleting {key}...");
939 :
940 4 : let oid = ObjectIdentifier::builder()
941 4 : .key(key.to_owned())
942 4 : .build()
943 4 : .map_err(|e| TimeTravelError::Other(e.into()))?;
944 :
945 4 : self.delete_oids(&permit, &[oid], cancel)
946 8 : .await
947 4 : .map_err(|e| {
948 0 : // delete_oids will use TimeoutOrCancel
949 0 : if TimeoutOrCancel::caused_by_cancel(&e) {
950 0 : TimeTravelError::Cancelled
951 : } else {
952 0 : TimeTravelError::Other(e)
953 : }
954 4 : })?;
955 : }
956 6 : }
957 : }
958 6 : Ok(())
959 6 : }
960 : }
961 :
962 : // To save RAM, store only the needed data instead of the entire ObjectVersion/DeleteMarkerEntry.
963 : struct VerOrDelete {
964 : kind: VerOrDeleteKind,
965 : last_modified: DateTime,
966 : version_id: String,
967 : key: String,
968 : }
969 :
970 : #[derive(Debug)]
971 : enum VerOrDeleteKind {
972 : Version,
973 : DeleteMarker,
974 : }
975 :
976 : impl VerOrDelete {
977 36 : fn with_kind(
978 36 : kind: VerOrDeleteKind,
979 36 : last_modified: Option<DateTime>,
980 36 : version_id: Option<String>,
981 36 : key: Option<String>,
982 36 : ) -> anyhow::Result<Self> {
983 36 : let lvk = (last_modified, version_id, key);
984 36 : let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
985 0 : anyhow::bail!(
986 0 : "One (or more) of last_modified, key, and id is None. \
987 0 : Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
988 0 : lvk.0,
989 0 : lvk.1,
990 0 : lvk.2,
991 0 : );
992 : };
993 36 : Ok(Self {
994 36 : kind,
995 36 : last_modified,
996 36 : version_id,
997 36 : key,
998 36 : })
999 36 : }
1000 28 : fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
1001 28 : Self::with_kind(
1002 28 : VerOrDeleteKind::Version,
1003 28 : v.last_modified,
1004 28 : v.version_id,
1005 28 : v.key,
1006 28 : )
1007 28 : }
1008 8 : fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
1009 8 : Self::with_kind(
1010 8 : VerOrDeleteKind::DeleteMarker,
1011 8 : v.last_modified,
1012 8 : v.version_id,
1013 8 : v.key,
1014 8 : )
1015 8 : }
1016 : }
1017 :
1018 : #[cfg(test)]
1019 : mod tests {
1020 : use camino::Utf8Path;
1021 : use std::num::NonZeroUsize;
1022 :
1023 : use crate::{RemotePath, S3Bucket, S3Config};
1024 :
1025 : #[tokio::test]
1026 4 : async fn relative_path() {
1027 4 : let all_paths = ["", "some/path", "some/path/"];
1028 4 : let all_paths: Vec<RemotePath> = all_paths
1029 4 : .iter()
1030 12 : .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
1031 4 : .collect();
1032 4 : let prefixes = [
1033 4 : None,
1034 4 : Some(""),
1035 4 : Some("test/prefix"),
1036 4 : Some("test/prefix/"),
1037 4 : Some("/test/prefix/"),
1038 4 : ];
1039 4 : let expected_outputs = [
1040 4 : vec!["", "some/path", "some/path/"],
1041 4 : vec!["/", "/some/path", "/some/path/"],
1042 4 : vec![
1043 4 : "test/prefix/",
1044 4 : "test/prefix/some/path",
1045 4 : "test/prefix/some/path/",
1046 4 : ],
1047 4 : vec![
1048 4 : "test/prefix/",
1049 4 : "test/prefix/some/path",
1050 4 : "test/prefix/some/path/",
1051 4 : ],
1052 4 : vec![
1053 4 : "test/prefix/",
1054 4 : "test/prefix/some/path",
1055 4 : "test/prefix/some/path/",
1056 4 : ],
1057 4 : ];
1058 4 :
1059 20 : for (prefix_idx, prefix) in prefixes.iter().enumerate() {
1060 20 : let config = S3Config {
1061 20 : bucket_name: "bucket".to_owned(),
1062 20 : bucket_region: "region".to_owned(),
1063 20 : prefix_in_bucket: prefix.map(str::to_string),
1064 20 : endpoint: None,
1065 20 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
1066 20 : max_keys_per_list_response: Some(5),
1067 20 : upload_storage_class: None,
1068 20 : };
1069 20 : let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
1070 4 : .await
1071 20 : .expect("remote storage init");
1072 60 : for (test_path_idx, test_path) in all_paths.iter().enumerate() {
1073 60 : let result = storage.relative_path_to_s3_object(test_path);
1074 60 : let expected = expected_outputs[prefix_idx][test_path_idx];
1075 60 : assert_eq!(result, expected);
1076 4 : }
1077 4 : }
1078 4 : }
1079 : }