//! Local filesystem acting as a remote storage.
//! Multiple API users can use the same "storage" of this kind by using different storage roots.
//!
//! This storage is used in tests, but can also be used in cases when a certain persistent
//! volume is mounted to the local FS.
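//!
//! # Example
//!
//! A minimal usage sketch (not a doctest; it assumes a `root: Utf8PathBuf` and a
//! `timeout: Duration` in scope, and builds the body stream the same way the tests
//! below do; the key name is hypothetical):
//!
//! ```ignore
//! let storage = LocalFs::new(root, timeout)?;
//! let cancel = CancellationToken::new();
//! let path = RemotePath::from_string("tenant/timeline/layer")?;
//! let body = Bytes::from_static(b"payload");
//! let stream = futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
//! storage.upload(stream, body.len(), &path, None, &cancel).await?;
//! let download = storage.download(&path, &cancel).await?;
//! ```
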
use std::{
    borrow::Cow,
    future::Future,
    io::ErrorKind,
    num::NonZeroU32,
    pin::Pin,
    time::{Duration, SystemTime},
};

use anyhow::{bail, ensure, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::Stream;
use tokio::{
    fs,
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tokio_util::{io::ReaderStream, sync::CancellationToken};
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};

use crate::{
    Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
};

use super::{RemoteStorage, StorageMetadata};

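// Suffix for in-flight uploads. `path_with_suffix_extension` is expected to append it as an
// extra extension, e.g. a (hypothetical) `foo.layer` becomes `foo.layer.___temp`; see
// `upload0` for how the temp file is renamed into place.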
const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";

#[derive(Debug, Clone)]
pub struct LocalFs {
    storage_root: Utf8PathBuf,
    timeout: Duration,
}

impl LocalFs {
    /// Attempts to create local FS storage, along with its root directory.
    /// The storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
    pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
        if !storage_root.exists() {
            std::fs::create_dir_all(&storage_root).with_context(|| {
                format!("Failed to create all directories in the given root path {storage_root:?}")
            })?;
        }
        if !storage_root.is_absolute() {
            storage_root = storage_root.canonicalize_utf8().with_context(|| {
                format!("Failed to represent path {storage_root:?} as an absolute path")
            })?;
        }

        Ok(Self {
            storage_root,
            timeout,
        })
    }

    // mirrors S3Bucket::s3_object_to_relative_path
    fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
        let relative_path = key
            .strip_prefix(&self.storage_root)
            .expect("relative path must contain storage_root as prefix");
        RemotePath(relative_path.into())
    }

    async fn read_storage_metadata(
        &self,
        file_path: &Utf8Path,
    ) -> anyhow::Result<Option<StorageMetadata>> {
        let metadata_path = storage_metadata_path(file_path);
        if metadata_path.exists() && metadata_path.is_file() {
            let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
                format!("Failed to read metadata from the local storage at '{metadata_path}'")
            })?;

            serde_json::from_str(&metadata_string)
                .with_context(|| {
                    format!(
                        "Failed to deserialize metadata from the local storage at '{metadata_path}'",
                    )
                })
                .map(|metadata| Some(StorageMetadata(metadata)))
        } else {
            Ok(None)
        }
    }

    #[cfg(test)]
    async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
        Ok(get_all_files(&self.storage_root, true)
            .await?
            .into_iter()
            .map(|path| {
                path.strip_prefix(&self.storage_root)
                    .context("Failed to strip storage root prefix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files for the storage root, hence should be able to remove the prefix",
                    )
            })
            .collect())
    }

    // Recursively lists all files in a directory,
    // mirroring `list_files` for `s3_bucket`.
    async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        let full_path = match folder {
            Some(folder) => folder.with_base(&self.storage_root),
            None => self.storage_root.clone(),
        };

        // If we were given a directory, we may use it as our starting point.
        // Otherwise, we must go up to the first ancestor dir that exists. This is because
        // S3 object list prefixes can be arbitrary strings, but when reading
        // the local filesystem we need a directory to start calling read_dir on.
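        // For example, a (hypothetical) prefix "tenants/123/tim" that names no existing file
        // is walked up to the existing directory "tenants/123", which is then scanned and
        // filtered by the string prefix below.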
        let mut initial_dir = full_path.clone();
        loop {
            // Did we make it to the root?
            if initial_dir.parent().is_none() {
                anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
            }

            match fs::metadata(initial_dir.clone()).await {
                Ok(meta) if meta.is_dir() => {
                    // We found a directory, break
                    break;
                }
                Ok(_meta) => {
                    // It's not a directory: strip back to the parent
                    initial_dir.pop();
                }
                Err(e) if e.kind() == ErrorKind::NotFound => {
                    // It's not a file that exists: strip the prefix back to the parent directory
                    initial_dir.pop();
                }
                Err(e) => {
                    // Unexpected I/O error
                    anyhow::bail!(e)
                }
            }
        }
        // Note that Utf8PathBuf starts_with only considers full path segments, but
        // object prefixes are arbitrary strings, so we need the strings for doing
        // starts_with later.
        let prefix = full_path.as_str();

        let mut files = vec![];
        let mut directory_queue = vec![initial_dir];
        while let Some(cur_folder) = directory_queue.pop() {
            let mut entries = cur_folder.read_dir_utf8()?;
            while let Some(Ok(entry)) = entries.next() {
                let file_name = entry.file_name();
                let full_file_name = cur_folder.join(file_name);
                if full_file_name.as_str().starts_with(prefix) {
                    let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
                    files.push(file_remote_path);
                    if full_file_name.is_dir() {
                        directory_queue.push(full_file_name);
                    }
                }
            }
        }

        Ok(files)
    }

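    /// Writes the payload to a temp sibling file first and renames it into place at the end,
    /// so a cancelled or failed upload does not leave a partial file at the target path.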
    async fn upload0(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let target_file_path = to.with_base(&self.storage_root);
        create_target_directory(&target_file_path).await?;
        // We need this dance with a sort-of durable rename (without fsyncs)
        // to prevent partial uploads. This was actually hit when a pageserver shutdown
        // cancelled the upload and a partial file was left on the fs.
        // NOTE: Because the temp file suffix is always the same, this operation is racy.
        // Two concurrent operations can lead to the following sequence:
        // T1: write(temp)
        // T2: write(temp) -> overwrites the content
        // T1: rename(temp, dst) -> succeeds
        // T2: rename(temp, dst) -> fails, temp no longer exists
        // This could be solved by supplying a unique temp suffix every time, but this situation
        // is not normal in the first place, and the error can help (and has helped at least once)
        // to discover bugs in upper-level synchronization.
        let temp_file_path =
            path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
        let mut destination = io::BufWriter::new(
            fs::OpenOptions::new()
                .write(true)
                .create(true)
                .open(&temp_file_path)
                .await
                .with_context(|| {
                    format!("Failed to open target fs destination at '{target_file_path}'")
                })?,
        );

        let from_size_bytes = data_size_bytes as u64;
        let data = tokio_util::io::StreamReader::new(data);
        let data = std::pin::pin!(data);
        let mut buffer_to_read = data.take(from_size_bytes);

        // Alternatively we could just write the bytes to a file, but local_fs is a testing utility.
        let copy = io::copy_buf(&mut buffer_to_read, &mut destination);

        let bytes_read = tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                let file = destination.into_inner();
                // Wait for the in-flight operation(s) to complete, so that the next attempt
                // can start right away without our writes being directed at its file.
                file.into_std().await;

                // TODO: leave the temp file or not? leaving is probably less racy. enabled truncate at
                // least.
                fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
                return Err(TimeoutOrCancel::Cancel.into());
            }
            read = copy => read,
        };

        let bytes_read =
            bytes_read.with_context(|| {
                format!(
                    "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
                )
            })?;

        if bytes_read < from_size_bytes {
            bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
        }
        // Check if there is any extra data after the given size.
        let mut from = buffer_to_read.into_inner();
        let extra_read = from.read(&mut [1]).await?;
        ensure!(
            extra_read == 0,
            "Provided stream was larger than expected: expected {from_size_bytes} bytes",
        );

        destination.flush().await.with_context(|| {
            format!(
                "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
            )
        })?;

        fs::rename(temp_file_path, &target_file_path)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload (rename) file to the local storage at '{target_file_path}'",
                )
            })?;

        if let Some(storage_metadata) = metadata {
            // FIXME: we must not be using metadata much, since this would forget the old metadata
            // for new writes? or perhaps metadata is sticky; could consider removing if it's never
            // used.
            let storage_metadata_path = storage_metadata_path(&target_file_path);
            fs::write(
                &storage_metadata_path,
                serde_json::to_string(&storage_metadata.0)
                    .context("Failed to serialize storage metadata as json")?,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to write metadata to the local storage at '{storage_metadata_path}'",
                )
            })?;
        }

        Ok(())
    }
}

impl RemoteStorage for LocalFs {
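    /// Mirrors S3 listing semantics: `ListingMode::NoDelimiter` returns every key under the
    /// prefix recursively, while the delimiter mode returns one level of keys and common
    /// prefixes, skipping empty directories (which have no S3 equivalent).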
    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> Result<Listing, DownloadError> {
        let op = async {
            let mut result = Listing::default();

            if let ListingMode::NoDelimiter = mode {
                let keys = self
                    .list_recursive(prefix)
                    .await
                    .map_err(DownloadError::Other)?;

                result.keys = keys
                    .into_iter()
                    .filter(|k| {
                        let path = k.with_base(&self.storage_root);
                        !path.is_dir()
                    })
                    .collect();

                if let Some(max_keys) = max_keys {
                    result.keys.truncate(max_keys.get() as usize);
                }

                return Ok(result);
            }

            let path = match prefix {
                Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
                None => Cow::Borrowed(&self.storage_root),
            };

            let prefixes_to_filter = get_all_files(path.as_ref(), false)
                .await
                .map_err(DownloadError::Other)?;

            // Filter out empty directories to mirror S3 behavior.
            for prefix in prefixes_to_filter {
                if prefix.is_dir()
                    && is_directory_empty(&prefix)
                        .await
                        .map_err(DownloadError::Other)?
                {
                    continue;
                }

                let stripped = prefix
                    .strip_prefix(&self.storage_root)
                    .context("Failed to strip prefix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files for the storage root, hence should be able to remove the prefix",
                    );

                if prefix.is_dir() {
                    result.prefixes.push(stripped);
                } else {
                    result.keys.push(stripped);
                }
            }

            Ok(result)
        };

        let timeout = async {
            tokio::time::sleep(self.timeout).await;
            Err(DownloadError::Timeout)
        };

        let cancelled = async {
            cancel.cancelled().await;
            Err(DownloadError::Cancelled)
        };

        tokio::select! {
            res = op => res,
            res = timeout => res,
            res = cancelled => res,
        }
    }

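    /// Runs `upload0` raced against `self.timeout`: if the deadline passes first, the child
    /// cancellation token is cancelled and the resulting error is surfaced as
    /// `TimeoutOrCancel::Timeout`.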
    async fn upload(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let cancel = cancel.child_token();

        let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
        let mut op = std::pin::pin!(op);

        // Race upload0 against the timeout; if it goes over, do a graceful shutdown.
        let (res, timeout) = tokio::select! {
            res = &mut op => (res, false),
            _ = tokio::time::sleep(self.timeout) => {
                cancel.cancel();
                (op.await, true)
            }
        };

        match res {
            Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
                // We caused this cancel (or they happened simultaneously) -- swap it out to
                // Timeout.
                Err(TimeoutOrCancel::Timeout.into())
            }
            res => res,
        }
    }

    async fn download(
        &self,
        from: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let source = ReaderStream::new(
                fs::OpenOptions::new()
                    .read(true)
                    .open(&target_path)
                    .await
                    .with_context(|| {
                        format!("Failed to open source file {target_path:?} to use in the download")
                    })
                    .map_err(DownloadError::Other)?,
            );

            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;

            let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
            let source = crate::support::DownloadStream::new(cancel_or_timeout, source);

            Ok(Download {
                metadata,
                last_modified: None,
                etag: None,
                download_stream: Box::pin(source),
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

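    /// Byte-range semantics, for reference: `start_inclusive = 3` with `end_exclusive = Some(7)`
    /// yields bytes 3..7, and `end_exclusive = None` reads to the end of the file. Note that
    /// the validation below also rejects single-byte ranges (`start == end - 1`).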
    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        if let Some(end_exclusive) = end_exclusive {
            if end_exclusive <= start_inclusive {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
            };
            if start_inclusive == end_exclusive.saturating_sub(1) {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
            }
        }
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let mut source = tokio::fs::OpenOptions::new()
                .read(true)
                .open(&target_path)
                .await
                .with_context(|| {
                    format!("Failed to open source file {target_path:?} to use in the download")
                })
                .map_err(DownloadError::Other)?;

            let len = source
                .metadata()
                .await
                .context("query file length")
                .map_err(DownloadError::Other)?
                .len();

            source
                .seek(io::SeekFrom::Start(start_inclusive))
                .await
                .context("Failed to seek to the range start in a local storage file")
                .map_err(DownloadError::Other)?;

            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;

            let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
            let source = ReaderStream::new(source);

            let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
            let source = crate::support::DownloadStream::new(cancel_or_timeout, source);

            Ok(Download {
                metadata,
                last_modified: None,
                etag: None,
                download_stream: Box::pin(source),
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

    async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
        let file_path = path.with_base(&self.storage_root);
        match fs::remove_file(&file_path).await {
            Ok(()) => Ok(()),
            // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
            // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
            // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }

    async fn delete_objects<'a>(
        &self,
        paths: &'a [RemotePath],
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        for path in paths {
            self.delete(path, cancel).await?
        }
        Ok(())
    }

    async fn copy(
        &self,
        from: &RemotePath,
        to: &RemotePath,
        _cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let from_path = from.with_base(&self.storage_root);
        let to_path = to.with_base(&self.storage_root);
        create_target_directory(&to_path).await?;
        fs::copy(&from_path, &to_path).await.with_context(|| {
            format!(
                "Failed to copy file from '{from_path}' to '{to_path}'",
                from_path = from_path,
                to_path = to_path
            )
        })?;
        Ok(())
    }

    async fn time_travel_recover(
        &self,
        _prefix: Option<&RemotePath>,
        _timestamp: SystemTime,
        _done_if_after: SystemTime,
        _cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        Err(TimeTravelError::Unimplemented)
    }
}

fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
    path_with_suffix_extension(original_path, "metadata")
}

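/// Lists the contents of `directory_path` (recursing into subdirectories when `recursive`
/// is set), skipping symlinks; a non-existent path yields an empty list rather than an error.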
fn get_all_files<'a, P>(
    directory_path: P,
    recursive: bool,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
where
    P: AsRef<Utf8Path> + Send + Sync + 'a,
{
    Box::pin(async move {
        let directory_path = directory_path.as_ref();
        if directory_path.exists() {
            if directory_path.is_dir() {
                let mut paths = Vec::new();
                let mut dir_contents = fs::read_dir(directory_path).await?;
                while let Some(dir_entry) = dir_contents.next_entry().await? {
                    let file_type = dir_entry.file_type().await?;
                    let entry_path =
                        Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
                            anyhow::Error::msg(format!(
                                "non-Unicode path: {}",
                                pb.to_string_lossy()
                            ))
                        })?;
                    if file_type.is_symlink() {
                        debug!("{entry_path:?} is a symlink, skipping")
                    } else if file_type.is_dir() {
                        if recursive {
                            paths.extend(get_all_files(&entry_path, true).await?.into_iter())
                        } else {
                            paths.push(entry_path)
                        }
                    } else {
                        paths.push(entry_path);
                    }
                }
                Ok(paths)
            } else {
                bail!("Path {directory_path:?} is not a directory")
            }
        } else {
            Ok(Vec::new())
        }
    })
}

async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
    let target_dir = match target_file_path.parent() {
        Some(parent_dir) => parent_dir,
        None => bail!("File path '{target_file_path}' has no parent directory"),
    };
    if !target_dir.exists() {
        fs::create_dir_all(target_dir).await?;
    }
    Ok(())
}

fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
    if file_path.exists() {
        ensure!(file_path.is_file(), "file path '{file_path}' is not a file");
        Ok(true)
    } else {
        Ok(false)
    }
}

#[cfg(test)]
mod fs_tests {
    use super::*;

    use bytes::Bytes;
    use camino_tempfile::tempdir;
    use futures_util::Stream;
    use std::{collections::HashMap, io::Write};

    async fn read_and_check_metadata(
        storage: &LocalFs,
        remote_storage_path: &RemotePath,
        expected_metadata: Option<&StorageMetadata>,
    ) -> anyhow::Result<String> {
        let cancel = CancellationToken::new();
        let download = storage
            .download(remote_storage_path, &cancel)
            .await
            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
        ensure!(
            download.metadata.as_ref() == expected_metadata,
            "Unexpected metadata returned for the downloaded file"
        );

        let contents = aggregate(download.download_stream).await?;

        String::from_utf8(contents).map_err(anyhow::Error::new)
    }

    #[tokio::test]
    async fn upload_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
        assert_eq!(
            storage.list_all().await?,
            vec![target_path_1.clone()],
            "Should list a single file after the first upload"
        );

        let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
        assert_eq!(
            list_files_sorted(&storage).await?,
            vec![target_path_1.clone(), target_path_2.clone()],
            "Should list two different files after the second upload"
        );

        Ok(())
    }

    #[tokio::test]
    async fn upload_file_negatives() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let id = RemotePath::new(Utf8Path::new("dummy"))?;
        let content = Bytes::from_static(b"12345");
        let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));

        // Check that you get an error if the size parameter doesn't match the actual
        // size of the stream.
        storage
            .upload(content(), 0, &id, None, &cancel)
            .await
            .expect_err("upload with zero size succeeded");
        storage
            .upload(content(), 4, &id, None, &cancel)
            .await
            .expect_err("upload with too short size succeeded");
        storage
            .upload(content(), 6, &id, None, &cancel)
            .await
            .expect_err("upload with too large size succeeded");

        // Correct size is 5, this should succeed.
        storage.upload(content(), 5, &id, None, &cancel).await?;

        Ok(())
    }

    fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
        let storage_root = tempdir()?.path().to_path_buf();
        LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
    }

    #[tokio::test]
    async fn download_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            contents,
            "We should upload and download the same contents"
        );

        let non_existing_path = "somewhere/else";
        match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await {
            Err(DownloadError::NotFound) => {} // Should get NotFound for non-existing keys
            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_positive() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let full_range_download_contents =
            read_and_check_metadata(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "Downloading the full range should return the whole upload"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

        let first_part_download = storage
            .download_byte_range(
                &upload_target,
                0,
                Some(first_part_local.len() as u64),
                &cancel,
            )
            .await?;
        assert!(
            first_part_download.metadata.is_none(),
            "No metadata should be returned for an upload without metadata"
        );

        let first_part_remote = aggregate(first_part_download.download_stream).await?;
        assert_eq!(
            first_part_local, first_part_remote,
            "First part bytes should be returned when requested"
        );

        let second_part_download = storage
            .download_byte_range(
                &upload_target,
                first_part_local.len() as u64,
                Some((first_part_local.len() + second_part_local.len()) as u64),
                &cancel,
            )
            .await?;
        assert!(
            second_part_download.metadata.is_none(),
            "No metadata should be returned for an upload without metadata"
        );

        let second_part_remote = aggregate(second_part_download.download_stream).await?;
        assert_eq!(
            second_part_local, second_part_remote,
            "Second part bytes should be returned when requested"
        );

        let suffix_bytes = storage
            .download_byte_range(&upload_target, 13, None, &cancel)
            .await?
            .download_stream;
        let suffix_bytes = aggregate(suffix_bytes).await?;
        let suffix = std::str::from_utf8(&suffix_bytes)?;
        assert_eq!(upload_name, suffix);

        let all_bytes = storage
            .download_byte_range(&upload_target, 0, None, &cancel)
            .await?
            .download_stream;
        let all_bytes = aggregate(all_bytes).await?;
        let all_bytes = std::str::from_utf8(&all_bytes)?;
        assert_eq!(dummy_contents("upload_1"), all_bytes);

        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_negative() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let start = 1_000_000_000;
        let end = start + 1;
        match storage
            .download_byte_range(
                &upload_target,
                start,
                Some(end), // exclusive end
                &cancel,
            )
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("zero bytes"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        let start = 10000;
        let end = 234;
        assert!(start > end, "Should test an incorrect range");
        match storage
            .download_byte_range(&upload_target, start, Some(end), &cancel)
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("Invalid range"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        Ok(())
    }

    #[tokio::test]
    async fn delete_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        storage.delete(&upload_target, &cancel).await?;
        assert!(storage.list_all().await?.is_empty());

        storage
            .delete(&upload_target, &cancel)
            .await
            .expect("Should allow deleting non-existing storage files");

        Ok(())
    }

    #[tokio::test]
    async fn file_with_metadata() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let metadata = StorageMetadata(HashMap::from([
            ("one".to_string(), "1".to_string()),
            ("two".to_string(), "2".to_string()),
        ]));
        let upload_target =
            upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;

        let full_range_download_contents =
            read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "We should upload and download the same contents"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, _) = uploaded_bytes.split_at(3);

        let partial_download_with_metadata = storage
            .download_byte_range(
                &upload_target,
                0,
                Some(first_part_local.len() as u64),
                &cancel,
            )
            .await?;
        let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        assert_eq!(
            partial_download_with_metadata.metadata,
            Some(metadata),
            "We should get the same metadata back for partial download"
        );

        Ok(())
    }

    #[tokio::test]
    async fn list() -> anyhow::Result<()> {
        // No delimiter: should recursively list everything
        let (storage, cancel) = create_storage()?;
        let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
        let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;

        let listing = storage
            .list(None, ListingMode::NoDelimiter, None, &cancel)
            .await?;
        assert!(listing.prefixes.is_empty());
        assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());

        // Delimiter: should only go one deep
        let listing = storage
            .list(None, ListingMode::WithDelimiter, None, &cancel)
            .await?;

        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("timelines").unwrap()].to_vec()
        );
        assert!(listing.keys.is_empty());

        // Delimiter & prefix
        let listing = storage
            .list(
                Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
                ListingMode::WithDelimiter,
                None,
                &cancel,
            )
            .await?;
        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
                .to_vec()
        );
        assert_eq!(listing.keys, [uncle.clone()].to_vec());

        Ok(())
    }

    #[tokio::test]
    async fn overwrite_shorter_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let path = RemotePath::new("does/not/matter/file".into())?;

        let body = Bytes::from_static(b"long file contents is long");
        {
            let len = body.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
            storage.upload(body, len, &path, None, &cancel).await?;
        }

        let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
        assert_eq!(body, read);

        let shorter = Bytes::from_static(b"shorter body");
        {
            let len = shorter.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
            storage.upload(body, len, &path, None, &cancel).await?;
        }

        let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
        assert_eq!(shorter, read);
        Ok(())
    }

    #[tokio::test]
    async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let path = RemotePath::new("does/not/matter/file".into())?;

        let body = Bytes::from_static(b"long file contents is long");
        {
            let len = body.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
            let cancel = cancel.child_token();
            cancel.cancel();
            let e = storage
                .upload(body, len, &path, None, &cancel)
                .await
                .unwrap_err();

            assert!(TimeoutOrCancel::caused_by_cancel(&e));
        }

        {
            let len = body.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
            storage.upload(body, len, &path, None, &cancel).await?;
        }

        let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
        assert_eq!(body, read);

        Ok(())
    }

    async fn upload_dummy_file(
        storage: &LocalFs,
        name: &str,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<RemotePath> {
        let from_path = storage
            .storage_root
            .join("timelines")
            .join("some_timeline")
            .join(name);
        let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;

        let relative_path = from_path
            .strip_prefix(&storage.storage_root)
            .context("Failed to strip storage root prefix")
            .and_then(RemotePath::new)
            .with_context(|| {
                format!(
                    "Failed to resolve remote part of path {:?} for base {:?}",
                    from_path, storage.storage_root
                )
            })?;

        let file = tokio_util::io::ReaderStream::new(file);

        storage
            .upload(file, size, &relative_path, metadata, cancel)
            .await?;
        Ok(relative_path)
    }

    async fn create_file_for_upload(
        path: &Utf8Path,
        contents: &str,
    ) -> anyhow::Result<(fs::File, usize)> {
        std::fs::create_dir_all(path.parent().unwrap())?;
        let mut file_for_writing = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(path)?;
        write!(file_for_writing, "{}", contents)?;
        drop(file_for_writing);
        let file_size = path.metadata()?.len() as usize;
        Ok((
            fs::OpenOptions::new().read(true).open(&path).await?,
            file_size,
        ))
    }

    fn dummy_contents(name: &str) -> String {
        format!("contents for {name}")
    }

    async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
        let mut files = storage.list_all().await?;
        files.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(files)
    }

    async fn aggregate(
        stream: impl Stream<Item = std::io::Result<Bytes>>,
    ) -> anyhow::Result<Vec<u8>> {
        use futures::stream::StreamExt;
        let mut out = Vec::new();
        let mut stream = std::pin::pin!(stream);
        while let Some(res) = stream.next().await {
            out.extend_from_slice(&res?[..]);
        }
        Ok(out)
    }
}