Line data Source code
1 : //! AWS S3 storage wrapper around the `aws-sdk-s3` library.
2 : //!
3 : //! Respects `prefix_in_bucket` property from [`S3Config`],
4 : //! allowing multiple api users to independently work with the same S3 bucket, if
5 : //! their bucket prefixes are both specified and different.
6 :
7 : use std::{
8 : borrow::Cow,
9 : collections::HashMap,
10 : num::NonZeroU32,
11 : pin::Pin,
12 : sync::Arc,
13 : task::{Context, Poll},
14 : time::{Duration, SystemTime},
15 : };
16 :
17 : use anyhow::{anyhow, Context as _};
18 : use aws_config::{
19 : default_provider::credentials::DefaultCredentialsChain,
20 : retry::{RetryConfigBuilder, RetryMode},
21 : BehaviorVersion,
22 : };
23 : use aws_sdk_s3::{
24 : config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
25 : error::SdkError,
26 : operation::{get_object::GetObjectError, head_object::HeadObjectError},
27 : types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
28 : Client,
29 : };
30 : use aws_smithy_async::rt::sleep::TokioSleep;
31 :
32 : use aws_smithy_types::{body::SdkBody, DateTime};
33 : use aws_smithy_types::{byte_stream::ByteStream, date_time::ConversionError};
34 : use bytes::Bytes;
35 : use futures::stream::Stream;
36 : use hyper::Body;
37 : use scopeguard::ScopeGuard;
38 : use tokio_util::sync::CancellationToken;
39 : use utils::backoff;
40 :
41 : use super::StorageMetadata;
42 : use crate::{
43 : config::S3Config,
44 : error::Cancelled,
45 : metrics::{start_counting_cancelled_wait, start_measuring_requests},
46 : support::PermitCarrying,
47 : ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath,
48 : RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
49 : REMOTE_STORAGE_PREFIX_SEPARATOR,
50 : };
51 :
52 : use crate::metrics::AttemptOutcome;
53 : pub(super) use crate::metrics::RequestKind;
54 :
55 : /// AWS S3 storage.
///
/// Bundles the SDK [`Client`] with the target bucket, the optional key prefix inside it,
/// and the per-instance limits applied to requests.
56 : pub struct S3Bucket {
57 : client: Client,
58 : bucket_name: String,
// Key prefix inside the bucket; `new` strips leading and trailing separators from it.
59 : prefix_in_bucket: Option<String>,
// Optional cap on the number of keys requested per list call.
60 : max_keys_per_list_response: Option<i32>,
// Storage class set on upload and copy requests, when configured.
61 : upload_storage_class: Option<StorageClass>,
// Source of the permits acquired (per `RequestKind`) before a request is issued.
62 : concurrency_limiter: ConcurrencyLimiter,
63 : // Per-request timeout. Accessible for tests.
64 : pub timeout: Duration,
65 : }
66 :
/// Parameters of a single GetObject call, consumed by `S3Bucket::download_object`.
67 : struct GetObjectRequest {
68 : bucket: String,
// Full object key, i.e. already including `prefix_in_bucket` when one is configured.
69 : key: String,
// Value for the request's range (e.g. `bytes=0-9`); `None` when no range is requested.
70 : range: Option<String>,
71 : }
72 : impl S3Bucket {
73 : /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
///
/// Steps, in order: build the default credentials chain for the configured region, load the
/// SDK config, apply the optional custom endpoint (with path-style addressing), install a
/// single-attempt adaptive retry config, and normalize `prefix_in_bucket`.
///
/// NOTE(review): no fallible (`?`) path is visible in this body; failures while building the
/// temporary runtime or joining the helper thread panic through `unwrap()` instead of
/// surfacing as `Err`.
74 33 : pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
75 33 : tracing::debug!(
76 0 : "Creating s3 remote storage for S3 bucket {}",
77 : remote_storage_config.bucket_name
78 : );
79 :
80 33 : let region = Region::new(remote_storage_config.bucket_region.clone());
81 33 : let region_opt = Some(region.clone());
82 :
83 : // https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
84 : // https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
85 : // Incomplete list of auth methods used by this:
86 : // * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
87 : // * "AWS_PROFILE" / `aws sso login --profile <profile>`
88 : // * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
89 : // * http (ECS/EKS) container credentials
90 : // * imds v2
91 33 : let credentials_provider = DefaultCredentialsChain::builder()
92 33 : .region(region)
93 33 : .build()
94 0 : .await;
95 :
96 : // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
97 33 : let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
98 33 :
99 33 : let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
100 33 : #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
101 33 : BehaviorVersion::v2023_11_09(),
102 33 : )
103 33 : .region(region_opt)
104 33 : .identity_cache(IdentityCache::lazy().build())
105 33 : .credentials_provider(credentials_provider)
106 33 : .sleep_impl(SharedAsyncSleep::from(sleep_impl));
107 33 :
// The config is loaded by blocking on a fresh current-thread runtime inside a scoped
// thread; `join()` blocks the calling thread until loading finishes.
// NOTE(review): this function is already `async`, so awaiting `sdk_config_loader.load()`
// directly looks possible — confirm (e.g. `Send` bounds of callers) before changing.
108 33 : let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
109 33 : s.spawn(|| {
110 33 : // TODO: make this function async.
111 33 : tokio::runtime::Builder::new_current_thread()
112 33 : .enable_all()
113 33 : .build()
114 33 : .unwrap()
115 33 : .block_on(sdk_config_loader.load())
116 33 : })
117 33 : .join()
118 33 : .unwrap()
119 33 : });
120 33 :
121 33 : let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
122 :
123 : // Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
124 : // (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
125 33 : if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
126 0 : s3_config_builder = s3_config_builder
127 0 : .endpoint_url(custom_endpoint)
128 0 : .force_path_style(true);
129 33 : }
130 :
131 : // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
132 : // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config. We set it to use at most one
133 : // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
134 33 : let mut retry_config = RetryConfigBuilder::new();
135 33 : retry_config
136 33 : .set_max_attempts(Some(1))
137 33 : .set_mode(Some(RetryMode::Adaptive));
138 33 : s3_config_builder = s3_config_builder.retry_config(retry_config.build());
139 33 :
140 33 : let s3_config = s3_config_builder.build();
141 33 : let client = aws_sdk_s3::Client::from_conf(s3_config);
142 33 :
// Normalize the configured prefix: drop every leading and every trailing separator so
// that key construction can always join prefix and path with exactly one separator.
143 33 : let prefix_in_bucket = remote_storage_config
144 33 : .prefix_in_bucket
145 33 : .as_deref()
146 33 : .map(|prefix| {
147 30 : let mut prefix = prefix;
148 33 : while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
149 3 : prefix = &prefix[1..]
150 : }
151 :
152 30 : let mut prefix = prefix.to_string();
153 54 : while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
154 24 : prefix.pop();
155 24 : }
156 30 : prefix
157 33 : });
158 33 :
159 33 : Ok(Self {
160 33 : client,
161 33 : bucket_name: remote_storage_config.bucket_name.clone(),
162 33 : max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
163 33 : prefix_in_bucket,
164 33 : concurrency_limiter: ConcurrencyLimiter::new(
165 33 : remote_storage_config.concurrency_limit.get(),
166 33 : ),
167 33 : upload_storage_class: remote_storage_config.upload_storage_class.clone(),
168 33 : timeout,
169 33 : })
170 33 : }
171 :
/// Converts a full S3 object key into a [`RemotePath`] relative to `prefix_in_bucket`.
///
/// The configured prefix (or the empty string when there is none) is stripped from the
/// front of `key`, and the remainder is split on the separator to build the path.
///
/// # Panics
/// Panics if `key` does not start with the configured prefix.
172 168 : fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
173 168 : let relative_path =
174 168 : match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
175 168 : Some(stripped) => stripped,
176 : // we rely on AWS to return properly prefixed paths
177 : // for requests with a certain prefix
178 0 : None => panic!(
179 0 : "Key {} does not start with bucket prefix {:?}",
180 0 : key, self.prefix_in_bucket
181 0 : ),
182 : };
183 168 : RemotePath(
184 168 : relative_path
185 168 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
186 168 : .collect(),
187 168 : )
188 168 : }
189 :
/// Builds the full S3 object key for `path`.
///
/// Returns `<prefix_in_bucket>/<path>` when a prefix is configured, otherwise the path
/// string unchanged.
///
/// # Panics
/// Panics if the platform's `std::path::MAIN_SEPARATOR` differs from
/// `REMOTE_STORAGE_PREFIX_SEPARATOR` (checked on every call).
190 296 : pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
191 296 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
192 296 : let path_string = path.get_path().as_str();
193 296 : match &self.prefix_in_bucket {
194 287 : Some(prefix) => prefix.clone() + "/" + path_string,
195 9 : None => path_string.to_string(),
196 : }
197 296 : }
198 :
/// Waits for a borrowed concurrency permit for requests of `kind`.
///
/// Returns `Err(Cancelled)` if `cancel` fires before a permit becomes available. On
/// success the time spent waiting is recorded in the `wait_seconds` metric.
199 242 : async fn permit(
200 242 : &self,
201 242 : kind: RequestKind,
202 242 : cancel: &CancellationToken,
203 242 : ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
// Guard around the wait start; it is only disarmed below once a permit was obtained.
// (Presumably its drop on the cancelled path counts the cancelled wait — see metrics.)
204 242 : let started_at = start_counting_cancelled_wait(kind);
205 242 : let acquire = self.concurrency_limiter.acquire(kind);
206 :
207 242 : let permit = tokio::select! {
208 242 : permit = acquire => permit.expect("semaphore is never closed"),
209 242 : _ = cancel.cancelled() => return Err(Cancelled),
210 : };
211 :
212 242 : let started_at = ScopeGuard::into_inner(started_at);
213 242 : crate::metrics::BUCKET_METRICS
214 242 : .wait_seconds
215 242 : .observe_elapsed(kind, started_at);
216 242 :
217 242 : Ok(permit)
218 242 : }
219 :
/// Same as `permit`, but returns an owned permit that is not tied to the lifetime of
/// `&self`; `download_object` attaches it to the returned download stream.
///
/// Returns `Err(Cancelled)` if `cancel` fires before a permit becomes available. On
/// success the time spent waiting is recorded in the `wait_seconds` metric.
220 25 : async fn owned_permit(
221 25 : &self,
222 25 : kind: RequestKind,
223 25 : cancel: &CancellationToken,
224 25 : ) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
// Guard around the wait start; it is only disarmed below once a permit was obtained.
225 25 : let started_at = start_counting_cancelled_wait(kind);
226 25 : let acquire = self.concurrency_limiter.acquire_owned(kind);
227 :
228 25 : let permit = tokio::select! {
229 25 : permit = acquire => permit.expect("semaphore is never closed"),
230 25 : _ = cancel.cancelled() => return Err(Cancelled),
231 : };
232 :
233 25 : let started_at = ScopeGuard::into_inner(started_at);
234 25 : crate::metrics::BUCKET_METRICS
235 25 : .wait_seconds
236 25 : .observe_elapsed(kind, started_at);
237 25 : Ok(permit)
238 25 : }
239 :
/// Sends a GetObject request and wraps the response body into a [`Download`].
///
/// An owned concurrency permit is acquired first and travels with the returned stream.
/// Sending the request races against `self.timeout` and `cancel`.
///
/// # Errors
/// - `DownloadError::Timeout` / `DownloadError::Cancelled` if the timeout elapses or
///   `cancel` fires before the response headers arrive.
/// - `DownloadError::NotFound` when S3 reports `NoSuchKey`; this is recorded in the
///   metrics as a successful attempt.
/// - `DownloadError::Other` for any other SDK error, or when the response lacks an ETag
///   or LastModified value, or LastModified cannot be converted.
240 25 : async fn download_object(
241 25 : &self,
242 25 : request: GetObjectRequest,
243 25 : cancel: &CancellationToken,
244 25 : ) -> Result<Download, DownloadError> {
245 25 : let kind = RequestKind::Get;
246 :
247 25 : let permit = self.owned_permit(kind, cancel).await?;
248 :
249 25 : let started_at = start_measuring_requests(kind);
250 25 :
251 25 : let get_object = self
252 25 : .client
253 25 : .get_object()
254 25 : .bucket(request.bucket)
255 25 : .key(request.key)
256 25 : .set_range(request.range)
257 25 : .send();
258 :
259 25 : let get_object = tokio::select! {
260 25 : res = get_object => res,
261 25 : _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
262 25 : _ = cancel.cancelled() => return Err(DownloadError::Cancelled),
263 : };
264 :
// The request finished: take the start instant out of the guard so the outcome is
// recorded explicitly below (error arms) or by `TimedDownload` (success path).
265 25 : let started_at = ScopeGuard::into_inner(started_at);
266 :
267 24 : let object_output = match get_object {
268 24 : Ok(object_output) => object_output,
269 0 : Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
270 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
271 : // an error: we expect to sometimes fetch an object and find it missing,
272 : // e.g. when probing for timeline indices.
273 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
274 0 : kind,
275 0 : AttemptOutcome::Ok,
276 0 : started_at,
277 0 : );
278 0 : return Err(DownloadError::NotFound);
279 : }
280 1 : Err(e) => {
281 1 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
282 1 : kind,
283 1 : AttemptOutcome::Err,
284 1 : started_at,
285 1 : );
286 1 :
287 1 : return Err(DownloadError::Other(
288 1 : anyhow::Error::new(e).context("download s3 object"),
289 1 : ));
290 : }
291 : };
292 :
293 : // even if we would have no timeout left, continue anyways. the caller can decide to ignore
294 : // the errors considering timeouts and cancellation.
295 24 : let remaining = self.timeout.saturating_sub(started_at.elapsed());
296 24 :
297 24 : let metadata = object_output.metadata().cloned().map(StorageMetadata);
298 24 : let etag = object_output
299 24 : .e_tag
300 24 : .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
301 24 : .into();
302 24 : let last_modified = object_output
303 24 : .last_modified
304 24 : .ok_or(DownloadError::Other(anyhow::anyhow!(
305 24 : "Missing LastModified header"
306 24 : )))?
307 24 : .try_into()
308 24 : .map_err(|e: ConversionError| DownloadError::Other(e.into()))?;
309 :
// Layer the body: raw SDK byte stream -> carries the permit until dropped -> records
// the request duration/outcome on drop -> bounded by the remaining timeout and `cancel`.
310 24 : let body = object_output.body;
311 24 : let body = ByteStreamAsStream::from(body);
312 24 : let body = PermitCarrying::new(permit, body);
313 24 : let body = TimedDownload::new(started_at, body);
314 24 :
315 24 : let cancel_or_timeout = crate::support::cancel_or_timeout(remaining, cancel.clone());
316 24 : let body = crate::support::DownloadStream::new(cancel_or_timeout, body);
317 24 :
318 24 : Ok(Download {
319 24 : metadata,
320 24 : etag,
321 24 : last_modified,
322 24 : download_stream: Box::pin(body),
323 24 : })
324 25 : }
325 :
/// Deletes the given object identifiers, issuing one DeleteObjects request per chunk of
/// at most `MAX_KEYS_PER_DELETE` identifiers.
///
/// `_permit` is not used in the body; requiring it makes the caller hold a concurrency
/// permit for the duration of the call.
///
/// # Errors
/// Fails on the first chunk whose request times out, is cancelled, fails at the SDK
/// level, or whose response lists per-key errors; later chunks are then not attempted.
326 106 : async fn delete_oids(
327 106 : &self,
328 106 : _permit: &tokio::sync::SemaphorePermit<'_>,
329 106 : delete_objects: &[ObjectIdentifier],
330 106 : cancel: &CancellationToken,
331 106 : ) -> anyhow::Result<()> {
332 106 : let kind = RequestKind::Delete;
// Pinned once so the same cancellation future can be polled across loop iterations.
333 106 : let mut cancel = std::pin::pin!(cancel.cancelled());
334 :
335 106 : for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
336 106 : let started_at = start_measuring_requests(kind);
337 :
338 106 : let req = self
339 106 : .client
340 106 : .delete_objects()
341 106 : .bucket(self.bucket_name.clone())
342 106 : .delete(
343 106 : Delete::builder()
344 106 : .set_objects(Some(chunk.to_vec()))
345 106 : .build()
346 106 : .context("build request")?,
347 : )
348 106 : .send();
349 :
// The timeout applies to each chunk's request separately.
350 106 : let resp = tokio::select! {
351 106 : resp = req => resp,
352 106 : _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
353 106 : _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
354 : };
355 :
356 106 : let started_at = ScopeGuard::into_inner(started_at);
357 106 : crate::metrics::BUCKET_METRICS
358 106 : .req_seconds
359 106 : .observe_elapsed(kind, &resp, started_at);
360 :
361 106 : let resp = resp.context("request deletion")?;
// NOTE(review): the counter is bumped by the whole chunk size before the per-key
// errors below are inspected, so keys that failed to delete are counted too.
362 106 : crate::metrics::BUCKET_METRICS
363 106 : .deleted_objects_total
364 106 : .inc_by(chunk.len() as u64);
365 :
366 106 : if let Some(errors) = resp.errors {
367 : // Log a bounded number of the errors within the response:
368 : // these requests can carry 1000 keys so logging each one
369 : // would be too verbose, especially as errors may lead us
370 : // to retry repeatedly.
371 : const LOG_UP_TO_N_ERRORS: usize = 10;
372 0 : for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
373 0 : tracing::warn!(
374 0 : "DeleteObjects key {} failed: {}: {}",
375 0 : e.key.as_ref().map(Cow::from).unwrap_or("".into()),
376 0 : e.code.as_ref().map(Cow::from).unwrap_or("".into()),
377 0 : e.message.as_ref().map(Cow::from).unwrap_or("".into())
378 : );
379 : }
380 :
381 0 : return Err(anyhow::anyhow!(
382 0 : "Failed to delete {}/{} objects",
383 0 : errors.len(),
384 0 : chunk.len(),
385 0 : ));
386 106 : }
387 : }
388 106 : Ok(())
389 106 : }
390 :
/// Returns the name of the bucket this storage operates on.
391 0 : pub fn bucket_name(&self) -> &str {
392 0 : &self.bucket_name
393 0 : }
394 : }
395 :
// Adapter that exposes the SDK's `ByteStream` through the `futures` [`Stream`] trait,
// yielding `std::io::Result<Bytes>` items (see the `Stream` impl for this type).
396 : pin_project_lite::pin_project! {
397 : struct ByteStreamAsStream {
398 : #[pin]
399 : inner: aws_smithy_types::byte_stream::ByteStream
400 : }
401 : }
402 :
/// Wraps an SDK `ByteStream` without touching its contents.
403 : impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
404 24 : fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
405 24 : ByteStreamAsStream { inner }
406 24 : }
407 : }
408 :
/// Forwards polling to the wrapped `ByteStream`, converting its error type into
/// `std::io::Error`.
409 : impl Stream for ByteStreamAsStream {
410 : type Item = std::io::Result<Bytes>;
411 :
412 49 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
413 49 : // this does the std::io::ErrorKind::Other conversion
414 49 : self.project().inner.poll_next(cx).map_err(|x| x.into())
415 49 : }
416 :
417 : // cannot implement size_hint because inner.size_hint is remaining size in bytes, which makes
418 : // sense and Stream::size_hint does not really
419 : }
420 :
// Stream wrapper that reports a Get request to the `req_seconds` metric when it is dropped.
// The elapsed time is measured from `started_at`, and the reported outcome is whatever
// `outcome` holds at drop time.
421 : pin_project_lite::pin_project! {
422 : /// Times and tracks the outcome of the request.
423 : struct TimedDownload<S> {
424 : started_at: std::time::Instant,
425 : outcome: AttemptOutcome,
426 : #[pin]
427 : inner: S
428 : }
429 :
430 : impl<S> PinnedDrop for TimedDownload<S> {
431 : fn drop(mut this: Pin<&mut Self>) {
432 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
433 : }
434 : }
435 : }
436 :
437 : impl<S> TimedDownload<S> {
/// Wraps `inner`, measuring from `started_at`.
///
/// The outcome starts as `Cancelled`, so a stream dropped before it finishes or fails
/// is reported as cancelled.
438 24 : fn new(started_at: std::time::Instant, inner: S) -> Self {
439 24 : TimedDownload {
440 24 : started_at,
441 24 : outcome: AttemptOutcome::Cancelled,
442 24 : inner,
443 24 : }
444 24 : }
445 : }
446 :
/// Passes items through unchanged while tracking the outcome reported on drop.
447 : impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
448 : type Item = <S as Stream>::Item;
449 :
450 49 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
451 : use std::task::ready;
452 :
453 49 : let this = self.project();
454 :
455 49 : let res = ready!(this.inner.poll_next(cx));
// A data chunk leaves the outcome untouched; an error marks the attempt as failed;
// the end of the stream marks it as successful.
456 22 : match &res {
457 22 : Some(Ok(_)) => {}
458 0 : Some(Err(_)) => *this.outcome = AttemptOutcome::Err,
459 18 : None => *this.outcome = AttemptOutcome::Ok,
460 : }
461 :
462 40 : Poll::Ready(res)
463 49 : }
464 :
465 0 : fn size_hint(&self) -> (usize, Option<usize>) {
466 0 : self.inner.size_hint()
467 0 : }
468 : }
469 :
470 : impl RemoteStorage for S3Bucket {
/// Lists objects as a stream of [`Listing`] pages.
///
/// The listing prefix is `prefix` converted to a full key, or `prefix_in_bucket` followed
/// by a separator when `prefix` is `None`. With `ListingMode::WithDelimiter` the separator
/// is sent as the delimiter, so common prefixes are reported in `Listing::prefixes`.
///
/// One concurrency permit is acquired when the stream is first polled and held until the
/// stream body finishes. Every list request races against `self.timeout` and `cancel`.
///
/// `max_keys` bounds the total number of keys yielded across all pages; once it is
/// reached the current page is yielded and the stream ends.
/// NOTE(review): in that case the page is yielded before its common prefixes are added.
///
/// A failed request yields `Err` and the same page is requested again on the next poll;
/// a timeout or cancellation is propagated with `?`.
471 26 : fn list_streaming(
472 26 : &self,
473 26 : prefix: Option<&RemotePath>,
474 26 : mode: ListingMode,
475 26 : max_keys: Option<NonZeroU32>,
476 26 : cancel: &CancellationToken,
477 26 : ) -> impl Stream<Item = Result<Listing, DownloadError>> {
478 26 : let kind = RequestKind::List;
479 26 : // s3 sdk wants i32
480 26 : let mut max_keys = max_keys.map(|mk| mk.get() as i32);
481 26 :
482 26 : // get the passed prefix or if it is not set use prefix_in_bucket value
483 26 : let list_prefix = prefix
484 26 : .map(|p| self.relative_path_to_s3_object(p))
485 26 : .or_else(|| {
486 20 : self.prefix_in_bucket.clone().map(|mut s| {
487 20 : s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
488 20 : s
489 20 : })
490 26 : });
491 26 :
492 26 : async_stream::stream! {
493 26 : let _permit = self.permit(kind, cancel).await?;
494 26 :
495 26 : let mut continuation_token = None;
496 26 : 'outer: loop {
497 26 : let started_at = start_measuring_requests(kind);
498 26 :
499 26 : // min of two Options, returning Some if one is value and another is
500 26 : // None (None is smaller than anything, so plain min doesn't work).
501 26 : let request_max_keys = self
502 26 : .max_keys_per_list_response
503 26 : .into_iter()
504 26 : .chain(max_keys.into_iter())
505 26 : .min();
506 26 : let mut request = self
507 26 : .client
508 26 : .list_objects_v2()
509 26 : .bucket(self.bucket_name.clone())
510 26 : .set_prefix(list_prefix.clone())
511 26 : .set_continuation_token(continuation_token.clone())
512 26 : .set_max_keys(request_max_keys);
513 26 :
514 26 : if let ListingMode::WithDelimiter = mode {
515 26 : request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
516 26 : }
517 26 :
518 26 : let request = request.send();
519 26 :
520 26 : let response = tokio::select! {
521 26 : res = request => Ok(res),
522 26 : _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
523 26 : _ = cancel.cancelled() => Err(DownloadError::Cancelled),
524 26 : }?;
525 26 :
526 26 : let response = response
527 26 : .context("Failed to list S3 prefixes")
528 26 : .map_err(DownloadError::Other);
529 26 :
530 26 : let started_at = ScopeGuard::into_inner(started_at);
531 26 :
532 26 : crate::metrics::BUCKET_METRICS
533 26 : .req_seconds
534 26 : .observe_elapsed(kind, &response, started_at);
535 26 :
536 26 : let response = match response {
537 26 : Ok(response) => response,
538 26 : Err(e) => {
539 26 : // The error is potentially retryable, so we must rewind the loop after yielding.
540 26 : yield Err(e);
541 26 : continue 'outer;
542 26 : },
543 26 : };
544 26 :
545 26 : let keys = response.contents();
546 26 : let prefixes = response.common_prefixes.as_deref().unwrap_or_default();
547 26 :
548 26 : tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
549 26 : let mut result = Listing::default();
550 26 :
551 26 : for object in keys {
552 26 : let key = object.key().expect("response does not contain a key");
553 26 : let key = self.s3_object_to_relative_path(key);
554 26 :
// A missing or unconvertible modification time falls back to the current time.
555 26 : let last_modified = match object.last_modified.map(SystemTime::try_from) {
556 26 : Some(Ok(t)) => t,
557 26 : Some(Err(_)) => {
558 26 : tracing::warn!("Remote storage last_modified {:?} for {} is out of bounds",
559 26 : object.last_modified, key
560 26 : );
561 26 : SystemTime::now()
562 26 : },
563 26 : None => {
564 26 : SystemTime::now()
565 26 : }
566 26 : };
567 26 :
// A missing size is reported as 0.
568 26 : let size = object.size.unwrap_or(0) as u64;
569 26 :
570 26 : result.keys.push(ListingObject{
571 26 : key,
572 26 : last_modified,
573 26 : size,
574 26 : });
// Count this key against the caller's overall budget, if one was given.
575 26 : if let Some(mut mk) = max_keys {
576 26 : assert!(mk > 0);
577 26 : mk -= 1;
578 26 : if mk == 0 {
579 26 : // limit reached
580 26 : yield Ok(result);
581 26 : break 'outer;
582 26 : }
583 26 : max_keys = Some(mk);
584 26 : }
585 26 : }
586 26 :
587 26 : // S3 gives us prefixes like "foo/", we return them like "foo"
588 88 : result.prefixes.extend(prefixes.iter().filter_map(|o| {
589 88 : Some(
590 88 : self.s3_object_to_relative_path(
591 88 : o.prefix()?
592 88 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
593 26 : ),
594 26 : )
595 88 : }));
596 26 :
597 26 : yield Ok(result);
598 26 :
// No continuation token means this was the last page.
599 26 : continuation_token = match response.next_continuation_token {
600 26 : Some(new_token) => Some(new_token),
601 26 : None => break,
602 26 : };
603 26 : }
604 26 : }
605 26 : }
606 :
607 0 : async fn head_object(
608 0 : &self,
609 0 : key: &RemotePath,
610 0 : cancel: &CancellationToken,
611 0 : ) -> Result<ListingObject, DownloadError> {
612 0 : let kind = RequestKind::Head;
613 0 : let _permit = self.permit(kind, cancel).await?;
614 :
615 0 : let started_at = start_measuring_requests(kind);
616 0 :
617 0 : let head_future = self
618 0 : .client
619 0 : .head_object()
620 0 : .bucket(self.bucket_name())
621 0 : .key(self.relative_path_to_s3_object(key))
622 0 : .send();
623 0 :
624 0 : let head_future = tokio::time::timeout(self.timeout, head_future);
625 :
626 0 : let res = tokio::select! {
627 0 : res = head_future => res,
628 0 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
629 : };
630 :
631 0 : let res = res.map_err(|_e| DownloadError::Timeout)?;
632 :
633 : // do not incl. timeouts as errors in metrics but cancellations
634 0 : let started_at = ScopeGuard::into_inner(started_at);
635 0 : crate::metrics::BUCKET_METRICS
636 0 : .req_seconds
637 0 : .observe_elapsed(kind, &res, started_at);
638 :
639 0 : let data = match res {
640 0 : Ok(object_output) => object_output,
641 0 : Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
642 : // Count this in the AttemptOutcome::Ok bucket, because 404 is not
643 : // an error: we expect to sometimes fetch an object and find it missing,
644 : // e.g. when probing for timeline indices.
645 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
646 0 : kind,
647 0 : AttemptOutcome::Ok,
648 0 : started_at,
649 0 : );
650 0 : return Err(DownloadError::NotFound);
651 : }
652 0 : Err(e) => {
653 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
654 0 : kind,
655 0 : AttemptOutcome::Err,
656 0 : started_at,
657 0 : );
658 0 :
659 0 : return Err(DownloadError::Other(
660 0 : anyhow::Error::new(e).context("s3 head object"),
661 0 : ));
662 : }
663 : };
664 :
665 0 : let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
666 0 : return Err(DownloadError::Other(anyhow!(
667 0 : "head_object doesn't contain last_modified or content_length"
668 0 : )))?;
669 : };
670 : Ok(ListingObject {
671 0 : key: key.to_owned(),
672 0 : last_modified: SystemTime::try_from(last_modified).map_err(|e| {
673 0 : DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
674 0 : })?,
675 0 : size: size as u64,
676 : })
677 0 : }
678 :
/// Uploads the byte stream `from` as the object `to` using a single PutObject request.
///
/// `from_size_bytes` is sent as the content length; `metadata` and the configured upload
/// storage class are attached when present. A concurrency permit is held for the call,
/// and the request races against `self.timeout` and `cancel`.
///
/// # Errors
/// Fails if the permit wait is cancelled, `from_size_bytes` does not fit the SDK's
/// content-length type, the request times out, `cancel` fires, or the SDK reports an error.
679 106 : async fn upload(
680 106 : &self,
681 106 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
682 106 : from_size_bytes: usize,
683 106 : to: &RemotePath,
684 106 : metadata: Option<StorageMetadata>,
685 106 : cancel: &CancellationToken,
686 106 : ) -> anyhow::Result<()> {
687 106 : let kind = RequestKind::Put;
688 106 : let _permit = self.permit(kind, cancel).await?;
689 :
690 106 : let started_at = start_measuring_requests(kind);
691 106 :
// Adapt the caller's stream into the body type the SDK accepts.
692 106 : let body = Body::wrap_stream(from);
693 106 : let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));
694 :
695 106 : let upload = self
696 106 : .client
697 106 : .put_object()
698 106 : .bucket(self.bucket_name.clone())
699 106 : .key(self.relative_path_to_s3_object(to))
700 106 : .set_metadata(metadata.map(|m| m.0))
701 106 : .set_storage_class(self.upload_storage_class.clone())
702 106 : .content_length(from_size_bytes.try_into()?)
703 106 : .body(bytes_stream)
704 106 : .send();
705 106 :
706 106 : let upload = tokio::time::timeout(self.timeout, upload);
707 :
708 106 : let res = tokio::select! {
709 106 : res = upload => res,
710 106 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
711 : };
712 :
// `res` is `Ok(..)` only when the request completed within the timeout; only then is
// its outcome recorded here. On timeout the measurement guard is left to drop instead.
713 106 : if let Ok(inner) = &res {
714 106 : // do not incl. timeouts as errors in metrics but cancellations
715 106 : let started_at = ScopeGuard::into_inner(started_at);
716 106 : crate::metrics::BUCKET_METRICS
717 106 : .req_seconds
718 106 : .observe_elapsed(kind, inner, started_at);
719 106 : }
720 :
721 106 : match res {
722 106 : Ok(Ok(_put)) => Ok(()),
723 0 : Ok(Err(sdk)) => Err(sdk.into()),
724 0 : Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
725 : }
726 106 : }
727 :
/// Copies the object `from` to `to` within the same bucket using a CopyObject request.
///
/// The configured upload storage class is applied to the destination when set. A
/// concurrency permit is held for the call, and the request races against the timeout
/// and `cancel`.
///
/// # Errors
/// Fails if the permit wait is cancelled, the request times out, `cancel` fires, or the
/// SDK reports an error.
728 2 : async fn copy(
729 2 : &self,
730 2 : from: &RemotePath,
731 2 : to: &RemotePath,
732 2 : cancel: &CancellationToken,
733 2 : ) -> anyhow::Result<()> {
734 2 : let kind = RequestKind::Copy;
735 2 : let _permit = self.permit(kind, cancel).await?;
736 :
// The timeout future is created here, before the request is built and sent.
737 2 : let timeout = tokio::time::sleep(self.timeout);
738 2 :
739 2 : let started_at = start_measuring_requests(kind);
740 2 :
741 2 : // we need to specify bucket_name as a prefix
742 2 : let copy_source = format!(
743 2 : "{}/{}",
744 2 : self.bucket_name,
745 2 : self.relative_path_to_s3_object(from)
746 2 : );
747 2 :
748 2 : let op = self
749 2 : .client
750 2 : .copy_object()
751 2 : .bucket(self.bucket_name.clone())
752 2 : .key(self.relative_path_to_s3_object(to))
753 2 : .set_storage_class(self.upload_storage_class.clone())
754 2 : .copy_source(copy_source)
755 2 : .send();
756 :
757 2 : let res = tokio::select! {
758 2 : res = op => res,
759 2 : _ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
760 2 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
761 : };
762 :
// The request completed: record its outcome before propagating any SDK error.
763 2 : let started_at = ScopeGuard::into_inner(started_at);
764 2 : crate::metrics::BUCKET_METRICS
765 2 : .req_seconds
766 2 : .observe_elapsed(kind, &res, started_at);
767 2 :
768 2 : res?;
769 :
770 2 : Ok(())
771 2 : }
772 :
/// Downloads the whole object `from` (no range is requested).
///
/// Delegates to `download_object`; see it for timeout, cancellation and error behavior.
773 15 : async fn download(
774 15 : &self,
775 15 : from: &RemotePath,
776 15 : cancel: &CancellationToken,
777 15 : ) -> Result<Download, DownloadError> {
778 15 : // if prefix is not none then download file `prefix/from`
779 15 : // if prefix is none then download file `from`
780 15 : self.download_object(
781 15 : GetObjectRequest {
782 15 : bucket: self.bucket_name.clone(),
783 15 : key: self.relative_path_to_s3_object(from),
784 15 : range: None,
785 15 : },
786 15 : cancel,
787 15 : )
788 22 : .await
789 15 : }
790 :
/// Downloads the bytes of `from` starting at `start_inclusive` and ending just before
/// `end_exclusive`, or running to the end of the object when `end_exclusive` is `None`.
///
/// The half-open range is translated into a `bytes=<start>-<end>` range value with an
/// inclusive end, then passed to `download_object`.
/// NOTE(review): `end_exclusive == Some(0)` saturates to an inclusive end of 0 rather
/// than describing an empty range — confirm callers never pass it.
791 10 : async fn download_byte_range(
792 10 : &self,
793 10 : from: &RemotePath,
794 10 : start_inclusive: u64,
795 10 : end_exclusive: Option<u64>,
796 10 : cancel: &CancellationToken,
797 10 : ) -> Result<Download, DownloadError> {
798 10 : // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
799 10 : // and needs both ends to be inclusive
800 10 : let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
801 10 : let range = Some(match end_inclusive {
802 6 : Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
803 4 : None => format!("bytes={start_inclusive}-"),
804 : });
805 :
806 10 : self.download_object(
807 10 : GetObjectRequest {
808 10 : bucket: self.bucket_name.clone(),
809 10 : key: self.relative_path_to_s3_object(from),
810 10 : range,
811 10 : },
812 10 : cancel,
813 10 : )
814 10 : .await
815 10 : }
816 :
/// Deletes all objects named by `paths`.
///
/// Acquires one concurrency permit, converts every path into an S3 object identifier,
/// and hands the batch to `delete_oids`, which splits it into requests.
///
/// # Errors
/// Fails if the permit wait is cancelled, an identifier cannot be built, or
/// `delete_oids` fails.
817 102 : async fn delete_objects<'a>(
818 102 : &self,
819 102 : paths: &'a [RemotePath],
820 102 : cancel: &CancellationToken,
821 102 : ) -> anyhow::Result<()> {
822 102 : let kind = RequestKind::Delete;
823 102 : let permit = self.permit(kind, cancel).await?;
824 102 : let mut delete_objects = Vec::with_capacity(paths.len());
825 212 : for path in paths {
826 110 : let obj_id = ObjectIdentifier::builder()
827 110 : .set_key(Some(self.relative_path_to_s3_object(path)))
828 110 : .build()
829 110 : .context("convert path to oid")?;
830 110 : delete_objects.push(obj_id);
831 : }
832 :
833 340 : self.delete_oids(&permit, &delete_objects, cancel).await
834 102 : }
835 :
/// Deletes a single object by delegating to `delete_objects` with a one-element slice.
836 90 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
// Borrow `path` as a one-element array without cloning it.
837 90 : let paths = std::array::from_ref(path);
838 285 : self.delete_objects(paths, cancel).await
839 90 : }
840 :
841 6 : async fn time_travel_recover(
842 6 : &self,
843 6 : prefix: Option<&RemotePath>,
844 6 : timestamp: SystemTime,
845 6 : done_if_after: SystemTime,
846 6 : cancel: &CancellationToken,
847 6 : ) -> Result<(), TimeTravelError> {
848 6 : let kind = RequestKind::TimeTravel;
849 6 : let permit = self.permit(kind, cancel).await?;
850 :
851 6 : let timestamp = DateTime::from(timestamp);
852 6 : let done_if_after = DateTime::from(done_if_after);
853 6 :
854 6 : tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
855 :
856 : // get the passed prefix or if it is not set use prefix_in_bucket value
857 6 : let prefix = prefix
858 6 : .map(|p| self.relative_path_to_s3_object(p))
859 6 : .or_else(|| self.prefix_in_bucket.clone());
860 6 :
861 6 : let warn_threshold = 3;
862 6 : let max_retries = 10;
863 6 : let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
864 :
865 6 : let mut key_marker = None;
866 6 : let mut version_id_marker = None;
867 6 : let mut versions_and_deletes = Vec::new();
868 :
869 : loop {
870 6 : let response = backoff::retry(
871 7 : || async {
872 7 : let op = self
873 7 : .client
874 7 : .list_object_versions()
875 7 : .bucket(self.bucket_name.clone())
876 7 : .set_prefix(prefix.clone())
877 7 : .set_key_marker(key_marker.clone())
878 7 : .set_version_id_marker(version_id_marker.clone())
879 7 : .send();
880 7 :
881 7 : tokio::select! {
882 7 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
883 7 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
884 : }
885 14 : },
886 6 : is_permanent,
887 6 : warn_threshold,
888 6 : max_retries,
889 6 : "listing object versions for time_travel_recover",
890 6 : cancel,
891 6 : )
892 42 : .await
893 6 : .ok_or_else(|| TimeTravelError::Cancelled)
894 6 : .and_then(|x| x)?;
895 :
896 6 : tracing::trace!(
897 0 : " Got List response version_id_marker={:?}, key_marker={:?}",
898 : response.version_id_marker,
899 : response.key_marker
900 : );
901 6 : let versions = response
902 6 : .versions
903 6 : .unwrap_or_default()
904 6 : .into_iter()
905 6 : .map(VerOrDelete::from_version);
906 6 : let deletes = response
907 6 : .delete_markers
908 6 : .unwrap_or_default()
909 6 : .into_iter()
910 6 : .map(VerOrDelete::from_delete_marker);
911 6 : itertools::process_results(versions.chain(deletes), |n_vds| {
912 6 : versions_and_deletes.extend(n_vds)
913 6 : })
914 6 : .map_err(TimeTravelError::Other)?;
915 12 : fn none_if_empty(v: Option<String>) -> Option<String> {
916 12 : v.filter(|v| !v.is_empty())
917 12 : }
918 6 : version_id_marker = none_if_empty(response.next_version_id_marker);
919 6 : key_marker = none_if_empty(response.next_key_marker);
920 6 : if version_id_marker.is_none() {
921 : // The final response is not supposed to be truncated
922 6 : if response.is_truncated.unwrap_or_default() {
923 0 : return Err(TimeTravelError::Other(anyhow::anyhow!(
924 0 : "Received truncated ListObjectVersions response for prefix={prefix:?}"
925 0 : )));
926 6 : }
927 6 : break;
928 0 : }
929 : // Limit the number of versions deletions, mostly so that we don't
930 : // keep requesting forever if the list is too long, as we'd put the
931 : // list in RAM.
932 : // Building a list of 100k entries that reaches the limit roughly takes
933 : // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
934 : const COMPLEXITY_LIMIT: usize = 100_000;
935 0 : if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
936 0 : return Err(TimeTravelError::TooManyVersions);
937 0 : }
938 : }
939 :
940 6 : tracing::info!(
941 0 : "Built list for time travel with {} versions and deletions",
942 0 : versions_and_deletes.len()
943 : );
944 :
945 : // Work on the list of references instead of the objects directly,
946 : // otherwise we get lifetime errors in the sort_by_key call below.
947 6 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
948 6 :
949 124 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
950 6 :
951 6 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
952 :
953 42 : for vd in &versions_and_deletes {
954 : let VerOrDelete {
955 36 : version_id, key, ..
956 36 : } = &vd;
957 36 : if version_id == "null" {
958 0 : return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
959 0 : indicating either disabled versioning, or legacy objects with null version id values")));
960 36 : }
961 36 : tracing::trace!(
962 0 : "Parsing version key={key} version_id={version_id} kind={:?}",
963 : vd.kind
964 : );
965 :
966 36 : vds_for_key.entry(key).or_default().push(vd);
967 : }
968 24 : for (key, versions) in vds_for_key {
969 18 : let last_vd = versions.last().unwrap();
970 18 : if last_vd.last_modified > done_if_after {
971 0 : tracing::trace!("Key {key} has version later than done_if_after, skipping");
972 0 : continue;
973 18 : }
974 : // the version we want to restore to.
975 18 : let version_to_restore_to =
976 28 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
977 0 : Ok(v) => v,
978 18 : Err(e) => e,
979 : };
980 18 : if version_to_restore_to == versions.len() {
981 6 : tracing::trace!("Key {key} has no changes since timestamp, skipping");
982 6 : continue;
983 12 : }
984 12 : let mut do_delete = false;
985 12 : if version_to_restore_to == 0 {
986 : // All versions more recent, so the key didn't exist at the specified time point.
987 6 : tracing::trace!(
988 0 : "All {} versions more recent for {key}, deleting",
989 0 : versions.len()
990 : );
991 6 : do_delete = true;
992 : } else {
993 6 : match &versions[version_to_restore_to - 1] {
994 : VerOrDelete {
995 : kind: VerOrDeleteKind::Version,
996 6 : version_id,
997 6 : ..
998 6 : } => {
999 6 : tracing::trace!("Copying old version {version_id} for {key}...");
1000 : // Restore the state to the last version by copying
1001 6 : let source_id =
1002 6 : format!("{}/{key}?versionId={version_id}", self.bucket_name);
1003 6 :
1004 6 : backoff::retry(
1005 6 : || async {
1006 6 : let op = self
1007 6 : .client
1008 6 : .copy_object()
1009 6 : .bucket(self.bucket_name.clone())
1010 6 : .key(key)
1011 6 : .set_storage_class(self.upload_storage_class.clone())
1012 6 : .copy_source(&source_id)
1013 6 : .send();
1014 6 :
1015 6 : tokio::select! {
1016 6 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
1017 6 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
1018 : }
1019 12 : },
1020 6 : is_permanent,
1021 6 : warn_threshold,
1022 6 : max_retries,
1023 6 : "copying object version for time_travel_recover",
1024 6 : cancel,
1025 6 : )
1026 21 : .await
1027 6 : .ok_or_else(|| TimeTravelError::Cancelled)
1028 6 : .and_then(|x| x)?;
1029 6 : tracing::info!(%version_id, %key, "Copied old version in S3");
1030 : }
1031 : VerOrDelete {
1032 : kind: VerOrDeleteKind::DeleteMarker,
1033 : ..
1034 0 : } => {
1035 0 : do_delete = true;
1036 0 : }
1037 : }
1038 : };
1039 12 : if do_delete {
1040 6 : if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
1041 : // Key has since been deleted (but there was some history), no need to do anything
1042 2 : tracing::trace!("Key {key} already deleted, skipping.");
1043 : } else {
1044 4 : tracing::trace!("Deleting {key}...");
1045 :
1046 4 : let oid = ObjectIdentifier::builder()
1047 4 : .key(key.to_owned())
1048 4 : .build()
1049 4 : .map_err(|e| TimeTravelError::Other(e.into()))?;
1050 :
1051 4 : self.delete_oids(&permit, &[oid], cancel)
1052 8 : .await
1053 4 : .map_err(|e| {
1054 0 : // delete_oid0 will use TimeoutOrCancel
1055 0 : if TimeoutOrCancel::caused_by_cancel(&e) {
1056 0 : TimeTravelError::Cancelled
1057 : } else {
1058 0 : TimeTravelError::Other(e)
1059 : }
1060 4 : })?;
1061 : }
1062 6 : }
1063 : }
1064 6 : Ok(())
1065 6 : }
1066 : }
1067 :
1068 : // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
1069 : struct VerOrDelete {
1070 : kind: VerOrDeleteKind,
1071 : last_modified: DateTime,
1072 : version_id: String,
1073 : key: String,
1074 : }
1075 :
/// Tells which list of a `ListObjectVersions` response a [`VerOrDelete`] came from.
///
/// The enum carries no data, so it is `Copy` and comparable with `==` in addition
/// to being printable with `{:?}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum VerOrDeleteKind {
    /// The entry was built from an `ObjectVersion`.
    Version,
    /// The entry was built from a `DeleteMarkerEntry`.
    DeleteMarker,
}
1081 :
1082 : impl VerOrDelete {
1083 36 : fn with_kind(
1084 36 : kind: VerOrDeleteKind,
1085 36 : last_modified: Option<DateTime>,
1086 36 : version_id: Option<String>,
1087 36 : key: Option<String>,
1088 36 : ) -> anyhow::Result<Self> {
1089 36 : let lvk = (last_modified, version_id, key);
1090 36 : let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
1091 0 : anyhow::bail!(
1092 0 : "One (or more) of last_modified, key, and id is None. \
1093 0 : Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
1094 0 : lvk.0,
1095 0 : lvk.1,
1096 0 : lvk.2,
1097 0 : );
1098 : };
1099 36 : Ok(Self {
1100 36 : kind,
1101 36 : last_modified,
1102 36 : version_id,
1103 36 : key,
1104 36 : })
1105 36 : }
1106 28 : fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
1107 28 : Self::with_kind(
1108 28 : VerOrDeleteKind::Version,
1109 28 : v.last_modified,
1110 28 : v.version_id,
1111 28 : v.key,
1112 28 : )
1113 28 : }
1114 8 : fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
1115 8 : Self::with_kind(
1116 8 : VerOrDeleteKind::DeleteMarker,
1117 8 : v.last_modified,
1118 8 : v.version_id,
1119 8 : v.key,
1120 8 : )
1121 8 : }
1122 : }
1123 :
#[cfg(test)]
mod tests {
    use camino::Utf8Path;
    use std::num::NonZeroUsize;

