Line data Source code
1 : //! AWS S3 storage wrapper around the `aws-sdk-s3` library.
2 : //!
3 : //! Respects the `prefix_in_bucket` property from [`S3Config`],
4 : //! allowing multiple API users to work with the same S3 bucket independently,
5 : //! as long as their bucket prefixes are specified and distinct.
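//! For example (hypothetical values), two independent users of one bucket could configure
//! `prefix_in_bucket: Some("user-a/".into())` and `prefix_in_bucket: Some("user-b/".into())`
//! respectively; every key each of them reads or writes is then namespaced under its own
//! prefix, so the two key spaces never overlap.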
6 :
7 : use std::{
8 : borrow::Cow,
9 : collections::HashMap,
10 : num::NonZeroU32,
11 : pin::Pin,
12 : sync::Arc,
13 : task::{Context, Poll},
14 : time::{Duration, SystemTime},
15 : };
16 :
17 : use anyhow::{anyhow, Context as _};
18 : use aws_config::{
19 : default_provider::credentials::DefaultCredentialsChain,
20 : retry::{RetryConfigBuilder, RetryMode},
21 : BehaviorVersion,
22 : };
23 : use aws_sdk_s3::{
24 : config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
25 : error::SdkError,
26 : operation::{get_object::GetObjectError, head_object::HeadObjectError},
27 : types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
28 : Client,
29 : };
30 : use aws_smithy_async::rt::sleep::TokioSleep;
31 : use http_body_util::StreamBody;
32 : use http_types::StatusCode;
33 :
34 : use aws_smithy_types::{body::SdkBody, DateTime};
35 : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
36 : use bytes::Bytes;
37 : use futures::stream::Stream;
38 : use futures_util::StreamExt;
39 : use hyper::body::Frame;
40 : use scopeguard::ScopeGuard;
41 : use tokio_util::sync::CancellationToken;
42 : use utils::backoff;
43 :
44 : use super::StorageMetadata;
45 : use crate::{
46 : config::S3Config,
47 : error::Cancelled,
48 : metrics::{start_counting_cancelled_wait, start_measuring_requests},
49 : support::PermitCarrying,
50 : ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
51 : RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
52 : REMOTE_STORAGE_PREFIX_SEPARATOR,
53 : };
54 :
55 : use crate::metrics::AttemptOutcome;
56 : pub(super) use crate::metrics::RequestKind;
57 :
58 : /// AWS S3 storage.
59 : pub struct S3Bucket {
60 : client: Client,
61 : bucket_name: String,
62 : prefix_in_bucket: Option<String>,
63 : max_keys_per_list_response: Option<i32>,
64 : upload_storage_class: Option<StorageClass>,
65 : concurrency_limiter: ConcurrencyLimiter,
66 : // Per-request timeout. Accessible for tests.
67 : pub timeout: Duration,
68 : }
69 :
70 : struct GetObjectRequest {
71 : bucket: String,
72 : key: String,
73 : etag: Option<String>,
74 : range: Option<String>,
75 : }
76 : impl S3Bucket {
77 : /// Creates the S3 storage; errors if an incorrect AWS S3 configuration is provided.
78 41 : pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
79 41 : tracing::debug!(
80 0 : "Creating s3 remote storage for S3 bucket {}",
81 : remote_storage_config.bucket_name
82 : );
83 :
84 41 : let region = Region::new(remote_storage_config.bucket_region.clone());
85 41 : let region_opt = Some(region.clone());
86 :
87 : // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
88 : // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
89 : // Incomplete list of auth methods used by this:
90 : // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
91 : // * "AWS_PROFILE" / `aws sso login --profile <profile>`
92 : // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
93 : // * http (ECS/EKS) container credentials
94 : // * imds v2
95 41 : let credentials_provider = DefaultCredentialsChain::builder()
96 41 : .region(region)
97 41 : .build()
98 0 : .await;
99 :
100 : // The AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off.
101 41 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
102 41 :
103 41 : let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
104 41 : #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
105 41 : BehaviorVersion::v2023_11_09(),
106 41 : )
107 41 : .region(region_opt)
108 41 : .identity_cache(IdentityCache::lazy().build())
109 41 : .credentials_provider(credentials_provider)
110 41 : .sleep_impl(SharedAsyncSleep::from(sleep_impl));
111 41 :
112 41 : let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
113 41 : s.spawn(|| {
114 41 : // TODO: make this function async.
115 41 : tokio::runtime::Builder::new_current_thread()
116 41 : .enable_all()
117 41 : .build()
118 41 : .unwrap()
119 41 : .block_on(sdk_config_loader.load())
120 41 : })
121 41 : .join()
122 41 : .unwrap()
123 41 : });
124 41 :
125 41 : let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
126 :
127 : // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
128 : // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
129 41 : if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
130 0 : s3_config_builder = s3_config_builder
131 0 : .endpoint_url(custom_endpoint)
132 0 : .force_path_style(true);
133 41 : }
134 :
135 : // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
136 : // responses (e.g. 429 on too many ListObjectsV2 requests), we must provide a retry config. We set it to at most one
137 : // attempt, and enable 'Adaptive' mode, which turns on client-side rate limiting.
138 41 : let mut retry_config = RetryConfigBuilder::new();
139 41 : retry_config
140 41 : .set_max_attempts(Some(1))
141 41 : .set_mode(Some(RetryMode::Adaptive));
142 41 : s3_config_builder = s3_config_builder.retry_config(retry_config.build());
143 41 :
144 41 : let s3_config = s3_config_builder.build();
145 41 : let client = aws_sdk_s3::Client::from_conf(s3_config);
146 41 :
147 41 : let prefix_in_bucket = remote_storage_config
148 41 : .prefix_in_bucket
149 41 : .as_deref()
150 41 : .map(|prefix| {
151 38 : let mut prefix = prefix;
152 41 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
153 3 : prefix = &prefix[1..]
154 : }
155 :
156 38 : let mut prefix = prefix.to_string();
157 70 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
158 32 : prefix.pop();
159 32 : }
160 38 : prefix
161 41 : });
162 41 :
163 41 : Ok(Self {
164 41 : client,
165 41 : bucket_name: remote_storage_config.bucket_name.clone(),
166 41 : max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
167 41 : prefix_in_bucket,
168 41 : concurrency_limiter: ConcurrencyLimiter::new(
169 41 : remote_storage_config.concurrency_limit.get(),
170 41 : ),
171 41 : upload_storage_class: remote_storage_config.upload_storage_class.clone(),
172 41 : timeout,
173 41 : })
174 41 : }
175 :
176 518 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
177 518 : let relative_path =
178 518 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
179 518 : Some(stripped) => stripped,
180 : // we rely on AWS to return properly prefixed paths
181 : // for requests with a certain prefix
182 0 : None => panic!(
183 0 : "Key {} does not start with bucket prefix {:?}",
184 0 : key, self.prefix_in_bucket
185 0 : ),
186 : };
187 518 : RemotePath(
188 518 : relative_path
189 518 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
190 518 : .collect(),
191 518 : )
192 518 : }
193 :
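/// Maps a [`RemotePath`] to its full S3 key by joining it onto `prefix_in_bucket`.
/// A sketch with hypothetical values: given `prefix_in_bucket = Some("pageserver/data")`,
/// the relative path `tenants/123` maps to the key `pageserver/data/tenants/123`; with no
/// prefix configured, the key is simply `tenants/123`.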
194 553 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
195 553 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
196 553 : let path_string = path.get_path().as_str();
197 553 : match &self.prefix_in_bucket {
198 544 : Some(prefix) => prefix.clone() + "/" + path_string,
199 9 : None => path_string.to_string(),
200 : }
201 553 : }
202 :
203 470 : async fn permit(
204 470 : &self,
205 470 : kind: RequestKind,
206 470 : cancel: &CancellationToken,
207 470 : ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
208 470 : let started_at = start_counting_cancelled_wait(kind);
209 470 : let acquire = self.concurrency_limiter.acquire(kind);
210 :
211 470 : let permit = tokio::select! {
212 470 : permit = acquire => permit.expect("semaphore is never closed"),
213 470 : _ = cancel.cancelled() => return Err(Cancelled),
214 : };
215 :
216 470 : let started_at = ScopeGuard::into_inner(started_at);
217 470 : crate::metrics::BUCKET_METRICS
218 470 : .wait_seconds
219 470 : .observe_elapsed(kind, started_at);
220 470 :
221 470 : Ok(permit)
222 470 : }
223 :
224 32 : async fn owned_permit(
225 32 : &self,
226 32 : kind: RequestKind,
227 32 : cancel: &CancellationToken,
228 32 : ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
229 32 : let started_at = start_counting_cancelled_wait(kind);
230 32 : let acquire = self.concurrency_limiter.acquire_owned(kind);
231 :
232 32 : let permit = tokio::select! {
233 32 : permit = acquire => permit.expect("semaphore is never closed"),
234 32 : _ = cancel.cancelled() => return Err(Cancelled),
235 : };
236 :
237 32 : let started_at = ScopeGuard::into_inner(started_at);
238 32 : crate::metrics::BUCKET_METRICS
239 32 : .wait_seconds
240 32 : .observe_elapsed(kind, started_at);
241 32 : Ok(permit)
242 32 : }
243 :
244 32 : async fn download_object(
245 32 : &self,
246 32 : request: GetObjectRequest,
247 32 : cancel: &CancellationToken,
248 32 : ) -> Result<Download, DownloadError> {
249 32 : let kind = RequestKind::Get;
250 :
251 32 : let permit = self.owned_permit(kind, cancel).await?;
252 :
253 32 : let started_at = start_measuring_requests(kind);
254 32 :
255 32 : let mut builder = self
256 32 : .client
257 32 : .get_object()
258 32 : .bucket(request.bucket)
259 32 : .key(request.key)
260 32 : .set_range(request.range);
261 :
262 32 : if let Some(etag) = request.etag {
263 6 : builder = builder.if_none_match(etag);
264 26 : }
265 :
266 32 : let get_object = builder.send();
267 :
268 32 : let get_object = tokio::select! {
269 32 : res = get_object => res,
270 32 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
271 32 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
272 : };
273 :
274 32 : let started_at = ScopeGuard::into_inner(started_at);
275 :
276 28 : let object_output = match get_object {
277 28 : Ok(object_output) => object_output,
278 4 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
279 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
280 : // an error: we expect to sometimes fetch an object and find it missing,
281 : // e.g. when probing for timeline indices.
282 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
283 0 : kind,
284 0 : AttemptOutcome::Ok,
285 0 : started_at,
286 0 : );
287 0 : return Err(DownloadError::NotFound);
288 : }
289 4 : Err(SdkError::ServiceError(e))
290 : // aws_smithy_runtime_api::http::response::StatusCode isn't
291 : // re-exported by any aws crates, so just check the numeric
292 : // status against http_types::StatusCode instead of pulling it in.
293 4 : if e.raw().status().as_u16() == StatusCode::NotModified =>
294 4 : {
295 4 : // Count an unmodified file as a success.
296 4 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
297 4 : kind,
298 4 : AttemptOutcome::Ok,
299 4 : started_at,
300 4 : );
301 4 : return Err(DownloadError::Unmodified);
302 : }
303 0 : Err(e) => {
304 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
305 0 : kind,
306 0 : AttemptOutcome::Err,
307 0 : started_at,
308 0 : );
309 0 :
310 0 : return Err(DownloadError::Other(
311 0 : anyhow::Error::new(e).context("download s3 object"),
312 0 : ));
313 : }
314 : };
315 :
316 : // Even if we have no timeout left, continue anyway; the caller can decide to ignore
317 : // the resulting timeout and cancellation errors.
318 28 : let remaining = self.timeout.saturating_sub(started_at.elapsed());
319 28 :
320 28 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
321 28 : let etag = object_output
322 28 : .e_tag
323 28 : .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
324 28 : .into();
325 28 : let last_modified = object_output
326 28 : .last_modified
327 28 : .ok_or(DownloadError::Other(anyhow::anyhow!(
328 28 : "Missing LastModified header"
329 28 : )))?
330 28 : .try_into()
331 28 : .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
332 :
333 28 : let body = object_output.body;
334 28 : let body = ByteStreamAsStream::from(body);
335 28 : let body = PermitCarrying::new(permit, body);
336 28 : let body = TimedDownload::new(started_at, body);
337 28 :
338 28 : let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
339 28 : let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
340 28 :
341 28 : Ok(Download {
342 28 : metadata,
343 28 : etag,
344 28 : last_modified,
345 28 : download_stream: Box::pin(body),
346 28 : })
347 32 : }
348 :
349 198 : async fn delete_oids(
350 198 : &self,
351 198 : _permit: &tokio::sync::SemaphorePermit<'_>,
352 198 : delete_objects: &[ObjectIdentifier],
353 198 : cancel: &CancellationToken,
354 198 : ) -> anyhow::Result<()> {
355 198 : let kind = RequestKind::Delete;
356 198 : let mut cancel = std::pin::pin!(cancel.cancelled());
357 :
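// S3's DeleteObjects call accepts at most 1000 keys per request, so issue one
// request per MAX_KEYS_PER_DELETE-sized chunk of the input.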
358 198 : for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
359 198 : let started_at = start_measuring_requests(kind);
360 :
361 198 : let req = self
362 198 : .client
363 198 : .delete_objects()
364 198 : .bucket(self.bucket_name.clone())
365 198 : .delete(
366 198 : Delete::builder()
367 198 : .set_objects(Some(chunk.to_vec()))
368 198 : .build()
369 198 : .context("build request")?,
370 : )
371 198 : .send();
372 :
373 198 : let resp = tokio::select! {
374 198 : resp = req => resp,
375 198 : _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
376 198 : _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
377 : };
378 :
379 198 : let started_at = ScopeGuard::into_inner(started_at);
380 198 : crate::metrics::BUCKET_METRICS
381 198 : .req_seconds
382 198 : .observe_elapsed(kind, &resp, started_at);
383 :
384 198 : let resp = resp.context("request deletion")?;
385 198 : crate::metrics::BUCKET_METRICS
386 198 : .deleted_objects_total
387 198 : .inc_by(chunk.len() as u64);
388 :
389 198 : if let Some(errors) = resp.errors {
390 : // Log a bounded number of the errors within the response:
391 : // these requests can carry 1000 keys so logging each one
392 : // would be too verbose, especially as errors may lead us
393 : // to retry repeatedly.
394 : const LOG_UP_TO_N_ERRORS: usize = 10;
395 0 : for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
396 0 : tracing::warn!(
397 0 : "DeleteObjects key {} failed: {}: {}",
398 0 : e.key.as_ref().map(Cow::from).unwrap_or("".into()),
399 0 : e.code.as_ref().map(Cow::from).unwrap_or("".into()),
400 0 : e.message.as_ref().map(Cow::from).unwrap_or("".into())
401 : );
402 : }
403 :
404 0 : return Err(anyhow::anyhow!(
405 0 : "Failed to delete {}/{} objects",
406 0 : errors.len(),
407 0 : chunk.len(),
408 0 : ));
409 198 : }
410 : }
411 198 : Ok(())
412 198 : }
413 :
414 6 : pub fn bucket_name(&self) -> &str {
415 6 : &self.bucket_name
416 6 : }
417 : }
418 :
419 : pin_project_lite::pin_project! {
420 : struct ByteStreamAsStream {
421 : #[pin]
422 : inner: aws_smithy_types::byte_stream::ByteStream
423 : }
424 : }
425 :
426 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
427 28 : fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
428 28 : ByteStreamAsStream { inner }
429 28 : }
430 : }
431 :
432 : impl Stream for ByteStreamAsStream {
433 : type Item = std::io::Result<Bytes>;
434 :
435 46 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
436 46 : // this does the std::io::ErrorKind::Other conversion
437 46 : self.project().inner.poll_next(cx).map_err(|x| x.into())
438 46 : }
439 :
440 : // We cannot implement size_hint because inner.size_hint reports the remaining size in bytes
441 : // (which makes sense for a byte stream), whereas Stream::size_hint counts items, which we do not know here.
442 : }
443 :
444 : pin_project_lite::pin_project! {
445 : /// Times and tracks the outcome of the request.
446 : struct TimedDownload<S> {
447 : started_at: std::time::Instant,
448 : outcome: AttemptOutcome,
449 : #[pin]
450 : inner: S
451 : }
452 :
453 : impl<S> PinnedDrop for TimedDownload<S> {
454 : fn drop(mut this: Pin<&mut Self>) {
455 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
456 : }
457 : }
458 : }
459 :
460 : impl<S> TimedDownload<S> {
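// The outcome starts out as Cancelled; `poll_next` upgrades it to Ok when the stream ends
// cleanly or to Err on a stream error. Whatever state the download is in when it is dropped
// is what the PinnedDrop impl above records in the request metrics.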
461 28 : fn new(started_at: std::time::Instant, inner: S) -> Self {
462 28 : TimedDownload {
463 28 : started_at,
464 28 : outcome: AttemptOutcome::Cancelled,
465 28 : inner,
466 28 : }
467 28 : }
468 : }
469 :
470 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
471 : type Item = <S as Stream>::Item;
472 :
473 46 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
474 : use std::task::ready;
475 :
476 46 : let this = self.project();
477 :
478 46 : let res = ready!(this.inner.poll_next(cx));
479 22 : match &res {
480 22 : Some(Ok(_)) => {}
481 0 : Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
482 18 : None => *this.outcome = AttemptOutcome::Ok,
483 : }
484 :
485 40 : Poll::Ready(res)
486 46 : }
487 :
488 0 : fn size_hint(&self) -> (usize, Option<usize>) {
489 0 : self.inner.size_hint()
490 0 : }
491 : }
492 :
493 : impl RemoteStorage for S3Bucket {
494 64 : fn list_streaming(
495 64 : &self,
496 64 : prefix: Option<&RemotePath>,
497 64 : mode: ListingMode,
498 64 : max_keys: Option<NonZeroU32>,
499 64 : cancel: &CancellationToken,
500 64 : ) -> impl Stream<Item = Result<Listing, DownloadError>> {
501 64 : let kind = RequestKind::List;
502 64 : // s3 sdk wants i32
503 64 : let mut max_keys = max_keys.map(|mk| mk.get() as i32);
504 64 :
505 64 : // Use the passed prefix; if it is not set, fall back to the prefix_in_bucket value.
506 64 : let list_prefix = prefix
507 64 : .map(|p| self.relative_path_to_s3_object(p))
508 64 : .or_else(|| {
509 32 : self.prefix_in_bucket.clone().map(|mut s| {
510 32 : s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
511 32 : s
512 32 : })
513 64 : });
514 64 :
515 64 : async_stream::stream! {
516 64 : let _permit = self.permit(kind, cancel).await?;
517 64 :
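// Page through ListObjectsV2: each response may carry a continuation token that is fed
// into the next request, and one Listing is yielded per page so callers can consume
// results incrementally instead of buffering the whole listing.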
518 64 : let mut continuation_token = None;
519 64 : 'outer: loop {
520 64 : let started_at = start_measuring_requests(kind);
521 64 :
522 64 : // Min of two Options, returning Some if one is a value and the other is
523 64 : // None (None compares smaller than anything, so a plain min doesn't work).
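// e.g. (Some(1000), None) -> Some(1000); (Some(1000), Some(5)) -> Some(5); (None, None) -> None.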
524 64 : let request_max_keys = self
525 64 : .max_keys_per_list_response
526 64 : .into_iter()
527 64 : .chain(max_keys.into_iter())
528 64 : .min();
529 64 : let mut request = self
530 64 : .client
531 64 : .list_objects_v2()
532 64 : .bucket(self.bucket_name.clone())
533 64 : .set_prefix(list_prefix.clone())
534 64 : .set_continuation_token(continuation_token.clone())
535 64 : .set_max_keys(request_max_keys);
536 64 :
537 64 : if let ListingMode::WithDelimiter = mode {
538 64 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
539 64 : }
540 64 :
541 64 : let request = request.send();
542 64 :
543 64 : let response = tokio::select! {
544 64 : res = request => Ok(res),
545 64 : _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
546 64 : _ = cancel.cancelled() => Err(DownloadError::Cancelled),
547 64 : }?;
548 64 :
549 64 : let response = response
550 64 : .context("Failed to list S3 prefixes")
551 64 : .map_err(DownloadError::Other);
552 64 :
553 64 : let started_at = ScopeGuard::into_inner(started_at);
554 64 :
555 64 : crate::metrics::BUCKET_METRICS
556 64 : .req_seconds
557 64 : .observe_elapsed(kind, &response, started_at);
558 64 :
559 64 : let response = match response {
560 64 : Ok(response) => response,
561 64 : Err(e) => {
562 64 : // The error is potentially retryable, so we must rewind the loop after yielding.
563 64 : yield Err(e);
564 64 : continue 'outer;
565 64 : },
566 64 : };
567 64 :
568 64 : let keys = response.contents();
569 64 : let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
570 64 :
571 64 : tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
572 64 : let mut result = Listing::default();
573 64 :
574 64 : for object in keys {
575 64 : let key = object.key().expect("response does not contain a key");
576 64 : let key = self.s3_object_to_relative_path(key);
577 64 :
578 64 : let last_modified = match object.last_modified.map(SystemTime::try_from) {
579 64 : Some(Ok(t)) => t,
580 64 : Some(Err(_)) => {
581 64 : tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
582 64 : object.last_modified, key
583 64 : );
584 64 : SystemTime::now()
585 64 : },
586 64 : None => {
587 64 : SystemTime::now()
588 64 : }
589 64 : };
590 64 :
591 64 : let size = object.size.unwrap_or(0) as u64;
592 64 :
593 64 : result.keys.push(ListingObject{
594 64 : key,
595 64 : last_modified,
596 64 : size,
597 64 : });
598 64 : if let Some(mut mk) = max_keys {
599 64 : assert!(mk > 0);
600 64 : mk -= 1;
601 64 : if mk == 0 {
602 64 : // limit reached
603 64 : yield Ok(result);
604 64 : break 'outer;
605 64 : }
606 64 : max_keys = Some(mk);
607 64 : }
608 64 : }
609 64 :
610 64 : // S3 gives us prefixes like "foo/", we return them like "foo"
611 104 : result.prefixes.extend(prefixes.iter().filter_map(|o| {
612 104 : Some(
613 104 : self.s3_object_to_relative_path(
614 104 : o.prefix()?
615 104 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
616 64 : ),
617 64 : )
618 104 : }));
619 64 :
620 64 : yield Ok(result);
621 64 :
622 64 : continuation_token = match response.next_continuation_token {
623 64 : Some(new_token) => Some(new_token),
624 64 : None => break,
625 64 : };
626 64 : }
627 64 : }
628 64 : }
629 :
630 6 : async fn head_object(
631 6 : &self,
632 6 : key: &RemotePath,
633 6 : cancel: &CancellationToken,
634 6 : ) -> Result<ListingObject, DownloadError> {
635 6 : let kind = RequestKind::Head;
636 6 : let _permit = self.permit(kind, cancel).await?;
637 :
638 6 : let started_at = start_measuring_requests(kind);
639 6 :
640 6 : let head_future = self
641 6 : .client
642 6 : .head_object()
643 6 : .bucket(self.bucket_name())
644 6 : .key(self.relative_path_to_s3_object(key))
645 6 : .send();
646 6 :
647 6 : let head_future = tokio::time::timeout(self.timeout, head_future);
648 :
649 6 : let res = tokio::select! {
650 6 : res = head_future => res,
651 6 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
652 : };
653 :
654 6 : let res = res.map_err(|_e| DownloadError::Timeout)?;
655 :
656 : // Do not include timeouts as errors in metrics, but do include cancellations.
657 6 : let started_at = ScopeGuard::into_inner(started_at);
658 6 : crate::metrics::BUCKET_METRICS
659 6 : .req_seconds
660 6 : .observe_elapsed(kind, &res, started_at);
661 :
662 4 : let data = match res {
663 4 : Ok(object_output) => object_output,
664 2 : Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
665 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
666 : // an error: we expect to sometimes fetch an object and find it missing,
667 : // e.g. when probing for timeline indices.
668 2 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
669 2 : kind,
670 2 : AttemptOutcome::Ok,
671 2 : started_at,
672 2 : );
673 2 : return Err(DownloadError::NotFound);
674 : }
675 0 : Err(e) => {
676 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
677 0 : kind,
678 0 : AttemptOutcome::Err,
679 0 : started_at,
680 0 : );
681 0 :
682 0 : return Err(DownloadError::Other(
683 0 : anyhow::Error::new(e).context("s3 head object"),
684 0 : ));
685 : }
686 : };
687 :
688 4 : let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
689 0 : return Err(DownloadError::Other(anyhow!(
690 0 : "head_object doesn't contain last_modified or content_length"
691 0 : )))?;
692 : };
693 : Ok(ListingObject {
694 4 : key: key.to_owned(),
695 4 : last_modified: SystemTime::try_from(last_modified).map_err(|e| {
696 0 : DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
697 4 : })?,
698 4 : size: size as u64,
699 : })
700 6 : }
701 :
702 198 : async fn upload(
703 198 : &self,
704 198 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
705 198 : from_size_bytes: usize,
706 198 : to: &RemotePath,
707 198 : metadata: Option<StorageMetadata>,
708 198 : cancel: &CancellationToken,
709 198 : ) -> anyhow::Result<()> {
710 198 : let kind = RequestKind::Put;
711 198 : let _permit = self.permit(kind, cancel).await?;
712 :
713 198 : let started_at = start_measuring_requests(kind);
714 198 :
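// Adapt the caller's `Stream<Item = io::Result<Bytes>>` into an http-body 1.x body and then
// into the SDK's ByteStream, so the upload is streamed rather than buffered in memory.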
715 710 : let body = StreamBody::new(from.map(|x| x.map(Frame::data)));
716 198 : let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body));
717 :
718 198 : let upload = self
719 198 : .client
720 198 : .put_object()
721 198 : .bucket(self.bucket_name.clone())
722 198 : .key(self.relative_path_to_s3_object(to))
723 198 : .set_metadata(metadata.map(|m| m.0))
724 198 : .set_storage_class(self.upload_storage_class.clone())
725 198 : .content_length(from_size_bytes.try_into()?)
726 198 : .body(bytes_stream)
727 198 : .send();
728 198 :
729 198 : let upload = tokio::time::timeout(self.timeout, upload);
730 :
731 198 : let res = tokio::select! {
732 198 : res = upload => res,
733 198 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
734 : };
735 :
736 198 : if let Ok(inner) = &res {
737 198 : // Do not include timeouts as errors in metrics, but do include cancellations.
738 198 : let started_at = ScopeGuard::into_inner(started_at);
739 198 : crate::metrics::BUCKET_METRICS
740 198 : .req_seconds
741 198 : .observe_elapsed(kind, inner, started_at);
742 198 : }
743 :
744 198 : match res {
745 198 : Ok(Ok(_put)) => Ok(()),
746 0 : Ok(Err(sdk)) => Err(sdk.into()),
747 0 : Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
748 : }
749 198 : }
750 :
751 2 : async fn copy(
752 2 : &self,
753 2 : from: &RemotePath,
754 2 : to: &RemotePath,
755 2 : cancel: &CancellationToken,
756 2 : ) -> anyhow::Result<()> {
757 2 : let kind = RequestKind::Copy;
758 2 : let _permit = self.permit(kind, cancel).await?;
759 :
760 2 : let timeout = tokio::time::sleep(self.timeout);
761 2 :
762 2 : let started_at = start_measuring_requests(kind);
763 2 :
764 2 : // we need to specify bucket_name as a prefix
765 2 : let copy_source = format!(
766 2 : "{}/{}",
767 2 : self.bucket_name,
768 2 : self.relative_path_to_s3_object(from)
769 2 : );
770 2 :
771 2 : let op = self
772 2 : .client
773 2 : .copy_object()
774 2 : .bucket(self.bucket_name.clone())
775 2 : .key(self.relative_path_to_s3_object(to))
776 2 : .set_storage_class(self.upload_storage_class.clone())
777 2 : .copy_source(copy_source)
778 2 : .send();
779 :
780 2 : let res = tokio::select! {
781 2 : res = op => res,
782 2 : _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
783 2 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
784 : };
785 :
786 2 : let started_at = ScopeGuard::into_inner(started_at);
787 2 : crate::metrics::BUCKET_METRICS
788 2 : .req_seconds
789 2 : .observe_elapsed(kind, &res, started_at);
790 2 :
791 2 : res?;
792 :
793 2 : Ok(())
794 2 : }
795 :
796 32 : async fn download(
797 32 : &self,
798 32 : from: &RemotePath,
799 32 : opts: &DownloadOpts,
800 32 : cancel: &CancellationToken,
801 32 : ) -> Result<Download, DownloadError> {
802 32 : // If a prefix is configured, download the file at `prefix/from`;
803 32 : // otherwise download `from` directly.
804 32 : self.download_object(
805 32 : GetObjectRequest {
806 32 : bucket: self.bucket_name.clone(),
807 32 : key: self.relative_path_to_s3_object(from),
808 32 : etag: opts.etag.as_ref().map(|e| e.to_string()),
809 32 : range: opts.byte_range_header(),
810 32 : },
811 32 : cancel,
812 32 : )
813 33 : .await
814 32 : }
815 :
816 194 : async fn delete_objects<'a>(
817 194 : &self,
818 194 : paths: &'a [RemotePath],
819 194 : cancel: &CancellationToken,
820 194 : ) -> anyhow::Result<()> {
821 194 : let kind = RequestKind::Delete;
822 194 : let permit = self.permit(kind, cancel).await?;
823 194 : let mut delete_objects = Vec::with_capacity(paths.len());
824 430 : for path in paths {
825 236 : let obj_id = ObjectIdentifier::builder()
826 236 : .set_key(Some(self.relative_path_to_s3_object(path)))
827 236 : .build()
828 236 : .context("convert path to oid")?;
829 236 : delete_objects.push(obj_id);
830 : }
831 :
832 654 : self.delete_oids(&permit, &delete_objects, cancel).await
833 194 : }
834 :
835 174 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
836 174 : let paths = std::array::from_ref(path);
837 580 : self.delete_objects(paths, cancel).await
838 174 : }
839 :
840 6 : async fn time_travel_recover(
841 6 : &self,
842 6 : prefix: Option<&RemotePath>,
843 6 : timestamp: SystemTime,
844 6 : done_if_after: SystemTime,
845 6 : cancel: &CancellationToken,
846 6 : ) -> Result<(), TimeTravelError> {
847 6 : let kind = RequestKind::TimeTravel;
848 6 : let permit = self.permit(kind, cancel).await?;
849 :
850 6 : let timestamp = DateTime::from(timestamp);
851 6 : let done_if_after = DateTime::from(done_if_after);
852 6 :
853 6 : tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
854 :
855 : // Use the passed prefix; if it is not set, fall back to the prefix_in_bucket value.
856 6 : let prefix = prefix
857 6 : .map(|p| self.relative_path_to_s3_object(p))
858 6 : .or_else(|| self.prefix_in_bucket.clone());
859 6 :
860 6 : let warn_threshold = 3;
861 6 : let max_retries = 10;
862 6 : let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
863 :
864 6 : let mut key_marker = None;
865 6 : let mut version_id_marker = None;
866 6 : let mut versions_and_deletes = Vec::new();
867 :
868 : loop {
869 6 : let response = backoff::retry(
870 9 : || async {
871 9 : let op = self
872 9 : .client
873 9 : .list_object_versions()
874 9 : .bucket(self.bucket_name.clone())
875 9 : .set_prefix(prefix.clone())
876 9 : .set_key_marker(key_marker.clone())
877 9 : .set_version_id_marker(version_id_marker.clone())
878 9 : .send();
879 9 :
880 9 : tokio::select! {
881 9 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
882 9 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
883 : }
884 18 : },
885 6 : is_permanent,
886 6 : warn_threshold,
887 6 : max_retries,
888 6 : "listing object versions for time_travel_recover",
889 6 : cancel,
890 6 : )
891 46 : .await
892 6 : .ok_or_else(|| TimeTravelError::Cancelled)
893 6 : .and_then(|x| x)?;
894 :
895 6 : tracing::trace!(
896 0 : " Got List response version_id_marker={:?}, key_marker={:?}",
897 : response.version_id_marker,
898 : response.key_marker
899 : );
900 6 : let versions = response
901 6 : .versions
902 6 : .unwrap_or_default()
903 6 : .into_iter()
904 6 : .map(VerOrDelete::from_version);
905 6 : let deletes = response
906 6 : .delete_markers
907 6 : .unwrap_or_default()
908 6 : .into_iter()
909 6 : .map(VerOrDelete::from_delete_marker);
910 6 : itertools::process_results(versions.chain(deletes), |n_vds| {
911 6 : versions_and_deletes.extend(n_vds)
912 6 : })
913 6 : .map_err(TimeTravelError::Other)?;
914 12 : fn none_if_empty(v: Option<String>) -> Option<String> {
915 12 : v.filter(|v| !v.is_empty())
916 12 : }
917 6 : version_id_marker = none_if_empty(response.next_version_id_marker);
918 6 : key_marker = none_if_empty(response.next_key_marker);
919 6 : if version_id_marker.is_none() {
920 : // The final response is not supposed to be truncated
921 6 : if response.is_truncated.unwrap_or_default() {
922 0 : return Err(TimeTravelError::Other(anyhow::anyhow!(
923 0 : "Received truncated ListObjectVersions response for prefix={prefix:?}"
924 0 : )));
925 6 : }
926 6 : break;
927 0 : }
928 : // Limit the number of versions and deletions, mostly so that we don't
929 : // keep requesting forever if the list is too long, as we'd put the
930 : // list in RAM.
931 : // Building a list of 100k entries that reaches the limit roughly takes
932 : // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
933 : const COMPLEXITY_LIMIT: usize = 100_000;
934 0 : if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
935 0 : return Err(TimeTravelError::TooManyVersions);
936 0 : }
937 : }
938 :
939 6 : tracing::info!(
940 0 : "Built list for time travel with {} versions and deletions",
941 0 : versions_and_deletes.len()
942 : );
943 :
944 : // Work on the list of references instead of the objects directly,
945 : // otherwise we get lifetime errors in the sort_by_key call below.
946 6 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
947 6 :
948 124 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
949 6 :
950 6 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
951 :
952 42 : for vd in &versions_and_deletes {
953 : let VerOrDelete {
954 36 : version_id, key, ..
955 36 : } = &vd;
956 36 : if version_id == "null" {
957 0 : return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
958 0 : indicating either disabled versioning, or legacy objects with null version id values")));
959 36 : }
960 36 : tracing::trace!(
961 0 : "Parsing version key={key} version_id={version_id} kind={:?}",
962 : vd.kind
963 : );
964 :
965 36 : vds_for_key.entry(key).or_default().push(vd);
966 : }
967 24 : for (key, versions) in vds_for_key {
968 18 : let last_vd = versions.last().unwrap();
969 18 : if last_vd.last_modified > done_if_after {
970 0 : tracing::trace!("Key {key} has version later than done_if_after, skipping");
971 0 : continue;
972 18 : }
973 : // the version we want to restore to.
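// Ok(i) means versions[i].last_modified equals the target timestamp exactly; Err(i) is the
// insertion point, i.e. versions[..i] are older than the timestamp and versions[i..] are newer.
// Either way, i is used below: i == 0 means every version is newer (the key did not exist yet,
// so it gets deleted), i == versions.len() means nothing changed after the timestamp (skip),
// and otherwise versions[i - 1] is the state to restore.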
974 18 : let version_to_restore_to =
975 36 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
976 0 : Ok(v) => v,
977 18 : Err(e) => e,
978 : };
979 18 : if version_to_restore_to == versions.len() {
980 6 : tracing::trace!("Key {key} has no changes since timestamp, skipping");
981 6 : continue;
982 12 : }
983 12 : let mut do_delete = false;
984 12 : if version_to_restore_to == 0 {
985 : // All versions more recent, so the key didn't exist at the specified time point.
986 6 : tracing::trace!(
987 0 : "All {} versions more recent for {key}, deleting",
988 0 : versions.len()
989 : );
990 6 : do_delete = true;
991 : } else {
992 6 : match &versions[version_to_restore_to - 1] {
993 : VerOrDelete {
994 : kind: VerOrDeleteKind::Version,
995 6 : version_id,
996 6 : ..
997 6 : } => {
998 6 : tracing::trace!("Copying old version {version_id} for {key}...");
999 : // Restore the state to the last version by copying
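// S3's CopyObject accepts a versioned source (`bucket/key?versionId=...`); copying that
// version onto the same key makes it the current version again, effectively rolling it back.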
1000 6 : let source_id =
1001 6 : format!("{}/{key}?versionId={version_id}", self.bucket_name);
1002 6 :
1003 6 : backoff::retry(
1004 6 : || async {
1005 6 : let op = self
1006 6 : .client
1007 6 : .copy_object()
1008 6 : .bucket(self.bucket_name.clone())
1009 6 : .key(key)
1010 6 : .set_storage_class(self.upload_storage_class.clone())
1011 6 : .copy_source(&source_id)
1012 6 : .send();
1013 6 :
1014 6 : tokio::select! {
1015 6 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
1016 6 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
1017 : }
1018 12 : },
1019 6 : is_permanent,
1020 6 : warn_threshold,
1021 6 : max_retries,
1022 6 : "copying object version for time_travel_recover",
1023 6 : cancel,
1024 6 : )
1025 16 : .await
1026 6 : .ok_or_else(|| TimeTravelError::Cancelled)
1027 6 : .and_then(|x| x)?;
1028 6 : tracing::info!(%version_id, %key, "Copied old version in S3");
1029 : }
1030 : VerOrDelete {
1031 : kind: VerOrDeleteKind::DeleteMarker,
1032 : ..
1033 0 : } => {
1034 0 : do_delete = true;
1035 0 : }
1036 : }
1037 : };
1038 12 : if do_delete {
1039 6 : if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
1040 : // Key has since been deleted (but there was some history), no need to do anything
1041 2 : tracing::trace!("Key {key} already deleted, skipping.");
1042 : } else {
1043 4 : tracing::trace!("Deleting {key}...");
1044 :
1045 4 : let oid = ObjectIdentifier::builder()
1046 4 : .key(key.to_owned())
1047 4 : .build()
1048 4 : .map_err(|e| TimeTravelError::Other(e.into()))?;
1049 :
1050 4 : self.delete_oids(&permit, &[oid], cancel)
1051 8 : .await
1052 4 : .map_err(|e| {
1053 0 : // delete_oids will use TimeoutOrCancel
1054 0 : if TimeoutOrCancel::caused_by_cancel(&e) {
1055 0 : TimeTravelError::Cancelled
1056 : } else {
1057 0 : TimeTravelError::Other(e)
1058 : }
1059 4 : })?;
1060 : }
1061 6 : }
1062 : }
1063 6 : Ok(())
1064 6 : }
1065 : }
1066 :
1067 : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
1068 : struct VerOrDelete {
1069 : kind: VerOrDeleteKind,
1070 : last_modified: DateTime,
1071 : version_id: String,
1072 : key: String,
1073 : }
1074 :
1075 : #[derive(Debug)]
1076 : enum VerOrDeleteKind {
1077 : Version,
1078 : DeleteMarker,
1079 : }
1080 :
1081 : impl VerOrDelete {
1082 36 : fn with_kind(
1083 36 : kind: VerOrDeleteKind,
1084 36 : last_modified: Option<DateTime>,
1085 36 : version_id: Option<String>,
1086 36 : key: Option<String>,
1087 36 : ) -> anyhow::Result<Self> {
1088 36 : let lvk = (last_modified, version_id, key);
1089 36 : let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
1090 0 : anyhow::bail!(
1091 0 : "One (or more) of last_modified, key, and id is None. \
1092 0 : Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
1093 0 : lvk.0,
1094 0 : lvk.1,
1095 0 : lvk.2,
1096 0 : );
1097 : };
1098 36 : Ok(Self {
1099 36 : kind,
1100 36 : last_modified,
1101 36 : version_id,
1102 36 : key,
1103 36 : })
1104 36 : }
1105 28 : fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
1106 28 : Self::with_kind(
1107 28 : VerOrDeleteKind::Version,
1108 28 : v.last_modified,
1109 28 : v.version_id,
1110 28 : v.key,
1111 28 : )
1112 28 : }
1113 8 : fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
1114 8 : Self::with_kind(
1115 8 : VerOrDeleteKind::DeleteMarker,
1116 8 : v.last_modified,
1117 8 : v.version_id,
1118 8 : v.key,
1119 8 : )
1120 8 : }
1121 : }
1122 :
1123 : #[cfg(test)]
1124 : mod tests {
1125 : use camino::Utf8Path;
1126 : use std::num::NonZeroUsize;
1127 :
1128 : use crate::{RemotePath, S3Bucket, S3Config};
1129 :
1130 : #[tokio::test]
1131 3 : async fn relative_path() {
1132 3 : let all_paths = ["", "some/path", "some/path/"];
1133 3 : let all_paths: Vec<RemotePath> = all_paths
1134 3 : .iter()
1135 9 : .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
1136 3 : .collect();
1137 3 : let prefixes = [
1138 3 : None,
1139 3 : Some(""),
1140 3 : Some("test/prefix"),
1141 3 : Some("test/prefix/"),
1142 3 : Some("/test/prefix/"),
1143 3 : ];
1144 3 : let expected_outputs = [
1145 3 : vec!["", "some/path", "some/path/"],
1146 3 : vec!["/", "/some/path", "/some/path/"],
1147 3 : vec![
1148 3 : "test/prefix/",
1149 3 : "test/prefix/some/path",
1150 3 : "test/prefix/some/path/",
1151 3 : ],
1152 3 : vec![
1153 3 : "test/prefix/",
1154 3 : "test/prefix/some/path",
1155 3 : "test/prefix/some/path/",
1156 3 : ],
1157 3 : vec![
1158 3 : "test/prefix/",
1159 3 : "test/prefix/some/path",
1160 3 : "test/prefix/some/path/",
1161 3 : ],
1162 3 : ];
1163 3 :
1164 15 : for (prefix_idx, prefix) in prefixes.iter().enumerate() {
1165 15 : let config = S3Config {
1166 15 : bucket_name: "bucket".to_owned(),
1167 15 : bucket_region: "region".to_owned(),
1168 15 : prefix_in_bucket: prefix.map(str::to_string),
1169 15 : endpoint: None,
1170 15 : concurrency_limit: NonZeroUsize::new(100).unwrap(),
1171 15 : max_keys_per_list_response: Some(5),
1172 15 : upload_storage_class: None,
1173 15 : };
1174 15 : let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
1175 3 : .await
1176 15 : .expect("remote storage init");
1177 45 : for (test_path_idx, test_path) in all_paths.iter().enumerate() {
1178 45 : let result = storage.relative_path_to_s3_object(test_path);
1179 45 : let expected = expected_outputs[prefix_idx][test_path_idx];
1180 45 : assert_eq!(result, expected);
1181 3 : }
1182 3 : }
1183 3 : }
1184 : }