//! Azure Blob Storage wrapper

use std::borrow::Cow;
use std::collections::HashMap;
use std::env;
use std::fmt::Display;
use std::io;
use std::num::NonZeroU32;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;

use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
use anyhow::Context;
use anyhow::Result;
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::HttpClient;
use azure_core::TransportOptions;
use azure_core::{Continuable, RetryOptions};
use azure_storage::StorageCredentials;
use azure_storage_blobs::blob::CopyStatus;
use azure_storage_blobs::prelude::ClientBuilder;
use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient};
use bytes::Bytes;
use futures::future::Either;
use futures::stream::Stream;
use futures::FutureExt;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http_types::{StatusCode, Url};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use utils::backoff;
use utils::backoff::exponential_backoff_duration_seconds;

use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
use crate::DownloadKind;
use crate::{
    config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError,
    DownloadOpts, Listing, ListingMode, ListingObject, RemotePath, RemoteStorage, StorageMetadata,
    TimeTravelError, TimeoutOrCancel,
};

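/// Azure Blob Storage client wrapper implementing the [`RemoteStorage`] operations
/// used by the rest of the crate.
///
/// A minimal construction sketch (the timeout values are hypothetical; the config
/// fields referenced are the ones this module reads):
///
/// ```ignore
/// let storage = AzureBlobStorage::new(
///     &azure_config,            // container name, prefix, concurrency limit, ...
///     Duration::from_secs(120), // per-request timeout
///     Duration::from_secs(30),  // timeout for small metadata requests
/// )?;
/// ```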
pub struct AzureBlobStorage {
    client: ContainerClient,
    container_name: String,
    prefix_in_container: Option<String>,
    max_keys_per_list_response: Option<NonZeroU32>,
    concurrency_limiter: ConcurrencyLimiter,
    // Per-request timeout. Accessible for tests.
    pub timeout: Duration,

    // Alternative timeout used for metadata objects which are expected to be small
    pub small_timeout: Duration,
}

impl AzureBlobStorage {
    pub fn new(
        azure_config: &AzureConfig,
        timeout: Duration,
        small_timeout: Duration,
    ) -> Result<Self> {
        debug!(
            "Creating azure remote storage for azure container {}",
            azure_config.container_name
        );

        // Use the storage account from the config by default, fall back to env var if not present.
        let account = azure_config.storage_account.clone().unwrap_or_else(|| {
            env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT")
        });

        // If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
        // otherwise try the token based credentials.
        let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
            StorageCredentials::access_key(account.clone(), access_key)
        } else {
            let token_credential = azure_identity::create_default_credential()
                .context("trying to obtain Azure default credentials")?;
            StorageCredentials::token_credential(token_credential)
        };

        let builder = ClientBuilder::new(account, credentials)
            // we have an outer retry
            .retry(RetryOptions::none())
            // Customize the transport to configure connection pooling.
            .transport(TransportOptions::new(Self::reqwest_client(
                azure_config.conn_pool_size,
            )));

        let client = builder.container_client(azure_config.container_name.to_owned());

        let max_keys_per_list_response =
            if let Some(limit) = azure_config.max_keys_per_list_response {
                Some(
                    NonZeroU32::new(limit as u32)
                        .ok_or_else(|| anyhow::anyhow!("max_keys_per_list_response can't be 0"))?,
                )
            } else {
                None
            };

        Ok(AzureBlobStorage {
            client,
            container_name: azure_config.container_name.to_owned(),
            prefix_in_container: azure_config.prefix_in_container.to_owned(),
            max_keys_per_list_response,
            concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
            timeout,
            small_timeout,
        })
    }

    fn reqwest_client(conn_pool_size: usize) -> Arc<dyn HttpClient> {
        let client = reqwest::ClientBuilder::new()
            .pool_max_idle_per_host(conn_pool_size)
            .build()
            .expect("failed to build `reqwest` client");
        Arc::new(client)
    }

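    /// Maps a [`RemotePath`] to the full blob name inside the container by
    /// prepending `prefix_in_container`, inserting a separator if the prefix
    /// does not already end with one. For example, with prefix `tenants/`,
    /// the path `foo/bar` maps to `tenants/foo/bar`.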
    pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
        assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
        let path_string = path.get_path().as_str();
        match &self.prefix_in_container {
            Some(prefix) => {
                if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                    prefix.clone() + path_string
                } else {
                    format!("{prefix}{REMOTE_STORAGE_PREFIX_SEPARATOR}{path_string}")
                }
            }
            None => path_string.to_string(),
        }
    }

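    /// Inverse of [`Self::relative_path_to_name`]: strips `prefix_in_container`
    /// from a blob name returned by Azure. Panics if the key does not carry the
    /// expected prefix, since we only issue requests under that prefix.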
    fn name_to_relative_path(&self, key: &str) -> RemotePath {
        let relative_path =
            match key.strip_prefix(self.prefix_in_container.as_deref().unwrap_or_default()) {
                Some(stripped) => stripped,
                // we rely on Azure to return properly prefixed paths
                // for requests with a certain prefix
                None => panic!(
                    "Key {key} does not start with container prefix {:?}",
                    self.prefix_in_container
                ),
            };
        RemotePath(
            relative_path
                .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
                .collect(),
        )
    }

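    /// Drives a GET request built via `builder` and assembles a [`Download`].
    ///
    /// The first page of the paginated response supplies the etag, last-modified
    /// timestamp and metadata; its data and the data of all subsequent pages are
    /// chained into the returned byte stream, with `timeout` applied per page.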
    async fn download_for_builder(
        &self,
        builder: GetBlobBuilder,
        timeout: Duration,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        let kind = RequestKind::Get;

        let _permit = self.permit(kind, cancel).await?;
        let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
        let cancel_or_timeout_ = crate::support::cancel_or_timeout(self.timeout, cancel.clone());

        let mut etag = None;
        let mut last_modified = None;
        let mut metadata = HashMap::new();

        let started_at = start_measuring_requests(kind);

        let download = async {
            let response = builder
                // convert to concrete Pageable
                .into_stream()
                // convert to TryStream
                .into_stream()
                .map_err(to_download_error);

            // apply per request timeout
            let response = tokio_stream::StreamExt::timeout(response, timeout);

            // flatten
            let response = response.map(|res| match res {
                Ok(res) => res,
                Err(_elapsed) => Err(DownloadError::Timeout),
            });

            let mut response = Box::pin(response);

            let Some(part) = response.next().await else {
                return Err(DownloadError::Other(anyhow::anyhow!(
                    "Azure GET response contained no response body"
                )));
            };
            let part = part?;
            if etag.is_none() {
                etag = Some(part.blob.properties.etag);
            }
            if last_modified.is_none() {
                last_modified = Some(part.blob.properties.last_modified.into());
            }
            if let Some(blob_meta) = part.blob.metadata {
                metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned())));
            }

            // unwrap safety: if these were None, the response would have yielded no parts
            // and we would have returned an error already
            let etag = etag.unwrap();
            let last_modified = last_modified.unwrap();

            let tail_stream = response
                .map(|part| match part {
                    Ok(part) => Either::Left(part.data.map(|r| r.map_err(io::Error::other))),
                    Err(e) => {
                        Either::Right(futures::stream::once(async { Err(io::Error::other(e)) }))
                    }
                })
                .flatten();
            let stream = part
                .data
                .map(|r| r.map_err(io::Error::other))
                .chain(sync_wrapper::SyncStream::new(tail_stream));

            let download_stream = crate::support::DownloadStream::new(cancel_or_timeout_, stream);

            Ok(Download {
                download_stream: Box::pin(download_stream),
                etag,
                last_modified,
                metadata: Some(StorageMetadata(metadata)),
            })
        };

        let download = tokio::select! {
            bufs = download => bufs,
            cancel_or_timeout = cancel_or_timeout => match cancel_or_timeout {
                TimeoutOrCancel::Timeout => return Err(DownloadError::Timeout),
                TimeoutOrCancel::Cancel => return Err(DownloadError::Cancelled),
            },
        };
        let started_at = ScopeGuard::into_inner(started_at);
        let outcome = match &download {
            Ok(_) => AttemptOutcome::Ok,
            // At this level in the stack, 404 and 304 responses do not indicate an error.
            // There are expected cases when a blob may not exist or hasn't been modified since
            // the last get (e.g. probing for timeline indices and heatmap downloads).
            // Callers should handle errors if they are unexpected.
            Err(DownloadError::NotFound | DownloadError::Unmodified) => AttemptOutcome::Ok,
            Err(_) => AttemptOutcome::Err,
        };
        crate::metrics::BUCKET_METRICS
            .req_seconds
            .observe_elapsed(kind, outcome, started_at);
        download
    }

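    /// Acquires a concurrency permit for the given request kind, returning
    /// [`Cancelled`] early if the cancellation token fires while waiting.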
    async fn permit(
        &self,
        kind: RequestKind,
        cancel: &CancellationToken,
    ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
        let acquire = self.concurrency_limiter.acquire(kind);

        tokio::select! {
            permit = acquire => Ok(permit.expect("never closed")),
            _ = cancel.cancelled() => Err(Cancelled),
        }
    }

    pub fn container_name(&self) -> &str {
        &self.container_name
    }
}

fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
    let mut res = Metadata::new();
    for (k, v) in metadata.0.into_iter() {
        res.insert(k, v);
    }
    res
}

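/// Maps an [`azure_core::Error`] to the [`DownloadError`] variants this module
/// relies on: 404 becomes `NotFound`, 304 becomes `Unmodified`, 400 becomes
/// `BadInput`, and everything else is passed through as `Other`.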
fn to_download_error(error: azure_core::Error) -> DownloadError {
    if let Some(http_err) = error.as_http_error() {
        match http_err.status() {
            StatusCode::NotFound => DownloadError::NotFound,
            StatusCode::NotModified => DownloadError::Unmodified,
            StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
            _ => DownloadError::Other(anyhow::Error::new(error)),
        }
    } else {
        DownloadError::Other(error.into())
    }
}

impl RemoteStorage for AzureBlobStorage {
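    /// A minimal consumption sketch for the returned stream (illustrative only;
    /// `storage` and `cancel` are assumed to exist in the caller's scope):
    ///
    /// ```ignore
    /// let mut stream = std::pin::pin!(storage.list_streaming(
    ///     None,                       // no prefix: list the whole container
    ///     ListingMode::WithDelimiter,
    ///     None,                       // no key limit
    ///     &cancel,
    /// ));
    /// while let Some(listing) = stream.next().await {
    ///     let listing = listing?;
    ///     // each item is one page of keys and common prefixes
    /// }
    /// ```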
    fn list_streaming(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> impl Stream<Item = Result<Listing, DownloadError>> {
        // Use the passed prefix; if it is not set, fall back to `prefix_in_container`.
        let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
            self.prefix_in_container.clone().map(|mut s| {
                if !s.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                    s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
                }
                s
            })
        });

        async_stream::stream! {
            let _permit = self.permit(RequestKind::List, cancel).await?;

            let mut builder = self.client.list_blobs();

            if let ListingMode::WithDelimiter = mode {
                builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
            }

            if let Some(prefix) = list_prefix {
                builder = builder.prefix(Cow::from(prefix.to_owned()));
            }

            if let Some(limit) = self.max_keys_per_list_response {
                builder = builder.max_results(MaxResults::new(limit));
            }

            let mut next_marker = None;

            // Number of keys we may still return before reaching the caller's limit;
            // tracked across pages so the limit applies to the listing as a whole.
            let mut max_keys = max_keys.map(|mk| mk.get());

            let mut timeout_try_cnt = 1;

            'outer: loop {
                let mut builder = builder.clone();
                if let Some(marker) = next_marker.clone() {
                    builder = builder.marker(marker);
                }
                // The Azure Blob Rust SDK does not expose the list-blobs API directly. Users have to
                // go through its pageable iterator wrapper, which returns all keys as a stream. We want
                // full control of paging, and therefore we only take the first item from the stream.
                let mut response_stream = builder.into_stream();
                let response = response_stream.next();
                // Timeout mechanism: the Azure client sometimes gets stuck on a request, while retrying
                // that request would immediately succeed. Therefore, we use an exponentially growing
                // timeout to retry the request. (Usually, exponential backoff is used to determine the
                // sleep time between two retries.) We start with a 10.0 second timeout, and double it
                // for each failure, for up to 5 failures.
                // timeout = min(5 * (1.0+1.0)^n, self.timeout).
                let this_timeout = (5.0 * exponential_backoff_duration_seconds(timeout_try_cnt, 1.0, self.timeout.as_secs_f64())).min(self.timeout.as_secs_f64());
                let response = tokio::time::timeout(Duration::from_secs_f64(this_timeout), response);
                let response = response.map(|res| {
                    match res {
                        Ok(Some(Ok(res))) => Ok(Some(res)),
                        Ok(Some(Err(e))) => Err(to_download_error(e)),
                        Ok(None) => Ok(None),
                        Err(_elapsed) => Err(DownloadError::Timeout),
                    }
                });
                let next_item = tokio::select! {
                    op = response => op,
                    _ = cancel.cancelled() => Err(DownloadError::Cancelled),
                };

                if let Err(DownloadError::Timeout) = &next_item {
                    timeout_try_cnt += 1;
                    if timeout_try_cnt <= 5 {
                        continue;
                    }
                }

                let next_item = next_item?;

                // Log a warning if we saw at least two timeouts in a row before a successful request.
                if timeout_try_cnt > 2 {
                    tracing::warn!("Azure Blob Storage list timed out and succeeded after {} tries", timeout_try_cnt);
                }
                timeout_try_cnt = 1;

                let Some(entry) = next_item else {
                    // The listing is complete; there is nothing more to yield.
                    break;
                };

                let mut res = Listing::default();
                next_marker = entry.continuation();
                let prefix_iter = entry
                    .blobs
                    .prefixes()
                    .map(|prefix| self.name_to_relative_path(&prefix.name));
                res.prefixes.extend(prefix_iter);

                let blob_iter = entry
                    .blobs
                    .blobs()
                    .map(|k| ListingObject {
                        key: self.name_to_relative_path(&k.name),
                        last_modified: k.properties.last_modified.into(),
                        size: k.properties.content_length,
                    });

                for key in blob_iter {
                    res.keys.push(key);

                    if let Some(mut mk) = max_keys {
                        assert!(mk > 0);
                        mk -= 1;
                        if mk == 0 {
                            yield Ok(res); // limit reached
                            break 'outer;
                        }
                        max_keys = Some(mk);
                    }
                }
                yield Ok(res);

                // We are done here
                if next_marker.is_none() {
                    break;
                }
            }
        }
    }

    async fn head_object(
        &self,
        key: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<ListingObject, DownloadError> {
        let kind = RequestKind::Head;
        let _permit = self.permit(kind, cancel).await?;

        let started_at = start_measuring_requests(kind);

        let blob_client = self.client.blob_client(self.relative_path_to_name(key));
        let properties_future = blob_client.get_properties().into_future();

        let properties_future = tokio::time::timeout(self.small_timeout, properties_future);

        let res = tokio::select! {
            res = properties_future => res,
            _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
        };

        if let Ok(inner) = &res {
            // Observe the request in the metrics unless it timed out
            // (the cancellation path returns early above).
            let started_at = ScopeGuard::into_inner(started_at);
            crate::metrics::BUCKET_METRICS
                .req_seconds
                .observe_elapsed(kind, inner, started_at);
        }

        let data = match res {
            Ok(Ok(data)) => Ok(data),
            Ok(Err(sdk)) => Err(to_download_error(sdk)),
            Err(_timeout) => Err(DownloadError::Timeout),
        }?;

        let properties = data.blob.properties;
        Ok(ListingObject {
            key: key.to_owned(),
            last_modified: SystemTime::from(properties.last_modified),
            size: properties.content_length,
        })
    }

    async fn upload(
        &self,
        from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let kind = RequestKind::Put;
        let _permit = self.permit(kind, cancel).await?;

        let started_at = start_measuring_requests(kind);

        let op = async {
            let blob_client = self.client.blob_client(self.relative_path_to_name(to));

            let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
                Box::pin(from);

            let from = NonSeekableStream::new(from, data_size_bytes);

            let body = azure_core::Body::SeekableStream(Box::new(from));

            let mut builder = blob_client.put_block_blob(body);

            if let Some(metadata) = metadata {
                builder = builder.metadata(to_azure_metadata(metadata));
            }

            let fut = builder.into_future();
            let fut = tokio::time::timeout(self.timeout, fut);

            match fut.await {
                Ok(Ok(_response)) => Ok(()),
                Ok(Err(azure)) => Err(azure.into()),
                Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
            }
        };

        let res = tokio::select! {
            res = op => res,
            _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
        };

        let outcome = match res {
            Ok(_) => AttemptOutcome::Ok,
            Err(_) => AttemptOutcome::Err,
        };
        let started_at = ScopeGuard::into_inner(started_at);
        crate::metrics::BUCKET_METRICS
            .req_seconds
            .observe_elapsed(kind, outcome, started_at);

        res
    }

    async fn download(
        &self,
        from: &RemotePath,
        opts: &DownloadOpts,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        let blob_client = self.client.blob_client(self.relative_path_to_name(from));

        let mut builder = blob_client.get();

        if let Some(ref etag) = opts.etag {
            builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
        }

        if let Some((start, end)) = opts.byte_range() {
            builder = builder.range(match end {
                Some(end) => Range::Range(start..end),
                None => Range::RangeFrom(start..),
            });
        }

        let timeout = match opts.kind {
            DownloadKind::Small => self.small_timeout,
            DownloadKind::Large => self.timeout,
        };

        self.download_for_builder(builder, timeout, cancel).await
    }

    async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
        self.delete_objects(std::array::from_ref(path), cancel)
            .await
    }

    async fn delete_objects(
        &self,
        paths: &[RemotePath],
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let kind = RequestKind::Delete;
        let _permit = self.permit(kind, cancel).await?;
        let started_at = start_measuring_requests(kind);

        let op = async {
            // TODO batch requests are not supported by the SDK
            // https://github.com/Azure/azure-sdk-for-rust/issues/1068
            for path in paths {
                #[derive(Debug)]
                enum AzureOrTimeout {
                    AzureError(azure_core::Error),
                    Timeout,
                    Cancel,
                }
                impl Display for AzureOrTimeout {
                    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                        write!(f, "{self:?}")
                    }
                }
                let warn_threshold = 3;
                let max_retries = 5;
                backoff::retry(
                    || async {
                        let blob_client = self.client.blob_client(self.relative_path_to_name(path));

                        let request = blob_client.delete().into_future();

                        let res = tokio::time::timeout(self.timeout, request).await;

                        match res {
                            Ok(Ok(_v)) => Ok(()),
                            Ok(Err(azure_err)) => {
                                if let Some(http_err) = azure_err.as_http_error() {
                                    if http_err.status() == StatusCode::NotFound {
                                        return Ok(());
                                    }
                                }
                                Err(AzureOrTimeout::AzureError(azure_err))
                            }
                            Err(_elapsed) => Err(AzureOrTimeout::Timeout),
                        }
                    },
                    |err| match err {
                        AzureOrTimeout::AzureError(_) | AzureOrTimeout::Timeout => false,
                        AzureOrTimeout::Cancel => true,
                    },
                    warn_threshold,
                    max_retries,
                    "deleting remote object",
                    cancel,
                )
                .await
                .ok_or_else(|| AzureOrTimeout::Cancel)
                .and_then(|x| x)
                .map_err(|e| match e {
                    AzureOrTimeout::AzureError(err) => anyhow::Error::from(err),
                    AzureOrTimeout::Timeout => TimeoutOrCancel::Timeout.into(),
                    AzureOrTimeout::Cancel => TimeoutOrCancel::Cancel.into(),
                })?;
            }
            Ok(())
        };

        let res = tokio::select! {
            res = op => res,
            _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
        };

        let started_at = ScopeGuard::into_inner(started_at);
        crate::metrics::BUCKET_METRICS
            .req_seconds
            .observe_elapsed(kind, &res, started_at);
        res
    }

    fn max_keys_per_delete(&self) -> usize {
        super::MAX_KEYS_PER_DELETE_AZURE
    }

    async fn copy(
        &self,
        from: &RemotePath,
        to: &RemotePath,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let kind = RequestKind::Copy;
        let _permit = self.permit(kind, cancel).await?;
        let started_at = start_measuring_requests(kind);

        let timeout = tokio::time::sleep(self.timeout);

        let mut copy_status = None;

        let op = async {
            let blob_client = self.client.blob_client(self.relative_path_to_name(to));

            let source_url = format!(
                "{}/{}",
                self.client.url()?,
                self.relative_path_to_name(from)
            );

            let builder = blob_client.copy(Url::from_str(&source_url)?);
            let copy = builder.into_future();

            let result = copy.await?;

            copy_status = Some(result.copy_status);
            loop {
                match copy_status.as_ref().expect("we always set it to Some") {
                    CopyStatus::Aborted => {
                        anyhow::bail!("Received abort for copy from {from} to {to}.");
                    }
                    CopyStatus::Failed => {
                        anyhow::bail!("Received failure response for copy from {from} to {to}.");
                    }
                    CopyStatus::Success => return Ok(()),
                    CopyStatus::Pending => (),
                }
                // The copy is still pending. Wait a second, then poll the status again.
                // TODO estimate time based on copy_progress and adjust time based on that
                tokio::time::sleep(Duration::from_millis(1000)).await;
                let properties = blob_client.get_properties().into_future().await?;
                let Some(status) = properties.blob.properties.copy_status else {
                    tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
                    return Ok(());
                };
                copy_status = Some(status);
            }
        };

        let res = tokio::select! {
            res = op => res,
            _ = cancel.cancelled() => return Err(anyhow::Error::new(TimeoutOrCancel::Cancel)),
            _ = timeout => {
                let e = anyhow::Error::new(TimeoutOrCancel::Timeout);
                let e = e.context(format!("Timeout, last status: {copy_status:?}"));
                Err(e)
            },
        };

        let started_at = ScopeGuard::into_inner(started_at);
        crate::metrics::BUCKET_METRICS
            .req_seconds
            .observe_elapsed(kind, &res, started_at);
        res
    }

    async fn time_travel_recover(
        &self,
        _prefix: Option<&RemotePath>,
        _timestamp: SystemTime,
        _done_if_after: SystemTime,
        _cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        // TODO use Azure point in time recovery feature for this
        // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
        Err(TimeTravelError::Unimplemented)
    }
}

pin_project_lite::pin_project! {
    /// Hack to work around not being able to stream once with the azure sdk.
    ///
    /// The Azure sdk clones streams around with the assumption that they are like
    /// `Arc<tokio::fs::File>` (except not supporting tokio), however our streams are not like
    /// that. For example, for an `index_part.json` we just have a single chunk of [`Bytes`]
    /// representing the whole serialized vec. It could be trivially cloneable and "semi-trivially"
    /// seekable, but we can also just re-try the request more easily.
    #[project = NonSeekableStreamProj]
    enum NonSeekableStream<S> {
        /// A stream wrapper's initial form.
        ///
        /// The Mutex exists to allow moving when cloning. If the sdk changes to do less than 1
        /// clone before the first request, then this must be changed.
        Initial {
            inner: std::sync::Mutex<Option<tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>>>,
            len: usize,
        },
        /// The actually readable variant, produced by cloning the Initial variant.
        ///
        /// The sdk currently always clones once, even without a retry policy.
        Actual {
            #[pin]
            inner: tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>,
            len: usize,
            read_any: bool,
        },
        /// Most likely unneeded, but left to make life easier, in case more clones are added.
        Cloned {
            len_was: usize,
        }
    }
}

impl<S> NonSeekableStream<S>
where
    S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
{
    fn new(inner: S, len: usize) -> NonSeekableStream<S> {
        use tokio_util::compat::TokioAsyncReadCompatExt;

        let inner = tokio_util::io::StreamReader::new(inner).compat();
        let inner = Some(inner);
        let inner = std::sync::Mutex::new(inner);
        NonSeekableStream::Initial { inner, len }
    }
}

impl<S> std::fmt::Debug for NonSeekableStream<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Initial { len, .. } => f.debug_struct("Initial").field("len", len).finish(),
            Self::Actual { len, .. } => f.debug_struct("Actual").field("len", len).finish(),
            Self::Cloned { len_was, .. } => f.debug_struct("Cloned").field("len", len_was).finish(),
        }
    }
}

impl<S> futures::io::AsyncRead for NonSeekableStream<S>
where
    S: Stream<Item = std::io::Result<Bytes>>,
{
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> std::task::Poll<std::io::Result<usize>> {
        match self.project() {
            NonSeekableStreamProj::Actual {
                inner, read_any, ..
            } => {
                *read_any = true;
                inner.poll_read(cx, buf)
            }
            // NonSeekableStream::Initial does not support reading because it is just much easier
            // to have the mutex in place where one does not poll the contents, or that's how it
            // seemed originally. If there is a version upgrade which changes the cloning, then
            // that support needs to be hacked in.
            //
            // including {self:?} into the message would be useful, but unsure how to unproject.
            _ => std::task::Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "cloned or initial values cannot be read",
            ))),
        }
    }
}

impl<S> Clone for NonSeekableStream<S> {
    /// Weird clone implementation exists to support the sdk doing cloning before issuing the first
    /// request, see type documentation.
    fn clone(&self) -> Self {
        use NonSeekableStream::*;

        match self {
            Initial { inner, len } => {
                if let Some(inner) = inner.lock().unwrap().take() {
                    Actual {
                        inner,
                        len: *len,
                        read_any: false,
                    }
                } else {
                    Self::Cloned { len_was: *len }
                }
            }
            Actual { len, .. } => Cloned { len_was: *len },
            Cloned { len_was } => Cloned { len_was: *len_was },
        }
    }
}

#[async_trait::async_trait]
impl<S> azure_core::SeekableStream for NonSeekableStream<S>
where
    S: Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync + 'static,
{
    async fn reset(&mut self) -> azure_core::error::Result<()> {
        use NonSeekableStream::*;

        let msg = match self {
            Initial { inner, .. } => {
                if inner.get_mut().unwrap().is_some() {
                    return Ok(());
                } else {
                    "reset after first clone is not supported"
                }
            }
            Actual { read_any, .. } if !*read_any => return Ok(()),
            Actual { .. } => "reset after reading is not supported",
            Cloned { .. } => "reset after second clone is not supported",
        };
        Err(azure_core::error::Error::new(
            azure_core::error::ErrorKind::Io,
            std::io::Error::new(std::io::ErrorKind::Other, msg),
        ))
    }

    // Note: it is not documented whether this should be the total or the remaining
    // length; returning the total length passes the tests.
    fn len(&self) -> usize {
        use NonSeekableStream::*;
        match self {
            Initial { len, .. } => *len,
            Actual { len, .. } => *len,
            Cloned { len_was, .. } => *len_was,
        }
    }
}