LCOV - code coverage report
Current view: top level - libs/remote_storage/src - azure_blob.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 18.2 % 335 61
Test Date: 2024-02-07 07:37:29 Functions: 20.4 % 54 11

            Line data    Source code
       1              : //! Azure Blob Storage wrapper
       2              : 
       3              : use std::borrow::Cow;
       4              : use std::collections::HashMap;
       5              : use std::env;
       6              : use std::num::NonZeroU32;
       7              : use std::pin::Pin;
       8              : use std::str::FromStr;
       9              : use std::sync::Arc;
      10              : use std::time::Duration;
      11              : use std::time::SystemTime;
      12              : 
      13              : use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
      14              : use anyhow::Result;
      15              : use azure_core::request_options::{MaxResults, Metadata, Range};
      16              : use azure_core::RetryOptions;
      17              : use azure_identity::DefaultAzureCredential;
      18              : use azure_storage::StorageCredentials;
      19              : use azure_storage_blobs::blob::CopyStatus;
      20              : use azure_storage_blobs::prelude::ClientBuilder;
      21              : use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient};
      22              : use bytes::Bytes;
      23              : use futures::stream::Stream;
      24              : use futures_util::StreamExt;
      25              : use http_types::{StatusCode, Url};
      26              : use tokio::time::Instant;
      27              : use tokio_util::sync::CancellationToken;
      28              : use tracing::debug;
      29              : 
      30              : use crate::s3_bucket::RequestKind;
      31              : use crate::TimeTravelError;
      32              : use crate::{
      33              :     AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath,
      34              :     RemoteStorage, StorageMetadata,
      35              : };
      36              : 
      37              : pub struct AzureBlobStorage {
      38              :     client: ContainerClient,
      39              :     prefix_in_container: Option<String>,
      40              :     max_keys_per_list_response: Option<NonZeroU32>,
      41              :     concurrency_limiter: ConcurrencyLimiter,
      42              : }
      43              : 
      44              : impl AzureBlobStorage {
      45            6 :     pub fn new(azure_config: &AzureConfig) -> Result<Self> {
      46            6 :         debug!(
      47            0 :             "Creating azure remote storage for azure container {}",
      48            0 :             azure_config.container_name
      49            0 :         );
      50              : 
      51            6 :         let account = env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT");
      52              : 
      53              :         // If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
      54              :         // otherwise try the token based credentials.
      55            6 :         let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
      56            6 :             StorageCredentials::access_key(account.clone(), access_key)
      57              :         } else {
      58            0 :             let token_credential = DefaultAzureCredential::default();
      59            0 :             StorageCredentials::token_credential(Arc::new(token_credential))
      60              :         };
      61              : 
      62              :         // we have an outer retry
      63            6 :         let builder = ClientBuilder::new(account, credentials).retry(RetryOptions::none());
      64            6 : 
      65            6 :         let client = builder.container_client(azure_config.container_name.to_owned());
      66              : 
      67            6 :         let max_keys_per_list_response =
      68            6 :             if let Some(limit) = azure_config.max_keys_per_list_response {
      69              :                 Some(
      70            2 :                     NonZeroU32::new(limit as u32)
      71            2 :                         .ok_or_else(|| anyhow::anyhow!("max_keys_per_list_response can't be 0"))?,
      72              :                 )
      73              :             } else {
      74            4 :                 None
      75              :             };
      76              : 
      77            6 :         Ok(AzureBlobStorage {
      78            6 :             client,
      79            6 :             prefix_in_container: azure_config.prefix_in_container.to_owned(),
      80            6 :             max_keys_per_list_response,
      81            6 :             concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
      82            6 :         })
      83            6 :     }
      84              : 
      85          107 :     pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
      86          107 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
      87          107 :         let path_string = path
      88          107 :             .get_path()
      89          107 :             .as_str()
      90          107 :             .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
      91          107 :         match &self.prefix_in_container {
      92          107 :             Some(prefix) => {
      93          107 :                 if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
      94          107 :                     prefix.clone() + path_string
      95              :                 } else {
      96            0 :                     format!("{prefix}{REMOTE_STORAGE_PREFIX_SEPARATOR}{path_string}")
      97              :                 }
      98              :             }
      99            0 :             None => path_string.to_string(),
     100              :         }
     101          107 :     }
     102              : 
     103           51 :     fn name_to_relative_path(&self, key: &str) -> RemotePath {
     104           51 :         let relative_path =
     105           51 :             match key.strip_prefix(self.prefix_in_container.as_deref().unwrap_or_default()) {
     106           51 :                 Some(stripped) => stripped,
     107              :                 // we rely on Azure to return properly prefixed paths
     108              :                 // for requests with a certain prefix
     109            0 :                 None => panic!(
     110            0 :                     "Key {key} does not start with container prefix {:?}",
     111            0 :                     self.prefix_in_container
     112            0 :                 ),
     113              :             };
     114           51 :         RemotePath(
     115           51 :             relative_path
     116           51 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     117           51 :                 .collect(),
     118           51 :         )
     119           51 :     }
     120              : 
     121            7 :     async fn download_for_builder(
     122            7 :         &self,
     123            7 :         builder: GetBlobBuilder,
     124            7 :     ) -> Result<Download, DownloadError> {
     125            0 :         let mut response = builder.into_stream();
     126            0 : 
     127            0 :         let mut etag = None;
     128            0 :         let mut last_modified = None;
     129            0 :         let mut metadata = HashMap::new();
     130            0 :         // TODO give proper streaming response instead of buffering into RAM
     131            0 :         // https://github.com/neondatabase/neon/issues/5563
     132            0 : 
     133            0 :         let mut bufs = Vec::new();
     134            0 :         while let Some(part) = response.next().await {
     135            0 :             let part = part.map_err(to_download_error)?;
     136            0 :             let etag_str: &str = part.blob.properties.etag.as_ref();
     137            0 :             if etag.is_none() {
     138            0 :                 etag = Some(etag.unwrap_or_else(|| etag_str.to_owned()));
     139            0 :             }
     140            0 :             if last_modified.is_none() {
     141            0 :                 last_modified = Some(part.blob.properties.last_modified.into());
     142            0 :             }
     143            0 :             if let Some(blob_meta) = part.blob.metadata {
     144            0 :                 metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned())));
     145            0 :             }
     146            0 :             let data = part
     147            0 :                 .data
     148            0 :                 .collect()
     149            0 :                 .await
     150            0 :                 .map_err(|e| DownloadError::Other(e.into()))?;
     151            0 :             bufs.push(data);
     152              :         }
     153            0 :         Ok(Download {
     154            0 :             download_stream: Box::pin(futures::stream::iter(bufs.into_iter().map(Ok))),
     155            0 :             etag,
     156            0 :             last_modified,
     157            0 :             metadata: Some(StorageMetadata(metadata)),
     158            0 :         })
     159            0 :     }
     160              : 
     161          104 :     async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
     162            0 :         self.concurrency_limiter
     163            0 :             .acquire(kind)
     164            0 :             .await
     165            0 :             .expect("semaphore is never closed")
     166            0 :     }
     167              : }
     168              : 
     169            0 : fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
     170            0 :     let mut res = Metadata::new();
     171            0 :     for (k, v) in metadata.0.into_iter() {
     172            0 :         res.insert(k, v);
     173            0 :     }
     174            0 :     res
     175            0 : }
     176              : 
     177            0 : fn to_download_error(error: azure_core::Error) -> DownloadError {
     178            0 :     if let Some(http_err) = error.as_http_error() {
     179            0 :         match http_err.status() {
     180            0 :             StatusCode::NotFound => DownloadError::NotFound,
     181            0 :             StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
     182            0 :             _ => DownloadError::Other(anyhow::Error::new(error)),
     183              :         }
     184              :     } else {
     185            0 :         DownloadError::Other(error.into())
     186              :     }
     187            0 : }
     188              : 
     189              : impl RemoteStorage for AzureBlobStorage {
     190            5 :     async fn list(
     191            5 :         &self,
     192            5 :         prefix: Option<&RemotePath>,
     193            5 :         mode: ListingMode,
     194            5 :     ) -> anyhow::Result<Listing, DownloadError> {
     195            0 :         // get the passed prefix or, if it is not set, use the prefix_in_container value
     196            0 :         let list_prefix = prefix
     197            0 :             .map(|p| self.relative_path_to_name(p))
     198            0 :             .or_else(|| self.prefix_in_container.clone())
     199            0 :             .map(|mut p| {
     200              :                 // required to end with a separator
     201              :                 // otherwise request will return only the entry of a prefix
     202            0 :                 if matches!(mode, ListingMode::WithDelimiter)
     203            0 :                     && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
     204            0 :                 {
     205            0 :                     p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     206            0 :                 }
     207            0 :                 p
     208            0 :             });
     209            0 : 
     210            0 :         let mut builder = self.client.list_blobs();
     211            0 : 
     212            0 :         if let ListingMode::WithDelimiter = mode {
     213            0 :             builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     214            0 :         }
     215              : 
     216            0 :         if let Some(prefix) = list_prefix {
     217            0 :             builder = builder.prefix(Cow::from(prefix.to_owned()));
     218            0 :         }
     219              : 
     220            0 :         if let Some(limit) = self.max_keys_per_list_response {
     221            0 :             builder = builder.max_results(MaxResults::new(limit));
     222            0 :         }
     223              : 
     224            0 :         let mut response = builder.into_stream();
     225            0 :         let mut res = Listing::default();
     226            0 :         while let Some(l) = response.next().await {
     227            0 :             let entry = l.map_err(to_download_error)?;
     228            0 :             let prefix_iter = entry
     229            0 :                 .blobs
     230            0 :                 .prefixes()
     231            0 :                 .map(|prefix| self.name_to_relative_path(&prefix.name));
     232            0 :             res.prefixes.extend(prefix_iter);
     233            0 : 
     234            0 :             let blob_iter = entry
     235            0 :                 .blobs
     236            0 :                 .blobs()
     237            0 :                 .map(|k| self.name_to_relative_path(&k.name));
     238            0 :             res.keys.extend(blob_iter);
     239              :         }
     240            0 :         Ok(res)
     241            0 :     }
     242              : 
     243            0 :     async fn upload(
     244            0 :         &self,
     245            0 :         from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     246            0 :         data_size_bytes: usize,
     247            0 :         to: &RemotePath,
     248            0 :         metadata: Option<StorageMetadata>,
     249            0 :     ) -> anyhow::Result<()> {
     250            0 :         let _permit = self.permit(RequestKind::Put).await;
     251            0 :         let blob_client = self.client.blob_client(self.relative_path_to_name(to));
     252            0 : 
     253            0 :         let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
     254            0 :             Box::pin(from);
     255            0 : 
     256            0 :         let from = NonSeekableStream::new(from, data_size_bytes);
     257            0 : 
     258            0 :         let body = azure_core::Body::SeekableStream(Box::new(from));
     259            0 : 
     260            0 :         let mut builder = blob_client.put_block_blob(body);
     261              : 
     262            0 :         if let Some(metadata) = metadata {
     263            0 :             builder = builder.metadata(to_azure_metadata(metadata));
     264            0 :         }
     265              : 
     266            0 :         let _response = builder.into_future().await?;
     267              : 
     268            0 :         Ok(())
     269            0 :     }
     270              : 
     271            2 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     272            0 :         let _permit = self.permit(RequestKind::Get).await;
     273            0 :         let blob_client = self.client.blob_client(self.relative_path_to_name(from));
     274            0 : 
     275            0 :         let builder = blob_client.get();
     276            0 : 
     277            0 :         self.download_for_builder(builder).await
     278            0 :     }
     279              : 
     280            5 :     async fn download_byte_range(
     281            5 :         &self,
     282            5 :         from: &RemotePath,
     283            5 :         start_inclusive: u64,
     284            5 :         end_exclusive: Option<u64>,
     285            5 :     ) -> Result<Download, DownloadError> {
     286            0 :         let _permit = self.permit(RequestKind::Get).await;
     287            0 :         let blob_client = self.client.blob_client(self.relative_path_to_name(from));
     288            0 : 
     289            0 :         let mut builder = blob_client.get();
     290              : 
     291            0 :         let range: Range = if let Some(end_exclusive) = end_exclusive {
     292            0 :             (start_inclusive..end_exclusive).into()
     293              :         } else {
     294            0 :             (start_inclusive..).into()
     295              :         };
     296            0 :         builder = builder.range(range);
     297            0 : 
     298            0 :         self.download_for_builder(builder).await
     299            0 :     }
     300              : 
     301           49 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     302            0 :         let _permit = self.permit(RequestKind::Delete).await;
     303            0 :         let blob_client = self.client.blob_client(self.relative_path_to_name(path));
     304            0 : 
     305            0 :         let builder = blob_client.delete();
     306            0 : 
     307            0 :         match builder.into_future().await {
     308            0 :             Ok(_response) => Ok(()),
     309            0 :             Err(e) => {
     310            0 :                 if let Some(http_err) = e.as_http_error() {
     311            0 :                     if http_err.status() == StatusCode::NotFound {
     312            0 :                         return Ok(());
     313            0 :                     }
     314            0 :                 }
     315            0 :                 Err(anyhow::Error::new(e))
     316              :             }
     317              :         }
     318            0 :     }
     319              : 
     320            3 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     321              :         // Permit is already obtained by inner delete function
     322              : 
     323              :         // TODO batch requests are also not supported by the SDK
     324              :         // https://github.com/Azure/azure-sdk-for-rust/issues/1068
     325              :         // https://github.com/Azure/azure-sdk-for-rust/issues/1249
     326            0 :         for path in paths {
     327            0 :             self.delete(path).await?;
     328              :         }
     329            0 :         Ok(())
     330            0 :     }
     331              : 
     332            1 :     async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
     333            0 :         let _permit = self.permit(RequestKind::Copy).await;
     334            0 :         let blob_client = self.client.blob_client(self.relative_path_to_name(to));
     335              : 
     336            0 :         let source_url = format!(
     337            0 :             "{}/{}",
     338            0 :             self.client.url()?,
     339            0 :             self.relative_path_to_name(from)
     340              :         );
     341            0 :         let builder = blob_client.copy(Url::from_str(&source_url)?);
     342              : 
     343            0 :         let result = builder.into_future().await?;
     344              : 
     345            0 :         let mut copy_status = result.copy_status;
     346            0 :         let start_time = Instant::now();
     347              :         const MAX_WAIT_TIME: Duration = Duration::from_secs(60);
     348              :         loop {
     349            0 :             match copy_status {
     350              :                 CopyStatus::Aborted => {
     351            0 :                     anyhow::bail!("Received abort for copy from {from} to {to}.");
     352              :                 }
     353              :                 CopyStatus::Failed => {
     354            0 :                     anyhow::bail!("Received failure response for copy from {from} to {to}.");
     355              :                 }
     356            0 :                 CopyStatus::Success => return Ok(()),
     357            0 :                 CopyStatus::Pending => (),
     358            0 :             }
     359            0 :             // The copy is taking longer. Waiting a second and then re-trying.
     360            0 :             // TODO estimate time based on copy_progress and adjust time based on that
     361            0 :             tokio::time::sleep(Duration::from_millis(1000)).await;
     362            0 :             let properties = blob_client.get_properties().into_future().await?;
     363            0 :             let Some(status) = properties.blob.properties.copy_status else {
     364            0 :                 tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
     365            0 :                 return Ok(());
     366              :             };
     367            0 :             if start_time.elapsed() > MAX_WAIT_TIME {
     368            0 :                 anyhow::bail!("Copy from from {from} to {to} took longer than limit MAX_WAIT_TIME={}s. copy_pogress={:?}.",
     369            0 :                     MAX_WAIT_TIME.as_secs_f32(),
     370            0 :                     properties.blob.properties.copy_progress,
     371            0 :                 );
     372            0 :             }
     373            0 :             copy_status = status;
     374              :         }
     375            0 :     }
     376              : 
     377            0 :     async fn time_travel_recover(
     378            0 :         &self,
     379            0 :         _prefix: Option<&RemotePath>,
     380            0 :         _timestamp: SystemTime,
     381            0 :         _done_if_after: SystemTime,
     382            0 :         _cancel: &CancellationToken,
     383            0 :     ) -> Result<(), TimeTravelError> {
     384            0 :         // TODO use Azure point in time recovery feature for this
     385            0 :         // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
     386            0 :         Err(TimeTravelError::Unimplemented)
     387            0 :     }
     388              : }
     389              : 
     390              : pin_project_lite::pin_project! {
     391              :     /// Hack to work around not being able to stream once with azure sdk.
     392              :     ///
     393              :     /// Azure sdk clones streams around with the assumption that they are like
     394              :     /// `Arc<tokio::fs::File>` (except not supporting tokio), however our streams are not like
     395              :     /// that. For example for an `index_part.json` we just have a single chunk of [`Bytes`]
     396              :     /// representing the whole serialized vec. It could be trivially cloneable and "semi-trivially"
     397              :     /// seekable, but it is easier to just retry the request.
     398              :     #[project = NonSeekableStreamProj]
     399              :     enum NonSeekableStream<S> {
     400              :         /// A stream wrapper's initial form.
     401              :         ///
     402              :         /// Mutex exists to allow moving when cloning. If the sdk changes to do less than 1
     403              :         /// clone before first request, then this must be changed.
     404              :         Initial {
     405              :             inner: std::sync::Mutex<Option<tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>>>,
     406              :             len: usize,
     407              :         },
     408              :         /// The actually readable variant, produced by cloning the Initial variant.
     409              :         ///
     410              :         /// The sdk currently always clones once, even without retry policy.
     411              :         Actual {
     412              :             #[pin]
     413              :             inner: tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>,
     414              :             len: usize,
     415              :             read_any: bool,
     416              :         },
     417              :         /// Most likely unneeded, but left to make life easier, in case more clones are added.
     418              :         Cloned {
     419              :             len_was: usize,
     420              :         }
     421              :     }
     422              : }
     423              : 
     424              : impl<S> NonSeekableStream<S>
     425              : where
     426              :     S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
     427              : {
     428            0 :     fn new(inner: S, len: usize) -> NonSeekableStream<S> {
     429            0 :         use tokio_util::compat::TokioAsyncReadCompatExt;
     430            0 : 
     431            0 :         let inner = tokio_util::io::StreamReader::new(inner).compat();
     432            0 :         let inner = Some(inner);
     433            0 :         let inner = std::sync::Mutex::new(inner);
     434            0 :         NonSeekableStream::Initial { inner, len }
     435            0 :     }
     436              : }
     437              : 
     438              : impl<S> std::fmt::Debug for NonSeekableStream<S> {
     439            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     440            0 :         match self {
     441            0 :             Self::Initial { len, .. } => f.debug_struct("Initial").field("len", len).finish(),
     442            0 :             Self::Actual { len, .. } => f.debug_struct("Actual").field("len", len).finish(),
     443            0 :             Self::Cloned { len_was, .. } => f.debug_struct("Cloned").field("len", len_was).finish(),
     444              :         }
     445            0 :     }
     446              : }
     447              : 
     448              : impl<S> futures::io::AsyncRead for NonSeekableStream<S>
     449              : where
     450              :     S: Stream<Item = std::io::Result<Bytes>>,
     451              : {
     452            0 :     fn poll_read(
     453            0 :         self: std::pin::Pin<&mut Self>,
     454            0 :         cx: &mut std::task::Context<'_>,
     455            0 :         buf: &mut [u8],
     456            0 :     ) -> std::task::Poll<std::io::Result<usize>> {
     457            0 :         match self.project() {
     458              :             NonSeekableStreamProj::Actual {
     459            0 :                 inner, read_any, ..
     460            0 :             } => {
     461            0 :                 *read_any = true;
     462            0 :                 inner.poll_read(cx, buf)
     463              :             }
     464              :             // NonSeekableStream::Initial does not support reading because it is just much easier
     465              :             // to have the mutex in place where one does not poll the contents, or that's how it
     466              :             // seemed originally. If there is a version upgrade which changes the cloning, then
     467              :             // that support needs to be hacked in.
     468              :             //
     469              :             // including {self:?} into the message would be useful, but unsure how to unproject.
     470            0 :             _ => std::task::Poll::Ready(Err(std::io::Error::new(
     471            0 :                 std::io::ErrorKind::Other,
     472            0 :                 "cloned or initial values cannot be read",
     473            0 :             ))),
     474              :         }
     475            0 :     }
     476              : }
     477              : 
     478              : impl<S> Clone for NonSeekableStream<S> {
     479              :     /// Weird clone implementation exists to support the sdk doing cloning before issuing the first
     480              :     /// request, see type documentation.
     481            0 :     fn clone(&self) -> Self {
     482            0 :         use NonSeekableStream::*;
     483            0 : 
     484            0 :         match self {
     485            0 :             Initial { inner, len } => {
     486            0 :                 if let Some(inner) = inner.lock().unwrap().take() {
     487            0 :                     Actual {
     488            0 :                         inner,
     489            0 :                         len: *len,
     490            0 :                         read_any: false,
     491            0 :                     }
     492              :                 } else {
     493            0 :                     Self::Cloned { len_was: *len }
     494              :                 }
     495              :             }
     496            0 :             Actual { len, .. } => Cloned { len_was: *len },
     497            0 :             Cloned { len_was } => Cloned { len_was: *len_was },
     498              :         }
     499            0 :     }
     500              : }
     501              : 
     502              : #[async_trait::async_trait]
     503              : impl<S> azure_core::SeekableStream for NonSeekableStream<S>
     504              : where
     505              :     S: Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync + 'static,
     506              : {
     507            0 :     async fn reset(&mut self) -> azure_core::error::Result<()> {
     508              :         use NonSeekableStream::*;
     509              : 
     510            0 :         let msg = match self {
     511            0 :             Initial { inner, .. } => {
     512            0 :                 if inner.get_mut().unwrap().is_some() {
     513            0 :                     return Ok(());
     514              :                 } else {
     515            0 :                     "reset after first clone is not supported"
     516              :                 }
     517              :             }
     518            0 :             Actual { read_any, .. } if !*read_any => return Ok(()),
     519            0 :             Actual { .. } => "reset after reading is not supported",
     520            0 :             Cloned { .. } => "reset after second clone is not supported",
     521              :         };
     522            0 :         Err(azure_core::error::Error::new(
     523            0 :             azure_core::error::ErrorKind::Io,
     524            0 :             std::io::Error::new(std::io::ErrorKind::Other, msg),
     525            0 :         ))
     526            0 :     }
     527              : 
     528              :     // Note: it is not documented whether this should be the total or the remaining length;
     529              :     // total passes the tests.
     530            0 :     fn len(&self) -> usize {
     531            0 :         use NonSeekableStream::*;
     532            0 :         match self {
     533            0 :             Initial { len, .. } => *len,
     534            0 :             Actual { len, .. } => *len,
     535            0 :             Cloned { len_was, .. } => *len_was,
     536              :         }
     537            0 :     }
     538              : }
        

Generated by: LCOV version 2.1-beta