Line data Source code
1 : //! AWS S3 storage wrapper around the `aws-sdk-s3` library.
2 : //!
3 : //! Respects `prefix_in_bucket` property from [`S3Config`],
4 : //! allowing multiple api users to independently work with the same S3 bucket, if
5 : //! their bucket prefixes are both specified and different.
6 :
7 : use std::{
8 : borrow::Cow,
9 : collections::HashMap,
10 : num::NonZeroU32,
11 : pin::Pin,
12 : sync::Arc,
13 : task::{Context, Poll},
14 : time::{Duration, SystemTime},
15 : };
16 :
17 : use anyhow::{anyhow, Context as _};
18 : use aws_config::{
19 : environment::credentials::EnvironmentVariableCredentialsProvider,
20 : imds::credentials::ImdsCredentialsProvider,
21 : meta::credentials::CredentialsProviderChain,
22 : profile::ProfileFileCredentialsProvider,
23 : provider_config::ProviderConfig,
24 : retry::{RetryConfigBuilder, RetryMode},
25 : web_identity_token::WebIdentityTokenCredentialsProvider,
26 : BehaviorVersion,
27 : };
28 : use aws_credential_types::provider::SharedCredentialsProvider;
29 : use aws_sdk_s3::{
30 : config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
31 : error::SdkError,
32 : operation::get_object::GetObjectError,
33 : types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
34 : Client,
35 : };
36 : use aws_smithy_async::rt::sleep::TokioSleep;
37 :
38 : use aws_smithy_types::{body::SdkBody, DateTime};
39 : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
40 : use bytes::Bytes;
41 : use futures::stream::Stream;
42 : use hyper::Body;
43 : use scopeguard::ScopeGuard;
44 : use tokio_util::sync::CancellationToken;
45 : use utils::backoff;
46 :
47 : use super::StorageMetadata;
48 : use crate::{
49 : error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError,
50 : Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel,
51 : MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
52 : };
53 :
54 : pub(super) mod metrics;
55 :
56 : use self::metrics::AttemptOutcome;
57 : pub(super) use self::metrics::RequestKind;
58 :
59 : /// AWS S3 storage.
60 : pub struct S3Bucket {
61 : client: Client,
62 : bucket_name: String,
63 : prefix_in_bucket: Option<String>,
64 : max_keys_per_list_response: Option<i32>,
65 : upload_storage_class: Option<StorageClass>,
66 : concurrency_limiter: ConcurrencyLimiter,
67 : // Per-request timeout. Accessible for tests.
68 : pub timeout: Duration,
69 : }
70 :
/// Parameters of a single `GetObject` call issued by `S3Bucket::download_object`.
struct GetObjectRequest {
    /// Bucket to read from.
    bucket: String,
    /// Full object key, including any bucket prefix.
    key: String,
    /// Optional byte range specifier (e.g. `bytes=0-9`); `None` fetches the whole object.
    range: Option<String>,
}
76 : impl S3Bucket {
77 : /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
78 28 : pub fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
79 28 : tracing::debug!(
80 0 : "Creating s3 remote storage for S3 bucket {}",
81 : remote_storage_config.bucket_name
82 : );
83 :
84 28 : let region = Some(Region::new(remote_storage_config.bucket_region.clone()));
85 28 :
86 28 : let provider_conf = ProviderConfig::without_region().with_region(region.clone());
87 28 :
88 28 : let credentials_provider = {
89 28 : // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
90 28 : CredentialsProviderChain::first_try(
91 28 : "env",
92 28 : EnvironmentVariableCredentialsProvider::new(),
93 28 : )
94 28 : // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
95 28 : .or_else(
96 28 : "profile-sso",
97 28 : ProfileFileCredentialsProvider::builder()
98 28 : .configure(&provider_conf)
99 28 : .build(),
100 28 : )
101 28 : // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
102 28 : // needed to access remote extensions bucket
103 28 : .or_else(
104 28 : "token",
105 28 : WebIdentityTokenCredentialsProvider::builder()
106 28 : .configure(&provider_conf)
107 28 : .build(),
108 28 : )
109 28 : // uses imds v2
110 28 : .or_else("imds", ImdsCredentialsProvider::builder().build())
111 28 : };
112 28 :
113 28 : // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
114 28 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
115 28 :
116 28 : let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
117 28 : #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
118 28 : BehaviorVersion::v2023_11_09(),
119 28 : )
120 28 : .region(region)
121 28 : .identity_cache(IdentityCache::lazy().build())
122 28 : .credentials_provider(SharedCredentialsProvider::new(credentials_provider))
123 28 : .sleep_impl(SharedAsyncSleep::from(sleep_impl));
124 28 :
125 28 : let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
126 28 : s.spawn(|| {
127 28 : // TODO: make this function async.
128 28 : tokio::runtime::Builder::new_current_thread()
129 28 : .enable_all()
130 28 : .build()
131 28 : .unwrap()
132 28 : .block_on(sdk_config_loader.load())
133 28 : })
134 28 : .join()
135 28 : .unwrap()
136 28 : });
137 28 :
138 28 : let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
139 :
140 : // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
141 : // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
142 28 : if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
143 0 : s3_config_builder = s3_config_builder
144 0 : .endpoint_url(custom_endpoint)
145 0 : .force_path_style(true);
146 28 : }
147 :
148 : // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
149 : // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config. We set it to use at most one
150 : // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
151 28 : let mut retry_config = RetryConfigBuilder::new();
152 28 : retry_config
153 28 : .set_max_attempts(Some(1))
154 28 : .set_mode(Some(RetryMode::Adaptive));
155 28 : s3_config_builder = s3_config_builder.retry_config(retry_config.build());
156 28 :
157 28 : let s3_config = s3_config_builder.build();
158 28 : let client = aws_sdk_s3::Client::from_conf(s3_config);
159 28 :
160 28 : let prefix_in_bucket = remote_storage_config
161 28 : .prefix_in_bucket
162 28 : .as_deref()
163 28 : .map(|prefix| {
164 26 : let mut prefix = prefix;
165 28 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
166 2 : prefix = &prefix[1..]
167 : }
168 :
169 26 : let mut prefix = prefix.to_string();
170 48 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
171 22 : prefix.pop();
172 22 : }
173 26 : prefix
174 28 : });
175 28 :
176 28 : Ok(Self {
177 28 : client,
178 28 : bucket_name: remote_storage_config.bucket_name.clone(),
179 28 : max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
180 28 : prefix_in_bucket,
181 28 : concurrency_limiter: ConcurrencyLimiter::new(
182 28 : remote_storage_config.concurrency_limit.get(),
183 28 : ),
184 28 : upload_storage_class: remote_storage_config.upload_storage_class.clone(),
185 28 : timeout,
186 28 : })
187 28 : }
188 :
189 126 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
190 126 : let relative_path =
191 126 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
192 126 : Some(stripped) => stripped,
193 : // we rely on AWS to return properly prefixed paths
194 : // for requests with a certain prefix
195 0 : None => panic!(
196 0 : "Key {} does not start with bucket prefix {:?}",
197 0 : key, self.prefix_in_bucket
198 0 : ),
199 : };
200 126 : RemotePath(
201 126 : relative_path
202 126 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
203 126 : .collect(),
204 126 : )
205 126 : }
206 :
207 279 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
208 279 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
209 279 : let path_string = path.get_path().as_str();
210 279 : match &self.prefix_in_bucket {
211 273 : Some(prefix) => prefix.clone() + "/" + path_string,
212 6 : None => path_string.to_string(),
213 : }
214 279 : }
215 :
216 241 : async fn permit(
217 241 : &self,
218 241 : kind: RequestKind,
219 241 : cancel: &CancellationToken,
220 241 : ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
221 0 : let started_at = start_counting_cancelled_wait(kind);
222 0 : let acquire = self.concurrency_limiter.acquire(kind);
223 :
224 0 : let permit = tokio::select! {
225 : permit = acquire => permit.expect("semaphore is never closed"),
226 : _ = cancel.cancelled() => return Err(Cancelled),
227 : };
228 :
229 0 : let started_at = ScopeGuard::into_inner(started_at);
230 0 : metrics::BUCKET_METRICS
231 0 : .wait_seconds
232 0 : .observe_elapsed(kind, started_at);
233 0 :
234 0 : Ok(permit)
235 0 : }
236 :
237 24 : async fn owned_permit(
238 24 : &self,
239 24 : kind: RequestKind,
240 24 : cancel: &CancellationToken,
241 24 : ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
242 0 : let started_at = start_counting_cancelled_wait(kind);
243 0 : let acquire = self.concurrency_limiter.acquire_owned(kind);
244 :
245 0 : let permit = tokio::select! {
246 : permit = acquire => permit.expect("semaphore is never closed"),
247 : _ = cancel.cancelled() => return Err(Cancelled),
248 : };
249 :
250 0 : let started_at = ScopeGuard::into_inner(started_at);
251 0 : metrics::BUCKET_METRICS
252 0 : .wait_seconds
253 0 : .observe_elapsed(kind, started_at);
254 0 : Ok(permit)
255 0 : }
256 :
257 24 : async fn download_object(
258 24 : &self,
259 24 : request: GetObjectRequest,
260 24 : cancel: &CancellationToken,
261 24 : ) -> Result<Download, DownloadError> {
262 0 : let kind = RequestKind::Get;
263 :
264 0 : let permit = self.owned_permit(kind, cancel).await?;
265 :
266 0 : let started_at = start_measuring_requests(kind);
267 0 :
268 0 : let get_object = self
269 0 : .client
270 0 : .get_object()
271 0 : .bucket(request.bucket)
272 0 : .key(request.key)
273 0 : .set_range(request.range)
274 0 : .send();
275 :
276 0 : let get_object = tokio::select! {
277 : res = get_object => res,
278 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
279 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
280 : };
281 :
282 0 : let started_at = ScopeGuard::into_inner(started_at);
283 :
284 0 : let object_output = match get_object {
285 0 : Ok(object_output) => object_output,
286 0 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
287 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
288 : // an error: we expect to sometimes fetch an object and find it missing,
289 : // e.g. when probing for timeline indices.
290 0 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
291 0 : kind,
292 0 : AttemptOutcome::Ok,
293 0 : started_at,
294 0 : );
295 0 : return Err(DownloadError::NotFound);
296 : }
297 0 : Err(e) => {
298 0 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
299 0 : kind,
300 0 : AttemptOutcome::Err,
301 0 : started_at,
302 0 : );
303 0 :
304 0 : return Err(DownloadError::Other(
305 0 : anyhow::Error::new(e).context("download s3 object"),
306 0 : ));
307 : }
308 : };
309 :
310 : // even if we would have no timeout left, continue anyways. the caller can decide to ignore
311 : // the errors considering timeouts and cancellation.
312 0 : let remaining = self.timeout.saturating_sub(started_at.elapsed());
313 0 :
314 0 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
315 0 : let etag = object_output
316 0 : .e_tag
317 0 : .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
318 0 : .into();
319 0 : let last_modified = object_output
320 0 : .last_modified
321 0 : .ok_or(DownloadError::Other(anyhow::anyhow!(
322 0 : "Missing LastModified header"
323 0 : )))?
324 0 : .try_into()
325 0 : .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
326 :
327 0 : let body = object_output.body;
328 0 : let body = ByteStreamAsStream::from(body);
329 0 : let body = PermitCarrying::new(permit, body);
330 0 : let body = TimedDownload::new(started_at, body);
331 0 :
332 0 : let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
333 0 : let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
334 0 :
335 0 : Ok(Download {
336 0 : metadata,
337 0 : etag,
338 0 : last_modified,
339 0 : download_stream: Box::pin(body),
340 0 : })
341 0 : }
342 :
343 106 : async fn delete_oids(
344 106 : &self,
345 106 : _permit: &tokio::sync::SemaphorePermit<'_>,
346 106 : delete_objects: &[ObjectIdentifier],
347 106 : cancel: &CancellationToken,
348 106 : ) -> anyhow::Result<()> {
349 0 : let kind = RequestKind::Delete;
350 0 : let mut cancel = std::pin::pin!(cancel.cancelled());
351 :
352 0 : for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
353 0 : let started_at = start_measuring_requests(kind);
354 :
355 0 : let req = self
356 0 : .client
357 0 : .delete_objects()
358 0 : .bucket(self.bucket_name.clone())
359 0 : .delete(
360 0 : Delete::builder()
361 0 : .set_objects(Some(chunk.to_vec()))
362 0 : .build()
363 0 : .context("build request")?,
364 : )
365 0 : .send();
366 :
367 0 : let resp = tokio::select! {
368 : resp = req => resp,
369 : _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
370 : _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
371 : };
372 :
373 0 : let started_at = ScopeGuard::into_inner(started_at);
374 0 : metrics::BUCKET_METRICS
375 0 : .req_seconds
376 0 : .observe_elapsed(kind, &resp, started_at);
377 :
378 0 : let resp = resp.context("request deletion")?;
379 0 : metrics::BUCKET_METRICS
380 0 : .deleted_objects_total
381 0 : .inc_by(chunk.len() as u64);
382 :
383 0 : if let Some(errors) = resp.errors {
384 : // Log a bounded number of the errors within the response:
385 : // these requests can carry 1000 keys so logging each one
386 : // would be too verbose, especially as errors may lead us
387 : // to retry repeatedly.
388 : const LOG_UP_TO_N_ERRORS: usize = 10;
389 0 : for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
390 0 : tracing::warn!(
391 0 : "DeleteObjects key {} failed: {}: {}",
392 0 : e.key.as_ref().map(Cow::from).unwrap_or("".into()),
393 0 : e.code.as_ref().map(Cow::from).unwrap_or("".into()),
394 0 : e.message.as_ref().map(Cow::from).unwrap_or("".into())
395 : );
396 : }
397 :
398 0 : return Err(anyhow::anyhow!(
399 0 : "Failed to delete {}/{} objects",
400 0 : errors.len(),
401 0 : chunk.len(),
402 0 : ));
403 0 : }
404 : }
405 0 : Ok(())
406 0 : }
407 : }
408 :
409 : pin_project_lite::pin_project! {
410 : struct ByteStreamAsStream {
411 : #[pin]
412 : inner: aws_smithy_types::byte_stream::ByteStream
413 : }
414 : }
415 :
416 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
417 24 : fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
418 24 : ByteStreamAsStream { inner }
419 24 : }
420 : }
421 :
422 : impl Stream for ByteStreamAsStream {
423 : type Item = std::io::Result<Bytes>;
424 :
425 47 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
426 47 : // this does the std::io::ErrorKind::Other conversion
427 47 : self.project().inner.poll_next(cx).map_err(|x| x.into())
428 47 : }
429 :
430 : // cannot implement size_hint because inner.size_hint is remaining size in bytes, which makes
431 : // sense and Stream::size_hint does not really
432 : }
433 :
434 : pin_project_lite::pin_project! {
435 : /// Times and tracks the outcome of the request.
436 : struct TimedDownload<S> {
437 : started_at: std::time::Instant,
438 : outcome: metrics::AttemptOutcome,
439 : #[pin]
440 : inner: S
441 : }
442 :
443 : impl<S> PinnedDrop for TimedDownload<S> {
444 : fn drop(mut this: Pin<&mut Self>) {
445 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
446 : }
447 : }
448 : }
449 :
450 : impl<S> TimedDownload<S> {
451 0 : fn new(started_at: std::time::Instant, inner: S) -> Self {
452 0 : TimedDownload {
453 0 : started_at,
454 0 : outcome: metrics::AttemptOutcome::Cancelled,
455 0 : inner,
456 0 : }
457 0 : }
458 : }
459 :
460 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
461 : type Item = <S as Stream>::Item;
462 :
463 0 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
464 0 : use std::task::ready;
465 0 :
466 0 : let this = self.project();
467 :
468 0 : let res = ready!(this.inner.poll_next(cx));
469 0 : match &res {
470 0 : Some(Ok(_)) => {}
471 0 : Some(Err(_)) => *this.outcome = metrics::AttemptOutcome::Err,
472 0 : None => *this.outcome = metrics::AttemptOutcome::Ok,
473 : }
474 :
475 0 : Poll::Ready(res)
476 0 : }
477 :
478 0 : fn size_hint(&self) -> (usize, Option<usize>) {
479 0 : self.inner.size_hint()
480 0 : }
481 : }
482 :
483 : impl RemoteStorage for S3Bucket {
484 24 : async fn list(
485 24 : &self,
486 24 : prefix: Option<&RemotePath>,
487 24 : mode: ListingMode,
488 24 : max_keys: Option<NonZeroU32>,
489 24 : cancel: &CancellationToken,
490 24 : ) -> Result<Listing, DownloadError> {
491 0 : let kind = RequestKind::List;
492 0 : // s3 sdk wants i32
493 0 : let mut max_keys = max_keys.map(|mk| mk.get() as i32);
494 0 : let mut result = Listing::default();
495 0 :
496 0 : // get the passed prefix or if it is not set use prefix_in_bucket value
497 0 : let list_prefix = prefix
498 0 : .map(|p| self.relative_path_to_s3_object(p))
499 0 : .or_else(|| {
500 0 : self.prefix_in_bucket.clone().map(|mut s| {
501 0 : s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
502 0 : s
503 0 : })
504 0 : });
505 :
506 0 : let _permit = self.permit(kind, cancel).await?;
507 :
508 0 : let mut continuation_token = None;
509 :
510 : loop {
511 0 : let started_at = start_measuring_requests(kind);
512 0 :
513 0 : // min of two Options, returning Some if one is value and another is
514 0 : // None (None is smaller than anything, so plain min doesn't work).
515 0 : let request_max_keys = self
516 0 : .max_keys_per_list_response
517 0 : .into_iter()
518 0 : .chain(max_keys.into_iter())
519 0 : .min();
520 0 : let mut request = self
521 0 : .client
522 0 : .list_objects_v2()
523 0 : .bucket(self.bucket_name.clone())
524 0 : .set_prefix(list_prefix.clone())
525 0 : .set_continuation_token(continuation_token)
526 0 : .set_max_keys(request_max_keys);
527 0 :
528 0 : if let ListingMode::WithDelimiter = mode {
529 0 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
530 0 : }
531 :
532 0 : let request = request.send();
533 :
534 0 : let response = tokio::select! {
535 : res = request => res,
536 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
537 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
538 : };
539 :
540 0 : let response = response
541 0 : .context("Failed to list S3 prefixes")
542 0 : .map_err(DownloadError::Other);
543 0 :
544 0 : let started_at = ScopeGuard::into_inner(started_at);
545 0 :
546 0 : metrics::BUCKET_METRICS
547 0 : .req_seconds
548 0 : .observe_elapsed(kind, &response, started_at);
549 :
550 0 : let response = response?;
551 :
552 0 : let keys = response.contents();
553 0 : let empty = Vec::new();
554 0 : let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
555 0 :
556 0 : tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
557 :
558 0 : for object in keys {
559 0 : let object_path = object.key().expect("response does not contain a key");
560 0 : let remote_path = self.s3_object_to_relative_path(object_path);
561 0 : result.keys.push(remote_path);
562 0 : if let Some(mut mk) = max_keys {
563 0 : assert!(mk > 0);
564 0 : mk -= 1;
565 0 : if mk == 0 {
566 0 : return Ok(result); // limit reached
567 0 : }
568 0 : max_keys = Some(mk);
569 0 : }
570 : }
571 :
572 : // S3 gives us prefixes like "foo/", we return them like "foo"
573 0 : result.prefixes.extend(prefixes.iter().filter_map(|o| {
574 0 : Some(
575 0 : self.s3_object_to_relative_path(
576 0 : o.prefix()?
577 0 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
578 : ),
579 : )
580 0 : }));
581 :
582 0 : continuation_token = match response.next_continuation_token {
583 0 : Some(new_token) => Some(new_token),
584 0 : None => break,
585 0 : };
586 0 : }
587 0 :
588 0 : Ok(result)
589 0 : }
590 :
591 0 : async fn upload(
592 0 : &self,
593 0 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
594 0 : from_size_bytes: usize,
595 0 : to: &RemotePath,
596 0 : metadata: Option<StorageMetadata>,
597 0 : cancel: &CancellationToken,
598 0 : ) -> anyhow::Result<()> {
599 0 : let kind = RequestKind::Put;
600 0 : let _permit = self.permit(kind, cancel).await?;
601 :
602 0 : let started_at = start_measuring_requests(kind);
603 0 :
604 0 : let body = Body::wrap_stream(from);
605 0 : let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
606 :
607 0 : let upload = self
608 0 : .client
609 0 : .put_object()
610 0 : .bucket(self.bucket_name.clone())
611 0 : .key(self.relative_path_to_s3_object(to))
612 0 : .set_metadata(metadata.map(|m| m.0))
613 0 : .set_storage_class(self.upload_storage_class.clone())
614 0 : .content_length(from_size_bytes.try_into()?)
615 0 : .body(bytes_stream)
616 0 : .send();
617 0 :
618 0 : let upload = tokio::time::timeout(self.timeout, upload);
619 :
620 0 : let res = tokio::select! {
621 : res = upload => res,
622 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
623 : };
624 :
625 0 : if let Ok(inner) = &res {
626 0 : // do not incl. timeouts as errors in metrics but cancellations
627 0 : let started_at = ScopeGuard::into_inner(started_at);
628 0 : metrics::BUCKET_METRICS
629 0 : .req_seconds
630 0 : .observe_elapsed(kind, inner, started_at);
631 0 : }
632 :
633 0 : match res {
634 0 : Ok(Ok(_put)) => Ok(()),
635 0 : Ok(Err(sdk)) => Err(sdk.into()),
636 0 : Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
637 : }
638 0 : }
639 :
640 2 : async fn copy(
641 2 : &self,
642 2 : from: &RemotePath,
643 2 : to: &RemotePath,
644 2 : cancel: &CancellationToken,
645 2 : ) -> anyhow::Result<()> {
646 0 : let kind = RequestKind::Copy;
647 0 : let _permit = self.permit(kind, cancel).await?;
648 :
649 0 : let timeout = tokio::time::sleep(self.timeout);
650 0 :
651 0 : let started_at = start_measuring_requests(kind);
652 0 :
653 0 : // we need to specify bucket_name as a prefix
654 0 : let copy_source = format!(
655 0 : "{}/{}",
656 0 : self.bucket_name,
657 0 : self.relative_path_to_s3_object(from)
658 0 : );
659 0 :
660 0 : let op = self
661 0 : .client
662 0 : .copy_object()
663 0 : .bucket(self.bucket_name.clone())
664 0 : .key(self.relative_path_to_s3_object(to))
665 0 : .set_storage_class(self.upload_storage_class.clone())
666 0 : .copy_source(copy_source)
667 0 : .send();
668 :
669 0 : let res = tokio::select! {
670 : res = op => res,
671 : _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
672 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
673 : };
674 :
675 0 : let started_at = ScopeGuard::into_inner(started_at);
676 0 : metrics::BUCKET_METRICS
677 0 : .req_seconds
678 0 : .observe_elapsed(kind, &res, started_at);
679 0 :
680 0 : res?;
681 :
682 0 : Ok(())
683 0 : }
684 :
685 14 : async fn download(
686 14 : &self,
687 14 : from: &RemotePath,
688 14 : cancel: &CancellationToken,
689 14 : ) -> Result<Download, DownloadError> {
690 0 : // if prefix is not none then download file `prefix/from`
691 0 : // if prefix is none then download file `from`
692 0 : self.download_object(
693 0 : GetObjectRequest {
694 0 : bucket: self.bucket_name.clone(),
695 0 : key: self.relative_path_to_s3_object(from),
696 0 : range: None,
697 0 : },
698 0 : cancel,
699 0 : )
700 0 : .await
701 0 : }
702 :
703 10 : async fn download_byte_range(
704 10 : &self,
705 10 : from: &RemotePath,
706 10 : start_inclusive: u64,
707 10 : end_exclusive: Option<u64>,
708 10 : cancel: &CancellationToken,
709 10 : ) -> Result<Download, DownloadError> {
710 0 : // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
711 0 : // and needs both ends to be exclusive
712 0 : let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
713 0 : let range = Some(match end_inclusive {
714 0 : Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
715 0 : None => format!("bytes={start_inclusive}-"),
716 : });
717 :
718 0 : self.download_object(
719 0 : GetObjectRequest {
720 0 : bucket: self.bucket_name.clone(),
721 0 : key: self.relative_path_to_s3_object(from),
722 0 : range,
723 0 : },
724 0 : cancel,
725 0 : )
726 0 : .await
727 0 : }
728 :
729 102 : async fn delete_objects<'a>(
730 102 : &self,
731 102 : paths: &'a [RemotePath],
732 102 : cancel: &CancellationToken,
733 102 : ) -> anyhow::Result<()> {
734 0 : let kind = RequestKind::Delete;
735 0 : let permit = self.permit(kind, cancel).await?;
736 0 : let mut delete_objects = Vec::with_capacity(paths.len());
737 0 : for path in paths {
738 0 : let obj_id = ObjectIdentifier::builder()
739 0 : .set_key(Some(self.relative_path_to_s3_object(path)))
740 0 : .build()
741 0 : .context("convert path to oid")?;
742 0 : delete_objects.push(obj_id);
743 : }
744 :
745 0 : self.delete_oids(&permit, &delete_objects, cancel).await
746 0 : }
747 :
748 90 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
749 0 : let paths = std::array::from_ref(path);
750 0 : self.delete_objects(paths, cancel).await
751 0 : }
752 :
753 6 : async fn time_travel_recover(
754 6 : &self,
755 6 : prefix: Option<&RemotePath>,
756 6 : timestamp: SystemTime,
757 6 : done_if_after: SystemTime,
758 6 : cancel: &CancellationToken,
759 6 : ) -> Result<(), TimeTravelError> {
760 0 : let kind = RequestKind::TimeTravel;
761 0 : let permit = self.permit(kind, cancel).await?;
762 :
763 0 : let timestamp = DateTime::from(timestamp);
764 0 : let done_if_after = DateTime::from(done_if_after);
765 0 :
766 0 : tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
767 :
768 : // get the passed prefix or if it is not set use prefix_in_bucket value
769 0 : let prefix = prefix
770 0 : .map(|p| self.relative_path_to_s3_object(p))
771 0 : .or_else(|| self.prefix_in_bucket.clone());
772 0 :
773 0 : let warn_threshold = 3;
774 0 : let max_retries = 10;
775 0 : let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
776 :
777 0 : let mut key_marker = None;
778 0 : let mut version_id_marker = None;
779 0 : let mut versions_and_deletes = Vec::new();
780 :
781 : loop {
782 0 : let response = backoff::retry(
783 0 : || async {
784 0 : let op = self
785 0 : .client
786 0 : .list_object_versions()
787 0 : .bucket(self.bucket_name.clone())
788 0 : .set_prefix(prefix.clone())
789 0 : .set_key_marker(key_marker.clone())
790 0 : .set_version_id_marker(version_id_marker.clone())
791 0 : .send();
792 0 :
793 0 : tokio::select! {
794 0 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
795 0 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
796 0 : }
797 0 : },
798 0 : is_permanent,
799 0 : warn_threshold,
800 0 : max_retries,
801 0 : "listing object versions for time_travel_recover",
802 0 : cancel,
803 0 : )
804 0 : .await
805 0 : .ok_or_else(|| TimeTravelError::Cancelled)
806 0 : .and_then(|x| x)?;
807 :
808 0 : tracing::trace!(
809 0 : " Got List response version_id_marker={:?}, key_marker={:?}",
810 : response.version_id_marker,
811 : response.key_marker
812 : );
813 0 : let versions = response
814 0 : .versions
815 0 : .unwrap_or_default()
816 0 : .into_iter()
817 0 : .map(VerOrDelete::from_version);
818 0 : let deletes = response
819 0 : .delete_markers
820 0 : .unwrap_or_default()
821 0 : .into_iter()
822 0 : .map(VerOrDelete::from_delete_marker);
823 0 : itertools::process_results(versions.chain(deletes), |n_vds| {
824 0 : versions_and_deletes.extend(n_vds)
825 0 : })
826 0 : .map_err(TimeTravelError::Other)?;
827 12 : fn none_if_empty(v: Option<String>) -> Option<String> {
828 12 : v.filter(|v| !v.is_empty())
829 12 : }
830 0 : version_id_marker = none_if_empty(response.next_version_id_marker);
831 0 : key_marker = none_if_empty(response.next_key_marker);
832 0 : if version_id_marker.is_none() {
833 : // The final response is not supposed to be truncated
834 0 : if response.is_truncated.unwrap_or_default() {
835 0 : return Err(TimeTravelError::Other(anyhow::anyhow!(
836 0 : "Received truncated ListObjectVersions response for prefix={prefix:?}"
837 0 : )));
838 0 : }
839 0 : break;
840 0 : }
841 0 : // Limit the number of versions deletions, mostly so that we don't
842 0 : // keep requesting forever if the list is too long, as we'd put the
843 0 : // list in RAM.
844 0 : // Building a list of 100k entries that reaches the limit roughly takes
845 0 : // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
846 0 : const COMPLEXITY_LIMIT: usize = 100_000;
847 0 : if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
848 0 : return Err(TimeTravelError::TooManyVersions);
849 0 : }
850 : }
851 :
852 0 : tracing::info!(
853 0 : "Built list for time travel with {} versions and deletions",
854 0 : versions_and_deletes.len()
855 : );
856 :
857 : // Work on the list of references instead of the objects directly,
858 : // otherwise we get lifetime errors in the sort_by_key call below.
859 0 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
860 0 :
861 0 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
862 0 :
863 0 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
864 :
865 0 : for vd in &versions_and_deletes {
866 : let VerOrDelete {
867 0 : version_id, key, ..
868 0 : } = &vd;
869 0 : if version_id == "null" {
870 0 : return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
871 0 : indicating either disabled versioning, or legacy objects with null version id values")));
872 0 : }
873 0 : tracing::trace!(
874 0 : "Parsing version key={key} version_id={version_id} kind={:?}",
875 : vd.kind
876 : );
877 :
878 0 : vds_for_key.entry(key).or_default().push(vd);
879 : }
880 0 : for (key, versions) in vds_for_key {
881 0 : let last_vd = versions.last().unwrap();
882 0 : if last_vd.last_modified > done_if_after {
883 0 : tracing::trace!("Key {key} has version later than done_if_after, skipping");
884 0 : continue;
885 0 : }
886 : // the version we want to restore to.
887 0 : let version_to_restore_to =
888 0 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
889 0 : Ok(v) => v,
890 0 : Err(e) => e,
891 : };
892 0 : if version_to_restore_to == versions.len() {
893 0 : tracing::trace!("Key {key} has no changes since timestamp, skipping");
894 0 : continue;
895 0 : }
896 0 : let mut do_delete = false;
897 0 : if version_to_restore_to == 0 {
898 : // All versions more recent, so the key didn't exist at the specified time point.
899 0 : tracing::trace!(
900 0 : "All {} versions more recent for {key}, deleting",
901 0 : versions.len()
902 : );
903 0 : do_delete = true;
904 : } else {
905 0 : match &versions[version_to_restore_to - 1] {
906 : VerOrDelete {
907 : kind: VerOrDeleteKind::Version,
908 0 : version_id,
909 0 : ..
910 0 : } => {
911 0 : tracing::trace!("Copying old version {version_id} for {key}...");
912 : // Restore the state to the last version by copying
913 0 : let source_id =
914 0 : format!("{}/{key}?versionId={version_id}", self.bucket_name);
915 0 :
916 0 : backoff::retry(
917 0 : || async {
918 0 : let op = self
919 0 : .client
920 0 : .copy_object()
921 0 : .bucket(self.bucket_name.clone())
922 0 : .key(key)
923 0 : .set_storage_class(self.upload_storage_class.clone())
924 0 : .copy_source(&source_id)
925 0 : .send();
926 0 :
927 0 : tokio::select! {
928 0 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
929 0 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
930 0 : }
931 0 : },
932 0 : is_permanent,
933 0 : warn_threshold,
934 0 : max_retries,
935 0 : "copying object version for time_travel_recover",
936 0 : cancel,
937 0 : )
938 0 : .await
939 0 : .ok_or_else(|| TimeTravelError::Cancelled)
940 0 : .and_then(|x| x)?;
941 0 : tracing::info!(%version_id, %key, "Copied old version in S3");
942 : }
943 : VerOrDelete {
944 : kind: VerOrDeleteKind::DeleteMarker,
945 : ..
946 0 : } => {
947 0 : do_delete = true;
948 0 : }
949 : }
950 : };
951 0 : if do_delete {
952 0 : if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
953 : // Key has since been deleted (but there was some history), no need to do anything
954 0 : tracing::trace!("Key {key} already deleted, skipping.");
955 : } else {
956 0 : tracing::trace!("Deleting {key}...");
957 :
958 0 : let oid = ObjectIdentifier::builder()
959 0 : .key(key.to_owned())
960 0 : .build()
961 0 : .map_err(|e| TimeTravelError::Other(e.into()))?;
962 :
963 0 : self.delete_oids(&permit, &[oid], cancel)
964 0 : .await
965 0 : .map_err(|e| {
966 0 : // delete_oid0 will use TimeoutOrCancel
967 0 : if TimeoutOrCancel::caused_by_cancel(&e) {
968 0 : TimeTravelError::Cancelled
969 : } else {
970 0 : TimeTravelError::Other(e)
971 : }
972 0 : })?;
973 : }
974 0 : }
975 : }
976 0 : Ok(())
977 0 : }
978 : }
979 :
980 : /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
981 265 : fn start_counting_cancelled_wait(
982 265 : kind: RequestKind,
983 265 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
984 265 : scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
985 0 : metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
986 265 : })
987 265 : }
988 :
989 : /// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
990 271 : fn start_measuring_requests(
991 271 : kind: RequestKind,
992 271 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
993 271 : scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
994 0 : metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
995 0 : kind,
996 0 : AttemptOutcome::Cancelled,
997 0 : started_at,
998 0 : )
999 271 : })
1000 271 : }
1001 :
1002 : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
1003 : struct VerOrDelete {
1004 : kind: VerOrDeleteKind,
1005 : last_modified: DateTime,
1006 : version_id: String,
1007 : key: String,
1008 : }
1009 :
/// Discriminates the two kinds of entries found in an S3 object version listing.
///
/// The enum carries no data, so it is `Copy` and comparable with `==` in addition to being
/// matchable by pattern.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum VerOrDeleteKind {
    /// A stored version of the object.
    Version,
    /// A delete marker.
    DeleteMarker,
}
1015 :
1016 : impl VerOrDelete {
1017 36 : fn with_kind(
1018 36 : kind: VerOrDeleteKind,
1019 36 : last_modified: Option<DateTime>,
1020 36 : version_id: Option<String>,
1021 36 : key: Option<String>,
1022 36 : ) -> anyhow::Result<Self> {
1023 36 : let lvk = (last_modified, version_id, key);
1024 36 : let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
1025 0 : anyhow::bail!(
1026 0 : "One (or more) of last_modified, key, and id is None. \
1027 0 : Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
1028 0 : lvk.0,
1029 0 : lvk.1,
1030 0 : lvk.2,
1031 0 : );
1032 : };
1033 36 : Ok(Self {
1034 36 : kind,
1035 36 : last_modified,
1036 36 : version_id,
1037 36 : key,
1038 36 : })
1039 36 : }
1040 28 : fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
1041 28 : Self::with_kind(
1042 28 : VerOrDeleteKind::Version,
1043 28 : v.last_modified,
1044 28 : v.version_id,
1045 28 : v.key,
1046 28 : )
1047 28 : }
1048 8 : fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
1049 8 : Self::with_kind(
1050 8 : VerOrDeleteKind::DeleteMarker,
1051 8 : v.last_modified,
1052 8 : v.version_id,
1053 8 : v.key,
1054 8 : )
1055 8 : }
1056 : }
1057 :
#[cfg(test)]
mod tests {
    use camino::Utf8Path;
    use std::num::NonZeroUsize;

