1 : //! Local filesystem acting as remote storage.
2 : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
3 : //!
4 : //! This storage is used in tests, but can also be used in cases where a
5 : //! persistent volume is mounted to the local FS.
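 : //!
 : //! A minimal construction sketch (`no_run` doctest; the mount path is
 : //! hypothetical, and it assumes the crate is named `remote_storage` and
 : //! re-exports `LocalFs`):
 : //!
 : //! ```no_run
 : //! # fn main() -> anyhow::Result<()> {
 : //! use camino::Utf8PathBuf;
 : //! use remote_storage::LocalFs;
 : //!
 : //! // Two API users share one mounted volume by picking different roots.
 : //! let user_a = LocalFs::new(Utf8PathBuf::from("/mnt/backup_volume/user_a"))?;
 : //! let user_b = LocalFs::new(Utf8PathBuf::from("/mnt/backup_volume/user_b"))?;
 : //! # Ok(())
 : //! # }
 : //! ```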
6 :
7 : use std::{
8 : borrow::Cow, future::Future, io::ErrorKind, num::NonZeroU32, pin::Pin, time::SystemTime,
9 : };
10 :
11 : use anyhow::{bail, ensure, Context};
12 : use bytes::Bytes;
13 : use camino::{Utf8Path, Utf8PathBuf};
14 : use futures::stream::Stream;
15 : use tokio::{
16 : fs,
17 : io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
18 : };
19 : use tokio_util::{io::ReaderStream, sync::CancellationToken};
20 : use tracing::*;
21 : use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
22 :
23 : use crate::{Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError};
24 :
25 : use super::{RemoteStorage, StorageMetadata};
26 :
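 : /// Extension suffix given to a file while it is being uploaded; `upload`
 : /// renames the finished temp file over the final path afterwards.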
27 : const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
28 :
29 4778 : #[derive(Debug, Clone)]
30 : pub struct LocalFs {
31 : storage_root: Utf8PathBuf,
32 : }
33 :
34 : impl LocalFs {
35 : /// Attempts to create local FS storage, along with its root directory.
36 : /// The storage root will be created if it does not exist, and turned into an absolute path if passed as a relative one.
37 496 : pub fn new(mut storage_root: Utf8PathBuf) -> anyhow::Result<Self> {
38 496 : if !storage_root.exists() {
39 17 : std::fs::create_dir_all(&storage_root).with_context(|| {
40 0 : format!("Failed to create all directories in the given root path {storage_root:?}")
41 17 : })?;
42 479 : }
43 496 : if !storage_root.is_absolute() {
44 88 : storage_root = storage_root.canonicalize_utf8().with_context(|| {
45 0 : format!("Failed to represent path {storage_root:?} as an absolute path")
46 88 : })?;
47 408 : }
48 :
49 496 : Ok(Self { storage_root })
50 496 : }
51 :
52 : // mirrors S3Bucket::s3_object_to_relative_path
53 239 : fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
54 239 : let relative_path = key
55 239 : .strip_prefix(&self.storage_root)
56 239 : .expect("relative path must contain storage_root as prefix");
57 239 : RemotePath(relative_path.into())
58 239 : }
59 :
60 1003 : async fn read_storage_metadata(
61 1003 : &self,
62 1003 : file_path: &Utf8Path,
63 1003 : ) -> anyhow::Result<Option<StorageMetadata>> {
64 1003 : let metadata_path = storage_metadata_path(file_path);
65 1003 : if metadata_path.exists() && metadata_path.is_file() {
66 4 : let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
67 0 : format!("Failed to read metadata from the local storage at '{metadata_path}'")
68 4 : })?;
69 :
70 4 : serde_json::from_str(&metadata_string)
71 4 : .with_context(|| {
72 0 : format!(
73 0 : "Failed to deserialize metadata from the local storage at '{metadata_path}'",
74 0 : )
75 4 : })
76 4 : .map(|metadata| Some(StorageMetadata(metadata)))
77 : } else {
78 999 : Ok(None)
79 : }
80 1003 : }
81 :
82 : #[cfg(test)]
83 6 : async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
84 6 : Ok(get_all_files(&self.storage_root, true)
85 18 : .await?
86 6 : .into_iter()
87 6 : .map(|path| {
88 6 : path.strip_prefix(&self.storage_root)
89 6 : .context("Failed to strip storage root prefix")
90 6 : .and_then(RemotePath::new)
91 6 : .expect(
92 6 : "We list files for storage root, hence should be able to remote the prefix",
93 6 : )
94 6 : })
95 6 : .collect())
96 6 : }
97 :
98 : // recursively lists all files in a directory,
99 : // mirroring the `list_files` for `s3_bucket`
100 184 : async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
101 184 : let full_path = match folder {
102 182 : Some(folder) => folder.with_base(&self.storage_root),
103 2 : None => self.storage_root.clone(),
104 : };
105 :
106 : // If we were given a directory, we may use it as our starting point.
107 : // Otherwise, we must go up to the first ancestor dir that exists. This is because
108 : // S3 object list prefixes can be arbitrary strings, but when reading
109 : // the local filesystem we need a directory to start calling read_dir on.
110 184 : let mut initial_dir = full_path.clone();
111 334 : loop {
112 334 : // Did we make it to the root?
113 334 : if initial_dir.parent().is_none() {
114 0 : anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
115 334 : }
116 334 :
117 334 : match fs::metadata(initial_dir.clone()).await {
118 190 : Ok(meta) if meta.is_dir() => {
119 184 : // We found a directory, break
120 184 : break;
121 : }
122 6 : Ok(_meta) => {
123 6 : // It's not a directory: strip back to the parent
124 6 : initial_dir.pop();
125 6 : }
126 144 : Err(e) if e.kind() == ErrorKind::NotFound => {
127 144 : // It's not a file that exists: strip the prefix back to the parent directory
128 144 : initial_dir.pop();
129 144 : }
130 0 : Err(e) => {
131 0 : // Unexpected I/O error
132 0 : anyhow::bail!(e)
133 : }
134 : }
135 : }
136 : // Note that Utf8PathBuf::starts_with only considers full path segments, but
137 : // object prefixes are arbitrary strings, so we compare the raw strings with
138 : // str::starts_with below.
139 184 : let prefix = full_path.as_str();
140 184 :
141 184 : let mut files = vec![];
142 184 : let mut directory_queue = vec![initial_dir];
143 376 : while let Some(cur_folder) = directory_queue.pop() {
144 192 : let mut entries = cur_folder.read_dir_utf8()?;
145 3192 : while let Some(Ok(entry)) = entries.next() {
146 3000 : let file_name = entry.file_name();
147 3000 : let full_file_name = cur_folder.join(file_name);
148 3000 : if full_file_name.as_str().starts_with(prefix) {
149 239 : let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
150 239 : files.push(file_remote_path);
151 239 : if full_file_name.is_dir() {
152 8 : directory_queue.push(full_file_name);
153 231 : }
154 2761 : }
155 : }
156 : }
157 :
158 184 : Ok(files)
159 184 : }
160 : }
161 :
162 : impl RemoteStorage for LocalFs {
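 : /// With `ListingMode::NoDelimiter`, lists every key under `prefix` recursively;
 : /// with `ListingMode::WithDelimiter`, returns only the immediate children,
 : /// split into `keys` (files) and `prefixes` (non-empty directories),
 : /// mirroring S3 list-objects semantics.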
163 761 : async fn list(
164 761 : &self,
165 761 : prefix: Option<&RemotePath>,
166 761 : mode: ListingMode,
167 761 : max_keys: Option<NonZeroU32>,
168 761 : ) -> Result<Listing, DownloadError> {
169 761 : let mut result = Listing::default();
170 761 :
171 761 : if let ListingMode::NoDelimiter = mode {
172 184 : let keys = self
173 184 : .list_recursive(prefix)
174 332 : .await
175 184 : .map_err(DownloadError::Other)?;
176 :
177 184 : result.keys = keys
178 184 : .into_iter()
179 239 : .filter(|k| {
180 239 : let path = k.with_base(&self.storage_root);
181 239 : !path.is_dir()
182 239 : })
183 184 : .collect();
184 184 : if let Some(max_keys) = max_keys {
185 0 : result.keys.truncate(max_keys.get() as usize);
186 184 : }
187 :
188 184 : return Ok(result);
189 577 : }
190 :
191 577 : let path = match prefix {
192 575 : Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
193 2 : None => Cow::Borrowed(&self.storage_root),
194 : };
195 :
196 577 : let prefixes_to_filter = get_all_files(path.as_ref(), false)
197 216 : .await
198 577 : .map_err(DownloadError::Other)?;
199 :
200 : // filter out empty directories to mirror s3 behavior.
201 854 : for prefix in prefixes_to_filter {
202 277 : if prefix.is_dir()
203 275 : && is_directory_empty(&prefix)
204 267 : .await
205 275 : .map_err(DownloadError::Other)?
206 : {
207 6 : continue;
208 271 : }
209 271 :
210 271 : let stripped = prefix
211 271 : .strip_prefix(&self.storage_root)
212 271 : .context("Failed to strip prefix")
213 271 : .and_then(RemotePath::new)
214 271 : .expect(
215 271 : "We list files for storage root, hence should be able to remote the prefix",
216 271 : );
217 271 :
218 271 : if prefix.is_dir() {
219 269 : result.prefixes.push(stripped);
220 269 : } else {
221 2 : result.keys.push(stripped);
222 2 : }
223 : }
224 :
225 577 : Ok(result)
226 761 : }
227 :
228 12468 : async fn upload(
229 12468 : &self,
230 12468 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
231 12468 : data_size_bytes: usize,
232 12468 : to: &RemotePath,
233 12468 : metadata: Option<StorageMetadata>,
234 12468 : ) -> anyhow::Result<()> {
235 12468 : let target_file_path = to.with_base(&self.storage_root);
236 12468 : create_target_directory(&target_file_path).await?;
237 : // We need this sort-of-durable rename dance (without fsyncs) to prevent
238 : // partial uploads. It was actually hit when a pageserver shutdown cancelled
239 : // an upload and a partial file was left on the FS.
240 : // NOTE: Because the temp file suffix is always the same, this operation is racy.
241 : // Two concurrent operations can lead to the following sequence:
242 : // T1: write(temp)
243 : // T2: write(temp) -> overwrites the content
244 : // T1: rename(temp, dst) -> succeeds
245 : // T2: rename(temp, dst) -> fails, temp no longer exists
246 : // This could be solved by supplying a unique temp suffix every time, but this
247 : // situation is not normal in the first place, and the error can help (and has
248 : // helped at least once) to discover bugs in upper-level synchronization.
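 : // For illustration, a unique-suffix variant would look roughly like this
 : // (sketch only; `unique_id()` is a hypothetical helper, e.g. one returning
 : // a random hex string):
 : //
 : // let temp_file_path = path_with_suffix_extension(
 : //     &target_file_path,
 : //     &format!("{}.{LOCAL_FS_TEMP_FILE_SUFFIX}", unique_id()),
 : // );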
249 12467 : let temp_file_path =
250 12467 : path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
251 12464 : let mut destination = io::BufWriter::new(
252 12467 : fs::OpenOptions::new()
253 12467 : .write(true)
254 12467 : .create(true)
255 12467 : .open(&temp_file_path)
256 12297 : .await
257 12464 : .with_context(|| {
258 0 : format!("Failed to open target fs destination at '{target_file_path}'")
259 12464 : })?,
260 : );
261 :
262 12464 : let from_size_bytes = data_size_bytes as u64;
263 12464 : let data = tokio_util::io::StreamReader::new(data);
264 12464 : let data = std::pin::pin!(data);
265 12464 : let mut buffer_to_read = data.take(from_size_bytes);
266 :
267 : // alternatively we could just write the bytes to a file, but local_fs is a testing utility
268 12464 : let bytes_read = io::copy_buf(&mut buffer_to_read, &mut destination)
269 910790 : .await
270 12462 : .with_context(|| {
271 0 : format!(
272 0 : "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
273 0 : )
274 12462 : })?;
275 :
276 12462 : if bytes_read < from_size_bytes {
277 2 : bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
278 12460 : }
279 12460 : // Check if there is any extra data after the given size.
280 12460 : let mut from = buffer_to_read.into_inner();
281 12460 : let extra_read = from.read(&mut [1]).await?;
282 12460 : ensure!(
283 12460 : extra_read == 0,
284 4 : "Provided stream was larger than expected: expected {from_size_bytes} bytes",
285 : );
286 :
287 12456 : destination.flush().await.with_context(|| {
288 0 : format!(
289 0 : "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
290 0 : )
291 12456 : })?;
292 :
293 12456 : fs::rename(temp_file_path, &target_file_path)
294 12175 : .await
295 12454 : .with_context(|| {
296 0 : format!(
297 0 : "Failed to upload (rename) file to the local storage at '{target_file_path}'",
298 0 : )
299 12454 : })?;
300 :
301 12454 : if let Some(storage_metadata) = metadata {
302 2 : let storage_metadata_path = storage_metadata_path(&target_file_path);
303 2 : fs::write(
304 2 : &storage_metadata_path,
305 2 : serde_json::to_string(&storage_metadata.0)
306 2 : .context("Failed to serialize storage metadata as json")?,
307 : )
308 2 : .await
309 2 : .with_context(|| {
310 0 : format!(
311 0 : "Failed to write metadata to the local storage at '{storage_metadata_path}'",
312 0 : )
313 2 : })?;
314 12452 : }
315 :
316 12454 : Ok(())
317 12460 : }
318 :
319 1411 : async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
320 1411 : let target_path = from.with_base(&self.storage_root);
321 1411 : if file_exists(&target_path).map_err(DownloadError::BadInput)? {
322 991 : let source = ReaderStream::new(
323 991 : fs::OpenOptions::new()
324 991 : .read(true)
325 991 : .open(&target_path)
326 973 : .await
327 991 : .with_context(|| {
328 0 : format!("Failed to open source file {target_path:?} to use in the download")
329 991 : })
330 991 : .map_err(DownloadError::Other)?,
331 : );
332 :
333 991 : let metadata = self
334 991 : .read_storage_metadata(&target_path)
335 2 : .await
336 991 : .map_err(DownloadError::Other)?;
337 991 : Ok(Download {
338 991 : metadata,
339 991 : last_modified: None,
340 991 : etag: None,
341 991 : download_stream: Box::pin(source),
342 991 : })
343 : } else {
344 420 : Err(DownloadError::NotFound)
345 : }
346 1411 : }
347 :
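 : /// Streams the byte range `[start_inclusive, end_exclusive)` of the file;
 : /// `end_exclusive: None` means "until the end of the file".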
348 16 : async fn download_byte_range(
349 16 : &self,
350 16 : from: &RemotePath,
351 16 : start_inclusive: u64,
352 16 : end_exclusive: Option<u64>,
353 16 : ) -> Result<Download, DownloadError> {
354 16 : if let Some(end_exclusive) = end_exclusive {
355 10 : if end_exclusive <= start_inclusive {
356 2 : return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
357 8 : };
358 8 : if start_inclusive == end_exclusive.saturating_sub(1) {
359 2 : return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
360 6 : }
361 6 : }
362 12 : let target_path = from.with_base(&self.storage_root);
363 12 : if file_exists(&target_path).map_err(DownloadError::BadInput)? {
364 12 : let mut source = tokio::fs::OpenOptions::new()
365 12 : .read(true)
366 12 : .open(&target_path)
367 12 : .await
368 12 : .with_context(|| {
369 0 : format!("Failed to open source file {target_path:?} to use in the download")
370 12 : })
371 12 : .map_err(DownloadError::Other)?;
372 :
373 12 : let len = source
374 12 : .metadata()
375 12 : .await
376 12 : .context("query file length")
377 12 : .map_err(DownloadError::Other)?
378 12 : .len();
379 12 :
380 12 : source
381 12 : .seek(io::SeekFrom::Start(start_inclusive))
382 12 : .await
383 12 : .context("Failed to seek to the range start in a local storage file")
384 12 : .map_err(DownloadError::Other)?;
385 :
386 12 : let metadata = self
387 12 : .read_storage_metadata(&target_path)
388 2 : .await
389 12 : .map_err(DownloadError::Other)?;
390 :
391 12 : let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
392 12 : let source = ReaderStream::new(source);
393 12 :
394 12 : Ok(Download {
395 12 : metadata,
396 12 : last_modified: None,
397 12 : etag: None,
398 12 : download_stream: Box::pin(source),
399 12 : })
400 : } else {
401 0 : Err(DownloadError::NotFound)
402 : }
403 16 : }
404 :
405 1164 : async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
406 1164 : let file_path = path.with_base(&self.storage_root);
407 1164 : match fs::remove_file(&file_path).await {
408 587 : Ok(()) => Ok(()),
409 : // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
410 : // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
411 : // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
412 577 : Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
413 0 : Err(e) => Err(anyhow::anyhow!(e)),
414 : }
415 1164 : }
416 :
417 79 : async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
418 1223 : for path in paths {
419 1144 : self.delete(path).await?
420 : }
421 79 : Ok(())
422 79 : }
423 :
424 2 : async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
425 2 : let from_path = from.with_base(&self.storage_root);
426 2 : let to_path = to.with_base(&self.storage_root);
427 2 : create_target_directory(&to_path).await?;
428 2 : fs::copy(&from_path, &to_path).await.with_context(|| {
429 0 : format!(
430 0 : "Failed to copy file from '{from_path}' to '{to_path}'",
431 0 : from_path = from_path,
432 0 : to_path = to_path
433 0 : )
434 2 : })?;
435 2 : Ok(())
436 2 : }
437 :
438 : #[allow(clippy::diverging_sub_expression)]
439 0 : async fn time_travel_recover(
440 0 : &self,
441 0 : _prefix: Option<&RemotePath>,
442 0 : _timestamp: SystemTime,
443 0 : _done_if_after: SystemTime,
444 0 : _cancel: &CancellationToken,
445 0 : ) -> Result<(), TimeTravelError> {
446 0 : Err(TimeTravelError::Unimplemented)
447 0 : }
448 : }
449 :
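 : /// Path of the sidecar file holding the JSON-serialized `StorageMetadata`;
 : /// assuming `path_with_suffix_extension` appends the suffix as an extra
 : /// extension, `foo.json` maps to `foo.json.metadata`.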
450 1005 : fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
451 1005 : path_with_suffix_extension(original_path, "metadata")
452 1005 : }
453 :
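 : /// Collects all entries under `directory_path`: files always; directories are
 : /// descended into when `recursive` is true, and returned as entries otherwise.
 : /// Symlinks are skipped, and a missing path yields an empty list.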
454 595 : fn get_all_files<'a, P>(
455 595 : directory_path: P,
456 595 : recursive: bool,
457 595 : ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
458 595 : where
459 595 : P: AsRef<Utf8Path> + Send + Sync + 'a,
460 595 : {
461 595 : Box::pin(async move {
462 595 : let directory_path = directory_path.as_ref();
463 595 : if directory_path.exists() {
464 235 : if directory_path.is_dir() {
465 235 : let mut paths = Vec::new();
466 235 : let mut dir_contents = fs::read_dir(directory_path).await?;
467 530 : while let Some(dir_entry) = dir_contents.next_entry().await? {
468 295 : let file_type = dir_entry.file_type().await?;
469 295 : let entry_path =
470 295 : Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
471 0 : anyhow::Error::msg(format!(
472 0 : "non-Unicode path: {}",
473 0 : pb.to_string_lossy()
474 0 : ))
475 295 : })?;
476 295 : if file_type.is_symlink() {
477 0 : debug!("{entry_path:?} is a symlink, skipping")
478 295 : } else if file_type.is_dir() {
479 287 : if recursive {
480 18 : paths.extend(get_all_files(&entry_path, true).await?.into_iter())
481 : } else {
482 275 : paths.push(entry_path)
483 : }
484 8 : } else {
485 8 : paths.push(entry_path);
486 8 : }
487 : }
488 235 : Ok(paths)
489 : } else {
490 0 : bail!("Path {directory_path:?} is not a directory")
491 : }
492 : } else {
493 360 : Ok(Vec::new())
494 : }
495 595 : })
496 595 : }
497 :
498 12470 : async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
499 12470 : let target_dir = match target_file_path.parent() {
500 12470 : Some(parent_dir) => parent_dir,
501 0 : None => bail!("File path '{target_file_path}' has no parent directory"),
502 : };
503 12470 : if !target_dir.exists() {
504 926 : fs::create_dir_all(target_dir).await?;
505 11544 : }
506 12469 : Ok(())
507 12469 : }
508 :
509 1423 : fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
510 1423 : if file_path.exists() {
511 1003 : ensure!(file_path.is_file(), "file path '{file_path}' is not a file");
512 1003 : Ok(true)
513 : } else {
514 420 : Ok(false)
515 : }
516 1423 : }
517 :
518 : #[cfg(test)]
519 : mod fs_tests {
520 : use super::*;
521 :
522 : use bytes::Bytes;
523 : use camino_tempfile::tempdir;
524 : use futures_util::Stream;
525 : use std::{collections::HashMap, io::Write};
526 :
527 6 : async fn read_and_check_metadata(
528 6 : storage: &LocalFs,
529 6 : remote_storage_path: &RemotePath,
530 6 : expected_metadata: Option<&StorageMetadata>,
531 6 : ) -> anyhow::Result<String> {
532 6 : let download = storage
533 6 : .download(remote_storage_path)
534 8 : .await
535 6 : .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
536 6 : ensure!(
537 6 : download.metadata.as_ref() == expected_metadata,
538 0 : "Unexpected metadata returned for the downloaded file"
539 : );
540 :
541 12 : let contents = aggregate(download.download_stream).await?;
542 :
543 6 : String::from_utf8(contents).map_err(anyhow::Error::new)
544 6 : }
545 :
546 2 : #[tokio::test]
547 2 : async fn upload_file() -> anyhow::Result<()> {
548 2 : let storage = create_storage()?;
549 2 :
550 12 : let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
551 2 : assert_eq!(
552 6 : storage.list_all().await?,
553 2 : vec![target_path_1.clone()],
554 2 : "Should list a single file after first upload"
555 2 : );
556 2 :
557 12 : let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
558 2 : assert_eq!(
559 6 : list_files_sorted(&storage).await?,
560 2 : vec![target_path_1.clone(), target_path_2.clone()],
561 2 : "Should list a two different files after second upload"
562 2 : );
563 2 :
564 2 : Ok(())
565 2 : }
566 :
567 2 : #[tokio::test]
568 2 : async fn upload_file_negatives() -> anyhow::Result<()> {
569 2 : let storage = create_storage()?;
570 2 :
571 2 : let id = RemotePath::new(Utf8Path::new("dummy"))?;
572 2 : let content = Bytes::from_static(b"12345");
573 8 : let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));
574 2 :
575 2 : // Check that you get an error if the size parameter doesn't match the actual
576 2 : // size of the stream.
577 2 : storage
578 2 : .upload(content(), 0, &id, None)
579 2 : .await
580 2 : .expect_err("upload with zero size succeeded");
581 2 : storage
582 2 : .upload(content(), 4, &id, None)
583 4 : .await
584 2 : .expect_err("upload with too short size succeeded");
585 2 : storage
586 2 : .upload(content(), 6, &id, None)
587 4 : .await
588 2 : .expect_err("upload with too large size succeeded");
589 2 :
590 2 : // Correct size is 5, this should succeed.
591 6 : storage.upload(content(), 5, &id, None).await?;
592 2 :
593 2 : Ok(())
594 2 : }
595 :
596 16 : fn create_storage() -> anyhow::Result<LocalFs> {
597 16 : let storage_root = tempdir()?.path().to_path_buf();
598 16 : LocalFs::new(storage_root)
599 16 : }
600 :
601 2 : #[tokio::test]
602 2 : async fn download_file() -> anyhow::Result<()> {
603 2 : let storage = create_storage()?;
604 2 : let upload_name = "upload_1";
605 12 : let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
606 2 :
607 6 : let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
608 2 : assert_eq!(
609 2 : dummy_contents(upload_name),
610 2 : contents,
611 2 : "We should upload and download the same contents"
612 2 : );
613 2 :
614 2 : let non_existing_path = "somewhere/else";
615 2 : match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?).await {
616 2 : Err(DownloadError::NotFound) => {} // Should get NotFound for non-existing keys
617 2 : other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
618 2 : }
619 2 : Ok(())
620 2 : }
621 :
622 2 : #[tokio::test]
623 2 : async fn download_file_range_positive() -> anyhow::Result<()> {
624 2 : let storage = create_storage()?;
625 2 : let upload_name = "upload_1";
626 12 : let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
627 2 :
628 2 : let full_range_download_contents =
629 6 : read_and_check_metadata(&storage, &upload_target, None).await?;
630 2 : assert_eq!(
631 2 : dummy_contents(upload_name),
632 2 : full_range_download_contents,
633 2 : "Download full range should return the whole upload"
634 2 : );
635 2 :
636 2 : let uploaded_bytes = dummy_contents(upload_name).into_bytes();
637 2 : let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
638 2 :
639 2 : let first_part_download = storage
640 2 : .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
641 6 : .await?;
642 2 : assert!(
643 2 : first_part_download.metadata.is_none(),
644 2 : "No metadata should be returned for no metadata upload"
645 2 : );
646 2 :
647 2 : let first_part_remote = aggregate(first_part_download.download_stream).await?;
648 2 : assert_eq!(
649 2 : first_part_local, first_part_remote,
650 2 : "First part bytes should be returned when requested"
651 2 : );
652 2 :
653 2 : let second_part_download = storage
654 2 : .download_byte_range(
655 2 : &upload_target,
656 2 : first_part_local.len() as u64,
657 2 : Some((first_part_local.len() + second_part_local.len()) as u64),
658 2 : )
659 6 : .await?;
660 2 : assert!(
661 2 : second_part_download.metadata.is_none(),
662 2 : "No metadata should be returned for no metadata upload"
663 2 : );
664 2 :
665 2 : let second_part_remote = aggregate(second_part_download.download_stream).await?;
666 2 : assert_eq!(
667 2 : second_part_local, second_part_remote,
668 2 : "Second part bytes should be returned when requested"
669 2 : );
670 2 :
671 2 : let suffix_bytes = storage
672 2 : .download_byte_range(&upload_target, 13, None)
673 6 : .await?
674 2 : .download_stream;
675 2 : let suffix_bytes = aggregate(suffix_bytes).await?;
676 2 : let suffix = std::str::from_utf8(&suffix_bytes)?;
677 2 : assert_eq!(upload_name, suffix);
678 2 :
679 2 : let all_bytes = storage
680 2 : .download_byte_range(&upload_target, 0, None)
681 6 : .await?
682 2 : .download_stream;
683 2 : let all_bytes = aggregate(all_bytes).await?;
684 2 : let all_bytes = std::str::from_utf8(&all_bytes)?;
685 2 : assert_eq!(dummy_contents("upload_1"), all_bytes);
686 2 :
687 2 : Ok(())
688 2 : }
689 :
690 2 : #[tokio::test]
691 2 : async fn download_file_range_negative() -> anyhow::Result<()> {
692 2 : let storage = create_storage()?;
693 2 : let upload_name = "upload_1";
694 12 : let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
695 2 :
696 2 : let start = 1_000_000_000;
697 2 : let end = start + 1;
698 2 : match storage
699 2 : .download_byte_range(
700 2 : &upload_target,
701 2 : start,
702 2 : Some(end), // exclusive end
703 2 : )
704 2 : .await
705 2 : {
706 2 : Ok(_) => panic!("Should not allow downloading wrong ranges"),
707 2 : Err(e) => {
708 2 : let error_string = e.to_string();
709 2 : assert!(error_string.contains("zero bytes"));
710 2 : assert!(error_string.contains(&start.to_string()));
711 2 : assert!(error_string.contains(&end.to_string()));
712 2 : }
713 2 : }
714 2 :
715 2 : let start = 10000;
716 2 : let end = 234;
717 2 : assert!(start > end, "Should test an incorrect range");
718 2 : match storage
719 2 : .download_byte_range(&upload_target, start, Some(end))
720 2 : .await
721 2 : {
722 2 : Ok(_) => panic!("Should not allow downloading wrong ranges"),
723 2 : Err(e) => {
724 2 : let error_string = e.to_string();
725 2 : assert!(error_string.contains("Invalid range"));
726 2 : assert!(error_string.contains(&start.to_string()));
727 2 : assert!(error_string.contains(&end.to_string()));
728 2 : }
729 2 : }
730 2 :
731 2 : Ok(())
732 2 : }
733 :
734 2 : #[tokio::test]
735 2 : async fn delete_file() -> anyhow::Result<()> {
736 2 : let storage = create_storage()?;
737 2 : let upload_name = "upload_1";
738 12 : let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
739 2 :
740 2 : storage.delete(&upload_target).await?;
741 6 : assert!(storage.list_all().await?.is_empty());
742 2 :
743 2 : storage
744 2 : .delete(&upload_target)
745 2 : .await
746 2 : .expect("Should allow deleting non-existing storage files");
747 2 :
748 2 : Ok(())
749 2 : }
750 :
751 2 : #[tokio::test]
752 2 : async fn file_with_metadata() -> anyhow::Result<()> {
753 2 : let storage = create_storage()?;
754 2 : let upload_name = "upload_1";
755 2 : let metadata = StorageMetadata(HashMap::from([
756 2 : ("one".to_string(), "1".to_string()),
757 2 : ("two".to_string(), "2".to_string()),
758 2 : ]));
759 2 : let upload_target =
760 14 : upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;
761 2 :
762 2 : let full_range_download_contents =
763 8 : read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
764 2 : assert_eq!(
765 2 : dummy_contents(upload_name),
766 2 : full_range_download_contents,
767 2 : "We should upload and download the same contents"
768 2 : );
769 2 :
770 2 : let uploaded_bytes = dummy_contents(upload_name).into_bytes();
771 2 : let (first_part_local, _) = uploaded_bytes.split_at(3);
772 2 :
773 2 : let partial_download_with_metadata = storage
774 2 : .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
775 8 : .await?;
776 2 : let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
777 2 : assert_eq!(
778 2 : first_part_local,
779 2 : first_part_remote.as_slice(),
780 2 : "First part bytes should be returned when requested"
781 2 : );
782 2 :
783 2 : assert_eq!(
784 2 : partial_download_with_metadata.metadata,
785 2 : Some(metadata),
786 2 : "We should get the same metadata back for partial download"
787 2 : );
788 2 :
789 2 : Ok(())
790 2 : }
791 :
792 2 : #[tokio::test]
793 2 : async fn list() -> anyhow::Result<()> {
794 2 : // No delimiter: should recursively list everything
795 2 : let storage = create_storage()?;
796 7 : let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
797 12 : let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;
798 2 :
799 2 : let listing = storage.list(None, ListingMode::NoDelimiter, None).await?;
800 2 : assert!(listing.prefixes.is_empty());
801 2 : assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());
802 2 :
803 2 : // Delimiter: should only go one deep
804 4 : let listing = storage.list(None, ListingMode::WithDelimiter, None).await?;
805 2 :
806 2 : assert_eq!(
807 2 : listing.prefixes,
808 2 : [RemotePath::from_string("timelines").unwrap()].to_vec()
809 2 : );
810 2 : assert!(listing.keys.is_empty());
811 2 :
812 2 : // Delimiter & prefix
813 2 : let listing = storage
814 2 : .list(
815 2 : Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
816 2 : ListingMode::WithDelimiter,
817 2 : None,
818 2 : )
819 4 : .await?;
820 2 : assert_eq!(
821 2 : listing.prefixes,
822 2 : [RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
823 2 : .to_vec()
824 2 : );
825 2 : assert_eq!(listing.keys, [uncle.clone()].to_vec());
826 2 :
827 2 : Ok(())
828 2 : }
829 :
830 18 : async fn upload_dummy_file(
831 18 : storage: &LocalFs,
832 18 : name: &str,
833 18 : metadata: Option<StorageMetadata>,
834 18 : ) -> anyhow::Result<RemotePath> {
835 18 : let from_path = storage
836 18 : .storage_root
837 18 : .join("timelines")
838 18 : .join("some_timeline")
839 18 : .join(name);
840 18 : let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
841 :
842 18 : let relative_path = from_path
843 18 : .strip_prefix(&storage.storage_root)
844 18 : .context("Failed to strip storage root prefix")
845 18 : .and_then(RemotePath::new)
846 18 : .with_context(|| {
847 0 : format!(
848 0 : "Failed to resolve remote part of path {:?} for base {:?}",
849 0 : from_path, storage.storage_root
850 0 : )
851 18 : })?;
852 :
853 18 : let file = tokio_util::io::ReaderStream::new(file);
854 18 :
855 87 : storage.upload(file, size, &relative_path, metadata).await?;
856 18 : Ok(relative_path)
857 18 : }
858 :
859 18 : async fn create_file_for_upload(
860 18 : path: &Utf8Path,
861 18 : contents: &str,
862 18 : ) -> anyhow::Result<(fs::File, usize)> {
863 18 : std::fs::create_dir_all(path.parent().unwrap())?;
864 18 : let mut file_for_writing = std::fs::OpenOptions::new()
865 18 : .write(true)
866 18 : .create_new(true)
867 18 : .open(path)?;
868 18 : write!(file_for_writing, "{}", contents)?;
869 18 : drop(file_for_writing);
870 18 : let file_size = path.metadata()?.len() as usize;
871 18 : Ok((
872 18 : fs::OpenOptions::new().read(true).open(&path).await?,
873 18 : file_size,
874 : ))
875 18 : }
876 :
877 30 : fn dummy_contents(name: &str) -> String {
878 30 : format!("contents for {name}")
879 30 : }
880 :
881 2 : async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
882 6 : let mut files = storage.list_all().await?;
883 2 : files.sort_by(|a, b| a.0.cmp(&b.0));
884 2 : Ok(files)
885 2 : }
886 :
887 16 : async fn aggregate(
888 16 : stream: impl Stream<Item = std::io::Result<Bytes>>,
889 16 : ) -> anyhow::Result<Vec<u8>> {
890 16 : use futures::stream::StreamExt;
891 16 : let mut out = Vec::new();
892 16 : let mut stream = std::pin::pin!(stream);
893 32 : while let Some(res) = stream.next().await {
894 16 : out.extend_from_slice(&res?[..]);
895 : }
896 16 : Ok(out)
897 16 : }
898 : }