Line data Source code
1 : //! Azure Blob Storage wrapper
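: //!
: //! A minimal usage sketch (hypothetical values, error handling elided):
: //! ```ignore
: //! let storage = AzureBlobStorage::new(&azure_config, timeout, small_timeout)?;
: //! let listing = storage.list_streaming(None, ListingMode::NoDelimiter, None, &cancel);
: //! ```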
2 :
3 : use std::borrow::Cow;
4 : use std::collections::HashMap;
5 : use std::fmt::Display;
6 : use std::num::NonZeroU32;
7 : use std::pin::Pin;
8 : use std::str::FromStr;
9 : use std::sync::Arc;
10 : use std::time::{Duration, SystemTime};
11 : use std::{env, io};
12 :
13 : use anyhow::{Context, Result, anyhow};
14 : use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
15 : use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
16 : use azure_storage::StorageCredentials;
17 : use azure_storage_blobs::blob::operations::GetBlobBuilder;
18 : use azure_storage_blobs::blob::{Blob, CopyStatus};
19 : use azure_storage_blobs::container::operations::ListBlobsBuilder;
20 : use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
21 : use bytes::Bytes;
22 : use futures::FutureExt;
23 : use futures::future::Either;
24 : use futures::stream::Stream;
25 : use futures_util::{StreamExt, TryStreamExt};
26 : use http_types::{StatusCode, Url};
27 : use scopeguard::ScopeGuard;
28 : use tokio_util::sync::CancellationToken;
29 : use tracing::debug;
30 : use utils::backoff;
31 : use utils::backoff::exponential_backoff_duration_seconds;
32 :
33 : use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
34 : use crate::config::AzureConfig;
35 : use crate::error::Cancelled;
36 : use crate::metrics::{AttemptOutcome, RequestKind, start_measuring_requests};
37 : use crate::{
38 : ConcurrencyLimiter, Download, DownloadError, DownloadKind, DownloadOpts, Listing, ListingMode,
39 : ListingObject, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
40 : Version, VersionKind,
41 : };
42 :
43 : pub struct AzureBlobStorage {
44 : client: ContainerClient,
45 : container_name: String,
46 : prefix_in_container: Option<String>,
47 : max_keys_per_list_response: Option<NonZeroU32>,
48 : concurrency_limiter: ConcurrencyLimiter,
49 : // Per-request timeout. Accessible for tests.
50 : pub timeout: Duration,
51 :
52 : // Alternative timeout used for metadata objects which are expected to be small
53 : pub small_timeout: Duration,
54 : }
55 :
56 : impl AzureBlobStorage {
57 10 : pub fn new(
58 10 : azure_config: &AzureConfig,
59 10 : timeout: Duration,
60 10 : small_timeout: Duration,
61 10 : ) -> Result<Self> {
62 10 : debug!(
63 0 : "Creating azure remote storage for azure container {}",
64 : azure_config.container_name
65 : );
66 :
67 : // Use the storage account from the config by default, fall back to env var if not present.
68 10 : let account = azure_config.storage_account.clone().unwrap_or_else(|| {
69 10 : env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT")
70 10 : });
71 :
72 : // If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
73 : // otherwise try the token based credentials.
74 10 : let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
75 10 : StorageCredentials::access_key(account.clone(), access_key)
76 : } else {
77 0 : let token_credential = azure_identity::create_default_credential()
78 0 : .context("trying to obtain Azure default credentials")?;
79 0 : StorageCredentials::token_credential(token_credential)
80 : };
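: // Credential precedence: an explicit access key from `AZURE_STORAGE_ACCESS_KEY`
: // wins; otherwise the default credential chain is used (depending on the
: // environment, this may resolve to environment credentials, a managed identity,
: // or an Azure CLI login).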
81 :
82 10 : let builder = ClientBuilder::new(account, credentials)
83 : // we have an outer retry
84 10 : .retry(RetryOptions::none())
85 : // Customize transport to configure connection pooling
86 10 : .transport(TransportOptions::new(Self::reqwest_client(
87 10 : azure_config.conn_pool_size,
88 10 : )));
89 :
90 10 : let client = builder.container_client(azure_config.container_name.to_owned());
91 :
92 10 : let max_keys_per_list_response =
93 10 : if let Some(limit) = azure_config.max_keys_per_list_response {
94 : Some(
95 4 : NonZeroU32::new(limit as u32)
96 4 : .ok_or_else(|| anyhow::anyhow!("max_keys_per_list_response can't be 0"))?,
97 : )
98 : } else {
99 6 : None
100 : };
101 :
102 10 : Ok(AzureBlobStorage {
103 10 : client,
104 10 : container_name: azure_config.container_name.to_owned(),
105 10 : prefix_in_container: azure_config.prefix_in_container.to_owned(),
106 10 : max_keys_per_list_response,
107 10 : concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
108 10 : timeout,
109 10 : small_timeout,
110 10 : })
111 10 : }
112 :
113 10 : fn reqwest_client(conn_pool_size: usize) -> Arc<dyn HttpClient> {
114 10 : let client = reqwest::ClientBuilder::new()
115 10 : .pool_max_idle_per_host(conn_pool_size)
116 10 : .build()
117 10 : .expect("failed to build `reqwest` client");
118 10 : Arc::new(client)
119 10 : }
120 :
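: /// Maps a relative path to the full blob name, prepending `prefix_in_container`
: /// when one is configured. For example (hypothetical values), with the prefix
: /// `tenants/`, the path `foo/bar` maps to the blob name `tenants/foo/bar`.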
121 237 : pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
122 237 : assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
123 237 : let path_string = path.get_path().as_str();
124 237 : match &self.prefix_in_container {
125 237 : Some(prefix) => {
126 237 : if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
127 237 : prefix.clone() + path_string
128 : } else {
129 0 : format!("{prefix}{REMOTE_STORAGE_PREFIX_SEPARATOR}{path_string}")
130 : }
131 : }
132 0 : None => path_string.to_string(),
133 : }
134 237 : }
135 :
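: /// Inverse of [`Self::relative_path_to_name`]: strips `prefix_in_container` from
: /// a key returned by Azure (e.g. `tenants/foo/bar` back to `foo/bar`), panicking
: /// if the key does not start with the expected prefix.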
136 249 : fn name_to_relative_path(&self, key: &str) -> RemotePath {
137 249 : let relative_path =
138 249 : match key.strip_prefix(self.prefix_in_container.as_deref().unwrap_or_default()) {
139 249 : Some(stripped) => stripped,
140 : // we rely on Azure to return properly prefixed paths
141 : // for requests with a certain prefix
142 0 : None => panic!(
143 0 : "Key {key} does not start with container prefix {:?}",
144 : self.prefix_in_container
145 : ),
146 : };
147 249 : RemotePath(
148 249 : relative_path
149 249 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
150 249 : .collect(),
151 249 : )
152 249 : }
153 :
154 11 : async fn download_for_builder(
155 11 : &self,
156 11 : builder: GetBlobBuilder,
157 11 : timeout: Duration,
158 11 : cancel: &CancellationToken,
159 11 : ) -> Result<Download, DownloadError> {
160 11 : let kind = RequestKind::Get;
161 :
162 11 : let _permit = self.permit(kind, cancel).await?;
163 11 : let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
164 11 : let cancel_or_timeout_ = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
165 :
166 11 : let mut etag = None;
167 11 : let mut last_modified = None;
168 11 : let mut metadata = HashMap::new();
169 :
170 11 : let started_at = start_measuring_requests(kind);
171 :
172 11 : let download = async {
173 11 : let response = builder
174 : // convert to concrete Pageable
175 11 : .into_stream()
176 : // convert to TryStream
177 11 : .into_stream()
178 11 : .map_err(to_download_error);
179 :
180 : // apply per request timeout
181 11 : let response = tokio_stream::StreamExt::timeout(response, timeout);
182 :
183 : // flatten
184 11 : let response = response.map(|res| match res {
185 11 : Ok(res) => res,
186 0 : Err(_elapsed) => Err(DownloadError::Timeout),
187 11 : });
188 :
189 11 : let mut response = Box::pin(response);
190 :
191 11 : let Some(part) = response.next().await else {
192 0 : return Err(DownloadError::Other(anyhow::anyhow!(
193 0 : "Azure GET response contained no response body"
194 0 : )));
195 : };
196 11 : let part = part?;
197 9 : if etag.is_none() {
198 9 : etag = Some(part.blob.properties.etag);
199 9 : }
200 9 : if last_modified.is_none() {
201 9 : last_modified = Some(part.blob.properties.last_modified.into());
202 9 : }
203 9 : if let Some(blob_meta) = part.blob.metadata {
204 0 : metadata.extend(blob_meta.iter().map(|(k, v)| (k.to_owned(), v.to_owned())));
205 9 : }
206 :
207 : // unwrap safety: both were set from the first response part; if there was no part, we returned an error above
208 9 : let etag = etag.unwrap();
209 9 : let last_modified = last_modified.unwrap();
210 :
211 9 : let tail_stream = response
212 9 : .map(|part| match part {
213 0 : Ok(part) => Either::Left(part.data.map(|r| r.map_err(io::Error::other))),
214 0 : Err(e) => {
215 0 : Either::Right(futures::stream::once(async { Err(io::Error::other(e)) }))
216 : }
217 0 : })
218 9 : .flatten();
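: // Chain the first part's body with the flattened bodies of all remaining parts
: // into one byte stream; `SyncStream` wraps the tail to make it `Sync`.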
219 9 : let stream = part
220 9 : .data
221 9 : .map(|r| r.map_err(io::Error::other))
222 9 : .chain(sync_wrapper::SyncStream::new(tail_stream));
224 :
225 9 : let download_stream = crate::support::DownloadStream::new(cancel_or_timeout_, stream);
226 :
227 9 : Ok(Download {
228 9 : download_stream: Box::pin(download_stream),
229 9 : etag,
230 9 : last_modified,
231 9 : metadata: Some(StorageMetadata(metadata)),
232 9 : })
233 11 : };
234 :
235 11 : let download = tokio::select! {
236 11 : bufs = download => bufs,
237 11 : cancel_or_timeout = cancel_or_timeout => match cancel_or_timeout {
238 0 : TimeoutOrCancel::Timeout => return Err(DownloadError::Timeout),
239 0 : TimeoutOrCancel::Cancel => return Err(DownloadError::Cancelled),
240 : },
241 : };
242 11 : let started_at = ScopeGuard::into_inner(started_at);
243 11 : let outcome = match &download {
244 9 : Ok(_) => AttemptOutcome::Ok,
245 : // At this level in the stack 404 and 304 responses do not indicate an error.
246 : // There's expected cases when a blob may not exist or hasn't been modified since
247 : // the last get (e.g. probing for timeline indices and heatmap downloads).
248 : // Callers should handle errors if they are unexpected.
249 2 : Err(DownloadError::NotFound | DownloadError::Unmodified) => AttemptOutcome::Ok,
250 0 : Err(_) => AttemptOutcome::Err,
251 : };
252 11 : crate::metrics::BUCKET_METRICS
253 11 : .req_seconds
254 11 : .observe_elapsed(kind, outcome, started_at);
255 11 : download
256 11 : }
257 :
258 26 : fn list_streaming_for_fn<T: Default + ListingCollector>(
259 26 : &self,
260 26 : prefix: Option<&RemotePath>,
261 26 : mode: ListingMode,
262 26 : max_keys: Option<NonZeroU32>,
263 26 : cancel: &CancellationToken,
264 26 : request_kind: RequestKind,
265 26 : customize_builder: impl Fn(ListBlobsBuilder) -> ListBlobsBuilder,
266 26 : ) -> impl Stream<Item = Result<T, DownloadError>> {
267 : // Get the passed prefix, or fall back to the prefix_in_container value if unset.
268 26 : let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
269 10 : self.prefix_in_container.clone().map(|mut s| {
270 10 : if !s.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
271 0 : s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
272 10 : }
273 10 : s
274 10 : })
275 10 : });
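: // When the fallback prefix is used, the trailing separator appended above makes
: // e.g. (hypothetical value) `tenants` list under `tenants/` instead of also
: // matching sibling keys such as `tenants-old`.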
276 :
277 26 : async_stream::stream! {
278 : let _permit = self.permit(request_kind, cancel).await?;
279 :
280 : let mut builder = self.client.list_blobs();
281 :
282 26 : if let ListingMode::WithDelimiter = mode {
283 : builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
284 : }
285 :
286 26 : if let Some(prefix) = list_prefix {
287 : builder = builder.prefix(Cow::from(prefix.to_owned()));
288 : }
289 :
290 26 : if let Some(limit) = self.max_keys_per_list_response {
291 : builder = builder.max_results(MaxResults::new(limit));
292 : }
293 :
294 : builder = customize_builder(builder);
295 :
296 : let mut next_marker = None;
297 :
298 : let mut timeout_try_cnt = 1;
299 :
300 : 'outer: loop {
301 : let mut builder = builder.clone();
302 : if let Some(marker) = next_marker.clone() {
303 : builder = builder.marker(marker);
304 : }
305 : // The Azure Blob Rust SDK does not expose the list-blobs API directly; one has to
306 : // use its pageable iterator wrapper, which returns all keys as a stream. We want
307 : // full control of paging, and therefore only take the first item from the stream.
308 : let mut response_stream = builder.into_stream();
309 : let response = response_stream.next();
310 : // Timeout mechanism: the Azure client sometimes gets stuck on a request, even though
311 : // retrying that request would immediately succeed. Therefore we retry with an
312 : // exponentially growing per-attempt timeout. (Usually, exponential backoff determines
313 : // the sleep time between retries.) We start with a 10-second timeout and double it
314 : // for each failure, up to 5 attempts: timeout = min(5 * (1.0+1.0)^n, self.timeout).
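: // Example schedule (assuming utils::backoff computes min((1 + base)^n, max)): with
: // self.timeout = 120 s, attempts get 10 s, 20 s, 40 s, 80 s, and 120 s (capped).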
315 : let this_timeout = (5.0 * exponential_backoff_duration_seconds(timeout_try_cnt, 1.0, self.timeout.as_secs_f64())).min(self.timeout.as_secs_f64());
316 : let response = tokio::time::timeout(Duration::from_secs_f64(this_timeout), response);
317 45 : let response = response.map(|res| {
318 45 : match res {
319 45 : Ok(Some(Ok(res))) => Ok(Some(res)),
320 0 : Ok(Some(Err(e))) => Err(to_download_error(e)),
321 0 : Ok(None) => Ok(None),
322 0 : Err(_elapsed) => Err(DownloadError::Timeout),
323 : }
324 45 : });
325 1 : let mut max_keys = max_keys.map(|mk| mk.get());
326 : let next_item = tokio::select! {
327 : op = response => op,
328 : _ = cancel.cancelled() => Err(DownloadError::Cancelled),
329 : };
330 :
331 : if let Err(DownloadError::Timeout) = &next_item {
332 : timeout_try_cnt += 1;
333 : if timeout_try_cnt <= 5 {
334 : continue 'outer;
335 : }
336 : }
337 :
338 : let next_item = match next_item {
339 : Ok(next_item) => next_item,
340 : Err(e) => {
341 : // The error is potentially retryable, so we must rewind the loop after yielding.
342 : yield Err(e);
343 : continue 'outer;
344 : },
345 : };
346 :
347 : // Log a warning if we saw two timeouts in a row before a successful request
348 : if timeout_try_cnt > 2 {
349 : tracing::warn!("Azure Blob Storage list timed out and succeeded after {} tries", timeout_try_cnt);
350 : }
351 : timeout_try_cnt = 1;
352 :
353 : let Some(entry) = next_item else {
354 : // The response stream is exhausted: the listing is complete.
355 : break;
356 : };
357 :
358 : let mut res = T::default();
359 : next_marker = entry.continuation();
360 : let prefix_iter = entry
361 : .blobs
362 : .prefixes()
363 52 : .map(|prefix| self.name_to_relative_path(&prefix.name));
364 : res.add_prefixes(self, prefix_iter);
365 :
366 : let blob_iter = entry
367 : .blobs
368 : .blobs();
369 :
370 : for key in blob_iter {
371 : res.add_blob(self, key);
372 :
373 : if let Some(mut mk) = max_keys {
374 : assert!(mk > 0);
375 : mk -= 1;
376 : if mk == 0 {
377 : yield Ok(res); // limit reached
378 : break 'outer;
379 : }
380 : max_keys = Some(mk);
381 : }
382 : }
383 : yield Ok(res);
384 :
385 : // We are done here
386 : if next_marker.is_none() {
387 : break;
388 : }
389 : }
390 : }
391 26 : }
392 :
393 227 : async fn permit(
394 227 : &self,
395 227 : kind: RequestKind,
396 227 : cancel: &CancellationToken,
397 227 : ) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
398 227 : let acquire = self.concurrency_limiter.acquire(kind);
399 :
400 227 : tokio::select! {
401 227 : permit = acquire => Ok(permit.expect("never closed")),
402 227 : _ = cancel.cancelled() => Err(Cancelled),
403 : }
404 227 : }
405 :
406 0 : pub fn container_name(&self) -> &str {
407 0 : &self.container_name
408 0 : }
409 :
410 0 : async fn list_versions_with_permit(
411 0 : &self,
412 0 : _permit: &tokio::sync::SemaphorePermit<'_>,
413 0 : prefix: Option<&RemotePath>,
414 0 : mode: ListingMode,
415 0 : max_keys: Option<NonZeroU32>,
416 0 : cancel: &CancellationToken,
417 0 : ) -> Result<crate::VersionListing, DownloadError> {
418 0 : let customize_builder = |mut builder: ListBlobsBuilder| {
419 0 : builder = builder.include_versions(true);
420 : // We do not return this info back to `VersionListing` yet.
421 0 : builder = builder.include_deleted(true);
422 0 : builder
423 0 : };
424 0 : let kind = RequestKind::ListVersions;
425 :
426 0 : let mut stream = std::pin::pin!(self.list_streaming_for_fn(
427 0 : prefix,
428 0 : mode,
429 0 : max_keys,
430 0 : cancel,
431 0 : kind,
432 0 : customize_builder
433 : ));
434 0 : let mut combined: crate::VersionListing =
435 0 : stream.next().await.expect("At least one item required")?;
436 0 : while let Some(list) = stream.next().await {
437 0 : let list = list?;
438 0 : combined.versions.extend(list.versions.into_iter());
439 : }
440 0 : Ok(combined)
441 0 : }
442 : }
443 :
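: /// Accumulates results from the shared listing stream; implemented for both plain
: /// [`Listing`]s and [`crate::VersionListing`]s so that `list_streaming_for_fn` can
: /// back both the regular and the version-listing APIs.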
444 : trait ListingCollector {
445 : fn add_prefixes(&mut self, abs: &AzureBlobStorage, prefix_it: impl Iterator<Item = RemotePath>);
446 : fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob);
447 : }
448 :
449 : impl ListingCollector for Listing {
450 45 : fn add_prefixes(
451 45 : &mut self,
452 45 : _abs: &AzureBlobStorage,
453 45 : prefix_it: impl Iterator<Item = RemotePath>,
454 45 : ) {
455 45 : self.prefixes.extend(prefix_it);
456 45 : }
457 197 : fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
458 197 : self.keys.push(ListingObject {
459 197 : key: abs.name_to_relative_path(&blob.name),
460 197 : last_modified: blob.properties.last_modified.into(),
461 197 : size: blob.properties.content_length,
462 197 : });
463 197 : }
464 : }
465 :
466 : impl ListingCollector for crate::VersionListing {
467 0 : fn add_prefixes(
468 0 : &mut self,
469 0 : _abs: &AzureBlobStorage,
470 0 : _prefix_it: impl Iterator<Item = RemotePath>,
471 0 : ) {
472 : // nothing
473 0 : }
474 0 : fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
475 0 : let id = crate::VersionId(blob.version_id.clone().expect("didn't find version ID"));
476 0 : self.versions.push(crate::Version {
477 0 : key: abs.name_to_relative_path(&blob.name),
478 0 : last_modified: blob.properties.last_modified.into(),
479 0 : kind: crate::VersionKind::Version(id),
480 0 : });
481 0 : }
482 : }
483 :
484 0 : fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
485 0 : let mut res = Metadata::new();
486 0 : for (k, v) in metadata.0.into_iter() {
487 0 : res.insert(k, v);
488 0 : }
489 0 : res
490 0 : }
491 :
492 3 : fn to_download_error(error: azure_core::Error) -> DownloadError {
493 3 : if let Some(http_err) = error.as_http_error() {
494 3 : match http_err.status() {
495 1 : StatusCode::NotFound => DownloadError::NotFound,
496 2 : StatusCode::NotModified => DownloadError::Unmodified,
497 0 : StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
498 0 : _ => DownloadError::Other(anyhow::Error::new(error)),
499 : }
500 : } else {
501 0 : DownloadError::Other(error.into())
502 : }
503 3 : }
504 :
505 : impl RemoteStorage for AzureBlobStorage {
506 26 : fn list_streaming(
507 26 : &self,
508 26 : prefix: Option<&RemotePath>,
509 26 : mode: ListingMode,
510 26 : max_keys: Option<NonZeroU32>,
511 26 : cancel: &CancellationToken,
512 26 : ) -> impl Stream<Item = Result<Listing, DownloadError>> {
513 26 : let customize_builder = |builder| builder;
514 26 : let kind = RequestKind::ListVersions;
515 26 : self.list_streaming_for_fn(prefix, mode, max_keys, cancel, kind, customize_builder)
516 26 : }
517 :
518 0 : async fn list_versions(
519 0 : &self,
520 0 : prefix: Option<&RemotePath>,
521 0 : mode: ListingMode,
522 0 : max_keys: Option<NonZeroU32>,
523 0 : cancel: &CancellationToken,
524 0 : ) -> std::result::Result<crate::VersionListing, DownloadError> {
525 0 : let kind = RequestKind::ListVersions;
526 0 : let permit = self.permit(kind, cancel).await?;
527 0 : self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
528 0 : .await
529 0 : }
530 :
531 3 : async fn head_object(
532 3 : &self,
533 3 : key: &RemotePath,
534 3 : cancel: &CancellationToken,
535 3 : ) -> Result<ListingObject, DownloadError> {
536 3 : let kind = RequestKind::Head;
537 3 : let _permit = self.permit(kind, cancel).await?;
538 :
539 3 : let started_at = start_measuring_requests(kind);
540 :
541 3 : let blob_client = self.client.blob_client(self.relative_path_to_name(key));
542 3 : let properties_future = blob_client.get_properties().into_future();
543 :
544 3 : let properties_future = tokio::time::timeout(self.small_timeout, properties_future);
545 :
546 3 : let res = tokio::select! {
547 3 : res = properties_future => res,
548 3 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
549 : };
550 :
551 3 : if let Ok(inner) = &res {
552 3 : // Do not include timeouts as errors in metrics, but do include cancellations.
553 3 : let started_at = ScopeGuard::into_inner(started_at);
554 3 : crate::metrics::BUCKET_METRICS
555 3 : .req_seconds
556 3 : .observe_elapsed(kind, inner, started_at);
557 3 : }
558 :
559 3 : let data = match res {
560 2 : Ok(Ok(data)) => Ok(data),
561 1 : Ok(Err(sdk)) => Err(to_download_error(sdk)),
562 0 : Err(_timeout) => Err(DownloadError::Timeout),
563 1 : }?;
564 :
565 2 : let properties = data.blob.properties;
566 2 : Ok(ListingObject {
567 2 : key: key.to_owned(),
568 2 : last_modified: SystemTime::from(properties.last_modified),
569 2 : size: properties.content_length,
570 2 : })
571 3 : }
572 :
573 93 : async fn upload(
574 93 : &self,
575 93 : from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
576 93 : data_size_bytes: usize,
577 93 : to: &RemotePath,
578 93 : metadata: Option<StorageMetadata>,
579 93 : cancel: &CancellationToken,
580 93 : ) -> anyhow::Result<()> {
581 93 : let kind = RequestKind::Put;
582 93 : let _permit = self.permit(kind, cancel).await?;
583 :
584 93 : let started_at = start_measuring_requests(kind);
585 :
586 93 : let op = async {
587 93 : let blob_client = self.client.blob_client(self.relative_path_to_name(to));
588 :
589 93 : let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
590 93 : Box::pin(from);
591 :
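: // The SDK wants a cloneable, seekable body; NonSeekableStream (defined below)
: // implements just enough of that contract for a single, non-retried upload.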
592 93 : let from = NonSeekableStream::new(from, data_size_bytes);
593 :
594 93 : let body = azure_core::Body::SeekableStream(Box::new(from));
595 :
596 93 : let mut builder = blob_client.put_block_blob(body);
597 :
598 93 : if let Some(metadata) = metadata {
599 0 : builder = builder.metadata(to_azure_metadata(metadata));
600 0 : }
601 :
602 93 : let fut = builder.into_future();
603 93 : let fut = tokio::time::timeout(self.timeout, fut);
604 :
605 93 : match fut.await {
606 93 : Ok(Ok(_response)) => Ok(()),
607 0 : Ok(Err(azure)) => Err(azure.into()),
608 0 : Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
609 : }
610 0 : };
611 :
612 93 : let res = tokio::select! {
613 93 : res = op => res,
614 93 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
615 : };
616 :
617 93 : let outcome = match res {
618 93 : Ok(_) => AttemptOutcome::Ok,
619 0 : Err(_) => AttemptOutcome::Err,
620 : };
621 93 : let started_at = ScopeGuard::into_inner(started_at);
622 93 : crate::metrics::BUCKET_METRICS
623 93 : .req_seconds
624 93 : .observe_elapsed(kind, outcome, started_at);
625 :
626 93 : res
627 0 : }
628 :
629 11 : async fn download(
630 11 : &self,
631 11 : from: &RemotePath,
632 11 : opts: &DownloadOpts,
633 11 : cancel: &CancellationToken,
634 11 : ) -> Result<Download, DownloadError> {
635 11 : let blob_client = self.client.blob_client(self.relative_path_to_name(from));
636 :
637 11 : let mut builder = blob_client.get();
638 :
639 11 : if let Some(ref etag) = opts.etag {
640 3 : builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()));
641 8 : }
642 :
643 11 : if let Some(ref version_id) = opts.version_id {
644 0 : let version_id = azure_storage_blobs::prelude::VersionId::new(version_id.0.clone());
645 0 : builder = builder.blob_versioning(version_id);
646 11 : }
647 :
648 11 : if let Some((start, end)) = opts.byte_range() {
649 5 : builder = builder.range(match end {
650 3 : Some(end) => Range::Range(start..end),
651 2 : None => Range::RangeFrom(start..),
652 : });
653 6 : }
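: // Note: `start..end` above is end-exclusive (Rust range semantics); `byte_range()`
: // is assumed to return an exclusive end bound to match.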
654 :
655 11 : let timeout = match opts.kind {
656 0 : DownloadKind::Small => self.small_timeout,
657 11 : DownloadKind::Large => self.timeout,
658 : };
659 :
660 11 : self.download_for_builder(builder, timeout, cancel).await
661 11 : }
662 :
663 86 : async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
664 86 : self.delete_objects(std::array::from_ref(path), cancel)
665 86 : .await
666 86 : }
667 :
668 93 : async fn delete_objects(
669 93 : &self,
670 93 : paths: &[RemotePath],
671 93 : cancel: &CancellationToken,
672 93 : ) -> anyhow::Result<()> {
673 93 : let kind = RequestKind::Delete;
674 93 : let _permit = self.permit(kind, cancel).await?;
675 93 : let started_at = start_measuring_requests(kind);
676 :
677 93 : let op = async {
678 : // TODO batch requests are not supported by the SDK
679 : // https://github.com/Azure/azure-sdk-for-rust/issues/1068
680 205 : for path in paths {
681 : #[derive(Debug)]
682 : enum AzureOrTimeout {
683 : AzureError(azure_core::Error),
684 : Timeout,
685 : Cancel,
686 : }
687 : impl Display for AzureOrTimeout {
688 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
689 0 : write!(f, "{self:?}")
690 0 : }
691 : }
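: // Ad-hoc error type: `backoff::retry` needs `Display` for its warning logs plus
: // a predicate classifying cancellation as permanent and everything else (HTTP
: // errors, timeouts) as retryable; see the closures below.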
692 112 : let warn_threshold = 3;
693 112 : let max_retries = 5;
694 112 : backoff::retry(
695 112 : || async {
696 112 : let blob_client = self.client.blob_client(self.relative_path_to_name(path));
697 :
698 112 : let request = blob_client.delete().into_future();
699 :
700 112 : let res = tokio::time::timeout(self.timeout, request).await;
701 :
702 112 : match res {
703 90 : Ok(Ok(_v)) => Ok(()),
704 22 : Ok(Err(azure_err)) => {
705 22 : if let Some(http_err) = azure_err.as_http_error() {
706 22 : if http_err.status() == StatusCode::NotFound {
707 22 : return Ok(());
708 0 : }
709 0 : }
710 0 : Err(AzureOrTimeout::AzureError(azure_err))
711 : }
712 0 : Err(_elapsed) => Err(AzureOrTimeout::Timeout),
713 : }
714 224 : },
715 0 : |err| match err {
716 0 : AzureOrTimeout::AzureError(_) | AzureOrTimeout::Timeout => false,
717 0 : AzureOrTimeout::Cancel => true,
718 0 : },
719 112 : warn_threshold,
720 112 : max_retries,
721 112 : "deleting remote object",
722 112 : cancel,
723 : )
724 112 : .await
725 112 : .ok_or_else(|| AzureOrTimeout::Cancel)
726 112 : .and_then(|x| x)
727 112 : .map_err(|e| match e {
728 0 : AzureOrTimeout::AzureError(err) => anyhow::Error::from(err),
729 0 : AzureOrTimeout::Timeout => TimeoutOrCancel::Timeout.into(),
730 0 : AzureOrTimeout::Cancel => TimeoutOrCancel::Cancel.into(),
731 0 : })?;
732 : }
733 93 : Ok(())
734 93 : };
735 :
736 93 : let res = tokio::select! {
737 93 : res = op => res,
738 93 : _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
739 : };
740 :
741 93 : let started_at = ScopeGuard::into_inner(started_at);
742 93 : crate::metrics::BUCKET_METRICS
743 93 : .req_seconds
744 93 : .observe_elapsed(kind, &res, started_at);
745 93 : res
746 93 : }
747 :
748 0 : fn max_keys_per_delete(&self) -> usize {
749 0 : super::MAX_KEYS_PER_DELETE_AZURE
750 0 : }
751 :
752 1 : async fn copy(
753 1 : &self,
754 1 : from: &RemotePath,
755 1 : to: &RemotePath,
756 1 : cancel: &CancellationToken,
757 1 : ) -> anyhow::Result<()> {
758 1 : let kind = RequestKind::Copy;
759 1 : let _permit = self.permit(kind, cancel).await?;
760 1 : let started_at = start_measuring_requests(kind);
761 :
762 1 : let timeout = tokio::time::sleep(self.timeout);
763 :
764 1 : let mut copy_status = None;
765 :
766 1 : let op = async {
767 1 : let blob_client = self.client.blob_client(self.relative_path_to_name(to));
768 :
769 1 : let source_url = format!(
770 1 : "{}/{}",
771 1 : self.client.url()?,
772 1 : self.relative_path_to_name(from)
773 : );
774 :
775 1 : let builder = blob_client.copy(Url::from_str(&source_url)?);
776 1 : let copy = builder.into_future();
777 :
778 1 : let result = copy.await?;
779 :
780 1 : copy_status = Some(result.copy_status);
781 : loop {
782 1 : match copy_status.as_ref().expect("we always set it to Some") {
783 : CopyStatus::Aborted => {
784 0 : anyhow::bail!("Received abort for copy from {from} to {to}.");
785 : }
786 : CopyStatus::Failed => {
787 0 : anyhow::bail!("Received failure response for copy from {from} to {to}.");
788 : }
789 1 : CopyStatus::Success => return Ok(()),
790 0 : CopyStatus::Pending => (),
791 : }
792 : // The copy is taking longer. Waiting a second and then re-trying.
793 : // TODO estimate time based on copy_progress and adjust time based on that
794 0 : tokio::time::sleep(Duration::from_millis(1000)).await;
795 0 : let properties = blob_client.get_properties().into_future().await?;
796 0 : let Some(status) = properties.blob.properties.copy_status else {
797 0 : tracing::warn!("copy_status for copy is None!, from={from}, to={to}");
798 0 : return Ok(());
799 : };
800 0 : copy_status = Some(status);
801 : }
802 1 : };
803 :
804 1 : let res = tokio::select! {
805 1 : res = op => res,
806 1 : _ = cancel.cancelled() => return Err(anyhow::Error::new(TimeoutOrCancel::Cancel)),
807 1 : _ = timeout => {
808 0 : let e = anyhow::Error::new(TimeoutOrCancel::Timeout);
809 0 : let e = e.context(format!("Timeout, last status: {copy_status:?}"));
810 0 : Err(e)
811 : },
812 : };
813 :
814 1 : let started_at = ScopeGuard::into_inner(started_at);
815 1 : crate::metrics::BUCKET_METRICS
816 1 : .req_seconds
817 1 : .observe_elapsed(kind, &res, started_at);
818 1 : res
819 1 : }
820 :
821 0 : async fn time_travel_recover(
822 0 : &self,
823 0 : prefix: Option<&RemotePath>,
824 0 : timestamp: SystemTime,
825 0 : done_if_after: SystemTime,
826 0 : cancel: &CancellationToken,
827 0 : _complexity_limit: Option<NonZeroU32>,
828 0 : ) -> Result<(), TimeTravelError> {
829 0 : let msg = "PLEASE NOTE: Azure Blob storage time-travel recovery may not work as expected "
830 0 : .to_string()
831 0 : + "for some specific files. If a file gets deleted but then overwritten and we want to recover "
832 0 : + "to the time during the file was not present, this functionality will recover the file. Only "
833 0 : + "use the functionality for services that can tolerate this. For example, recovering a state of the "
834 0 : + "pageserver tenants.";
835 0 : tracing::error!("{}", msg);
836 :
837 0 : let kind = RequestKind::TimeTravel;
838 0 : let permit = self.permit(kind, cancel).await?;
839 :
840 0 : let mode = ListingMode::NoDelimiter;
841 0 : let version_listing = self
842 0 : .list_versions_with_permit(&permit, prefix, mode, None, cancel)
843 0 : .await
844 0 : .map_err(|err| match err {
845 0 : DownloadError::Other(e) => TimeTravelError::Other(e),
846 0 : DownloadError::Cancelled => TimeTravelError::Cancelled,
847 0 : other => TimeTravelError::Other(other.into()),
848 0 : })?;
849 0 : let versions_and_deletes = version_listing.versions;
850 :
851 0 : tracing::info!(
852 0 : "Built list for time travel with {} versions and deletions",
853 0 : versions_and_deletes.len()
854 : );
855 :
856 : // Work on the list of references instead of the objects directly,
857 : // otherwise we get lifetime errors in the sort_by_key call below.
858 0 : let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
859 :
860 0 : versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
861 :
862 0 : let mut vds_for_key = HashMap::<_, Vec<_>>::new();
863 :
864 0 : for vd in &versions_and_deletes {
865 0 : let Version { key, .. } = &vd;
866 0 : let version_id = vd.version_id().map(|v| v.0.as_str());
867 0 : if version_id == Some("null") {
868 0 : return Err(TimeTravelError::Other(anyhow!(
869 0 : "Received ListVersions response for key={key} with version_id='null', \
870 0 : indicating either disabled versioning, or legacy objects with null version id values"
871 0 : )));
872 0 : }
873 0 : tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
874 :
875 0 : vds_for_key.entry(key).or_default().push(vd);
876 : }
877 :
878 0 : let warn_threshold = 3;
879 0 : let max_retries = 10;
880 0 : let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
881 :
882 0 : for (key, versions) in vds_for_key {
883 0 : let last_vd = versions.last().unwrap();
884 0 : let key = self.relative_path_to_name(key);
885 0 : if last_vd.last_modified > done_if_after {
886 0 : tracing::debug!("Key {key} has version later than done_if_after, skipping");
887 0 : continue;
888 0 : }
889 : // the version we want to restore to.
890 0 : let version_to_restore_to =
891 0 : match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) {
892 0 : Ok(v) => v,
893 0 : Err(e) => e,
894 : };
895 0 : if version_to_restore_to == versions.len() {
896 0 : tracing::debug!("Key {key} has no changes since timestamp, skipping");
897 0 : continue;
898 0 : }
899 0 : let mut do_delete = false;
900 0 : if version_to_restore_to == 0 {
901 : // All versions more recent, so the key didn't exist at the specified time point.
902 0 : tracing::debug!(
903 0 : "All {} versions more recent for {key}, deleting",
904 0 : versions.len()
905 : );
906 0 : do_delete = true;
907 : } else {
908 0 : match &versions[version_to_restore_to - 1] {
909 : Version {
910 0 : kind: VersionKind::Version(version_id),
911 : ..
912 : } => {
913 0 : let source_url = format!(
914 0 : "{}/{}?versionid={}",
915 0 : self.client
916 0 : .url()
917 0 : .map_err(|e| TimeTravelError::Other(anyhow!("{e}")))?,
918 : key,
919 : version_id.0
920 : );
921 0 : tracing::debug!(
922 0 : "Promoting old version {} for {key} at {}...",
923 : version_id.0,
924 : source_url
925 : );
926 0 : backoff::retry(
927 0 : || async {
928 0 : let blob_client = self.client.blob_client(key.clone());
929 0 : let op = blob_client.copy(Url::from_str(&source_url).unwrap());
930 0 : tokio::select! {
931 0 : res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
932 0 : _ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
933 : }
934 0 : },
935 0 : is_permanent,
936 0 : warn_threshold,
937 0 : max_retries,
938 0 : "copying object version for time_travel_recover",
939 0 : cancel,
940 : )
941 0 : .await
942 0 : .ok_or_else(|| TimeTravelError::Cancelled)
943 0 : .and_then(|x| x)?;
944 0 : tracing::info!(?version_id, %key, "Copied old version in Azure blob storage");
945 : }
946 : Version {
947 : kind: VersionKind::DeletionMarker,
948 : ..
949 0 : } => {
950 0 : do_delete = true;
951 0 : }
952 : }
953 : };
954 0 : if do_delete {
955 0 : if matches!(last_vd.kind, VersionKind::DeletionMarker) {
956 : // Key has since been deleted (but there was some history), no need to do anything
957 0 : tracing::debug!("Key {key} already deleted, skipping.");
958 : } else {
959 0 : tracing::debug!("Deleting {key}...");
960 :
961 0 : self.delete(&RemotePath::from_string(&key).unwrap(), cancel)
962 0 : .await
963 0 : .map_err(|e| {
964 : // self.delete() surfaces timeouts and cancellations as TimeoutOrCancel
965 0 : if TimeoutOrCancel::caused_by_cancel(&e) {
966 0 : TimeTravelError::Cancelled
967 : } else {
968 0 : TimeTravelError::Other(e)
969 : }
970 0 : })?;
971 : }
972 0 : }
973 : }
974 :
975 0 : Ok(())
976 0 : }
977 : }
978 :
979 : pin_project_lite::pin_project! {
980 : /// Hack to work around not being able to stream once with azure sdk.
981 : ///
982 : /// Azure sdk clones streams around with the assumption that they are like
983 : /// `Arc<tokio::fs::File>` (except not supporting tokio), however our streams are not like
984 : /// that. For example for an `index_part.json` we just have a single chunk of [`Bytes`]
985 : /// representing the whole serialized vec. It could be trivially cloneable and "semi-trivially"
986 : /// seekable, but it is simpler to just retry the whole request instead.
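: ///
: /// The variants form a one-way state machine: `Initial` --clone--> `Actual`
: /// --clone--> `Cloned`; only the `Actual` variant supports reading.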
987 : #[project = NonSeekableStreamProj]
988 : enum NonSeekableStream<S> {
989 : /// A stream wrapper's initial form.
990 : ///
991 : /// Mutex exists to allow moving when cloning. If the sdk changes to do less than 1
992 : /// clone before first request, then this must be changed.
993 : Initial {
994 : inner: std::sync::Mutex<Option<tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>>>,
995 : len: usize,
996 : },
997 : /// The actually readable variant, produced by cloning the Initial variant.
998 : ///
999 : /// The sdk currently always clones once, even without retry policy.
1000 : Actual {
1001 : #[pin]
1002 : inner: tokio_util::compat::Compat<tokio_util::io::StreamReader<S, Bytes>>,
1003 : len: usize,
1004 : read_any: bool,
1005 : },
1006 : /// Most likely unneeded, but left to make life easier, in case more clones are added.
1007 : Cloned {
1008 : len_was: usize,
1009 : }
1010 : }
1011 : }
1012 :
1013 : impl<S> NonSeekableStream<S>
1014 : where
1015 : S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
1016 : {
1017 93 : fn new(inner: S, len: usize) -> NonSeekableStream<S> {
1018 : use tokio_util::compat::TokioAsyncReadCompatExt;
1019 :
1020 93 : let inner = tokio_util::io::StreamReader::new(inner).compat();
1021 93 : let inner = Some(inner);
1022 93 : let inner = std::sync::Mutex::new(inner);
1023 93 : NonSeekableStream::Initial { inner, len }
1024 0 : }
1025 : }
1026 :
1027 : impl<S> std::fmt::Debug for NonSeekableStream<S> {
1028 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1029 0 : match self {
1030 0 : Self::Initial { len, .. } => f.debug_struct("Initial").field("len", len).finish(),
1031 0 : Self::Actual { len, .. } => f.debug_struct("Actual").field("len", len).finish(),
1032 0 : Self::Cloned { len_was, .. } => f.debug_struct("Cloned").field("len", len_was).finish(),
1033 : }
1034 0 : }
1035 : }
1036 :
1037 : impl<S> futures::io::AsyncRead for NonSeekableStream<S>
1038 : where
1039 : S: Stream<Item = std::io::Result<Bytes>>,
1040 : {
1041 93 : fn poll_read(
1042 93 : self: std::pin::Pin<&mut Self>,
1043 93 : cx: &mut std::task::Context<'_>,
1044 93 : buf: &mut [u8],
1045 93 : ) -> std::task::Poll<std::io::Result<usize>> {
1046 93 : match self.project() {
1047 : NonSeekableStreamProj::Actual {
1048 93 : inner, read_any, ..
1049 : } => {
1050 93 : *read_any = true;
1051 93 : inner.poll_read(cx, buf)
1052 : }
1053 : // NonSeekableStream::Initial does not support reading: it is much easier to keep
1054 : // the mutex in a variant that is never polled, or at least that is how it seemed
1055 : // originally. If an SDK version upgrade changes the cloning behavior, reading
1056 : // support will need to be hacked in here.
1057 : //
1058 : // including {self:?} into the message would be useful, but unsure how to unproject.
1059 0 : _ => std::task::Poll::Ready(Err(std::io::Error::other(
1060 0 : "cloned or initial values cannot be read",
1061 0 : ))),
1062 : }
1063 0 : }
1064 : }
1065 :
1066 : impl<S> Clone for NonSeekableStream<S> {
1067 : /// This weird `Clone` implementation exists to support the SDK cloning the body
1068 : /// before issuing the first request; see the type documentation.
1069 93 : fn clone(&self) -> Self {
1070 : use NonSeekableStream::*;
1071 :
1072 93 : match self {
1073 93 : Initial { inner, len } => {
1074 93 : if let Some(inner) = inner.lock().unwrap().take() {
1075 93 : Actual {
1076 93 : inner,
1077 93 : len: *len,
1078 93 : read_any: false,
1079 93 : }
1080 : } else {
1081 0 : Self::Cloned { len_was: *len }
1082 : }
1083 : }
1084 0 : Actual { len, .. } => Cloned { len_was: *len },
1085 0 : Cloned { len_was } => Cloned { len_was: *len_was },
1086 : }
1087 0 : }
1088 : }
1089 :
1090 : #[async_trait::async_trait]
1091 : impl<S> azure_core::SeekableStream for NonSeekableStream<S>
1092 : where
1093 : S: Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync + 'static,
1094 : {
1095 0 : async fn reset(&mut self) -> azure_core::error::Result<()> {
1096 : use NonSeekableStream::*;
1097 :
1098 0 : let msg = match self {
1099 0 : Initial { inner, .. } => {
1100 0 : if inner.get_mut().unwrap().is_some() {
1101 0 : return Ok(());
1102 : } else {
1103 0 : "reset after first clone is not supported"
1104 : }
1105 : }
1106 0 : Actual { read_any, .. } if !*read_any => return Ok(()),
1107 0 : Actual { .. } => "reset after reading is not supported",
1108 0 : Cloned { .. } => "reset after second clone is not supported",
1109 : };
1110 0 : Err(azure_core::error::Error::new(
1111 0 : azure_core::error::ErrorKind::Io,
1112 0 : std::io::Error::other(msg),
1113 0 : ))
1114 0 : }
1115 :
1116 : // Note: it is not documented whether this should be the total or the remaining
1117 : // length; the total passes the tests.
1118 93 : fn len(&self) -> usize {
1119 : use NonSeekableStream::*;
1120 93 : match self {
1121 93 : Initial { len, .. } => *len,
1122 0 : Actual { len, .. } => *len,
1123 0 : Cloned { len_was, .. } => *len_was,
1124 : }
1125 0 : }
1126 : }