    use crate::{RemotePath, S3Bucket, S3Config};

    /// Checks `relative_path_to_s3_object` for every combination of a `prefix_in_bucket`
    /// setting and a relative remote path.
    #[test]
    fn relative_path() {
        let test_paths: Vec<RemotePath> = ["", "some/path", "some/path/"]
            .iter()
            .map(|raw| RemotePath::new(Utf8Path::new(raw)).expect("bad path"))
            .collect();

        // The three non-empty prefix spellings below are all expected to produce these keys.
        let under_test_prefix = [
            "test/prefix/",
            "test/prefix/some/path",
            "test/prefix/some/path/",
        ];
        // Each case pairs a `prefix_in_bucket` value with the keys expected for `test_paths`,
        // listed in the same order as `test_paths`.
        let cases: [(Option<&str>, [&str; 3]); 5] = [
            (None, ["", "some/path", "some/path/"]),
            (Some(""), ["/", "/some/path", "/some/path/"]),
            (Some("test/prefix"), under_test_prefix),
            (Some("test/prefix/"), under_test_prefix),
            (Some("/test/prefix/"), under_test_prefix),
        ];

        for (prefix, expected_keys) in cases {
            let config = S3Config {
                bucket_name: "bucket".to_owned(),
                bucket_region: "region".to_owned(),
                prefix_in_bucket: prefix.map(str::to_string),
                endpoint: None,
                concurrency_limit: NonZeroUsize::new(100).unwrap(),
                max_keys_per_list_response: Some(5),
                upload_storage_class: None,
            };
            let storage =
                S3Bucket::new(&config, std::time::Duration::ZERO).expect("remote storage init");

            for (test_path, expected) in test_paths.iter().zip(expected_keys) {
                let result = storage.relative_path_to_s3_object(test_path);
                assert_eq!(result, expected);
            }
        }
    }
}
|