Line data Source code
1 : //! AWS S3 storage wrapper around `rusoto` library.
2 : //!
3 : //! Respects `prefix_in_bucket` property from [`S3Config`],
4 : //! allowing multiple api users to independently work with the same S3 bucket, if
5 : //! their bucket prefixes are both specified and different.
6 :
7 : use std::{
8 : borrow::Cow,
9 : collections::HashMap,
10 : num::NonZeroU32,
11 : pin::Pin,
12 : sync::Arc,
13 : task::{Context, Poll},
14 : time::{Duration, SystemTime},
15 : };
16 :
17 : use anyhow::{anyhow, Context as _};
18 : use aws_config::{
19 : default_provider::credentials::DefaultCredentialsChain,
20 : retry::{RetryConfigBuilder, RetryMode},
21 : BehaviorVersion,
22 : };
23 : use aws_sdk_s3::{
24 : config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
25 : error::SdkError,
26 : operation::get_object::GetObjectError,
27 : types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
28 : Client,
29 : };
30 : use aws_smithy_async::rt::sleep::TokioSleep;
31 :
32 : use aws_smithy_types::{body::SdkBody, DateTime};
33 : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
34 : use bytes::Bytes;
35 : use futures::stream::Stream;
36 : use hyper::Body;
37 : use scopeguard::ScopeGuard;
38 : use tokio_util::sync::CancellationToken;
39 : use utils::backoff;
40 :
41 : use super::StorageMetadata;
42 : use crate::{
43 : config::S3Config,
44 : error::Cancelled,
45 : metrics::{start_counting_cancelled_wait, start_measuring_requests},
46 : support::PermitCarrying,
47 : ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath,
48 : RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
49 : REMOTE_STORAGE_PREFIX_SEPARATOR,
50 : };
51 :
52 : use crate::metrics::AttemptOutcome;
53 : pub(super) use crate::metrics::RequestKind;
54 :
55 : /// AWS S3 storage.
56 : pub struct S3Bucket {
57 : client: Client,
58 : bucket_name: String,
59 : prefix_in_bucket: Option<String>,
60 : max_keys_per_list_response: Option<i32>,
61 : upload_storage_class: Option<StorageClass>,
62 : concurrency_limiter: ConcurrencyLimiter,
63 : // Per-request timeout. Accessible for tests.
64 : pub timeout: Duration,
65 : }
66 :
67 : struct GetObjectRequest {
68 : bucket: String,
69 : key: String,
70 : range: Option<String>,
71 : }
72 : impl S3Bucket {
73 : /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
74 38 : pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
75 38 : tracing::debug!(
76 0 : "Creating s3 remote storage for S3 bucket {}",
77 : remote_storage_config.bucket_name
78 : );
79 :
80 38 : let region = Region::new(remote_storage_config.bucket_region.clone());
81 38 : let region_opt = Some(region.clone());
82 :
83 : // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
84 : // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
85 : // Incomplete list of auth methods used by this:
86 : // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
87 : // * "AWS_PROFILE" / `aws sso login --profile <profile>`
88 : // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
89 : // * http (ECS/EKS) container credentials
90 : // * imds v2
91 38 : let credentials_provider = DefaultCredentialsChain::builder()
92 38 : .region(region)
93 38 : .build()
94 0 : .await;
95 :
96 : // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
97 38 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
98 38 :
99 38 : let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
100 38 : #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
101 38 : BehaviorVersion::v2023_11_09(),
102 38 : )
103 38 : .region(region_opt)
104 38 : .identity_cache(IdentityCache::lazy().build())
105 38 : .credentials_provider(credentials_provider)
106 38 : .sleep_impl(SharedAsyncSleep::from(sleep_impl));
107 38 :
108 38 : let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
109 38 : s.spawn(|| {
110 38 : // TODO: make this function async.
111 38 : tokio::runtime::Builder::new_current_thread()
112 38 : .enable_all()
113 38 : .build()
114 38 : .unwrap()
115 38 : .block_on(sdk_config_loader.load())
116 38 : })
117 38 : .join()
118 38 : .unwrap()
119 38 : });
120 38 :
121 38 : let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
122 :
123 : // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
124 : // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
125 38 : if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
126 0 : s3_config_builder = s3_config_builder
127 0 : .endpoint_url(custom_endpoint)
128 0 : .force_path_style(true);
129 38 : }
130 :
131 : // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
132 : // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config. We set it to use at most one
133 : // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
134 38 : let mut retry_config = RetryConfigBuilder::new();
135 38 : retry_config
136 38 : .set_max_attempts(Some(1))
137 38 : .set_mode(Some(RetryMode::Adaptive));
138 38 : s3_config_builder = s3_config_builder.retry_config(retry_config.build());
139 38 :
140 38 : let s3_config = s3_config_builder.build();
141 38 : let client = aws_sdk_s3::Client::from_conf(s3_config);
142 38 :
143 38 : let prefix_in_bucket = remote_storage_config
144 38 : .prefix_in_bucket
145 38 : .as_deref()
146 38 : .map(|prefix| {
147 34 : let mut prefix = prefix;
148 38 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
149 4 : prefix = &prefix[1..]
150 : }
151 :
152 34 : let mut prefix = prefix.to_string();
153 60 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
154 26 : prefix.pop();
155 26 : }
156 34 : prefix
157 38 : });
158 38 :
159 38 : Ok(Self {
160 38 : client,
161 38 : bucket_name: remote_storage_config.bucket_name.clone(),
162 38 : max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
163 38 : prefix_in_bucket,
164 38 : concurrency_limiter: ConcurrencyLimiter::new(
165 38 : remote_storage_config.concurrency_limit.get(),
166 38 : ),
167 38 : upload_storage_class: remote_storage_config.upload_storage_class.clone(),
168 38 : timeout,
169 38 : })
170 38 : }
171 :
172 168 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
173 168 : let relative_path =
174 168 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
175 168 : Some(stripped) => stripped,
176 : // we rely on AWS to return properly prefixed paths
177 : // for requests with a certain prefix
178 0 : None => panic!(
179 0 : "Key {} does not start with bucket prefix {:?}",
180 0 : key, self.prefix_in_bucket
181 0 : ),
182 : };
183 168 : RemotePath(
184 168 : relative_path
185 168 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
186 168 : .collect(),
187 168 : )
188 168 : }
189 :
190 313 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
191 313 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
192 313 : let path_string = path.get_path().as_str();
193 313 : match &self.prefix_in_bucket {
194 301 : Some(prefix) => prefix.clone() + "/" + path_string,
195 12 : None => path_string.to_string(),
196 : }
197 313 : }
198 :
199 244 : async fn permit(
200 244 : &self,
201 244 : kind: RequestKind,
202 244 : cancel: &CancellationToken,
203 244 : ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
204 244 : let started_at = start_counting_cancelled_wait(kind);
205 244 : let acquire = self.concurrency_limiter.acquire(kind);
206 :
207 244 : let permit = tokio::select! {
208 : permit = acquire => permit.expect("semaphore is never closed"),
209 : _ = cancel.cancelled() => return Err(Cancelled),
210 : };
211 :
212 244 : let started_at = ScopeGuard::into_inner(started_at);
213 244 : crate::metrics::BUCKET_METRICS
214 244 : .wait_seconds
215 244 : .observe_elapsed(kind, started_at);
216 244 :
217 244 : Ok(permit)
218 244 : }
219 :
220 25 : async fn owned_permit(
221 25 : &self,
222 25 : kind: RequestKind,
223 25 : cancel: &CancellationToken,
224 25 : ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
225 25 : let started_at = start_counting_cancelled_wait(kind);
226 25 : let acquire = self.concurrency_limiter.acquire_owned(kind);
227 :
228 25 : let permit = tokio::select! {
229 : permit = acquire => permit.expect("semaphore is never closed"),
230 : _ = cancel.cancelled() => return Err(Cancelled),
231 : };
232 :
233 25 : let started_at = ScopeGuard::into_inner(started_at);
234 25 : crate::metrics::BUCKET_METRICS
235 25 : .wait_seconds
236 25 : .observe_elapsed(kind, started_at);
237 25 : Ok(permit)
238 25 : }
239 :
240 25 : async fn download_object(
241 25 : &self,
242 25 : request: GetObjectRequest,
243 25 : cancel: &CancellationToken,
244 25 : ) -> Result<Download, DownloadError> {
245 25 : let kind = RequestKind::Get;
246 :
247 25 : let permit = self.owned_permit(kind, cancel).await?;
248 :
249 25 : let started_at = start_measuring_requests(kind);
250 25 :
251 25 : let get_object = self
252 25 : .client
253 25 : .get_object()
254 25 : .bucket(request.bucket)
255 25 : .key(request.key)
256 25 : .set_range(request.range)
257 25 : .send();
258 :
259 25 : let get_object = tokio::select! {
260 : res = get_object => res,
261 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
262 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
263 : };
264 :
265 25 : let started_at = ScopeGuard::into_inner(started_at);
266 :
267 24 : let object_output = match get_object {
268 24 : Ok(object_output) => object_output,
269 0 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
270 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
271 : // an error: we expect to sometimes fetch an object and find it missing,
272 : // e.g. when probing for timeline indices.
273 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
274 0 : kind,
275 0 : AttemptOutcome::Ok,
276 0 : started_at,
277 0 : );
278 0 : return Err(DownloadError::NotFound);
279 : }
280 1 : Err(e) => {
281 1 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
282 1 : kind,
283 1 : AttemptOutcome::Err,
284 1 : started_at,
285 1 : );
286 1 :
287 1 : return Err(DownloadError::Other(
288 1 : anyhow::Error::new(e).context("download s3 object"),
289 1 : ));
290 : }
291 : };
292 :
293 : // even if we would have no timeout left, continue anyways. the caller can decide to ignore
294 : // the errors considering timeouts and cancellation.
295 24 : let remaining = self.timeout.saturating_sub(started_at.elapsed());
296 24 :
297 24 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
298 24 : let etag = object_output
299 24 : .e_tag
300 24 : .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
301 24 : .into();
302 24 : let last_modified = object_output
303 24 : .last_modified
304 24 : .ok_or(DownloadError::Other(anyhow::anyhow!(
305 24 : "Missing LastModified header"
306 24 : )))?
307 24 : .try_into()
308 24 : .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
309 :
310 24 : let body = object_output.body;
311 24 : let body = ByteStreamAsStream::from(body);
312 24 : let body = PermitCarrying::new(permit, body);
313 24 : let body = TimedDownload::new(started_at, body);
314 24 :
315 24 : let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
316 24 : let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
317 24 :
318 24 : Ok(Download {
319 24 : metadata,
320 24 : etag,
321 24 : last_modified,
322 24 : download_stream: Box::pin(body),
323 24 : })
324 25 : }
325 :
326 106 : async fn delete_oids(
327 106 : &self,
328 106 : _permit: &tokio::sync::SemaphorePermit<'_>,
329 106 : delete_objects: &[ObjectIdentifier],
330 106 : cancel: &CancellationToken,
331 106 : ) -> anyhow::Result<()> {
332 106 : let kind = RequestKind::Delete;
333 106 : let mut cancel = std::pin::pin!(cancel.cancelled());
334 :
335 106 : for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
336 106 : let started_at = start_measuring_requests(kind);
337 :
338 106 : let req = self
339 106 : .client
340 106 : .delete_objects()
341 106 : .bucket(self.bucket_name.clone())
342 106 : .delete(
343 106 : Delete::builder()
344 106 : .set_objects(Some(chunk.to_vec()))
345 106 : .build()
346 106 : .context("build request")?,
347 : )
348 106 : .send();
349 :
350 106 : let resp = tokio::select! {
351 : resp = req => resp,
352 : _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
353 : _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
354 : };
355 :
356 106 : let started_at = ScopeGuard::into_inner(started_at);
357 106 : crate::metrics::BUCKET_METRICS
358 106 : .req_seconds
359 106 : .observe_elapsed(kind, &resp, started_at);
360 :
361 106 : let resp = resp.context("request deletion")?;
362 106 : crate::metrics::BUCKET_METRICS
363 106 : .deleted_objects_total
364 106 : .inc_by(chunk.len() as u64);
365 :
366 106 : if let Some(errors) = resp.errors {
367 : // Log a bounded number of the errors within the response:
368 : // these requests can carry 1000 keys so logging each one
369 : // would be too verbose, especially as errors may lead us
370 : // to retry repeatedly.
371 : const LOG_UP_TO_N_ERRORS: usize = 10;
372 0 : for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
373 0 : tracing::warn!(
374 0 : "DeleteObjects key {} failed: {}: {}",
375 0 : e.key.as_ref().map(Cow::from).unwrap_or("".into()),
376 0 : e.code.as_ref().map(Cow::from).unwrap_or("".into()),
377 0 : e.message.as_ref().map(Cow::from).unwrap_or("".into())
378 : );
379 : }
380 :
381 0 : return Err(anyhow::anyhow!(
382 0 : "Failed to delete {}/{} objects",
383 0 : errors.len(),
384 0 : chunk.len(),
385 0 : ));
386 106 : }
387 : }
388 106 : Ok(())
389 106 : }
390 :
391 0 : pub fn bucket_name(&self) -> &str {
392 0 : &self.bucket_name
393 0 : }
394 : }
395 :
396 : pin_project_lite::pin_project! {
397 : struct ByteStreamAsStream {
398 : #[pin]
399 : inner: aws_smithy_types::byte_stream::ByteStream
400 : }
401 : }
402 :
403 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
404 24 : fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
405 24 : ByteStreamAsStream { inner }
406 24 : }
407 : }
408 :
409 : impl Stream for ByteStreamAsStream {
410 : type Item = std::io::Result<Bytes>;
411 :
412 48 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
413 48 : // this does the std::io::ErrorKind::Other conversion
414 48 : self.project().inner.poll_next(cx).map_err(|x| x.into())
415 48 : }
416 :
417 : // cannot implement size_hint because inner.size_hint is remaining size in bytes, which makes
418 : // sense and Stream::size_hint does not really
419 : }
420 :
421 : pin_project_lite::pin_project! {
422 : /// Times and tracks the outcome of the request.
423 : struct TimedDownload<S> {
424 : started_at: std::time::Instant,
425 : outcome: AttemptOutcome,
426 : #[pin]
427 : inner: S
428 : }
429 :
430 : impl<S> PinnedDrop for TimedDownload<S> {
431 : fn drop(mut this: Pin<&mut Self>) {
432 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
433 : }
434 : }
435 : }
436 :
437 : impl<S> TimedDownload<S> {
438 24 : fn new(started_at: std::time::Instant, inner: S) -> Self {
439 24 : TimedDownload {
440 24 : started_at,
441 24 : outcome: AttemptOutcome::Cancelled,
442 24 : inner,
443 24 : }
444 24 : }
445 : }
446 :
447 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
448 : type Item = <S as Stream>::Item;
449 :
450 48 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
451 48 : use std::task::ready;
452 48 :
453 48 : let this = self.project();
454 :
455 48 : let res = ready!(this.inner.poll_next(cx));
456 22 : match &res {
457 22 : Some(Ok(_)) => {}
458 0 : Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
459 18 : None => *this.outcome = AttemptOutcome::Ok,
460 : }
461 :
462 40 : Poll::Ready(res)
463 48 : }
464 :
465 0 : fn size_hint(&self) -> (usize, Option<usize>) {
466 0 : self.inner.size_hint()
467 0 : }
468 : }
469 :
470 : impl RemoteStorage for S3Bucket {
471 26 : fn list_streaming(
472 26 : &self,
473 26 : prefix: Option<&RemotePath>,
474 26 : mode: ListingMode,
475 26 : max_keys: Option<NonZeroU32>,
476 26 : cancel: &CancellationToken,
477 26 : ) -> impl Stream<Item = Result<Listing, DownloadError>> {
478 26 : let kind = RequestKind::List;
479 26 : // s3 sdk wants i32
480 26 : let mut max_keys = max_keys.map(|mk| mk.get() as i32);
481 26 :
482 26 : // get the passed prefix or if it is not set use prefix_in_bucket value
483 26 : let list_prefix = prefix
484 26 : .map(|p| self.relative_path_to_s3_object(p))
485 26 : .or_else(|| {
486 20 : self.prefix_in_bucket.clone().map(|mut s| {
487 20 : s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
488 20 : s
489 20 : })
490 26 : });
491 :
492 : async_stream::stream! {
493 : let _permit = self.permit(kind, cancel).await?;
494 :
495 : let mut continuation_token = None;
496 : 'outer: loop {
497 : let started_at = start_measuring_requests(kind);
498 :
499 : // min of two Options, returning Some if one is value and another is
500 : // None (None is smaller than anything, so plain min doesn't work).
501 : let request_max_keys = self
502 : .max_keys_per_list_response
503 : .into_iter()
504 : .chain(max_keys.into_iter())
505 : .min();
506 : let mut request = self
507 : .client
508 : .list_objects_v2()
509 : .bucket(self.bucket_name.clone())
510 : .set_prefix(list_prefix.clone())
511 : .set_continuation_token(continuation_token.clone())
512 : .set_max_keys(request_max_keys);
513 :
514 : if let ListingMode::WithDelimiter = mode {
515 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
516 : }
517 :
518 : let request = request.send();
519 :
520 : let response = tokio::select! {
521 : res = request => Ok(res),
522 : _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
523 : _ = cancel.cancelled() => Err(DownloadError::Cancelled),
524 : }?;
525 :
526 : let response = response
527 : .context("Failed to list S3 prefixes")
528 : .map_err(DownloadError::Other);
529 :
530 : let started_at = ScopeGuard::into_inner(started_at);
531 :
532 : crate::metrics::BUCKET_METRICS
533 : .req_seconds
534 : .observe_elapsed(kind, &response, started_at);
535 :
536 : let response = match response {
537 : Ok(response) => response,
538 : Err(e) => {
539 : // The error is potentially retryable, so we must rewind the loop after yielding.
540 : yield Err(e);
541 : continue 'outer;
542 : },
543 : };
544 :
545 : let keys = response.contents();
546 : let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
547 :
548 : tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
549 : let mut result = Listing::default();
550 :
551 : for object in keys {
552 : let key = object.key().expect("response does not contain a key");
553 : let key = self.s3_object_to_relative_path(key);
554 :
555 : let last_modified = match object.last_modified.map(SystemTime::try_from) {
556 : Some(Ok(t)) => t,
557 : Some(Err(_)) => {
558 : tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
559 : object.last_modified, key
560 : );
561 : SystemTime::now()
562 : },
563 : None => {
564 : SystemTime::now()
565 : }
566 : };
567 :
568 : let size = object.size.unwrap_or(0) as u64;
569 :
570 : result.keys.push(ListingObject{
571 : key,
572 : last_modified,
573 : size,
574 : });
575 : if let Some(mut mk) = max_keys {
576 : assert!(mk > 0);
577 : mk -= 1;
578 : if mk == 0 {
579 : // limit reached
580 : yield Ok(result);
581 : break 'outer;
582 : }
583 : max_keys = Some(mk);
584 : }
585 : }
586 :
587 : // S3 gives us prefixes like "foo/", we return them like "foo"
588 88 : result.prefixes.extend(prefixes.iter().filter_map(|o| {
589 88 : Some(
590 88 : self.s3_object_to_relative_path(
591 88 : o.prefix()?
592 88 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
593 : ),
594 : )
595 88 : }));
596 :
597 : yield Ok(result);
598 :
599 : continuation_token = match response.next_continuation_token {
600 : Some(new_token) => Some(new_token),
601 : None => break,
602 : };
603 : }
604 : }
605 26 : }
606 :
607 108 : async fn upload(
608 108 : &self,
609 108 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
610 108 : from_size_bytes: usize,
611 108 : to: &RemotePath,
612 108 : metadata: Option<StorageMetadata>,
613 108 : cancel: &CancellationToken,
614 108 : ) -> anyhow::Result<()> {
615 108 : let kind = RequestKind::Put;
616 108 : let _permit = self.permit(kind, cancel).await?;
617 :
618 108 : let started_at = start_measuring_requests(kind);
619 108 :
620 108 : let body = Body::wrap_stream(from);
621 108 : let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
622 :
623 108 : let upload = self
624 108 : .client
625 108 : .put_object()
626 108 : .bucket(self.bucket_name.clone())
627 108 : .key(self.relative_path_to_s3_object(to))
628 108 : .set_metadata(metadata.map(|m| m.0))
629 108 : .set_storage_class(self.upload_storage_class.clone())
630 108 : .content_length(from_size_bytes.try_into()?)
631 108 : .body(bytes_stream)
632 108 : .send();
633 108 :
634 108 : let upload = tokio::time::timeout(self.timeout, upload);
635 :
636 108 : let res = tokio::select! {
637 : res = upload => res,
638 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
639 : };
640 :
641 108 : if let Ok(inner) = &res {
642 108 : // do not incl. timeouts as errors in metrics but cancellations
643 108 : let started_at = ScopeGuard::into_inner(started_at);
644 108 : crate::metrics::BUCKET_METRICS
645 108 : .req_seconds
646 108 : .observe_elapsed(kind, inner, started_at);
647 108 : }
648 :
649 108 : match res {
650 106 : Ok(Ok(_put)) => Ok(()),
651 2 : Ok(Err(sdk)) => Err(sdk.into()),
652 0 : Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
653 : }
654 108 : }
655 :
656 2 : async fn copy(
657 2 : &self,
658 2 : from: &RemotePath,
659 2 : to: &RemotePath,
660 2 : cancel: &CancellationToken,
661 2 : ) -> anyhow::Result<()> {
662 2 : let kind = RequestKind::Copy;
663 2 : let _permit = self.permit(kind, cancel).await?;
664 :
665 2 : let timeout = tokio::time::sleep(self.timeout);
666 2 :
667 2 : let started_at = start_measuring_requests(kind);
668 2 :
669 2 : // we need to specify bucket_name as a prefix
670 2 : let copy_source = format!(
671 2 : "{}/{}",
672 2 : self.bucket_name,
673 2 : self.relative_path_to_s3_object(from)
674 2 : );
675 2 :
676 2 : let op = self
677 2 : .client
678 2 : .copy_object()
679 2 : .bucket(self.bucket_name.clone())
680 2 : .key(self.relative_path_to_s3_object(to))
681 2 : .set_storage_class(self.upload_storage_class.clone())
682 2 : .copy_source(copy_source)
683 2 : .send();
684 :
685 2 : let res = tokio::select! {
686 : res = op => res,
687 : _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
688 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
689 : };
690 :
691 2 : let started_at = ScopeGuard::into_inner(started_at);
692 2 : crate::metrics::BUCKET_METRICS
693 2 : .req_seconds
694 2 : .observe_elapsed(kind, &res, started_at);
695 2 :
696 2 : res?;
697 :
698 2 : Ok(())
699 2 : }
700 :
701 15 : async fn download(
702 15 : &self,
703 15 : from: &RemotePath,
704 15 : cancel: &CancellationToken,
705 15 : ) -> Result<Download, DownloadError> {
706 15 : // if prefix is not none then download file `prefix/from`
707 15 : // if prefix is none then download file `from`
708 15 : self.download_object(
709 15 : GetObjectRequest {
710 15 : bucket: self.bucket_name.clone(),
711 15 : key: self.relative_path_to_s3_object(from),
712 15 : range: None,
713 15 : },
714 15 : cancel,
715 15 : )
716 19 : .await
717 15 : }
718 :
719 10 : async fn download_byte_range(
720 10 : &self,
721 10 : from: &RemotePath,
722 10 : start_inclusive: u64,
723 10 : end_exclusive: Option<u64>,
724 10 : cancel: &CancellationToken,
725 10 : ) -> Result<Download, DownloadError> {
726 10 : // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
727 10 : // and needs both ends to be exclusive
728 10 : let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
729 10 : let range = Some(match end_inclusive {
730 6 : Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
731 4 : None => format!("bytes={start_inclusive}-"),
732 : });
733 :
734 10 : self.download_object(
735 10 : GetObjectRequest {
736 10 : bucket: self.bucket_name.clone(),
737 10 : key: self.relative_path_to_s3_object(from),
738 10 : range,
739 10 : },
740 10 : cancel,
741 10 : )
742 10 : .await
743 10 : }
744 :
745 102 : async fn delete_objects<'a>(
746 102 : &self,
747 102 : paths: &'a [RemotePath],
748 102 : cancel: &CancellationToken,
749 102 : ) -> anyhow::Result<()> {
750 102 : let kind = RequestKind::Delete;
751 102 : let permit = self.permit(kind, cancel).await?;
752 102 : let mut delete_objects = Vec::with_capacity(paths.len());
753 212 : for path in paths {
754 110 : let obj_id = ObjectIdentifier::builder()
755 110 : .set_key(Some(self.relative_path_to_s3_object(path)))
756 110 : .build()
757 110 : .context("convert path to oid")?;
758 110 : delete_objects.push(obj_id);
759 : }
760 :
761 377 : self.delete_oids(&permit, &delete_objects, cancel).await
762 102 : }
763 :
764 90 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
765 90 : let paths = std::array::from_ref(path);
766 317 : self.delete_objects(paths, cancel).await
767 90 : }
768 :
769 6 : async fn time_travel_recover(
770 6 : &self,
771 6 : prefix: Option<&RemotePath>,
772 6 : timestamp: SystemTime,
773 6 : done_if_after: SystemTime,
774 6 : cancel: &CancellationToken,
775 6 : ) -> Result<(), TimeTravelError> {
776 6 : let kind = RequestKind::TimeTravel;
777 6 : let permit = self.permit(kind, cancel).await?;
778 :
779 6 : let timestamp = DateTime::from(timestamp);
780 6 : let done_if_after = DateTime::from(done_if_after);
781 6 :
782 6 : tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
783 :
784 : // get the passed prefix or if it is not set use prefix_in_bucket value
785 6 : let prefix = prefix
786 6 : .map(|p| self.relative_path_to_s3_object(p))
787 6 : .or_else(|| self.prefix_in_bucket.clone());
788 6 :
789 6 : let warn_threshold = 3;
790 6 : let max_retries = 10;
791 6 : let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
792 :
793 6 : let mut key_marker = None;
794 6 : let mut version_id_marker = None;
795 6 : let mut versions_and_deletes = Vec::new();
796 :
797 : loop {
798 6 : let response = backoff::retry(
799 8 : || async {
800 8 : let op = self
801 8 : .client
802 8 : .list_object_versions()
803 8 : .bucket(self.bucket_name.clone())
804 8 : .set_prefix(prefix.clone())
805 8 : .set_key_marker(key_marker.clone())
806 8 : .set_version_id_marker(version_id_marker.clone())
807 8 : .send();
808 8 :
809 8 : tokio::select! {
810 8 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
811 8 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
812 8 : }
813 8 : },
814 6 : is_permanent,
815 6 : warn_threshold,
816 6 : max_retries,
817 6 : "listing object versions for time_travel_recover",
818 6 : cancel,
819 6 : )
820 50 : .await
821 6 : .ok_or_else(|| TimeTravelError::Cancelled)
822 6 : .and_then(|x| x)?;
823 :
824 6 : tracing::trace!(
825 0 : " Got List response version_id_marker={:?}, key_marker={:?}",
826 : response.version_id_marker,
827 : response.key_marker
828 : );
829 6 : let versions = response
830 6 : .versions
831 6 : .unwrap_or_default()
832 6 : .into_iter()
833 6 : .map(VerOrDelete::from_version);
834 6 : let deletes = response
835 6 : .delete_markers
836 6 : .unwrap_or_default()
837 6 : .into_iter()
838 6 : .map(VerOrDelete::from_delete_marker);
839 6 : itertools::process_results(versions.chain(deletes), |n_vds| {
840 6 : versions_and_deletes.extend(n_vds)
841 6 : })
842 6 : .map_err(TimeTravelError::Other)?;
843 12 : fn none_if_empty(v: Option<String>) -> Option<String> {
844 12 : v.filter(|v| !v.is_empty())
845 12 : }
846 6 : version_id_marker = none_if_empty(response.next_version_id_marker);
847 6 : key_marker = none_if_empty(response.next_key_marker);
848 6 : if version_id_marker.is_none() {
849 : // The final response is not supposed to be truncated
850 6 : if response.is_truncated.unwrap_or_default() {
851 0 : return Err(TimeTravelError::Other(anyhow::anyhow!(
852 0 : "Received truncated ListObjectVersions response for prefix={prefix:?}"
853 0 : )));
854 6 : }
855 6 : break;
856 0 : }
857 0 : // Limit the number of versions deletions, mostly so that we don't
858 0 : // keep requesting forever if the list is too long, as we'd put the
859 0 : // list in RAM.
860 0 : // Building a list of 100k entries that reaches the limit roughly takes
861 0 : // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
862 0 : const COMPLEXITY_LIMIT: usize = 100_000;
863 0 : if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
864 0 : return Err(TimeTravelError::TooManyVersions);
865 0 : }
866 : }
867 :
868 6 : tracing::info!(
869 0 : "Built list for time travel with {} versions and deletions",
870 0 : versions_and_deletes.len()
871 : );
872 :
873 : // Work on the list of references instead of the objects directly,
874 : // otherwise we get lifetime errors in the sort_by_key call below.
875 6 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
876 6 :
877 124 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
878 6 :
879 6 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
880 :
881 42 : for vd in &versions_and_deletes {
882 : let VerOrDelete {
883 36 : version_id, key, ..
884 36 : } = &vd;
885 36 : if version_id == "null" {
886 0 : return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
887 0 : indicating either disabled versioning, or legacy objects with null version id values")));
888 36 : }
889 36 : tracing::trace!(
890 0 : "Parsing version key={key} version_id={version_id} kind={:?}",
891 : vd.kind
892 : );
893 :
894 36 : vds_for_key.entry(key).or_default().push(vd);
895 : }
896 24 : for (key, versions) in vds_for_key {
897 18 : let last_vd = versions.last().unwrap();
898 18 : if last_vd.last_modified > done_if_after {
899 0 : tracing::trace!("Key {key} has version later than done_if_after, skipping");
900 0 : continue;
901 18 : }
902 : // the version we want to restore to.
903 18 : let version_to_restore_to =
904 28 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
905 0 : Ok(v) => v,
906 18 : Err(e) => e,
907 : };
908 18 : if version_to_restore_to == versions.len() {
909 6 : tracing::trace!("Key {key} has no changes since timestamp, skipping");
910 6 : continue;
911 12 : }
912 12 : let mut do_delete = false;
913 12 : if version_to_restore_to == 0 {
914 : // All versions more recent, so the key didn't exist at the specified time point.
915 6 : tracing::trace!(
916 0 : "All {} versions more recent for {key}, deleting",
917 0 : versions.len()
918 : );
919 6 : do_delete = true;
920 : } else {
921 6 : match &versions[version_to_restore_to - 1] {
922 : VerOrDelete {
923 : kind: VerOrDeleteKind::Version,
924 6 : version_id,
925 6 : ..
926 6 : } => {
927 6 : tracing::trace!("Copying old version {version_id} for {key}...");
928 : // Restore the state to the last version by copying
929 6 : let source_id =
930 6 : format!("{}/{key}?versionId={version_id}", self.bucket_name);
931 6 :
932 6 : backoff::retry(
933 6 : || async {
934 6 : let op = self
935 6 : .client
936 6 : .copy_object()
937 6 : .bucket(self.bucket_name.clone())
938 6 : .key(key)
939 6 : .set_storage_class(self.upload_storage_class.clone())
940 6 : .copy_source(&source_id)
941 6 : .send();
942 6 :
943 6 : tokio::select! {
944 6 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
945 6 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
946 6 : }
947 6 : },
948 6 : is_permanent,
949 6 : warn_threshold,
950 6 : max_retries,
951 6 : "copying object version for time_travel_recover",
952 6 : cancel,
953 6 : )
954 18 : .await
955 6 : .ok_or_else(|| TimeTravelError::Cancelled)
956 6 : .and_then(|x| x)?;
957 6 : tracing::info!(%version_id, %key, "Copied old version in S3");
958 : }
959 : VerOrDelete {
960 : kind: VerOrDeleteKind::DeleteMarker,
961 : ..
962 0 : } => {
963 0 : do_delete = true;
964 0 : }
965 : }
966 : };
967 12 : if do_delete {
968 6 : if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
969 : // Key has since been deleted (but there was some history), no need to do anything
970 2 : tracing::trace!("Key {key} already deleted, skipping.");
971 : } else {
972 4 : tracing::trace!("Deleting {key}...");
973 :
974 4 : let oid = ObjectIdentifier::builder()
975 4 : .key(key.to_owned())
976 4 : .build()
977 4 : .map_err(|e| TimeTravelError::Other(e.into()))?;
978 :
979 4 : self.delete_oids(&permit, &[oid], cancel)
980 8 : .await
981 4 : .map_err(|e| {
982 0 : // delete_oid0 will use TimeoutOrCancel
983 0 : if TimeoutOrCancel::caused_by_cancel(&e) {
984 0 : TimeTravelError::Cancelled
985 : } else {
986 0 : TimeTravelError::Other(e)
987 : }
988 4 : })?;
989 : }
990 6 : }
991 : }
992 6 : Ok(())
993 6 : }
994 : }
995 :
996 : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
997 : struct VerOrDelete {
998 : kind: VerOrDeleteKind,
999 : last_modified: DateTime,
1000 : version_id: String,
1001 : key: String,
1002 : }
1003 :
1004 : #[derive(Debug)]
1005 : enum VerOrDeleteKind {
1006 : Version,
1007 : DeleteMarker,
1008 : }
1009 :
1010 : impl VerOrDelete {
1011 36 : fn with_kind(
1012 36 : kind: VerOrDeleteKind,
1013 36 : last_modified: Option<DateTime>,
1014 36 : version_id: Option<String>,
1015 36 : key: Option<String>,
1016 36 : ) -> anyhow::Result<Self> {
1017 36 : let lvk = (last_modified, version_id, key);
1018 36 : let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
1019 0 : anyhow::bail!(
1020 0 : "One (or more) of last_modified, key, and id is None. \
1021 0 : Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
1022 0 : lvk.0,
1023 0 : lvk.1,
1024 0 : lvk.2,
1025 0 : );
1026 : };
1027 36 : Ok(Self {
1028 36 : kind,
1029 36 : last_modified,
1030 36 : version_id,
1031 36 : key,
1032 36 : })
1033 36 : }
1034 28 : fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
1035 28 : Self::with_kind(
1036 28 : VerOrDeleteKind::Version,
1037 28 : v.last_modified,
1038 28 : v.version_id,
1039 28 : v.key,
1040 28 : )
1041 28 : }
1042 8 : fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
1043 8 : Self::with_kind(
1044 8 : VerOrDeleteKind::DeleteMarker,
1045 8 : v.last_modified,
1046 8 : v.version_id,
1047 8 : v.key,
1048 8 : )
1049 8 : }
1050 : }
1051 :
1052 : #[cfg(test)]
1053 : mod tests {
1054 : use camino::Utf8Path;
1055 : use std::num::NonZeroUsize;
1056 :
1057 : use crate::{RemotePath, S3Bucket, S3Config};
1058 :
1059 : #[tokio::test]
1060 4 : async fn relative_path() {
1061 4 : let all_paths = ["", "some/path", "some/path/"];
1062 4 : let all_paths: Vec<RemotePath> = all_paths
1063 4 : .iter()
1064 12 : .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
1065 4 : .collect();
1066 4 : let prefixes = [
1067 4 : None,
1068 4 : Some(""),
1069 4 : Some("test/prefix"),
1070 4 : Some("test/prefix/"),
1071 4 : Some("/test/prefix/"),
1072 4 : ];
1073 4 : let expected_outputs = [
1074 4 : vec!["", "some/path", "some/path/"],
1075 4 : vec!["/", "/some/path", "/some/path/"],
1076 4 : vec![
1077 4 : "test/prefix/",
1078 4 : "test/prefix/some/path",
1079 4 : "test/prefix/some/path/",
1080 4 : ],
1081 4 : vec![
1082 4 : "test/prefix/",
1083 4 : "test/prefix/some/path",
1084 4 : "test/prefix/some/path/",
1085 4 : ],
1086 4 : vec![
1087 4 : "test/prefix/",
1088 4 : "test/prefix/some/path",
1089 4 : "test/prefix/some/path/",
1090 4 : ],
1091 4 : ];
1092 4 :
1093 20 : for (prefix_idx, prefix) in prefixes.iter().enumerate() {
1094 20 : let config = S3Config {
1095 20 : bucket_name: "bucket".to_owned(),
1096 20 : bucket_region: "region".to_owned(),
1097 20 : prefix_in_bucket: prefix.map(str::to_string),
1098 20 : endpoint: None,
1099 20 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
1100 20 : max_keys_per_list_response: Some(5),
1101 20 : upload_storage_class: None,
1102 20 : };
1103 20 : let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
1104 4 : .await
1105 20 : .expect("remote storage init");
1106 60 : for (test_path_idx, test_path) in all_paths.iter().enumerate() {
1107 60 : let result = storage.relative_path_to_s3_object(test_path);
1108 60 : let expected = expected_outputs[prefix_idx][test_path_idx];
1109 60 : assert_eq!(result, expected);
1110 4 : }
1111 4 : }
1112 4 : }
1113 : }
|