Line data Source code
1 : //! AWS S3 storage wrapper around the `aws-sdk-s3` library.
2 : //!
3 : //! Respects the `prefix_in_bucket` property from [`S3Config`],
4 : //! allowing multiple API users to work with the same S3 bucket independently,
5 : //! as long as their bucket prefixes are specified and distinct.
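//!
//! For example (an illustrative sketch, not a doctest; the `common` base
//! config and the prefix values are hypothetical), two services can safely
//! share one bucket by configuring distinct prefixes:
//!
//! ```ignore
//! let service_a = S3Config { prefix_in_bucket: Some("service-a".to_string()), ..common.clone() };
//! let service_b = S3Config { prefix_in_bucket: Some("service-b".to_string()), ..common };
//! ```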
6 :
7 : use std::borrow::Cow;
8 : use std::collections::HashMap;
9 : use std::num::NonZeroU32;
10 : use std::pin::Pin;
11 : use std::sync::Arc;
12 : use std::task::{Context, Poll};
13 : use std::time::{Duration, SystemTime};
14 :
15 : use anyhow::{Context as _, anyhow};
16 : use aws_config::BehaviorVersion;
17 : use aws_config::default_provider::credentials::DefaultCredentialsChain;
18 : use aws_config::retry::{RetryConfigBuilder, RetryMode};
19 : use aws_sdk_s3::Client;
20 : use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
21 : use aws_sdk_s3::error::SdkError;
22 : use aws_sdk_s3::operation::get_object::GetObjectError;
23 : use aws_sdk_s3::operation::head_object::HeadObjectError;
24 : use aws_sdk_s3::types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass};
25 : use aws_smithy_async::rt::sleep::TokioSleep;
26 : use aws_smithy_types::DateTime;
27 : use aws_smithy_types::body::SdkBody;
28 : use aws_smithy_types::byte_stream::ByteStream;
29 : use aws_smithy_types::date_time::ConversionError;
30 : use bytes::Bytes;
31 : use futures::stream::Stream;
32 : use futures_util::StreamExt;
33 : use http_body_util::StreamBody;
34 : use http_types::StatusCode;
35 : use hyper::body::Frame;
36 : use scopeguard::ScopeGuard;
37 : use tokio_util::sync::CancellationToken;
38 : use utils::backoff;
39 :
40 : use super::StorageMetadata;
41 : use crate::config::S3Config;
42 : use crate::error::Cancelled;
43 : pub(super) use crate::metrics::RequestKind;
44 : use crate::metrics::{AttemptOutcome, start_counting_cancelled_wait, start_measuring_requests};
45 : use crate::support::PermitCarrying;
46 : use crate::{
47 : ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
48 : MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
49 : TimeTravelError, TimeoutOrCancel,
50 : };
51 :
52 : /// AWS S3 storage.
53 : pub struct S3Bucket {
54 : client: Client,
55 : bucket_name: String,
56 : prefix_in_bucket: Option<String>,
57 : max_keys_per_list_response: Option<i32>,
58 : upload_storage_class: Option<StorageClass>,
59 : concurrency_limiter: ConcurrencyLimiter,
60 : // Per-request timeout. Accessible for tests.
61 : pub timeout: Duration,
62 : }
63 :
64 : struct GetObjectRequest {
65 : bucket: String,
66 : key: String,
67 : etag: Option<String>,
68 : range: Option<String>,
69 : }
70 : impl S3Bucket {
71 : /// Creates the S3 storage; errors if an incorrect AWS S3 configuration is provided.
72 41 : pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
73 41 : tracing::debug!(
74 0 : "Creating s3 remote storage for S3 bucket {}",
75 : remote_storage_config.bucket_name
76 : );
77 :
78 41 : let region = Region::new(remote_storage_config.bucket_region.clone());
79 41 : let region_opt = Some(region.clone());
80 :
81 : // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
82 : // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
83 : // Incomplete list of auth methods used by this:
84 : // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
85 : // * "AWS_PROFILE" / `aws sso login --profile <profile>`
86 : // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
87 : // * http (ECS/EKS) container credentials
88 : // * IMDSv2
89 41 : let credentials_provider = DefaultCredentialsChain::builder()
90 41 : .region(region)
91 41 : .build()
92 41 : .await;
93 :
94 : // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
95 41 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
96 41 :
97 41 : let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
98 41 : #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
99 41 : BehaviorVersion::v2023_11_09(),
100 41 : )
101 41 : .region(region_opt)
102 41 : .identity_cache(IdentityCache::lazy().build())
103 41 : .credentials_provider(credentials_provider)
104 41 : .sleep_impl(SharedAsyncSleep::from(sleep_impl));
105 41 :
106 41 : let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
107 41 : s.spawn(|| {
108 41 : // TODO: make this function async.
109 41 : tokio::runtime::Builder::new_current_thread()
110 41 : .enable_all()
111 41 : .build()
112 41 : .unwrap()
113 41 : .block_on(sdk_config_loader.load())
114 41 : })
115 41 : .join()
116 41 : .unwrap()
117 41 : });
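// Note: `sdk_config_loader.load()` is async, but it is driven on a dedicated
// scoped thread with its own single-threaded runtime; presumably this avoids
// calling `block_on` from within the caller's async runtime (which would
// panic). See the TODO above.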
118 41 :
119 41 : let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
120 :
121 : // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions
122 : // (relevant in case we ever re-use the `sdk_config` for more than just the S3 client in the future).
123 41 : if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
124 0 : s3_config_builder = s3_config_builder
125 0 : .endpoint_url(custom_endpoint)
126 0 : .force_path_style(true);
127 15 : }
128 :
129 : // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to apply rate limiting in response to throttling
130 : // responses (e.g. 429 on too many ListObjectsV2 requests), we must provide a retry config. We set it to at most one
131 : // attempt, and enable 'Adaptive' mode, which turns on client-side rate limiting.
132 41 : let mut retry_config = RetryConfigBuilder::new();
133 41 : retry_config
134 41 : .set_max_attempts(Some(1))
135 41 : .set_mode(Some(RetryMode::Adaptive));
136 41 : s3_config_builder = s3_config_builder.retry_config(retry_config.build());
137 41 :
138 41 : let s3_config = s3_config_builder.build();
139 41 : let client = aws_sdk_s3::Client::from_conf(s3_config);
140 41 :
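// Normalize the configured prefix: strip leading and trailing separators,
// e.g. "/test/prefix/" becomes "test/prefix" (cf. the `relative_path` test
// below); the separator is re-added when composing full object keys.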
141 41 : let prefix_in_bucket = remote_storage_config
142 41 : .prefix_in_bucket
143 41 : .as_deref()
144 41 : .map(|prefix| {
145 38 : let mut prefix = prefix;
146 41 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
147 3 : prefix = &prefix[1..]
148 : }
149 :
150 38 : let mut prefix = prefix.to_string();
151 70 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
152 32 : prefix.pop();
153 32 : }
154 38 : prefix
155 41 : });
156 41 :
157 41 : Ok(Self {
158 41 : client,
159 41 : bucket_name: remote_storage_config.bucket_name.clone(),
160 41 : max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
161 41 : prefix_in_bucket,
162 41 : concurrency_limiter: ConcurrencyLimiter::new(
163 41 : remote_storage_config.concurrency_limit.get(),
164 41 : ),
165 41 : upload_storage_class: remote_storage_config.upload_storage_class.clone(),
166 41 : timeout,
167 41 : })
168 41 : }
169 :
170 519 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
171 519 : let relative_path =
172 519 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
173 519 : Some(stripped) => stripped,
174 : // we rely on AWS to return properly prefixed paths
175 : // for requests with a certain prefix
176 0 : None => panic!(
177 0 : "Key {} does not start with bucket prefix {:?}",
178 0 : key, self.prefix_in_bucket
179 0 : ),
180 : };
181 519 : RemotePath(
182 519 : relative_path
183 519 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
184 519 : .collect(),
185 519 : )
186 519 : }
187 :
188 556 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
189 556 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
190 556 : let path_string = path.get_path().as_str();
191 556 : match &self.prefix_in_bucket {
192 547 : Some(prefix) => prefix.clone() + "/" + path_string,
193 9 : None => path_string.to_string(),
194 : }
195 556 : }
196 :
197 472 : async fn permit(
198 472 : &self,
199 472 : kind: RequestKind,
200 472 : cancel: &CancellationToken,
201 472 : ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
202 472 : let started_at = start_counting_cancelled_wait(kind);
203 472 : let acquire = self.concurrency_limiter.acquire(kind);
204 :
205 472 : let permit = tokio::select! {
206 472 : permit = acquire => permit.expect("semaphore is never closed"),
207 472 : _ = cancel.cancelled() => return Err(Cancelled),
208 : };
209 :
210 472 : let started_at = ScopeGuard::into_inner(started_at);
211 472 : crate::metrics::BUCKET_METRICS
212 472 : .wait_seconds
213 472 : .observe_elapsed(kind, started_at);
214 472 :
215 472 : Ok(permit)
216 0 : }
217 :
218 33 : async fn owned_permit(
219 33 : &self,
220 33 : kind: RequestKind,
221 33 : cancel: &CancellationToken,
222 33 : ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
223 33 : let started_at = start_counting_cancelled_wait(kind);
224 33 : let acquire = self.concurrency_limiter.acquire_owned(kind);
225 :
226 33 : let permit = tokio::select! {
227 33 : permit = acquire => permit.expect("semaphore is never closed"),
228 33 : _ = cancel.cancelled() => return Err(Cancelled),
229 : };
230 :
231 33 : let started_at = ScopeGuard::into_inner(started_at);
232 33 : crate::metrics::BUCKET_METRICS
233 33 : .wait_seconds
234 33 : .observe_elapsed(kind, started_at);
235 33 : Ok(permit)
236 0 : }
237 :
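/// Performs the actual GET. When `etag` is set, it is sent as `If-None-Match`,
/// so an unchanged object surfaces as [`DownloadError::Unmodified`] instead of
/// a body transfer.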
238 33 : async fn download_object(
239 33 : &self,
240 33 : request: GetObjectRequest,
241 33 : cancel: &CancellationToken,
242 33 : ) -> Result<Download, DownloadError> {
243 33 : let kind = RequestKind::Get;
244 :
245 33 : let permit = self.owned_permit(kind, cancel).await?;
246 :
247 33 : let started_at = start_measuring_requests(kind);
248 33 :
249 33 : let mut builder = self
250 33 : .client
251 33 : .get_object()
252 33 : .bucket(request.bucket)
253 33 : .key(request.key)
254 33 : .set_range(request.range);
255 :
256 33 : if let Some(etag) = request.etag {
257 6 : builder = builder.if_none_match(etag);
258 6 : }
259 :
260 33 : let get_object = builder.send();
261 :
262 33 : let get_object = tokio::select! {
263 33 : res = get_object => res,
264 33 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
265 33 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
266 : };
267 :
268 33 : let started_at = ScopeGuard::into_inner(started_at);
269 :
270 28 : let object_output = match get_object {
271 28 : Ok(object_output) => object_output,
272 4 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
273 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
274 : // an error: we expect to sometimes fetch an object and find it missing,
275 : // e.g. when probing for timeline indices.
276 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
277 0 : kind,
278 0 : AttemptOutcome::Ok,
279 0 : started_at,
280 0 : );
281 0 : return Err(DownloadError::NotFound);
282 : }
283 4 : Err(SdkError::ServiceError(e))
284 : // aws_smithy_runtime_api::http::response::StatusCode isn't
285 : // re-exported by any aws crates, so just check the numeric
286 : // status against http_types::StatusCode instead of pulling it.
287 4 : if e.raw().status().as_u16() == StatusCode::NotModified =>
288 4 : {
289 4 : // Count an unmodified file as a success.
290 4 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
291 4 : kind,
292 4 : AttemptOutcome::Ok,
293 4 : started_at,
294 4 : );
295 4 : return Err(DownloadError::Unmodified);
296 : }
297 1 : Err(e) => {
298 1 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
299 1 : kind,
300 1 : AttemptOutcome::Err,
301 1 : started_at,
302 1 : );
303 1 :
304 1 : return Err(DownloadError::Other(
305 1 : anyhow::Error::new(e).context("download s3 object"),
306 1 : ));
307 : }
308 : };
309 :
310 : // Even if no timeout budget is left, continue anyway; the caller can decide
311 : // how to handle the resulting timeout and cancellation errors.
312 28 : let remaining = self.timeout.saturating_sub(started_at.elapsed());
313 28 :
314 28 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
315 28 : let etag = object_output
316 28 : .e_tag
317 28 : .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
318 28 : .into();
319 28 : let last_modified = object_output
320 28 : .last_modified
321 28 : .ok_or(DownloadError::Other(anyhow::anyhow!(
322 28 : "Missing LastModified header"
323 28 : )))?
324 28 : .try_into()
325 28 : .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
326 :
327 28 : let body = object_output.body;
328 28 : let body = ByteStreamAsStream::from(body);
329 28 : let body = PermitCarrying::new(permit, body);
330 28 : let body = TimedDownload::new(started_at, body);
331 28 :
332 28 : let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
333 28 : let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
334 28 :
335 28 : Ok(Download {
336 28 : metadata,
337 28 : etag,
338 28 : last_modified,
339 28 : download_stream: Box::pin(body),
340 28 : })
341 0 : }
342 :
343 198 : async fn delete_oids(
344 198 : &self,
345 198 : _permit: &tokio::sync::SemaphorePermit<'_>,
346 198 : delete_objects: &[ObjectIdentifier],
347 198 : cancel: &CancellationToken,
348 198 : ) -> anyhow::Result<()> {
349 198 : let kind = RequestKind::Delete;
350 198 : let mut cancel = std::pin::pin!(cancel.cancelled());
351 :
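// The DeleteObjects API accepts a bounded number of keys per request, so
// issue one request per chunk of at most MAX_KEYS_PER_DELETE_S3 keys.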
352 198 : for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
353 198 : let started_at = start_measuring_requests(kind);
354 :
355 198 : let req = self
356 198 : .client
357 198 : .delete_objects()
358 198 : .bucket(self.bucket_name.clone())
359 198 : .delete(
360 198 : Delete::builder()
361 198 : .set_objects(Some(chunk.to_vec()))
362 198 : .build()
363 198 : .context("build request")?,
364 : )
365 198 : .send();
366 :
367 198 : let resp = tokio::select! {
368 198 : resp = req => resp,
369 198 : _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
370 198 : _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
371 : };
372 :
373 198 : let started_at = ScopeGuard::into_inner(started_at);
374 198 : crate::metrics::BUCKET_METRICS
375 198 : .req_seconds
376 198 : .observe_elapsed(kind, &resp, started_at);
377 :
378 198 : let resp = resp.context("request deletion")?;
379 198 : crate::metrics::BUCKET_METRICS
380 198 : .deleted_objects_total
381 198 : .inc_by(chunk.len() as u64);
382 :
383 198 : if let Some(errors) = resp.errors {
384 : // Log a bounded number of the errors within the response:
385 : // these requests can carry 1000 keys so logging each one
386 : // would be too verbose, especially as errors may lead us
387 : // to retry repeatedly.
388 : const LOG_UP_TO_N_ERRORS: usize = 10;
389 0 : for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
390 0 : tracing::warn!(
391 0 : "DeleteObjects key {} failed: {}: {}",
392 0 : e.key.as_ref().map(Cow::from).unwrap_or("".into()),
393 0 : e.code.as_ref().map(Cow::from).unwrap_or("".into()),
394 0 : e.message.as_ref().map(Cow::from).unwrap_or("".into())
395 : );
396 : }
397 :
398 0 : return Err(anyhow::anyhow!(
399 0 : "Failed to delete {}/{} objects",
400 0 : errors.len(),
401 0 : chunk.len(),
402 0 : ));
403 0 : }
404 : }
405 198 : Ok(())
406 0 : }
407 :
408 6 : pub fn bucket_name(&self) -> &str {
409 6 : &self.bucket_name
410 6 : }
411 : }
412 :
413 : pin_project_lite::pin_project! {
414 : struct ByteStreamAsStream {
415 : #[pin]
416 : inner: aws_smithy_types::byte_stream::ByteStream
417 : }
418 : }
419 :
420 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
421 28 : fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
422 28 : ByteStreamAsStream { inner }
423 28 : }
424 : }
425 :
426 : impl Stream for ByteStreamAsStream {
427 : type Item = std::io::Result<Bytes>;
428 :
429 48 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
430 48 : // this does the std::io::ErrorKind::Other conversion
431 48 : self.project().inner.poll_next(cx).map_err(|x| x.into())
432 48 : }
433 :
434 : // We cannot implement size_hint: the inner size_hint reports the remaining size in
435 : // bytes (which makes sense for a byte stream), whereas Stream::size_hint counts items.
436 : }
437 :
438 : pin_project_lite::pin_project! {
439 : /// Times and tracks the outcome of the request.
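///
/// The outcome starts out as `Cancelled` and is only flipped to `Ok`/`Err`
/// once the stream completes or fails, so dropping a partially consumed
/// download records a cancellation (see `PinnedDrop` below).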
440 : struct TimedDownload<S> {
441 : started_at: std::time::Instant,
442 : outcome: AttemptOutcome,
443 : #[pin]
444 : inner: S
445 : }
446 :
447 : impl<S> PinnedDrop for TimedDownload<S> {
448 : fn drop(mut this: Pin<&mut Self>) {
449 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
450 : }
451 : }
452 : }
453 :
454 : impl<S> TimedDownload<S> {
455 28 : fn new(started_at: std::time::Instant, inner: S) -> Self {
456 28 : TimedDownload {
457 28 : started_at,
458 28 : outcome: AttemptOutcome::Cancelled,
459 28 : inner,
460 28 : }
461 28 : }
462 : }
463 :
464 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
465 : type Item = <S as Stream>::Item;
466 :
467 48 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
468 : use std::task::ready;
469 :
470 48 : let this = self.project();
471 :
472 48 : let res = ready!(this.inner.poll_next(cx));
473 22 : match &res {
474 22 : Some(Ok(_)) => {}
475 0 : Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
476 18 : None => *this.outcome = AttemptOutcome::Ok,
477 : }
478 :
479 40 : Poll::Ready(res)
480 0 : }
481 :
482 0 : fn size_hint(&self) -> (usize, Option<usize>) {
483 0 : self.inner.size_hint()
484 0 : }
485 : }
486 :
487 : impl RemoteStorage for S3Bucket {
488 64 : fn list_streaming(
489 64 : &self,
490 64 : prefix: Option<&RemotePath>,
491 64 : mode: ListingMode,
492 64 : max_keys: Option<NonZeroU32>,
493 64 : cancel: &CancellationToken,
494 64 : ) -> impl Stream<Item = Result<Listing, DownloadError>> {
495 64 : let kind = RequestKind::List;
496 64 : // s3 sdk wants i32
497 64 : let mut max_keys = max_keys.map(|mk| mk.get() as i32);
498 64 :
499 64 : // get the passed prefix or if it is not set use prefix_in_bucket value
500 64 : let list_prefix = prefix
501 64 : .map(|p| self.relative_path_to_s3_object(p))
502 64 : .or_else(|| {
503 32 : self.prefix_in_bucket.clone().map(|mut s| {
504 32 : s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
505 32 : s
506 32 : })
507 64 : });
508 64 :
509 64 : async_stream::stream! {
510 64 : let _permit = self.permit(kind, cancel).await?;
511 64 :
512 64 : let mut continuation_token = None;
513 64 : 'outer: loop {
514 64 : let started_at = start_measuring_requests(kind);
515 64 :
516 64 : // The minimum of two Options, returning Some when one is a value and the other is
517 64 : // None (None compares smaller than any Some, so a plain min doesn't work).
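// E.g.: (Some(1000), Some(5)) -> Some(5); (Some(1000), None) -> Some(1000);
// (None, None) -> None.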
518 64 : let request_max_keys = self
519 64 : .max_keys_per_list_response
520 64 : .into_iter()
521 64 : .chain(max_keys.into_iter())
522 64 : .min();
523 64 : let mut request = self
524 64 : .client
525 64 : .list_objects_v2()
526 64 : .bucket(self.bucket_name.clone())
527 64 : .set_prefix(list_prefix.clone())
528 64 : .set_continuation_token(continuation_token.clone())
529 64 : .set_max_keys(request_max_keys);
530 64 :
531 64 : if let ListingMode::WithDelimiter = mode {
532 64 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
533 64 : }
534 64 :
535 64 : let request = request.send();
536 64 :
537 64 : let response = tokio::select! {
538 64 : res = request => Ok(res),
539 64 : _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
540 64 : _ = cancel.cancelled() => Err(DownloadError::Cancelled),
541 64 : }?;
542 64 :
543 64 : let response = response
544 64 : .context("Failed to list S3 prefixes")
545 64 : .map_err(DownloadError::Other);
546 64 :
547 64 : let started_at = ScopeGuard::into_inner(started_at);
548 64 :
549 64 : crate::metrics::BUCKET_METRICS
550 64 : .req_seconds
551 64 : .observe_elapsed(kind, &response, started_at);
552 64 :
553 64 : let response = match response {
554 64 : Ok(response) => response,
555 64 : Err(e) => {
556 64 : // The error is potentially retryable, so we must rewind the loop after yielding.
557 64 : yield Err(e);
558 64 : continue 'outer;
559 64 : },
560 64 : };
561 64 :
562 64 : let keys = response.contents();
563 64 : let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
564 64 :
565 64 : tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
566 64 : let mut result = Listing::default();
567 64 :
568 64 : for object in keys {
569 64 : let key = object.key().expect("response does not contain a key");
570 64 : let key = self.s3_object_to_relative_path(key);
571 64 :
572 64 : let last_modified = match object.last_modified.map(SystemTime::try_from) {
573 64 : Some(Ok(t)) => t,
574 64 : Some(Err(_)) => {
575 64 : tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
576 64 : object.last_modified, key
577 64 : );
578 64 : SystemTime::now()
579 64 : },
580 64 : None => {
581 64 : SystemTime::now()
582 64 : }
583 64 : };
584 64 :
585 64 : let size = object.size.unwrap_or(0) as u64;
586 64 :
587 64 : result.keys.push(ListingObject{
588 64 : key,
589 64 : last_modified,
590 64 : size,
591 64 : });
592 64 : if let Some(mut mk) = max_keys {
593 64 : assert!(mk > 0);
594 64 : mk -= 1;
595 64 : if mk == 0 {
596 64 : // limit reached
597 64 : yield Ok(result);
598 64 : break 'outer;
599 64 : }
600 64 : max_keys = Some(mk);
601 64 : }
602 64 : }
603 64 :
604 64 : // S3 gives us prefixes like "foo/"; we return them like "foo".
605 104 : result.prefixes.extend(prefixes.iter().filter_map(|o| {
606 104 : Some(
607 104 : self.s3_object_to_relative_path(
608 104 : o.prefix()?
609 104 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
610 64 : ),
611 64 : )
612 64 : }));
613 64 :
614 64 : yield Ok(result);
615 64 :
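// Keep paging while S3 reports more results; an absent continuation
// token means the listing is complete.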
616 64 : continuation_token = match response.next_continuation_token {
617 64 : Some(new_token) => Some(new_token),
618 64 : None => break,
619 64 : };
620 64 : }
621 64 : }
622 64 : }
623 :
624 6 : async fn head_object(
625 6 : &self,
626 6 : key: &RemotePath,
627 6 : cancel: &CancellationToken,
628 6 : ) -> Result<ListingObject, DownloadError> {
629 6 : let kind = RequestKind::Head;
630 6 : let _permit = self.permit(kind, cancel).await?;
631 :
632 6 : let started_at = start_measuring_requests(kind);
633 6 :
634 6 : let head_future = self
635 6 : .client
636 6 : .head_object()
637 6 : .bucket(self.bucket_name())
638 6 : .key(self.relative_path_to_s3_object(key))
639 6 : .send();
640 6 :
641 6 : let head_future = tokio::time::timeout(self.timeout, head_future);
642 :
643 6 : let res = tokio::select! {
644 6 : res = head_future => res,
645 6 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
646 : };
647 :
648 6 : let res = res.map_err(|_e| DownloadError::Timeout)?;
649 :
650 : // Do not count timeouts as errors in metrics, but do count cancellations.
651 6 : let started_at = ScopeGuard::into_inner(started_at);
652 6 : crate::metrics::BUCKET_METRICS
653 6 : .req_seconds
654 6 : .observe_elapsed(kind, &res, started_at);
655 :
656 4 : let data = match res {
657 4 : Ok(object_output) => object_output,
658 2 : Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
659 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
660 : // an error: we expect to sometimes fetch an object and find it missing,
661 : // e.g. when probing for timeline indices.
662 2 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
663 2 : kind,
664 2 : AttemptOutcome::Ok,
665 2 : started_at,
666 2 : );
667 2 : return Err(DownloadError::NotFound);
668 : }
669 0 : Err(e) => {
670 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
671 0 : kind,
672 0 : AttemptOutcome::Err,
673 0 : started_at,
674 0 : );
675 0 :
676 0 : return Err(DownloadError::Other(
677 0 : anyhow::Error::new(e).context("s3 head object"),
678 0 : ));
679 : }
680 : };
681 :
682 4 : let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
683 0 : return Err(DownloadError::Other(anyhow!(
684 0 : "head_object doesn't contain last_modified or content_length"
685 0 : )))?;
686 : };
687 : Ok(ListingObject {
688 4 : key: key.to_owned(),
689 4 : last_modified: SystemTime::try_from(last_modified).map_err(|e| {
690 0 : DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
691 4 : })?,
692 4 : size: size as u64,
693 : })
694 0 : }
695 :
696 200 : async fn upload(
697 200 : &self,
698 200 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
699 200 : from_size_bytes: usize,
700 200 : to: &RemotePath,
701 200 : metadata: Option<StorageMetadata>,
702 200 : cancel: &CancellationToken,
703 200 : ) -> anyhow::Result<()> {
704 200 : let kind = RequestKind::Put;
705 200 : let _permit = self.permit(kind, cancel).await?;
706 :
707 200 : let started_at = start_measuring_requests(kind);
708 200 :
709 712 : let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
710 200 : let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
711 :
712 200 : let upload = self
713 200 : .client
714 200 : .put_object()
715 200 : .bucket(self.bucket_name.clone())
716 200 : .key(self.relative_path_to_s3_object(to))
717 200 : .set_metadata(metadata.map(|m| m.0))
718 200 : .set_storage_class(self.upload_storage_class.clone())
719 200 : .content_length(from_size_bytes.try_into()?)
720 200 : .body(bytes_stream)
721 200 : .send();
722 200 :
723 200 : let upload = tokio::time::timeout(self.timeout, upload);
724 :
725 200 : let res = tokio::select! {
726 200 : res = upload => res,
727 200 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
728 : };
729 :
730 200 : if let Ok(inner) = &res {
731 200 : // Do not count timeouts as errors in metrics, but do count cancellations.
732 200 : let started_at = ScopeGuard::into_inner(started_at);
733 200 : crate::metrics::BUCKET_METRICS
734 200 : .req_seconds
735 200 : .observe_elapsed(kind, inner, started_at);
736 200 : }
737 :
738 200 : match res {
739 198 : Ok(Ok(_put)) => Ok(()),
740 2 : Ok(Err(sdk)) => Err(sdk.into()),
741 0 : Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
742 : }
743 0 : }
744 :
745 2 : async fn copy(
746 2 : &self,
747 2 : from: &RemotePath,
748 2 : to: &RemotePath,
749 2 : cancel: &CancellationToken,
750 2 : ) -> anyhow::Result<()> {
751 2 : let kind = RequestKind::Copy;
752 2 : let _permit = self.permit(kind, cancel).await?;
753 :
754 2 : let timeout = tokio::time::sleep(self.timeout);
755 2 :
756 2 : let started_at = start_measuring_requests(kind);
757 2 :
758 2 : // The copy source must include the bucket name as a prefix, e.g. "my-bucket/some/key".
759 2 : let copy_source = format!(
760 2 : "{}/{}",
761 2 : self.bucket_name,
762 2 : self.relative_path_to_s3_object(from)
763 2 : );
764 2 :
765 2 : let op = self
766 2 : .client
767 2 : .copy_object()
768 2 : .bucket(self.bucket_name.clone())
769 2 : .key(self.relative_path_to_s3_object(to))
770 2 : .set_storage_class(self.upload_storage_class.clone())
771 2 : .copy_source(copy_source)
772 2 : .send();
773 :
774 2 : let res = tokio::select! {
775 2 : res = op => res,
776 2 : _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
777 2 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
778 : };
779 :
780 2 : let started_at = ScopeGuard::into_inner(started_at);
781 2 : crate::metrics::BUCKET_METRICS
782 2 : .req_seconds
783 2 : .observe_elapsed(kind, &res, started_at);
784 2 :
785 2 : res?;
786 :
787 2 : Ok(())
788 0 : }
789 :
790 33 : async fn download(
791 33 : &self,
792 33 : from: &RemotePath,
793 33 : opts: &DownloadOpts,
794 33 : cancel: &CancellationToken,
795 33 : ) -> Result<Download, DownloadError> {
796 33 : // If a bucket prefix is configured, download the file at `prefix/from`;
797 33 : // otherwise download the file at `from`.
798 33 : self.download_object(
799 33 : GetObjectRequest {
800 33 : bucket: self.bucket_name.clone(),
801 33 : key: self.relative_path_to_s3_object(from),
802 33 : etag: opts.etag.as_ref().map(|e| e.to_string()),
803 33 : range: opts.byte_range_header(),
804 33 : },
805 33 : cancel,
806 33 : )
807 33 : .await
808 0 : }
809 :
810 194 : async fn delete_objects(
811 194 : &self,
812 194 : paths: &[RemotePath],
813 194 : cancel: &CancellationToken,
814 194 : ) -> anyhow::Result<()> {
815 194 : let kind = RequestKind::Delete;
816 194 : let permit = self.permit(kind, cancel).await?;
817 194 : let mut delete_objects = Vec::with_capacity(paths.len());
818 430 : for path in paths {
819 236 : let obj_id = ObjectIdentifier::builder()
820 236 : .set_key(Some(self.relative_path_to_s3_object(path)))
821 236 : .build()
822 236 : .context("convert path to oid")?;
823 236 : delete_objects.push(obj_id);
824 : }
825 :
826 194 : self.delete_oids(&permit, &delete_objects, cancel).await
827 0 : }
828 :
829 0 : fn max_keys_per_delete(&self) -> usize {
830 0 : MAX_KEYS_PER_DELETE_S3
831 0 : }
832 :
833 174 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
834 174 : let paths = std::array::from_ref(path);
835 174 : self.delete_objects(paths, cancel).await
836 0 : }
837 :
838 6 : async fn time_travel_recover(
839 6 : &self,
840 6 : prefix: Option<&RemotePath>,
841 6 : timestamp: SystemTime,
842 6 : done_if_after: SystemTime,
843 6 : cancel: &CancellationToken,
844 6 : ) -> Result<(), TimeTravelError> {
845 6 : let kind = RequestKind::TimeTravel;
846 6 : let permit = self.permit(kind, cancel).await?;
847 :
848 6 : let timestamp = DateTime::from(timestamp);
849 6 : let done_if_after = DateTime::from(done_if_after);
850 6 :
851 6 : tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
852 :
853 : // Use the passed prefix, or fall back to the prefix_in_bucket value if none was given.
854 6 : let prefix = prefix
855 6 : .map(|p| self.relative_path_to_s3_object(p))
856 6 : .or_else(|| self.prefix_in_bucket.clone());
857 6 :
858 6 : let warn_threshold = 3;
859 6 : let max_retries = 10;
860 6 : let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
861 :
862 6 : let mut key_marker = None;
863 6 : let mut version_id_marker = None;
864 6 : let mut versions_and_deletes = Vec::new();
865 :
866 : loop {
867 6 : let response = backoff::retry(
868 7 : || async {
869 7 : let op = self
870 7 : .client
871 7 : .list_object_versions()
872 7 : .bucket(self.bucket_name.clone())
873 7 : .set_prefix(prefix.clone())
874 7 : .set_key_marker(key_marker.clone())
875 7 : .set_version_id_marker(version_id_marker.clone())
876 7 : .send();
877 7 :
878 7 : tokio::select! {
879 7 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
880 7 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
881 : }
882 6 : },
883 6 : is_permanent,
884 6 : warn_threshold,
885 6 : max_retries,
886 6 : "listing object versions for time_travel_recover",
887 6 : cancel,
888 6 : )
889 6 : .await
890 6 : .ok_or_else(|| TimeTravelError::Cancelled)
891 6 : .and_then(|x| x)?;
892 :
893 6 : tracing::trace!(
894 0 : " Got List response version_id_marker={:?}, key_marker={:?}",
895 : response.version_id_marker,
896 : response.key_marker
897 : );
898 6 : let versions = response
899 6 : .versions
900 6 : .unwrap_or_default()
901 6 : .into_iter()
902 6 : .map(VerOrDelete::from_version);
903 6 : let deletes = response
904 6 : .delete_markers
905 6 : .unwrap_or_default()
906 6 : .into_iter()
907 6 : .map(VerOrDelete::from_delete_marker);
908 6 : itertools::process_results(versions.chain(deletes), |n_vds| {
909 6 : versions_and_deletes.extend(n_vds)
910 6 : })
911 6 : .map_err(TimeTravelError::Other)?;
912 12 : fn none_if_empty(v: Option<String>) -> Option<String> {
913 12 : v.filter(|v| !v.is_empty())
914 12 : }
915 6 : version_id_marker = none_if_empty(response.next_version_id_marker);
916 6 : key_marker = none_if_empty(response.next_key_marker);
917 6 : if version_id_marker.is_none() {
918 : // The final response is not supposed to be truncated
919 6 : if response.is_truncated.unwrap_or_default() {
920 0 : return Err(TimeTravelError::Other(anyhow::anyhow!(
921 0 : "Received truncated ListObjectVersions response for prefix={prefix:?}"
922 0 : )));
923 6 : }
924 6 : break;
925 0 : }
926 : // Limit the number of versions and deletions, mostly so that we don't
927 : // keep requesting forever if the list is too long, since we hold the
928 : // list in RAM.
929 : // Building a list of 100k entries that reaches the limit takes roughly
930 : // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
931 : const COMPLEXITY_LIMIT: usize = 100_000;
932 0 : if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
933 0 : return Err(TimeTravelError::TooManyVersions);
934 0 : }
935 : }
936 :
937 6 : tracing::info!(
938 0 : "Built list for time travel with {} versions and deletions",
939 0 : versions_and_deletes.len()
940 : );
941 :
942 : // Work on the list of references instead of the objects directly,
943 : // otherwise we get lifetime errors in the sort_by_key call below.
944 6 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
945 6 :
946 124 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
947 6 :
948 6 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
949 :
950 42 : for vd in &versions_and_deletes {
951 : let VerOrDelete {
952 36 : version_id, key, ..
953 36 : } = &vd;
954 36 : if version_id == "null" {
955 0 : return Err(TimeTravelError::Other(anyhow!(
956 0 : "Received ListVersions response for key={key} with version_id='null', \
957 0 : indicating either disabled versioning, or legacy objects with null version id values"
958 0 : )));
959 36 : }
960 36 : tracing::trace!(
961 0 : "Parsing version key={key} version_id={version_id} kind={:?}",
962 : vd.kind
963 : );
964 :
965 36 : vds_for_key.entry(key).or_default().push(vd);
966 : }
967 24 : for (key, versions) in vds_for_key {
968 18 : let last_vd = versions.last().unwrap();
969 18 : if last_vd.last_modified > done_if_after {
970 0 : tracing::trace!("Key {key} has version later than done_if_after, skipping");
971 0 : continue;
972 0 : }
973 : // the version we want to restore to.
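// `binary_search_by_key` returns `Err(i)` with the insertion point when there
// is no exact timestamp match; e.g. (illustrative values) versions modified at
// t = [1, 3, 5] with timestamp = 4 yield Err(2), so versions[1] (t = 3) is the
// newest version at or before the target time.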
974 18 : let version_to_restore_to =
975 36 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
976 0 : Ok(v) => v,
977 18 : Err(e) => e,
978 : };
979 18 : if version_to_restore_to == versions.len() {
980 6 : tracing::trace!("Key {key} has no changes since timestamp, skipping");
981 6 : continue;
982 12 : }
983 12 : let mut do_delete = false;
984 12 : if version_to_restore_to == 0 {
985 : // All versions more recent, so the key didn't exist at the specified time point.
986 6 : tracing::trace!(
987 0 : "All {} versions more recent for {key}, deleting",
988 0 : versions.len()
989 : );
990 6 : do_delete = true;
991 : } else {
992 6 : match &versions[version_to_restore_to - 1] {
993 : VerOrDelete {
994 : kind: VerOrDeleteKind::Version,
995 6 : version_id,
996 6 : ..
997 6 : } => {
998 6 : tracing::trace!("Copying old version {version_id} for {key}...");
999 : // Restore the state to the last version by copying
1000 6 : let source_id =
1001 6 : format!("{}/{key}?versionId={version_id}", self.bucket_name);
1002 6 :
1003 6 : backoff::retry(
1004 6 : || async {
1005 6 : let op = self
1006 6 : .client
1007 6 : .copy_object()
1008 6 : .bucket(self.bucket_name.clone())
1009 6 : .key(key)
1010 6 : .set_storage_class(self.upload_storage_class.clone())
1011 6 : .copy_source(&source_id)
1012 6 : .send();
1013 6 :
1014 6 : tokio::select! {
1015 6 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
1016 6 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
1017 : }
1018 6 : },
1019 6 : is_permanent,
1020 6 : warn_threshold,
1021 6 : max_retries,
1022 6 : "copying object version for time_travel_recover",
1023 6 : cancel,
1024 6 : )
1025 6 : .await
1026 6 : .ok_or_else(|| TimeTravelError::Cancelled)
1027 6 : .and_then(|x| x)?;
1028 6 : tracing::info!(%version_id, %key, "Copied old version in S3");
1029 : }
1030 : VerOrDelete {
1031 : kind: VerOrDeleteKind::DeleteMarker,
1032 : ..
1033 0 : } => {
1034 0 : do_delete = true;
1035 0 : }
1036 : }
1037 : };
1038 12 : if do_delete {
1039 6 : if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
1040 : // Key has since been deleted (but there was some history), no need to do anything
1041 2 : tracing::trace!("Key {key} already deleted, skipping.");
1042 : } else {
1043 4 : tracing::trace!("Deleting {key}...");
1044 :
1045 4 : let oid = ObjectIdentifier::builder()
1046 4 : .key(key.to_owned())
1047 4 : .build()
1048 4 : .map_err(|e| TimeTravelError::Other(e.into()))?;
1049 :
1050 4 : self.delete_oids(&permit, &[oid], cancel)
1051 4 : .await
1052 4 : .map_err(|e| {
1053 0 : // delete_oids will use TimeoutOrCancel
1054 0 : if TimeoutOrCancel::caused_by_cancel(&e) {
1055 0 : TimeTravelError::Cancelled
1056 : } else {
1057 0 : TimeTravelError::Other(e)
1058 : }
1059 4 : })?;
1060 : }
1061 0 : }
1062 : }
1063 6 : Ok(())
1064 0 : }
1065 : }
1066 :
1067 : // To save RAM, store only the needed fields instead of the entire ObjectVersion/DeleteMarkerEntry.
1068 : struct VerOrDelete {
1069 : kind: VerOrDeleteKind,
1070 : last_modified: DateTime,
1071 : version_id: String,
1072 : key: String,
1073 : }
1074 :
1075 : #[derive(Debug)]
1076 : enum VerOrDeleteKind {
1077 : Version,
1078 : DeleteMarker,
1079 : }
1080 :
1081 : impl VerOrDelete {
1082 36 : fn with_kind(
1083 36 : kind: VerOrDeleteKind,
1084 36 : last_modified: Option<DateTime>,
1085 36 : version_id: Option<String>,
1086 36 : key: Option<String>,
1087 36 : ) -> anyhow::Result<Self> {
1088 36 : let lvk = (last_modified, version_id, key);
1089 36 : let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
1090 0 : anyhow::bail!(
1091 0 : "One (or more) of last_modified, key, and id is None. \
1092 0 : Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
1093 0 : lvk.0,
1094 0 : lvk.1,
1095 0 : lvk.2,
1096 0 : );
1097 : };
1098 36 : Ok(Self {
1099 36 : kind,
1100 36 : last_modified,
1101 36 : version_id,
1102 36 : key,
1103 36 : })
1104 36 : }
1105 28 : fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
1106 28 : Self::with_kind(
1107 28 : VerOrDeleteKind::Version,
1108 28 : v.last_modified,
1109 28 : v.version_id,
1110 28 : v.key,
1111 28 : )
1112 28 : }
1113 8 : fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
1114 8 : Self::with_kind(
1115 8 : VerOrDeleteKind::DeleteMarker,
1116 8 : v.last_modified,
1117 8 : v.version_id,
1118 8 : v.key,
1119 8 : )
1120 8 : }
1121 : }
1122 :
1123 : #[cfg(test)]
1124 : mod tests {
1125 : use std::num::NonZeroUsize;
1126 :
1127 : use camino::Utf8Path;
1128 :
1129 : use crate::{RemotePath, S3Bucket, S3Config};
1130 :
1131 : #[tokio::test]
1132 3 : async fn relative_path() {
1133 3 : let all_paths = ["", "some/path", "some/path/"];
1134 3 : let all_paths: Vec<RemotePath> = all_paths
1135 3 : .iter()
1136 9 : .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
1137 3 : .collect();
1138 3 : let prefixes = [
1139 3 : None,
1140 3 : Some(""),
1141 3 : Some("test/prefix"),
1142 3 : Some("test/prefix/"),
1143 3 : Some("/test/prefix/"),
1144 3 : ];
1145 3 : let expected_outputs = [
1146 3 : vec!["", "some/path", "some/path/"],
1147 3 : vec!["/", "/some/path", "/some/path/"],
1148 3 : vec![
1149 3 : "test/prefix/",
1150 3 : "test/prefix/some/path",
1151 3 : "test/prefix/some/path/",
1152 3 : ],
1153 3 : vec![
1154 3 : "test/prefix/",
1155 3 : "test/prefix/some/path",
1156 3 : "test/prefix/some/path/",
1157 3 : ],
1158 3 : vec![
1159 3 : "test/prefix/",
1160 3 : "test/prefix/some/path",
1161 3 : "test/prefix/some/path/",
1162 3 : ],
1163 3 : ];
1164 3 :
1165 15 : for (prefix_idx, prefix) in prefixes.iter().enumerate() {
1166 15 : let config = S3Config {
1167 15 : bucket_name: "bucket".to_owned(),
1168 15 : bucket_region: "region".to_owned(),
1169 15 : prefix_in_bucket: prefix.map(str::to_string),
1170 15 : endpoint: None,
1171 15 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
1172 15 : max_keys_per_list_response: Some(5),
1173 15 : upload_storage_class: None,
1174 15 : };
1175 15 : let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
1176 15 : .await
1177 15 : .expect("remote storage init");
1178 45 : for (test_path_idx, test_path) in all_paths.iter().enumerate() {
1179 45 : let result = storage.relative_path_to_s3_object(test_path);
1180 45 : let expected = expected_outputs[prefix_idx][test_path_idx];
1181 45 : assert_eq!(result, expected);
1182 3 : }
1183 3 : }
1184 3 : }
1185 : }