Line data Source code
1 : //! Azure Blob Storage wrapper
2 :
3 : use std::borrow::Cow;
4 : use std::collections::HashMap;
5 : use std::env;
6 : use std::fmt::Display;
7 : use std::io;
8 : use std::num::NonZeroU32;
9 : use std::pin::Pin;
10 : use std::str::FromStr;
11 : use std::sync::Arc;
12 : use std::time::Duration;
13 : use std::time::SystemTime;
14 :
15 : use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
16 : use anyhow::Result;
17 : use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
18 : use azure_core::{Continuable, RetryOptions};
19 : use azure_identity::DefaultAzureCredential;
20 : use azure_storage::StorageCredentials;
21 : use azure_storage_blobs::blob::CopyStatus;
22 : use azure_storage_blobs::prelude::ClientBuilder;
23 : use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient};
24 : use bytes::Bytes;
25 : use futures::future::Either;
26 : use futures::stream::Stream;
27 : use futures_util::StreamExt;
28 : use futures_util::TryStreamExt;
29 : use http_types::{StatusCode, Url};
30 : use scopeguard::ScopeGuard;
31 : use tokio_util::sync::CancellationToken;
32 : use tracing::debug;
33 : use utils::backoff;
34 :
35 : use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
36 : use crate::{
37 : config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError,
38 : DownloadOpts, Listing, ListingMode, ListingObject, RemotePath, RemoteStorage, StorageMetadata,
39 : TimeTravelError, TimeoutOrCancel,
40 : };
41 :
/// Remote storage implementation backed by a single Azure Blob Storage container.
42 : pub struct AzureBlobStorage {
// SDK client for the container; every blob operation in this file goes through it.
43 : client: ContainerClient,
// Name of the container, as taken from the configuration.
44 : container_name: String,
// Optional prefix prepended to blob names by `relative_path_to_name`.
45 : prefix_in_container: Option<String>,
// When set, passed as `max_results` on list requests.
46 : max_keys_per_list_response: Option<NonZeroU32>,
// A permit is acquired from this limiter before each request is issued.
47 : concurrency_limiter: ConcurrencyLimiter,
48 : // Per-request timeout. Accessible for tests.
49 : pub timeout: Duration,
50 : }
51 :
52 : impl AzureBlobStorage {
53 10 : pub fn new(azure_config: &AzureConfig, timeout: Duration) -> Result<Self> {
54 10 : debug!(
55 0 : "Creating azure remote storage for azure container {}",
56 : azure_config.container_name
57 : );
58 :
59 : // Use the storage account from the config by default, fall back to env var if not present.
60 10 : let account = azure_config.storage_account.clone().unwrap_or_else(|| {
61 10 : env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT")
62 10 : });
63 :
64 : // If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
65 : // otherwise try the token based credentials.
66 10 : let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
67 10 : StorageCredentials::access_key(account.clone(), access_key)
68 : } else {
69 0 : let token_credential = DefaultAzureCredential::default();
70 0 : StorageCredentials::token_credential(Arc::new(token_credential))
71 : };
72 :
73 : // we have an outer retry
74 10 : let builder = ClientBuilder::new(account, credentials).retry(RetryOptions::none());
75 10 :
76 10 : let client = builder.container_client(azure_config.container_name.to_owned());
77 :
78 10 : let max_keys_per_list_response =
79 10 : if let Some(limit) = azure_config.max_keys_per_list_response {
80 : Some(
81 4 : NonZeroU32::new(limit as u32)
82 4 : .ok_or_else(|| anyhow::anyhow!("max_keys_per_list_response can't be 0"))?,
83 : )
84 : } else {
85 6 : None
86 : };
87 :
88 10 : Ok(AzureBlobStorage {
89 10 : client,
90 10 : container_name: azure_config.container_name.to_owned(),
91 10 : prefix_in_container: azure_config.prefix_in_container.to_owned(),
92 10 : max_keys_per_list_response,
93 10 : concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
94 10 : timeout,
95 10 : })
96 10 : }
97 :
98 237 : pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
99 237 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
100 237 : let path_string = path
101 237 : .get_path()
102 237 : .as_str()
103 237 : .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
104 237 : match &self.prefix_in_container {
105 237 : Some(prefix) => {
106 237 : if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
107 237 : prefix.clone() + path_string
108 : } else {
109 0 : format!("{prefix}{REMOTE_STORAGE_PREFIX_SEPARATOR}{path_string}")
110 : }
111 : }
112 0 : None => path_string.to_string(),
113 : }
114 237 : }
115 :
116 246 : fn name_to_relative_path(&self, key: &str) -> RemotePath {
117 246 : let relative_path =
118 246 : match key.strip_prefix(self.prefix_in_container.as_deref().unwrap_or_default()) {
119 246 : Some(stripped) => stripped,
120 : // we rely on Azure to return properly prefixed paths
121 : // for requests with a certain prefix
122 0 : None => panic!(
123 0 : "Key {key} does not start with container prefix {:?}",
124 0 : self.prefix_in_container
125 0 : ),
126 : };
127 246 : RemotePath(
128 246 : relative_path
129 246 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
130 246 : .collect(),
131 246 : )
132 246 : }
133 :
134 11 : async fn download_for_builder(
135 11 : &self,
136 11 : builder: GetBlobBuilder,
137 11 : cancel: &CancellationToken,
138 11 : ) -> Result<Download, DownloadError> {
139 11 : let kind = RequestKind::Get;
140 :
141 11 : let _permit = self.permit(kind, cancel).await?;
142 11 : let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
143 11 : let cancel_or_timeout_ = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
144 11 :
145 11 : let mut etag = None;
146 11 : let mut last_modified = None;
147 11 : let mut metadata = HashMap::new();
148 11 :
149 11 : let started_at = start_measuring_requests(kind);
150 11 :
151 11 : let download = async {
152 11 : let response = builder
153 11 : // convert to concrete Pageable
154 11 : .into_stream()
155 11 : // convert to TryStream
156 11 : .into_stream()
157 11 : .map_err(to_download_error);
158 11 :
159 11 : // apply per request timeout
160 11 : let response = tokio_stream::StreamExt::timeout(response, self.timeout);
161 11 :
162 11 : // flatten
163 11 : let response = response.map(|res| match res {
164 11 : Ok(res) => res,
165 0 : Err(_elapsed) => Err(DownloadError::Timeout),
166 11 : });
167 11 :
168 11 : let mut response = Box::pin(response);
169 :
170 56 : let Some(part) = response.next().await else {
171 0 : return Err(DownloadError::Other(anyhow::anyhow!(
172 0 : "Azure GET response contained no response body"
173 0 : )));
174 : };
175 11 : let part = part?;
176 9 : if etag.is_none() {
177 9 : etag = Some(part.blob.properties.etag);
178 9 : }
179 9 : if last_modified.is_none() {
180 9 : last_modified = Some(part.blob.properties.last_modified.into());
181 9 : }
182 9 : if let Some(blob_meta) = part.blob.metadata {
183 0 : metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned())));
184 9 : }
185 :
186 : // unwrap safety: if these were None, bufs would be empty and we would have returned an error already
187 9 : let etag = etag.unwrap();
188 9 : let last_modified = last_modified.unwrap();
189 9 :
190 9 : let tail_stream = response
191 9 : .map(|part| match part {
192 0 : Ok(part) => Either::Left(part.data.map(|r| r.map_err(io::Error::other))),
193 0 : Err(e) => {
194 0 : Either::Right(futures::stream::once(async { Err(io::Error::other(e)) }))
195 : }
196 9 : })
197 9 : .flatten();
198 9 : let stream = part
199 9 : .data
200 9 : .map(|r| r.map_err(io::Error::other))
201 9 : .chain(sync_wrapper::SyncStream::new(tail_stream));
202 9 : //.chain(SyncStream::from_pin(Box::pin(tail_stream)));
203 9 :
204 9 : let download_stream = crate::support::DownloadStream::new(cancel_or_timeout_, stream);
205 9 :
206 9 : Ok(Download {
207 9 : download_stream: Box::pin(download_stream),
208 9 : etag,
209 9 : last_modified,
210 9 : metadata: Some(StorageMetadata(metadata)),
211 9 : })
212 11 : };
213 :
214 11 : let download = tokio::select! {
215 11 : bufs = download => bufs,
216 11 : cancel_or_timeout = cancel_or_timeout => match cancel_or_timeout {
217 0 : TimeoutOrCancel::Timeout => return Err(DownloadError::Timeout),
218 0 : TimeoutOrCancel::Cancel => return Err(DownloadError::Cancelled),
219 : },
220 : };
221 11 : let started_at = ScopeGuard::into_inner(started_at);
222 11 : let outcome = match &download {
223 9 : Ok(_) => AttemptOutcome::Ok,
224 2 : Err(_) => AttemptOutcome::Err,
225 : };
226 11 : crate::metrics::BUCKET_METRICS
227 11 : .req_seconds
228 11 : .observe_elapsed(kind, outcome, started_at);
229 11 : download
230 11 : }
231 :
232 227 : async fn permit(
233 227 : &self,
234 227 : kind: RequestKind,
235 227 : cancel: &CancellationToken,
236 227 : ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
237 227 : let acquire = self.concurrency_limiter.acquire(kind);
238 227 :
239 227 : tokio::select! {
240 227 : permit = acquire => Ok(permit.expect("never closed")),
241 227 : _ = cancel.cancelled() => Err(Cancelled),
242 : }
243 227 : }
244 :
245 0 : pub fn container_name(&self) -> &str {
246 0 : &self.container_name
247 0 : }
248 : }
249 :
250 0 : fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
251 0 : let mut res = Metadata::new();
252 0 : for (k, v) in metadata.0.into_iter() {
253 0 : res.insert(k, v);
254 0 : }
255 0 : res
256 0 : }
257 :
258 3 : fn to_download_error(error: azure_core::Error) -> DownloadError {
259 3 : if let Some(http_err) = error.as_http_error() {
260 3 : match http_err.status() {
261 1 : StatusCode::NotFound => DownloadError::NotFound,
262 2 : StatusCode::NotModified => DownloadError::Unmodified,
263 0 : StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
264 0 : _ => DownloadError::Other(anyhow::Error::new(error)),
265 : }
266 : } else {
267 0 : DownloadError::Other(error.into())
268 : }
269 3 : }
270 :
271 : impl RemoteStorage for AzureBlobStorage {
272 26 : fn list_streaming(
273 26 : &self,
274 26 : prefix: Option<&RemotePath>,
275 26 : mode: ListingMode,
276 26 : max_keys: Option<NonZeroU32>,
277 26 : cancel: &CancellationToken,
278 26 : ) -> impl Stream<Item = Result<Listing, DownloadError>> {
279 26 : // get the passed prefix or if it is not set use prefix_in_bucket value
280 26 : let list_prefix = prefix
281 26 : .map(|p| self.relative_path_to_name(p))
282 26 : .or_else(|| self.prefix_in_container.clone())
283 26 : .map(|mut p| {
284 : // required to end with a separator
285 : // otherwise request will return only the entry of a prefix
286 26 : if matches!(mode, ListingMode::WithDelimiter)
287 6 : && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
288 3 : {
289 3 : p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
290 23 : }
291 26 : p
292 26 : });
293 26 :
294 26 : async_stream::stream! {
295 26 : let _permit = self.permit(RequestKind::List, cancel).await?;
296 26 :
297 26 : let mut builder = self.client.list_blobs();
298 26 :
299 26 : if let ListingMode::WithDelimiter = mode {
300 26 : builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
301 26 : }
302 26 :
303 26 : if let Some(prefix) = list_prefix {
304 26 : builder = builder.prefix(Cow::from(prefix.to_owned()));
305 26 : }
306 26 :
307 26 : if let Some(limit) = self.max_keys_per_list_response {
308 26 : builder = builder.max_results(MaxResults::new(limit));
309 26 : }
310 26 :
311 26 : let mut next_marker = None;
312 26 :
313 26 : 'outer: loop {
314 26 : let mut builder = builder.clone();
315 26 : if let Some(marker) = next_marker.clone() {
316 26 : builder = builder.marker(marker);
317 26 : }
318 26 : let response = builder.into_stream();
319 26 : let response = response.into_stream().map_err(to_download_error);
320 26 : let response = tokio_stream::StreamExt::timeout(response, self.timeout);
321 45 : let response = response.map(|res| match res {
322 45 : Ok(res) => res,
323 26 : Err(_elapsed) => Err(DownloadError::Timeout),
324 45 : });
325 26 :
326 26 : let mut response = std::pin::pin!(response);
327 26 :
328 26 : let mut max_keys = max_keys.map(|mk| mk.get());
329 26 : let next_item = tokio::select! {
330 26 : op = response.next() => Ok(op),
331 26 : _ = cancel.cancelled() => Err(DownloadError::Cancelled),
332 26 : }?;
333 26 : let Some(entry) = next_item else {
334 26 : // The list is complete, so yield it.
335 26 : break;
336 26 : };
337 26 :
338 26 : let mut res = Listing::default();
339 26 : let entry = match entry {
340 26 : Ok(entry) => entry,
341 26 : Err(e) => {
342 26 : // The error is potentially retryable, so we must rewind the loop after yielding.
343 26 : yield Err(e);
344 26 : continue;
345 26 : }
346 26 : };
347 26 : next_marker = entry.continuation();
348 26 : let prefix_iter = entry
349 26 : .blobs
350 26 : .prefixes()
351 48 : .map(|prefix| self.name_to_relative_path(&prefix.name));
352 26 : res.prefixes.extend(prefix_iter);
353 26 :
354 26 : let blob_iter = entry
355 26 : .blobs
356 26 : .blobs()
357 198 : .map(|k| ListingObject{
358 198 : key: self.name_to_relative_path(&k.name),
359 198 : last_modified: k.properties.last_modified.into(),
360 198 : size: k.properties.content_length,
361 198 : }
362 26 : );
363 26 :
364 26 : for key in blob_iter {
365 26 : res.keys.push(key);
366 26 :
367 26 : if let Some(mut mk) = max_keys {
368 26 : assert!(mk > 0);
369 26 : mk -= 1;
370 26 : if mk == 0 {
371 26 : yield Ok(res); // limit reached
372 26 : break 'outer;
373 26 : }
374 26 : max_keys = Some(mk);
375 26 : }
376 26 : }
377 26 : yield Ok(res);
378 26 :
379 26 : // We are done here
380 26 : if next_marker.is_none() {
381 26 : break;
382 26 : }
383 26 : }
384 26 : }
385 26 : }
386 :
387 3 : async fn head_object(
388 3 : &self,
389 3 : key: &RemotePath,
390 3 : cancel: &CancellationToken,
391 3 : ) -> Result<ListingObject, DownloadError> {
392 3 : let kind = RequestKind::Head;
393 3 : let _permit = self.permit(kind, cancel).await?;
394 :
395 3 : let started_at = start_measuring_requests(kind);
396 3 :
397 3 : let blob_client = self.client.blob_client(self.relative_path_to_name(key));
398 3 : let properties_future = blob_client.get_properties().into_future();
399 3 :
400 3 : let properties_future = tokio::time::timeout(self.timeout, properties_future);
401 :
402 3 : let res = tokio::select! {
403 3 : res = properties_future => res,
404 3 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
405 : };
406 :
407 3 : if let Ok(inner) = &res {
408 3 : // do not incl. timeouts as errors in metrics but cancellations
409 3 : let started_at = ScopeGuard::into_inner(started_at);
410 3 : crate::metrics::BUCKET_METRICS
411 3 : .req_seconds
412 3 : .observe_elapsed(kind, inner, started_at);
413 3 : }
414 :
415 3 : let data = match res {
416 2 : Ok(Ok(data)) => Ok(data),
417 1 : Ok(Err(sdk)) => Err(to_download_error(sdk)),
418 0 : Err(_timeout) => Err(DownloadError::Timeout),
419 1 : }?;
420 :
421 2 : let properties = data.blob.properties;
422 2 : Ok(ListingObject {
423 2 : key: key.to_owned(),
424 2 : last_modified: SystemTime::from(properties.last_modified),
425 2 : size: properties.content_length,
426 2 : })
427 3 : }
428 :
429 93 : async fn upload(
430 93 : &self,
431 93 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
432 93 : data_size_bytes: usize,
433 93 : to: &RemotePath,
434 93 : metadata: Option<StorageMetadata>,
435 93 : cancel: &CancellationToken,
436 93 : ) -> anyhow::Result<()> {
437 93 : let kind = RequestKind::Put;
438 93 : let _permit = self.permit(kind, cancel).await?;
439 :
440 93 : let started_at = start_measuring_requests(kind);
441 93 :
442 93 : let op = async {
443 93 : let blob_client = self.client.blob_client(self.relative_path_to_name(to));
444 93 :
445 93 : let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
446 93 : Box::pin(from);
447 93 :
448 93 : let from = NonSeekableStream::new(from, data_size_bytes);
449 93 :
450 93 : let body = azure_core::Body::SeekableStream(Box::new(from));
451 93 :
452 93 : let mut builder = blob_client.put_block_blob(body);
453 :
454 93 : if let Some(metadata) = metadata {
455 0 : builder = builder.metadata(to_azure_metadata(metadata));
456 93 : }
457 :
458 93 : let fut = builder.into_future();
459 93 : let fut = tokio::time::timeout(self.timeout, fut);
460 93 :
461 557 : match fut.await {
462 93 : Ok(Ok(_response)) => Ok(()),
463 0 : Ok(Err(azure)) => Err(azure.into()),
464 0 : Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
465 : }
466 93 : };
467 :
468 93 : let res = tokio::select! {
469 93 : res = op => res,
470 93 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
471 : };
472 :
473 93 : let outcome = match res {
474 93 : Ok(_) => AttemptOutcome::Ok,
475 0 : Err(_) => AttemptOutcome::Err,
476 : };
477 93 : let started_at = ScopeGuard::into_inner(started_at);
478 93 : crate::metrics::BUCKET_METRICS
479 93 : .req_seconds
480 93 : .observe_elapsed(kind, outcome, started_at);
481 93 :
482 93 : res
483 93 : }
484 :
485 11 : async fn download(
486 11 : &self,
487 11 : from: &RemotePath,
488 11 : opts: &DownloadOpts,
489 11 : cancel: &CancellationToken,
490 11 : ) -> Result<Download, DownloadError> {
491 11 : let blob_client = self.client.blob_client(self.relative_path_to_name(from));
492 11 :
493 11 : let mut builder = blob_client.get();
494 :
495 11 : if let Some(ref etag) = opts.etag {
496 3 : builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
497 8 : }
498 :
499 11 : if let Some((start, end)) = opts.byte_range() {
500 5 : builder = builder.range(match end {
501 3 : Some(end) => Range::Range(start..end),
502 2 : None => Range::RangeFrom(start..),
503 : });
504 6 : }
505 :
506 56 : self.download_for_builder(builder, cancel).await
507 11 : }
508 :
509 86 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
510 86 : self.delete_objects(std::array::from_ref(path), cancel)
511 437 : .await
512 86 : }
513 :
514 93 : async fn delete_objects<'a>(
515 93 : &self,
516 93 : paths: &'a [RemotePath],
517 93 : cancel: &CancellationToken,
518 93 : ) -> anyhow::Result<()> {
519 93 : let kind = RequestKind::Delete;
520 93 : let _permit = self.permit(kind, cancel).await?;
521 93 : let started_at = start_measuring_requests(kind);
522 93 :
523 93 : let op = async {
524 : // TODO batch requests are not supported by the SDK
525 : // https://github.com/Azure/azure-sdk-for-rust/issues/1068
526 205 : for path in paths {
527 112 : #[derive(Debug)]
528 112 : enum AzureOrTimeout {
529 : AzureError(azure_core::Error),
530 : Timeout,
531 : Cancel,
532 112 : }
533 112 : impl Display for AzureOrTimeout {
534 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
535 0 : write!(f, "{self:?}")
536 0 : }
537 : }
538 112 : let warn_threshold = 3;
539 112 : let max_retries = 5;
540 112 : backoff::retry(
541 112 : || async {
542 112 : let blob_client = self.client.blob_client(self.relative_path_to_name(path));
543 112 :
544 112 : let request = blob_client.delete().into_future();
545 :
546 567 : let res = tokio::time::timeout(self.timeout, request).await;
547 :
548 112 : match res {
549 90 : Ok(Ok(_v)) => Ok(()),
550 22 : Ok(Err(azure_err)) => {
551 22 : if let Some(http_err) = azure_err.as_http_error() {
552 22 : if http_err.status() == StatusCode::NotFound {
553 22 : return Ok(());
554 0 : }
555 0 : }
556 0 : Err(AzureOrTimeout::AzureError(azure_err))
557 : }
558 0 : Err(_elapsed) => Err(AzureOrTimeout::Timeout),
559 : }
560 224 : },
561 112 : |err| match err {
562 0 : AzureOrTimeout::AzureError(_) | AzureOrTimeout::Timeout => false,
563 0 : AzureOrTimeout::Cancel => true,
564 112 : },
565 112 : warn_threshold,
566 112 : max_retries,
567 112 : "deleting remote object",
568 112 : cancel,
569 112 : )
570 567 : .await
571 112 : .ok_or_else(|| AzureOrTimeout::Cancel)
572 112 : .and_then(|x| x)
573 112 : .map_err(|e| match e {
574 0 : AzureOrTimeout::AzureError(err) => anyhow::Error::from(err),
575 0 : AzureOrTimeout::Timeout => TimeoutOrCancel::Timeout.into(),
576 0 : AzureOrTimeout::Cancel => TimeoutOrCancel::Cancel.into(),
577 112 : })?;
578 : }
579 93 : Ok(())
580 93 : };
581 :
582 93 : let res = tokio::select! {
583 93 : res = op => res,
584 93 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
585 : };
586 :
587 93 : let started_at = ScopeGuard::into_inner(started_at);
588 93 : crate::metrics::BUCKET_METRICS
589 93 : .req_seconds
590 93 : .observe_elapsed(kind, &res, started_at);
591 93 : res
592 93 : }
593 :
594 1 : async fn copy(
595 1 : &self,
596 1 : from: &RemotePath,
597 1 : to: &RemotePath,
598 1 : cancel: &CancellationToken,
599 1 : ) -> anyhow::Result<()> {
600 1 : let kind = RequestKind::Copy;
601 1 : let _permit = self.permit(kind, cancel).await?;
602 1 : let started_at = start_measuring_requests(kind);
603 1 :
604 1 : let timeout = tokio::time::sleep(self.timeout);
605 1 :
606 1 : let mut copy_status = None;
607 1 :
608 1 : let op = async {
609 1 : let blob_client = self.client.blob_client(self.relative_path_to_name(to));
610 :
611 1 : let source_url = format!(
612 1 : "{}/{}",
613 1 : self.client.url()?,
614 1 : self.relative_path_to_name(from)
615 : );
616 :
617 1 : let builder = blob_client.copy(Url::from_str(&source_url)?);
618 1 : let copy = builder.into_future();
619 :
620 5 : let result = copy.await?;
621 :
622 1 : copy_status = Some(result.copy_status);
623 : loop {
624 1 : match copy_status.as_ref().expect("we always set it to Some") {
625 : CopyStatus::Aborted => {
626 0 : anyhow::bail!("Received abort for copy from {from} to {to}.");
627 : }
628 : CopyStatus::Failed => {
629 0 : anyhow::bail!("Received failure response for copy from {from} to {to}.");
630 : }
631 1 : CopyStatus::Success => return Ok(()),
632 0 : CopyStatus::Pending => (),
633 0 : }
634 0 : // The copy is taking longer. Waiting a second and then re-trying.
635 0 : // TODO estimate time based on copy_progress and adjust time based on that
636 0 : tokio::time::sleep(Duration::from_millis(1000)).await;
637 0 : let properties = blob_client.get_properties().into_future().await?;
638 0 : let Some(status) = properties.blob.properties.copy_status else {
639 0 : tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
640 0 : return Ok(());
641 : };
642 0 : copy_status = Some(status);
643 : }
644 1 : };
645 :
646 1 : let res = tokio::select! {
647 1 : res = op => res,
648 1 : _ = cancel.cancelled() => return Err(anyhow::Error::new(TimeoutOrCancel::Cancel)),
649 1 : _ = timeout => {
650 0 : let e = anyhow::Error::new(TimeoutOrCancel::Timeout);
651 0 : let e = e.context(format!("Timeout, last status: {copy_status:?}"));
652 0 : Err(e)
653 : },
654 : };
655 :
656 1 : let started_at = ScopeGuard::into_inner(started_at);
657 1 : crate::metrics::BUCKET_METRICS
658 1 : .req_seconds
659 1 : .observe_elapsed(kind, &res, started_at);
660 1 : res
661 1 : }
662 :
663 0 : async fn time_travel_recover(
664 0 : &self,
665 0 : _prefix: Option<&RemotePath>,
666 0 : _timestamp: SystemTime,
667 0 : _done_if_after: SystemTime,
668 0 : _cancel: &CancellationToken,
669 0 : ) -> Result<(), TimeTravelError> {
670 0 : // TODO use Azure point in time recovery feature for this
671 0 : // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
672 0 : Err(TimeTravelError::Unimplemented)
673 0 : }
674 : }
675 :
676 : pin_project_lite::pin_project! {
677 : /// Hack to work around not being able to stream once with the Azure SDK.
678 : ///
679 : /// The Azure SDK clones streams around with the assumption that they are like
680 : /// `Arc<tokio::fs::File>` (except not supporting tokio); however, our streams are not like
681 : /// that. For example, for an `index_part.json` we just have a single chunk of [`Bytes`]
682 : /// representing the whole serialized vec. It could be trivially cloneable and "semi-trivially"
683 : /// seekable, but it is easier to just retry the request.
684 : #[project = NonSeekableStreamProj]
685 : enum NonSeekableStream<S> {
686 : /// The stream wrapper's initial form.
687 : ///
688 : /// The mutex exists to allow moving the reader out when cloning. If the SDK changes to do
689 : /// fewer than one clone before the first request, then this must be changed.
690 : Initial {
691 : inner: std::sync::Mutex<Option<tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>>>,
692 : len: usize,
693 : },
694 : /// The actually readable variant, produced by cloning the `Initial` variant.
695 : ///
696 : /// The SDK currently always clones once, even without a retry policy.
697 : Actual {
698 : #[pin]
699 : inner: tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>,
700 : len: usize,
701 : read_any: bool,
702 : },
703 : /// Most likely unneeded, but left to make life easier in case more clones are added.
704 : Cloned {
705 : len_was: usize,
706 : }
707 : }
708 : }
709 :
710 : impl<S> NonSeekableStream<S>
711 : where
712 : S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
713 : {
714 93 : fn new(inner: S, len: usize) -> NonSeekableStream<S> {
715 : use tokio_util::compat::TokioAsyncReadCompatExt;
716 :
717 93 : let inner = tokio_util::io::StreamReader::new(inner).compat();
718 93 : let inner = Some(inner);
719 93 : let inner = std::sync::Mutex::new(inner);
720 93 : NonSeekableStream::Initial { inner, len }
721 93 : }
722 : }
723 :
724 : impl<S> std::fmt::Debug for NonSeekableStream<S> {
725 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726 0 : match self {
727 0 : Self::Initial { len, .. } => f.debug_struct("Initial").field("len", len).finish(),
728 0 : Self::Actual { len, .. } => f.debug_struct("Actual").field("len", len).finish(),
729 0 : Self::Cloned { len_was, .. } => f.debug_struct("Cloned").field("len", len_was).finish(),
730 : }
731 0 : }
732 : }
733 :
734 : impl<S> futures::io::AsyncRead for NonSeekableStream<S>
735 : where
736 : S: Stream<Item = std::io::Result<Bytes>>,
737 : {
738 93 : fn poll_read(
739 93 : self: std::pin::Pin<&mut Self>,
740 93 : cx: &mut std::task::Context<'_>,
741 93 : buf: &mut [u8],
742 93 : ) -> std::task::Poll<std::io::Result<usize>> {
743 93 : match self.project() {
744 : NonSeekableStreamProj::Actual {
745 93 : inner, read_any, ..
746 93 : } => {
747 93 : *read_any = true;
748 93 : inner.poll_read(cx, buf)
749 : }
750 : // NonSeekableStream::Initial does not support reading because it is just much easier
751 : // to have the mutex in place where one does not poll the contents, or that's how it
752 : // seemed originally. If there is a version upgrade which changes the cloning, then
753 : // that support needs to be hacked in.
754 : //
755 : // including {self:?} into the message would be useful, but unsure how to unproject.
756 0 : _ => std::task::Poll::Ready(Err(std::io::Error::new(
757 0 : std::io::ErrorKind::Other,
758 0 : "cloned or initial values cannot be read",
759 0 : ))),
760 : }
761 93 : }
762 : }
763 :
764 : impl<S> Clone for NonSeekableStream<S> {
765 : /// Weird clone implementation exists to support the sdk doing cloning before issuing the first
766 : /// request, see type documentation.
767 93 : fn clone(&self) -> Self {
768 : use NonSeekableStream::*;
769 :
770 93 : match self {
771 93 : Initial { inner, len } => {
772 93 : if let Some(inner) = inner.lock().unwrap().take() {
773 93 : Actual {
774 93 : inner,
775 93 : len: *len,
776 93 : read_any: false,
777 93 : }
778 : } else {
779 0 : Self::Cloned { len_was: *len }
780 : }
781 : }
782 0 : Actual { len, .. } => Cloned { len_was: *len },
783 0 : Cloned { len_was } => Cloned { len_was: *len_was },
784 : }
785 93 : }
786 : }
787 :
788 : #[async_trait::async_trait]
789 : impl<S> azure_core::SeekableStream for NonSeekableStream<S>
790 : where
791 : S: Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync + 'static,
792 : {
793 0 : async fn reset(&mut self) -> azure_core::error::Result<()> {
794 : use NonSeekableStream::*;
795 :
796 0 : let msg = match self {
797 0 : Initial { inner, .. } => {
798 0 : if inner.get_mut().unwrap().is_some() {
799 0 : return Ok(());
800 : } else {
801 0 : "reset after first clone is not supported"
802 : }
803 : }
804 0 : Actual { read_any, .. } if !*read_any => return Ok(()),
805 0 : Actual { .. } => "reset after reading is not supported",
806 0 : Cloned { .. } => "reset after second clone is not supported",
807 : };
808 0 : Err(azure_core::error::Error::new(
809 0 : azure_core::error::ErrorKind::Io,
810 0 : std::io::Error::new(std::io::ErrorKind::Other, msg),
811 0 : ))
812 0 : }
813 :
814 : // Note: it is not documented if this should be the total or remaining length, total passes the
815 : // tests.
816 93 : fn len(&self) -> usize {
817 : use NonSeekableStream::*;
818 93 : match self {
819 93 : Initial { len, .. } => *len,
820 0 : Actual { len, .. } => *len,
821 0 : Cloned { len_was, .. } => *len_was,
822 : }
823 93 : }
824 : }
|