LCOV - differential code coverage report
Current view: top level - libs/remote_storage/src - azure_blob.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 265 0 265
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 46 0 46
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Azure Blob Storage wrapper
       2                 : 
       3                 : use std::env;
       4                 : use std::num::NonZeroU32;
       5                 : use std::sync::Arc;
       6                 : use std::{borrow::Cow, collections::HashMap, io::Cursor};
       7                 : 
       8                 : use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
       9                 : use anyhow::Result;
      10                 : use azure_core::request_options::{MaxResults, Metadata, Range};
      11                 : use azure_core::Header;
      12                 : use azure_identity::DefaultAzureCredential;
      13                 : use azure_storage::StorageCredentials;
      14                 : use azure_storage_blobs::prelude::ClientBuilder;
      15                 : use azure_storage_blobs::{
      16                 :     blob::operations::GetBlobBuilder,
      17                 :     prelude::{BlobClient, ContainerClient},
      18                 : };
      19                 : use futures_util::StreamExt;
      20                 : use http_types::StatusCode;
      21                 : use tokio::io::AsyncRead;
      22                 : use tracing::debug;
      23                 : 
      24                 : use crate::s3_bucket::RequestKind;
      25                 : use crate::{
      26                 :     AzureConfig, ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage,
      27                 :     StorageMetadata,
      28                 : };
      29                 : 
      30                 : pub struct AzureBlobStorage {
      31                 :     client: ContainerClient,
      32                 :     prefix_in_container: Option<String>,
      33                 :     max_keys_per_list_response: Option<NonZeroU32>,
      34                 :     concurrency_limiter: ConcurrencyLimiter,
      35                 : }
      36                 : 
      37                 : impl AzureBlobStorage {
      38 UBC           0 :     pub fn new(azure_config: &AzureConfig) -> Result<Self> {
      39               0 :         debug!(
      40               0 :             "Creating azure remote storage for azure container {}",
      41               0 :             azure_config.container_name
      42               0 :         );
      43                 : 
      44               0 :         let account = env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT");
      45                 : 
      46                 :         // If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
      47                 :         // otherwise try the token based credentials.
      48               0 :         let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
      49               0 :             StorageCredentials::access_key(account.clone(), access_key)
      50                 :         } else {
      51               0 :             let token_credential = DefaultAzureCredential::default();
      52               0 :             StorageCredentials::token_credential(Arc::new(token_credential))
      53                 :         };
      54                 : 
      55               0 :         let builder = ClientBuilder::new(account, credentials);
      56               0 : 
      57               0 :         let client = builder.container_client(azure_config.container_name.to_owned());
      58                 : 
      59               0 :         let max_keys_per_list_response =
      60               0 :             if let Some(limit) = azure_config.max_keys_per_list_response {
      61                 :                 Some(
      62               0 :                     NonZeroU32::new(limit as u32)
      63               0 :                         .ok_or_else(|| anyhow::anyhow!("max_keys_per_list_response can't be 0"))?,
      64                 :                 )
      65                 :             } else {
      66               0 :                 None
      67                 :             };
      68                 : 
      69               0 :         Ok(AzureBlobStorage {
      70               0 :             client,
      71               0 :             prefix_in_container: azure_config.prefix_in_container.to_owned(),
      72               0 :             max_keys_per_list_response,
      73               0 :             concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
      74               0 :         })
      75               0 :     }
      76                 : 
      77               0 :     pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
      78               0 :         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
      79               0 :         let path_string = path
      80               0 :             .get_path()
      81               0 :             .as_str()
      82               0 :             .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
      83               0 :         match &self.prefix_in_container {
      84               0 :             Some(prefix) => {
      85               0 :                 if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
      86               0 :                     prefix.clone() + path_string
      87                 :                 } else {
      88               0 :                     format!("{prefix}{REMOTE_STORAGE_PREFIX_SEPARATOR}{path_string}")
      89                 :                 }
      90                 :             }
      91               0 :             None => path_string.to_string(),
      92                 :         }
      93               0 :     }
      94                 : 
      95               0 :     fn name_to_relative_path(&self, key: &str) -> RemotePath {
      96               0 :         let relative_path =
      97               0 :             match key.strip_prefix(self.prefix_in_container.as_deref().unwrap_or_default()) {
      98               0 :                 Some(stripped) => stripped,
      99                 :                 // we rely on Azure to return properly prefixed paths
     100                 :                 // for requests with a certain prefix
     101               0 :                 None => panic!(
     102               0 :                     "Key {key} does not start with container prefix {:?}",
     103               0 :                     self.prefix_in_container
     104               0 :                 ),
     105                 :             };
     106               0 :         RemotePath(
     107               0 :             relative_path
     108               0 :                 .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
     109               0 :                 .collect(),
     110               0 :         )
     111               0 :     }
     112                 : 
     113               0 :     async fn download_for_builder(
     114               0 :         &self,
     115               0 :         metadata: StorageMetadata,
     116               0 :         builder: GetBlobBuilder,
     117               0 :     ) -> Result<Download, DownloadError> {
     118               0 :         let mut response = builder.into_stream();
     119               0 : 
     120               0 :         // TODO give proper streaming response instead of buffering into RAM
     121               0 :         // https://github.com/neondatabase/neon/issues/5563
     122               0 :         let mut buf = Vec::new();
     123               0 :         while let Some(part) = response.next().await {
     124               0 :             let part = match part {
     125               0 :                 Ok(l) => l,
     126               0 :                 Err(e) => {
     127               0 :                     return Err(if let Some(http_err) = e.as_http_error() {
     128               0 :                         match http_err.status() {
     129               0 :                             StatusCode::NotFound => DownloadError::NotFound,
     130                 :                             StatusCode::BadRequest => {
     131               0 :                                 DownloadError::BadInput(anyhow::Error::new(e))
     132                 :                             }
     133               0 :                             _ => DownloadError::Other(anyhow::Error::new(e)),
     134                 :                         }
     135                 :                     } else {
     136               0 :                         DownloadError::Other(e.into())
     137                 :                     });
     138                 :                 }
     139                 :             };
     140               0 :             let data = part
     141               0 :                 .data
     142               0 :                 .collect()
     143               0 :                 .await
     144               0 :                 .map_err(|e| DownloadError::Other(e.into()))?;
     145               0 :             buf.extend_from_slice(&data.slice(..));
     146                 :         }
     147               0 :         Ok(Download {
     148               0 :             download_stream: Box::pin(Cursor::new(buf)),
     149               0 :             metadata: Some(metadata),
     150               0 :         })
     151               0 :     }
     152                 :     // TODO get rid of this function once we have metadata included in the response
     153                 :     // https://github.com/Azure/azure-sdk-for-rust/issues/1439
     154               0 :     async fn get_metadata(
     155               0 :         &self,
     156               0 :         blob_client: &BlobClient,
     157               0 :     ) -> Result<StorageMetadata, DownloadError> {
     158               0 :         let builder = blob_client.get_metadata();
     159               0 : 
     160               0 :         match builder.into_future().await {
     161               0 :             Ok(r) => {
     162               0 :                 let mut map = HashMap::new();
     163                 : 
     164               0 :                 for md in r.metadata.iter() {
     165               0 :                     map.insert(
     166               0 :                         md.name().as_str().to_string(),
     167               0 :                         md.value().as_str().to_string(),
     168               0 :                     );
     169               0 :                 }
     170               0 :                 Ok(StorageMetadata(map))
     171                 :             }
     172               0 :             Err(e) => {
     173               0 :                 return Err(if let Some(http_err) = e.as_http_error() {
     174               0 :                     match http_err.status() {
     175               0 :                         StatusCode::NotFound => DownloadError::NotFound,
     176               0 :                         StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(e)),
     177               0 :                         _ => DownloadError::Other(anyhow::Error::new(e)),
     178                 :                     }
     179                 :                 } else {
     180               0 :                     DownloadError::Other(e.into())
     181                 :                 });
     182                 :             }
     183                 :         }
     184               0 :     }
     185                 : 
     186               0 :     async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
     187               0 :         self.concurrency_limiter
     188               0 :             .acquire(kind)
     189               0 :             .await
     190               0 :             .expect("semaphore is never closed")
     191               0 :     }
     192                 : }
     193                 : 
     194               0 : fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
     195               0 :     let mut res = Metadata::new();
     196               0 :     for (k, v) in metadata.0.into_iter() {
     197               0 :         res.insert(k, v);
     198               0 :     }
     199               0 :     res
     200               0 : }
     201                 : 
     202                 : #[async_trait::async_trait]
     203                 : impl RemoteStorage for AzureBlobStorage {
     204               0 :     async fn list_prefixes(
     205               0 :         &self,
     206               0 :         prefix: Option<&RemotePath>,
     207               0 :     ) -> Result<Vec<RemotePath>, DownloadError> {
     208                 :         // use the passed prefix or, if it is not set, fall back to the prefix_in_container value
     209               0 :         let list_prefix = prefix
     210               0 :             .map(|p| self.relative_path_to_name(p))
     211               0 :             .or_else(|| self.prefix_in_container.clone())
     212               0 :             .map(|mut p| {
     213               0 :                 // required to end with a separator
     214               0 :                 // otherwise request will return only the entry of a prefix
     215               0 :                 if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
     216               0 :                     p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
     217               0 :                 }
     218               0 :                 p
     219               0 :             });
     220               0 : 
     221               0 :         let mut builder = self
     222               0 :             .client
     223               0 :             .list_blobs()
     224               0 :             .delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
     225                 : 
     226               0 :         if let Some(prefix) = list_prefix {
     227               0 :             builder = builder.prefix(Cow::from(prefix.to_owned()));
     228               0 :         }
     229                 : 
     230               0 :         if let Some(limit) = self.max_keys_per_list_response {
     231               0 :             builder = builder.max_results(MaxResults::new(limit));
     232               0 :         }
     233                 : 
     234               0 :         let mut response = builder.into_stream();
     235               0 :         let mut res = Vec::new();
     236               0 :         while let Some(l) = response.next().await {
     237               0 :             let entry = match l {
     238               0 :                 Ok(l) => l,
     239               0 :                 Err(e) => {
     240               0 :                     return Err(if let Some(http_err) = e.as_http_error() {
     241               0 :                         match http_err.status() {
     242               0 :                             StatusCode::NotFound => DownloadError::NotFound,
     243                 :                             StatusCode::BadRequest => {
     244               0 :                                 DownloadError::BadInput(anyhow::Error::new(e))
     245                 :                             }
     246               0 :                             _ => DownloadError::Other(anyhow::Error::new(e)),
     247                 :                         }
     248                 :                     } else {
     249               0 :                         DownloadError::Other(e.into())
     250                 :                     });
     251                 :                 }
     252                 :             };
     253               0 :             let name_iter = entry
     254               0 :                 .blobs
     255               0 :                 .prefixes()
     256               0 :                 .map(|prefix| self.name_to_relative_path(&prefix.name));
     257               0 :             res.extend(name_iter);
     258                 :         }
     259               0 :         Ok(res)
     260               0 :     }
     261                 : 
     262               0 :     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
     263               0 :         let folder_name = folder
     264               0 :             .map(|p| self.relative_path_to_name(p))
     265               0 :             .or_else(|| self.prefix_in_container.clone());
     266               0 : 
     267               0 :         let mut builder = self.client.list_blobs();
     268                 : 
     269               0 :         if let Some(folder_name) = folder_name {
     270               0 :             builder = builder.prefix(Cow::from(folder_name.to_owned()));
     271               0 :         }
     272                 : 
     273               0 :         if let Some(limit) = self.max_keys_per_list_response {
     274               0 :             builder = builder.max_results(MaxResults::new(limit));
     275               0 :         }
     276                 : 
     277               0 :         let mut response = builder.into_stream();
     278               0 :         let mut res = Vec::new();
     279               0 :         while let Some(l) = response.next().await {
     280               0 :             let entry = l.map_err(anyhow::Error::new)?;
     281               0 :             let name_iter = entry
     282               0 :                 .blobs
     283               0 :                 .blobs()
     284               0 :                 .map(|bl| self.name_to_relative_path(&bl.name));
     285               0 :             res.extend(name_iter);
     286                 :         }
     287               0 :         Ok(res)
     288               0 :     }
     289                 : 
     290               0 :     async fn upload(
     291               0 :         &self,
     292               0 :         mut from: impl AsyncRead + Unpin + Send + Sync + 'static,
     293               0 :         data_size_bytes: usize,
     294               0 :         to: &RemotePath,
     295               0 :         metadata: Option<StorageMetadata>,
     296               0 :     ) -> anyhow::Result<()> {
     297               0 :         let _permit = self.permit(RequestKind::Put).await;
     298               0 :         let blob_client = self.client.blob_client(self.relative_path_to_name(to));
     299               0 : 
     300               0 :         // TODO FIX THIS UGLY HACK and don't buffer the entire object
     301               0 :         // into RAM here, but use the streaming interface. For that,
     302               0 :         // we'd have to change the interface though...
     303               0 :         // https://github.com/neondatabase/neon/issues/5563
     304               0 :         let mut buf = Vec::with_capacity(data_size_bytes);
     305               0 :         tokio::io::copy(&mut from, &mut buf).await?;
     306               0 :         let body = azure_core::Body::Bytes(buf.into());
     307               0 : 
     308               0 :         let mut builder = blob_client.put_block_blob(body);
     309                 : 
     310               0 :         if let Some(metadata) = metadata {
     311               0 :             builder = builder.metadata(to_azure_metadata(metadata));
     312               0 :         }
     313                 : 
     314               0 :         let _response = builder.into_future().await?;
     315                 : 
     316               0 :         Ok(())
     317               0 :     }
     318                 : 
     319               0 :     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
     320               0 :         let _permit = self.permit(RequestKind::Get).await;
     321               0 :         let blob_client = self.client.blob_client(self.relative_path_to_name(from));
     322                 : 
     323               0 :         let metadata = self.get_metadata(&blob_client).await?;
     324                 : 
     325               0 :         let builder = blob_client.get();
     326               0 : 
     327               0 :         self.download_for_builder(metadata, builder).await
     328               0 :     }
     329                 : 
     330               0 :     async fn download_byte_range(
     331               0 :         &self,
     332               0 :         from: &RemotePath,
     333               0 :         start_inclusive: u64,
     334               0 :         end_exclusive: Option<u64>,
     335               0 :     ) -> Result<Download, DownloadError> {
     336               0 :         let _permit = self.permit(RequestKind::Get).await;
     337               0 :         let blob_client = self.client.blob_client(self.relative_path_to_name(from));
     338                 : 
     339               0 :         let metadata = self.get_metadata(&blob_client).await?;
     340                 : 
     341               0 :         let mut builder = blob_client.get();
     342                 : 
     343               0 :         if let Some(end_exclusive) = end_exclusive {
     344               0 :             builder = builder.range(Range::new(start_inclusive, end_exclusive));
     345               0 :         } else {
     346               0 :             // Open ranges are not supported by the SDK so we work around
     347               0 :             // by setting the upper limit extremely high (but low enough
     348               0 :             // to still be representable by signed 64 bit integers).
     349               0 :             // TODO remove workaround once the SDK adds open range support
     350               0 :             // https://github.com/Azure/azure-sdk-for-rust/issues/1438
     351               0 :             let end_exclusive = u64::MAX / 4;
     352               0 :             builder = builder.range(Range::new(start_inclusive, end_exclusive));
     353               0 :         }
     354                 : 
     355               0 :         self.download_for_builder(metadata, builder).await
     356               0 :     }
     357                 : 
     358               0 :     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
     359               0 :         let _permit = self.permit(RequestKind::Delete).await;
     360               0 :         let blob_client = self.client.blob_client(self.relative_path_to_name(path));
     361               0 : 
     362               0 :         let builder = blob_client.delete();
     363               0 : 
     364               0 :         match builder.into_future().await {
     365               0 :             Ok(_response) => Ok(()),
     366               0 :             Err(e) => {
     367               0 :                 if let Some(http_err) = e.as_http_error() {
     368               0 :                     if http_err.status() == StatusCode::NotFound {
     369               0 :                         return Ok(());
     370               0 :                     }
     371               0 :                 }
     372               0 :                 Err(anyhow::Error::new(e))
     373                 :             }
     374                 :         }
     375               0 :     }
     376                 : 
     377               0 :     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
     378                 :         // No permit is acquired here: each inner `delete` call acquires its own
     379                 : 
     380                 :         // TODO batch requests are also not supported by the SDK
     381                 :         // https://github.com/Azure/azure-sdk-for-rust/issues/1068
     382                 :         // https://github.com/Azure/azure-sdk-for-rust/issues/1249
     383               0 :         for path in paths {
     384               0 :             self.delete(path).await?;
     385                 :         }
     386               0 :         Ok(())
     387               0 :     }
     388                 : }
        

Generated by: LCOV version 2.1-beta