    use crate::{RemotePath, S3Bucket, S3Config};

    /// Checks the S3 object key produced by `relative_path_to_s3_object` for every
    /// combination of a `prefix_in_bucket` setting and a remote path.
    #[tokio::test]
    async fn relative_path() {
        let remote_paths: Vec<RemotePath> = ["", "some/path", "some/path/"]
            .iter()
            .map(|raw| RemotePath::new(Utf8Path::new(raw)).expect("bad path"))
            .collect();

        // Each row pairs a bucket prefix with the keys expected for `remote_paths`,
        // listed in the same order as the paths above.
        let cases: [(Option<&str>, [&str; 3]); 5] = [
            (None, ["", "some/path", "some/path/"]),
            (Some(""), ["/", "/some/path", "/some/path/"]),
            (
                Some("test/prefix"),
                [
                    "test/prefix/",
                    "test/prefix/some/path",
                    "test/prefix/some/path/",
                ],
            ),
            (
                Some("test/prefix/"),
                [
                    "test/prefix/",
                    "test/prefix/some/path",
                    "test/prefix/some/path/",
                ],
            ),
            (
                Some("/test/prefix/"),
                [
                    "test/prefix/",
                    "test/prefix/some/path",
                    "test/prefix/some/path/",
                ],
            ),
        ];

        for (prefix, expected_keys) in cases {
            let config = S3Config {
                bucket_name: "bucket".to_owned(),
                bucket_region: "region".to_owned(),
                prefix_in_bucket: prefix.map(str::to_owned),
                endpoint: None,
                concurrency_limit: NonZeroUsize::new(100).unwrap(),
                max_keys_per_list_response: Some(5),
                upload_storage_class: None,
            };
            let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
                .await
                .expect("remote storage init");

            for (remote_path, expected) in remote_paths.iter().zip(expected_keys) {
                let actual = storage.relative_path_to_s3_object(remote_path);
                assert_eq!(actual, expected);
            }
        }
    }
}
|