LCOV - code coverage report
Current view: top level - libs/remote_storage/src - azure_blob.rs (source / functions) Coverage Total Hit
Test: ac1e0b9bf1b4ead74961174b01ba016322d3f9a6.info Lines: 58.1 % 657 382
Test Date: 2025-07-08 09:16:10 Functions: 43.6 % 101 44

            Line data    Source code
       1              : //! Azure Blob Storage wrapper
       2              : 
       3              : use std::borrow::Cow;
       4              : use std::collections::HashMap;
       5              : use std::fmt::Display;
       6              : use std::num::NonZeroU32;
       7              : use std::pin::Pin;
       8              : use std::str::FromStr;
       9              : use std::sync::Arc;
      10              : use std::time::{Duration, SystemTime};
      11              : use std::{env, io};
      12              : 
      13              : use anyhow::{Context, Result, anyhow};
      14              : use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
      15              : use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
      16              : use azure_storage::StorageCredentials;
      17              : use azure_storage_blobs::blob::operations::GetBlobBuilder;
      18              : use azure_storage_blobs::blob::{Blob, CopyStatus};
      19              : use azure_storage_blobs::container::operations::ListBlobsBuilder;
      20              : use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
      21              : use bytes::Bytes;
      22              : use futures::FutureExt;
      23              : use futures::future::Either;
      24              : use futures::stream::Stream;
      25              : use futures_util::{StreamExt, TryStreamExt};
      26              : use http_types::{StatusCode, Url};
      27              : use scopeguard::ScopeGuard;
      28              : use tokio_util::sync::CancellationToken;
      29              : use tracing::debug;
      30              : use utils::backoff;
      31              : use utils::backoff::exponential_backoff_duration_seconds;
      32              : 
      33              : use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
      34              : use crate::config::AzureConfig;
      35              : use crate::error::Cancelled;
      36              : use crate::metrics::{AttemptOutcome, RequestKind, start_measuring_requests};
      37              : use crate::{
      38              :     ConcurrencyLimiter, Download, DownloadError, DownloadKind, DownloadOpts, Listing, ListingMode,
      39              :     ListingObject, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
      40              :     Version, VersionKind,
      41              : };
      42              : 
      43              : pub struct AzureBlobStorage {
      44              :     client: ContainerClient,
      45              :     container_name: String,
      46              :     prefix_in_container: Option<String>,
      47              :     max_keys_per_list_response: Option<NonZeroU32>,
      48              :     concurrency_limiter: ConcurrencyLimiter,
      49              :     // Per-request timeout. Accessible for tests.
      50              :     pub timeout: Duration,
      51              : 
      52              :     // Alternative timeout used for metadata objects which are expected to be small
      53              :     pub small_timeout: Duration,
      54              : }
      55              : 
      56              : impl AzureBlobStorage {
      57           10 :     pub fn new(
      58           10 :         azure_config: &AzureConfig,
      59           10 :         timeout: Duration,
      60           10 :         small_timeout: Duration,
      61           10 :     ) -> Result<Self> {
      62           10 :         debug!(
      63            0 :             "Creating azure remote storage for azure container {}",
      64              :             azure_config.container_name
      65              :         );
      66              : 
      67              :         // Use the storage account from the config by default, fall back to env var if not present.
      68           10 :         let account = azure_config.storage_account.clone().unwrap_or_else(|| {
      69           10 :             env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT")
      70           10 :         });
      71              : 
      72              :         // If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
      73              :         // otherwise try the token based credentials.
      74           10 :         let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
      75           10 :             StorageCredentials::access_key(account.clone(), access_key)
      76              :         } else {
      77            0 :             let token_credential = azure_identity::create_default_credential()
      78            0 :                 .context("trying to obtain Azure default credentials")?;
      79            0 :             StorageCredentials::token_credential(token_credential)
      80              :         };
      81              : 
      82           10 :         let builder = ClientBuilder::new(account, credentials)
      83              :             // we have an outer retry
      84           10 :             .retry(RetryOptions::none())
      85              :             // Customize transport to configure conneciton pooling
      86           10 :             .transport(TransportOptions::new(Self::reqwest_client(
      87           10 :                 azure_config.conn_pool_size,
      88           10 :             )));
      89              : 
      90           10 :         let client = builder.container_client(azure_config.container_name.to_owned());
      91              : 
      92           10 :         let max_keys_per_list_response =
      93           10 :             if let Some(limit) = azure_config.max_keys_per_list_response {
      94              :                 Some(
      95            4 :                     NonZeroU32::new(limit as u32)
      96            4 :                         .ok_or_else(|| anyhow::anyhow!("max_keys_per_list_response can't be 0"))?,
      97              :                 )
      98              :             } else {
      99            6 :                 None
     100              :             };
     101              : 
     102           10 :         Ok(AzureBlobStorage {
     103           10 :             client,
     104           10 :             container_name: azure_config.container_name.to_owned(),
     105           10 :             prefix_in_container: azure_config.prefix_in_container.to_owned(),
     106           10 :             max_keys_per_list_response,
     107           10 :             concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
     108           10 :             timeout,
     109           10 :             small_timeout,
     110           10 :         })
     111           10 :     }
     112              : 
     113           10 :     fn reqwest_client(conn_pool_size: usize) -> Arc<dyn HttpClient> {
     114           10 :         let client = reqwest::ClientBuilder::new()
     115           10 :             .pool_max_idle_per_host(conn_pool_size)
     116           10 :             .build()
     117           10 :             .expect("failed to build `reqwest` client");
     118           10 :         Arc::new(client)
     119           10 :     }
     120              : 
     121          237 :     pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
     122          237 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
     123          237 :         let path_string = path.get_path().as_str();
     124          237 :         match &self.prefix_in_container {
     125          237 :             Some(prefix) => {
     126          237 :                 if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     127          237 :                     prefix.clone() + path_string
     128              :                 } else {
     129            0 :                     format!("{prefix}{REMOTE_STORAGE_PREFIX_SEPARATOR}{path_string}")
     130              :                 }
     131              :             }
     132            0 :             None => path_string.to_string(),
     133              :         }
     134          237 :     }
     135              : 
     136          249 :     fn name_to_relative_path(&self, key: &str) -> RemotePath {
     137          249 :         let relative_path =
     138          249 :             match key.strip_prefix(self.prefix_in_container.as_deref().unwrap_or_default()) {
     139          249 :                 Some(stripped) => stripped,
     140              :                 // we rely on Azure to return properly prefixed paths
     141              :                 // for requests with a certain prefix
     142            0 :                 None => panic!(
     143            0 :                     "Key {key} does not start with container prefix {:?}",
     144              :                     self.prefix_in_container
     145              :                 ),
     146              :             };
     147          249 :         RemotePath(
     148          249 :             relative_path
     149          249 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     150          249 :                 .collect(),
     151          249 :         )
     152          249 :     }
     153              : 
     154           11 :     async fn download_for_builder(
     155           11 :         &self,
     156           11 :         builder: GetBlobBuilder,
     157           11 :         timeout: Duration,
     158           11 :         cancel: &CancellationToken,
     159           11 :     ) -> Result<Download, DownloadError> {
     160           11 :         let kind = RequestKind::Get;
     161              : 
     162           11 :         let _permit = self.permit(kind, cancel).await?;
     163           11 :         let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
     164           11 :         let cancel_or_timeout_ = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
     165              : 
     166           11 :         let mut etag = None;
     167           11 :         let mut last_modified = None;
     168           11 :         let mut metadata = HashMap::new();
     169              : 
     170           11 :         let started_at = start_measuring_requests(kind);
     171              : 
     172           11 :         let download = async {
     173           11 :             let response = builder
     174              :                 // convert to concrete Pageable
     175           11 :                 .into_stream()
     176              :                 // convert to TryStream
     177           11 :                 .into_stream()
     178           11 :                 .map_err(to_download_error);
     179              : 
     180              :             // apply per request timeout
     181           11 :             let response = tokio_stream::StreamExt::timeout(response, timeout);
     182              : 
     183              :             // flatten
     184           11 :             let response = response.map(|res| match res {
     185           11 :                 Ok(res) => res,
     186            0 :                 Err(_elapsed) => Err(DownloadError::Timeout),
     187           11 :             });
     188              : 
     189           11 :             let mut response = Box::pin(response);
     190              : 
     191           11 :             let Some(part) = response.next().await else {
     192            0 :                 return Err(DownloadError::Other(anyhow::anyhow!(
     193            0 :                     "Azure GET response contained no response body"
     194            0 :                 )));
     195              :             };
     196           11 :             let part = part?;
     197            9 :             if etag.is_none() {
     198            9 :                 etag = Some(part.blob.properties.etag);
     199            9 :             }
     200            9 :             if last_modified.is_none() {
     201            9 :                 last_modified = Some(part.blob.properties.last_modified.into());
     202            9 :             }
     203            9 :             if let Some(blob_meta) = part.blob.metadata {
     204            0 :                 metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned())));
     205            9 :             }
     206              : 
     207              :             // unwrap safety: if these were None, bufs would be empty and we would have returned an error already
     208            9 :             let etag = etag.unwrap();
     209            9 :             let last_modified = last_modified.unwrap();
     210              : 
     211            9 :             let tail_stream = response
     212            9 :                 .map(|part| match part {
     213            0 :                     Ok(part) => Either::Left(part.data.map(|r| r.map_err(io::Error::other))),
     214            0 :                     Err(e) => {
     215            0 :                         Either::Right(futures::stream::once(async { Err(io::Error::other(e)) }))
     216              :                     }
     217            0 :                 })
     218            9 :                 .flatten();
     219            9 :             let stream = part
     220            9 :                 .data
     221            9 :                 .map(|r| r.map_err(io::Error::other))
     222            9 :                 .chain(sync_wrapper::SyncStream::new(tail_stream));
     223              :             //.chain(SyncStream::from_pin(Box::pin(tail_stream)));
     224              : 
     225            9 :             let download_stream = crate::support::DownloadStream::new(cancel_or_timeout_, stream);
     226              : 
     227            9 :             Ok(Download {
     228            9 :                 download_stream: Box::pin(download_stream),
     229            9 :                 etag,
     230            9 :                 last_modified,
     231            9 :                 metadata: Some(StorageMetadata(metadata)),
     232            9 :             })
     233           11 :         };
     234              : 
     235           11 :         let download = tokio::select! {
     236           11 :             bufs = download => bufs,
     237           11 :             cancel_or_timeout = cancel_or_timeout => match cancel_or_timeout {
     238            0 :                 TimeoutOrCancel::Timeout => return Err(DownloadError::Timeout),
     239            0 :                 TimeoutOrCancel::Cancel => return Err(DownloadError::Cancelled),
     240              :             },
     241              :         };
     242           11 :         let started_at = ScopeGuard::into_inner(started_at);
     243           11 :         let outcome = match &download {
     244            9 :             Ok(_) => AttemptOutcome::Ok,
     245              :             // At this level in the stack 404 and 304 responses do not indicate an error.
     246              :             // There's expected cases when a blob may not exist or hasn't been modified since
     247              :             // the last get (e.g. probing for timeline indices and heatmap downloads).
     248              :             // Callers should handle errors if they are unexpected.
     249            2 :             Err(DownloadError::NotFound | DownloadError::Unmodified) => AttemptOutcome::Ok,
     250            0 :             Err(_) => AttemptOutcome::Err,
     251              :         };
     252           11 :         crate::metrics::BUCKET_METRICS
     253           11 :             .req_seconds
     254           11 :             .observe_elapsed(kind, outcome, started_at);
     255           11 :         download
     256           11 :     }
     257              : 
     258           26 :     fn list_streaming_for_fn<T: Default + ListingCollector>(
     259           26 :         &self,
     260           26 :         prefix: Option<&RemotePath>,
     261           26 :         mode: ListingMode,
     262           26 :         max_keys: Option<NonZeroU32>,
     263           26 :         cancel: &CancellationToken,
     264           26 :         request_kind: RequestKind,
     265           26 :         customize_builder: impl Fn(ListBlobsBuilder) -> ListBlobsBuilder,
     266           26 :     ) -> impl Stream<Item = Result<T, DownloadError>> {
     267              :         // get the passed prefix or if it is not set use prefix_in_bucket value
     268           26 :         let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
     269           10 :             self.prefix_in_container.clone().map(|mut s| {
     270           10 :                 if !s.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     271            0 :                     s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     272           10 :                 }
     273           10 :                 s
     274           10 :             })
     275           10 :         });
     276              : 
     277           26 :         async_stream::stream! {
     278              :             let _permit = self.permit(request_kind, cancel).await?;
     279              : 
     280              :             let mut builder = self.client.list_blobs();
     281              : 
     282           26 :             if let ListingMode::WithDelimiter = mode {
     283              :                 builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     284              :             }
     285              : 
     286           26 :             if let Some(prefix) = list_prefix {
     287              :                 builder = builder.prefix(Cow::from(prefix.to_owned()));
     288              :             }
     289              : 
     290           26 :             if let Some(limit) = self.max_keys_per_list_response {
     291              :                 builder = builder.max_results(MaxResults::new(limit));
     292              :             }
     293              : 
     294              :             builder = customize_builder(builder);
     295              : 
     296              :             let mut next_marker = None;
     297              : 
     298              :             let mut timeout_try_cnt = 1;
     299              : 
     300              :             'outer: loop {
     301              :                 let mut builder = builder.clone();
     302              :                 if let Some(marker) = next_marker.clone() {
     303              :                     builder = builder.marker(marker);
     304              :                 }
     305              :                 // Azure Blob Rust SDK does not expose the list blob API directly. Users have to use
     306              :                 // their pageable iterator wrapper that returns all keys as a stream. We want to have
     307              :                 // full control of paging, and therefore we only take the first item from the stream.
     308              :                 let mut response_stream = builder.into_stream();
     309              :                 let response = response_stream.next();
     310              :                 // Timeout mechanism: Azure client will sometimes stuck on a request, but retrying that request
     311              :                 // would immediately succeed. Therefore, we use exponential backoff timeout to retry the request.
     312              :                 // (Usually, exponential backoff is used to determine the sleep time between two retries.) We
     313              :                 // start with 10.0 second timeout, and double the timeout for each failure, up to 5 failures.
     314              :                 // timeout = min(5 * (1.0+1.0)^n, self.timeout).
     315              :                 let this_timeout = (5.0 * exponential_backoff_duration_seconds(timeout_try_cnt, 1.0, self.timeout.as_secs_f64())).min(self.timeout.as_secs_f64());
     316              :                 let response = tokio::time::timeout(Duration::from_secs_f64(this_timeout), response);
     317           45 :                 let response = response.map(|res| {
     318           45 :                     match res {
     319           45 :                         Ok(Some(Ok(res))) => Ok(Some(res)),
     320            0 :                         Ok(Some(Err(e)))  => Err(to_download_error(e)),
     321            0 :                         Ok(None) => Ok(None),
     322            0 :                         Err(_elasped) => Err(DownloadError::Timeout),
     323              :                     }
     324           45 :                 });
     325            1 :                 let mut max_keys = max_keys.map(|mk| mk.get());
     326              :                 let next_item = tokio::select! {
     327              :                     op = response => op,
     328              :                     _ = cancel.cancelled() => Err(DownloadError::Cancelled),
     329              :                 };
     330              : 
     331              :                 if let Err(DownloadError::Timeout) = &next_item {
     332              :                     timeout_try_cnt += 1;
     333              :                     if timeout_try_cnt <= 5 {
     334              :                         continue 'outer;
     335              :                     }
     336              :                 }
     337              : 
     338              :                 let next_item = match next_item {
     339              :                     Ok(next_item) => next_item,
     340              :                     Err(e) => {
     341              :                         // The error is potentially retryable, so we must rewind the loop after yielding.
     342              :                         yield Err(e);
     343              :                         continue 'outer;
     344              :                     },
     345              :                 };
     346              : 
     347              :                 // Log a warning if we saw two timeouts in a row before a successful request
     348              :                 if timeout_try_cnt > 2 {
     349              :                     tracing::warn!("Azure Blob Storage list timed out and succeeded after {} tries", timeout_try_cnt);
     350              :                 }
     351              :                 timeout_try_cnt = 1;
     352              : 
     353              :                 let Some(entry) = next_item else {
     354              :                     // The list is complete, so yield it.
     355              :                     break;
     356              :                 };
     357              : 
     358              :                 let mut res = T::default();
     359              :                 next_marker = entry.continuation();
     360              :                 let prefix_iter = entry
     361              :                     .blobs
     362              :                     .prefixes()
     363           52 :                     .map(|prefix| self.name_to_relative_path(&prefix.name));
     364              :                 res.add_prefixes(self, prefix_iter);
     365              : 
     366              :                 let blob_iter = entry
     367              :                     .blobs
     368              :                     .blobs();
     369              : 
     370              :                 for key in blob_iter {
     371              :                     res.add_blob(self, key);
     372              : 
     373              :                     if let Some(mut mk) = max_keys {
     374              :                         assert!(mk > 0);
     375              :                         mk -= 1;
     376              :                         if mk == 0 {
     377              :                             yield Ok(res); // limit reached
     378              :                             break 'outer;
     379              :                         }
     380              :                         max_keys = Some(mk);
     381              :                     }
     382              :                 }
     383              :                 yield Ok(res);
     384              : 
     385              :                 // We are done here
     386              :                 if next_marker.is_none() {
     387              :                     break;
     388              :                 }
     389              :             }
     390              :         }
     391           26 :     }
     392              : 
     393          227 :     async fn permit(
     394          227 :         &self,
     395          227 :         kind: RequestKind,
     396          227 :         cancel: &CancellationToken,
     397          227 :     ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
     398          227 :         let acquire = self.concurrency_limiter.acquire(kind);
     399              : 
     400          227 :         tokio::select! {
     401          227 :             permit = acquire => Ok(permit.expect("never closed")),
     402          227 :             _ = cancel.cancelled() => Err(Cancelled),
     403              :         }
     404          227 :     }
     405              : 
     406            0 :     pub fn container_name(&self) -> &str {
     407            0 :         &self.container_name
     408            0 :     }
     409              : 
     410            0 :     async fn list_versions_with_permit(
     411            0 :         &self,
     412            0 :         _permit: &tokio::sync::SemaphorePermit<'_>,
     413            0 :         prefix: Option<&RemotePath>,
     414            0 :         mode: ListingMode,
     415            0 :         max_keys: Option<NonZeroU32>,
     416            0 :         cancel: &CancellationToken,
     417            0 :     ) -> Result<crate::VersionListing, DownloadError> {
     418            0 :         let customize_builder = |mut builder: ListBlobsBuilder| {
     419            0 :             builder = builder.include_versions(true);
     420              :             // We do not return this info back to `VersionListing` yet.
     421            0 :             builder = builder.include_deleted(true);
     422            0 :             builder
     423            0 :         };
     424            0 :         let kind = RequestKind::ListVersions;
     425              : 
     426            0 :         let mut stream = std::pin::pin!(self.list_streaming_for_fn(
     427            0 :             prefix,
     428            0 :             mode,
     429            0 :             max_keys,
     430            0 :             cancel,
     431            0 :             kind,
     432            0 :             customize_builder
     433              :         ));
     434            0 :         let mut combined: crate::VersionListing =
     435            0 :             stream.next().await.expect("At least one item required")?;
     436            0 :         while let Some(list) = stream.next().await {
     437            0 :             let list = list?;
     438            0 :             combined.versions.extend(list.versions.into_iter());
     439              :         }
     440            0 :         Ok(combined)
     441            0 :     }
     442              : }
     443              : 
     444              : trait ListingCollector {
     445              :     fn add_prefixes(&mut self, abs: &AzureBlobStorage, prefix_it: impl Iterator<Item = RemotePath>);
     446              :     fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob);
     447              : }
     448              : 
     449              : impl ListingCollector for Listing {
     450           45 :     fn add_prefixes(
     451           45 :         &mut self,
     452           45 :         _abs: &AzureBlobStorage,
     453           45 :         prefix_it: impl Iterator<Item = RemotePath>,
     454           45 :     ) {
     455           45 :         self.prefixes.extend(prefix_it);
     456           45 :     }
     457          197 :     fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
     458          197 :         self.keys.push(ListingObject {
     459          197 :             key: abs.name_to_relative_path(&blob.name),
     460          197 :             last_modified: blob.properties.last_modified.into(),
     461          197 :             size: blob.properties.content_length,
     462          197 :         });
     463          197 :     }
     464              : }
     465              : 
     466              : impl ListingCollector for crate::VersionListing {
     467            0 :     fn add_prefixes(
     468            0 :         &mut self,
     469            0 :         _abs: &AzureBlobStorage,
     470            0 :         _prefix_it: impl Iterator<Item = RemotePath>,
     471            0 :     ) {
     472              :         // nothing
     473            0 :     }
     474            0 :     fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
     475            0 :         let id = crate::VersionId(blob.version_id.clone().expect("didn't find version ID"));
     476            0 :         self.versions.push(crate::Version {
     477            0 :             key: abs.name_to_relative_path(&blob.name),
     478            0 :             last_modified: blob.properties.last_modified.into(),
     479            0 :             kind: crate::VersionKind::Version(id),
     480            0 :         });
     481            0 :     }
     482              : }
     483              : 
     484            0 : fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
     485            0 :     let mut res = Metadata::new();
     486            0 :     for (k, v) in metadata.0.into_iter() {
     487            0 :         res.insert(k, v);
     488            0 :     }
     489            0 :     res
     490            0 : }
     491              : 
     492            3 : fn to_download_error(error: azure_core::Error) -> DownloadError {
     493            3 :     if let Some(http_err) = error.as_http_error() {
     494            3 :         match http_err.status() {
     495            1 :             StatusCode::NotFound => DownloadError::NotFound,
     496            2 :             StatusCode::NotModified => DownloadError::Unmodified,
     497            0 :             StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
     498            0 :             _ => DownloadError::Other(anyhow::Error::new(error)),
     499              :         }
     500              :     } else {
     501            0 :         DownloadError::Other(error.into())
     502              :     }
     503            3 : }
     504              : 
     505              : impl RemoteStorage for AzureBlobStorage {
     506           26 :     fn list_streaming(
     507           26 :         &self,
     508           26 :         prefix: Option<&RemotePath>,
     509           26 :         mode: ListingMode,
     510           26 :         max_keys: Option<NonZeroU32>,
     511           26 :         cancel: &CancellationToken,
     512           26 :     ) -> impl Stream<Item = Result<Listing, DownloadError>> {
     513           26 :         let customize_builder = |builder| builder;
     514           26 :         let kind = RequestKind::ListVersions;
     515           26 :         self.list_streaming_for_fn(prefix, mode, max_keys, cancel, kind, customize_builder)
     516           26 :     }
     517              : 
     518            0 :     async fn list_versions(
     519            0 :         &self,
     520            0 :         prefix: Option<&RemotePath>,
     521            0 :         mode: ListingMode,
     522            0 :         max_keys: Option<NonZeroU32>,
     523            0 :         cancel: &CancellationToken,
     524            0 :     ) -> std::result::Result<crate::VersionListing, DownloadError> {
     525            0 :         let kind = RequestKind::ListVersions;
     526            0 :         let permit = self.permit(kind, cancel).await?;
     527            0 :         self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
     528            0 :             .await
     529            0 :     }
     530              : 
     531            3 :     async fn head_object(
     532            3 :         &self,
     533            3 :         key: &RemotePath,
     534            3 :         cancel: &CancellationToken,
     535            3 :     ) -> Result<ListingObject, DownloadError> {
     536            3 :         let kind = RequestKind::Head;
     537            3 :         let _permit = self.permit(kind, cancel).await?;
     538              : 
     539            3 :         let started_at = start_measuring_requests(kind);
     540              : 
     541            3 :         let blob_client = self.client.blob_client(self.relative_path_to_name(key));
     542            3 :         let properties_future = blob_client.get_properties().into_future();
     543              : 
     544            3 :         let properties_future = tokio::time::timeout(self.small_timeout, properties_future);
     545              : 
     546            3 :         let res = tokio::select! {
     547            3 :             res = properties_future => res,
     548            3 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     549              :         };
     550              : 
     551            3 :         if let Ok(inner) = &res {
     552            3 :             // do not incl. timeouts as errors in metrics but cancellations
     553            3 :             let started_at = ScopeGuard::into_inner(started_at);
     554            3 :             crate::metrics::BUCKET_METRICS
     555            3 :                 .req_seconds
     556            3 :                 .observe_elapsed(kind, inner, started_at);
     557            3 :         }
     558              : 
     559            3 :         let data = match res {
     560            2 :             Ok(Ok(data)) => Ok(data),
     561            1 :             Ok(Err(sdk)) => Err(to_download_error(sdk)),
     562            0 :             Err(_timeout) => Err(DownloadError::Timeout),
     563            1 :         }?;
     564              : 
     565            2 :         let properties = data.blob.properties;
     566            2 :         Ok(ListingObject {
     567            2 :             key: key.to_owned(),
     568            2 :             last_modified: SystemTime::from(properties.last_modified),
     569            2 :             size: properties.content_length,
     570            2 :         })
     571            3 :     }
     572              : 
     573           93 :     async fn upload(
     574           93 :         &self,
     575           93 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     576           93 :         data_size_bytes: usize,
     577           93 :         to: &RemotePath,
     578           93 :         metadata: Option<StorageMetadata>,
     579           93 :         cancel: &CancellationToken,
     580           93 :     ) -> anyhow::Result<()> {
     581           93 :         let kind = RequestKind::Put;
     582           93 :         let _permit = self.permit(kind, cancel).await?;
     583              : 
     584           93 :         let started_at = start_measuring_requests(kind);
     585              : 
     586           93 :         let op = async {
     587           93 :             let blob_client = self.client.blob_client(self.relative_path_to_name(to));
     588              : 
     589           93 :             let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
     590           93 :                 Box::pin(from);
     591              : 
     592           93 :             let from = NonSeekableStream::new(from, data_size_bytes);
     593              : 
     594           93 :             let body = azure_core::Body::SeekableStream(Box::new(from));
     595              : 
     596           93 :             let mut builder = blob_client.put_block_blob(body);
     597              : 
     598           93 :             if let Some(metadata) = metadata {
     599            0 :                 builder = builder.metadata(to_azure_metadata(metadata));
     600            0 :             }
     601              : 
     602           93 :             let fut = builder.into_future();
     603           93 :             let fut = tokio::time::timeout(self.timeout, fut);
     604              : 
     605           93 :             match fut.await {
     606           93 :                 Ok(Ok(_response)) => Ok(()),
     607            0 :                 Ok(Err(azure)) => Err(azure.into()),
     608            0 :                 Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
     609              :             }
     610            0 :         };
     611              : 
     612           93 :         let res = tokio::select! {
     613           93 :             res = op => res,
     614           93 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     615              :         };
     616              : 
     617           93 :         let outcome = match res {
     618           93 :             Ok(_) => AttemptOutcome::Ok,
     619            0 :             Err(_) => AttemptOutcome::Err,
     620              :         };
     621           93 :         let started_at = ScopeGuard::into_inner(started_at);
     622           93 :         crate::metrics::BUCKET_METRICS
     623           93 :             .req_seconds
     624           93 :             .observe_elapsed(kind, outcome, started_at);
     625              : 
     626           93 :         res
     627            0 :     }
     628              : 
     629           11 :     async fn download(
     630           11 :         &self,
     631           11 :         from: &RemotePath,
     632           11 :         opts: &DownloadOpts,
     633           11 :         cancel: &CancellationToken,
     634           11 :     ) -> Result<Download, DownloadError> {
     635           11 :         let blob_client = self.client.blob_client(self.relative_path_to_name(from));
     636              : 
     637           11 :         let mut builder = blob_client.get();
     638              : 
     639           11 :         if let Some(ref etag) = opts.etag {
     640            3 :             builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()));
     641            8 :         }
     642              : 
     643           11 :         if let Some(ref version_id) = opts.version_id {
     644            0 :             let version_id = azure_storage_blobs::prelude::VersionId::new(version_id.0.clone());
     645            0 :             builder = builder.blob_versioning(version_id);
     646           11 :         }
     647              : 
     648           11 :         if let Some((start, end)) = opts.byte_range() {
     649            5 :             builder = builder.range(match end {
     650            3 :                 Some(end) => Range::Range(start..end),
     651            2 :                 None => Range::RangeFrom(start..),
     652              :             });
     653            6 :         }
     654              : 
     655           11 :         let timeout = match opts.kind {
     656            0 :             DownloadKind::Small => self.small_timeout,
     657           11 :             DownloadKind::Large => self.timeout,
     658              :         };
     659              : 
     660           11 :         self.download_for_builder(builder, timeout, cancel).await
     661           11 :     }
     662              : 
     663           86 :     async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
     664           86 :         self.delete_objects(std::array::from_ref(path), cancel)
     665           86 :             .await
     666           86 :     }
     667              : 
     668           93 :     async fn delete_objects(
     669           93 :         &self,
     670           93 :         paths: &[RemotePath],
     671           93 :         cancel: &CancellationToken,
     672           93 :     ) -> anyhow::Result<()> {
     673           93 :         let kind = RequestKind::Delete;
     674           93 :         let _permit = self.permit(kind, cancel).await?;
     675           93 :         let started_at = start_measuring_requests(kind);
     676              : 
     677           93 :         let op = async {
     678              :             // TODO batch requests are not supported by the SDK
     679              :             // https://github.com/Azure/azure-sdk-for-rust/issues/1068
     680          205 :             for path in paths {
     681              :                 #[derive(Debug)]
     682              :                 enum AzureOrTimeout {
     683              :                     AzureError(azure_core::Error),
     684              :                     Timeout,
     685              :                     Cancel,
     686              :                 }
     687              :                 impl Display for AzureOrTimeout {
     688            0 :                     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     689            0 :                         write!(f, "{self:?}")
     690            0 :                     }
     691              :                 }
     692          112 :                 let warn_threshold = 3;
     693          112 :                 let max_retries = 5;
     694          112 :                 backoff::retry(
     695          112 :                     || async {
     696          112 :                         let blob_client = self.client.blob_client(self.relative_path_to_name(path));
     697              : 
     698          112 :                         let request = blob_client.delete().into_future();
     699              : 
     700          112 :                         let res = tokio::time::timeout(self.timeout, request).await;
     701              : 
     702          112 :                         match res {
     703           90 :                             Ok(Ok(_v)) => Ok(()),
     704           22 :                             Ok(Err(azure_err)) => {
     705           22 :                                 if let Some(http_err) = azure_err.as_http_error() {
     706           22 :                                     if http_err.status() == StatusCode::NotFound {
     707           22 :                                         return Ok(());
     708            0 :                                     }
     709            0 :                                 }
     710            0 :                                 Err(AzureOrTimeout::AzureError(azure_err))
     711              :                             }
     712            0 :                             Err(_elapsed) => Err(AzureOrTimeout::Timeout),
     713              :                         }
     714          224 :                     },
     715            0 :                     |err| match err {
     716            0 :                         AzureOrTimeout::AzureError(_) | AzureOrTimeout::Timeout => false,
     717            0 :                         AzureOrTimeout::Cancel => true,
     718            0 :                     },
     719          112 :                     warn_threshold,
     720          112 :                     max_retries,
     721          112 :                     "deleting remote object",
     722          112 :                     cancel,
     723              :                 )
     724          112 :                 .await
     725          112 :                 .ok_or_else(|| AzureOrTimeout::Cancel)
     726          112 :                 .and_then(|x| x)
     727          112 :                 .map_err(|e| match e {
     728            0 :                     AzureOrTimeout::AzureError(err) => anyhow::Error::from(err),
     729            0 :                     AzureOrTimeout::Timeout => TimeoutOrCancel::Timeout.into(),
     730            0 :                     AzureOrTimeout::Cancel => TimeoutOrCancel::Cancel.into(),
     731            0 :                 })?;
     732              :             }
     733           93 :             Ok(())
     734           93 :         };
     735              : 
     736           93 :         let res = tokio::select! {
     737           93 :             res = op => res,
     738           93 :             _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
     739              :         };
     740              : 
     741           93 :         let started_at = ScopeGuard::into_inner(started_at);
     742           93 :         crate::metrics::BUCKET_METRICS
     743           93 :             .req_seconds
     744           93 :             .observe_elapsed(kind, &res, started_at);
     745           93 :         res
     746           93 :     }
     747              : 
     748            0 :     fn max_keys_per_delete(&self) -> usize {
     749            0 :         super::MAX_KEYS_PER_DELETE_AZURE
     750            0 :     }
     751              : 
     752            1 :     async fn copy(
     753            1 :         &self,
     754            1 :         from: &RemotePath,
     755            1 :         to: &RemotePath,
     756            1 :         cancel: &CancellationToken,
     757            1 :     ) -> anyhow::Result<()> {
     758            1 :         let kind = RequestKind::Copy;
     759            1 :         let _permit = self.permit(kind, cancel).await?;
     760            1 :         let started_at = start_measuring_requests(kind);
     761              : 
     762            1 :         let timeout = tokio::time::sleep(self.timeout);
     763              : 
     764            1 :         let mut copy_status = None;
     765              : 
     766            1 :         let op = async {
     767            1 :             let blob_client = self.client.blob_client(self.relative_path_to_name(to));
     768              : 
     769            1 :             let source_url = format!(
     770            1 :                 "{}/{}",
     771            1 :                 self.client.url()?,
     772            1 :                 self.relative_path_to_name(from)
     773              :             );
     774              : 
     775            1 :             let builder = blob_client.copy(Url::from_str(&source_url)?);
     776            1 :             let copy = builder.into_future();
     777              : 
     778            1 :             let result = copy.await?;
     779              : 
     780            1 :             copy_status = Some(result.copy_status);
     781              :             loop {
     782            1 :                 match copy_status.as_ref().expect("we always set it to Some") {
     783              :                     CopyStatus::Aborted => {
     784            0 :                         anyhow::bail!("Received abort for copy from {from} to {to}.");
     785              :                     }
     786              :                     CopyStatus::Failed => {
     787            0 :                         anyhow::bail!("Received failure response for copy from {from} to {to}.");
     788              :                     }
     789            1 :                     CopyStatus::Success => return Ok(()),
     790            0 :                     CopyStatus::Pending => (),
     791              :                 }
     792              :                 // The copy is taking longer. Waiting a second and then re-trying.
     793              :                 // TODO estimate time based on copy_progress and adjust time based on that
     794            0 :                 tokio::time::sleep(Duration::from_millis(1000)).await;
     795            0 :                 let properties = blob_client.get_properties().into_future().await?;
     796            0 :                 let Some(status) = properties.blob.properties.copy_status else {
     797            0 :                     tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
     798            0 :                     return Ok(());
     799              :                 };
     800            0 :                 copy_status = Some(status);
     801              :             }
     802            1 :         };
     803              : 
     804            1 :         let res = tokio::select! {
     805            1 :             res = op => res,
     806            1 :             _ = cancel.cancelled() => return Err(anyhow::Error::new(TimeoutOrCancel::Cancel)),
     807            1 :             _ = timeout => {
     808            0 :                 let e = anyhow::Error::new(TimeoutOrCancel::Timeout);
     809            0 :                 let e = e.context(format!("Timeout, last status: {copy_status:?}"));
     810            0 :                 Err(e)
     811              :             },
     812              :         };
     813              : 
     814            1 :         let started_at = ScopeGuard::into_inner(started_at);
     815            1 :         crate::metrics::BUCKET_METRICS
     816            1 :             .req_seconds
     817            1 :             .observe_elapsed(kind, &res, started_at);
     818            1 :         res
     819            1 :     }
     820              : 
     821            0 :     async fn time_travel_recover(
     822            0 :         &self,
     823            0 :         prefix: Option<&RemotePath>,
     824            0 :         timestamp: SystemTime,
     825            0 :         done_if_after: SystemTime,
     826            0 :         cancel: &CancellationToken,
     827            0 :         _complexity_limit: Option<NonZeroU32>,
     828            0 :     ) -> Result<(), TimeTravelError> {
     829            0 :         let msg = "PLEASE NOTE: Azure Blob storage time-travel recovery may not work as expected "
     830            0 :             .to_string()
     831            0 :             + "for some specific files. If a file gets deleted but then overwritten and we want to recover "
     832            0 :             + "to the time during the file was not present, this functionality will recover the file. Only "
     833            0 :             + "use the functionality for services that can tolerate this. For example, recovering a state of the "
     834            0 :             + "pageserver tenants.";
     835            0 :         tracing::error!("{}", msg);
     836              : 
     837            0 :         let kind = RequestKind::TimeTravel;
     838            0 :         let permit = self.permit(kind, cancel).await?;
     839              : 
     840            0 :         let mode = ListingMode::NoDelimiter;
     841            0 :         let version_listing = self
     842            0 :             .list_versions_with_permit(&permit, prefix, mode, None, cancel)
     843            0 :             .await
     844            0 :             .map_err(|err| match err {
     845            0 :                 DownloadError::Other(e) => TimeTravelError::Other(e),
     846            0 :                 DownloadError::Cancelled => TimeTravelError::Cancelled,
     847            0 :                 other => TimeTravelError::Other(other.into()),
     848            0 :             })?;
     849            0 :         let versions_and_deletes = version_listing.versions;
     850              : 
     851            0 :         tracing::info!(
     852            0 :             "Built list for time travel with {} versions and deletions",
     853            0 :             versions_and_deletes.len()
     854              :         );
     855              : 
     856              :         // Work on the list of references instead of the objects directly,
     857              :         // otherwise we get lifetime errors in the sort_by_key call below.
     858            0 :         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
     859              : 
     860            0 :         versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
     861              : 
     862            0 :         let mut vds_for_key = HashMap::<_, Vec<_>>::new();
     863              : 
     864            0 :         for vd in &versions_and_deletes {
     865            0 :             let Version { key, .. } = &vd;
     866            0 :             let version_id = vd.version_id().map(|v| v.0.as_str());
     867            0 :             if version_id == Some("null") {
     868            0 :                 return Err(TimeTravelError::Other(anyhow!(
     869            0 :                     "Received ListVersions response for key={key} with version_id='null', \
     870            0 :                         indicating either disabled versioning, or legacy objects with null version id values"
     871            0 :                 )));
     872            0 :             }
     873            0 :             tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
     874              : 
     875            0 :             vds_for_key.entry(key).or_default().push(vd);
     876              :         }
     877              : 
     878            0 :         let warn_threshold = 3;
     879            0 :         let max_retries = 10;
     880            0 :         let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
     881              : 
     882            0 :         for (key, versions) in vds_for_key {
     883            0 :             let last_vd = versions.last().unwrap();
     884            0 :             let key = self.relative_path_to_name(key);
     885            0 :             if last_vd.last_modified > done_if_after {
     886            0 :                 tracing::debug!("Key {key} has version later than done_if_after, skipping");
     887            0 :                 continue;
     888            0 :             }
     889              :             // the version we want to restore to.
     890            0 :             let version_to_restore_to =
     891            0 :                 match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
     892            0 :                     Ok(v) => v,
     893            0 :                     Err(e) => e,
     894              :                 };
     895            0 :             if version_to_restore_to == versions.len() {
     896            0 :                 tracing::debug!("Key {key} has no changes since timestamp, skipping");
     897            0 :                 continue;
     898            0 :             }
     899            0 :             let mut do_delete = false;
     900            0 :             if version_to_restore_to == 0 {
     901              :                 // All versions more recent, so the key didn't exist at the specified time point.
     902            0 :                 tracing::debug!(
     903            0 :                     "All {} versions more recent for {key}, deleting",
     904            0 :                     versions.len()
     905              :                 );
     906            0 :                 do_delete = true;
     907              :             } else {
     908            0 :                 match &versions[version_to_restore_to - 1] {
     909              :                     Version {
     910            0 :                         kind: VersionKind::Version(version_id),
     911              :                         ..
     912              :                     } => {
     913            0 :                         let source_url = format!(
     914            0 :                             "{}/{}?versionid={}",
     915            0 :                             self.client
     916            0 :                                 .url()
     917            0 :                                 .map_err(|e| TimeTravelError::Other(anyhow!("{e}")))?,
     918              :                             key,
     919              :                             version_id.0
     920              :                         );
     921            0 :                         tracing::debug!(
     922            0 :                             "Promoting old version {} for {key} at {}...",
     923              :                             version_id.0,
     924              :                             source_url
     925              :                         );
     926            0 :                         backoff::retry(
     927            0 :                             || async {
     928            0 :                                 let blob_client = self.client.blob_client(key.clone());
     929            0 :                                 let op = blob_client.copy(Url::from_str(&source_url).unwrap());
     930            0 :                                 tokio::select! {
     931            0 :                                     res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
     932            0 :                                     _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
     933              :                                 }
     934            0 :                             },
     935            0 :                             is_permanent,
     936            0 :                             warn_threshold,
     937            0 :                             max_retries,
     938            0 :                             "copying object version for time_travel_recover",
     939            0 :                             cancel,
     940              :                         )
     941            0 :                         .await
     942            0 :                         .ok_or_else(|| TimeTravelError::Cancelled)
     943            0 :                         .and_then(|x| x)?;
     944            0 :                         tracing::info!(?version_id, %key, "Copied old version in Azure blob storage");
     945              :                     }
     946              :                     Version {
     947              :                         kind: VersionKind::DeletionMarker,
     948              :                         ..
     949            0 :                     } => {
     950            0 :                         do_delete = true;
     951            0 :                     }
     952              :                 }
     953              :             };
     954            0 :             if do_delete {
     955            0 :                 if matches!(last_vd.kind, VersionKind::DeletionMarker) {
     956              :                     // Key has since been deleted (but there was some history), no need to do anything
     957            0 :                     tracing::debug!("Key {key} already deleted, skipping.");
     958              :                 } else {
     959            0 :                     tracing::debug!("Deleting {key}...");
     960              : 
     961            0 :                     self.delete(&RemotePath::from_string(&key).unwrap(), cancel)
     962            0 :                         .await
     963            0 :                         .map_err(|e| {
     964              :                             // delete_oid0 will use TimeoutOrCancel
     965            0 :                             if TimeoutOrCancel::caused_by_cancel(&e) {
     966            0 :                                 TimeTravelError::Cancelled
     967              :                             } else {
     968            0 :                                 TimeTravelError::Other(e)
     969              :                             }
     970            0 :                         })?;
     971              :                 }
     972            0 :             }
     973              :         }
     974              : 
     975            0 :         Ok(())
     976            0 :     }
     977              : }
     978              : 
     979              : pin_project_lite::pin_project! {
     980              :     /// Hack to work around not being able to stream once with azure sdk.
     981              :     ///
     982              :     /// Azure sdk clones streams around with the assumption that they are like
     983              :     /// `Arc<tokio::fs::File>` (except not supporting tokio), however our streams are not like
     984              :     /// that. For example for an `index_part.json` we just have a single chunk of [`Bytes`]
     985              :     /// representing the whole serialized vec. It could be trivially cloneable and "semi-trivially"
     986              :     /// seekable, but we can also just re-try the request easier.
     987              :     #[project = NonSeekableStreamProj]
     988              :     enum NonSeekableStream<S> {
     989              :         /// A stream wrappers initial form.
     990              :         ///
     991              :         /// Mutex exists to allow moving when cloning. If the sdk changes to do less than 1
     992              :         /// clone before first request, then this must be changed.
     993              :         Initial {
     994              :             inner: std::sync::Mutex<Option<tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>>>,
     995              :             len: usize,
     996              :         },
     997              :         /// The actually readable variant, produced by cloning the Initial variant.
     998              :         ///
     999              :         /// The sdk currently always clones once, even without retry policy.
    1000              :         Actual {
    1001              :             #[pin]
    1002              :             inner: tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>,
    1003              :             len: usize,
    1004              :             read_any: bool,
    1005              :         },
    1006              :         /// Most likely unneeded, but left to make life easier, in case more clones are added.
    1007              :         Cloned {
    1008              :             len_was: usize,
    1009              :         }
    1010              :     }
    1011              : }
    1012              : 
    1013              : impl<S> NonSeekableStream<S>
    1014              : where
    1015              :     S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
    1016              : {
    1017           93 :     fn new(inner: S, len: usize) -> NonSeekableStream<S> {
    1018              :         use tokio_util::compat::TokioAsyncReadCompatExt;
    1019              : 
    1020           93 :         let inner = tokio_util::io::StreamReader::new(inner).compat();
    1021           93 :         let inner = Some(inner);
    1022           93 :         let inner = std::sync::Mutex::new(inner);
    1023           93 :         NonSeekableStream::Initial { inner, len }
    1024            0 :     }
    1025              : }
    1026              : 
    1027              : impl<S> std::fmt::Debug for NonSeekableStream<S> {
    1028            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    1029            0 :         match self {
    1030            0 :             Self::Initial { len, .. } => f.debug_struct("Initial").field("len", len).finish(),
    1031            0 :             Self::Actual { len, .. } => f.debug_struct("Actual").field("len", len).finish(),
    1032            0 :             Self::Cloned { len_was, .. } => f.debug_struct("Cloned").field("len", len_was).finish(),
    1033              :         }
    1034            0 :     }
    1035              : }
    1036              : 
    1037              : impl<S> futures::io::AsyncRead for NonSeekableStream<S>
    1038              : where
    1039              :     S: Stream<Item = std::io::Result<Bytes>>,
    1040              : {
    1041           93 :     fn poll_read(
    1042           93 :         self: std::pin::Pin<&mut Self>,
    1043           93 :         cx: &mut std::task::Context<'_>,
    1044           93 :         buf: &mut [u8],
    1045           93 :     ) -> std::task::Poll<std::io::Result<usize>> {
    1046           93 :         match self.project() {
    1047              :             NonSeekableStreamProj::Actual {
    1048           93 :                 inner, read_any, ..
    1049              :             } => {
    1050           93 :                 *read_any = true;
    1051           93 :                 inner.poll_read(cx, buf)
    1052              :             }
    1053              :             // NonSeekableStream::Initial does not support reading because it is just much easier
    1054              :             // to have the mutex in place where one does not poll the contents, or that's how it
    1055              :             // seemed originally. If there is a version upgrade which changes the cloning, then
    1056              :             // that support needs to be hacked in.
    1057              :             //
    1058              :             // including {self:?} into the message would be useful, but unsure how to unproject.
    1059            0 :             _ => std::task::Poll::Ready(Err(std::io::Error::other(
    1060            0 :                 "cloned or initial values cannot be read",
    1061            0 :             ))),
    1062              :         }
    1063            0 :     }
    1064              : }
    1065              : 
    1066              : impl<S> Clone for NonSeekableStream<S> {
    1067              :     /// Weird clone implementation exists to support the sdk doing cloning before issuing the first
    1068              :     /// request, see type documentation.
    1069           93 :     fn clone(&self) -> Self {
    1070              :         use NonSeekableStream::*;
    1071              : 
    1072           93 :         match self {
    1073           93 :             Initial { inner, len } => {
    1074           93 :                 if let Some(inner) = inner.lock().unwrap().take() {
    1075           93 :                     Actual {
    1076           93 :                         inner,
    1077           93 :                         len: *len,
    1078           93 :                         read_any: false,
    1079           93 :                     }
    1080              :                 } else {
    1081            0 :                     Self::Cloned { len_was: *len }
    1082              :                 }
    1083              :             }
    1084            0 :             Actual { len, .. } => Cloned { len_was: *len },
    1085            0 :             Cloned { len_was } => Cloned { len_was: *len_was },
    1086              :         }
    1087            0 :     }
    1088              : }
    1089              : 
    1090              : #[async_trait::async_trait]
    1091              : impl<S> azure_core::SeekableStream for NonSeekableStream<S>
    1092              : where
    1093              :     S: Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync + 'static,
    1094              : {
    1095            0 :     async fn reset(&mut self) -> azure_core::error::Result<()> {
    1096              :         use NonSeekableStream::*;
    1097              : 
    1098            0 :         let msg = match self {
    1099            0 :             Initial { inner, .. } => {
    1100            0 :                 if inner.get_mut().unwrap().is_some() {
    1101            0 :                     return Ok(());
    1102              :                 } else {
    1103            0 :                     "reset after first clone is not supported"
    1104              :                 }
    1105              :             }
    1106            0 :             Actual { read_any, .. } if !*read_any => return Ok(()),
    1107            0 :             Actual { .. } => "reset after reading is not supported",
    1108            0 :             Cloned { .. } => "reset after second clone is not supported",
    1109              :         };
    1110            0 :         Err(azure_core::error::Error::new(
    1111            0 :             azure_core::error::ErrorKind::Io,
    1112            0 :             std::io::Error::other(msg),
    1113            0 :         ))
    1114            0 :     }
    1115              : 
    1116              :     // Note: it is not documented if this should be the total or remaining length, total passes the
    1117              :     // tests.
    1118           93 :     fn len(&self) -> usize {
    1119              :         use NonSeekableStream::*;
    1120           93 :         match self {
    1121           93 :             Initial { len, .. } => *len,
    1122            0 :             Actual { len, .. } => *len,
    1123            0 :             Cloned { len_was, .. } => *len_was,
    1124              :         }
    1125            0 :     }
    1126              : }
        

Generated by: LCOV version 2.1-beta