Line data Source code
1 : //! AWS S3 storage wrapper around the `aws-sdk-s3` library.
2 : //!
3 : //! Respects the `prefix_in_bucket` property from [`S3Config`],
4 : //! allowing multiple API users to work with the same S3 bucket independently,
5 : //! as long as each user specifies a distinct bucket prefix.
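: //!
: //! A minimal usage sketch (illustrative only; the `S3Config` fields are the ones
: //! exercised by the `relative_path` test at the bottom of this file, and the
: //! bucket/region values are placeholders):
: //!
: //! ```ignore
: //! // Two clients can safely share one bucket: every key each of them reads or
: //! // writes is namespaced under its own `prefix_in_bucket`.
: //! let config_a = S3Config {
: //!     bucket_name: "shared-bucket".to_owned(),
: //!     bucket_region: "eu-central-1".to_owned(),
: //!     prefix_in_bucket: Some("tenant-a".to_owned()),
: //!     endpoint: None,
: //!     concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
: //!     max_keys_per_list_response: None,
: //! };
: //! let storage_a = S3Bucket::new(&config_a)?;
: //! ```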
6 :
7 : use std::{
8 : borrow::Cow,
9 : collections::HashMap,
10 : num::NonZeroU32,
11 : pin::Pin,
12 : sync::Arc,
13 : task::{Context, Poll},
14 : time::SystemTime,
15 : };
16 :
17 : use anyhow::{anyhow, Context as _};
18 : use aws_config::{
19 : environment::credentials::EnvironmentVariableCredentialsProvider,
20 : imds::credentials::ImdsCredentialsProvider,
21 : meta::credentials::CredentialsProviderChain,
22 : profile::ProfileFileCredentialsProvider,
23 : provider_config::ProviderConfig,
24 : retry::{RetryConfigBuilder, RetryMode},
25 : web_identity_token::WebIdentityTokenCredentialsProvider,
26 : BehaviorVersion,
27 : };
28 : use aws_credential_types::provider::SharedCredentialsProvider;
29 : use aws_sdk_s3::{
30 : config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
31 : error::SdkError,
32 : operation::get_object::GetObjectError,
33 : types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion},
34 : Client,
35 : };
36 : use aws_smithy_async::rt::sleep::TokioSleep;
37 :
38 : use aws_smithy_types::byte_stream::ByteStream;
39 : use aws_smithy_types::{body::SdkBody, DateTime};
40 : use bytes::Bytes;
41 : use futures::stream::Stream;
42 : use hyper::Body;
43 : use scopeguard::ScopeGuard;
44 : use tokio_util::sync::CancellationToken;
45 : use utils::backoff;
46 :
47 : use super::StorageMetadata;
48 : use crate::{
49 : support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode,
50 : RemotePath, RemoteStorage, S3Config, TimeTravelError, MAX_KEYS_PER_DELETE,
51 : REMOTE_STORAGE_PREFIX_SEPARATOR,
52 : };
53 :
54 : pub(super) mod metrics;
55 :
56 : use self::metrics::AttemptOutcome;
57 : pub(super) use self::metrics::RequestKind;
58 :
59 : /// AWS S3 storage.
60 : pub struct S3Bucket {
61 : client: Client,
62 : bucket_name: String,
63 : prefix_in_bucket: Option<String>,
64 : max_keys_per_list_response: Option<i32>,
65 : concurrency_limiter: ConcurrencyLimiter,
66 : }
67 :
68 : struct GetObjectRequest {
69 : bucket: String,
70 : key: String,
71 : range: Option<String>,
72 : }
73 : impl S3Bucket {
74 : /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
75 294 : pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
76 294 : tracing::debug!(
77 0 : "Creating s3 remote storage for S3 bucket {}",
78 0 : aws_config.bucket_name
79 0 : );
80 :
81 294 : let region = Some(Region::new(aws_config.bucket_region.clone()));
82 294 :
83 294 : let provider_conf = ProviderConfig::without_region().with_region(region.clone());
84 294 :
85 294 : let credentials_provider = {
86 294 : // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
87 294 : CredentialsProviderChain::first_try(
88 294 : "env",
89 294 : EnvironmentVariableCredentialsProvider::new(),
90 294 : )
91 294 : // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
92 294 : .or_else(
93 294 : "profile-sso",
94 294 : ProfileFileCredentialsProvider::builder()
95 294 : .configure(&provider_conf)
96 294 : .build(),
97 294 : )
98 294 : // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
99 294 : // needed to access remote extensions bucket
100 294 : .or_else(
101 294 : "token",
102 294 : WebIdentityTokenCredentialsProvider::builder()
103 294 : .configure(&provider_conf)
104 294 : .build(),
105 294 : )
106 294 : // uses IMDSv2
107 294 : .or_else("imds", ImdsCredentialsProvider::builder().build())
108 294 : };
109 294 :
110 294 : // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
111 294 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
112 294 :
113 294 : // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
114 294 : // responses (e.g. 429 on too many ListObjectsV2 requests), we must provide a retry config. We set it to use at most one
115 294 : // attempt, and enable 'Adaptive' mode, which turns on the SDK's client-side rate limiting.
116 294 : let mut retry_config = RetryConfigBuilder::new();
117 294 : retry_config
118 294 : .set_max_attempts(Some(1))
119 294 : .set_mode(Some(RetryMode::Adaptive));
120 294 :
121 294 : let mut config_builder = Builder::default()
122 294 : .behavior_version(BehaviorVersion::v2023_11_09())
123 294 : .region(region)
124 294 : .identity_cache(IdentityCache::lazy().build())
125 294 : .credentials_provider(SharedCredentialsProvider::new(credentials_provider))
126 294 : .retry_config(retry_config.build())
127 294 : .sleep_impl(SharedAsyncSleep::from(sleep_impl));
128 :
129 294 : if let Some(custom_endpoint) = aws_config.endpoint.clone() {
130 146 : config_builder = config_builder
131 146 : .endpoint_url(custom_endpoint)
132 146 : .force_path_style(true);
133 148 : }
134 :
135 294 : let client = Client::from_conf(config_builder.build());
136 294 :
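: // Normalize the configured prefix: strip any leading separators and a trailing
: // one, so e.g. "/test/prefix/" is stored as "test/prefix" (see the
: // `relative_path` test below for the resulting key layout).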
137 294 : let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
138 292 : let mut prefix = prefix;
139 294 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
140 2 : prefix = &prefix[1..]
141 : }
142 :
143 292 : let mut prefix = prefix.to_string();
144 310 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
145 18 : prefix.pop();
146 18 : }
147 292 : prefix
148 294 : });
149 294 : Ok(Self {
150 294 : client,
151 294 : bucket_name: aws_config.bucket_name.clone(),
152 294 : max_keys_per_list_response: aws_config.max_keys_per_list_response,
153 294 : prefix_in_bucket,
154 294 : concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
155 294 : })
156 294 : }
157 :
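: // The two helpers below translate between client-visible `RemotePath`s and raw
: // S3 object keys by stripping or prepending `prefix_in_bucket`. E.g. with a
: // prefix of "test/prefix", the path "some/path" maps to the key
: // "test/prefix/some/path", and stripping that key yields "some/path" again.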
158 2607 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
159 2607 : let relative_path =
160 2607 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
161 2607 : Some(stripped) => stripped,
162 : // we rely on AWS to return properly prefixed paths
163 : // for requests with a certain prefix
164 0 : None => panic!(
165 0 : "Key {} does not start with bucket prefix {:?}",
166 0 : key, self.prefix_in_bucket
167 0 : ),
168 : };
169 2607 : RemotePath(
170 2607 : relative_path
171 2607 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
172 2607 : .collect(),
173 2607 : )
174 2607 : }
175 :
176 34298 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
177 34298 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
178 34298 : let path_string = path
179 34298 : .get_path()
180 34298 : .as_str()
181 34298 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
182 34298 : match &self.prefix_in_bucket {
183 34292 : Some(prefix) => prefix.clone() + "/" + path_string,
184 6 : None => path_string.to_string(),
185 : }
186 34298 : }
187 :
188 19248 : async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
189 19006 : let started_at = start_counting_cancelled_wait(kind);
190 19006 : let permit = self
191 19006 : .concurrency_limiter
192 19006 : .acquire(kind)
193 4143 : .await
194 19006 : .expect("semaphore is never closed");
195 19006 :
196 19006 : let started_at = ScopeGuard::into_inner(started_at);
197 19006 : metrics::BUCKET_METRICS
198 19006 : .wait_seconds
199 19006 : .observe_elapsed(kind, started_at);
200 19006 :
201 19006 : permit
202 19006 : }
203 :
204 10485 : async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
205 10463 : let started_at = start_counting_cancelled_wait(kind);
206 10463 : let permit = self
207 10463 : .concurrency_limiter
208 10463 : .acquire_owned(kind)
209 0 : .await
210 10463 : .expect("semaphore is never closed");
211 10463 :
212 10463 : let started_at = ScopeGuard::into_inner(started_at);
213 10463 : metrics::BUCKET_METRICS
214 10463 : .wait_seconds
215 10463 : .observe_elapsed(kind, started_at);
216 10463 : permit
217 10463 : }
218 :
219 10485 : async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
220 10463 : let kind = RequestKind::Get;
221 10463 : let permit = self.owned_permit(kind).await;
222 :
223 10463 : let started_at = start_measuring_requests(kind);
224 :
225 10463 : let get_object = self
226 10463 : .client
227 10463 : .get_object()
228 10463 : .bucket(request.bucket)
229 10463 : .key(request.key)
230 10463 : .set_range(request.range)
231 10463 : .send()
232 30427 : .await;
233 :
234 10460 : let started_at = ScopeGuard::into_inner(started_at);
235 :
236 10252 : let object_output = match get_object {
237 10252 : Ok(object_output) => object_output,
238 208 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
239 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
240 : // an error: we expect to sometimes fetch an object and find it missing,
241 : // e.g. when probing for timeline indices.
242 208 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
243 208 : kind,
244 208 : AttemptOutcome::Ok,
245 208 : started_at,
246 208 : );
247 208 : return Err(DownloadError::NotFound);
248 : }
249 0 : Err(e) => {
250 0 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
251 0 : kind,
252 0 : AttemptOutcome::Err,
253 0 : started_at,
254 0 : );
255 0 :
256 0 : return Err(DownloadError::Other(
257 0 : anyhow::Error::new(e).context("download s3 object"),
258 0 : ));
259 : }
260 : };
261 :
262 10252 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
263 10252 : let etag = object_output.e_tag;
264 10252 : let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok());
265 10252 :
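: // Wrap the response body in two layers: `PermitCarrying` holds the concurrency
: // permit for as long as the caller keeps reading the stream, and `TimedDownload`
: // records the request's outcome and latency once the stream completes or is dropped.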
266 10252 : let body = object_output.body;
267 10252 : let body = ByteStreamAsStream::from(body);
268 10252 : let body = PermitCarrying::new(permit, body);
269 10252 : let body = TimedDownload::new(started_at, body);
270 10252 :
271 10252 : Ok(Download {
272 10252 : metadata,
273 10252 : etag,
274 10252 : last_modified,
275 10252 : download_stream: Box::pin(body),
276 10252 : })
277 10460 : }
278 :
279 2343 : async fn delete_oids(
280 2343 : &self,
281 2343 : kind: RequestKind,
282 2343 : delete_objects: &[ObjectIdentifier],
283 2343 : ) -> anyhow::Result<()> {
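: // The DeleteObjects API accepts at most MAX_KEYS_PER_DELETE (1000) keys per
: // request, so larger batches are issued as sequential chunked requests.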
284 2241 : for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
285 2241 : let started_at = start_measuring_requests(kind);
286 :
287 2241 : let resp = self
288 2241 : .client
289 2241 : .delete_objects()
290 2241 : .bucket(self.bucket_name.clone())
291 2241 : .delete(
292 2241 : Delete::builder()
293 2241 : .set_objects(Some(chunk.to_vec()))
294 2241 : .build()?,
295 : )
296 2241 : .send()
297 9319 : .await;
298 :
299 2241 : let started_at = ScopeGuard::into_inner(started_at);
300 2241 : metrics::BUCKET_METRICS
301 2241 : .req_seconds
302 2241 : .observe_elapsed(kind, &resp, started_at);
303 :
304 2241 : let resp = resp?;
305 2241 : metrics::BUCKET_METRICS
306 2241 : .deleted_objects_total
307 2241 : .inc_by(chunk.len() as u64);
308 2241 : if let Some(errors) = resp.errors {
309 : // Log a bounded number of the errors within the response:
310 : // these requests can carry 1000 keys so logging each one
311 : // would be too verbose, especially as errors may lead us
312 : // to retry repeatedly.
313 : const LOG_UP_TO_N_ERRORS: usize = 10;
314 0 : for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
315 0 : tracing::warn!(
316 0 : "DeleteObjects key {} failed: {}: {}",
317 0 : e.key.as_ref().map(Cow::from).unwrap_or("".into()),
318 0 : e.code.as_ref().map(Cow::from).unwrap_or("".into()),
319 0 : e.message.as_ref().map(Cow::from).unwrap_or("".into())
320 0 : );
321 : }
322 :
323 0 : return Err(anyhow::format_err!(
324 0 : "Failed to delete {} objects",
325 0 : errors.len()
326 0 : ));
327 2241 : }
328 : }
329 2241 : Ok(())
330 2241 : }
331 : }
332 :
333 : pin_project_lite::pin_project! {
334 : struct ByteStreamAsStream {
335 : #[pin]
336 : inner: aws_smithy_types::byte_stream::ByteStream
337 : }
338 : }
339 :
340 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
341 10272 : fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
342 10272 : ByteStreamAsStream { inner }
343 10272 : }
344 : }
345 :
346 : impl Stream for ByteStreamAsStream {
347 : type Item = std::io::Result<Bytes>;
348 :
349 164238 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
350 164238 : // this does the std::io::ErrorKind::Other conversion
351 164238 : self.project().inner.poll_next(cx).map_err(|x| x.into())
352 164238 : }
353 :
354 : // We cannot forward size_hint: the inner ByteStream's size_hint reports the remaining
355 : // size in bytes, whereas Stream::size_hint counts items, so the two do not line up.
356 : }
357 :
358 : pin_project_lite::pin_project! {
359 : /// Times and tracks the outcome of the request.
360 : struct TimedDownload<S> {
361 : started_at: std::time::Instant,
362 : outcome: metrics::AttemptOutcome,
363 : #[pin]
364 : inner: S
365 : }
366 :
367 : impl<S> PinnedDrop for TimedDownload<S> {
368 : fn drop(mut this: Pin<&mut Self>) {
369 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
370 : }
371 : }
372 : }
373 :
374 : impl<S> TimedDownload<S> {
375 10252 : fn new(started_at: std::time::Instant, inner: S) -> Self {
376 10252 : TimedDownload {
377 10252 : started_at,
378 10252 : outcome: metrics::AttemptOutcome::Cancelled,
379 10252 : inner,
380 10252 : }
381 10252 : }
382 : }
383 :
384 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
385 : type Item = <S as Stream>::Item;
386 :
387 164202 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
388 164202 : use std::task::ready;
389 164202 :
390 164202 : let this = self.project();
391 :
392 164202 : let res = ready!(this.inner.poll_next(cx));
393 80382 : match &res {
394 80382 : Some(Ok(_)) => {}
395 0 : Some(Err(_)) => *this.outcome = metrics::AttemptOutcome::Err,
396 23645 : None => *this.outcome = metrics::AttemptOutcome::Ok,
397 : }
398 :
399 104027 : Poll::Ready(res)
400 164202 : }
401 :
402 0 : fn size_hint(&self) -> (usize, Option<usize>) {
403 0 : self.inner.size_hint()
404 0 : }
405 : }
406 :
407 : impl RemoteStorage for S3Bucket {
408 561 : async fn list(
409 561 : &self,
410 561 : prefix: Option<&RemotePath>,
411 561 : mode: ListingMode,
412 561 : max_keys: Option<NonZeroU32>,
413 561 : ) -> Result<Listing, DownloadError> {
414 537 : let kind = RequestKind::List;
415 537 : // s3 sdk wants i32
416 537 : let mut max_keys = max_keys.map(|mk| mk.get() as i32);
417 537 : let mut result = Listing::default();
418 537 :
419 537 : // get the passed prefix or if it is not set use prefix_in_bucket value
420 537 : let list_prefix = prefix
421 537 : .map(|p| self.relative_path_to_s3_object(p))
422 537 : .or_else(|| self.prefix_in_bucket.clone())
423 537 : .map(|mut p| {
424 : // The prefix must end with a separator,
425 : // otherwise the request returns only the entry for the prefix itself.
426 537 : if matches!(mode, ListingMode::WithDelimiter)
427 293 : && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
428 293 : {
429 293 : p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
430 293 : }
431 537 : p
432 537 : });
433 537 :
434 537 : let mut continuation_token = None;
435 :
436 : loop {
437 537 : let _guard = self.permit(kind).await;
438 537 : let started_at = start_measuring_requests(kind);
439 537 :
440 537 : // Take the min of the two Options, returning Some when one is a value and the other
441 537 : // is None (Option's ordering treats None as smaller than anything, so a plain min doesn't work).
442 537 : let request_max_keys = self
443 537 : .max_keys_per_list_response
444 537 : .into_iter()
445 537 : .chain(max_keys.into_iter())
446 537 : .min();
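: // E.g. Some(1000) and None yield Some(1000); Some(5) and Some(3) yield Some(3);
: // None and None yield None, i.e. no cap on the request.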
447 537 : let mut request = self
448 537 : .client
449 537 : .list_objects_v2()
450 537 : .bucket(self.bucket_name.clone())
451 537 : .set_prefix(list_prefix.clone())
452 537 : .set_continuation_token(continuation_token)
453 537 : .set_max_keys(request_max_keys);
454 537 :
455 537 : if let ListingMode::WithDelimiter = mode {
456 293 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
457 293 : }
458 :
459 537 : let response = request
460 537 : .send()
461 2878 : .await
462 537 : .context("Failed to list S3 prefixes")
463 537 : .map_err(DownloadError::Other);
464 537 :
465 537 : let started_at = ScopeGuard::into_inner(started_at);
466 537 :
467 537 : metrics::BUCKET_METRICS
468 537 : .req_seconds
469 537 : .observe_elapsed(kind, &response, started_at);
470 :
471 537 : let response = response?;
472 :
473 537 : let keys = response.contents();
474 537 : let empty = Vec::new();
475 537 : let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
476 :
477 0 : tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
478 :
479 2837 : for object in keys {
480 2300 : let object_path = object.key().expect("response does not contain a key");
481 2300 : let remote_path = self.s3_object_to_relative_path(object_path);
482 2300 : result.keys.push(remote_path);
483 2300 : if let Some(mut mk) = max_keys {
484 4 : assert!(mk > 0);
485 4 : mk -= 1;
486 4 : if mk == 0 {
487 0 : return Ok(result); // limit reached
488 4 : }
489 4 : max_keys = Some(mk);
490 2296 : }
491 : }
492 :
493 537 : result.prefixes.extend(
494 537 : prefixes
495 537 : .iter()
496 537 : .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
497 537 : );
498 :
499 537 : continuation_token = match response.next_continuation_token {
500 0 : Some(new_token) => Some(new_token),
501 537 : None => break,
502 537 : };
503 537 : }
504 537 :
505 537 : Ok(result)
506 537 : }
507 :
508 16215 : async fn upload(
509 16215 : &self,
510 16215 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
511 16215 : from_size_bytes: usize,
512 16215 : to: &RemotePath,
513 16215 : metadata: Option<StorageMetadata>,
514 16215 : ) -> anyhow::Result<()> {
515 16215 : let kind = RequestKind::Put;
516 16215 : let _guard = self.permit(kind).await;
517 :
518 16215 : let started_at = start_measuring_requests(kind);
519 16215 :
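: // Bridge the caller's `Stream<Item = io::Result<Bytes>>` into the SDK's
: // ByteStream: hyper's Body implements the http-body 0.4 trait, which
: // `SdkBody::from_body_0_4` adapts into an SdkBody.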
520 16215 : let body = Body::wrap_stream(from);
521 16215 : let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
522 :
523 16215 : let res = self
524 16215 : .client
525 16215 : .put_object()
526 16215 : .bucket(self.bucket_name.clone())
527 16215 : .key(self.relative_path_to_s3_object(to))
528 16215 : .set_metadata(metadata.map(|m| m.0))
529 16215 : .content_length(from_size_bytes.try_into()?)
530 16215 : .body(bytes_stream)
531 16215 : .send()
532 52442 : .await;
533 :
534 16214 : let started_at = ScopeGuard::into_inner(started_at);
535 16214 : metrics::BUCKET_METRICS
536 16214 : .req_seconds
537 16214 : .observe_elapsed(kind, &res, started_at);
538 16214 :
539 16214 : res?;
540 :
541 16214 : Ok(())
542 16214 : }
543 :
544 14 : async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
545 12 : let kind = RequestKind::Copy;
546 12 : let _guard = self.permit(kind).await;
547 :
548 12 : let started_at = start_measuring_requests(kind);
549 12 :
550 12 : // The copy source must be prefixed with the bucket name.
551 12 : let copy_source = format!(
552 12 : "{}/{}",
553 12 : self.bucket_name,
554 12 : self.relative_path_to_s3_object(from)
555 12 : );
556 :
557 12 : let res = self
558 12 : .client
559 12 : .copy_object()
560 12 : .bucket(self.bucket_name.clone())
561 12 : .key(self.relative_path_to_s3_object(to))
562 12 : .copy_source(copy_source)
563 12 : .send()
564 48 : .await;
565 :
566 12 : let started_at = ScopeGuard::into_inner(started_at);
567 12 : metrics::BUCKET_METRICS
568 12 : .req_seconds
569 12 : .observe_elapsed(kind, &res, started_at);
570 12 :
571 12 : res?;
572 :
573 12 : Ok(())
574 12 : }
575 :
576 10439 : async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
577 10427 : // If the prefix is Some, download the file at `prefix/from`;
578 10427 : // if it is None, download the file at `from`.
579 10427 : self.download_object(GetObjectRequest {
580 10427 : bucket: self.bucket_name.clone(),
581 10427 : key: self.relative_path_to_s3_object(from),
582 10427 : range: None,
583 10427 : })
584 30321 : .await
585 10424 : }
586 :
587 46 : async fn download_byte_range(
588 46 : &self,
589 46 : from: &RemotePath,
590 46 : start_inclusive: u64,
591 46 : end_exclusive: Option<u64>,
592 46 : ) -> Result<Download, DownloadError> {
593 36 : // S3 accepts ranges as per https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35,
594 36 : // where both ends are inclusive, so convert the exclusive end to an inclusive one.
595 36 : let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
596 36 : let range = Some(match end_inclusive {
597 0 : Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
598 36 : None => format!("bytes={start_inclusive}-"),
599 : });
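: // E.g. start_inclusive=0 with end_exclusive=Some(1024) yields "bytes=0-1023"
: // (exactly the first KiB), while end_exclusive=None yields the open-ended "bytes=0-".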
600 :
601 36 : self.download_object(GetObjectRequest {
602 36 : bucket: self.bucket_name.clone(),
603 36 : key: self.relative_path_to_s3_object(from),
604 36 : range,
605 36 : })
606 106 : .await
607 36 : }
608 2339 : async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
609 2241 : let kind = RequestKind::Delete;
610 2241 : let _guard = self.permit(kind).await;
611 :
612 2241 : let mut delete_objects = Vec::with_capacity(paths.len());
613 9029 : for path in paths {
614 6788 : let obj_id = ObjectIdentifier::builder()
615 6788 : .set_key(Some(self.relative_path_to_s3_object(path)))
616 6788 : .build()?;
617 6788 : delete_objects.push(obj_id);
618 : }
619 :
620 9319 : self.delete_oids(kind, &delete_objects).await
621 2241 : }
622 :
623 2106 : async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
624 2016 : let paths = std::array::from_ref(path);
625 8284 : self.delete_objects(paths).await
626 2016 : }
627 :
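: // Roll the objects under `prefix` back to their state at `timestamp`: list all
: // object versions and delete markers, then for each key either copy the newest
: // version at or before `timestamp` back on top, or delete the key if it did not
: // exist at that point. Keys whose latest change is after `done_if_after` are skipped.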
628 7 : async fn time_travel_recover(
629 7 : &self,
630 7 : prefix: Option<&RemotePath>,
631 7 : timestamp: SystemTime,
632 7 : done_if_after: SystemTime,
633 7 : cancel: &CancellationToken,
634 7 : ) -> Result<(), TimeTravelError> {
635 1 : let kind = RequestKind::TimeTravel;
636 1 : let _guard = self.permit(kind).await;
637 :
638 1 : let timestamp = DateTime::from(timestamp);
639 1 : let done_if_after = DateTime::from(done_if_after);
640 :
641 0 : tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
642 :
643 : // Use the passed prefix; if it is not set, fall back to the prefix_in_bucket value.
644 1 : let prefix = prefix
645 1 : .map(|p| self.relative_path_to_s3_object(p))
646 1 : .or_else(|| self.prefix_in_bucket.clone());
647 1 :
648 1 : let warn_threshold = 3;
649 1 : let max_retries = 10;
650 1 : let is_permanent = |_e: &_| false;
651 :
652 1 : let mut key_marker = None;
653 1 : let mut version_id_marker = None;
654 1 : let mut versions_and_deletes = Vec::new();
655 :
656 : loop {
657 1 : let response = backoff::retry(
658 1 : || async {
659 1 : self.client
660 1 : .list_object_versions()
661 1 : .bucket(self.bucket_name.clone())
662 1 : .set_prefix(prefix.clone())
663 1 : .set_key_marker(key_marker.clone())
664 1 : .set_version_id_marker(version_id_marker.clone())
665 1 : .send()
666 41 : .await
667 1 : .map_err(|e| TimeTravelError::Other(e.into()))
668 1 : },
669 1 : is_permanent,
670 1 : warn_threshold,
671 1 : max_retries,
672 1 : "listing object versions for time_travel_recover",
673 1 : cancel,
674 1 : )
675 41 : .await
676 1 : .ok_or_else(|| TimeTravelError::Cancelled)
677 1 : .and_then(|x| x)?;
678 :
679 0 : tracing::trace!(
680 0 : " Got List response version_id_marker={:?}, key_marker={:?}",
681 0 : response.version_id_marker,
682 0 : response.key_marker
683 0 : );
684 1 : let versions = response
685 1 : .versions
686 1 : .unwrap_or_default()
687 1 : .into_iter()
688 1 : .map(VerOrDelete::from_version);
689 1 : let deletes = response
690 1 : .delete_markers
691 1 : .unwrap_or_default()
692 1 : .into_iter()
693 1 : .map(VerOrDelete::from_delete_marker);
694 1 : itertools::process_results(versions.chain(deletes), |n_vds| {
695 1 : versions_and_deletes.extend(n_vds)
696 1 : })
697 1 : .map_err(TimeTravelError::Other)?;
698 14 : fn none_if_empty(v: Option<String>) -> Option<String> {
699 14 : v.filter(|v| !v.is_empty())
700 14 : }
701 1 : version_id_marker = none_if_empty(response.next_version_id_marker);
702 1 : key_marker = none_if_empty(response.next_key_marker);
703 1 : if version_id_marker.is_none() {
704 : // The final response is not supposed to be truncated
705 1 : if response.is_truncated.unwrap_or_default() {
706 0 : return Err(TimeTravelError::Other(anyhow::anyhow!(
707 0 : "Received truncated ListObjectVersions response for prefix={prefix:?}"
708 0 : )));
709 1 : }
710 1 : break;
711 0 : }
712 0 : // Limit the number of buffered versions and deletions, mostly so that we
713 0 : // don't keep requesting forever if the list is too long, since we hold the
714 0 : // whole list in RAM.
715 0 : // Building a list of 100k entries that reaches this limit takes roughly
716 0 : // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
717 0 : const COMPLEXITY_LIMIT: usize = 100_000;
718 0 : if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
719 0 : return Err(TimeTravelError::TooManyVersions);
720 0 : }
721 : }
722 :
723 1 : tracing::info!(
724 1 : "Built list for time travel with {} versions and deletions",
725 1 : versions_and_deletes.len()
726 1 : );
727 :
728 : // Work on the list of references instead of the objects directly,
729 : // otherwise we get lifetime errors in the sort_by_key call below.
730 1 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
731 1 :
732 2214 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
733 1 :
734 1 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
735 :
736 268 : for vd in &versions_and_deletes {
737 : let VerOrDelete {
738 267 : version_id, key, ..
739 267 : } = &vd;
740 267 : if version_id == "null" {
741 0 : return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
742 0 : indicating either disabled versioning, or legacy objects with null version id values")));
743 267 : }
744 0 : tracing::trace!(
745 0 : "Parsing version key={key} version_id={version_id} kind={:?}",
746 0 : vd.kind
747 0 : );
748 :
749 267 : vds_for_key.entry(key).or_default().push(vd);
750 : }
751 93 : for (key, versions) in vds_for_key {
752 92 : let last_vd = versions.last().unwrap();
753 92 : if last_vd.last_modified > done_if_after {
754 0 : tracing::trace!("Key {key} has version later than done_if_after, skipping");
755 0 : continue;
756 92 : }
757 : // the version we want to restore to.
758 92 : let version_to_restore_to =
759 188 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
760 0 : Ok(v) => v,
761 92 : Err(e) => e,
762 : };
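: // After the match, `version_to_restore_to` is the index of the first version
: // strictly newer than `timestamp` (or of an exact match); the entry just before
: // it, if any, is the version that was live at the target time.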
763 92 : if version_to_restore_to == versions.len() {
764 0 : tracing::trace!("Key {key} has no changes since timestamp, skipping");
765 0 : continue;
766 92 : }
767 92 : let mut do_delete = false;
768 92 : if version_to_restore_to == 0 {
769 : // All versions more recent, so the key didn't exist at the specified time point.
770 0 : tracing::trace!(
771 0 : "All {} versions more recent for {key}, deleting",
772 0 : versions.len()
773 0 : );
774 3 : do_delete = true;
775 : } else {
776 89 : match &versions[version_to_restore_to - 1] {
777 : VerOrDelete {
778 : kind: VerOrDeleteKind::Version,
779 89 : version_id,
780 : ..
781 : } => {
782 0 : tracing::trace!("Copying old version {version_id} for {key}...");
783 : // Restore the state to the last version by copying
784 89 : let source_id =
785 89 : format!("{}/{key}?versionId={version_id}", self.bucket_name);
786 89 :
787 89 : backoff::retry(
788 89 : || async {
789 89 : self.client
790 89 : .copy_object()
791 89 : .bucket(self.bucket_name.clone())
792 89 : .key(key)
793 89 : .copy_source(&source_id)
794 89 : .send()
795 110 : .await
796 89 : .map_err(|e| TimeTravelError::Other(e.into()))
797 89 : },
798 89 : is_permanent,
799 89 : warn_threshold,
800 89 : max_retries,
801 89 : "copying object version for time_travel_recover",
802 89 : cancel,
803 89 : )
804 110 : .await
805 89 : .ok_or_else(|| TimeTravelError::Cancelled)
806 89 : .and_then(|x| x)?;
807 89 : tracing::info!(%version_id, %key, "Copied old version in S3");
808 : }
809 : VerOrDelete {
810 : kind: VerOrDeleteKind::DeleteMarker,
811 : ..
812 0 : } => {
813 0 : do_delete = true;
814 0 : }
815 : }
816 : };
817 92 : if do_delete {
818 3 : if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
819 : // Key has since been deleted (but there was some history), no need to do anything
820 0 : tracing::trace!("Key {key} already deleted, skipping.");
821 : } else {
822 0 : tracing::trace!("Deleting {key}...");
823 :
824 0 : let oid = ObjectIdentifier::builder()
825 0 : .key(key.to_owned())
826 0 : .build()
827 0 : .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
828 0 : self.delete_oids(kind, &[oid])
829 0 : .await
830 0 : .map_err(TimeTravelError::Other)?;
831 : }
832 89 : }
833 : }
834 1 : Ok(())
835 1 : }
836 : }
837 :
838 : /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
839 29733 : fn start_counting_cancelled_wait(
840 29733 : kind: RequestKind,
841 29733 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
842 29733 : scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
843 0 : metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
844 29733 : })
845 29733 : }
846 :
847 : /// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
848 29730 : fn start_measuring_requests(
849 29730 : kind: RequestKind,
850 29730 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
851 29730 : scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
852 3 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
853 3 : kind,
854 3 : AttemptOutcome::Cancelled,
855 3 : started_at,
856 3 : )
857 29730 : })
858 29730 : }
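: // Both helpers use the scopeguard pattern: `guard_on_success` arms a closure that
: // runs when the guard is dropped (without panicking). Callers defuse the guard via
: // `ScopeGuard::into_inner` once the awaited call completes, so the closure only
: // fires when the future is dropped mid-await, i.e. on cancellation.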
859 :
860 : // To save RAM, store only the fields we need instead of the entire ObjectVersion/DeleteMarkerEntry.
861 : struct VerOrDelete {
862 : kind: VerOrDeleteKind,
863 : last_modified: DateTime,
864 : version_id: String,
865 : key: String,
866 : }
867 :
868 0 : #[derive(Debug)]
869 : enum VerOrDeleteKind {
870 : Version,
871 : DeleteMarker,
872 : }
873 :
874 : impl VerOrDelete {
875 303 : fn with_kind(
876 303 : kind: VerOrDeleteKind,
877 303 : last_modified: Option<DateTime>,
878 303 : version_id: Option<String>,
879 303 : key: Option<String>,
880 303 : ) -> anyhow::Result<Self> {
881 303 : let lvk = (last_modified, version_id, key);
882 303 : let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
883 0 : anyhow::bail!(
884 0 : "One (or more) of last_modified, key, and id is None. \
885 0 : Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
886 0 : lvk.0,
887 0 : lvk.1,
888 0 : lvk.2,
889 0 : );
890 : };
891 303 : Ok(Self {
892 303 : kind,
893 303 : last_modified,
894 303 : version_id,
895 303 : key,
896 303 : })
897 303 : }
898 163 : fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
899 163 : Self::with_kind(
900 163 : VerOrDeleteKind::Version,
901 163 : v.last_modified,
902 163 : v.version_id,
903 163 : v.key,
904 163 : )
905 163 : }
906 140 : fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
907 140 : Self::with_kind(
908 140 : VerOrDeleteKind::DeleteMarker,
909 140 : v.last_modified,
910 140 : v.version_id,
911 140 : v.key,
912 140 : )
913 140 : }
914 : }
915 :
916 : #[cfg(test)]
917 : mod tests {
918 : use camino::Utf8Path;
919 : use std::num::NonZeroUsize;
920 :
921 : use crate::{RemotePath, S3Bucket, S3Config};
922 :
923 2 : #[test]
924 2 : fn relative_path() {
925 2 : let all_paths = ["", "some/path", "some/path/"];
926 2 : let all_paths: Vec<RemotePath> = all_paths
927 2 : .iter()
928 6 : .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
929 2 : .collect();
930 2 : let prefixes = [
931 2 : None,
932 2 : Some(""),
933 2 : Some("test/prefix"),
934 2 : Some("test/prefix/"),
935 2 : Some("/test/prefix/"),
936 2 : ];
937 2 : let expected_outputs = vec![
938 2 : vec!["", "some/path", "some/path"],
939 2 : vec!["/", "/some/path", "/some/path"],
940 2 : vec![
941 2 : "test/prefix/",
942 2 : "test/prefix/some/path",
943 2 : "test/prefix/some/path",
944 2 : ],
945 2 : vec![
946 2 : "test/prefix/",
947 2 : "test/prefix/some/path",
948 2 : "test/prefix/some/path",
949 2 : ],
950 2 : vec![
951 2 : "test/prefix/",
952 2 : "test/prefix/some/path",
953 2 : "test/prefix/some/path",
954 2 : ],
955 2 : ];
956 :
957 10 : for (prefix_idx, prefix) in prefixes.iter().enumerate() {
958 10 : let config = S3Config {
959 10 : bucket_name: "bucket".to_owned(),
960 10 : bucket_region: "region".to_owned(),
961 10 : prefix_in_bucket: prefix.map(str::to_string),
962 10 : endpoint: None,
963 10 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
964 10 : max_keys_per_list_response: Some(5),
965 10 : };
966 10 : let storage = S3Bucket::new(&config).expect("remote storage init");
967 30 : for (test_path_idx, test_path) in all_paths.iter().enumerate() {
968 30 : let result = storage.relative_path_to_s3_object(test_path);
969 30 : let expected = expected_outputs[prefix_idx][test_path_idx];
970 30 : assert_eq!(result, expected);
971 : }
972 : }
973 2 : }
974 : }