LCOV - code coverage report
Current view: top level - libs/remote_storage/src - azure_blob.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 82.2 % 589 484
Test Date: 2024-11-25 17:48:16 Functions: 54.5 % 77 42

            Line data    Source code
       1              : //! Azure Blob Storage wrapper
       2              : 
       3              : use std::borrow::Cow;
       4              : use std::collections::HashMap;
       5              : use std::env;
       6              : use std::fmt::Display;
       7              : use std::io;
       8              : use std::num::NonZeroU32;
       9              : use std::pin::Pin;
      10              : use std::str::FromStr;
      11              : use std::sync::Arc;
      12              : use std::time::Duration;
      13              : use std::time::SystemTime;
      14              : 
      15              : use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
      16              : use anyhow::Result;
      17              : use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
      18              : use azure_core::{Continuable, RetryOptions};
      19              : use azure_identity::DefaultAzureCredential;
      20              : use azure_storage::StorageCredentials;
      21              : use azure_storage_blobs::blob::CopyStatus;
      22              : use azure_storage_blobs::prelude::ClientBuilder;
      23              : use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient};
      24              : use bytes::Bytes;
      25              : use futures::future::Either;
      26              : use futures::stream::Stream;
      27              : use futures::FutureExt;
      28              : use futures_util::StreamExt;
      29              : use futures_util::TryStreamExt;
      30              : use http_types::{StatusCode, Url};
      31              : use scopeguard::ScopeGuard;
      32              : use tokio_util::sync::CancellationToken;
      33              : use tracing::debug;
      34              : use utils::backoff;
      35              : use utils::backoff::exponential_backoff_duration_seconds;
      36              : 
      37              : use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
      38              : use crate::{
      39              :     config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError,
      40              :     DownloadOpts, Listing, ListingMode, ListingObject, RemotePath, RemoteStorage, StorageMetadata,
      41              :     TimeTravelError, TimeoutOrCancel,
      42              : };
      43              : 
      44              : pub struct AzureBlobStorage {
      45              :     client: ContainerClient, // container-scoped client built in `new`
      46              :     container_name: String, // copied from `AzureConfig`; exposed by `container_name()`
      47              :     prefix_in_container: Option<String>, // optional key prefix; see `relative_path_to_name`
      48              :     max_keys_per_list_response: Option<NonZeroU32>, // page-size cap passed as `MaxResults` when listing
      49              :     concurrency_limiter: ConcurrencyLimiter, // source of the permits handed out by `permit`
      50              :     // Per-request timeout (used to bound the individual Azure calls). Accessible for tests.
      51              :     pub timeout: Duration,
      52              : }
      53              : 
      54              : impl AzureBlobStorage {
      55           10 :     pub fn new(azure_config: &AzureConfig, timeout: Duration) -> Result<Self> { // builds the wrapper for one container; the only `Err` path is the zero page-limit check below
      56           10 :         debug!(
      57            0 :             "Creating azure remote storage for azure container {}",
      58              :             azure_config.container_name
      59              :         );
      60              : 
      61              :         // Use the storage account from the config by default, fall back to env var if not present. Panics (instead of returning Err) when neither is set.
      62           10 :         let account = azure_config.storage_account.clone().unwrap_or_else(|| {
      63           10 :             env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT")
      64           10 :         });
      65              : 
      66              :         // If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
      67              :         // otherwise try the token based credentials.
      68           10 :         let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
      69           10 :             StorageCredentials::access_key(account.clone(), access_key)
      70              :         } else {
      71            0 :             let token_credential = DefaultAzureCredential::default();
      72            0 :             StorageCredentials::token_credential(Arc::new(token_credential))
      73              :         };
      74              : 
      75              :         // we have an outer retry, so the SDK's built-in retries are switched off here
      76           10 :         let builder = ClientBuilder::new(account, credentials).retry(RetryOptions::none());
      77           10 : 
      78           10 :         let client = builder.container_client(azure_config.container_name.to_owned());
      79              : 
      80           10 :         let max_keys_per_list_response =
      81           10 :             if let Some(limit) = azure_config.max_keys_per_list_response {
      82              :                 Some(
      83            4 :                     NonZeroU32::new(limit as u32) // NOTE(review): if the config type is wider than u32, `as` truncates silently — consider u32::try_from
      84            4 :                         .ok_or_else(|| anyhow::anyhow!("max_keys_per_list_response can't be 0"))?,
      85              :                 )
      86              :             } else {
      87            6 :                 None
      88              :             };
      89              : 
      90           10 :         Ok(AzureBlobStorage {
      91           10 :             client,
      92           10 :             container_name: azure_config.container_name.to_owned(),
      93           10 :             prefix_in_container: azure_config.prefix_in_container.to_owned(),
      94           10 :             max_keys_per_list_response,
      95           10 :             concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
      96           10 :             timeout,
      97           10 :         })
      98           10 :     }
      99              : 
     100          237 :     pub fn relative_path_to_name(&self, path: &RemotePath) -> String { // maps a RemotePath to the blob name used inside the container
     101          237 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR); // checked on every call; panics where the platform path separator differs
     102          237 :         let path_string = path.get_path().as_str();
     103          237 :         match &self.prefix_in_container {
     104          237 :             Some(prefix) => {
     105          237 :                 if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     106          237 :                     prefix.clone() + path_string
     107              :                 } else {
     108            0 :                     format!("{prefix}{REMOTE_STORAGE_PREFIX_SEPARATOR}{path_string}") // insert the separator the prefix lacks
     109              :                 }
     110              :             }
     111            0 :             None => path_string.to_string(), // no prefix configured: the path itself is the blob name
     112              :         }
     113          237 :     }
     114              : 
     115          249 :     fn name_to_relative_path(&self, key: &str) -> RemotePath { // turns a blob name returned by a listing back into a RemotePath
     116          249 :         let relative_path =
     117          249 :             match key.strip_prefix(self.prefix_in_container.as_deref().unwrap_or_default()) { // no prefix => strips "", which always matches
     118          249 :                 Some(stripped) => stripped,
     119              :                 // we rely on Azure to return properly prefixed paths
     120              :                 // for requests with a certain prefix
     121            0 :                 None => panic!(
     122            0 :                     "Key {key} does not start with container prefix {:?}",
     123            0 :                     self.prefix_in_container
     124            0 :                 ),
     125              :             };
     126          249 :         RemotePath( // NOTE(review): the prefix is stripped exactly as configured; if it lacks a trailing separator the remainder starts with one — confirm the collect below copes with an empty first component
     127          249 :             relative_path
     128          249 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     129          249 :                 .collect(),
     130          249 :         )
     131          249 :     }
     132              : 
     133           11 :     async fn download_for_builder(
     134           11 :         &self,
     135           11 :         builder: GetBlobBuilder,
     136           11 :         cancel: &CancellationToken,
     137           11 :     ) -> Result<Download, DownloadError> { // fetches the first response part, then returns a Download whose stream yields that part's data followed by the remaining parts
     138           11 :         let kind = RequestKind::Get;
     139              : 
     140           11 :         let _permit = self.permit(kind, cancel).await?; // held only until this function returns; the returned body stream is not tied to the permit
     141           11 :         let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone()); // raced against fetching the first part (select! below)
     142           11 :         let cancel_or_timeout_ = crate::support::cancel_or_timeout(self.timeout, cancel.clone()); // second instance, handed to the DownloadStream wrapper
     143           11 : 
     144           11 :         let mut etag = None;
     145           11 :         let mut last_modified = None;
     146           11 :         let mut metadata = HashMap::new();
     147           11 : 
     148           11 :         let started_at = start_measuring_requests(kind);
     149           11 : 
     150           11 :         let download = async {
     151           11 :             let response = builder
     152           11 :                 // convert to concrete Pageable
     153           11 :                 .into_stream()
     154           11 :                 // convert to TryStream
     155           11 :                 .into_stream()
     156           11 :                 .map_err(to_download_error);
     157           11 : 
     158           11 :             // apply per request timeout
     159           11 :             let response = tokio_stream::StreamExt::timeout(response, self.timeout);
     160           11 : 
     161           11 :             // flatten the timeout layer: an elapsed timer becomes DownloadError::Timeout
     162           11 :             let response = response.map(|res| match res {
     163           11 :                 Ok(res) => res,
     164            0 :                 Err(_elapsed) => Err(DownloadError::Timeout),
     165           11 :             });
     166           11 : 
     167           11 :             let mut response = Box::pin(response);
     168              : 
     169           56 :             let Some(part) = response.next().await else { // only the first part is awaited here; later parts are pulled lazily by the returned stream
     170            0 :                 return Err(DownloadError::Other(anyhow::anyhow!(
     171            0 :                     "Azure GET response contained no response body"
     172            0 :                 )));
     173              :             };
     174           11 :             let part = part?;
     175            9 :             if etag.is_none() {
     176            9 :                 etag = Some(part.blob.properties.etag);
     177            9 :             }
     178            9 :             if last_modified.is_none() {
     179            9 :                 last_modified = Some(part.blob.properties.last_modified.into());
     180            9 :             }
     181            9 :             if let Some(blob_meta) = part.blob.metadata {
     182            0 :                 metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned())));
     183            9 :             }
     184              : 
     185              :             // unwrap safety: both start as None above, so the two `is_none()` branches just ran and filled them from the first part
     186            9 :             let etag = etag.unwrap();
     187            9 :             let last_modified = last_modified.unwrap();
     188            9 : 
     189            9 :             let tail_stream = response
     190            9 :                 .map(|part| match part {
     191            0 :                     Ok(part) => Either::Left(part.data.map(|r| r.map_err(io::Error::other))),
     192            0 :                     Err(e) => {
     193            0 :                         Either::Right(futures::stream::once(async { Err(io::Error::other(e)) }))
     194              :                     }
     195            9 :                 })
     196            9 :                 .flatten();
     197            9 :             let stream = part
     198            9 :                 .data
     199            9 :                 .map(|r| r.map_err(io::Error::other))
     200            9 :                 .chain(sync_wrapper::SyncStream::new(tail_stream));
     201            9 :             //.chain(SyncStream::from_pin(Box::pin(tail_stream)));
     202            9 : 
     203            9 :             let download_stream = crate::support::DownloadStream::new(cancel_or_timeout_, stream);
     204            9 : 
     205            9 :             Ok(Download {
     206            9 :                 download_stream: Box::pin(download_stream),
     207            9 :                 etag,
     208            9 :                 last_modified,
     209            9 :                 metadata: Some(StorageMetadata(metadata)),
     210            9 :             })
     211           11 :         };
     212              : 
     213           11 :         let download = tokio::select! {
     214           11 :             bufs = download => bufs,
     215           11 :             cancel_or_timeout = cancel_or_timeout => match cancel_or_timeout {
     216            0 :                 TimeoutOrCancel::Timeout => return Err(DownloadError::Timeout), // early return: skips the observe_elapsed call below, `started_at` is simply dropped
     217            0 :                 TimeoutOrCancel::Cancel => return Err(DownloadError::Cancelled),
     218              :             },
     219              :         };
     220           11 :         let started_at = ScopeGuard::into_inner(started_at); // disarm the guard and record the outcome explicitly
     221           11 :         let outcome = match &download {
     222            9 :             Ok(_) => AttemptOutcome::Ok,
     223            2 :             Err(_) => AttemptOutcome::Err,
     224              :         };
     225           11 :         crate::metrics::BUCKET_METRICS
     226           11 :             .req_seconds
     227           11 :             .observe_elapsed(kind, outcome, started_at);
     228           11 :         download
     229           11 :     }
     230              : 
     231          227 :     async fn permit(
     232          227 :         &self,
     233          227 :         kind: RequestKind,
     234          227 :         cancel: &CancellationToken,
     235          227 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> { // waits for a concurrency permit for `kind`, giving up with `Cancelled` if the token fires first
     236          227 :         let acquire = self.concurrency_limiter.acquire(kind);
     237          227 : 
     238          227 :         tokio::select! {
     239          227 :             permit = acquire => Ok(permit.expect("never closed")), // panics if acquire ever reports an error; the message records the assumption that it cannot
     240          227 :             _ = cancel.cancelled() => Err(Cancelled),
     241              :         }
     242          227 :     }
     243              : 
     244            0 :     pub fn container_name(&self) -> &str { // borrowed view of the container name stored at construction
     245            0 :         &self.container_name
     246            0 :     }
     247              : }
     248              : 
     249            0 : fn to_azure_metadata(metadata: StorageMetadata) -> Metadata { // copies every key/value pair of a StorageMetadata into the SDK's Metadata type
     250            0 :     let mut res = Metadata::new();
     251            0 :     for (k, v) in metadata.0.into_iter() {
     252            0 :         res.insert(k, v);
     253            0 :     }
     254            0 :     res
     255            0 : }
     256              : 
     257            3 : fn to_download_error(error: azure_core::Error) -> DownloadError { // classifies an SDK error by HTTP status; anything unrecognised becomes DownloadError::Other
     258            3 :     if let Some(http_err) = error.as_http_error() {
     259            3 :         match http_err.status() {
     260            1 :             StatusCode::NotFound => DownloadError::NotFound,
     261            2 :             StatusCode::NotModified => DownloadError::Unmodified, // the original error is discarded for these first two variants
     262            0 :             StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
     263            0 :             _ => DownloadError::Other(anyhow::Error::new(error)),
     264              :         }
     265              :     } else {
     266            0 :         DownloadError::Other(error.into()) // not an HTTP-level error
     267              :     }
     268            3 : }
     269              : 
     270              : impl RemoteStorage for AzureBlobStorage {
     271           26 :     fn list_streaming(
     272           26 :         &self,
     273           26 :         prefix: Option<&RemotePath>,
     274           26 :         mode: ListingMode,
     275           26 :         max_keys: Option<NonZeroU32>,
     276           26 :         cancel: &CancellationToken,
     277           26 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     278           26 :         // use the passed prefix, or if it is not set fall back to prefix_in_container (with a trailing separator appended when missing)
     279           26 :         let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
     280           10 :             self.prefix_in_container.clone().map(|mut s| {
     281           10 :                 if !s.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     282            0 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     283           10 :                 }
     284           10 :                 s
     285           10 :             })
     286           26 :         });
     287           26 : 
     288           26 :         async_stream::stream! {
     289           26 :             let _permit = self.permit(RequestKind::List, cancel).await?; // one permit for the whole listing, all pages included
     290           26 : 
     291           26 :             let mut builder = self.client.list_blobs();
     292           26 : 
     293           26 :             if let ListingMode::WithDelimiter = mode {
     294           26 :                 builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     295           26 :             }
     296           26 : 
     297           26 :             if let Some(prefix) = list_prefix {
     298           26 :                 builder = builder.prefix(Cow::from(prefix.to_owned()));
     299           26 :             }
     300           26 : 
     301           26 :             if let Some(limit) = self.max_keys_per_list_response {
     302           26 :                 builder = builder.max_results(MaxResults::new(limit));
     303           26 :             }
     304           26 : 
     305           26 :             let mut next_marker = None;
     306           26 : 
     307           26 :             let mut timeout_try_cnt = 1;
     308           26 : 
     309           26 :             'outer: loop { // one iteration per page request (or per retry of a timed-out page)
     310           26 :                 let mut builder = builder.clone();
     311           26 :                 if let Some(marker) = next_marker.clone() {
     312           26 :                     builder = builder.marker(marker);
     313           26 :                 }
     314           26 :                 // Azure Blob Rust SDK does not expose the list blob API directly. Users have to use
     315           26 :                 // their pageable iterator wrapper that returns all keys as a stream. We want to have
     316           26 :                 // full control of paging, and therefore we only take the first item from the stream.
     317           26 :                 let mut response_stream = builder.into_stream();
     318           26 :                 let response = response_stream.next();
     319           26 :                 // Timeout mechanism: the Azure client will sometimes get stuck on a request, but retrying that request
     320           26 :                 // would immediately succeed. Therefore, we use exponential backoff timeout to retry the request.
     321           26 :                 // (Usually, exponential backoff is used to determine the sleep time between two retries.) We
     322           26 :                 // start with 10.0 second timeout, and double the timeout for each failure, up to 5 failures.
     323           26 :                 // timeout = min(5 * (1.0+1.0)^n, self.timeout).
     324           26 :                 let this_timeout = (5.0 * exponential_backoff_duration_seconds(timeout_try_cnt, 1.0, self.timeout.as_secs_f64())).min(self.timeout.as_secs_f64());
     325           26 :                 let response = tokio::time::timeout(Duration::from_secs_f64(this_timeout), response);
     326           45 :                 let response = response.map(|res| {
     327           45 :                     match res {
     328           45 :                         Ok(Some(Ok(res))) => Ok(Some(res)),
     329           26 :                         Ok(Some(Err(e)))  => Err(to_download_error(e)),
     330           26 :                         Ok(None) => Ok(None),
     331           26 :                         Err(_elasped) => Err(DownloadError::Timeout),
     332           26 :                     }
     333           45 :                 });
     334           26 :                 let mut max_keys = max_keys.map(|mk| mk.get()); // NOTE(review): declared inside the loop, so it is re-read from the `max_keys` argument on every page and the remaining budget restarts per page — likely meant to live outside 'outer
     335           26 :                 let next_item = tokio::select! {
     336           26 :                     op = response => op,
     337           26 :                     _ = cancel.cancelled() => Err(DownloadError::Cancelled),
     338           26 :                 };
     339           26 : 
     340           26 :                 if let Err(DownloadError::Timeout) = &next_item {
     341           26 :                     timeout_try_cnt += 1;
     342           26 :                     if timeout_try_cnt <= 5 { // retry the same page (same marker) with a longer timeout
     343           26 :                         continue;
     344           26 :                     }
     345           26 :                 }
     346           26 : 
     347           26 :                 let next_item = next_item?; // any error, including a timeout that exhausted the retries, is propagated here
     348           26 : 
     349           26 :                 if timeout_try_cnt >= 2 {
     350           26 :                     tracing::warn!("Azure Blob Storage list timed out and succeeded after {} tries", timeout_try_cnt);
     351           26 :                 }
     352           26 :                 timeout_try_cnt = 1;
     353           26 : 
     354           26 :                 let Some(entry) = next_item else {
     355           26 :                     // The response stream ended without producing a page: stop without yielding anything further.
     356           26 :                     break;
     357           26 :                 };
     358           26 : 
     359           26 :                 let mut res = Listing::default();
     360           26 :                 next_marker = entry.continuation();
     361           26 :                 let prefix_iter = entry
     362           26 :                     .blobs
     363           26 :                     .prefixes()
     364           52 :                     .map(|prefix| self.name_to_relative_path(&prefix.name));
     365           26 :                 res.prefixes.extend(prefix_iter);
     366           26 : 
     367           26 :                 let blob_iter = entry
     368           26 :                     .blobs
     369           26 :                     .blobs()
     370          197 :                     .map(|k| ListingObject{
     371          197 :                         key: self.name_to_relative_path(&k.name),
     372          197 :                         last_modified: k.properties.last_modified.into(),
     373          197 :                         size: k.properties.content_length,
     374          197 :                     }
     375           26 :                 );
     376           26 : 
     377           26 :                 for key in blob_iter {
     378           26 :                     res.keys.push(key);
     379           26 : 
     380           26 :                     if let Some(mut mk) = max_keys {
     381           26 :                         assert!(mk > 0);
     382           26 :                         mk -= 1;
     383           26 :                         if mk == 0 {
     384           26 :                             yield Ok(res); // limit reached
     385           26 :                             break 'outer;
     386           26 :                         }
     387           26 :                         max_keys = Some(mk);
     388           26 :                     }
     389           26 :                 }
     390           26 :                 yield Ok(res);
     391           26 : 
     392           26 :                 // No continuation marker means this was the last page
     393           26 :                 if next_marker.is_none() {
     394           26 :                     break;
     395           26 :                 }
     396           26 :             }
     397           26 :         }
     398           26 :     }
     399              : 
     400            3 :     async fn head_object(
     401            3 :         &self,
     402            3 :         key: &RemotePath,
     403            3 :         cancel: &CancellationToken,
     404            3 :     ) -> Result<ListingObject, DownloadError> { // fetches blob properties and reports them as a ListingObject (key, last_modified, size)
     405            3 :         let kind = RequestKind::Head;
     406            3 :         let _permit = self.permit(kind, cancel).await?;
     407              : 
     408            3 :         let started_at = start_measuring_requests(kind);
     409            3 : 
     410            3 :         let blob_client = self.client.blob_client(self.relative_path_to_name(key));
     411            3 :         let properties_future = blob_client.get_properties().into_future();
     412            3 : 
     413            3 :         let properties_future = tokio::time::timeout(self.timeout, properties_future);
     414              : 
     415            3 :         let res = tokio::select! {
     416            3 :             res = properties_future => res,
     417            3 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     418              :         };
     419              : 
     420            3 :         if let Ok(inner) = &res {
     421            3 :             // only requests that completed within the timeout are recorded here (with their Ok/Err result); on timeout, or on the cancellation return above, `started_at` is dropped without this observation
     422            3 :             let started_at = ScopeGuard::into_inner(started_at);
     423            3 :             crate::metrics::BUCKET_METRICS
     424            3 :                 .req_seconds
     425            3 :                 .observe_elapsed(kind, inner, started_at);
     426            3 :         }
     427              : 
     428            3 :         let data = match res {
     429            2 :             Ok(Ok(data)) => Ok(data),
     430            1 :             Ok(Err(sdk)) => Err(to_download_error(sdk)),
     431            0 :             Err(_timeout) => Err(DownloadError::Timeout),
     432            1 :         }?;
     433              : 
     434            2 :         let properties = data.blob.properties;
     435            2 :         Ok(ListingObject {
     436            2 :             key: key.to_owned(),
     437            2 :             last_modified: SystemTime::from(properties.last_modified),
     438            2 :             size: properties.content_length,
     439            2 :         })
     440            3 :     }
     441              : 
     442           93 :     async fn upload(
     443           93 :         &self,
     444           93 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     445           93 :         data_size_bytes: usize,
     446           93 :         to: &RemotePath,
     447           93 :         metadata: Option<StorageMetadata>,
     448           93 :         cancel: &CancellationToken,
     449           93 :     ) -> anyhow::Result<()> { // uploads `from` as a single put_block_blob request to the blob named after `to`
     450           93 :         let kind = RequestKind::Put;
     451           93 :         let _permit = self.permit(kind, cancel).await?;
     452              : 
     453           93 :         let started_at = start_measuring_requests(kind);
     454           93 : 
     455           93 :         let op = async {
     456           93 :             let blob_client = self.client.blob_client(self.relative_path_to_name(to));
     457           93 : 
     458           93 :             let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
     459           93 :                 Box::pin(from);
     460           93 : 
     461           93 :             let from = NonSeekableStream::new(from, data_size_bytes);
     462           93 : 
     463           93 :             let body = azure_core::Body::SeekableStream(Box::new(from)); // the non-seekable wrapper is passed through the SDK's SeekableStream body variant
     464           93 : 
     465           93 :             let mut builder = blob_client.put_block_blob(body);
     466              : 
     467           93 :             if let Some(metadata) = metadata {
     468            0 :                 builder = builder.metadata(to_azure_metadata(metadata));
     469           93 :             }
     470              : 
     471           93 :             let fut = builder.into_future();
     472           93 :             let fut = tokio::time::timeout(self.timeout, fut);
     473           93 : 
     474          574 :             match fut.await {
     475           93 :                 Ok(Ok(_response)) => Ok(()),
     476            0 :                 Ok(Err(azure)) => Err(azure.into()),
     477            0 :                 Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     478              :             }
     479           93 :         };
     480              : 
     481           93 :         let res = tokio::select! {
     482           93 :             res = op => res,
     483           93 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), // early return: skips the observe_elapsed call below
     484              :         };
     485              : 
     486           93 :         let outcome = match res { // wildcard patterns bind nothing, so `res` is still usable afterwards
     487           93 :             Ok(_) => AttemptOutcome::Ok,
     488            0 :             Err(_) => AttemptOutcome::Err,
     489              :         };
     490           93 :         let started_at = ScopeGuard::into_inner(started_at);
     491           93 :         crate::metrics::BUCKET_METRICS
     492           93 :             .req_seconds
     493           93 :             .observe_elapsed(kind, outcome, started_at);
     494           93 : 
     495           93 :         res
     496           93 :     }
     497              : 
     498           11 :     async fn download(
     499           11 :         &self,
     500           11 :         from: &RemotePath,
     501           11 :         opts: &DownloadOpts,
     502           11 :         cancel: &CancellationToken,
     503           11 :     ) -> Result<Download, DownloadError> { // builds the GET request from `opts` and delegates the transfer to download_for_builder
     504           11 :         let blob_client = self.client.blob_client(self.relative_path_to_name(from));
     505           11 : 
     506           11 :         let mut builder = blob_client.get();
     507              : 
     508           11 :         if let Some(ref etag) = opts.etag {
     509            3 :             builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string())) // conditional GET; presumably an unchanged blob answers 304, which to_download_error maps to Unmodified — confirm
     510            8 :         }
     511              : 
     512           11 :         if let Some((start, end)) = opts.byte_range() {
     513            5 :             builder = builder.range(match end {
     514            3 :                 Some(end) => Range::Range(start..end), // NOTE(review): `start..end` excludes `end`; verify byte_range() returns an exclusive end
     515            2 :                 None => Range::RangeFrom(start..),
     516              :             });
     517            6 :         }
     518              : 
     519           56 :         self.download_for_builder(builder, cancel).await
     520           11 :     }
     521              : 
     522           86 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     523           86 :         self.delete_objects(std::array::from_ref(path), cancel)
     524          431 :             .await
     525           86 :     }
     526              : 
     527           93 :     async fn delete_objects<'a>(
     528           93 :         &self,
     529           93 :         paths: &'a [RemotePath],
     530           93 :         cancel: &CancellationToken,
     531           93 :     ) -> anyhow::Result<()> {
     532           93 :         let kind = RequestKind::Delete;
     533           93 :         let _permit = self.permit(kind, cancel).await?;
     534           93 :         let started_at = start_measuring_requests(kind);
     535           93 : 
     536           93 :         let op = async {
     537              :             // TODO batch requests are not supported by the SDK
     538              :             // https://github.com/Azure/azure-sdk-for-rust/issues/1068
     539          205 :             for path in paths {
     540          112 :                 #[derive(Debug)]
     541          112 :                 enum AzureOrTimeout {
     542              :                     AzureError(azure_core::Error),
     543              :                     Timeout,
     544              :                     Cancel,
     545          112 :                 }
     546          112 :                 impl Display for AzureOrTimeout {
     547            0 :                     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     548            0 :                         write!(f, "{self:?}")
     549            0 :                     }
     550              :                 }
     551          112 :                 let warn_threshold = 3;
     552          112 :                 let max_retries = 5;
     553          112 :                 backoff::retry(
     554          112 :                     || async {
     555          112 :                         let blob_client = self.client.blob_client(self.relative_path_to_name(path));
     556          112 : 
     557          112 :                         let request = blob_client.delete().into_future();
     558              : 
     559          561 :                         let res = tokio::time::timeout(self.timeout, request).await;
     560              : 
     561          112 :                         match res {
     562           90 :                             Ok(Ok(_v)) => Ok(()),
     563           22 :                             Ok(Err(azure_err)) => {
     564           22 :                                 if let Some(http_err) = azure_err.as_http_error() {
     565           22 :                                     if http_err.status() == StatusCode::NotFound {
     566           22 :                                         return Ok(());
     567            0 :                                     }
     568            0 :                                 }
     569            0 :                                 Err(AzureOrTimeout::AzureError(azure_err))
     570              :                             }
     571            0 :                             Err(_elapsed) => Err(AzureOrTimeout::Timeout),
     572              :                         }
     573          224 :                     },
     574          112 :                     |err| match err {
     575            0 :                         AzureOrTimeout::AzureError(_) | AzureOrTimeout::Timeout => false,
     576            0 :                         AzureOrTimeout::Cancel => true,
     577          112 :                     },
     578          112 :                     warn_threshold,
     579          112 :                     max_retries,
     580          112 :                     "deleting remote object",
     581          112 :                     cancel,
     582          112 :                 )
     583          561 :                 .await
     584          112 :                 .ok_or_else(|| AzureOrTimeout::Cancel)
     585          112 :                 .and_then(|x| x)
     586          112 :                 .map_err(|e| match e {
     587            0 :                     AzureOrTimeout::AzureError(err) => anyhow::Error::from(err),
     588            0 :                     AzureOrTimeout::Timeout => TimeoutOrCancel::Timeout.into(),
     589            0 :                     AzureOrTimeout::Cancel => TimeoutOrCancel::Cancel.into(),
     590          112 :                 })?;
     591              :             }
     592           93 :             Ok(())
     593           93 :         };
     594              : 
     595           93 :         let res = tokio::select! {
     596           93 :             res = op => res,
     597           93 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     598              :         };
     599              : 
     600           93 :         let started_at = ScopeGuard::into_inner(started_at);
     601           93 :         crate::metrics::BUCKET_METRICS
     602           93 :             .req_seconds
     603           93 :             .observe_elapsed(kind, &res, started_at);
     604           93 :         res
     605           93 :     }
     606              : 
     607            1 :     async fn copy(
     608            1 :         &self,
     609            1 :         from: &RemotePath,
     610            1 :         to: &RemotePath,
     611            1 :         cancel: &CancellationToken,
     612            1 :     ) -> anyhow::Result<()> {
     613            1 :         let kind = RequestKind::Copy;
     614            1 :         let _permit = self.permit(kind, cancel).await?;
     615            1 :         let started_at = start_measuring_requests(kind);
     616            1 : 
     617            1 :         let timeout = tokio::time::sleep(self.timeout);
     618            1 : 
     619            1 :         let mut copy_status = None;
     620            1 : 
     621            1 :         let op = async {
     622            1 :             let blob_client = self.client.blob_client(self.relative_path_to_name(to));
     623              : 
     624            1 :             let source_url = format!(
     625            1 :                 "{}/{}",
     626            1 :                 self.client.url()?,
     627            1 :                 self.relative_path_to_name(from)
     628              :             );
     629              : 
     630            1 :             let builder = blob_client.copy(Url::from_str(&source_url)?);
     631            1 :             let copy = builder.into_future();
     632              : 
     633            5 :             let result = copy.await?;
     634              : 
     635            1 :             copy_status = Some(result.copy_status);
     636              :             loop {
     637            1 :                 match copy_status.as_ref().expect("we always set it to Some") {
     638              :                     CopyStatus::Aborted => {
     639            0 :                         anyhow::bail!("Received abort for copy from {from} to {to}.");
     640              :                     }
     641              :                     CopyStatus::Failed => {
     642            0 :                         anyhow::bail!("Received failure response for copy from {from} to {to}.");
     643              :                     }
     644            1 :                     CopyStatus::Success => return Ok(()),
     645            0 :                     CopyStatus::Pending => (),
     646            0 :                 }
     647            0 :                 // The copy is taking longer. Waiting a second and then re-trying.
     648            0 :                 // TODO estimate time based on copy_progress and adjust time based on that
     649            0 :                 tokio::time::sleep(Duration::from_millis(1000)).await;
     650            0 :                 let properties = blob_client.get_properties().into_future().await?;
     651            0 :                 let Some(status) = properties.blob.properties.copy_status else {
     652            0 :                     tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
     653            0 :                     return Ok(());
     654              :                 };
     655            0 :                 copy_status = Some(status);
     656              :             }
     657            1 :         };
     658              : 
     659            1 :         let res = tokio::select! {
     660            1 :             res = op => res,
     661            1 :             _ = cancel.cancelled() => return Err(anyhow::Error::new(TimeoutOrCancel::Cancel)),
     662            1 :             _ = timeout => {
     663            0 :                 let e = anyhow::Error::new(TimeoutOrCancel::Timeout);
     664            0 :                 let e = e.context(format!("Timeout, last status: {copy_status:?}"));
     665            0 :                 Err(e)
     666              :             },
     667              :         };
     668              : 
     669            1 :         let started_at = ScopeGuard::into_inner(started_at);
     670            1 :         crate::metrics::BUCKET_METRICS
     671            1 :             .req_seconds
     672            1 :             .observe_elapsed(kind, &res, started_at);
     673            1 :         res
     674            1 :     }
     675              : 
     676            0 :     async fn time_travel_recover(
     677            0 :         &self,
     678            0 :         _prefix: Option<&RemotePath>,
     679            0 :         _timestamp: SystemTime,
     680            0 :         _done_if_after: SystemTime,
     681            0 :         _cancel: &CancellationToken,
     682            0 :     ) -> Result<(), TimeTravelError> {
     683            0 :         // TODO use Azure point in time recovery feature for this
     684            0 :         // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
     685            0 :         Err(TimeTravelError::Unimplemented)
     686            0 :     }
     687              : }
     688              : 
     689              : pin_project_lite::pin_project! {
     690              :     /// Hack to work around not being able to stream once with azure sdk.
     691              :     ///
     692              :     /// Azure sdk clones streams around with the assumption that they are like
     693              :     /// `Arc<tokio::fs::File>` (except not supporting tokio), however our streams are not like
     694              :     /// that. For example for an `index_part.json` we just have a single chunk of [`Bytes`]
     695              :     /// representing the whole serialized vec. It could be trivially cloneable and "semi-trivially"
     696              :     /// seekable, but we can also just re-try the request easier.
     697              :     #[project = NonSeekableStreamProj]
     698              :     enum NonSeekableStream<S> {
     699              :         /// A stream wrappers initial form.
     700              :         ///
     701              :         /// Mutex exists to allow moving when cloning. If the sdk changes to do less than 1
     702              :         /// clone before first request, then this must be changed.
     703              :         Initial {
     704              :             inner: std::sync::Mutex<Option<tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>>>,
     705              :             len: usize,
     706              :         },
     707              :         /// The actually readable variant, produced by cloning the Initial variant.
     708              :         ///
     709              :         /// The sdk currently always clones once, even without retry policy.
     710              :         Actual {
     711              :             #[pin]
     712              :             inner: tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>,
     713              :             len: usize,
     714              :             read_any: bool,
     715              :         },
     716              :         /// Most likely unneeded, but left to make life easier, in case more clones are added.
     717              :         Cloned {
     718              :             len_was: usize,
     719              :         }
     720              :     }
     721              : }
     722              : 
     723              : impl<S> NonSeekableStream<S>
     724              : where
     725              :     S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     726              : {
     727           93 :     fn new(inner: S, len: usize) -> NonSeekableStream<S> {
     728              :         use tokio_util::compat::TokioAsyncReadCompatExt;
     729              : 
     730           93 :         let inner = tokio_util::io::StreamReader::new(inner).compat();
     731           93 :         let inner = Some(inner);
     732           93 :         let inner = std::sync::Mutex::new(inner);
     733           93 :         NonSeekableStream::Initial { inner, len }
     734           93 :     }
     735              : }
     736              : 
     737              : impl<S> std::fmt::Debug for NonSeekableStream<S> {
     738            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     739            0 :         match self {
     740            0 :             Self::Initial { len, .. } => f.debug_struct("Initial").field("len", len).finish(),
     741            0 :             Self::Actual { len, .. } => f.debug_struct("Actual").field("len", len).finish(),
     742            0 :             Self::Cloned { len_was, .. } => f.debug_struct("Cloned").field("len", len_was).finish(),
     743              :         }
     744            0 :     }
     745              : }
     746              : 
     747              : impl<S> futures::io::AsyncRead for NonSeekableStream<S>
     748              : where
     749              :     S: Stream<Item = std::io::Result<Bytes>>,
     750              : {
     751           93 :     fn poll_read(
     752           93 :         self: std::pin::Pin<&mut Self>,
     753           93 :         cx: &mut std::task::Context<'_>,
     754           93 :         buf: &mut [u8],
     755           93 :     ) -> std::task::Poll<std::io::Result<usize>> {
     756           93 :         match self.project() {
     757              :             NonSeekableStreamProj::Actual {
     758           93 :                 inner, read_any, ..
     759           93 :             } => {
     760           93 :                 *read_any = true;
     761           93 :                 inner.poll_read(cx, buf)
     762              :             }
     763              :             // NonSeekableStream::Initial does not support reading because it is just much easier
     764              :             // to have the mutex in place where one does not poll the contents, or that's how it
     765              :             // seemed originally. If there is a version upgrade which changes the cloning, then
     766              :             // that support needs to be hacked in.
     767              :             //
     768              :             // including {self:?} into the message would be useful, but unsure how to unproject.
     769            0 :             _ => std::task::Poll::Ready(Err(std::io::Error::new(
     770            0 :                 std::io::ErrorKind::Other,
     771            0 :                 "cloned or initial values cannot be read",
     772            0 :             ))),
     773              :         }
     774           93 :     }
     775              : }
     776              : 
     777              : impl<S> Clone for NonSeekableStream<S> {
     778              :     /// Weird clone implementation exists to support the sdk doing cloning before issuing the first
     779              :     /// request, see type documentation.
     780           93 :     fn clone(&self) -> Self {
     781              :         use NonSeekableStream::*;
     782              : 
     783           93 :         match self {
     784           93 :             Initial { inner, len } => {
     785           93 :                 if let Some(inner) = inner.lock().unwrap().take() {
     786           93 :                     Actual {
     787           93 :                         inner,
     788           93 :                         len: *len,
     789           93 :                         read_any: false,
     790           93 :                     }
     791              :                 } else {
     792            0 :                     Self::Cloned { len_was: *len }
     793              :                 }
     794              :             }
     795            0 :             Actual { len, .. } => Cloned { len_was: *len },
     796            0 :             Cloned { len_was } => Cloned { len_was: *len_was },
     797              :         }
     798           93 :     }
     799              : }
     800              : 
     801              : #[async_trait::async_trait]
     802              : impl<S> azure_core::SeekableStream for NonSeekableStream<S>
     803              : where
     804              :     S: Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync + 'static,
     805              : {
     806            0 :     async fn reset(&mut self) -> azure_core::error::Result<()> {
     807              :         use NonSeekableStream::*;
     808              : 
     809            0 :         let msg = match self {
     810            0 :             Initial { inner, .. } => {
     811            0 :                 if inner.get_mut().unwrap().is_some() {
     812            0 :                     return Ok(());
     813              :                 } else {
     814            0 :                     "reset after first clone is not supported"
     815              :                 }
     816              :             }
     817            0 :             Actual { read_any, .. } if !*read_any => return Ok(()),
     818            0 :             Actual { .. } => "reset after reading is not supported",
     819            0 :             Cloned { .. } => "reset after second clone is not supported",
     820              :         };
     821            0 :         Err(azure_core::error::Error::new(
     822            0 :             azure_core::error::ErrorKind::Io,
     823            0 :             std::io::Error::new(std::io::ErrorKind::Other, msg),
     824            0 :         ))
     825            0 :     }
     826              : 
     827              :     // Note: it is not documented if this should be the total or remaining length, total passes the
     828              :     // tests.
     829           93 :     fn len(&self) -> usize {
     830              :         use NonSeekableStream::*;
     831           93 :         match self {
     832           93 :             Initial { len, .. } => *len,
     833            0 :             Actual { len, .. } => *len,
     834            0 :             Cloned { len_was, .. } => *len_was,
     835              :         }
     836           93 :     }
     837              : }
        

Generated by: LCOV version 2.1-beta