//! Local filesystem acting as a remote storage.
//! Multiple API users can use the same "storage" of this kind by using different storage roots.
//!
//! This storage is used in tests, but can also be used in cases when a persistent
//! volume is mounted to the local FS.
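//!
//! A minimal usage sketch (the storage root and object path below are
//! hypothetical, for illustration only):
//! ```ignore
//! let storage = LocalFs::new(Utf8PathBuf::from("/mnt/remote_storage"))?;
//! let path = RemotePath::new(Utf8Path::new("tenant/timeline/segment"))?;
//! // `std::io::Cursor` implements tokio's `AsyncRead`, so it can act as the upload stream.
//! storage
//!     .upload(Box::new(std::io::Cursor::new(b"payload")), 7, &path, None)
//!     .await?;
//! let download = storage.download(&path).await?;
//! ```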

use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin};

use anyhow::{bail, ensure, Context};
use camino::{Utf8Path, Utf8PathBuf};
use tokio::{
    fs,
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};

use crate::{Download, DownloadError, RemotePath};

use super::{RemoteStorage, StorageMetadata};

const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
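// The suffix above marks in-flight uploads: data is first written to a sibling
// temp file derived via `path_with_suffix_extension` (roughly `foo/bar.___temp`
// for a target `foo/bar`; the exact shape is illustrative) and then renamed
// into place, see `upload` below.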

#[derive(Debug, Clone)]
pub struct LocalFs {
    storage_root: Utf8PathBuf,
}

impl LocalFs {
    /// Attempts to create local FS storage, along with its root directory.
    /// The storage root is created if it does not exist and is converted into
    /// an absolute path if it was passed as a relative one.
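    ///
    /// A sketch of the intended behavior (the relative path is hypothetical):
    /// ```ignore
    /// // "./local_fs_root" is created if missing; since it is relative, the
    /// // stored root is then canonicalized into an absolute path.
    /// let storage = LocalFs::new(Utf8PathBuf::from("./local_fs_root"))?;
    /// ```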
    pub fn new(mut storage_root: Utf8PathBuf) -> anyhow::Result<Self> {
        if !storage_root.exists() {
            std::fs::create_dir_all(&storage_root).with_context(|| {
                format!("Failed to create all directories in the given root path {storage_root:?}")
            })?;
        }
        if !storage_root.is_absolute() {
            storage_root = storage_root.canonicalize_utf8().with_context(|| {
                format!("Failed to represent path {storage_root:?} as an absolute path")
            })?;
        }

        Ok(Self { storage_root })
    }

    // mirrors S3Bucket::s3_object_to_relative_path
    fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
        let relative_path = key
            .strip_prefix(&self.storage_root)
            .expect("relative path must contain storage_root as prefix");
        RemotePath(relative_path.into())
    }

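    /// Reads the optional JSON metadata sidecar that `upload` writes next to an
    /// object (its path is derived via `path_with_suffix_extension` with a
    /// `metadata` suffix, see `storage_metadata_path`). Returns `Ok(None)` when
    /// no sidecar file exists.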
    async fn read_storage_metadata(
        &self,
        file_path: &Utf8Path,
    ) -> anyhow::Result<Option<StorageMetadata>> {
        let metadata_path = storage_metadata_path(file_path);
        if metadata_path.exists() && metadata_path.is_file() {
            let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
                format!("Failed to read metadata from the local storage at '{metadata_path}'")
            })?;

            serde_json::from_str(&metadata_string)
                .with_context(|| {
                    format!(
                        "Failed to deserialize metadata from the local storage at '{metadata_path}'",
                    )
                })
                .map(|metadata| Some(StorageMetadata(metadata)))
        } else {
            Ok(None)
        }
    }

    #[cfg(test)]
    async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
        Ok(get_all_files(&self.storage_root, true)
            .await?
            .into_iter()
            .map(|path| {
                path.strip_prefix(&self.storage_root)
                    .context("Failed to strip storage root prefix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files under the storage root, hence should be able to remove the prefix",
                    )
            })
            .collect())
    }
}

#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
    async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        let path = match prefix {
            Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
            None => Cow::Borrowed(&self.storage_root),
        };

        let prefixes_to_filter = get_all_files(path.as_ref(), false)
            .await
            .map_err(DownloadError::Other)?;

        let mut prefixes = Vec::with_capacity(prefixes_to_filter.len());

        // Filter out empty directories to mirror S3 behavior.
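        // In S3 a "directory" exists only while at least one object shares its
        // prefix, so an empty local directory has no S3 counterpart. E.g. with
        // objects under `root/a/` and `root/b/` plus an empty `root/c/`, this
        // returns `a` and `b` but not `c` (names are illustrative).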
        for prefix in prefixes_to_filter {
            if prefix.is_dir()
                && is_directory_empty(&prefix)
                    .await
                    .map_err(DownloadError::Other)?
            {
                continue;
            }

            prefixes.push(
                prefix
                    .strip_prefix(&self.storage_root)
                    .context("Failed to strip prefix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files under the storage root, hence should be able to remove the prefix",
                    ),
            )
        }

        Ok(prefixes)
    }

    // Recursively lists all files in a directory,
    // mirroring `list_files` for `s3_bucket`.
    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        let full_path = match folder {
            Some(folder) => folder.with_base(&self.storage_root),
            None => self.storage_root.clone(),
        };

        // If we were given a directory, we may use it as our starting point.
        // Otherwise, we must go up to the parent directory. This is because
        // S3 object list prefixes can be arbitrary strings, but when reading
        // the local filesystem we need a directory to start calling read_dir on.
        let mut initial_dir = full_path.clone();
        match fs::metadata(full_path.clone()).await {
            Ok(meta) => {
                if !meta.is_dir() {
                    // It's not a directory: strip back to the parent
                    initial_dir.pop();
                }
            }
            Err(e) if e.kind() == ErrorKind::NotFound => {
                // It's not a file that exists: strip the prefix back to the parent directory
                initial_dir.pop();
            }
            Err(e) => {
                // Unexpected I/O error
                anyhow::bail!(e)
            }
        }

        // Note that Utf8PathBuf::starts_with only considers full path segments, but
        // object prefixes are arbitrary strings, so we need the strings for doing
        // starts_with later.
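        // E.g. a prefix `tenants/123/tim` must match `tenants/123/timelines/...`,
        // even though `tim` is not a full path segment; comparing the raw strings
        // gives us that, while segment-wise `starts_with` would not (the names
        // here are illustrative).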
168 CBC 30 : let prefix = full_path.as_str();
169 30 :
170 30 : let mut files = vec![];
171 30 : let mut directory_queue = vec![initial_dir];
172 60 : while let Some(cur_folder) = directory_queue.pop() {
173 30 : let mut entries = cur_folder.read_dir_utf8()?;
174 75 : while let Some(Ok(entry)) = entries.next() {
175 45 : let file_name = entry.file_name();
176 45 : let full_file_name = cur_folder.join(file_name);
177 45 : if full_file_name.as_str().starts_with(prefix) {
178 39 : let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
179 39 : files.push(file_remote_path);
180 39 : if full_file_name.is_dir() {
181 UBC 0 : directory_queue.push(full_file_name);
182 CBC 39 : }
183 6 : }
184 : }
185 : }
186 :
187 30 : Ok(files)
188 60 : }

    async fn upload(
        &self,
        data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        let target_file_path = to.with_base(&self.storage_root);
        create_target_directory(&target_file_path).await?;
        // We need this dance with a sort-of-durable rename (without fsyncs)
        // to prevent partial uploads. This was actually hit when a pageserver
        // shutdown cancelled the upload and a partial file was left on the fs.
        // NOTE: Because the temp file suffix is always the same, this operation is racy.
        // Two concurrent operations can lead to the following sequence:
        // T1: write(temp)
        // T2: write(temp) -> overwrites the content
        // T1: rename(temp, dst) -> succeeds
        // T2: rename(temp, dst) -> fails, temp no longer exists
        // This could be solved by supplying a unique temp suffix every time, but this
        // situation is not normal in the first place, and the error can help (and has
        // helped at least once) to discover bugs in upper-level synchronization.
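        // A hypothetical sketch of the unique-suffix alternative mentioned above,
        // for illustration only (deliberately not used here, so that collisions
        // keep surfacing as errors):
        //
        //     let unique_suffix = format!(
        //         "{LOCAL_FS_TEMP_FILE_SUFFIX}_{}",
        //         std::time::SystemTime::now()
        //             .duration_since(std::time::UNIX_EPOCH)?
        //             .as_nanos()
        //     );
        //     let temp_file_path =
        //         path_with_suffix_extension(&target_file_path, &unique_suffix);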
        let temp_file_path =
            path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
        let mut destination = io::BufWriter::new(
            fs::OpenOptions::new()
                .write(true)
                .create(true)
                .open(&temp_file_path)
                .await
                .with_context(|| {
                    format!("Failed to open target fs destination at '{target_file_path}'")
                })?,
        );

        let from_size_bytes = data_size_bytes as u64;
        let mut buffer_to_read = data.take(from_size_bytes);

        let bytes_read = io::copy(&mut buffer_to_read, &mut destination)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
                )
            })?;

        if bytes_read < from_size_bytes {
            bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
        }
        // Check if there is any extra data after the given size.
        let mut from = buffer_to_read.into_inner();
        let extra_read = from.read(&mut [1]).await?;
        ensure!(
            extra_read == 0,
            "Provided stream was larger than expected: expected {from_size_bytes} bytes",
        );

        destination.flush().await.with_context(|| {
            format!(
                "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
            )
        })?;

        fs::rename(temp_file_path, &target_file_path)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload (rename) file to the local storage at '{target_file_path}'",
                )
            })?;

        if let Some(storage_metadata) = metadata {
            let storage_metadata_path = storage_metadata_path(&target_file_path);
            fs::write(
                &storage_metadata_path,
                serde_json::to_string(&storage_metadata.0)
                    .context("Failed to serialize storage metadata as json")?,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to write metadata to the local storage at '{storage_metadata_path}'",
                )
            })?;
        }

        Ok(())
    }

    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let source = io::BufReader::new(
                fs::OpenOptions::new()
                    .read(true)
                    .open(&target_path)
                    .await
                    .with_context(|| {
                        format!("Failed to open source file {target_path:?} to use in the download")
                    })
                    .map_err(DownloadError::Other)?,
            );

            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;
            Ok(Download {
                metadata,
                download_stream: Box::pin(source),
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        if let Some(end_exclusive) = end_exclusive {
            if end_exclusive <= start_inclusive {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
            };
            // Note: this also rejects a one-byte range, where end_exclusive == start_inclusive + 1.
            if start_inclusive == end_exclusive.saturating_sub(1) {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
            }
        }
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let mut source = io::BufReader::new(
                fs::OpenOptions::new()
                    .read(true)
                    .open(&target_path)
                    .await
                    .with_context(|| {
                        format!("Failed to open source file {target_path:?} to use in the download")
                    })
                    .map_err(DownloadError::Other)?,
            );
            source
                .seek(io::SeekFrom::Start(start_inclusive))
                .await
                .context("Failed to seek to the range start in a local storage file")
                .map_err(DownloadError::Other)?;
            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;

            Ok(match end_exclusive {
                Some(end_exclusive) => Download {
                    metadata,
                    download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
                },
                None => Download {
                    metadata,
                    download_stream: Box::pin(source),
                },
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        let file_path = path.with_base(&self.storage_root);
        match fs::remove_file(&file_path).await {
            Ok(()) => Ok(()),
            // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
            // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
            // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }

    async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
        for path in paths {
            self.delete(path).await?
        }
        Ok(())
    }
}

fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
    path_with_suffix_extension(original_path, "metadata")
}

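/// Collects entries under `directory_path`: files are always returned, while
/// sub-directories are either descended into (`recursive == true`) or returned
/// as entries themselves; symlinks are skipped. The future is boxed because the
/// function calls itself: a plain recursive `async fn` would produce an
/// infinitely-sized future type, so recursion needs the `Pin<Box<dyn Future>>`
/// indirection.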
fn get_all_files<'a, P>(
    directory_path: P,
    recursive: bool,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
where
    P: AsRef<Utf8Path> + Send + Sync + 'a,
{
    Box::pin(async move {
        let directory_path = directory_path.as_ref();
        if directory_path.exists() {
            if directory_path.is_dir() {
                let mut paths = Vec::new();
                let mut dir_contents = fs::read_dir(directory_path).await?;
                while let Some(dir_entry) = dir_contents.next_entry().await? {
                    let file_type = dir_entry.file_type().await?;
                    let entry_path =
                        Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
                            anyhow::Error::msg(format!(
                                "non-Unicode path: {}",
                                pb.to_string_lossy()
                            ))
                        })?;
                    if file_type.is_symlink() {
                        debug!("{entry_path:?} is a symlink, skipping")
                    } else if file_type.is_dir() {
                        if recursive {
                            paths.extend(get_all_files(&entry_path, true).await?.into_iter())
                        } else {
                            paths.push(entry_path)
                        }
                    } else {
                        paths.push(entry_path);
                    }
                }
                Ok(paths)
            } else {
                bail!("Path {directory_path:?} is not a directory")
            }
        } else {
            Ok(Vec::new())
        }
    })
}

async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
    let target_dir = match target_file_path.parent() {
        Some(parent_dir) => parent_dir,
        None => bail!("File path '{target_file_path}' has no parent directory"),
    };
    if !target_dir.exists() {
        fs::create_dir_all(target_dir).await?;
    }
    Ok(())
}

fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
    if file_path.exists() {
        ensure!(file_path.is_file(), "file path '{file_path}' is not a file");
        Ok(true)
    } else {
        Ok(false)
    }
}

#[cfg(test)]
mod fs_tests {
    use super::*;

    use camino_tempfile::tempdir;
    use std::{collections::HashMap, io::Write};

    async fn read_and_assert_remote_file_contents(
        storage: &LocalFs,
        #[allow(clippy::ptr_arg)]
        // have to use &Utf8PathBuf due to `storage.local_path` parameter requirements
        remote_storage_path: &RemotePath,
        expected_metadata: Option<&StorageMetadata>,
    ) -> anyhow::Result<String> {
        let mut download = storage
            .download(remote_storage_path)
            .await
            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
        ensure!(
            download.metadata.as_ref() == expected_metadata,
            "Unexpected metadata returned for the downloaded file"
        );

        let mut contents = String::new();
        download
            .download_stream
            .read_to_string(&mut contents)
            .await
            .context("Failed to read remote file contents into string")?;
        Ok(contents)
    }

    #[tokio::test]
    async fn upload_file() -> anyhow::Result<()> {
        let storage = create_storage()?;

        let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
        assert_eq!(
            storage.list().await?,
            vec![target_path_1.clone()],
            "Should list a single file after first upload"
        );

        let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
        assert_eq!(
            list_files_sorted(&storage).await?,
            vec![target_path_1.clone(), target_path_2.clone()],
            "Should list two different files after second upload"
        );

        Ok(())
    }

    #[tokio::test]
    async fn upload_file_negatives() -> anyhow::Result<()> {
        let storage = create_storage()?;

        let id = RemotePath::new(Utf8Path::new("dummy"))?;
        let content = std::io::Cursor::new(b"12345");

        // Check that you get an error if the size parameter doesn't match the actual
        // size of the stream.
        storage
            .upload(Box::new(content.clone()), 0, &id, None)
            .await
            .expect_err("upload with zero size succeeded");
        storage
            .upload(Box::new(content.clone()), 4, &id, None)
            .await
            .expect_err("upload with too short size succeeded");
        storage
            .upload(Box::new(content.clone()), 6, &id, None)
            .await
            .expect_err("upload with too large size succeeded");

        // Correct size is 5, this should succeed.
        storage.upload(Box::new(content), 5, &id, None).await?;

        Ok(())
    }

    fn create_storage() -> anyhow::Result<LocalFs> {
        let storage_root = tempdir()?.path().to_path_buf();
        LocalFs::new(storage_root)
    }

    #[tokio::test]
    async fn download_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            contents,
            "We should upload and download the same contents"
        );

        let non_existing_path = "somewhere/else";
        match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?).await {
            Err(DownloadError::NotFound) => {} // Should get NotFound for non-existing keys
            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_positive() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let full_range_download_contents =
            read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "Download full range should return the whole upload"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

        let mut first_part_download = storage
            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
        assert!(
            first_part_download.metadata.is_none(),
            "No metadata should be returned for no metadata upload"
        );

        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
        io::copy(
            &mut first_part_download.download_stream,
            &mut first_part_remote,
        )
        .await?;
        first_part_remote.flush().await?;
        let first_part_remote = first_part_remote.into_inner().into_inner();
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        let mut second_part_download = storage
            .download_byte_range(
                &upload_target,
                first_part_local.len() as u64,
                Some((first_part_local.len() + second_part_local.len()) as u64),
            )
            .await?;
        assert!(
            second_part_download.metadata.is_none(),
            "No metadata should be returned for no metadata upload"
        );

        let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
        io::copy(
            &mut second_part_download.download_stream,
            &mut second_part_remote,
        )
        .await?;
        second_part_remote.flush().await?;
        let second_part_remote = second_part_remote.into_inner().into_inner();
        assert_eq!(
            second_part_local,
            second_part_remote.as_slice(),
            "Second part bytes should be returned when requested"
        );

        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_negative() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let start = 1_000_000_000;
        let end = start + 1;
        match storage
            .download_byte_range(
                &upload_target,
                start,
                Some(end), // exclusive end
            )
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("zero bytes"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        let start = 10000;
        let end = 234;
        assert!(start > end, "Should test an incorrect range");
        match storage
            .download_byte_range(&upload_target, start, Some(end))
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("Invalid range"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        Ok(())
    }

    #[tokio::test]
    async fn delete_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        storage.delete(&upload_target).await?;
        assert!(storage.list().await?.is_empty());

        storage
            .delete(&upload_target)
            .await
            .expect("Should allow deleting non-existing storage files");

        Ok(())
    }

    #[tokio::test]
    async fn file_with_metadata() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let metadata = StorageMetadata(HashMap::from([
            ("one".to_string(), "1".to_string()),
            ("two".to_string(), "2".to_string()),
        ]));
        let upload_target =
            upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;

        let full_range_download_contents =
            read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "We should upload and download the same contents"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, _) = uploaded_bytes.split_at(3);

        let mut partial_download_with_metadata = storage
            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
        io::copy(
            &mut partial_download_with_metadata.download_stream,
            &mut first_part_remote,
        )
        .await?;
        first_part_remote.flush().await?;
        let first_part_remote = first_part_remote.into_inner().into_inner();
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        assert_eq!(
            partial_download_with_metadata.metadata,
            Some(metadata),
            "We should get the same metadata back for partial download"
        );

        Ok(())
    }

    async fn upload_dummy_file(
        storage: &LocalFs,
        name: &str,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<RemotePath> {
        let from_path = storage
            .storage_root
            .join("timelines")
            .join("some_timeline")
            .join(name);
        let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;

        let relative_path = from_path
            .strip_prefix(&storage.storage_root)
            .context("Failed to strip storage root prefix")
            .and_then(RemotePath::new)
            .with_context(|| {
                format!(
                    "Failed to resolve remote part of path {:?} for base {:?}",
                    from_path, storage.storage_root
                )
            })?;

        storage
            .upload(Box::new(file), size, &relative_path, metadata)
            .await?;
        Ok(relative_path)
    }

    async fn create_file_for_upload(
        path: &Utf8Path,
        contents: &str,
    ) -> anyhow::Result<(io::BufReader<fs::File>, usize)> {
        std::fs::create_dir_all(path.parent().unwrap())?;
        let mut file_for_writing = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(path)?;
        write!(file_for_writing, "{}", contents)?;
        drop(file_for_writing);
        let file_size = path.metadata()?.len() as usize;
        Ok((
            io::BufReader::new(fs::OpenOptions::new().read(true).open(&path).await?),
            file_size,
        ))
    }

    fn dummy_contents(name: &str) -> String {
        format!("contents for {name}")
    }

    async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
        let mut files = storage.list().await?;
        files.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(files)
    }
}