//! Azure Blob Storage wrapper
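//!
//! A minimal usage sketch (hypothetical `AzureConfig` values; assumes the
//! `AZURE_STORAGE_ACCOUNT` env var is set, and optionally
//! `AZURE_STORAGE_ACCESS_KEY` for key-based auth):
//!
//! ```ignore
//! let storage = AzureBlobStorage::new(&azure_config)?;
//! let entries = storage.list_files(None).await?;
//! ```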
use std::env;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::{borrow::Cow, collections::HashMap, io::Cursor};

use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
use anyhow::Result;
use azure_core::request_options::{MaxResults, Metadata, Range};
use azure_core::Header;
use azure_identity::DefaultAzureCredential;
use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::ClientBuilder;
use azure_storage_blobs::{
    blob::operations::GetBlobBuilder,
    prelude::{BlobClient, ContainerClient},
};
use futures_util::StreamExt;
use http_types::StatusCode;
use tokio::io::AsyncRead;
use tracing::debug;

use crate::s3_bucket::RequestKind;
use crate::{
    AzureConfig, ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage,
    StorageMetadata,
};

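/// A [`RemoteStorage`] implementation backed by a single Azure Blob Storage
/// container, optionally rooted at a prefix inside that container. Uploads,
/// downloads and deletes first acquire a permit from `concurrency_limiter`
/// to bound the number of concurrent API calls.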
pub struct AzureBlobStorage {
    client: ContainerClient,
    prefix_in_container: Option<String>,
    max_keys_per_list_response: Option<NonZeroU32>,
    concurrency_limiter: ConcurrencyLimiter,
}

impl AzureBlobStorage {
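    /// Builds a client for the container named in `azure_config`.
    ///
    /// Reads the storage account from the `AZURE_STORAGE_ACCOUNT` environment
    /// variable (panics if unset). Authenticates with the access key in
    /// `AZURE_STORAGE_ACCESS_KEY` if present, falling back to
    /// [`DefaultAzureCredential`] token-based auth otherwise.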
    pub fn new(azure_config: &AzureConfig) -> Result<Self> {
        debug!(
            "Creating azure remote storage for azure container {}",
            azure_config.container_name
        );

        let account = env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT");

        // If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
        // otherwise try the token based credentials.
        let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
            StorageCredentials::access_key(account.clone(), access_key)
        } else {
            let token_credential = DefaultAzureCredential::default();
            StorageCredentials::token_credential(Arc::new(token_credential))
        };

        let builder = ClientBuilder::new(account, credentials);

        let client = builder.container_client(azure_config.container_name.to_owned());

        let max_keys_per_list_response =
            if let Some(limit) = azure_config.max_keys_per_list_response {
                Some(
                    NonZeroU32::new(limit as u32)
                        .ok_or_else(|| anyhow::anyhow!("max_keys_per_list_response can't be 0"))?,
                )
            } else {
                None
            };

        Ok(AzureBlobStorage {
            client,
            prefix_in_container: azure_config.prefix_in_container.to_owned(),
            max_keys_per_list_response,
            concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
        })
    }

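    /// Converts a `RemotePath` relative to the storage root into the full blob
    /// name, joining it to `prefix_in_container` with the prefix separator.
    ///
    /// For example (hypothetical values): with a prefix of `Some("pageserver")`
    /// and a path `tenants/t1`, the resulting blob name is
    /// `pageserver/tenants/t1`; a trailing separator on the path is trimmed.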
    pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
        assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
        let path_string = path
            .get_path()
            .as_str()
            .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
        match &self.prefix_in_container {
            Some(prefix) => {
                if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                    prefix.clone() + path_string
                } else {
                    format!("{prefix}{REMOTE_STORAGE_PREFIX_SEPARATOR}{path_string}")
                }
            }
            None => path_string.to_string(),
        }
    }

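    /// The inverse of [`Self::relative_path_to_name`]: strips
    /// `prefix_in_container` from a blob name returned by the API and splits
    /// the remainder into a `RemotePath`. Panics if the key lacks the prefix,
    /// since Azure is expected to only return keys under the requested prefix.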
    fn name_to_relative_path(&self, key: &str) -> RemotePath {
        let relative_path =
            match key.strip_prefix(self.prefix_in_container.as_deref().unwrap_or_default()) {
                Some(stripped) => stripped,
                // we rely on Azure to return properly prefixed paths
                // for requests with a certain prefix
                None => panic!(
                    "Key {key} does not start with container prefix {:?}",
                    self.prefix_in_container
                ),
            };
        RemotePath(
            relative_path
                .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
                .collect(),
        )
    }

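    /// Drives `builder` to completion and collects the response body.
    ///
    /// The entire blob is currently buffered into memory before being handed
    /// back as the download stream; see the TODO below. HTTP 404 and 400 are
    /// mapped to [`DownloadError::NotFound`] and [`DownloadError::BadInput`]
    /// respectively.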
    async fn download_for_builder(
        &self,
        metadata: StorageMetadata,
        builder: GetBlobBuilder,
    ) -> Result<Download, DownloadError> {
        let mut response = builder.into_stream();

        // TODO give proper streaming response instead of buffering into RAM
        // https://github.com/neondatabase/neon/issues/5563
        let mut buf = Vec::new();
        while let Some(part) = response.next().await {
            let part = match part {
                Ok(l) => l,
                Err(e) => {
                    return Err(if let Some(http_err) = e.as_http_error() {
                        match http_err.status() {
                            StatusCode::NotFound => DownloadError::NotFound,
                            StatusCode::BadRequest => {
                                DownloadError::BadInput(anyhow::Error::new(e))
                            }
                            _ => DownloadError::Other(anyhow::Error::new(e)),
                        }
                    } else {
                        DownloadError::Other(e.into())
                    });
                }
            };
            let data = part
                .data
                .collect()
                .await
                .map_err(|e| DownloadError::Other(e.into()))?;
            buf.extend_from_slice(&data.slice(..));
        }
        Ok(Download {
            download_stream: Box::pin(Cursor::new(buf)),
            metadata: Some(metadata),
        })
    }

    // TODO get rid of this function once we have metadata included in the response
    // https://github.com/Azure/azure-sdk-for-rust/issues/1439
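    //
    // Until then, fetch the user-defined metadata with a dedicated request
    // before each download.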
    async fn get_metadata(
        &self,
        blob_client: &BlobClient,
    ) -> Result<StorageMetadata, DownloadError> {
        let builder = blob_client.get_metadata();

        match builder.into_future().await {
            Ok(r) => {
                let mut map = HashMap::new();

                for md in r.metadata.iter() {
                    map.insert(
                        md.name().as_str().to_string(),
                        md.value().as_str().to_string(),
                    );
                }
                Ok(StorageMetadata(map))
            }
            Err(e) => {
                return Err(if let Some(http_err) = e.as_http_error() {
                    match http_err.status() {
                        StatusCode::NotFound => DownloadError::NotFound,
                        StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(e)),
                        _ => DownloadError::Other(anyhow::Error::new(e)),
                    }
                } else {
                    DownloadError::Other(e.into())
                });
            }
        }
    }

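    /// Acquires a concurrency-limiter permit for the given request kind; the
    /// request may proceed only while the returned permit is held.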
    async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
        self.concurrency_limiter
            .acquire(kind)
            .await
            .expect("semaphore is never closed")
    }
}

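/// Converts our [`StorageMetadata`] key-value pairs into the SDK's
/// [`Metadata`] representation.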
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
    let mut res = Metadata::new();
    for (k, v) in metadata.0.into_iter() {
        res.insert(k, v);
    }
    res
}

#[async_trait::async_trait]
impl RemoteStorage for AzureBlobStorage {
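    // Returns the "directories" (common prefixes) directly under `prefix`,
    // using the prefix separator as the listing delimiter.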
    async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        // Use the passed prefix, or fall back to the `prefix_in_container` value.
        let list_prefix = prefix
            .map(|p| self.relative_path_to_name(p))
            .or_else(|| self.prefix_in_container.clone())
            .map(|mut p| {
                // The prefix must end with a separator; otherwise the request
                // returns only the entry for the prefix itself.
                if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                    p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
                }
                p
            });

        let mut builder = self
            .client
            .list_blobs()
            .delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());

        if let Some(prefix) = list_prefix {
            builder = builder.prefix(Cow::from(prefix));
        }

        if let Some(limit) = self.max_keys_per_list_response {
            builder = builder.max_results(MaxResults::new(limit));
        }

        let mut response = builder.into_stream();
        let mut res = Vec::new();
        while let Some(l) = response.next().await {
            let entry = match l {
                Ok(l) => l,
                Err(e) => {
                    return Err(if let Some(http_err) = e.as_http_error() {
                        match http_err.status() {
                            StatusCode::NotFound => DownloadError::NotFound,
                            StatusCode::BadRequest => {
                                DownloadError::BadInput(anyhow::Error::new(e))
                            }
                            _ => DownloadError::Other(anyhow::Error::new(e)),
                        }
                    } else {
                        DownloadError::Other(e.into())
                    });
                }
            };
            let name_iter = entry
                .blobs
                .prefixes()
                .map(|prefix| self.name_to_relative_path(&prefix.name));
            res.extend(name_iter);
        }
        Ok(res)
    }

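    // Returns all blobs under `folder` (or under the container prefix when
    // `folder` is `None`). No delimiter is set, so the listing is recursive.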
    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        let folder_name = folder
            .map(|p| self.relative_path_to_name(p))
            .or_else(|| self.prefix_in_container.clone());

        let mut builder = self.client.list_blobs();

        if let Some(folder_name) = folder_name {
            builder = builder.prefix(Cow::from(folder_name));
        }

        if let Some(limit) = self.max_keys_per_list_response {
            builder = builder.max_results(MaxResults::new(limit));
        }

        let mut response = builder.into_stream();
        let mut res = Vec::new();
        while let Some(l) = response.next().await {
            let entry = l.map_err(anyhow::Error::new)?;
            let name_iter = entry
                .blobs
                .blobs()
                .map(|bl| self.name_to_relative_path(&bl.name));
            res.extend(name_iter);
        }
        Ok(res)
    }

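    // Uploads `data_size_bytes` bytes from `from` as a block blob, attaching
    // `metadata` if given. The body is currently buffered into RAM first; see
    // the TODO inside.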
    async fn upload(
        &self,
        mut from: impl AsyncRead + Unpin + Send + Sync + 'static,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        let _permit = self.permit(RequestKind::Put).await;
        let blob_client = self.client.blob_client(self.relative_path_to_name(to));

        // TODO FIX THIS UGLY HACK and don't buffer the entire object
        // into RAM here, but use the streaming interface. For that,
        // we'd have to change the interface though...
        // https://github.com/neondatabase/neon/issues/5563
        let mut buf = Vec::with_capacity(data_size_bytes);
        tokio::io::copy(&mut from, &mut buf).await?;
        let body = azure_core::Body::Bytes(buf.into());

        let mut builder = blob_client.put_block_blob(body);

        if let Some(metadata) = metadata {
            builder = builder.metadata(to_azure_metadata(metadata));
        }

        let _response = builder.into_future().await?;

        Ok(())
    }

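    // Downloads a whole blob together with its user-defined metadata, which
    // requires a separate `get_metadata` round trip first.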
    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        let _permit = self.permit(RequestKind::Get).await;
        let blob_client = self.client.blob_client(self.relative_path_to_name(from));

        let metadata = self.get_metadata(&blob_client).await?;

        let builder = blob_client.get();

        self.download_for_builder(metadata, builder).await
    }

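    // Downloads the byte range `[start_inclusive, end_exclusive)` of a blob.
    // An unbounded end is emulated with a huge upper limit, because the SDK
    // does not support open-ended ranges.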
    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        let _permit = self.permit(RequestKind::Get).await;
        let blob_client = self.client.blob_client(self.relative_path_to_name(from));

        let metadata = self.get_metadata(&blob_client).await?;

        let mut builder = blob_client.get();

        if let Some(end_exclusive) = end_exclusive {
            builder = builder.range(Range::new(start_inclusive, end_exclusive));
        } else {
            // Open-ended ranges are not supported by the SDK, so we work around
            // that by setting the upper limit extremely high, while keeping it
            // low enough to still be representable by signed 64-bit integers.
            // TODO remove this workaround once the SDK adds open range support
            // https://github.com/Azure/azure-sdk-for-rust/issues/1438
            let end_exclusive = u64::MAX / 4;
            builder = builder.range(Range::new(start_inclusive, end_exclusive));
        }

        self.download_for_builder(metadata, builder).await
    }

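    // Deletes a single blob. A missing blob (HTTP 404) is treated as success,
    // so deletion is idempotent.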
    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        let _permit = self.permit(RequestKind::Delete).await;
        let blob_client = self.client.blob_client(self.relative_path_to_name(path));

        let builder = blob_client.delete();

        match builder.into_future().await {
            Ok(_response) => Ok(()),
            Err(e) => {
                if let Some(http_err) = e.as_http_error() {
                    if http_err.status() == StatusCode::NotFound {
                        return Ok(());
                    }
                }
                Err(anyhow::Error::new(e))
            }
        }
    }

    async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
        // The permit is acquired by the inner `delete` call for each path.

        // TODO batch requests are not supported by the SDK either
        // https://github.com/Azure/azure-sdk-for-rust/issues/1068
        // https://github.com/Azure/azure-sdk-for-rust/issues/1249
        for path in paths {
            self.delete(path).await?;
        }
        Ok(())
    }
}