LCOV - code coverage report
Current view: top level - libs/remote_storage/src - azure_blob.rs (source / functions) Coverage Total Hit
Test: 472031e0b71f3195f7f21b1f2b20de09fd07bb56.info Lines: 76.9 % 696 535
Test Date: 2025-05-26 10:37:33 Functions: 49.5 % 93 46

            Line data    Source code
       1              : //! Azure Blob Storage wrapper
       2              : 
       3              : use std::borrow::Cow;
       4              : use std::collections::HashMap;
       5              : use std::fmt::Display;
       6              : use std::num::NonZeroU32;
       7              : use std::pin::Pin;
       8              : use std::str::FromStr;
       9              : use std::sync::Arc;
      10              : use std::time::{Duration, SystemTime};
      11              : use std::{env, io};
      12              : 
      13              : use anyhow::{Context, Result};
      14              : use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
      15              : use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
      16              : use azure_storage::StorageCredentials;
      17              : use azure_storage_blobs::blob::operations::GetBlobBuilder;
      18              : use azure_storage_blobs::blob::{Blob, CopyStatus};
      19              : use azure_storage_blobs::container::operations::ListBlobsBuilder;
      20              : use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
      21              : use bytes::Bytes;
      22              : use futures::FutureExt;
      23              : use futures::future::Either;
      24              : use futures::stream::Stream;
      25              : use futures_util::{StreamExt, TryStreamExt};
      26              : use http_types::{StatusCode, Url};
      27              : use scopeguard::ScopeGuard;
      28              : use tokio_util::sync::CancellationToken;
      29              : use tracing::debug;
      30              : use utils::backoff;
      31              : use utils::backoff::exponential_backoff_duration_seconds;
      32              : 
      33              : use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
      34              : use crate::config::AzureConfig;
      35              : use crate::error::Cancelled;
      36              : use crate::metrics::{AttemptOutcome, RequestKind, start_measuring_requests};
      37              : use crate::{
      38              :     ConcurrencyLimiter, Download, DownloadError, DownloadKind, DownloadOpts, Listing, ListingMode,
      39              :     ListingObject, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
      40              : };
      41              : 
      42              : pub struct AzureBlobStorage {
      43              :     client: ContainerClient,
      44              :     container_name: String,
      45              :     prefix_in_container: Option<String>,
      46              :     max_keys_per_list_response: Option<NonZeroU32>,
      47              :     concurrency_limiter: ConcurrencyLimiter,
      48              :     // Per-request timeout. Accessible for tests.
      49              :     pub timeout: Duration,
      50              : 
      51              :     // Alternative timeout used for metadata objects which are expected to be small
      52              :     pub small_timeout: Duration,
      53              : }
      54              : 
      55              : impl AzureBlobStorage {
      56           10 :     pub fn new(
      57           10 :         azure_config: &AzureConfig,
      58           10 :         timeout: Duration,
      59           10 :         small_timeout: Duration,
      60           10 :     ) -> Result<Self> {
      61           10 :         debug!(
      62            0 :             "Creating azure remote storage for azure container {}",
      63              :             azure_config.container_name
      64              :         );
      65              : 
      66              :         // Use the storage account from the config by default, fall back to env var if not present.
      67           10 :         let account = azure_config.storage_account.clone().unwrap_or_else(|| {
      68           10 :             env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT")
      69           10 :         });
      70              : 
      71              :         // If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
      72              :         // otherwise try the token based credentials.
      73           10 :         let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
      74           10 :             StorageCredentials::access_key(account.clone(), access_key)
      75              :         } else {
      76            0 :             let token_credential = azure_identity::create_default_credential()
      77            0 :                 .context("trying to obtain Azure default credentials")?;
      78            0 :             StorageCredentials::token_credential(token_credential)
      79              :         };
      80              : 
      81           10 :         let builder = ClientBuilder::new(account, credentials)
      82           10 :             // we have an outer retry
      83           10 :             .retry(RetryOptions::none())
      84           10 :             // Customize transport to configure conneciton pooling
      85           10 :             .transport(TransportOptions::new(Self::reqwest_client(
      86           10 :                 azure_config.conn_pool_size,
      87           10 :             )));
      88           10 : 
      89           10 :         let client = builder.container_client(azure_config.container_name.to_owned());
      90              : 
      91           10 :         let max_keys_per_list_response =
      92           10 :             if let Some(limit) = azure_config.max_keys_per_list_response {
      93              :                 Some(
      94            4 :                     NonZeroU32::new(limit as u32)
      95            4 :                         .ok_or_else(|| anyhow::anyhow!("max_keys_per_list_response can't be 0"))?,
      96              :                 )
      97              :             } else {
      98            6 :                 None
      99              :             };
     100              : 
     101           10 :         Ok(AzureBlobStorage {
     102           10 :             client,
     103           10 :             container_name: azure_config.container_name.to_owned(),
     104           10 :             prefix_in_container: azure_config.prefix_in_container.to_owned(),
     105           10 :             max_keys_per_list_response,
     106           10 :             concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
     107           10 :             timeout,
     108           10 :             small_timeout,
     109           10 :         })
     110           10 :     }
     111              : 
     112           10 :     fn reqwest_client(conn_pool_size: usize) -> Arc<dyn HttpClient> {
     113           10 :         let client = reqwest::ClientBuilder::new()
     114           10 :             .pool_max_idle_per_host(conn_pool_size)
     115           10 :             .build()
     116           10 :             .expect("failed to build `reqwest` client");
     117           10 :         Arc::new(client)
     118           10 :     }
     119              : 
     120          237 :     pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
     121          237 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     122          237 :         let path_string = path.get_path().as_str();
     123          237 :         match &self.prefix_in_container {
     124          237 :             Some(prefix) => {
     125          237 :                 if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     126          237 :                     prefix.clone() + path_string
     127              :                 } else {
     128            0 :                     format!("{prefix}{REMOTE_STORAGE_PREFIX_SEPARATOR}{path_string}")
     129              :                 }
     130              :             }
     131            0 :             None => path_string.to_string(),
     132              :         }
     133          237 :     }
     134              : 
     135          249 :     fn name_to_relative_path(&self, key: &str) -> RemotePath {
     136          249 :         let relative_path =
     137          249 :             match key.strip_prefix(self.prefix_in_container.as_deref().unwrap_or_default()) {
     138          249 :                 Some(stripped) => stripped,
     139              :                 // we rely on Azure to return properly prefixed paths
     140              :                 // for requests with a certain prefix
     141            0 :                 None => panic!(
     142            0 :                     "Key {key} does not start with container prefix {:?}",
     143            0 :                     self.prefix_in_container
     144            0 :                 ),
     145              :             };
     146          249 :         RemotePath(
     147          249 :             relative_path
     148          249 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     149          249 :                 .collect(),
     150          249 :         )
     151          249 :     }
     152              : 
     153           11 :     async fn download_for_builder(
     154           11 :         &self,
     155           11 :         builder: GetBlobBuilder,
     156           11 :         timeout: Duration,
     157           11 :         cancel: &CancellationToken,
     158           11 :     ) -> Result<Download, DownloadError> {
     159           11 :         let kind = RequestKind::Get;
     160              : 
     161           11 :         let _permit = self.permit(kind, cancel).await?;
     162           11 :         let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
     163           11 :         let cancel_or_timeout_ = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
     164           11 : 
     165           11 :         let mut etag = None;
     166           11 :         let mut last_modified = None;
     167           11 :         let mut metadata = HashMap::new();
     168           11 : 
     169           11 :         let started_at = start_measuring_requests(kind);
     170           11 : 
     171           11 :         let download = async {
     172           11 :             let response = builder
     173           11 :                 // convert to concrete Pageable
     174           11 :                 .into_stream()
     175           11 :                 // convert to TryStream
     176           11 :                 .into_stream()
     177           11 :                 .map_err(to_download_error);
     178           11 : 
     179           11 :             // apply per request timeout
     180           11 :             let response = tokio_stream::StreamExt::timeout(response, timeout);
     181           11 : 
     182           11 :             // flatten
     183           11 :             let response = response.map(|res| match res {
     184           11 :                 Ok(res) => res,
     185            0 :                 Err(_elapsed) => Err(DownloadError::Timeout),
     186           11 :             });
     187           11 : 
     188           11 :             let mut response = Box::pin(response);
     189              : 
     190           11 :             let Some(part) = response.next().await else {
     191            0 :                 return Err(DownloadError::Other(anyhow::anyhow!(
     192            0 :                     "Azure GET response contained no response body"
     193            0 :                 )));
     194              :             };
     195           11 :             let part = part?;
     196            9 :             if etag.is_none() {
     197            9 :                 etag = Some(part.blob.properties.etag);
     198            9 :             }
     199            9 :             if last_modified.is_none() {
     200            9 :                 last_modified = Some(part.blob.properties.last_modified.into());
     201            9 :             }
     202            9 :             if let Some(blob_meta) = part.blob.metadata {
     203            0 :                 metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned())));
     204            9 :             }
     205              : 
     206              :             // unwrap safety: if these were None, bufs would be empty and we would have returned an error already
     207            9 :             let etag = etag.unwrap();
     208            9 :             let last_modified = last_modified.unwrap();
     209            9 : 
     210            9 :             let tail_stream = response
     211            9 :                 .map(|part| match part {
     212            0 :                     Ok(part) => Either::Left(part.data.map(|r| r.map_err(io::Error::other))),
     213            0 :                     Err(e) => {
     214            0 :                         Either::Right(futures::stream::once(async { Err(io::Error::other(e)) }))
     215              :                     }
     216            9 :                 })
     217            9 :                 .flatten();
     218            9 :             let stream = part
     219            9 :                 .data
     220            9 :                 .map(|r| r.map_err(io::Error::other))
     221            9 :                 .chain(sync_wrapper::SyncStream::new(tail_stream));
     222            9 :             //.chain(SyncStream::from_pin(Box::pin(tail_stream)));
     223            9 : 
     224            9 :             let download_stream = crate::support::DownloadStream::new(cancel_or_timeout_, stream);
     225            9 : 
     226            9 :             Ok(Download {
     227            9 :                 download_stream: Box::pin(download_stream),
     228            9 :                 etag,
     229            9 :                 last_modified,
     230            9 :                 metadata: Some(StorageMetadata(metadata)),
     231            9 :             })
     232           11 :         };
     233              : 
     234           11 :         let download = tokio::select! {
     235           11 :             bufs = download => bufs,
     236           11 :             cancel_or_timeout = cancel_or_timeout => match cancel_or_timeout {
     237            0 :                 TimeoutOrCancel::Timeout => return Err(DownloadError::Timeout),
     238            0 :                 TimeoutOrCancel::Cancel => return Err(DownloadError::Cancelled),
     239              :             },
     240              :         };
     241           11 :         let started_at = ScopeGuard::into_inner(started_at);
     242           11 :         let outcome = match &download {
     243            9 :             Ok(_) => AttemptOutcome::Ok,
     244              :             // At this level in the stack 404 and 304 responses do not indicate an error.
     245              :             // There's expected cases when a blob may not exist or hasn't been modified since
     246              :             // the last get (e.g. probing for timeline indices and heatmap downloads).
     247              :             // Callers should handle errors if they are unexpected.
     248            2 :             Err(DownloadError::NotFound | DownloadError::Unmodified) => AttemptOutcome::Ok,
     249            0 :             Err(_) => AttemptOutcome::Err,
     250              :         };
     251           11 :         crate::metrics::BUCKET_METRICS
     252           11 :             .req_seconds
     253           11 :             .observe_elapsed(kind, outcome, started_at);
     254           11 :         download
     255           11 :     }
     256              : 
     257           26 :     fn list_streaming_for_fn<T: Default + ListingCollector>(
     258           26 :         &self,
     259           26 :         prefix: Option<&RemotePath>,
     260           26 :         mode: ListingMode,
     261           26 :         max_keys: Option<NonZeroU32>,
     262           26 :         cancel: &CancellationToken,
     263           26 :         request_kind: RequestKind,
     264           26 :         customize_builder: impl Fn(ListBlobsBuilder) -> ListBlobsBuilder,
     265           26 :     ) -> impl Stream<Item = Result<T, DownloadError>> {
     266           26 :         // get the passed prefix or if it is not set use prefix_in_bucket value
     267           26 :         let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
     268           10 :             self.prefix_in_container.clone().map(|mut s| {
     269           10 :                 if !s.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     270            0 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     271           10 :                 }
     272           10 :                 s
     273           10 :             })
     274           26 :         });
     275           26 : 
     276           26 :         async_stream::stream! {
     277           26 :             let _permit = self.permit(request_kind, cancel).await?;
     278           26 : 
     279           26 :             let mut builder = self.client.list_blobs();
     280           26 : 
     281           26 :             if let ListingMode::WithDelimiter = mode {
     282           26 :                 builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     283           26 :             }
     284           26 : 
     285           26 :             if let Some(prefix) = list_prefix {
     286           26 :                 builder = builder.prefix(Cow::from(prefix.to_owned()));
     287           26 :             }
     288           26 : 
     289           26 :             if let Some(limit) = self.max_keys_per_list_response {
     290           26 :                 builder = builder.max_results(MaxResults::new(limit));
     291           26 :             }
     292           26 : 
     293           26 :             builder = customize_builder(builder);
     294           26 : 
     295           26 :             let mut next_marker = None;
     296           26 : 
     297           26 :             let mut timeout_try_cnt = 1;
     298           26 : 
     299           26 :             'outer: loop {
     300           26 :                 let mut builder = builder.clone();
     301           26 :                 if let Some(marker) = next_marker.clone() {
     302           26 :                     builder = builder.marker(marker);
     303           26 :                 }
     304           26 :                 // Azure Blob Rust SDK does not expose the list blob API directly. Users have to use
     305           26 :                 // their pageable iterator wrapper that returns all keys as a stream. We want to have
     306           26 :                 // full control of paging, and therefore we only take the first item from the stream.
     307           26 :                 let mut response_stream = builder.into_stream();
     308           26 :                 let response = response_stream.next();
     309           26 :                 // Timeout mechanism: Azure client will sometimes stuck on a request, but retrying that request
     310           26 :                 // would immediately succeed. Therefore, we use exponential backoff timeout to retry the request.
     311           26 :                 // (Usually, exponential backoff is used to determine the sleep time between two retries.) We
     312           26 :                 // start with 10.0 second timeout, and double the timeout for each failure, up to 5 failures.
     313           26 :                 // timeout = min(5 * (1.0+1.0)^n, self.timeout).
     314           26 :                 let this_timeout = (5.0 * exponential_backoff_duration_seconds(timeout_try_cnt, 1.0, self.timeout.as_secs_f64())).min(self.timeout.as_secs_f64());
     315           26 :                 let response = tokio::time::timeout(Duration::from_secs_f64(this_timeout), response);
     316           45 :                 let response = response.map(|res| {
     317           45 :                     match res {
     318           45 :                         Ok(Some(Ok(res))) => Ok(Some(res)),
     319           26 :                         Ok(Some(Err(e)))  => Err(to_download_error(e)),
     320           26 :                         Ok(None) => Ok(None),
     321           26 :                         Err(_elasped) => Err(DownloadError::Timeout),
     322           26 :                     }
     323           45 :                 });
     324           26 :                 let mut max_keys = max_keys.map(|mk| mk.get());
     325           26 :                 let next_item = tokio::select! {
     326           26 :                     op = response => op,
     327           26 :                     _ = cancel.cancelled() => Err(DownloadError::Cancelled),
     328           26 :                 };
     329           26 : 
     330           26 :                 if let Err(DownloadError::Timeout) = &next_item {
     331           26 :                     timeout_try_cnt += 1;
     332           26 :                     if timeout_try_cnt <= 5 {
     333           26 :                         continue 'outer;
     334           26 :                     }
     335           26 :                 }
     336           26 : 
     337           26 :                 let next_item = match next_item {
     338           26 :                     Ok(next_item) => next_item,
     339           26 :                     Err(e) => {
     340           26 :                         // The error is potentially retryable, so we must rewind the loop after yielding.
     341           26 :                         yield Err(e);
     342           26 :                         continue 'outer;
     343           26 :                     },
     344           26 :                 };
     345           26 : 
     346           26 :                 // Log a warning if we saw two timeouts in a row before a successful request
     347           26 :                 if timeout_try_cnt > 2 {
     348           26 :                     tracing::warn!("Azure Blob Storage list timed out and succeeded after {} tries", timeout_try_cnt);
     349           26 :                 }
     350           26 :                 timeout_try_cnt = 1;
     351           26 : 
     352           26 :                 let Some(entry) = next_item else {
     353           26 :                     // The list is complete, so yield it.
     354           26 :                     break;
     355           26 :                 };
     356           26 : 
     357           26 :                 let mut res = T::default();
     358           26 :                 next_marker = entry.continuation();
     359           26 :                 let prefix_iter = entry
     360           26 :                     .blobs
     361           26 :                     .prefixes()
     362           52 :                     .map(|prefix| self.name_to_relative_path(&prefix.name));
     363           26 :                 res.add_prefixes(self, prefix_iter);
     364           26 : 
     365           26 :                 let blob_iter = entry
     366           26 :                     .blobs
     367           26 :                     .blobs();
     368           26 : 
     369           26 :                 for key in blob_iter {
     370           26 :                     res.add_blob(self, key);
     371           26 : 
     372           26 :                     if let Some(mut mk) = max_keys {
     373           26 :                         assert!(mk > 0);
     374           26 :                         mk -= 1;
     375           26 :                         if mk == 0 {
     376           26 :                             yield Ok(res); // limit reached
     377           26 :                             break 'outer;
     378           26 :                         }
     379           26 :                         max_keys = Some(mk);
     380           26 :                     }
     381           26 :                 }
     382           26 :                 yield Ok(res);
     383           26 : 
     384           26 :                 // We are done here
     385           26 :                 if next_marker.is_none() {
     386           26 :                     break;
     387           26 :                 }
     388           26 :             }
     389           26 :         }
     390           26 :     }
     391              : 
     392          227 :     async fn permit(
     393          227 :         &self,
     394          227 :         kind: RequestKind,
     395          227 :         cancel: &CancellationToken,
     396          227 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
     397          227 :         let acquire = self.concurrency_limiter.acquire(kind);
     398          227 : 
     399          227 :         tokio::select! {
     400          227 :             permit = acquire => Ok(permit.expect("never closed")),
     401          227 :             _ = cancel.cancelled() => Err(Cancelled),
     402              :         }
     403          227 :     }
     404              : 
     405            0 :     pub fn container_name(&self) -> &str {
     406            0 :         &self.container_name
     407            0 :     }
     408              : }
     409              : 
     410              : trait ListingCollector {
     411              :     fn add_prefixes(&mut self, abs: &AzureBlobStorage, prefix_it: impl Iterator<Item = RemotePath>);
     412              :     fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob);
     413              : }
     414              : 
     415              : impl ListingCollector for Listing {
     416           45 :     fn add_prefixes(
     417           45 :         &mut self,
     418           45 :         _abs: &AzureBlobStorage,
     419           45 :         prefix_it: impl Iterator<Item = RemotePath>,
     420           45 :     ) {
     421           45 :         self.prefixes.extend(prefix_it);
     422           45 :     }
     423          197 :     fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
     424          197 :         self.keys.push(ListingObject {
     425          197 :             key: abs.name_to_relative_path(&blob.name),
     426          197 :             last_modified: blob.properties.last_modified.into(),
     427          197 :             size: blob.properties.content_length,
     428          197 :         });
     429          197 :     }
     430              : }
     431              : 
     432              : impl ListingCollector for crate::VersionListing {
     433            0 :     fn add_prefixes(
     434            0 :         &mut self,
     435            0 :         _abs: &AzureBlobStorage,
     436            0 :         _prefix_it: impl Iterator<Item = RemotePath>,
     437            0 :     ) {
     438            0 :         // nothing
     439            0 :     }
     440            0 :     fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
     441            0 :         let id = crate::VersionId(blob.version_id.clone().expect("didn't find version ID"));
     442            0 :         self.versions.push(crate::Version {
     443            0 :             key: abs.name_to_relative_path(&blob.name),
     444            0 :             last_modified: blob.properties.last_modified.into(),
     445            0 :             kind: crate::VersionKind::Version(id),
     446            0 :         });
     447            0 :     }
     448              : }
     449              : 
     450            0 : fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
     451            0 :     let mut res = Metadata::new();
     452            0 :     for (k, v) in metadata.0.into_iter() {
     453            0 :         res.insert(k, v);
     454            0 :     }
     455            0 :     res
     456            0 : }
     457              : 
     458            3 : fn to_download_error(error: azure_core::Error) -> DownloadError {
     459            3 :     if let Some(http_err) = error.as_http_error() {
     460            3 :         match http_err.status() {
     461            1 :             StatusCode::NotFound => DownloadError::NotFound,
     462            2 :             StatusCode::NotModified => DownloadError::Unmodified,
     463            0 :             StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
     464            0 :             _ => DownloadError::Other(anyhow::Error::new(error)),
     465              :         }
     466              :     } else {
     467            0 :         DownloadError::Other(error.into())
     468              :     }
     469            3 : }
     470              : 
     471              : impl RemoteStorage for AzureBlobStorage {
     472           26 :     fn list_streaming(
     473           26 :         &self,
     474           26 :         prefix: Option<&RemotePath>,
     475           26 :         mode: ListingMode,
     476           26 :         max_keys: Option<NonZeroU32>,
     477           26 :         cancel: &CancellationToken,
     478           26 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     479           26 :         let customize_builder = |builder| builder;
     480           26 :         let kind = RequestKind::ListVersions;
     481           26 :         self.list_streaming_for_fn(prefix, mode, max_keys, cancel, kind, customize_builder)
     482           26 :     }
     483              : 
     484            0 :     async fn list_versions(
     485            0 :         &self,
     486            0 :         prefix: Option<&RemotePath>,
     487            0 :         mode: ListingMode,
     488            0 :         max_keys: Option<NonZeroU32>,
     489            0 :         cancel: &CancellationToken,
     490            0 :     ) -> std::result::Result<crate::VersionListing, DownloadError> {
     491            0 :         let customize_builder = |mut builder: ListBlobsBuilder| {
     492            0 :             builder = builder.include_versions(true);
     493            0 :             builder
     494            0 :         };
     495            0 :         let kind = RequestKind::ListVersions;
     496            0 : 
     497            0 :         let mut stream = std::pin::pin!(self.list_streaming_for_fn(
     498            0 :             prefix,
     499            0 :             mode,
     500            0 :             max_keys,
     501            0 :             cancel,
     502            0 :             kind,
     503            0 :             customize_builder
     504            0 :         ));
     505            0 :         let mut combined: crate::VersionListing =
     506            0 :             stream.next().await.expect("At least one item required")?;
     507            0 :         while let Some(list) = stream.next().await {
     508            0 :             let list = list?;
     509            0 :             combined.versions.extend(list.versions.into_iter());
     510              :         }
     511            0 :         Ok(combined)
     512            0 :     }
     513              : 
     514            3 :     async fn head_object(
     515            3 :         &self,
     516            3 :         key: &RemotePath,
     517            3 :         cancel: &CancellationToken,
     518            3 :     ) -> Result<ListingObject, DownloadError> {
     519            3 :         let kind = RequestKind::Head;
     520            3 :         let _permit = self.permit(kind, cancel).await?;
     521              : 
     522            3 :         let started_at = start_measuring_requests(kind);
     523            3 : 
     524            3 :         let blob_client = self.client.blob_client(self.relative_path_to_name(key));
     525            3 :         let properties_future = blob_client.get_properties().into_future();
     526            3 : 
     527            3 :         let properties_future = tokio::time::timeout(self.small_timeout, properties_future);
     528              : 
     529            3 :         let res = tokio::select! {
     530            3 :             res = properties_future => res,
     531            3 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     532              :         };
     533              : 
     534            3 :         if let Ok(inner) = &res {
     535            3 :             // do not incl. timeouts as errors in metrics but cancellations
     536            3 :             let started_at = ScopeGuard::into_inner(started_at);
     537            3 :             crate::metrics::BUCKET_METRICS
     538            3 :                 .req_seconds
     539            3 :                 .observe_elapsed(kind, inner, started_at);
     540            3 :         }
     541              : 
     542            3 :         let data = match res {
     543            2 :             Ok(Ok(data)) => Ok(data),
     544            1 :             Ok(Err(sdk)) => Err(to_download_error(sdk)),
     545            0 :             Err(_timeout) => Err(DownloadError::Timeout),
     546            1 :         }?;
     547              : 
     548            2 :         let properties = data.blob.properties;
     549            2 :         Ok(ListingObject {
     550            2 :             key: key.to_owned(),
     551            2 :             last_modified: SystemTime::from(properties.last_modified),
     552            2 :             size: properties.content_length,
     553            2 :         })
     554            3 :     }
     555              : 
     556           93 :     async fn upload(
     557           93 :         &self,
     558           93 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     559           93 :         data_size_bytes: usize,
     560           93 :         to: &RemotePath,
     561           93 :         metadata: Option<StorageMetadata>,
     562           93 :         cancel: &CancellationToken,
     563           93 :     ) -> anyhow::Result<()> {
     564           93 :         let kind = RequestKind::Put;
     565           93 :         let _permit = self.permit(kind, cancel).await?;
     566              : 
     567           93 :         let started_at = start_measuring_requests(kind);
     568           93 : 
     569           93 :         let op = async {
     570           93 :             let blob_client = self.client.blob_client(self.relative_path_to_name(to));
     571           93 : 
     572           93 :             let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
     573           93 :                 Box::pin(from);
     574           93 : 
     575           93 :             let from = NonSeekableStream::new(from, data_size_bytes);
     576           93 : 
     577           93 :             let body = azure_core::Body::SeekableStream(Box::new(from));
     578           93 : 
     579           93 :             let mut builder = blob_client.put_block_blob(body);
     580              : 
     581           93 :             if let Some(metadata) = metadata {
     582            0 :                 builder = builder.metadata(to_azure_metadata(metadata));
     583            0 :             }
     584              : 
     585           93 :             let fut = builder.into_future();
     586           93 :             let fut = tokio::time::timeout(self.timeout, fut);
     587           93 : 
     588           93 :             match fut.await {
     589           93 :                 Ok(Ok(_response)) => Ok(()),
     590            0 :                 Ok(Err(azure)) => Err(azure.into()),
     591            0 :                 Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     592              :             }
     593            0 :         };
     594              : 
     595           93 :         let res = tokio::select! {
     596           93 :             res = op => res,
     597           93 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     598              :         };
     599              : 
     600           93 :         let outcome = match res {
     601           93 :             Ok(_) => AttemptOutcome::Ok,
     602            0 :             Err(_) => AttemptOutcome::Err,
     603              :         };
     604           93 :         let started_at = ScopeGuard::into_inner(started_at);
     605           93 :         crate::metrics::BUCKET_METRICS
     606           93 :             .req_seconds
     607           93 :             .observe_elapsed(kind, outcome, started_at);
     608           93 : 
     609           93 :         res
     610            0 :     }
     611              : 
     612           11 :     async fn download(
     613           11 :         &self,
     614           11 :         from: &RemotePath,
     615           11 :         opts: &DownloadOpts,
     616           11 :         cancel: &CancellationToken,
     617           11 :     ) -> Result<Download, DownloadError> {
     618           11 :         let blob_client = self.client.blob_client(self.relative_path_to_name(from));
     619           11 : 
     620           11 :         let mut builder = blob_client.get();
     621              : 
     622           11 :         if let Some(ref etag) = opts.etag {
     623            3 :             builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()));
     624            8 :         }
     625              : 
     626           11 :         if let Some(ref version_id) = opts.version_id {
     627            0 :             let version_id = azure_storage_blobs::prelude::VersionId::new(version_id.0.clone());
     628            0 :             builder = builder.blob_versioning(version_id);
     629           11 :         }
     630              : 
     631           11 :         if let Some((start, end)) = opts.byte_range() {
     632            5 :             builder = builder.range(match end {
     633            3 :                 Some(end) => Range::Range(start..end),
     634            2 :                 None => Range::RangeFrom(start..),
     635              :             });
     636            6 :         }
     637              : 
     638           11 :         let timeout = match opts.kind {
     639            0 :             DownloadKind::Small => self.small_timeout,
     640           11 :             DownloadKind::Large => self.timeout,
     641              :         };
     642              : 
     643           11 :         self.download_for_builder(builder, timeout, cancel).await
     644           11 :     }
     645              : 
     646           86 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     647           86 :         self.delete_objects(std::array::from_ref(path), cancel)
     648           86 :             .await
     649           86 :     }
     650              : 
     651           93 :     async fn delete_objects(
     652           93 :         &self,
     653           93 :         paths: &[RemotePath],
     654           93 :         cancel: &CancellationToken,
     655           93 :     ) -> anyhow::Result<()> {
     656           93 :         let kind = RequestKind::Delete;
     657           93 :         let _permit = self.permit(kind, cancel).await?;
     658           93 :         let started_at = start_measuring_requests(kind);
     659           93 : 
     660           93 :         let op = async {
     661              :             // TODO batch requests are not supported by the SDK
     662              :             // https://github.com/Azure/azure-sdk-for-rust/issues/1068
     663          205 :             for path in paths {
     664          112 :                 #[derive(Debug)]
     665          112 :                 enum AzureOrTimeout {
     666          112 :                     AzureError(azure_core::Error),
     667          112 :                     Timeout,
     668          112 :                     Cancel,
     669          112 :                 }
     670          112 :                 impl Display for AzureOrTimeout {
     671            0 :                     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     672            0 :                         write!(f, "{self:?}")
     673            0 :                     }
     674              :                 }
     675          112 :                 let warn_threshold = 3;
     676          112 :                 let max_retries = 5;
     677          112 :                 backoff::retry(
     678          112 :                     || async {
     679          112 :                         let blob_client = self.client.blob_client(self.relative_path_to_name(path));
     680          112 : 
     681          112 :                         let request = blob_client.delete().into_future();
     682              : 
     683          112 :                         let res = tokio::time::timeout(self.timeout, request).await;
     684              : 
     685          112 :                         match res {
     686           90 :                             Ok(Ok(_v)) => Ok(()),
     687           22 :                             Ok(Err(azure_err)) => {
     688           22 :                                 if let Some(http_err) = azure_err.as_http_error() {
     689           22 :                                     if http_err.status() == StatusCode::NotFound {
     690           22 :                                         return Ok(());
     691            0 :                                     }
     692            0 :                                 }
     693            0 :                                 Err(AzureOrTimeout::AzureError(azure_err))
     694              :                             }
     695            0 :                             Err(_elapsed) => Err(AzureOrTimeout::Timeout),
     696              :                         }
     697          224 :                     },
     698          112 :                     |err| match err {
     699            0 :                         AzureOrTimeout::AzureError(_) | AzureOrTimeout::Timeout => false,
     700            0 :                         AzureOrTimeout::Cancel => true,
     701          112 :                     },
     702          112 :                     warn_threshold,
     703          112 :                     max_retries,
     704          112 :                     "deleting remote object",
     705          112 :                     cancel,
     706          112 :                 )
     707          112 :                 .await
     708          112 :                 .ok_or_else(|| AzureOrTimeout::Cancel)
     709          112 :                 .and_then(|x| x)
     710          112 :                 .map_err(|e| match e {
     711            0 :                     AzureOrTimeout::AzureError(err) => anyhow::Error::from(err),
     712            0 :                     AzureOrTimeout::Timeout => TimeoutOrCancel::Timeout.into(),
     713            0 :                     AzureOrTimeout::Cancel => TimeoutOrCancel::Cancel.into(),
     714          112 :                 })?;
     715              :             }
     716           93 :             Ok(())
     717           93 :         };
     718              : 
     719           93 :         let res = tokio::select! {
     720           93 :             res = op => res,
     721           93 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     722              :         };
     723              : 
     724           93 :         let started_at = ScopeGuard::into_inner(started_at);
     725           93 :         crate::metrics::BUCKET_METRICS
     726           93 :             .req_seconds
     727           93 :             .observe_elapsed(kind, &res, started_at);
     728           93 :         res
     729           93 :     }
     730              : 
     731            0 :     fn max_keys_per_delete(&self) -> usize {
     732            0 :         super::MAX_KEYS_PER_DELETE_AZURE
     733            0 :     }
     734              : 
     735            1 :     async fn copy(
     736            1 :         &self,
     737            1 :         from: &RemotePath,
     738            1 :         to: &RemotePath,
     739            1 :         cancel: &CancellationToken,
     740            1 :     ) -> anyhow::Result<()> {
     741            1 :         let kind = RequestKind::Copy;
     742            1 :         let _permit = self.permit(kind, cancel).await?;
     743            1 :         let started_at = start_measuring_requests(kind);
     744            1 : 
     745            1 :         let timeout = tokio::time::sleep(self.timeout);
     746            1 : 
     747            1 :         let mut copy_status = None;
     748            1 : 
     749            1 :         let op = async {
     750            1 :             let blob_client = self.client.blob_client(self.relative_path_to_name(to));
     751              : 
     752            1 :             let source_url = format!(
     753            1 :                 "{}/{}",
     754            1 :                 self.client.url()?,
     755            1 :                 self.relative_path_to_name(from)
     756              :             );
     757              : 
     758            1 :             let builder = blob_client.copy(Url::from_str(&source_url)?);
     759            1 :             let copy = builder.into_future();
     760              : 
     761            1 :             let result = copy.await?;
     762              : 
     763            1 :             copy_status = Some(result.copy_status);
     764              :             loop {
     765            1 :                 match copy_status.as_ref().expect("we always set it to Some") {
     766              :                     CopyStatus::Aborted => {
     767            0 :                         anyhow::bail!("Received abort for copy from {from} to {to}.");
     768              :                     }
     769              :                     CopyStatus::Failed => {
     770            0 :                         anyhow::bail!("Received failure response for copy from {from} to {to}.");
     771              :                     }
     772            1 :                     CopyStatus::Success => return Ok(()),
     773            0 :                     CopyStatus::Pending => (),
     774            0 :                 }
     775            0 :                 // The copy is still pending: wait a second, then poll its status again.
     776            0 :                 // TODO estimate time based on copy_progress and adjust time based on that
     777            0 :                 tokio::time::sleep(Duration::from_millis(1000)).await;
     778            0 :                 let properties = blob_client.get_properties().into_future().await?;
     779            0 :                 let Some(status) = properties.blob.properties.copy_status else {
     780            0 :                     tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
     781            0 :                     return Ok(());
     782              :                 };
     783            0 :                 copy_status = Some(status);
     784              :             }
     785            1 :         };
     786              : 
     787            1 :         let res = tokio::select! {
     788            1 :             res = op => res,
     789            1 :             _ = cancel.cancelled() => return Err(anyhow::Error::new(TimeoutOrCancel::Cancel)),
     790            1 :             _ = timeout => {
     791            0 :                 let e = anyhow::Error::new(TimeoutOrCancel::Timeout);
     792            0 :                 let e = e.context(format!("Timeout, last status: {copy_status:?}"));
     793            0 :                 Err(e)
     794              :             },
     795              :         };
     796              : 
     797            1 :         let started_at = ScopeGuard::into_inner(started_at);
     798            1 :         crate::metrics::BUCKET_METRICS
     799            1 :             .req_seconds
     800            1 :             .observe_elapsed(kind, &res, started_at);
     801            1 :         res
     802            1 :     }
     803              : 
     804            0 :     async fn time_travel_recover(
     805            0 :         &self,
     806            0 :         _prefix: Option<&RemotePath>,
     807            0 :         _timestamp: SystemTime,
     808            0 :         _done_if_after: SystemTime,
     809            0 :         _cancel: &CancellationToken,
     810            0 :     ) -> Result<(), TimeTravelError> {
     811            0 :         // TODO use Azure point in time recovery feature for this
     812            0 :         // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
     813            0 :         Err(TimeTravelError::Unimplemented)
     814            0 :     }
     815              : }
     816              : 
     817              : pin_project_lite::pin_project! {
     818              :     /// Hack to work around not being able to stream once with azure sdk.
     819              :     ///
     820              :     /// Azure sdk clones streams around with the assumption that they are like
     821              :     /// `Arc<tokio::fs::File>` (except not supporting tokio), however our streams are not like
     822              :     /// that. For example for an `index_part.json` we just have a single chunk of [`Bytes`]
     823              :     /// representing the whole serialized vec. It could be trivially cloneable and "semi-trivially"
     824              :     /// seekable, but it is easier to just retry the request.
     825              :     #[project = NonSeekableStreamProj]
     826              :     enum NonSeekableStream<S> {
     827              :         /// A stream wrapper's initial form.
     828              :         ///
     829              :         /// The mutex exists to allow moving the reader out when cloning. If the sdk changes to do
     830              :         /// fewer than one clone before the first request, then this must be changed.
     831              :         Initial {
     832              :             inner: std::sync::Mutex<Option<tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>>>,
     833              :             len: usize,
     834              :         },
     835              :         /// The actually readable variant, produced by cloning the Initial variant.
     836              :         ///
     837              :         /// The sdk currently always clones once, even without retry policy.
     838              :         Actual {
     839              :             #[pin]
     840              :             inner: tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>,
     841              :             len: usize,
     842              :             read_any: bool,
     843              :         },
     844              :         /// Most likely unneeded, but left to make life easier, in case more clones are added.
     845              :         Cloned {
     846              :             len_was: usize,
     847              :         }
     848              :     }
     849              : }
     850              : 
     851              : impl<S> NonSeekableStream<S>
     852              : where
     853              :     S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     854              : {
     855           93 :     fn new(inner: S, len: usize) -> NonSeekableStream<S> {
     856              :         use tokio_util::compat::TokioAsyncReadCompatExt;
     857              : 
     858           93 :         let inner = tokio_util::io::StreamReader::new(inner).compat();
     859           93 :         let inner = Some(inner);
     860           93 :         let inner = std::sync::Mutex::new(inner);
     861           93 :         NonSeekableStream::Initial { inner, len }
     862           93 :     }
     863              : }
     864              : 
     865              : impl<S> std::fmt::Debug for NonSeekableStream<S> {
     866            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     867            0 :         match self {
     868            0 :             Self::Initial { len, .. } => f.debug_struct("Initial").field("len", len).finish(),
     869            0 :             Self::Actual { len, .. } => f.debug_struct("Actual").field("len", len).finish(),
     870            0 :             Self::Cloned { len_was, .. } => f.debug_struct("Cloned").field("len", len_was).finish(),
     871              :         }
     872            0 :     }
     873              : }
     874              : 
     875              : impl<S> futures::io::AsyncRead for NonSeekableStream<S>
     876              : where
     877              :     S: Stream<Item = std::io::Result<Bytes>>,
     878              : {
     879           93 :     fn poll_read(
     880           93 :         self: std::pin::Pin<&mut Self>,
     881           93 :         cx: &mut std::task::Context<'_>,
     882           93 :         buf: &mut [u8],
     883           93 :     ) -> std::task::Poll<std::io::Result<usize>> {
     884           93 :         match self.project() {
     885              :             NonSeekableStreamProj::Actual {
     886           93 :                 inner, read_any, ..
     887           93 :             } => {
     888           93 :                 *read_any = true;
     889           93 :                 inner.poll_read(cx, buf)
     890              :             }
     891              :             // NonSeekableStream::Initial does not support reading because it is just much easier
     892              :             // to have the mutex in place where one does not poll the contents, or that's how it
     893              :             // seemed originally. If there is a version upgrade which changes the cloning, then
     894              :             // that support needs to be hacked in.
     895              :             //
     896              :             // including {self:?} into the message would be useful, but unsure how to unproject.
     897            0 :             _ => std::task::Poll::Ready(Err(std::io::Error::other(
     898            0 :                 "cloned or initial values cannot be read",
     899            0 :             ))),
     900              :         }
     901            0 :     }
     902              : }
     903              : 
     904              : impl<S> Clone for NonSeekableStream<S> {
     905              :     /// Weird clone implementation exists to support the sdk doing cloning before issuing the first
     906              :     /// request, see type documentation.
     907           93 :     fn clone(&self) -> Self {
     908              :         use NonSeekableStream::*;
     909              : 
     910           93 :         match self {
     911           93 :             Initial { inner, len } => {
     912           93 :                 if let Some(inner) = inner.lock().unwrap().take() {
     913           93 :                     Actual {
     914           93 :                         inner,
     915           93 :                         len: *len,
     916           93 :                         read_any: false,
     917           93 :                     }
     918              :                 } else {
     919            0 :                     Self::Cloned { len_was: *len }
     920              :                 }
     921              :             }
     922            0 :             Actual { len, .. } => Cloned { len_was: *len },
     923            0 :             Cloned { len_was } => Cloned { len_was: *len_was },
     924              :         }
     925            0 :     }
     926              : }
     927              : 
     928              : #[async_trait::async_trait]
     929              : impl<S> azure_core::SeekableStream for NonSeekableStream<S>
     930              : where
     931              :     S: Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync + 'static,
     932              : {
     933            0 :     async fn reset(&mut self) -> azure_core::error::Result<()> {
     934              :         use NonSeekableStream::*;
     935              : 
     936            0 :         let msg = match self {
     937            0 :             Initial { inner, .. } => {
     938            0 :                 if inner.get_mut().unwrap().is_some() {
     939            0 :                     return Ok(());
     940              :                 } else {
     941            0 :                     "reset after first clone is not supported"
     942              :                 }
     943              :             }
     944            0 :             Actual { read_any, .. } if !*read_any => return Ok(()),
     945            0 :             Actual { .. } => "reset after reading is not supported",
     946            0 :             Cloned { .. } => "reset after second clone is not supported",
     947              :         };
     948            0 :         Err(azure_core::error::Error::new(
     949            0 :             azure_core::error::ErrorKind::Io,
     950            0 :             std::io::Error::other(msg),
     951            0 :         ))
     952            0 :     }
     953              : 
     954              :     // Note: it is not documented whether this should be the total or the remaining length;
     955              :     // returning the total passes the tests.
     956           93 :     fn len(&self) -> usize {
     957              :         use NonSeekableStream::*;
     958           93 :         match self {
     959           93 :             Initial { len, .. } => *len,
     960            0 :             Actual { len, .. } => *len,
     961            0 :             Cloned { len_was, .. } => *len_was,
     962              :         }
     963            0 :     }
     964              : }
        

Generated by: LCOV version 2.1-beta