1 : //! Local filesystem acting as remote storage.
2 : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
3 : //!
4 : //! This storage is used in tests, but can also be used when a persistent
5 : //! volume is mounted to the local FS.
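//!
//! A minimal usage sketch, mirroring the tests at the bottom of this file (the
//! `RemoteStorage` trait must be in scope to call `upload`; paths are illustrative):
//!
//! ```ignore
//! use std::time::Duration;
//! use camino::{Utf8Path, Utf8PathBuf};
//! use tokio_util::sync::CancellationToken;
//!
//! let storage = LocalFs::new(Utf8PathBuf::from("/mnt/storage_root"), Duration::from_secs(120))?;
//! let path = RemotePath::new(Utf8Path::new("tenant/timeline/file"))?;
//! let body = bytes::Bytes::from_static(b"contents");
//! let stream = futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
//! storage.upload(stream, body.len(), &path, None, &CancellationToken::new()).await?;
//! ```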
6 :
7 : use std::{
8 : collections::HashSet,
9 : io::ErrorKind,
10 : num::NonZeroU32,
11 : time::{Duration, SystemTime, UNIX_EPOCH},
12 : };
13 :
14 : use anyhow::{bail, ensure, Context};
15 : use bytes::Bytes;
16 : use camino::{Utf8Path, Utf8PathBuf};
17 : use futures::stream::Stream;
18 : use tokio::{
19 : fs,
20 : io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
21 : };
22 : use tokio_util::{io::ReaderStream, sync::CancellationToken};
23 : use utils::crashsafe::path_with_suffix_extension;
24 :
25 : use crate::{
26 : Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath, TimeTravelError,
27 : TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
28 : };
29 :
30 : use super::{RemoteStorage, StorageMetadata};
31 : use crate::Etag;
32 :
33 : const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
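// E.g. an upload targeting "a/b/layer" is first written to a temp file alongside it
// (roughly "a/b/layer.___temp", per `path_with_suffix_extension`) and renamed into
// place once fully written; see `upload0` below.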
34 :
35 : #[derive(Debug, Clone)]
36 : pub struct LocalFs {
37 : storage_root: Utf8PathBuf,
38 : timeout: Duration,
39 : }
40 :
41 : impl LocalFs {
42 : /// Attempts to create local FS storage, along with its root directory.
43 : /// The storage root will be created (if it does not exist) and converted into an absolute path (if passed as a relative one).
44 700 : pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
45 700 : if !storage_root.exists() {
46 88 : std::fs::create_dir_all(&storage_root).with_context(|| {
47 0 : format!("Failed to create all directories in the given root path {storage_root:?}")
48 88 : })?;
49 612 : }
50 700 : if !storage_root.is_absolute() {
51 564 : storage_root = storage_root.canonicalize_utf8().with_context(|| {
52 0 : format!("Failed to represent path {storage_root:?} as an absolute path")
53 564 : })?;
54 136 : }
55 :
56 700 : Ok(Self {
57 700 : storage_root,
58 700 : timeout,
59 700 : })
60 700 : }
61 :
62 : // mirrors S3Bucket::s3_object_to_relative_path
63 410 : fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
64 410 : let relative_path = key
65 410 : .strip_prefix(&self.storage_root)
66 410 : .expect("relative path must contain storage_root as prefix");
67 410 : RemotePath(relative_path.into())
68 410 : }
69 :
70 172 : async fn read_storage_metadata(
71 172 : &self,
72 172 : file_path: &Utf8Path,
73 172 : ) -> anyhow::Result<Option<StorageMetadata>> {
74 172 : let metadata_path = storage_metadata_path(file_path);
75 172 : if metadata_path.exists() && metadata_path.is_file() {
76 16 : let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
77 0 : format!("Failed to read metadata from the local storage at '{metadata_path}'")
78 16 : })?;
79 :
80 16 : serde_json::from_str(&metadata_string)
81 16 : .with_context(|| {
82 0 : format!(
83 0 : "Failed to deserialize metadata from the local storage at '{metadata_path}'",
84 0 : )
85 16 : })
86 16 : .map(|metadata| Some(StorageMetadata(metadata)))
87 : } else {
88 156 : Ok(None)
89 : }
90 172 : }
91 :
92 : #[cfg(test)]
93 24 : async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
94 24 : use std::{future::Future, pin::Pin};
95 72 : fn get_all_files<'a, P>(
96 72 : directory_path: P,
97 72 : ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
98 72 : where
99 72 : P: AsRef<Utf8Path> + Send + Sync + 'a,
100 72 : {
101 72 : Box::pin(async move {
102 72 : let directory_path = directory_path.as_ref();
103 72 : if directory_path.exists() {
104 72 : if directory_path.is_dir() {
105 72 : let mut paths = Vec::new();
106 72 : let mut dir_contents = fs::read_dir(directory_path).await?;
107 144 : while let Some(dir_entry) = dir_contents.next_entry().await? {
108 72 : let file_type = dir_entry.file_type().await?;
109 72 : let entry_path =
110 72 : Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
111 0 : anyhow::Error::msg(format!(
112 0 : "non-Unicode path: {}",
113 0 : pb.to_string_lossy()
114 0 : ))
115 72 : })?;
116 72 : if file_type.is_symlink() {
117 24 : tracing::debug!("{entry_path:?} is a symlink, skipping")
118 72 : } else if file_type.is_dir() {
119 69 : paths.extend(get_all_files(&entry_path).await?.into_iter())
120 24 : } else {
121 24 : paths.push(entry_path);
122 24 : }
123 24 : }
124 72 : Ok(paths)
125 24 : } else {
126 24 : bail!("Path {directory_path:?} is not a directory")
127 24 : }
128 24 : } else {
129 24 : Ok(Vec::new())
130 24 : }
131 72 : })
132 72 : }
133 24 :
134 24 : Ok(get_all_files(&self.storage_root)
135 69 : .await?
136 24 : .into_iter()
137 24 : .map(|path| {
138 24 : path.strip_prefix(&self.storage_root)
139 24 : .context("Failed to strip storage root prefix")
140 24 : .and_then(RemotePath::new)
141 24 : .expect(
142 24 : "We list files for storage root, hence should be able to remote the prefix",
143 24 : )
144 24 : })
145 24 : .collect())
146 24 : }
147 :
148 : // Recursively lists all files in a directory,
149 : // mirroring `list_files` for `s3_bucket`.
150 630 : async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
151 630 : let full_path = match folder {
152 614 : Some(folder) => folder.with_base(&self.storage_root),
153 16 : None => self.storage_root.clone(),
154 : };
155 :
156 : // If we were given a directory, we may use it as our starting point.
157 : // Otherwise, we must go up to the first ancestor dir that exists. This is because
158 : // S3 object list prefixes can be arbitrary strings, but when reading
159 : // the local filesystem we need a directory to start calling read_dir on.
160 630 : let mut initial_dir = full_path.clone();
161 630 :
162 630 : // If there's no trailing slash, we have to start looking from one above: even if
163 630 : // `initial_dir` is a directory, we should still list any prefixes in the parent
164 630 : // that start with the same string.
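        // E.g. a prefix ending in "grandparent/tenant" must also match a sibling
        // "grandparent/tenant-01" (see the `list_part_component` test below).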
165 630 : if !full_path.to_string().ends_with('/') {
166 58 : initial_dir.pop();
167 572 : }
168 :
169 2286 : loop {
170 2286 : // Did we make it to the root?
171 2286 : if initial_dir.parent().is_none() {
172 0 : anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
173 2286 : }
174 2286 :
175 2286 : match fs::metadata(initial_dir.clone()).await {
176 630 : Ok(meta) if meta.is_dir() => {
177 630 : // We found a directory, break
178 630 : break;
179 : }
180 0 : Ok(_meta) => {
181 0 : // It's not a directory: strip back to the parent
182 0 : initial_dir.pop();
183 0 : }
184 1656 : Err(e) if e.kind() == ErrorKind::NotFound => {
185 1656 : // It's not a file that exists: strip the prefix back to the parent directory
186 1656 : initial_dir.pop();
187 1656 : }
188 0 : Err(e) => {
189 0 : // Unexpected I/O error
190 0 : anyhow::bail!(e)
191 : }
192 : }
193 : }
194 : // Note that Utf8PathBuf::starts_with only considers full path segments, but
195 : // object prefixes are arbitrary strings, so we keep the raw strings and use
196 : // str::starts_with on them below.
197 630 : let prefix = full_path.as_str();
198 630 :
199 630 : let mut files = vec![];
200 630 : let mut directory_queue = vec![initial_dir];
201 1414 : while let Some(cur_folder) = directory_queue.pop() {
202 784 : let mut entries = cur_folder.read_dir_utf8()?;
203 2213 : while let Some(Ok(entry)) = entries.next() {
204 1429 : let file_name = entry.file_name();
205 1429 : let full_file_name = cur_folder.join(file_name);
206 1429 : if full_file_name.as_str().starts_with(prefix) {
207 410 : let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
208 410 : files.push(file_remote_path);
209 410 : if full_file_name.is_dir() {
210 154 : directory_queue.push(full_file_name);
211 256 : }
212 1019 : }
213 : }
214 : }
215 :
216 630 : Ok(files)
217 630 : }
218 :
219 8574 : async fn upload0(
220 8574 : &self,
221 8574 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
222 8574 : data_size_bytes: usize,
223 8574 : to: &RemotePath,
224 8574 : metadata: Option<StorageMetadata>,
225 8574 : cancel: &CancellationToken,
226 8574 : ) -> anyhow::Result<()> {
227 8574 : let target_file_path = to.with_base(&self.storage_root);
228 8574 : create_target_directory(&target_file_path).await?;
229 : // We need this dance with a sort-of-durable rename (without fsyncs)
230 : // to prevent partial uploads. This was actually hit when a pageserver shutdown
231 : // cancelled the upload and a partial file was left on the fs.
232 : // NOTE: Because the temp file suffix is always the same, this operation is racy.
233 : // Two concurrent operations can lead to the following sequence:
234 : // T1: write(temp)
235 : // T2: write(temp) -> overwrites the content
236 : // T1: rename(temp, dst) -> succeeds
237 : // T2: rename(temp, dst) -> fails, temp no longer exists
238 : // This could be solved by supplying a unique temp suffix every time, but this situation
239 : // is not normal in the first place, and the error can help (and has helped at least once)
240 : // to discover bugs in upper-level synchronization.
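        // A hypothetical fix sketch (not used here): derive a unique suffix per upload,
        // e.g. with an assumed `rand` dependency:
        //   let unique: u32 = rand::random();
        //   let temp_file_path = path_with_suffix_extension(
        //       &target_file_path,
        //       &format!("{unique:08x}{LOCAL_FS_TEMP_FILE_SUFFIX}"),
        //   );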
241 8569 : let temp_file_path =
242 8569 : path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
243 8477 : let mut destination = io::BufWriter::new(
244 8569 : fs::OpenOptions::new()
245 8569 : .write(true)
246 8569 : .create(true)
247 8569 : .truncate(true)
248 8569 : .open(&temp_file_path)
249 7869 : .await
250 8477 : .with_context(|| {
251 0 : format!("Failed to open target fs destination at '{target_file_path}'")
252 8477 : })?,
253 : );
254 :
255 8477 : let from_size_bytes = data_size_bytes as u64;
256 8477 : let data = tokio_util::io::StreamReader::new(data);
257 8477 : let data = std::pin::pin!(data);
258 8477 : let mut buffer_to_read = data.take(from_size_bytes);
259 8477 :
260 8477 : // alternatively we could just write the bytes to a file, but local_fs is a testing utility
261 8477 : let copy = io::copy_buf(&mut buffer_to_read, &mut destination);
262 :
263 8453 : let bytes_read = tokio::select! {
264 : biased;
265 : _ = cancel.cancelled() => {
266 : let file = destination.into_inner();
267 : // wait for the inflight operation(s) to complete so that there could be a next
268 : // attempt right away and our writes are not directed to their file.
269 : file.into_std().await;
270 :
271 : // TODO: leave the temp file or not? Leaving is probably less racy;
272 : // truncate is enabled, at least.
273 : fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
274 : return Err(TimeoutOrCancel::Cancel.into());
275 : }
276 : read = copy => read,
277 : };
278 :
279 8453 : let bytes_read =
280 8453 : bytes_read.with_context(|| {
281 0 : format!(
282 0 : "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
283 0 : )
284 8453 : })?;
285 :
286 8453 : if bytes_read < from_size_bytes {
287 8 : bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
288 8445 : }
289 8445 : // Check if there is any extra data after the given size.
290 8445 : let mut from = buffer_to_read.into_inner();
291 8445 : let extra_read = from.read(&mut [1]).await?;
292 8444 : ensure!(
293 8444 : extra_read == 0,
294 16 : "Provided stream was larger than expected: expected {from_size_bytes} bytes",
295 : );
296 :
297 8428 : destination.flush().await.with_context(|| {
298 0 : format!(
299 0 : "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
300 0 : )
301 8428 : })?;
302 :
303 8428 : fs::rename(temp_file_path, &target_file_path)
304 7678 : .await
305 8422 : .with_context(|| {
306 0 : format!(
307 0 : "Failed to upload (rename) file to the local storage at '{target_file_path}'",
308 0 : )
309 8422 : })?;
310 :
311 8422 : if let Some(storage_metadata) = metadata {
312 : // FIXME: we must not be using metadata much, since this would forget the old metadata
313 : // for new writes? or perhaps metadata is sticky; could consider removing if it's never
314 : // used.
315 8 : let storage_metadata_path = storage_metadata_path(&target_file_path);
316 8 : fs::write(
317 8 : &storage_metadata_path,
318 8 : serde_json::to_string(&storage_metadata.0)
319 8 : .context("Failed to serialize storage metadata as json")?,
320 : )
321 7 : .await
322 8 : .with_context(|| {
323 0 : format!(
324 0 : "Failed to write metadata to the local storage at '{storage_metadata_path}'",
325 0 : )
326 8 : })?;
327 8414 : }
328 :
329 8422 : Ok(())
330 8454 : }
331 : }
332 :
333 : impl RemoteStorage for LocalFs {
334 0 : fn list_streaming(
335 0 : &self,
336 0 : prefix: Option<&RemotePath>,
337 0 : mode: ListingMode,
338 0 : max_keys: Option<NonZeroU32>,
339 0 : cancel: &CancellationToken,
340 0 : ) -> impl Stream<Item = Result<Listing, DownloadError>> {
341 0 : let listing = self.list(prefix, mode, max_keys, cancel);
342 0 : futures::stream::once(listing)
343 0 : }
344 :
345 630 : async fn list(
346 630 : &self,
347 630 : prefix: Option<&RemotePath>,
348 630 : mode: ListingMode,
349 630 : max_keys: Option<NonZeroU32>,
350 630 : cancel: &CancellationToken,
351 630 : ) -> Result<Listing, DownloadError> {
352 630 : let op = async {
353 630 : let mut result = Listing::default();
354 :
355 : // Filter out directories: in S3 directories don't exist, only the keys within them do.
356 630 : let keys = self
357 630 : .list_recursive(prefix)
358 2241 : .await
359 630 : .map_err(DownloadError::Other)?;
360 630 : let objects = keys
361 630 : .into_iter()
362 630 : .filter_map(|k| {
363 410 : let path = k.with_base(&self.storage_root);
364 410 : if path.is_dir() {
365 154 : None
366 : } else {
367 256 : Some(ListingObject {
368 256 : key: k.clone(),
369 256 : // LocalFs is just for testing, so just specify a dummy mtime and size
370 256 : last_modified: SystemTime::now(),
371 256 : size: 0,
372 256 : })
373 : }
374 630 : })
375 630 : .collect();
376 630 :
377 630 : if let ListingMode::NoDelimiter = mode {
378 26 : result.keys = objects;
379 26 : } else {
380 604 : let mut prefixes = HashSet::new();
381 782 : for object in objects {
382 178 : let key = object.key;
383 : // If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
384 178 : let relative_key = if let Some(prefix) = prefix {
385 154 : let mut prefix = prefix.clone();
386 154 : // We only strip the dirname of the prefix, so that when we strip it from the start of keys we
387 154 : // end up with full file/dir names.
388 154 : let prefix_full_local_path = prefix.with_base(&self.storage_root);
389 154 : let has_slash = prefix.0.to_string().ends_with('/');
390 154 : let strip_prefix = if prefix_full_local_path.is_dir() && has_slash {
391 90 : prefix
392 : } else {
393 64 : prefix.0.pop();
394 64 : prefix
395 : };
396 :
397 154 : RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap()
398 : } else {
399 24 : key
400 : };
401 :
402 178 : let relative_key = format!("{}", relative_key);
403 178 : if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
404 170 : let first_part = relative_key
405 170 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
406 170 : .next()
407 170 : .unwrap()
408 170 : .to_owned();
409 170 : prefixes.insert(first_part);
410 170 : } else {
411 8 : result.keys.push(ListingObject {
412 8 : key: RemotePath::from_string(&relative_key).unwrap(),
413 8 : // LocalFs is just for testing
414 8 : last_modified: SystemTime::now(),
415 8 : size: 0,
416 8 : });
417 8 : }
418 : }
419 604 : result.prefixes = prefixes
420 604 : .into_iter()
421 604 : .map(|s| RemotePath::from_string(&s).unwrap())
422 604 : .collect();
423 604 : }
424 :
425 630 : if let Some(max_keys) = max_keys {
426 0 : result.keys.truncate(max_keys.get() as usize);
427 630 : }
428 630 : Ok(result)
429 630 : };
430 :
431 630 : let timeout = async {
432 1825 : tokio::time::sleep(self.timeout).await;
433 0 : Err(DownloadError::Timeout)
434 0 : };
435 :
436 630 : let cancelled = async {
437 2024 : cancel.cancelled().await;
438 0 : Err(DownloadError::Cancelled)
439 0 : };
440 :
441 : tokio::select! {
442 : res = op => res,
443 : res = timeout => res,
444 : res = cancelled => res,
445 : }
446 630 : }
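    // Example of the delimiter handling above, mirroring the `list` test below: with
    // keys "timelines/some_timeline/grandparent/parent/child" and
    // "timelines/some_timeline/grandparent/uncle" uploaded, listing the prefix
    // "timelines/some_timeline/grandparent/" with ListingMode::WithDelimiter yields
    // keys = ["uncle"] and prefixes = ["parent"], while ListingMode::NoDelimiter
    // returns every key recursively.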
447 :
448 0 : async fn head_object(
449 0 : &self,
450 0 : key: &RemotePath,
451 0 : _cancel: &CancellationToken,
452 0 : ) -> Result<ListingObject, DownloadError> {
453 0 : let target_file_path = key.with_base(&self.storage_root);
454 0 : let metadata = file_metadata(&target_file_path).await?;
455 : Ok(ListingObject {
456 0 : key: key.clone(),
457 0 : last_modified: metadata.modified()?,
458 0 : size: metadata.len(),
459 : })
460 0 : }
461 :
462 8574 : async fn upload(
463 8574 : &self,
464 8574 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
465 8574 : data_size_bytes: usize,
466 8574 : to: &RemotePath,
467 8574 : metadata: Option<StorageMetadata>,
468 8574 : cancel: &CancellationToken,
469 8574 : ) -> anyhow::Result<()> {
470 8574 : let cancel = cancel.child_token();
471 8574 :
472 8574 : let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
473 8574 : let mut op = std::pin::pin!(op);
474 :
475 : // race the upload0 to the timeout; if it goes over, do a graceful shutdown
476 8454 : let (res, timeout) = tokio::select! {
477 : res = &mut op => (res, false),
478 : _ = tokio::time::sleep(self.timeout) => {
479 : cancel.cancel();
480 : (op.await, true)
481 : }
482 : };
483 :
484 32 : match res {
485 32 : Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
486 0 : // we caused this cancel (or they happened simultaneously) -- swap it out to
487 0 : // Timeout
488 0 : Err(TimeoutOrCancel::Timeout.into())
489 : }
490 8454 : res => res,
491 : }
492 8454 : }
493 :
494 182 : async fn download(
495 182 : &self,
496 182 : from: &RemotePath,
497 182 : cancel: &CancellationToken,
498 182 : ) -> Result<Download, DownloadError> {
499 182 : let target_path = from.with_base(&self.storage_root);
500 :
501 182 : let file_metadata = file_metadata(&target_path).await?;
502 :
503 132 : let source = ReaderStream::new(
504 132 : fs::OpenOptions::new()
505 132 : .read(true)
506 132 : .open(&target_path)
507 126 : .await
508 132 : .with_context(|| {
509 0 : format!("Failed to open source file {target_path:?} to use in the download")
510 132 : })
511 132 : .map_err(DownloadError::Other)?,
512 : );
513 :
514 132 : let metadata = self
515 132 : .read_storage_metadata(&target_path)
516 8 : .await
517 132 : .map_err(DownloadError::Other)?;
518 :
519 132 : let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
520 132 : let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
521 132 :
522 132 : let etag = mock_etag(&file_metadata);
523 132 : Ok(Download {
524 132 : metadata,
525 132 : last_modified: file_metadata
526 132 : .modified()
527 132 : .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
528 132 : etag,
529 132 : download_stream: Box::pin(source),
530 : })
531 182 : }
532 :
533 56 : async fn download_byte_range(
534 56 : &self,
535 56 : from: &RemotePath,
536 56 : start_inclusive: u64,
537 56 : end_exclusive: Option<u64>,
538 56 : cancel: &CancellationToken,
539 56 : ) -> Result<Download, DownloadError> {
540 56 : if let Some(end_exclusive) = end_exclusive {
541 40 : if end_exclusive <= start_inclusive {
542 8 : return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
543 32 : };
544 32 : if start_inclusive == end_exclusive.saturating_sub(1) {
545 8 : return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
546 24 : }
547 16 : }
548 :
549 40 : let target_path = from.with_base(&self.storage_root);
550 40 : let file_metadata = file_metadata(&target_path).await?;
551 40 : let mut source = tokio::fs::OpenOptions::new()
552 40 : .read(true)
553 40 : .open(&target_path)
554 39 : .await
555 40 : .with_context(|| {
556 0 : format!("Failed to open source file {target_path:?} to use in the download")
557 40 : })
558 40 : .map_err(DownloadError::Other)?;
559 :
560 40 : let len = source
561 40 : .metadata()
562 40 : .await
563 40 : .context("query file length")
564 40 : .map_err(DownloadError::Other)?
565 40 : .len();
566 40 :
567 40 : source
568 40 : .seek(io::SeekFrom::Start(start_inclusive))
569 39 : .await
570 40 : .context("Failed to seek to the range start in a local storage file")
571 40 : .map_err(DownloadError::Other)?;
572 :
573 40 : let metadata = self
574 40 : .read_storage_metadata(&target_path)
575 8 : .await
576 40 : .map_err(DownloadError::Other)?;
577 :
578 40 : let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
579 40 : let source = ReaderStream::new(source);
580 40 :
581 40 : let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
582 40 : let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
583 40 :
584 40 : let etag = mock_etag(&file_metadata);
585 40 : Ok(Download {
586 40 : metadata,
587 40 : last_modified: file_metadata
588 40 : .modified()
589 40 : .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
590 40 : etag,
591 40 : download_stream: Box::pin(source),
592 : })
593 56 : }
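    // Example: `download_byte_range(&path, 3, Some(10), &cancel)` streams bytes 3..10
    // of the object, while `end_exclusive = None` streams from `start_inclusive` to EOF.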
594 :
595 40 : async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
596 40 : let file_path = path.with_base(&self.storage_root);
597 40 : match fs::remove_file(&file_path).await {
598 32 : Ok(()) => Ok(()),
599 : // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
600 : // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
601 : // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
602 8 : Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
603 0 : Err(e) => Err(anyhow::anyhow!(e)),
604 : }
605 40 : }
606 :
607 18 : async fn delete_objects<'a>(
608 18 : &self,
609 18 : paths: &'a [RemotePath],
610 18 : cancel: &CancellationToken,
611 18 : ) -> anyhow::Result<()> {
612 36 : for path in paths {
613 18 : self.delete(path, cancel).await?
614 : }
615 18 : Ok(())
616 18 : }
617 :
618 0 : async fn copy(
619 0 : &self,
620 0 : from: &RemotePath,
621 0 : to: &RemotePath,
622 0 : _cancel: &CancellationToken,
623 0 : ) -> anyhow::Result<()> {
624 0 : let from_path = from.with_base(&self.storage_root);
625 0 : let to_path = to.with_base(&self.storage_root);
626 0 : create_target_directory(&to_path).await?;
627 0 : fs::copy(&from_path, &to_path).await.with_context(|| {
628 0 : format!(
629 0 : "Failed to copy file from '{from_path}' to '{to_path}'",
630 0 : from_path = from_path,
631 0 : to_path = to_path
632 0 : )
633 0 : })?;
634 0 : Ok(())
635 0 : }
636 :
637 0 : async fn time_travel_recover(
638 0 : &self,
639 0 : _prefix: Option<&RemotePath>,
640 0 : _timestamp: SystemTime,
641 0 : _done_if_after: SystemTime,
642 0 : _cancel: &CancellationToken,
643 0 : ) -> Result<(), TimeTravelError> {
644 0 : Err(TimeTravelError::Unimplemented)
645 0 : }
646 : }
647 :
648 180 : fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
649 180 : path_with_suffix_extension(original_path, "metadata")
650 180 : }
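// E.g. metadata for an object stored at "a/b/layer" lives in a sidecar file,
// roughly "a/b/layer.metadata" (exact shape per `path_with_suffix_extension`).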
651 :
652 8574 : async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
653 8574 : let target_dir = match target_file_path.parent() {
654 8574 : Some(parent_dir) => parent_dir,
655 0 : None => bail!("File path '{target_file_path}' has no parent directory"),
656 : };
657 8574 : if !target_dir.exists() {
658 1235 : fs::create_dir_all(target_dir).await?;
659 7339 : }
660 8569 : Ok(())
661 8569 : }
662 :
663 222 : async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, DownloadError> {
664 222 : tokio::fs::metadata(&file_path).await.map_err(|e| {
665 50 : if e.kind() == ErrorKind::NotFound {
666 50 : DownloadError::NotFound
667 : } else {
668 0 : DownloadError::BadInput(e.into())
669 : }
670 222 : })
671 222 : }
672 :
673 : // Use mtime as stand-in for ETag. We could calculate a meaningful one by md5'ing the contents of files we
674 : // read, but that's expensive and the local_fs test helper's whole reason for existence is to run small tests
675 : // quickly, with less overhead than using a mock S3 server.
676 172 : fn mock_etag(meta: &std::fs::Metadata) -> Etag {
677 172 : let mtime = meta.modified().expect("Filesystem mtime missing");
678 172 : format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into()
679 172 : }
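// E.g. an mtime of 2024-01-01T00:00:00Z yields the etag "1704067200000".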
680 :
681 : #[cfg(test)]
682 : mod fs_tests {
683 : use super::*;
684 :
685 : use camino_tempfile::tempdir;
686 : use std::{collections::HashMap, io::Write};
687 :
688 24 : async fn read_and_check_metadata(
689 24 : storage: &LocalFs,
690 24 : remote_storage_path: &RemotePath,
691 24 : expected_metadata: Option<&StorageMetadata>,
692 24 : ) -> anyhow::Result<String> {
693 24 : let cancel = CancellationToken::new();
694 24 : let download = storage
695 24 : .download(remote_storage_path, &cancel)
696 55 : .await
697 24 : .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
698 24 : ensure!(
699 24 : download.metadata.as_ref() == expected_metadata,
700 0 : "Unexpected metadata returned for the downloaded file"
701 : );
702 :
703 45 : let contents = aggregate(download.download_stream).await?;
704 :
705 24 : String::from_utf8(contents).map_err(anyhow::Error::new)
706 24 : }
707 :
708 : #[tokio::test]
709 8 : async fn upload_file() -> anyhow::Result<()> {
710 8 : let (storage, cancel) = create_storage()?;
711 8 :
712 45 : let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
713 8 : assert_eq!(
714 24 : storage.list_all().await?,
715 8 : vec![target_path_1.clone()],
716 8 : "Should list a single file after first upload"
717 8 : );
718 8 :
719 48 : let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
720 8 : assert_eq!(
721 24 : list_files_sorted(&storage).await?,
722 8 : vec![target_path_1.clone(), target_path_2.clone()],
723 8 : "Should list a two different files after second upload"
724 8 : );
725 8 :
726 8 : Ok(())
727 8 : }
728 :
729 : #[tokio::test]
730 8 : async fn upload_file_negatives() -> anyhow::Result<()> {
731 8 : let (storage, cancel) = create_storage()?;
732 8 :
733 8 : let id = RemotePath::new(Utf8Path::new("dummy"))?;
734 8 : let content = Bytes::from_static(b"12345");
735 32 : let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));
736 8 :
737 8 : // Check that you get an error if the size parameter doesn't match the actual
738 8 : // size of the stream.
739 8 : storage
740 8 : .upload(content(), 0, &id, None, &cancel)
741 8 : .await
742 8 : .expect_err("upload with zero size succeeded");
743 8 : storage
744 8 : .upload(content(), 4, &id, None, &cancel)
745 14 : .await
746 8 : .expect_err("upload with too short size succeeded");
747 8 : storage
748 8 : .upload(content(), 6, &id, None, &cancel)
749 16 : .await
750 8 : .expect_err("upload with too large size succeeded");
751 8 :
752 8 : // Correct size is 5, this should succeed.
753 24 : storage.upload(content(), 5, &id, None, &cancel).await?;
754 8 :
755 8 : Ok(())
756 8 : }
757 :
758 88 : fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
759 88 : let storage_root = tempdir()?.path().to_path_buf();
760 88 : LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
761 88 : }
762 :
763 : #[tokio::test]
764 8 : async fn download_file() -> anyhow::Result<()> {
765 8 : let (storage, cancel) = create_storage()?;
766 8 : let upload_name = "upload_1";
767 43 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
768 8 :
769 32 : let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
770 8 : assert_eq!(
771 8 : dummy_contents(upload_name),
772 8 : contents,
773 8 : "We should upload and download the same contents"
774 8 : );
775 8 :
776 8 : let non_existing_path = "somewhere/else";
777 8 : match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await {
778 8 : Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
779 8 : other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
780 8 : }
781 8 : Ok(())
782 8 : }
783 :
784 : #[tokio::test]
785 8 : async fn download_file_range_positive() -> anyhow::Result<()> {
786 8 : let (storage, cancel) = create_storage()?;
787 8 : let upload_name = "upload_1";
788 44 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
789 8 :
790 8 : let full_range_download_contents =
791 32 : read_and_check_metadata(&storage, &upload_target, None).await?;
792 8 : assert_eq!(
793 8 : dummy_contents(upload_name),
794 8 : full_range_download_contents,
795 8 : "Download full range should return the whole upload"
796 8 : );
797 8 :
798 8 : let uploaded_bytes = dummy_contents(upload_name).into_bytes();
799 8 : let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
800 8 :
801 8 : let first_part_download = storage
802 8 : .download_byte_range(
803 8 : &upload_target,
804 8 : 0,
805 8 : Some(first_part_local.len() as u64),
806 8 : &cancel,
807 8 : )
808 32 : .await?;
809 8 : assert!(
810 8 : first_part_download.metadata.is_none(),
811 8 : "No metadata should be returned for no metadata upload"
812 8 : );
813 8 :
814 8 : let first_part_remote = aggregate(first_part_download.download_stream).await?;
815 8 : assert_eq!(
816 8 : first_part_local, first_part_remote,
817 8 : "First part bytes should be returned when requested"
818 8 : );
819 8 :
820 8 : let second_part_download = storage
821 8 : .download_byte_range(
822 8 : &upload_target,
823 8 : first_part_local.len() as u64,
824 8 : Some((first_part_local.len() + second_part_local.len()) as u64),
825 8 : &cancel,
826 8 : )
827 32 : .await?;
828 8 : assert!(
829 8 : second_part_download.metadata.is_none(),
830 8 : "No metadata should be returned for no metadata upload"
831 8 : );
832 8 :
833 8 : let second_part_remote = aggregate(second_part_download.download_stream).await?;
834 8 : assert_eq!(
835 8 : second_part_local, second_part_remote,
836 8 : "Second part bytes should be returned when requested"
837 8 : );
838 8 :
839 8 : let suffix_bytes = storage
840 8 : .download_byte_range(&upload_target, 13, None, &cancel)
841 32 : .await?
842 8 : .download_stream;
843 8 : let suffix_bytes = aggregate(suffix_bytes).await?;
844 8 : let suffix = std::str::from_utf8(&suffix_bytes)?;
845 8 : assert_eq!(upload_name, suffix);
846 8 :
847 8 : let all_bytes = storage
848 8 : .download_byte_range(&upload_target, 0, None, &cancel)
849 32 : .await?
850 8 : .download_stream;
851 8 : let all_bytes = aggregate(all_bytes).await?;
852 8 : let all_bytes = std::str::from_utf8(&all_bytes)?;
853 8 : assert_eq!(dummy_contents("upload_1"), all_bytes);
854 8 :
855 8 : Ok(())
856 8 : }
857 :
858 : #[tokio::test]
859 8 : async fn download_file_range_negative() -> anyhow::Result<()> {
860 8 : let (storage, cancel) = create_storage()?;
861 8 : let upload_name = "upload_1";
862 45 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
863 8 :
864 8 : let start = 1_000_000_000;
865 8 : let end = start + 1;
866 8 : match storage
867 8 : .download_byte_range(
868 8 : &upload_target,
869 8 : start,
870 8 : Some(end), // exclusive end
871 8 : &cancel,
872 8 : )
873 8 : .await
874 8 : {
875 8 : Ok(_) => panic!("Should not allow downloading wrong ranges"),
876 8 : Err(e) => {
877 8 : let error_string = e.to_string();
878 8 : assert!(error_string.contains("zero bytes"));
879 8 : assert!(error_string.contains(&start.to_string()));
880 8 : assert!(error_string.contains(&end.to_string()));
881 8 : }
882 8 : }
883 8 :
884 8 : let start = 10000;
885 8 : let end = 234;
886 8 : assert!(start > end, "Should test an incorrect range");
887 8 : match storage
888 8 : .download_byte_range(&upload_target, start, Some(end), &cancel)
889 8 : .await
890 8 : {
891 8 : Ok(_) => panic!("Should not allow downloading wrong ranges"),
892 8 : Err(e) => {
893 8 : let error_string = e.to_string();
894 8 : assert!(error_string.contains("Invalid range"));
895 8 : assert!(error_string.contains(&start.to_string()));
896 8 : assert!(error_string.contains(&end.to_string()));
897 8 : }
898 8 : }
899 8 :
900 8 : Ok(())
901 8 : }
902 :
903 : #[tokio::test]
904 8 : async fn delete_file() -> anyhow::Result<()> {
905 8 : let (storage, cancel) = create_storage()?;
906 8 : let upload_name = "upload_1";
907 42 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
908 8 :
909 8 : storage.delete(&upload_target, &cancel).await?;
910 21 : assert!(storage.list_all().await?.is_empty());
911 8 :
912 8 : storage
913 8 : .delete(&upload_target, &cancel)
914 8 : .await
915 8 : .expect("Should allow deleting non-existing storage files");
916 8 :
917 8 : Ok(())
918 8 : }
919 :
920 : #[tokio::test]
921 8 : async fn file_with_metadata() -> anyhow::Result<()> {
922 8 : let (storage, cancel) = create_storage()?;
923 8 : let upload_name = "upload_1";
924 8 : let metadata = StorageMetadata(HashMap::from([
925 8 : ("one".to_string(), "1".to_string()),
926 8 : ("two".to_string(), "2".to_string()),
927 8 : ]));
928 8 : let upload_target =
929 50 : upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;
930 8 :
931 8 : let full_range_download_contents =
932 36 : read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
933 8 : assert_eq!(
934 8 : dummy_contents(upload_name),
935 8 : full_range_download_contents,
936 8 : "We should upload and download the same contents"
937 8 : );
938 8 :
939 8 : let uploaded_bytes = dummy_contents(upload_name).into_bytes();
940 8 : let (first_part_local, _) = uploaded_bytes.split_at(3);
941 8 :
942 8 : let partial_download_with_metadata = storage
943 8 : .download_byte_range(
944 8 : &upload_target,
945 8 : 0,
946 8 : Some(first_part_local.len() as u64),
947 8 : &cancel,
948 8 : )
949 36 : .await?;
950 8 : let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
951 8 : assert_eq!(
952 8 : first_part_local,
953 8 : first_part_remote.as_slice(),
954 8 : "First part bytes should be returned when requested"
955 8 : );
956 8 :
957 8 : assert_eq!(
958 8 : partial_download_with_metadata.metadata,
959 8 : Some(metadata),
960 8 : "We should get the same metadata back for partial download"
961 8 : );
962 8 :
963 8 : Ok(())
964 8 : }
965 :
966 : #[tokio::test]
967 8 : async fn list() -> anyhow::Result<()> {
968 8 : // No delimiter: should recursively list everything
969 8 : let (storage, cancel) = create_storage()?;
970 41 : let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
971 8 : let child_sibling =
972 43 : upload_dummy_file(&storage, "grandparent/parent/child_sibling", None, &cancel).await?;
973 42 : let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;
974 8 :
975 8 : let listing = storage
976 8 : .list(None, ListingMode::NoDelimiter, None, &cancel)
977 8 : .await?;
978 8 : assert!(listing.prefixes.is_empty());
979 8 : assert_eq!(
980 8 : listing
981 8 : .keys
982 8 : .into_iter()
983 24 : .map(|o| o.key)
984 8 : .collect::<HashSet<_>>(),
985 8 : HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()])
986 8 : );
987 8 :
988 8 : // Delimiter: should only go one deep
989 8 : let listing = storage
990 8 : .list(None, ListingMode::WithDelimiter, None, &cancel)
991 8 : .await?;
992 8 :
993 8 : assert_eq!(
994 8 : listing.prefixes,
995 8 : [RemotePath::from_string("timelines").unwrap()].to_vec()
996 8 : );
997 8 : assert!(listing.keys.is_empty());
998 8 :
999 8 : // Delimiter & prefix with a trailing slash
1000 8 : let listing = storage
1001 8 : .list(
1002 8 : Some(&RemotePath::from_string("timelines/some_timeline/grandparent/").unwrap()),
1003 8 : ListingMode::WithDelimiter,
1004 8 : None,
1005 8 : &cancel,
1006 8 : )
1007 8 : .await?;
1008 8 : assert_eq!(
1009 8 : listing.keys.into_iter().map(|o| o.key).collect::<Vec<_>>(),
1010 8 : [RemotePath::from_string("uncle").unwrap()].to_vec()
1011 8 : );
1012 8 : assert_eq!(
1013 8 : listing.prefixes,
1014 8 : [RemotePath::from_string("parent").unwrap()].to_vec()
1015 8 : );
1016 8 :
1017 8 : // Delimiter and prefix without a trailing slash
1018 8 : let listing = storage
1019 8 : .list(
1020 8 : Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
1021 8 : ListingMode::WithDelimiter,
1022 8 : None,
1023 8 : &cancel,
1024 8 : )
1025 8 : .await?;
1026 8 : assert_eq!(listing.keys, vec![]);
1027 8 : assert_eq!(
1028 8 : listing.prefixes,
1029 8 : [RemotePath::from_string("grandparent").unwrap()].to_vec()
1030 8 : );
1031 8 :
1032 8 : // Delimiter and prefix that's partway through a path component
1033 8 : let listing = storage
1034 8 : .list(
1035 8 : Some(&RemotePath::from_string("timelines/some_timeline/grandp").unwrap()),
1036 8 : ListingMode::WithDelimiter,
1037 8 : None,
1038 8 : &cancel,
1039 8 : )
1040 8 : .await?;
1041 8 : assert_eq!(listing.keys, vec![]);
1042 8 : assert_eq!(
1043 8 : listing.prefixes,
1044 8 : [RemotePath::from_string("grandparent").unwrap()].to_vec()
1045 8 : );
1046 8 :
1047 8 : Ok(())
1048 8 : }
1049 :
1050 : #[tokio::test]
1051 8 : async fn list_part_component() -> anyhow::Result<()> {
1052 8 : // No delimiter: should recursively list everything
1053 8 : let (storage, cancel) = create_storage()?;
1054 8 :
1055 8 : // Imitates what happens in a tenant path when we have an unsharded path and a sharded path, and do a listing
1056 8 : // of the unsharded path: although there is a "directory" at the unsharded path, it should be handled as
1057 8 : // a freeform prefix.
1058 8 : let _child_a =
1059 47 : upload_dummy_file(&storage, "grandparent/tenant-01/child", None, &cancel).await?;
1060 8 : let _child_b =
1061 48 : upload_dummy_file(&storage, "grandparent/tenant/child", None, &cancel).await?;
1062 8 :
1063 8 : // Delimiter and prefix that's partway through a path component
1064 8 : let listing = storage
1065 8 : .list(
1066 8 : Some(
1067 8 : &RemotePath::from_string("timelines/some_timeline/grandparent/tenant").unwrap(),
1068 8 : ),
1069 8 : ListingMode::WithDelimiter,
1070 8 : None,
1071 8 : &cancel,
1072 8 : )
1073 8 : .await?;
1074 8 : assert_eq!(listing.keys, vec![]);
1075 8 :
1076 8 : let mut found_prefixes = listing.prefixes.clone();
1077 8 : found_prefixes.sort();
1078 8 : assert_eq!(
1079 8 : found_prefixes,
1080 8 : [
1081 8 : RemotePath::from_string("tenant").unwrap(),
1082 8 : RemotePath::from_string("tenant-01").unwrap(),
1083 8 : ]
1084 8 : .to_vec()
1085 8 : );
1086 8 :
1087 8 : Ok(())
1088 8 : }
1089 :
1090 : #[tokio::test]
1091 8 : async fn overwrite_shorter_file() -> anyhow::Result<()> {
1092 8 : let (storage, cancel) = create_storage()?;
1093 8 :
1094 8 : let path = RemotePath::new("does/not/matter/file".into())?;
1095 8 :
1096 8 : let body = Bytes::from_static(b"long file contents is long");
1097 8 : {
1098 8 : let len = body.len();
1099 8 : let body =
1100 8 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1101 32 : storage.upload(body, len, &path, None, &cancel).await?;
1102 8 : }
1103 8 :
1104 16 : let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
1105 8 : assert_eq!(body, read);
1106 8 :
1107 8 : let shorter = Bytes::from_static(b"shorter body");
1108 8 : {
1109 8 : let len = shorter.len();
1110 8 : let body =
1111 8 : futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
1112 24 : storage.upload(body, len, &path, None, &cancel).await?;
1113 8 : }
1114 8 :
1115 16 : let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
1116 8 : assert_eq!(shorter, read);
1117 8 : Ok(())
1118 8 : }
1119 :
1120 : #[tokio::test]
1121 8 : async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
1122 8 : let (storage, cancel) = create_storage()?;
1123 8 :
1124 8 : let path = RemotePath::new("does/not/matter/file".into())?;
1125 8 :
1126 8 : let body = Bytes::from_static(b"long file contents is long");
1127 8 : {
1128 8 : let len = body.len();
1129 8 : let body =
1130 8 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1131 8 : let cancel = cancel.child_token();
1132 8 : cancel.cancel();
1133 8 : let e = storage
1134 8 : .upload(body, len, &path, None, &cancel)
1135 24 : .await
1136 8 : .unwrap_err();
1137 8 :
1138 8 : assert!(TimeoutOrCancel::caused_by_cancel(&e));
1139 8 : }
1140 8 :
1141 8 : {
1142 8 : let len = body.len();
1143 8 : let body =
1144 8 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1145 22 : storage.upload(body, len, &path, None, &cancel).await?;
1146 8 : }
1147 8 :
1148 14 : let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
1149 8 : assert_eq!(body, read);
1150 8 :
1151 8 : Ok(())
1152 8 : }
1153 :
1154 96 : async fn upload_dummy_file(
1155 96 : storage: &LocalFs,
1156 96 : name: &str,
1157 96 : metadata: Option<StorageMetadata>,
1158 96 : cancel: &CancellationToken,
1159 96 : ) -> anyhow::Result<RemotePath> {
1160 96 : let from_path = storage
1161 96 : .storage_root
1162 96 : .join("timelines")
1163 96 : .join("some_timeline")
1164 96 : .join(name);
1165 96 : let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
1166 :
1167 96 : let relative_path = from_path
1168 96 : .strip_prefix(&storage.storage_root)
1169 96 : .context("Failed to strip storage root prefix")
1170 96 : .and_then(RemotePath::new)
1171 96 : .with_context(|| {
1172 0 : format!(
1173 0 : "Failed to resolve remote part of path {:?} for base {:?}",
1174 0 : from_path, storage.storage_root
1175 0 : )
1176 96 : })?;
1177 :
1178 96 : let file = tokio_util::io::ReaderStream::new(file);
1179 96 :
1180 96 : storage
1181 96 : .upload(file, size, &relative_path, metadata, cancel)
1182 443 : .await?;
1183 96 : Ok(relative_path)
1184 96 : }
1185 :
1186 96 : async fn create_file_for_upload(
1187 96 : path: &Utf8Path,
1188 96 : contents: &str,
1189 96 : ) -> anyhow::Result<(fs::File, usize)> {
1190 96 : std::fs::create_dir_all(path.parent().unwrap())?;
1191 96 : let mut file_for_writing = std::fs::OpenOptions::new()
1192 96 : .write(true)
1193 96 : .create_new(true)
1194 96 : .open(path)?;
1195 96 : write!(file_for_writing, "{}", contents)?;
1196 96 : drop(file_for_writing);
1197 96 : let file_size = path.metadata()?.len() as usize;
1198 96 : Ok((
1199 96 : fs::OpenOptions::new().read(true).open(&path).await?,
1200 96 : file_size,
1201 : ))
1202 96 : }
1203 :
1204 144 : fn dummy_contents(name: &str) -> String {
1205 144 : format!("contents for {name}")
1206 144 : }
1207 :
1208 8 : async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
1209 24 : let mut files = storage.list_all().await?;
1210 8 : files.sort_by(|a, b| a.0.cmp(&b.0));
1211 8 : Ok(files)
1212 8 : }
1213 :
1214 88 : async fn aggregate(
1215 88 : stream: impl Stream<Item = std::io::Result<Bytes>>,
1216 88 : ) -> anyhow::Result<Vec<u8>> {
1217 88 : use futures::stream::StreamExt;
1218 88 : let mut out = Vec::new();
1219 88 : let mut stream = std::pin::pin!(stream);
1220 176 : while let Some(res) = stream.next().await {
1221 88 : out.extend_from_slice(&res?[..]);
1222 : }
1223 88 : Ok(out)
1224 88 : }
1225 : }