//! AWS S3 storage wrapper around the `aws-sdk-s3` library.
//!
//! Respects the `prefix_in_bucket` property from [`S3Config`],
//! allowing multiple API users to work with the same S3 bucket independently,
//! as long as their bucket prefixes are specified and distinct.
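//!
//! A minimal construction sketch (field values are illustrative, mirroring the
//! test at the bottom of this file):
//!
//! ```ignore
//! let config = S3Config {
//!     bucket_name: "bucket".to_owned(),
//!     bucket_region: "us-east-1".to_owned(),
//!     prefix_in_bucket: Some("test/prefix".to_owned()),
//!     endpoint: None,
//!     concurrency_limit: NonZeroUsize::new(100).unwrap(),
//!     max_keys_per_list_response: Some(1000),
//! };
//! let storage = S3Bucket::new(&config)?;
//! ```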

use std::{
    borrow::Cow,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use anyhow::Context as _;
use aws_config::{
    environment::credentials::EnvironmentVariableCredentialsProvider,
    imds::credentials::ImdsCredentialsProvider,
    meta::credentials::CredentialsProviderChain,
    profile::ProfileFileCredentialsProvider,
    provider_config::ProviderConfig,
    retry::{RetryConfigBuilder, RetryMode},
    web_identity_token::WebIdentityTokenCredentialsProvider,
    BehaviorVersion,
};
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_sdk_s3::{
    config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
    error::SdkError,
    operation::get_object::GetObjectError,
    types::{Delete, ObjectIdentifier},
    Client,
};
use aws_smithy_async::rt::sleep::TokioSleep;

use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::ByteStream;
use bytes::Bytes;
use futures::stream::Stream;
use hyper::Body;
use scopeguard::ScopeGuard;

use super::StorageMetadata;
use crate::{
    ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
    S3Config, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};

pub(super) mod metrics;

use self::metrics::AttemptOutcome;
pub(super) use self::metrics::RequestKind;

/// AWS S3 storage.
pub struct S3Bucket {
    client: Client,
    bucket_name: String,
    prefix_in_bucket: Option<String>,
    max_keys_per_list_response: Option<i32>,
    concurrency_limiter: ConcurrencyLimiter,
}

#[derive(Default)]
struct GetObjectRequest {
    bucket: String,
    key: String,
    range: Option<String>,
}

impl S3Bucket {
    /// Creates the S3 storage; errors if an incorrect AWS S3 configuration is provided.
    pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
        tracing::debug!(
            "Creating s3 remote storage for S3 bucket {}",
            aws_config.bucket_name
        );

        let region = Some(Region::new(aws_config.bucket_region.clone()));

        let provider_conf = ProviderConfig::without_region().with_region(region.clone());

        let credentials_provider = {
            // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
            CredentialsProviderChain::first_try(
                "env",
                EnvironmentVariableCredentialsProvider::new(),
            )
            // uses "AWS_PROFILE" / `aws sso login --profile <profile>`
            .or_else(
                "profile-sso",
                ProfileFileCredentialsProvider::builder()
                    .configure(&provider_conf)
                    .build(),
            )
            // uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME";
            // needed to access the remote extensions bucket
            .or_else(
                "token",
                WebIdentityTokenCredentialsProvider::builder()
                    .configure(&provider_conf)
                    .build(),
            )
            // uses IMDSv2
            .or_else("imds", ImdsCredentialsProvider::builder().build())
        };

        // The AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off.
        let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());

        // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting
        // in response to throttling responses (e.g. 429 on too many ListObjectsV2 requests), we must provide
        // a retry config. We set it to use at most one attempt, and enable 'Adaptive' mode, which causes
        // rate limiting to be enabled.
        let mut retry_config = RetryConfigBuilder::new();
        retry_config
            .set_max_attempts(Some(1))
            .set_mode(Some(RetryMode::Adaptive));

        let mut config_builder = Builder::default()
            .behavior_version(BehaviorVersion::v2023_11_09())
            .region(region)
            .identity_cache(IdentityCache::lazy().build())
            .credentials_provider(SharedCredentialsProvider::new(credentials_provider))
            .retry_config(retry_config.build())
            .sleep_impl(SharedAsyncSleep::from(sleep_impl));

        if let Some(custom_endpoint) = aws_config.endpoint.clone() {
            config_builder = config_builder
                .endpoint_url(custom_endpoint)
                .force_path_style(true);
        }

        let client = Client::from_conf(config_builder.build());

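        // Normalize the configured prefix: strip leading and trailing separators,
        // e.g. "/test/prefix/" becomes "test/prefix".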
        let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
            let mut prefix = prefix;
            while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                prefix = &prefix[1..]
            }

            let mut prefix = prefix.to_string();
            while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                prefix.pop();
            }
            prefix
        });
        Ok(Self {
            client,
            bucket_name: aws_config.bucket_name.clone(),
            max_keys_per_list_response: aws_config.max_keys_per_list_response,
            prefix_in_bucket,
            concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
        })
    }

    fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
        let relative_path =
            match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
                Some(stripped) => stripped,
                // we rely on AWS to return properly prefixed paths
                // for requests with a certain prefix
                None => panic!(
                    "Key {} does not start with bucket prefix {:?}",
                    key, self.prefix_in_bucket
                ),
            };
        RemotePath(
            relative_path
                .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
                .collect(),
        )
    }

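    /// Maps a remote path to its S3 key: with `prefix_in_bucket = Some("test/prefix")`,
    /// `some/path` becomes `test/prefix/some/path` (see the `relative_path` test below).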
    pub fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
        assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
        let path_string = path
            .get_path()
            .as_str()
            .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
        match &self.prefix_in_bucket {
            Some(prefix) => prefix.clone() + "/" + path_string,
            None => path_string.to_string(),
        }
    }

    async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
        let started_at = start_counting_cancelled_wait(kind);
        let permit = self
            .concurrency_limiter
            .acquire(kind)
            .await
            .expect("semaphore is never closed");

        let started_at = ScopeGuard::into_inner(started_at);
        metrics::BUCKET_METRICS
            .wait_seconds
            .observe_elapsed(kind, started_at);

        permit
    }

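    /// Like [`Self::permit`], but returns an owned permit that is not tied to
    /// `&self`, so it can travel with the returned download stream
    /// (see [`PermitCarrying`]).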
    async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit {
        let started_at = start_counting_cancelled_wait(kind);
        let permit = self
            .concurrency_limiter
            .acquire_owned(kind)
            .await
            .expect("semaphore is never closed");

        let started_at = ScopeGuard::into_inner(started_at);
        metrics::BUCKET_METRICS
            .wait_seconds
            .observe_elapsed(kind, started_at);
        permit
    }

    async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
        let kind = RequestKind::Get;
        let permit = self.owned_permit(kind).await;

        let started_at = start_measuring_requests(kind);

        let get_object = self
            .client
            .get_object()
            .bucket(request.bucket)
            .key(request.key)
            .set_range(request.range)
            .send()
            .await;

        let started_at = ScopeGuard::into_inner(started_at);

        match get_object {
            Ok(object_output) => {
                let metadata = object_output.metadata().cloned().map(StorageMetadata);
                let etag = object_output.e_tag.clone();
                let last_modified = object_output.last_modified.and_then(|t| t.try_into().ok());

                let body = object_output.body;
                let body = ByteStreamAsStream::from(body);
                let body = PermitCarrying::new(permit, body);
                let body = TimedDownload::new(started_at, body);

                Ok(Download {
                    metadata,
                    etag,
                    last_modified,
                    download_stream: Box::pin(body),
                })
            }
            Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
                // Count this in the AttemptOutcome::Ok bucket, because 404 is not
                // an error: we expect to sometimes fetch an object and find it missing,
                // e.g. when probing for timeline indices.
                metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
                    kind,
                    AttemptOutcome::Ok,
                    started_at,
                );
                Err(DownloadError::NotFound)
            }
            Err(e) => {
                metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
                    kind,
                    AttemptOutcome::Err,
                    started_at,
                );

                Err(DownloadError::Other(
                    anyhow::Error::new(e).context("download s3 object"),
                ))
            }
        }
    }
}

pin_project_lite::pin_project! {
    struct ByteStreamAsStream {
        #[pin]
        inner: aws_smithy_types::byte_stream::ByteStream
    }
}

impl From<aws_smithy_types::byte_stream::ByteStream> for ByteStreamAsStream {
    fn from(inner: aws_smithy_types::byte_stream::ByteStream) -> Self {
        ByteStreamAsStream { inner }
    }
}

impl Stream for ByteStreamAsStream {
    type Item = std::io::Result<Bytes>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // this does the conversion to std::io::ErrorKind::Other
        self.project().inner.poll_next(cx).map_err(|x| x.into())
    }

    // We cannot implement size_hint: the inner ByteStream's size_hint reports the
    // remaining size in bytes, while Stream::size_hint counts items (chunks).
}

pin_project_lite::pin_project! {
    /// A `Stream` adapter which carries a semaphore permit for the lifetime of the value.
    struct PermitCarrying<S> {
        permit: tokio::sync::OwnedSemaphorePermit,
        #[pin]
        inner: S,
    }
}

impl<S> PermitCarrying<S> {
    fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
        Self { permit, inner }
    }
}

impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for PermitCarrying<S> {
    type Item = <S as Stream>::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().inner.poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

pin_project_lite::pin_project! {
    /// Times and tracks the outcome of the request.
    struct TimedDownload<S> {
        started_at: std::time::Instant,
        outcome: metrics::AttemptOutcome,
        #[pin]
        inner: S
    }

    impl<S> PinnedDrop for TimedDownload<S> {
        fn drop(mut this: Pin<&mut Self>) {
            metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at);
        }
    }
}

impl<S> TimedDownload<S> {
    fn new(started_at: std::time::Instant, inner: S) -> Self {
        TimedDownload {
            started_at,
            outcome: metrics::AttemptOutcome::Cancelled,
            inner,
        }
    }
}

impl<S: Stream<Item = std::io::Result<Bytes>>> Stream for TimedDownload<S> {
    type Item = <S as Stream>::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use std::task::ready;

        let this = self.project();

        let res = ready!(this.inner.poll_next(cx));
        match &res {
            Some(Ok(_)) => {}
            Some(Err(_)) => *this.outcome = metrics::AttemptOutcome::Err,
            None => *this.outcome = metrics::AttemptOutcome::Ok,
        }

        Poll::Ready(res)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

#[async_trait::async_trait]
impl RemoteStorage for S3Bucket {
    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
    ) -> Result<Listing, DownloadError> {
        let kind = RequestKind::List;
        let mut result = Listing::default();

        // Use the passed prefix; if none is given, fall back to `prefix_in_bucket`.
        let list_prefix = prefix
            .map(|p| self.relative_path_to_s3_object(p))
            .or_else(|| self.prefix_in_bucket.clone())
            .map(|mut p| {
                // The prefix must end with a separator, otherwise the request
                // returns only the entry for the prefix itself.
                if matches!(mode, ListingMode::WithDelimiter)
                    && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
                {
                    p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
                }
                p
            });

        let mut continuation_token = None;

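        // Page through ListObjectsV2 results until the response carries no
        // continuation token.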
        loop {
            let _guard = self.permit(kind).await;
            let started_at = start_measuring_requests(kind);

            let mut request = self
                .client
                .list_objects_v2()
                .bucket(self.bucket_name.clone())
                .set_prefix(list_prefix.clone())
                .set_continuation_token(continuation_token)
                .set_max_keys(self.max_keys_per_list_response);

            if let ListingMode::WithDelimiter = mode {
                request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
            }

            let response = request
                .send()
                .await
                .context("Failed to list S3 prefixes")
                .map_err(DownloadError::Other);

            let started_at = ScopeGuard::into_inner(started_at);

            metrics::BUCKET_METRICS
                .req_seconds
                .observe_elapsed(kind, &response, started_at);

            let response = response?;

            let keys = response.contents();
            let empty = Vec::new();
            let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);

            tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());

            for object in keys {
                let object_path = object.key().expect("response does not contain a key");
                let remote_path = self.s3_object_to_relative_path(object_path);
                result.keys.push(remote_path);
            }

            result.prefixes.extend(
                prefixes
                    .iter()
                    .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
            );

            continuation_token = match response.next_continuation_token {
                Some(new_token) => Some(new_token),
                None => break,
            };
        }

        Ok(result)
    }

    async fn upload(
        &self,
        from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
        from_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        let kind = RequestKind::Put;
        let _guard = self.permit(kind).await;

        let started_at = start_measuring_requests(kind);

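        // Adapt the caller's `Stream` of bytes into the SDK's `ByteStream` by
        // wrapping it in a hyper `Body` (an http-body 0.4 implementation).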
        let body = Body::wrap_stream(from);
        let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body));

        let res = self
            .client
            .put_object()
            .bucket(self.bucket_name.clone())
            .key(self.relative_path_to_s3_object(to))
            .set_metadata(metadata.map(|m| m.0))
            .content_length(from_size_bytes.try_into()?)
            .body(bytes_stream)
            .send()
            .await;

        let started_at = ScopeGuard::into_inner(started_at);
        metrics::BUCKET_METRICS
            .req_seconds
            .observe_elapsed(kind, &res, started_at);

        res?;

        Ok(())
    }

    async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
        let kind = RequestKind::Copy;
        let _guard = self.permit(kind).await;

        let started_at = start_measuring_requests(kind);

        // The copy source must be prefixed with the bucket name: `<bucket>/<key>`.
        let copy_source = format!(
            "{}/{}",
            self.bucket_name,
            self.relative_path_to_s3_object(from)
        );

        let res = self
            .client
            .copy_object()
            .bucket(self.bucket_name.clone())
            .key(self.relative_path_to_s3_object(to))
            .copy_source(copy_source)
            .send()
            .await;

        let started_at = ScopeGuard::into_inner(started_at);
        metrics::BUCKET_METRICS
            .req_seconds
            .observe_elapsed(kind, &res, started_at);

        res?;

        Ok(())
    }

    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        // If a prefix is configured, download `prefix/from`; otherwise download `from`.
        self.download_object(GetObjectRequest {
            bucket: self.bucket_name.clone(),
            key: self.relative_path_to_s3_object(from),
            range: None,
        })
        .await
    }

    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
        // and needs both ends to be inclusive
        let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
        let range = Some(match end_inclusive {
            Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
            None => format!("bytes={start_inclusive}-"),
        });
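        // e.g. start_inclusive = 0, end_exclusive = Some(10) yields "bytes=0-9";
        // with end_exclusive = None the range is open-ended: "bytes=0-".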

        self.download_object(GetObjectRequest {
            bucket: self.bucket_name.clone(),
            key: self.relative_path_to_s3_object(from),
            range,
        })
        .await
    }

    async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
        let kind = RequestKind::Delete;
        let _guard = self.permit(kind).await;

        let mut delete_objects = Vec::with_capacity(paths.len());
        for path in paths {
            let obj_id = ObjectIdentifier::builder()
                .set_key(Some(self.relative_path_to_s3_object(path)))
                .build()?;
            delete_objects.push(obj_id);
        }

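        // DeleteObjects accepts a bounded number of keys per request (1000 for S3),
        // so issue one request per MAX_KEYS_PER_DELETE-sized chunk.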
        for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
            let started_at = start_measuring_requests(kind);

            let resp = self
                .client
                .delete_objects()
                .bucket(self.bucket_name.clone())
                .delete(
                    Delete::builder()
                        .set_objects(Some(chunk.to_vec()))
                        .build()?,
                )
                .send()
                .await;

            let started_at = ScopeGuard::into_inner(started_at);
            metrics::BUCKET_METRICS
                .req_seconds
                .observe_elapsed(kind, &resp, started_at);

            match resp {
                Ok(resp) => {
                    metrics::BUCKET_METRICS
                        .deleted_objects_total
                        .inc_by(chunk.len() as u64);
                    if let Some(errors) = resp.errors {
                        // Log a bounded number of the errors within the response:
                        // these requests can carry 1000 keys so logging each one
                        // would be too verbose, especially as errors may lead us
                        // to retry repeatedly.
                        const LOG_UP_TO_N_ERRORS: usize = 10;
                        for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
                            tracing::warn!(
                                "DeleteObjects key {} failed: {}: {}",
                                e.key.as_ref().map(Cow::from).unwrap_or("".into()),
                                e.code.as_ref().map(Cow::from).unwrap_or("".into()),
                                e.message.as_ref().map(Cow::from).unwrap_or("".into())
                            );
                        }

                        return Err(anyhow::format_err!(
                            "Failed to delete {} objects",
                            errors.len()
                        ));
                    }
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
        Ok(())
    }

    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        let paths = std::array::from_ref(path);
        self.delete_objects(paths).await
    }
}

/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
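/// The closure only fires if the guard is dropped without being defused via
/// [`ScopeGuard::into_inner`], i.e. when the awaiting future is cancelled mid-wait.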
fn start_counting_cancelled_wait(
    kind: RequestKind,
) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
    scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
        metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc()
    })
}

/// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`].
fn start_measuring_requests(
    kind: RequestKind,
) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
    scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
        metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
            kind,
            AttemptOutcome::Cancelled,
            started_at,
        )
    })
}

#[cfg(test)]
mod tests {
    use camino::Utf8Path;
    use std::num::NonZeroUsize;

    use crate::{RemotePath, S3Bucket, S3Config};

    #[test]
    fn relative_path() {
        let all_paths = ["", "some/path", "some/path/"];
        let all_paths: Vec<RemotePath> = all_paths
            .iter()
            .map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
            .collect();
        let prefixes = [
            None,
            Some(""),
            Some("test/prefix"),
            Some("test/prefix/"),
            Some("/test/prefix/"),
        ];
        let expected_outputs = vec![
            vec!["", "some/path", "some/path"],
            vec!["/", "/some/path", "/some/path"],
            vec![
                "test/prefix/",
                "test/prefix/some/path",
                "test/prefix/some/path",
            ],
            vec![
                "test/prefix/",
                "test/prefix/some/path",
                "test/prefix/some/path",
            ],
            vec![
                "test/prefix/",
                "test/prefix/some/path",
                "test/prefix/some/path",
            ],
        ];

        for (prefix_idx, prefix) in prefixes.iter().enumerate() {
            let config = S3Config {
                bucket_name: "bucket".to_owned(),
                bucket_region: "region".to_owned(),
                prefix_in_bucket: prefix.map(str::to_string),
                endpoint: None,
                concurrency_limit: NonZeroUsize::new(100).unwrap(),
                max_keys_per_list_response: Some(5),
            };
            let storage = S3Bucket::new(&config).expect("remote storage init");
            for (test_path_idx, test_path) in all_paths.iter().enumerate() {
                let result = storage.relative_path_to_s3_object(test_path);
                let expected = expected_outputs[prefix_idx][test_path_idx];
                assert_eq!(result, expected);
            }
        }
    }
}
|