//! Local filesystem acting as a remote storage.
//! Multiple API users can use the same "storage" of this kind by using different storage roots.
//!
//! This storage is used in tests, but can also be used in cases when a certain persistent
//! volume is mounted to the local FS.
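//!
//! A minimal usage sketch (hedged: the storage root, remote path, and timeout
//! below are illustrative, and the snippet is marked `ignore` because it
//! assumes the caller's async and error-handling scaffolding):
//!
//! ```ignore
//! use std::time::Duration;
//! use camino::Utf8PathBuf;
//! use tokio_util::sync::CancellationToken;
//!
//! let storage = LocalFs::new(Utf8PathBuf::from("/mnt/some_volume/root"), Duration::from_secs(120))?;
//! let cancel = CancellationToken::new();
//!
//! // Upload a small payload; the declared size must match the stream length exactly.
//! let body = bytes::Bytes::from_static(b"hello");
//! let stream = futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
//! let path = RemotePath::from_string("tenant/timeline/object")?;
//! storage.upload(stream, body.len(), &path, None, &cancel).await?;
//!
//! // Read it back.
//! let download = storage.download(&path, &cancel).await?;
//! ```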

use std::{
    borrow::Cow,
    future::Future,
    io::ErrorKind,
    num::NonZeroU32,
    pin::Pin,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use anyhow::{bail, ensure, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::Stream;
use tokio::{
    fs,
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tokio_util::{io::ReaderStream, sync::CancellationToken};
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};

use crate::{
    Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
};

use super::{RemoteStorage, StorageMetadata};
use crate::Etag;

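/// Suffix appended to a file's name while its upload is in flight; the temp
/// file is renamed over the final path once the write completes.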
const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";

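/// Remote storage backed by a directory on the local filesystem.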
#[derive(Debug, Clone)]
pub struct LocalFs {
    storage_root: Utf8PathBuf,
    timeout: Duration,
}

impl LocalFs {
    /// Attempts to create local FS storage, along with its root directory.
    /// The storage root will be created (if it does not exist) and turned into an absolute path (if passed as a relative path).
    pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
        if !storage_root.exists() {
            std::fs::create_dir_all(&storage_root).with_context(|| {
                format!("Failed to create all directories in the given root path {storage_root:?}")
            })?;
        }
        if !storage_root.is_absolute() {
            storage_root = storage_root.canonicalize_utf8().with_context(|| {
                format!("Failed to represent path {storage_root:?} as an absolute path")
            })?;
        }

        Ok(Self {
            storage_root,
            timeout,
        })
    }

    // Mirrors `S3Bucket::s3_object_to_relative_path`.
    fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
        let relative_path = key
            .strip_prefix(&self.storage_root)
            .expect("relative path must contain storage_root as prefix");
        RemotePath(relative_path.into())
    }

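    /// Reads the JSON metadata sidecar written next to `file_path` (see
    /// [`storage_metadata_path`]), returning `None` when no sidecar exists.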
    async fn read_storage_metadata(
        &self,
        file_path: &Utf8Path,
    ) -> anyhow::Result<Option<StorageMetadata>> {
        let metadata_path = storage_metadata_path(file_path);
        if metadata_path.exists() && metadata_path.is_file() {
            let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
                format!("Failed to read metadata from the local storage at '{metadata_path}'")
            })?;

            serde_json::from_str(&metadata_string)
                .with_context(|| {
                    format!(
                        "Failed to deserialize metadata from the local storage at '{metadata_path}'",
                    )
                })
                .map(|metadata| Some(StorageMetadata(metadata)))
        } else {
            Ok(None)
        }
    }

    #[cfg(test)]
    async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
        Ok(get_all_files(&self.storage_root, true)
            .await?
            .into_iter()
            .map(|path| {
                path.strip_prefix(&self.storage_root)
                    .context("Failed to strip storage root prefix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files for the storage root, hence we should be able to remove the prefix",
                    )
            })
            .collect())
    }

    // Recursively lists all files in a directory,
    // mirroring `list_files` for `s3_bucket`.
    async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        let full_path = match folder {
            Some(folder) => folder.with_base(&self.storage_root),
            None => self.storage_root.clone(),
        };

        // If we were given a directory, we may use it as our starting point.
        // Otherwise, we must go up to the first ancestor dir that exists. This is because
        // S3 object list prefixes can be arbitrary strings, but when reading
        // the local filesystem we need a directory to start calling read_dir on.
        let mut initial_dir = full_path.clone();
        loop {
            // Did we make it to the root?
            if initial_dir.parent().is_none() {
                anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
            }

            match fs::metadata(initial_dir.clone()).await {
                Ok(meta) if meta.is_dir() => {
                    // We found a directory, break
                    break;
                }
                Ok(_meta) => {
                    // It's not a directory: strip back to the parent
                    initial_dir.pop();
                }
                Err(e) if e.kind() == ErrorKind::NotFound => {
                    // It's not a file that exists: strip the prefix back to the parent directory
                    initial_dir.pop();
                }
                Err(e) => {
                    // Unexpected I/O error
                    anyhow::bail!(e)
                }
            }
        }
        // Note that Utf8PathBuf's starts_with only considers full path segments, but
        // object prefixes are arbitrary strings, so we need the strings for doing
        // starts_with later.
        let prefix = full_path.as_str();

        let mut files = vec![];
        let mut directory_queue = vec![initial_dir];
        while let Some(cur_folder) = directory_queue.pop() {
            let mut entries = cur_folder.read_dir_utf8()?;
            while let Some(Ok(entry)) = entries.next() {
                let file_name = entry.file_name();
                let full_file_name = cur_folder.join(file_name);
                if full_file_name.as_str().starts_with(prefix) {
                    let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
                    files.push(file_remote_path);
                    if full_file_name.is_dir() {
                        directory_queue.push(full_file_name);
                    }
                }
            }
        }

        Ok(files)
    }

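    /// Streams `data_size_bytes` bytes from `data` into a temp file next to the
    /// target, verifies that the stream length matches exactly, and then renames
    /// the temp file over `to`, so readers never observe a partially written object.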
    async fn upload0(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let target_file_path = to.with_base(&self.storage_root);
        create_target_directory(&target_file_path).await?;
        // We need this dance with a sort-of durable rename (without fsyncs)
        // to prevent partial uploads. This was really hit when a pageserver shutdown
        // cancelled the upload and a partial file was left on the FS.
        // NOTE: Because the temp file suffix is always the same, this operation is racy.
        // Two concurrent operations can lead to the following sequence:
        // T1: write(temp)
        // T2: write(temp) -> overwrites the content
        // T1: rename(temp, dst) -> succeeds
        // T2: rename(temp, dst) -> fails, temp no longer exists
        // This could be solved by supplying a unique temp suffix every time, but this situation
        // is not normal in the first place, and the error can help (and helped at least once)
        // to discover bugs in upper-level synchronization.
        let temp_file_path =
            path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
        let mut destination = io::BufWriter::new(
            fs::OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&temp_file_path)
                .await
                .with_context(|| {
                    format!("Failed to open target fs destination at '{target_file_path}'")
                })?,
        );

        let from_size_bytes = data_size_bytes as u64;
        let data = tokio_util::io::StreamReader::new(data);
        let data = std::pin::pin!(data);
        let mut buffer_to_read = data.take(from_size_bytes);

        // Alternatively we could just write the bytes to a file, but local_fs is a testing utility.
        let copy = io::copy_buf(&mut buffer_to_read, &mut destination);

        let bytes_read = tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                let file = destination.into_inner();
                // Wait for the inflight operation(s) to complete so that there could be a next
                // attempt right away and our writes are not directed to their file.
                file.into_std().await;

                // TODO: leave the temp or not? leaving is probably less racy. enabled truncate at
                // least.
                fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
                return Err(TimeoutOrCancel::Cancel.into());
            }
            read = copy => read,
        };

        let bytes_read =
            bytes_read.with_context(|| {
                format!(
                    "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
                )
            })?;

        if bytes_read < from_size_bytes {
            bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
        }
        // Check if there is any extra data after the given size.
        let mut from = buffer_to_read.into_inner();
        let extra_read = from.read(&mut [1]).await?;
        ensure!(
            extra_read == 0,
            "Provided stream was larger than expected: expected {from_size_bytes} bytes",
        );

        destination.flush().await.with_context(|| {
            format!(
                "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
            )
        })?;

        fs::rename(temp_file_path, &target_file_path)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload (rename) file to the local storage at '{target_file_path}'",
                )
            })?;

        if let Some(storage_metadata) = metadata {
            // FIXME: we must not be using metadata much, since this would forget the old metadata
            // for new writes? or perhaps metadata is sticky; could consider removing it if it's never
            // used.
            let storage_metadata_path = storage_metadata_path(&target_file_path);
            fs::write(
                &storage_metadata_path,
                serde_json::to_string(&storage_metadata.0)
                    .context("Failed to serialize storage metadata as json")?,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to write metadata to the local storage at '{storage_metadata_path}'",
                )
            })?;
        }

        Ok(())
    }
}

impl RemoteStorage for LocalFs {
    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> Result<Listing, DownloadError> {
        let op = async {
            let mut result = Listing::default();

            if let ListingMode::NoDelimiter = mode {
                let keys = self
                    .list_recursive(prefix)
                    .await
                    .map_err(DownloadError::Other)?;

                result.keys = keys
                    .into_iter()
                    .filter(|k| {
                        let path = k.with_base(&self.storage_root);
                        !path.is_dir()
                    })
                    .collect();

                if let Some(max_keys) = max_keys {
                    result.keys.truncate(max_keys.get() as usize);
                }

                return Ok(result);
            }

            let path = match prefix {
                Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
                None => Cow::Borrowed(&self.storage_root),
            };

            let prefixes_to_filter = get_all_files(path.as_ref(), false)
                .await
                .map_err(DownloadError::Other)?;

            // Filter out empty directories to mirror S3 behavior.
            for prefix in prefixes_to_filter {
                if prefix.is_dir()
                    && is_directory_empty(&prefix)
                        .await
                        .map_err(DownloadError::Other)?
                {
                    continue;
                }

                let stripped = prefix
                    .strip_prefix(&self.storage_root)
                    .context("Failed to strip prefix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files for the storage root, hence we should be able to remove the prefix",
                    );

                if prefix.is_dir() {
                    result.prefixes.push(stripped);
                } else {
                    result.keys.push(stripped);
                }
            }

            Ok(result)
        };

        let timeout = async {
            tokio::time::sleep(self.timeout).await;
            Err(DownloadError::Timeout)
        };

        let cancelled = async {
            cancel.cancelled().await;
            Err(DownloadError::Cancelled)
        };

        tokio::select! {
            res = op => res,
            res = timeout => res,
            res = cancelled => res,
        }
    }

    async fn upload(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let cancel = cancel.child_token();

        let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
        let mut op = std::pin::pin!(op);

        // Race upload0 against the timeout; if it goes over, do a graceful shutdown.
        let (res, timeout) = tokio::select! {
            res = &mut op => (res, false),
            _ = tokio::time::sleep(self.timeout) => {
                cancel.cancel();
                (op.await, true)
            }
        };

        match res {
            Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
                // We caused this cancel (or they happened simultaneously) -- swap it out for
                // Timeout.
                Err(TimeoutOrCancel::Timeout.into())
            }
            res => res,
        }
    }

    async fn download(
        &self,
        from: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        let target_path = from.with_base(&self.storage_root);

        let file_metadata = file_metadata(&target_path).await?;

        let source = ReaderStream::new(
            fs::OpenOptions::new()
                .read(true)
                .open(&target_path)
                .await
                .with_context(|| {
                    format!("Failed to open source file {target_path:?} to use in the download")
                })
                .map_err(DownloadError::Other)?,
        );

        let metadata = self
            .read_storage_metadata(&target_path)
            .await
            .map_err(DownloadError::Other)?;

        let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
        let source = crate::support::DownloadStream::new(cancel_or_timeout, source);

        let etag = mock_etag(&file_metadata);
        Ok(Download {
            metadata,
            last_modified: file_metadata
                .modified()
                .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
            etag,
            download_stream: Box::pin(source),
        })
    }

    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        if let Some(end_exclusive) = end_exclusive {
            if end_exclusive <= start_inclusive {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
            };
            if start_inclusive == end_exclusive.saturating_sub(1) {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
            }
        }

        let target_path = from.with_base(&self.storage_root);
        let file_metadata = file_metadata(&target_path).await?;
        let mut source = tokio::fs::OpenOptions::new()
            .read(true)
            .open(&target_path)
            .await
            .with_context(|| {
                format!("Failed to open source file {target_path:?} to use in the download")
            })
            .map_err(DownloadError::Other)?;

        let len = source
            .metadata()
            .await
            .context("query file length")
            .map_err(DownloadError::Other)?
            .len();

        source
            .seek(io::SeekFrom::Start(start_inclusive))
            .await
            .context("Failed to seek to the range start in a local storage file")
            .map_err(DownloadError::Other)?;

        let metadata = self
            .read_storage_metadata(&target_path)
            .await
            .map_err(DownloadError::Other)?;

        let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
        let source = ReaderStream::new(source);

        let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
        let source = crate::support::DownloadStream::new(cancel_or_timeout, source);

        let etag = mock_etag(&file_metadata);
        Ok(Download {
            metadata,
            last_modified: file_metadata
                .modified()
                .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
            etag,
            download_stream: Box::pin(source),
        })
    }

    async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
        let file_path = path.with_base(&self.storage_root);
        match fs::remove_file(&file_path).await {
            Ok(()) => Ok(()),
            // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
            // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
            // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }

    async fn delete_objects<'a>(
        &self,
        paths: &'a [RemotePath],
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        for path in paths {
            self.delete(path, cancel).await?
        }
        Ok(())
    }

    async fn copy(
        &self,
        from: &RemotePath,
        to: &RemotePath,
        _cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let from_path = from.with_base(&self.storage_root);
        let to_path = to.with_base(&self.storage_root);
        create_target_directory(&to_path).await?;
        fs::copy(&from_path, &to_path).await.with_context(|| {
            format!(
                "Failed to copy file from '{from_path}' to '{to_path}'",
                from_path = from_path,
                to_path = to_path
            )
        })?;
        Ok(())
    }

    async fn time_travel_recover(
        &self,
        _prefix: Option<&RemotePath>,
        _timestamp: SystemTime,
        _done_if_after: SystemTime,
        _cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        Err(TimeTravelError::Unimplemented)
    }
}

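/// Path of the sidecar file used to persist a file's [`StorageMetadata`] as JSON.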
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
    path_with_suffix_extension(original_path, "metadata")
}

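/// Lists the entries under `directory_path` (recursing into subdirectories when
/// `recursive` is set), skipping symlinks and returning an empty list if the
/// directory does not exist. Boxed because the recursive case calls itself.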
fn get_all_files<'a, P>(
    directory_path: P,
    recursive: bool,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
where
    P: AsRef<Utf8Path> + Send + Sync + 'a,
{
    Box::pin(async move {
        let directory_path = directory_path.as_ref();
        if directory_path.exists() {
            if directory_path.is_dir() {
                let mut paths = Vec::new();
                let mut dir_contents = fs::read_dir(directory_path).await?;
                while let Some(dir_entry) = dir_contents.next_entry().await? {
                    let file_type = dir_entry.file_type().await?;
                    let entry_path =
                        Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
                            anyhow::Error::msg(format!(
                                "non-Unicode path: {}",
                                pb.to_string_lossy()
                            ))
                        })?;
                    if file_type.is_symlink() {
                        debug!("{entry_path:?} is a symlink, skipping")
                    } else if file_type.is_dir() {
                        if recursive {
                            paths.extend(get_all_files(&entry_path, true).await?.into_iter())
                        } else {
                            paths.push(entry_path)
                        }
                    } else {
                        paths.push(entry_path);
                    }
                }
                Ok(paths)
            } else {
                bail!("Path {directory_path:?} is not a directory")
            }
        } else {
            Ok(Vec::new())
        }
    })
}

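/// Creates the parent directory of `target_file_path` (with any missing ancestors).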
async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
    let target_dir = match target_file_path.parent() {
        Some(parent_dir) => parent_dir,
        None => bail!("File path '{target_file_path}' has no parent directory"),
    };
    if !target_dir.exists() {
        fs::create_dir_all(target_dir).await?;
    }
    Ok(())
}

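/// Stats `file_path`, mapping a missing file to [`DownloadError::NotFound`] so
/// callers can mirror the remote "no such key" behavior.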
async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, DownloadError> {
    tokio::fs::metadata(&file_path).await.map_err(|e| {
        if e.kind() == ErrorKind::NotFound {
            DownloadError::NotFound
        } else {
            DownloadError::BadInput(e.into())
        }
    })
}

// Use mtime as a stand-in for ETag. We could calculate a meaningful one by md5'ing the contents of files we
// read, but that's expensive, and the local_fs test helper's whole reason for existence is to run small tests
// quickly, with less overhead than using a mock S3 server.
fn mock_etag(meta: &std::fs::Metadata) -> Etag {
    let mtime = meta.modified().expect("Filesystem mtime missing");
    format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into()
}

#[cfg(test)]
mod fs_tests {
    use super::*;

    use camino_tempfile::tempdir;
    use std::{collections::HashMap, io::Write};

    async fn read_and_check_metadata(
        storage: &LocalFs,
        remote_storage_path: &RemotePath,
        expected_metadata: Option<&StorageMetadata>,
    ) -> anyhow::Result<String> {
        let cancel = CancellationToken::new();
        let download = storage
            .download(remote_storage_path, &cancel)
            .await
            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
        ensure!(
            download.metadata.as_ref() == expected_metadata,
            "Unexpected metadata returned for the downloaded file"
        );

        let contents = aggregate(download.download_stream).await?;

        String::from_utf8(contents).map_err(anyhow::Error::new)
    }

    #[tokio::test]
    async fn upload_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
        assert_eq!(
            storage.list_all().await?,
            vec![target_path_1.clone()],
            "Should list a single file after the first upload"
        );

        let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
        assert_eq!(
            list_files_sorted(&storage).await?,
            vec![target_path_1.clone(), target_path_2.clone()],
            "Should list two different files after the second upload"
        );

        Ok(())
    }

    #[tokio::test]
    async fn upload_file_negatives() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let id = RemotePath::new(Utf8Path::new("dummy"))?;
        let content = Bytes::from_static(b"12345");
        let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));

        // Check that you get an error if the size parameter doesn't match the actual
        // size of the stream.
        storage
            .upload(content(), 0, &id, None, &cancel)
            .await
            .expect_err("upload with zero size succeeded");
        storage
            .upload(content(), 4, &id, None, &cancel)
            .await
            .expect_err("upload with too short size succeeded");
        storage
            .upload(content(), 6, &id, None, &cancel)
            .await
            .expect_err("upload with too large size succeeded");

        // The correct size is 5; this should succeed.
        storage.upload(content(), 5, &id, None, &cancel).await?;

        Ok(())
    }

    fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
        let storage_root = tempdir()?.path().to_path_buf();
        LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
    }

    #[tokio::test]
    async fn download_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            contents,
            "We should upload and download the same contents"
        );

        let non_existing_path = "somewhere/else";
        match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await {
            Err(DownloadError::NotFound) => {} // Should get NotFound for non-existing keys
            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_positive() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let full_range_download_contents =
            read_and_check_metadata(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "Downloading the full range should return the whole upload"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

        let first_part_download = storage
            .download_byte_range(
                &upload_target,
                0,
                Some(first_part_local.len() as u64),
                &cancel,
            )
            .await?;
        assert!(
            first_part_download.metadata.is_none(),
            "No metadata should be returned for an upload without metadata"
        );

        let first_part_remote = aggregate(first_part_download.download_stream).await?;
        assert_eq!(
            first_part_local, first_part_remote,
            "First part bytes should be returned when requested"
        );

        let second_part_download = storage
            .download_byte_range(
                &upload_target,
                first_part_local.len() as u64,
                Some((first_part_local.len() + second_part_local.len()) as u64),
                &cancel,
            )
            .await?;
        assert!(
            second_part_download.metadata.is_none(),
            "No metadata should be returned for an upload without metadata"
        );

        let second_part_remote = aggregate(second_part_download.download_stream).await?;
        assert_eq!(
            second_part_local, second_part_remote,
            "Second part bytes should be returned when requested"
        );

        let suffix_bytes = storage
            .download_byte_range(&upload_target, 13, None, &cancel)
            .await?
            .download_stream;
        let suffix_bytes = aggregate(suffix_bytes).await?;
        let suffix = std::str::from_utf8(&suffix_bytes)?;
        assert_eq!(upload_name, suffix);

        let all_bytes = storage
            .download_byte_range(&upload_target, 0, None, &cancel)
            .await?
            .download_stream;
        let all_bytes = aggregate(all_bytes).await?;
        let all_bytes = std::str::from_utf8(&all_bytes)?;
        assert_eq!(dummy_contents("upload_1"), all_bytes);

        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_negative() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let start = 1_000_000_000;
        let end = start + 1;
        match storage
            .download_byte_range(
                &upload_target,
                start,
                Some(end), // exclusive end
                &cancel,
            )
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("zero bytes"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        let start = 10000;
        let end = 234;
        assert!(start > end, "Should test an incorrect range");
        match storage
            .download_byte_range(&upload_target, start, Some(end), &cancel)
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("Invalid range"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        Ok(())
    }

    #[tokio::test]
    async fn delete_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        storage.delete(&upload_target, &cancel).await?;
        assert!(storage.list_all().await?.is_empty());

        storage
            .delete(&upload_target, &cancel)
            .await
            .expect("Should allow deleting non-existing storage files");

        Ok(())
    }

    #[tokio::test]
    async fn file_with_metadata() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let metadata = StorageMetadata(HashMap::from([
            ("one".to_string(), "1".to_string()),
            ("two".to_string(), "2".to_string()),
        ]));
        let upload_target =
            upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;

        let full_range_download_contents =
            read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "We should upload and download the same contents"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, _) = uploaded_bytes.split_at(3);

        let partial_download_with_metadata = storage
            .download_byte_range(
                &upload_target,
                0,
                Some(first_part_local.len() as u64),
                &cancel,
            )
            .await?;
        let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        assert_eq!(
            partial_download_with_metadata.metadata,
            Some(metadata),
            "We should get the same metadata back for a partial download"
        );

        Ok(())
    }

    #[tokio::test]
    async fn list() -> anyhow::Result<()> {
        // No delimiter: should recursively list everything
        let (storage, cancel) = create_storage()?;
        let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
        let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;

        let listing = storage
            .list(None, ListingMode::NoDelimiter, None, &cancel)
            .await?;
        assert!(listing.prefixes.is_empty());
        assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());

        // Delimiter: should only go one level deep
        let listing = storage
            .list(None, ListingMode::WithDelimiter, None, &cancel)
            .await?;

        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("timelines").unwrap()].to_vec()
        );
        assert!(listing.keys.is_empty());

        // Delimiter & prefix
        let listing = storage
            .list(
                Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
                ListingMode::WithDelimiter,
                None,
                &cancel,
            )
            .await?;
        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
                .to_vec()
        );
        assert_eq!(listing.keys, [uncle.clone()].to_vec());

        Ok(())
    }

    #[tokio::test]
    async fn overwrite_shorter_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let path = RemotePath::new("does/not/matter/file".into())?;

        let body = Bytes::from_static(b"long file contents is long");
        {
            let len = body.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
            storage.upload(body, len, &path, None, &cancel).await?;
        }

        let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
        assert_eq!(body, read);

        let shorter = Bytes::from_static(b"shorter body");
        {
            let len = shorter.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
            storage.upload(body, len, &path, None, &cancel).await?;
        }

        let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
        assert_eq!(shorter, read);
        Ok(())
    }

    #[tokio::test]
    async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let path = RemotePath::new("does/not/matter/file".into())?;

        let body = Bytes::from_static(b"long file contents is long");
        {
            let len = body.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
            let cancel = cancel.child_token();
            cancel.cancel();
            let e = storage
                .upload(body, len, &path, None, &cancel)
                .await
                .unwrap_err();

            assert!(TimeoutOrCancel::caused_by_cancel(&e));
        }

        {
            let len = body.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
            storage.upload(body, len, &path, None, &cancel).await?;
        }

        let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
        assert_eq!(body, read);

        Ok(())
    }

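    /// Test helper: writes `contents for <name>` under
    /// `timelines/some_timeline/<name>` inside the storage root and uploads it
    /// through the storage API, returning the remote path.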
    async fn upload_dummy_file(
        storage: &LocalFs,
        name: &str,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<RemotePath> {
        let from_path = storage
            .storage_root
            .join("timelines")
            .join("some_timeline")
            .join(name);
        let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;

        let relative_path = from_path
            .strip_prefix(&storage.storage_root)
            .context("Failed to strip storage root prefix")
            .and_then(RemotePath::new)
            .with_context(|| {
                format!(
                    "Failed to resolve remote part of path {:?} for base {:?}",
                    from_path, storage.storage_root
                )
            })?;

        let file = tokio_util::io::ReaderStream::new(file);

        storage
            .upload(file, size, &relative_path, metadata, cancel)
            .await?;
        Ok(relative_path)
    }

    async fn create_file_for_upload(
        path: &Utf8Path,
        contents: &str,
    ) -> anyhow::Result<(fs::File, usize)> {
        std::fs::create_dir_all(path.parent().unwrap())?;
        let mut file_for_writing = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(path)?;
        write!(file_for_writing, "{}", contents)?;
        drop(file_for_writing);
        let file_size = path.metadata()?.len() as usize;
        Ok((
            fs::OpenOptions::new().read(true).open(&path).await?,
            file_size,
        ))
    }

    fn dummy_contents(name: &str) -> String {
        format!("contents for {name}")
    }

    async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
        let mut files = storage.list_all().await?;
        files.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(files)
    }

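    /// Drains a download stream into a single byte buffer.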
    async fn aggregate(
        stream: impl Stream<Item = std::io::Result<Bytes>>,
    ) -> anyhow::Result<Vec<u8>> {
        use futures::stream::StreamExt;
        let mut out = Vec::new();
        let mut stream = std::pin::pin!(stream);
        while let Some(res) = stream.next().await {
            out.extend_from_slice(&res?[..]);
        }
        Ok(out)
    }
}