Line data Source code
1 : //! AWS S3 storage wrapper around the `aws-sdk-s3` library.
2 : //!
3 : //! Respects the `prefix_in_bucket` property from [`S3Config`],
4 : //! allowing multiple API users to independently work with the same S3 bucket,
5 : //! as long as their bucket prefixes are specified and distinct.
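 : //!
 : //! A minimal construction sketch (illustrative values; the fields shown are
 : //! those of [`S3Config`] as exercised in the tests below):
 : //!
 : //! ```ignore
 : //! let config = S3Config {
 : //!     bucket_name: "some-bucket".to_owned(),
 : //!     bucket_region: "eu-central-1".to_owned(),
 : //!     prefix_in_bucket: Some("pageserver/v1".to_string()),
 : //!     endpoint: None,
 : //!     concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
 : //!     max_keys_per_list_response: None,
 : //! };
 : //! let storage = S3Bucket::new(&config)?;
 : //! ```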
6 :
7 : use std::{
8 : borrow::Cow,
9 : collections::HashMap,
10 : pin::Pin,
11 : sync::Arc,
12 : task::{Context, Poll},
13 : time::SystemTime,
14 : };
15 :
16 : use anyhow::{anyhow, Context as _};
17 : use aws_config::{
18 : environment::credentials::EnvironmentVariableCredentialsProvider,
19 : imds::credentials::ImdsCredentialsProvider,
20 : meta::credentials::CredentialsProviderChain,
21 : profile::ProfileFileCredentialsProvider,
22 : provider_config::ProviderConfig,
23 : retry::{RetryConfigBuilder, RetryMode},
24 : web_identity_token::WebIdentityTokenCredentialsProvider,
25 : BehaviorVersion,
26 : };
27 : use aws_credential_types::provider::SharedCredentialsProvider;
28 : use aws_sdk_s3::{
29 : config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
30 : error::SdkError,
31 : operation::get_object::GetObjectError,
32 : types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion},
33 : Client,
34 : };
35 : use aws_smithy_async::rt::sleep::TokioSleep;
36 :
37 : use aws_smithy_types::byte_stream::ByteStream;
38 : use aws_smithy_types::{body::SdkBody, DateTime};
39 : use bytes::Bytes;
40 : use futures::stream::Stream;
41 : use hyper::Body;
42 : use scopeguard::ScopeGuard;
43 : use tokio_util::sync::CancellationToken;
44 : use utils::backoff;
45 :
46 : use super::StorageMetadata;
47 : use crate::{
48 : ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
49 : S3Config, TimeTravelError, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
50 : };
51 :
52 : pub(super) mod metrics;
53 :
54 : use self::metrics::AttemptOutcome;
55 : pub(super) use self::metrics::RequestKind;
56 :
57 : /// AWS S3 storage.
58 : pub struct S3Bucket {
59 : client: Client,
60 : bucket_name: String,
61 : prefix_in_bucket: Option<String>,
62 : max_keys_per_list_response: Option<i32>,
63 : concurrency_limiter: ConcurrencyLimiter,
64 : }
65 :
66 0 : #[derive(Default)]
67 : struct GetObjectRequest {
68 : bucket: String,
69 : key: String,
70 : range: Option<String>,
71 : }
72 : impl S3Bucket {
73 : /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
74 275 : pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
75 275 : tracing::debug!(
76 0 : "Creating s3 remote storage for S3 bucket {}",
77 0 : aws_config.bucket_name
78 0 : );
79 :
80 275 : let region = Some(Region::new(aws_config.bucket_region.clone()));
81 275 :
82 275 : let provider_conf = ProviderConfig::without_region().with_region(region.clone());
83 275 :
84 275 : let credentials_provider = {
85 275 : // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
86 275 : CredentialsProviderChain::first_try(
87 275 : "env",
88 275 : EnvironmentVariableCredentialsProvider::new(),
89 275 : )
90 275 : // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
91 275 : .or_else(
92 275 : "profile-sso",
93 275 : ProfileFileCredentialsProvider::builder()
94 275 : .configure(&provider_conf)
95 275 : .build(),
96 275 : )
97 275 : // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
98 275 : // needed to access remote extensions bucket
99 275 : .or_else(
100 275 : "token",
101 275 : WebIdentityTokenCredentialsProvider::builder()
102 275 : .configure(&provider_conf)
103 275 : .build(),
104 275 : )
105 275 : // uses imds v2
106 275 : .or_else("imds", ImdsCredentialsProvider::builder().build())
107 275 : };
108 275 :
109 275 : // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
110 275 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
111 275 :
112 275 : // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
113 275 : // responses (e.g. 429 on too many ListObjectsV2 requests), we must provide a retry config. We set it to use at most one
114 275 : // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
115 275 : let mut retry_config = RetryConfigBuilder::new();
116 275 : retry_config
117 275 : .set_max_attempts(Some(1))
118 275 : .set_mode(Some(RetryMode::Adaptive));
119 275 :
120 275 : let mut config_builder = Builder::default()
121 275 : .behavior_version(BehaviorVersion::v2023_11_09())
122 275 : .region(region)
123 275 : .identity_cache(IdentityCache::lazy().build())
124 275 : .credentials_provider(SharedCredentialsProvider::new(credentials_provider))
125 275 : .retry_config(retry_config.build())
126 275 : .sleep_impl(SharedAsyncSleep::from(sleep_impl));
127 :
128 275 : if let Some(custom_endpoint) = aws_config.endpoint.clone() {
129 143 : config_builder = config_builder
130 143 : .endpoint_url(custom_endpoint)
131 143 : .force_path_style(true);
132 143 : }
133 :
134 275 : let client = Client::from_conf(config_builder.build());
135 275 :
136 275 : let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
137 273 : let mut prefix = prefix;
138 275 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
139 2 : prefix = &prefix[1..]
140 : }
141 :
142 273 : let mut prefix = prefix.to_string();
143 291 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
144 18 : prefix.pop();
145 18 : }
146 273 : prefix
147 275 : });
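 : // At this point a configured prefix like "/foo/bar/" has been normalized
 : // to "foo/bar": the loops above strip leading and trailing separators.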
148 275 : Ok(Self {
149 275 : client,
150 275 : bucket_name: aws_config.bucket_name.clone(),
151 275 : max_keys_per_list_response: aws_config.max_keys_per_list_response,
152 275 : prefix_in_bucket,
153 275 : concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
154 275 : })
155 275 : }
156 :
157 2702 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
158 2702 : let relative_path =
159 2702 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
160 2702 : Some(stripped) => stripped,
161 : // we rely on AWS to return properly prefixed paths
162 : // for requests with a certain prefix
163 0 : None => panic!(
164 0 : "Key {} does not start with bucket prefix {:?}",
165 0 : key, self.prefix_in_bucket
166 0 : ),
167 : };
168 2702 : RemotePath(
169 2702 : relative_path
170 2702 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
171 2702 : .collect(),
172 2702 : )
173 2702 : }
174 :
175 34314 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
176 34314 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
177 34314 : let path_string = path
178 34314 : .get_path()
179 34314 : .as_str()
180 34314 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
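 : // e.g. with prefix_in_bucket = Some("foo/bar"), a path "tenants/123/"
 : // maps to "foo/bar/tenants/123" (trailing separator trimmed above)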
181 34314 : match &self.prefix_in_bucket {
182 34308 : Some(prefix) => prefix.clone() + "/" + path_string,
183 6 : None => path_string.to_string(),
184 : }
185 34314 : }
186 :
187 19049 : async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
188 18810 : let started_at = start_counting_cancelled_wait(kind);
189 18810 : let permit = self
190 18810 : .concurrency_limiter
191 18810 : .acquire(kind)
192 4209 : .await
193 18810 : .expect("semaphore is never closed");
194 18810 :
195 18810 : let started_at = ScopeGuard::into_inner(started_at);
196 18810 : metrics::BUCKET_METRICS
197 18810 : .wait_seconds
198 18810 : .observe_elapsed(kind, started_at);
199 18810 :
200 18810 : permit
201 18810 : }
202 :
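 : // Unlike `permit`, this returns an owned permit that can be moved into the
 : // returned download stream (see `PermitCarrying`), keeping the concurrency
 : // slot held until the response body is fully consumed or dropped.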
203 10764 : async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
204 10744 : let started_at = start_counting_cancelled_wait(kind);
205 10744 : let permit = self
206 10744 : .concurrency_limiter
207 10744 : .acquire_owned(kind)
208 1 : .await
209 10744 : .expect("semaphore is never closed");
210 10744 :
211 10744 : let started_at = ScopeGuard::into_inner(started_at);
212 10744 : metrics::BUCKET_METRICS
213 10744 : .wait_seconds
214 10744 : .observe_elapsed(kind, started_at);
215 10744 : permit
216 10744 : }
217 :
218 10764 : async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
219 10744 : let kind = RequestKind::Get;
220 10744 : let permit = self.owned_permit(kind).await;
221 :
222 10744 : let started_at = start_measuring_requests(kind);
223 :
224 10744 : let get_object = self
225 10744 : .client
226 10744 : .get_object()
227 10744 : .bucket(request.bucket)
228 10744 : .key(request.key)
229 10744 : .set_range(request.range)
230 10744 : .send()
231 31575 : .await;
232 :
233 10739 : let started_at = ScopeGuard::into_inner(started_at);
234 :
235 199 : match get_object {
236 10540 : Ok(object_output) => {
237 10540 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
238 10540 : let etag = object_output.e_tag.clone();
239 10540 : let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok());
240 10540 :
241 10540 : let body = object_output.body;
242 10540 : let body = ByteStreamAsStream::from(body);
243 10540 : let body = PermitCarrying::new(permit, body);
244 10540 : let body = TimedDownload::new(started_at, body);
245 10540 :
246 10540 : Ok(Download {
247 10540 : metadata,
248 10540 : etag,
249 10540 : last_modified,
250 10540 : download_stream: Box::pin(body),
251 10540 : })
252 : }
253 199 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
254 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
255 : // an error: we expect to sometimes fetch an object and find it missing,
256 : // e.g. when probing for timeline indices.
257 199 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
258 199 : kind,
259 199 : AttemptOutcome::Ok,
260 199 : started_at,
261 199 : );
262 199 : Err(DownloadError::NotFound)
263 : }
264 0 : Err(e) => {
265 0 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
266 0 : kind,
267 0 : AttemptOutcome::Err,
268 0 : started_at,
269 0 : );
270 0 :
271 0 : Err(DownloadError::Other(
272 0 : anyhow::Error::new(e).context("download s3 object"),
273 0 : ))
274 : }
275 : }
276 10739 : }
277 :
278 2343 : async fn delete_oids(
279 2343 : &self,
280 2343 : kind: RequestKind,
281 2343 : delete_objects: &[ObjectIdentifier],
282 2343 : ) -> anyhow::Result<()> {
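 : // The DeleteObjects API caps the number of keys per request (1000 on AWS),
 : // so issue one request per MAX_KEYS_PER_DELETE-sized chunk.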
283 2241 : for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
284 2239 : let started_at = start_measuring_requests(kind);
285 :
286 2239 : let resp = self
287 2239 : .client
288 2239 : .delete_objects()
289 2239 : .bucket(self.bucket_name.clone())
290 2239 : .delete(
291 2239 : Delete::builder()
292 2239 : .set_objects(Some(chunk.to_vec()))
293 2239 : .build()?,
294 : )
295 2239 : .send()
296 9278 : .await;
297 :
298 2239 : let started_at = ScopeGuard::into_inner(started_at);
299 2239 : metrics::BUCKET_METRICS
300 2239 : .req_seconds
301 2239 : .observe_elapsed(kind, &resp, started_at);
302 :
303 2239 : let resp = resp?;
304 2239 : metrics::BUCKET_METRICS
305 2239 : .deleted_objects_total
306 2239 : .inc_by(chunk.len() as u64);
307 2239 : if let Some(errors) = resp.errors {
308 : // Log a bounded number of the errors within the response:
309 : // these requests can carry 1000 keys so logging each one
310 : // would be too verbose, especially as errors may lead us
311 : // to retry repeatedly.
312 : const LOG_UP_TO_N_ERRORS: usize = 10;
313 0 : for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
314 0 : tracing::warn!(
315 0 : "DeleteObjects key {} failed: {}: {}",
316 0 : e.key.as_ref().map(Cow::from).unwrap_or("".into()),
317 0 : e.code.as_ref().map(Cow::from).unwrap_or("".into()),
318 0 : e.message.as_ref().map(Cow::from).unwrap_or("".into())
319 0 : );
320 : }
321 :
322 0 : return Err(anyhow::format_err!(
323 0 : "Failed to delete {} objects",
324 0 : errors.len()
325 0 : ));
326 2239 : }
327 : }
328 2241 : Ok(())
329 2241 : }
330 : }
331 :
332 : pin_project_lite::pin_project! {
333 : struct ByteStreamAsStream {
334 : #[pin]
335 : inner: aws_smithy_types::byte_stream::ByteStream
336 : }
337 : }
338 :
339 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
340 10560 : fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
341 10560 : ByteStreamAsStream { inner }
342 10560 : }
343 : }
344 :
345 : impl Stream for ByteStreamAsStream {
346 : type Item = std::io::Result<Bytes>;
347 :
348 127207 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
349 127207 : // this does the std::io::ErrorKind::Other conversion
350 127207 : self.project().inner.poll_next(cx).map_err(|x| x.into())
351 127207 : }
352 :
353 : // cannot implement size_hint: inner.size_hint() reports the remaining size in
354 : // bytes, whereas Stream::size_hint() counts items, so the two do not line up
355 : }
356 :
357 : pin_project_lite::pin_project! {
358 : /// A `Stream` adapter which carries a permit for the lifetime of the value.
359 : struct PermitCarrying<S> {
360 : permit: tokio::sync::OwnedSemaphorePermit,
361 : #[pin]
362 : inner: S,
363 : }
364 : }
365 :
366 : impl<S> PermitCarrying<S> {
367 10540 : fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
368 10540 : Self { permit, inner }
369 10540 : }
370 : }
371 :
372 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for PermitCarrying<S> {
373 : type Item = <S as Stream>::Item;
374 :
375 127161 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
376 127161 : self.project().inner.poll_next(cx)
377 127161 : }
378 :
379 0 : fn size_hint(&self) -> (usize, Option<usize>) {
380 0 : self.inner.size_hint()
381 0 : }
382 : }
383 :
384 : pin_project_lite::pin_project! {
385 : /// Times and tracks the outcome of the request.
386 : struct TimedDownload<S> {
387 : started_at: std::time::Instant,
388 : outcome: metrics::AttemptOutcome,
389 : #[pin]
390 : inner: S
391 : }
392 :
393 : impl<S> PinnedDrop for TimedDownload<S> {
394 : fn drop(mut this: Pin<&mut Self>) {
395 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
396 : }
397 : }
398 : }
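 : // `outcome` starts out as `Cancelled` and is only flipped to `Ok` on a clean
 : // end of stream (`None`) or to `Err` on a stream error, so dropping a
 : // download mid-stream is recorded as a cancellation by the PinnedDrop above.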
399 :
400 : impl<S> TimedDownload<S> {
401 10540 : fn new(started_at: std::time::Instant, inner: S) -> Self {
402 10540 : TimedDownload {
403 10540 : started_at,
404 10540 : outcome: metrics::AttemptOutcome::Cancelled,
405 10540 : inner,
406 10540 : }
407 10540 : }
408 : }
409 :
410 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
411 : type Item = <S as Stream>::Item;
412 :
413 127161 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
414 127161 : use std::task::ready;
415 127161 :
416 127161 : let this = self.project();
417 :
418 127161 : let res = ready!(this.inner.poll_next(cx));
419 61596 : match &res {
420 61596 : Some(Ok(_)) => {}
421 0 : Some(Err(_)) => *this.outcome = metrics::AttemptOutcome::Err,
422 24206 : None => *this.outcome = metrics::AttemptOutcome::Ok,
423 : }
424 :
425 85802 : Poll::Ready(res)
426 127161 : }
427 :
428 0 : fn size_hint(&self) -> (usize, Option<usize>) {
429 0 : self.inner.size_hint()
430 0 : }
431 : }
432 :
433 : impl RemoteStorage for S3Bucket {
434 534 : async fn list(
435 534 : &self,
436 534 : prefix: Option<&RemotePath>,
437 534 : mode: ListingMode,
438 534 : ) -> Result<Listing, DownloadError> {
439 512 : let kind = RequestKind::List;
440 512 : let mut result = Listing::default();
441 512 :
442 512 : // use the passed prefix, or fall back to prefix_in_bucket if none is given
443 512 : let list_prefix = prefix
444 512 : .map(|p| self.relative_path_to_s3_object(p))
445 512 : .or_else(|| self.prefix_in_bucket.clone())
446 512 : .map(|mut p| {
447 : // the listing prefix must end with a separator,
448 : // otherwise the request returns only the entry for the prefix itself
449 512 : if matches!(mode, ListingMode::WithDelimiter)
450 268 : && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
451 268 : {
452 268 : p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
453 268 : }
454 512 : p
455 512 : });
456 512 :
457 512 : let mut continuation_token = None;
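 : // ListObjectsV2 pages its results, so keep requesting with the returned
 : // continuation token until a response arrives without one.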
458 :
459 : loop {
460 512 : let _guard = self.permit(kind).await;
461 512 : let started_at = start_measuring_requests(kind);
462 512 :
463 512 : let mut request = self
464 512 : .client
465 512 : .list_objects_v2()
466 512 : .bucket(self.bucket_name.clone())
467 512 : .set_prefix(list_prefix.clone())
468 512 : .set_continuation_token(continuation_token)
469 512 : .set_max_keys(self.max_keys_per_list_response);
470 512 :
471 512 : if let ListingMode::WithDelimiter = mode {
472 268 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
473 268 : }
474 :
475 512 : let response = request
476 512 : .send()
477 2643 : .await
478 512 : .context("Failed to list S3 prefixes")
479 512 : .map_err(DownloadError::Other);
480 512 :
481 512 : let started_at = ScopeGuard::into_inner(started_at);
482 512 :
483 512 : metrics::BUCKET_METRICS
484 512 : .req_seconds
485 512 : .observe_elapsed(kind, &response, started_at);
486 :
487 512 : let response = response?;
488 :
489 512 : let keys = response.contents();
490 512 : let empty = Vec::new();
491 512 : let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
492 :
493 0 : tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
494 :
495 2930 : for object in keys {
496 2418 : let object_path = object.key().expect("response does not contain a key");
497 2418 : let remote_path = self.s3_object_to_relative_path(object_path);
498 2418 : result.keys.push(remote_path);
499 2418 : }
500 :
501 512 : result.prefixes.extend(
502 512 : prefixes
503 512 : .iter()
504 512 : .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
505 512 : );
506 :
507 512 : continuation_token = match response.next_continuation_token {
508 0 : Some(new_token) => Some(new_token),
509 512 : None => break,
510 512 : };
511 512 : }
512 512 :
513 512 : Ok(result)
514 512 : }
515 :
516 16044 : async fn upload(
517 16044 : &self,
518 16044 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
519 16044 : from_size_bytes: usize,
520 16044 : to: &RemotePath,
521 16044 : metadata: Option<StorageMetadata>,
522 16044 : ) -> anyhow::Result<()> {
523 16044 : let kind = RequestKind::Put;
524 16044 : let _guard = self.permit(kind).await;
525 :
526 16044 : let started_at = start_measuring_requests(kind);
527 16044 :
528 16044 : let body = Body::wrap_stream(from);
529 16044 : let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
530 :
531 16044 : let res = self
532 16044 : .client
533 16044 : .put_object()
534 16044 : .bucket(self.bucket_name.clone())
535 16044 : .key(self.relative_path_to_s3_object(to))
536 16044 : .set_metadata(metadata.map(|m| m.0))
537 16044 : .content_length(from_size_bytes.try_into()?)
538 16044 : .body(bytes_stream)
539 16044 : .send()
540 51926 : .await;
541 :
542 16043 : let started_at = ScopeGuard::into_inner(started_at);
543 16043 : metrics::BUCKET_METRICS
544 16043 : .req_seconds
545 16043 : .observe_elapsed(kind, &res, started_at);
546 16043 :
547 16043 : res?;
548 :
549 16043 : Ok(())
550 16043 : }
551 :
552 14 : async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
553 12 : let kind = RequestKind::Copy;
554 12 : let _guard = self.permit(kind).await;
555 :
556 12 : let started_at = start_measuring_requests(kind);
557 12 :
558 12 : // we need to specify bucket_name as a prefix
559 12 : let copy_source = format!(
560 12 : "{}/{}",
561 12 : self.bucket_name,
562 12 : self.relative_path_to_s3_object(from)
563 12 : );
564 :
565 12 : let res = self
566 12 : .client
567 12 : .copy_object()
568 12 : .bucket(self.bucket_name.clone())
569 12 : .key(self.relative_path_to_s3_object(to))
570 12 : .copy_source(copy_source)
571 12 : .send()
572 48 : .await;
573 :
574 12 : let started_at = ScopeGuard::into_inner(started_at);
575 12 : metrics::BUCKET_METRICS
576 12 : .req_seconds
577 12 : .observe_elapsed(kind, &res, started_at);
578 12 :
579 12 : res?;
580 :
581 12 : Ok(())
582 12 : }
583 :
584 10718 : async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
585 10708 : // If a prefix is configured, download the file at `prefix/from`;
586 10708 : // if no prefix is set, download the file at `from`.
587 10708 : self.download_object(GetObjectRequest {
588 10708 : bucket: self.bucket_name.clone(),
589 10708 : key: self.relative_path_to_s3_object(from),
590 10708 : range: None,
591 10708 : })
592 31469 : .await
593 10703 : }
594 :
595 46 : async fn download_byte_range(
596 46 : &self,
597 46 : from: &RemotePath,
598 46 : start_inclusive: u64,
599 46 : end_exclusive: Option<u64>,
600 46 : ) -> Result<Download, DownloadError> {
601 36 : // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
602 36 : // and needs both ends to be exclusive
603 36 : let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
604 36 : let range = Some(match end_inclusive {
605 0 : Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
606 36 : None => format!("bytes={start_inclusive}-"),
607 : });
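 : // e.g. start_inclusive = 0, end_exclusive = Some(1024) yields "bytes=0-1023",
 : // and an open-ended read from offset 100 yields "bytes=100-"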
608 :
609 36 : self.download_object(GetObjectRequest {
610 36 : bucket: self.bucket_name.clone(),
611 36 : key: self.relative_path_to_s3_object(from),
612 36 : range,
613 36 : })
614 107 : .await
615 36 : }
616 2339 : async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
617 2241 : let kind = RequestKind::Delete;
618 2241 : let _guard = self.permit(kind).await;
619 :
620 2241 : let mut delete_objects = Vec::with_capacity(paths.len());
621 8963 : for path in paths {
622 6722 : let obj_id = ObjectIdentifier::builder()
623 6722 : .set_key(Some(self.relative_path_to_s3_object(path)))
624 6722 : .build()?;
625 6722 : delete_objects.push(obj_id);
626 : }
627 :
628 9278 : self.delete_oids(kind, &delete_objects).await
629 2241 : }
630 :
631 2106 : async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
632 2016 : let paths = std::array::from_ref(path);
633 8257 : self.delete_objects(paths).await
634 2016 : }
635 :
636 7 : async fn time_travel_recover(
637 7 : &self,
638 7 : prefix: Option<&RemotePath>,
639 7 : timestamp: SystemTime,
640 7 : done_if_after: SystemTime,
641 7 : cancel: &CancellationToken,
642 7 : ) -> Result<(), TimeTravelError> {
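 : // Overview: list every object version and delete marker under the prefix,
 : // group them by key, and for each key either copy the newest version at or
 : // before `timestamp` back on top (making it current again), or delete the
 : // key if it did not exist at that point in time. Keys whose latest change
 : // is after `done_if_after` are skipped as already past the recovery point.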
643 1 : let kind = RequestKind::TimeTravel;
644 1 : let _guard = self.permit(kind).await;
645 :
646 1 : let timestamp = DateTime::from(timestamp);
647 1 : let done_if_after = DateTime::from(done_if_after);
648 :
649 0 : tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
650 :
651 : // use the passed prefix, or fall back to prefix_in_bucket if none is given
652 1 : let prefix = prefix
653 1 : .map(|p| self.relative_path_to_s3_object(p))
654 1 : .or_else(|| self.prefix_in_bucket.clone());
655 1 :
656 1 : let warn_threshold = 3;
657 1 : let max_retries = 10;
658 1 : let is_permanent = |_e: &_| false;
659 :
660 1 : let mut key_marker = None;
661 1 : let mut version_id_marker = None;
662 1 : let mut versions_and_deletes = Vec::new();
663 :
664 : loop {
665 1 : let response = backoff::retry(
666 1 : || async {
667 1 : self.client
668 1 : .list_object_versions()
669 1 : .bucket(self.bucket_name.clone())
670 1 : .set_prefix(prefix.clone())
671 1 : .set_key_marker(key_marker.clone())
672 1 : .set_version_id_marker(version_id_marker.clone())
673 1 : .send()
674 40 : .await
675 1 : .map_err(|e| TimeTravelError::Other(e.into()))
676 1 : },
677 1 : is_permanent,
678 1 : warn_threshold,
679 1 : max_retries,
680 1 : "listing object versions for time_travel_recover",
681 1 : cancel,
682 1 : )
683 40 : .await
684 1 : .ok_or_else(|| TimeTravelError::Cancelled)
685 1 : .and_then(|x| x)?;
686 :
687 0 : tracing::trace!(
688 0 : " Got List response version_id_marker={:?}, key_marker={:?}",
689 0 : response.version_id_marker,
690 0 : response.key_marker
691 0 : );
692 1 : let versions = response
693 1 : .versions
694 1 : .unwrap_or_default()
695 1 : .into_iter()
696 1 : .map(VerOrDelete::from_version);
697 1 : let deletes = response
698 1 : .delete_markers
699 1 : .unwrap_or_default()
700 1 : .into_iter()
701 1 : .map(VerOrDelete::from_delete_marker);
702 1 : itertools::process_results(versions.chain(deletes), |n_vds| {
703 1 : versions_and_deletes.extend(n_vds)
704 1 : })
705 1 : .map_err(TimeTravelError::Other)?;
706 14 : fn none_if_empty(v: Option<String>) -> Option<String> {
707 14 : v.filter(|v| !v.is_empty())
708 14 : }
709 1 : version_id_marker = none_if_empty(response.next_version_id_marker);
710 1 : key_marker = none_if_empty(response.next_key_marker);
711 1 : if version_id_marker.is_none() {
712 : // The final response is not supposed to be truncated
713 1 : if response.is_truncated.unwrap_or_default() {
714 0 : return Err(TimeTravelError::Other(anyhow::anyhow!(
715 0 : "Received truncated ListObjectVersions response for prefix={prefix:?}"
716 0 : )));
717 1 : }
718 1 : break;
719 0 : }
720 0 : // Limit the number of versions and delete markers we track, mostly so
721 0 : // that we don't keep requesting forever if the list is too long, as we
722 0 : // hold the entire list in RAM.
723 0 : // Building a list of 100k entries that reaches the limit roughly takes
724 0 : // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
725 0 : const COMPLEXITY_LIMIT: usize = 100_000;
726 0 : if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
727 0 : return Err(TimeTravelError::TooManyVersions);
728 0 : }
729 : }
730 :
731 1 : tracing::info!(
732 1 : "Built list for time travel with {} versions and deletions",
733 1 : versions_and_deletes.len()
734 1 : );
735 :
736 : // Work on the list of references instead of the objects directly,
737 : // otherwise we get lifetime errors in the sort_by_key call below.
738 1 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
739 1 :
740 2276 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
741 1 :
742 1 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
743 :
744 268 : for vd in &versions_and_deletes {
745 : let VerOrDelete {
746 267 : version_id, key, ..
747 267 : } = &vd;
748 267 : if version_id == "null" {
749 0 : return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
750 0 : indicating either disabled versioning, or legacy objects with null version id values")));
751 267 : }
752 0 : tracing::trace!(
753 0 : "Parsing version key={key} version_id={version_id} kind={:?}",
754 0 : vd.kind
755 0 : );
756 :
757 267 : vds_for_key.entry(key).or_default().push(vd);
758 : }
759 93 : for (key, versions) in vds_for_key {
760 92 : let last_vd = versions.last().unwrap();
761 92 : if last_vd.last_modified > done_if_after {
762 0 : tracing::trace!("Key {key} has version later than done_if_after, skipping");
763 0 : continue;
764 92 : }
765 : // the version we want to restore to.
766 92 : let version_to_restore_to =
767 188 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
768 0 : Ok(v) => v,
769 92 : Err(e) => e,
770 : };
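 : // A binary-search miss returns the insertion point: the index of the first
 : // version strictly newer than `timestamp`, so the entry just before it is
 : // the latest version at or before the target time.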
771 92 : if version_to_restore_to == versions.len() {
772 0 : tracing::trace!("Key {key} has no changes since timestamp, skipping");
773 0 : continue;
774 92 : }
775 92 : let mut do_delete = false;
776 92 : if version_to_restore_to == 0 {
777 : // All versions more recent, so the key didn't exist at the specified time point.
778 0 : tracing::trace!(
779 0 : "All {} versions more recent for {key}, deleting",
780 0 : versions.len()
781 0 : );
782 3 : do_delete = true;
783 : } else {
784 89 : match &versions[version_to_restore_to - 1] {
785 : VerOrDelete {
786 : kind: VerOrDeleteKind::Version,
787 89 : version_id,
788 : ..
789 : } => {
790 0 : tracing::trace!("Copying old version {version_id} for {key}...");
791 : // Restore the state to the last version by copying
792 89 : let source_id =
793 89 : format!("{}/{key}?versionId={version_id}", self.bucket_name);
794 89 :
795 89 : backoff::retry(
796 89 : || async {
797 89 : self.client
798 89 : .copy_object()
799 89 : .bucket(self.bucket_name.clone())
800 89 : .key(key)
801 89 : .copy_source(&source_id)
802 89 : .send()
803 91 : .await
804 89 : .map_err(|e| TimeTravelError::Other(e.into()))
805 89 : },
806 89 : is_permanent,
807 89 : warn_threshold,
808 89 : max_retries,
809 89 : "copying object version for time_travel_recover",
810 89 : cancel,
811 89 : )
812 91 : .await
813 89 : .ok_or_else(|| TimeTravelError::Cancelled)
814 89 : .and_then(|x| x)?;
815 89 : tracing::info!(%version_id, %key, "Copied old version in S3");
816 : }
817 : VerOrDelete {
818 : kind: VerOrDeleteKind::DeleteMarker,
819 : ..
820 0 : } => {
821 0 : do_delete = true;
822 0 : }
823 : }
824 : };
825 92 : if do_delete {
826 3 : if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
827 : // Key has since been deleted (but there was some history), no need to do anything
828 0 : tracing::trace!("Key {key} already deleted, skipping.");
829 : } else {
830 0 : tracing::trace!("Deleting {key}...");
831 :
832 0 : let oid = ObjectIdentifier::builder()
833 0 : .key(key.to_owned())
834 0 : .build()
835 0 : .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
836 0 : self.delete_oids(kind, &[oid])
837 0 : .await
838 0 : .map_err(TimeTravelError::Other)?;
839 : }
840 89 : }
841 : }
842 1 : Ok(())
843 1 : }
844 : }
845 :
846 : /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
847 29813 : fn start_counting_cancelled_wait(
848 29813 : kind: RequestKind,
849 29813 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
850 29813 : scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
851 0 : metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
852 29813 : })
853 29813 : }
854 :
855 : /// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
856 29808 : fn start_measuring_requests(
857 29808 : kind: RequestKind,
858 29808 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
859 29808 : scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
860 5 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
861 5 : kind,
862 5 : AttemptOutcome::Cancelled,
863 5 : started_at,
864 5 : )
865 29808 : })
866 29808 : }
867 :
868 : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
869 : struct VerOrDelete {
870 : kind: VerOrDeleteKind,
871 : last_modified: DateTime,
872 : version_id: String,
873 : key: String,
874 : }
875 :
876 0 : #[derive(Debug)]
877 : enum VerOrDeleteKind {
878 : Version,
879 : DeleteMarker,
880 : }
881 :
882 : impl VerOrDelete {
883 303 : fn with_kind(
884 303 : kind: VerOrDeleteKind,
885 303 : last_modified: Option<DateTime>,
886 303 : version_id: Option<String>,
887 303 : key: Option<String>,
888 303 : ) -> anyhow::Result<Self> {
889 303 : let lvk = (last_modified, version_id, key);
890 303 : let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
891 0 : anyhow::bail!(
892 0 : "One (or more) of last_modified, key, and id is None. \
893 0 : Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
894 0 : lvk.0,
895 0 : lvk.1,
896 0 : lvk.2,
897 0 : );
898 : };
899 303 : Ok(Self {
900 303 : kind,
901 303 : last_modified,
902 303 : version_id,
903 303 : key,
904 303 : })
905 303 : }
906 163 : fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
907 163 : Self::with_kind(
908 163 : VerOrDeleteKind::Version,
909 163 : v.last_modified,
910 163 : v.version_id,
911 163 : v.key,
912 163 : )
913 163 : }
914 140 : fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
915 140 : Self::with_kind(
916 140 : VerOrDeleteKind::DeleteMarker,
917 140 : v.last_modified,
918 140 : v.version_id,
919 140 : v.key,
920 140 : )
921 140 : }
922 : }
923 :
924 : #[cfg(test)]
925 : mod tests {
926 : use camino::Utf8Path;
927 : use std::num::NonZeroUsize;
928 :
929 : use crate::{RemotePath, S3Bucket, S3Config};
930 :
931 2 : #[test]
932 2 : fn relative_path() {
933 2 : let all_paths = ["", "some/path", "some/path/"];
934 2 : let all_paths: Vec<RemotePath> = all_paths
935 2 : .iter()
936 6 : .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
937 2 : .collect();
938 2 : let prefixes = [
939 2 : None,
940 2 : Some(""),
941 2 : Some("test/prefix"),
942 2 : Some("test/prefix/"),
943 2 : Some("/test/prefix/"),
944 2 : ];
945 2 : let expected_outputs = vec![
946 2 : vec!["", "some/path", "some/path"],
947 2 : vec!["/", "/some/path", "/some/path"],
948 2 : vec![
949 2 : "test/prefix/",
950 2 : "test/prefix/some/path",
951 2 : "test/prefix/some/path",
952 2 : ],
953 2 : vec![
954 2 : "test/prefix/",
955 2 : "test/prefix/some/path",
956 2 : "test/prefix/some/path",
957 2 : ],
958 2 : vec![
959 2 : "test/prefix/",
960 2 : "test/prefix/some/path",
961 2 : "test/prefix/some/path",
962 2 : ],
963 2 : ];
964 :
965 10 : for (prefix_idx, prefix) in prefixes.iter().enumerate() {
966 10 : let config = S3Config {
967 10 : bucket_name: "bucket".to_owned(),
968 10 : bucket_region: "region".to_owned(),
969 10 : prefix_in_bucket: prefix.map(str::to_string),
970 10 : endpoint: None,
971 10 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
972 10 : max_keys_per_list_response: Some(5),
973 10 : };
974 10 : let storage = S3Bucket::new(&config).expect("remote storage init");
975 30 : for (test_path_idx, test_path) in all_paths.iter().enumerate() {
976 30 : let result = storage.relative_path_to_s3_object(test_path);
977 30 : let expected = expected_outputs[prefix_idx][test_path_idx];
978 30 : assert_eq!(result, expected);
979 : }
980 : }
981 2 : }
982 : }