//! Local filesystem acting as a remote storage.
//! Multiple API users can use the same "storage" of this kind by using different storage roots.
//!
//! This storage is used in tests, but can also be used in cases where a certain persistent
//! volume is mounted to the local FS.
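//!
//! A minimal usage sketch (error handling elided; the mount path is hypothetical),
//! relying only on items defined or re-exported in this crate:
//!
//! ```ignore
//! // Create (or reuse) a storage rooted at a locally mounted volume.
//! let storage = LocalFs::new(Utf8PathBuf::from("/mnt/backup_volume"))?;
//! // Paths are always relative to the storage root.
//! let path = RemotePath::new(Utf8Path::new("tenant/timeline/object"))?;
//! let download = storage.download(&path).await?;
//! ```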

use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin};

use anyhow::{bail, ensure, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::Stream;
use tokio::{
    fs,
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tokio_util::io::ReaderStream;
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};

use crate::{Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath};

use super::{RemoteStorage, StorageMetadata};

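/// Suffix appended to a destination file name while its upload is still in flight; see `upload`.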
const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";

#[derive(Debug, Clone)]
pub struct LocalFs {
    storage_root: Utf8PathBuf,
}

impl LocalFs {
    /// Attempts to create local FS storage, along with its root directory.
    /// The storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
    pub fn new(mut storage_root: Utf8PathBuf) -> anyhow::Result<Self> {
        if !storage_root.exists() {
            std::fs::create_dir_all(&storage_root).with_context(|| {
                format!("Failed to create all directories in the given root path {storage_root:?}")
            })?;
        }
        if !storage_root.is_absolute() {
            storage_root = storage_root.canonicalize_utf8().with_context(|| {
                format!("Failed to represent path {storage_root:?} as an absolute path")
            })?;
        }

        Ok(Self { storage_root })
    }

    // mirrors S3Bucket::s3_object_to_relative_path
    fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
        let relative_path = key
            .strip_prefix(&self.storage_root)
            .expect("relative path must contain storage_root as prefix");
        RemotePath(relative_path.into())
    }

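    /// Reads the serialized [`StorageMetadata`] from the sidecar file produced by
    /// [`storage_metadata_path`], if such a file exists next to `file_path`.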
    async fn read_storage_metadata(
        &self,
        file_path: &Utf8Path,
    ) -> anyhow::Result<Option<StorageMetadata>> {
        let metadata_path = storage_metadata_path(file_path);
        if metadata_path.exists() && metadata_path.is_file() {
            let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
                format!("Failed to read metadata from the local storage at '{metadata_path}'")
            })?;

            serde_json::from_str(&metadata_string)
                .with_context(|| {
                    format!(
                        "Failed to deserialize metadata from the local storage at '{metadata_path}'",
                    )
                })
                .map(|metadata| Some(StorageMetadata(metadata)))
        } else {
            Ok(None)
        }
    }

    #[cfg(test)]
    async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
        Ok(get_all_files(&self.storage_root, true)
            .await?
            .into_iter()
            .map(|path| {
                path.strip_prefix(&self.storage_root)
                    .context("Failed to strip storage root prefix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files for storage root, hence should be able to remove the prefix",
                    )
            })
            .collect())
    }

    // recursively lists all files in a directory,
    // mirroring the `list_files` for `s3_bucket`
    async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        let full_path = match folder {
            Some(folder) => folder.with_base(&self.storage_root),
            None => self.storage_root.clone(),
        };

        // If we were given a directory, we may use it as our starting point.
        // Otherwise, we must go up to the first ancestor dir that exists. This is because
        // S3 object list prefixes can be arbitrary strings, but when reading
        // the local filesystem we need a directory to start calling read_dir on.
        let mut initial_dir = full_path.clone();
        loop {
            // Did we make it to the root?
            if initial_dir.parent().is_none() {
                anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
            }

            match fs::metadata(initial_dir.clone()).await {
                Ok(meta) if meta.is_dir() => {
                    // We found a directory, break
                    break;
                }
                Ok(_meta) => {
                    // It's not a directory: strip back to the parent
                    initial_dir.pop();
                }
                Err(e) if e.kind() == ErrorKind::NotFound => {
                    // It's not a file that exists: strip the prefix back to the parent directory
                    initial_dir.pop();
                }
                Err(e) => {
                    // Unexpected I/O error
                    anyhow::bail!(e)
                }
            }
        }
        // Note that Utf8PathBuf starts_with only considers full path segments, but
        // object prefixes are arbitrary strings, so we need the strings for doing
        // starts_with later.
        let prefix = full_path.as_str();

        let mut files = vec![];
        let mut directory_queue = vec![initial_dir];
        while let Some(cur_folder) = directory_queue.pop() {
            let mut entries = cur_folder.read_dir_utf8()?;
            while let Some(Ok(entry)) = entries.next() {
                let file_name = entry.file_name();
                let full_file_name = cur_folder.join(file_name);
                if full_file_name.as_str().starts_with(prefix) {
                    let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
                    files.push(file_remote_path);
                    if full_file_name.is_dir() {
                        directory_queue.push(full_file_name);
                    }
                }
            }
        }

        Ok(files)
    }
}

#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
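    /// Lists keys under `prefix`. With [`ListingMode::NoDelimiter`] every file below the
    /// prefix is returned recursively; otherwise only one level is listed, and empty
    /// directories are filtered out to mirror S3 behavior.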
    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
    ) -> Result<Listing, DownloadError> {
        let mut result = Listing::default();

        if let ListingMode::NoDelimiter = mode {
            let keys = self
                .list_recursive(prefix)
                .await
                .map_err(DownloadError::Other)?;

            result.keys = keys
                .into_iter()
                .filter(|k| {
                    let path = k.with_base(&self.storage_root);
                    !path.is_dir()
                })
                .collect();

            return Ok(result);
        }

        let path = match prefix {
            Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
            None => Cow::Borrowed(&self.storage_root),
        };

        let prefixes_to_filter = get_all_files(path.as_ref(), false)
            .await
            .map_err(DownloadError::Other)?;

        // filter out empty directories to mirror s3 behavior.
        for prefix in prefixes_to_filter {
            if prefix.is_dir()
                && is_directory_empty(&prefix)
                    .await
                    .map_err(DownloadError::Other)?
            {
                continue;
            }

            let stripped = prefix
                .strip_prefix(&self.storage_root)
                .context("Failed to strip prefix")
                .and_then(RemotePath::new)
                .expect(
                    "We list files for storage root, hence should be able to remove the prefix",
                );

            if prefix.is_dir() {
                result.prefixes.push(stripped);
            } else {
                result.keys.push(stripped);
            }
        }

        Ok(result)
    }

    async fn upload(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        let target_file_path = to.with_base(&self.storage_root);
        create_target_directory(&target_file_path).await?;
        // We need this dance with a sort-of durable rename (without fsyncs)
        // to prevent partial uploads. This was actually hit when a pageserver shutdown
        // cancelled the upload and a partial file was left on the fs.
        // NOTE: Because the temp file suffix is always the same, this operation is racy.
        // Two concurrent operations can lead to the following sequence:
        // T1: write(temp)
        // T2: write(temp) -> overwrites the content
        // T1: rename(temp, dst) -> succeeds
        // T2: rename(temp, dst) -> fails, temp no longer exists
        // This can be solved by supplying a unique temp suffix every time, but this situation
        // is not normal in the first place; the error can help (and has helped at least once)
        // to discover bugs in upper-level synchronization.
        let temp_file_path =
            path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
        let mut destination = io::BufWriter::new(
            fs::OpenOptions::new()
                .write(true)
                .create(true)
                .open(&temp_file_path)
                .await
                .with_context(|| {
                    format!("Failed to open target fs destination at '{target_file_path}'")
                })?,
        );

        let from_size_bytes = data_size_bytes as u64;
        let data = tokio_util::io::StreamReader::new(data);
        let data = std::pin::pin!(data);
        let mut buffer_to_read = data.take(from_size_bytes);

        // alternatively we could just write the bytes to a file, but local_fs is a testing utility
        let bytes_read = io::copy_buf(&mut buffer_to_read, &mut destination)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
                )
            })?;

        if bytes_read < from_size_bytes {
            bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
        }
        // Check if there is any extra data after the given size.
        let mut from = buffer_to_read.into_inner();
        let extra_read = from.read(&mut [1]).await?;
        ensure!(
            extra_read == 0,
            "Provided stream was larger than expected: expected {from_size_bytes} bytes",
        );

        destination.flush().await.with_context(|| {
            format!(
                "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
            )
        })?;

        fs::rename(temp_file_path, &target_file_path)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload (rename) file to the local storage at '{target_file_path}'",
                )
            })?;

        if let Some(storage_metadata) = metadata {
            let storage_metadata_path = storage_metadata_path(&target_file_path);
            fs::write(
                &storage_metadata_path,
                serde_json::to_string(&storage_metadata.0)
                    .context("Failed to serialize storage metadata as json")?,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to write metadata to the local storage at '{storage_metadata_path}'",
                )
            })?;
        }

        Ok(())
    }

    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let source = ReaderStream::new(
                fs::OpenOptions::new()
                    .read(true)
                    .open(&target_path)
                    .await
                    .with_context(|| {
                        format!("Failed to open source file {target_path:?} to use in the download")
                    })
                    .map_err(DownloadError::Other)?,
            );

            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;
            Ok(Download {
                metadata,
                last_modified: None,
                etag: None,
                download_stream: Box::pin(source),
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

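    /// Streams the byte range `[start_inclusive, end_exclusive)` of the object;
    /// when `end_exclusive` is `None`, streams from `start_inclusive` to the end of the file.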
    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        if let Some(end_exclusive) = end_exclusive {
            if end_exclusive <= start_inclusive {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
            };
            if start_inclusive == end_exclusive.saturating_sub(1) {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
            }
        }
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let mut source = tokio::fs::OpenOptions::new()
                .read(true)
                .open(&target_path)
                .await
                .with_context(|| {
                    format!("Failed to open source file {target_path:?} to use in the download")
                })
                .map_err(DownloadError::Other)?;
            source
                .seek(io::SeekFrom::Start(start_inclusive))
                .await
                .context("Failed to seek to the range start in a local storage file")
                .map_err(DownloadError::Other)?;
            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;

            let download_stream: DownloadStream = match end_exclusive {
                Some(end_exclusive) => Box::pin(ReaderStream::new(
                    source.take(end_exclusive - start_inclusive),
                )),
                None => Box::pin(ReaderStream::new(source)),
            };
            Ok(Download {
                metadata,
                last_modified: None,
                etag: None,
                download_stream,
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        let file_path = path.with_base(&self.storage_root);
        match fs::remove_file(&file_path).await {
            Ok(()) => Ok(()),
            // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
            // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
            // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }

    async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
        for path in paths {
            self.delete(path).await?
        }
        Ok(())
    }

    async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
        let from_path = from.with_base(&self.storage_root);
        let to_path = to.with_base(&self.storage_root);
        create_target_directory(&to_path).await?;
        fs::copy(&from_path, &to_path).await.with_context(|| {
            format!(
                "Failed to copy file from '{from_path}' to '{to_path}'",
                from_path = from_path,
                to_path = to_path
            )
        })?;
        Ok(())
    }
}

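/// Path of the sidecar file that stores an object's [`StorageMetadata`]: the object's own
/// path with an extra "metadata" suffix extension appended.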
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
    path_with_suffix_extension(original_path, "metadata")
}

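/// Collects the entries of `directory_path` (recursively, if requested), skipping symlinks.
/// Returns an empty list if the path does not exist, and an error if it is not a directory.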
fn get_all_files<'a, P>(
    directory_path: P,
    recursive: bool,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
where
    P: AsRef<Utf8Path> + Send + Sync + 'a,
{
    Box::pin(async move {
        let directory_path = directory_path.as_ref();
        if directory_path.exists() {
            if directory_path.is_dir() {
                let mut paths = Vec::new();
                let mut dir_contents = fs::read_dir(directory_path).await?;
                while let Some(dir_entry) = dir_contents.next_entry().await? {
                    let file_type = dir_entry.file_type().await?;
                    let entry_path =
                        Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
                            anyhow::Error::msg(format!(
                                "non-Unicode path: {}",
                                pb.to_string_lossy()
                            ))
                        })?;
                    if file_type.is_symlink() {
                        debug!("{entry_path:?} is a symlink, skipping")
                    } else if file_type.is_dir() {
                        if recursive {
                            paths.extend(get_all_files(&entry_path, true).await?.into_iter())
                        } else {
                            paths.push(entry_path)
                        }
                    } else {
                        paths.push(entry_path);
                    }
                }
                Ok(paths)
            } else {
                bail!("Path {directory_path:?} is not a directory")
            }
        } else {
            Ok(Vec::new())
        }
    })
}

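/// Ensures the parent directory of `target_file_path` exists, creating any missing
/// ancestors; fails if the path has no parent.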
async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
    let target_dir = match target_file_path.parent() {
        Some(parent_dir) => parent_dir,
        None => bail!("File path '{target_file_path}' has no parent directory"),
    };
    if !target_dir.exists() {
        fs::create_dir_all(target_dir).await?;
    }
    Ok(())
}

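/// Returns whether `file_path` exists, erroring out if it exists but is not a regular file.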
fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
    if file_path.exists() {
        ensure!(file_path.is_file(), "file path '{file_path}' is not a file");
        Ok(true)
    } else {
        Ok(false)
    }
}

#[cfg(test)]
mod fs_tests {
    use super::*;

    use bytes::Bytes;
    use camino_tempfile::tempdir;
    use futures_util::Stream;
    use std::{collections::HashMap, io::Write};

    async fn read_and_assert_remote_file_contents(
        storage: &LocalFs,
        #[allow(clippy::ptr_arg)]
        // have to use &Utf8PathBuf due to `storage.local_path` parameter requirements
        remote_storage_path: &RemotePath,
        expected_metadata: Option<&StorageMetadata>,
    ) -> anyhow::Result<String> {
        let download = storage
            .download(remote_storage_path)
            .await
            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
        ensure!(
            download.metadata.as_ref() == expected_metadata,
            "Unexpected metadata returned for the downloaded file"
        );

        let contents = aggregate(download.download_stream).await?;

        String::from_utf8(contents).map_err(anyhow::Error::new)
    }

    #[tokio::test]
    async fn upload_file() -> anyhow::Result<()> {
        let storage = create_storage()?;

        let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
        assert_eq!(
            storage.list_all().await?,
            vec![target_path_1.clone()],
            "Should list a single file after the first upload"
        );

        let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
        assert_eq!(
            list_files_sorted(&storage).await?,
            vec![target_path_1.clone(), target_path_2.clone()],
            "Should list two different files after the second upload"
        );

        Ok(())
    }

    #[tokio::test]
    async fn upload_file_negatives() -> anyhow::Result<()> {
        let storage = create_storage()?;

        let id = RemotePath::new(Utf8Path::new("dummy"))?;
        let content = Bytes::from_static(b"12345");
        let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));

        // Check that you get an error if the size parameter doesn't match the actual
        // size of the stream.
        storage
            .upload(content(), 0, &id, None)
            .await
            .expect_err("upload with zero size succeeded");
        storage
            .upload(content(), 4, &id, None)
            .await
            .expect_err("upload with too short size succeeded");
        storage
            .upload(content(), 6, &id, None)
            .await
            .expect_err("upload with too large size succeeded");

        // Correct size is 5, this should succeed.
        storage.upload(content(), 5, &id, None).await?;

        Ok(())
    }

    fn create_storage() -> anyhow::Result<LocalFs> {
        let storage_root = tempdir()?.path().to_path_buf();
        LocalFs::new(storage_root)
    }

    #[tokio::test]
    async fn download_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            contents,
            "We should upload and download the same contents"
        );

        let non_existing_path = "somewhere/else";
        match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?).await {
            Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_positive() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let full_range_download_contents =
            read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "Downloading the full range should return the whole upload"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

        let first_part_download = storage
            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
        assert!(
            first_part_download.metadata.is_none(),
            "No metadata should be returned for an upload without metadata"
        );

        let first_part_remote = aggregate(first_part_download.download_stream).await?;
        assert_eq!(
            first_part_local, first_part_remote,
            "First part bytes should be returned when requested"
        );

        let second_part_download = storage
            .download_byte_range(
                &upload_target,
                first_part_local.len() as u64,
                Some((first_part_local.len() + second_part_local.len()) as u64),
            )
            .await?;
        assert!(
            second_part_download.metadata.is_none(),
            "No metadata should be returned for an upload without metadata"
        );

        let second_part_remote = aggregate(second_part_download.download_stream).await?;
        assert_eq!(
            second_part_local, second_part_remote,
            "Second part bytes should be returned when requested"
        );

        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_negative() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let start = 1_000_000_000;
        let end = start + 1;
        match storage
            .download_byte_range(
                &upload_target,
                start,
                Some(end), // exclusive end
            )
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("zero bytes"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        let start = 10000;
        let end = 234;
        assert!(start > end, "Should test an incorrect range");
        match storage
            .download_byte_range(&upload_target, start, Some(end))
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("Invalid range"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        Ok(())
    }

    #[tokio::test]
    async fn delete_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        storage.delete(&upload_target).await?;
        assert!(storage.list_all().await?.is_empty());

        storage
            .delete(&upload_target)
            .await
            .expect("Should allow deleting non-existing storage files");

        Ok(())
    }

    #[tokio::test]
    async fn file_with_metadata() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let metadata = StorageMetadata(HashMap::from([
            ("one".to_string(), "1".to_string()),
            ("two".to_string(), "2".to_string()),
        ]));
        let upload_target =
            upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;

        let full_range_download_contents =
            read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "We should upload and download the same contents"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, _) = uploaded_bytes.split_at(3);

        let partial_download_with_metadata = storage
            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
        let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        assert_eq!(
            partial_download_with_metadata.metadata,
            Some(metadata),
            "We should get the same metadata back for a partial download"
        );

        Ok(())
    }

    #[tokio::test]
    async fn list() -> anyhow::Result<()> {
        // No delimiter: should recursively list everything
        let storage = create_storage()?;
        let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
        let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;

        let listing = storage.list(None, ListingMode::NoDelimiter).await?;
        assert!(listing.prefixes.is_empty());
        assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());

        // Delimiter: should only go one deep
        let listing = storage.list(None, ListingMode::WithDelimiter).await?;

        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("timelines").unwrap()].to_vec()
        );
        assert!(listing.keys.is_empty());

        // Delimiter & prefix
        let listing = storage
            .list(
                Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
                ListingMode::WithDelimiter,
            )
            .await?;
        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
                .to_vec()
        );
        assert_eq!(listing.keys, [uncle.clone()].to_vec());

        Ok(())
    }

    async fn upload_dummy_file(
        storage: &LocalFs,
        name: &str,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<RemotePath> {
        let from_path = storage
            .storage_root
            .join("timelines")
            .join("some_timeline")
            .join(name);
        let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;

        let relative_path = from_path
            .strip_prefix(&storage.storage_root)
            .context("Failed to strip storage root prefix")
            .and_then(RemotePath::new)
            .with_context(|| {
                format!(
                    "Failed to resolve remote part of path {:?} for base {:?}",
                    from_path, storage.storage_root
                )
            })?;

        let file = tokio_util::io::ReaderStream::new(file);

        storage.upload(file, size, &relative_path, metadata).await?;
        Ok(relative_path)
    }

    async fn create_file_for_upload(
        path: &Utf8Path,
        contents: &str,
    ) -> anyhow::Result<(fs::File, usize)> {
        std::fs::create_dir_all(path.parent().unwrap())?;
        let mut file_for_writing = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(path)?;
        write!(file_for_writing, "{}", contents)?;
        drop(file_for_writing);
        let file_size = path.metadata()?.len() as usize;
        Ok((
            fs::OpenOptions::new().read(true).open(&path).await?,
            file_size,
        ))
    }

    fn dummy_contents(name: &str) -> String {
        format!("contents for {name}")
    }

    async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
        let mut files = storage.list_all().await?;
        files.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(files)
    }

    async fn aggregate(
        stream: impl Stream<Item = std::io::Result<Bytes>>,
    ) -> anyhow::Result<Vec<u8>> {
        use futures::stream::StreamExt;
        let mut out = Vec::new();
        let mut stream = std::pin::pin!(stream);
        while let Some(res) = stream.next().await {
            out.extend_from_slice(&res?[..]);
        }
        Ok(out)
    }
}