//! Local filesystem acting as a remote storage.
//! Multiple API users can use the same "storage" of this kind by using different storage roots.
//!
//! This storage is used in tests, but can also be used when a persistent
//! volume is mounted to the local FS.
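//!
//! A minimal usage sketch (assuming the enclosing crate is `remote_storage` and that
//! `LocalFs`, `RemotePath` and the `RemoteStorage` trait are re-exported at its root):
//!
//! ```ignore
//! use std::time::Duration;
//!
//! use bytes::Bytes;
//! use camino::Utf8PathBuf;
//! use tokio_util::sync::CancellationToken;
//!
//! use remote_storage::{LocalFs, RemotePath, RemoteStorage};
//!
//! async fn example() -> anyhow::Result<()> {
//!     // The storage root path is illustrative.
//!     let storage = LocalFs::new(Utf8PathBuf::from("/tmp/storage_root"), Duration::from_secs(120))?;
//!     let cancel = CancellationToken::new();
//!
//!     let path = RemotePath::from_string("tenant/timeline/layer_1")?;
//!     let body = Bytes::from_static(b"contents");
//!     let len = body.len();
//!     let data = futures::stream::once(futures::future::ready(std::io::Result::Ok(body)));
//!     storage.upload(data, len, &path, None, &cancel).await?;
//!     Ok(())
//! }
//! ```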

use std::{
    collections::HashSet,
    io::ErrorKind,
    num::NonZeroU32,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use anyhow::{bail, ensure, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::Stream;
use tokio::{
    fs,
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tokio_util::{io::ReaderStream, sync::CancellationToken};
use utils::crashsafe::path_with_suffix_extension;

use crate::{
    Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject, RemotePath,
    TimeTravelError, TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
};

use super::{RemoteStorage, StorageMetadata};
use crate::Etag;

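// Uploads are first written to a temp file that shares the target's path plus this
// suffix extension (via `path_with_suffix_extension`), then renamed into place; e.g.
// "layer_1" is staged as something like "layer_1.___temp" (see `upload0`).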
const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";

#[derive(Debug, Clone)]
pub struct LocalFs {
    storage_root: Utf8PathBuf,
    timeout: Duration,
}

impl LocalFs {
    /// Attempts to create local FS storage, along with its root directory.
    /// The storage root will be created if it does not exist, and transformed
    /// into an absolute path if passed as a relative one.
    pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
        if !storage_root.exists() {
            std::fs::create_dir_all(&storage_root).with_context(|| {
                format!("Failed to create all directories in the given root path {storage_root:?}")
            })?;
        }
        if !storage_root.is_absolute() {
            storage_root = storage_root.canonicalize_utf8().with_context(|| {
                format!("Failed to represent path {storage_root:?} as an absolute path")
            })?;
        }

        Ok(Self {
            storage_root,
            timeout,
        })
    }

    // mirrors S3Bucket::s3_object_to_relative_path
    fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
        let relative_path = key
            .strip_prefix(&self.storage_root)
            .expect("relative path must contain storage_root as prefix");
        RemotePath(relative_path.into())
    }

    async fn read_storage_metadata(
        &self,
        file_path: &Utf8Path,
    ) -> anyhow::Result<Option<StorageMetadata>> {
        let metadata_path = storage_metadata_path(file_path);
        if metadata_path.exists() && metadata_path.is_file() {
            let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
                format!("Failed to read metadata from the local storage at '{metadata_path}'")
            })?;

            serde_json::from_str(&metadata_string)
                .with_context(|| {
                    format!(
                        "Failed to deserialize metadata from the local storage at '{metadata_path}'",
                    )
                })
                .map(|metadata| Some(StorageMetadata(metadata)))
        } else {
            Ok(None)
        }
    }

    #[cfg(test)]
    async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
        use std::{future::Future, pin::Pin};
        fn get_all_files<'a, P>(
            directory_path: P,
        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
        where
            P: AsRef<Utf8Path> + Send + Sync + 'a,
        {
            Box::pin(async move {
                let directory_path = directory_path.as_ref();
                if directory_path.exists() {
                    if directory_path.is_dir() {
                        let mut paths = Vec::new();
                        let mut dir_contents = fs::read_dir(directory_path).await?;
                        while let Some(dir_entry) = dir_contents.next_entry().await? {
                            let file_type = dir_entry.file_type().await?;
                            let entry_path =
                                Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
                                    anyhow::Error::msg(format!(
                                        "non-Unicode path: {}",
                                        pb.to_string_lossy()
                                    ))
                                })?;
                            if file_type.is_symlink() {
                                tracing::debug!("{entry_path:?} is a symlink, skipping")
                            } else if file_type.is_dir() {
                                paths.extend(get_all_files(&entry_path).await?.into_iter())
                            } else {
                                paths.push(entry_path);
                            }
                        }
                        Ok(paths)
                    } else {
                        bail!("Path {directory_path:?} is not a directory")
                    }
                } else {
                    Ok(Vec::new())
                }
            })
        }

        Ok(get_all_files(&self.storage_root)
            .await?
            .into_iter()
            .map(|path| {
                path.strip_prefix(&self.storage_root)
                    .context("Failed to strip storage root prefix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files under the storage root, hence we should be able to remove the prefix",
                    )
            })
            .collect())
    }

    // Recursively lists all files in a directory,
    // mirroring the `list_files` implementation for `s3_bucket`.
    async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        let full_path = match folder {
            Some(folder) => folder.with_base(&self.storage_root),
            None => self.storage_root.clone(),
        };

        // If we were given a directory, we may use it as our starting point.
        // Otherwise, we must go up to the first ancestor dir that exists. This is because
        // S3 object list prefixes can be arbitrary strings, but when reading
        // the local filesystem we need a directory to start calling read_dir on.
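        // For example, listing with prefix "tenants/ten" must match both
        // "tenants/ten" and "tenants/tenant_a/...", so we start read_dir at
        // "tenants/" and string-filter entries against the full prefix below.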
        let mut initial_dir = full_path.clone();

        // If there's no trailing slash, we have to start looking from one above: even if
        // `initial_dir` is a directory, we should still list any prefixes in the parent
        // that start with the same string.
        if !full_path.to_string().ends_with('/') {
            initial_dir.pop();
        }

        loop {
            // Did we make it to the root?
            if initial_dir.parent().is_none() {
                anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
            }

            match fs::metadata(initial_dir.clone()).await {
                Ok(meta) if meta.is_dir() => {
                    // We found a directory, break
                    break;
                }
                Ok(_meta) => {
                    // It's not a directory: strip back to the parent
                    initial_dir.pop();
                }
                Err(e) if e.kind() == ErrorKind::NotFound => {
                    // It's not a file that exists: strip the prefix back to the parent directory
                    initial_dir.pop();
                }
                Err(e) => {
                    // Unexpected I/O error
                    anyhow::bail!(e)
                }
            }
        }
        // Note that Utf8PathBuf starts_with only considers full path segments, but
        // object prefixes are arbitrary strings, so we need the strings for doing
        // starts_with later.
        let prefix = full_path.as_str();

        let mut files = vec![];
        let mut directory_queue = vec![initial_dir];
        while let Some(cur_folder) = directory_queue.pop() {
            let mut entries = cur_folder.read_dir_utf8()?;
            while let Some(Ok(entry)) = entries.next() {
                let file_name = entry.file_name();
                let full_file_name = cur_folder.join(file_name);
                if full_file_name.as_str().starts_with(prefix) {
                    let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
                    files.push(file_remote_path);
                    if full_file_name.is_dir() {
                        directory_queue.push(full_file_name);
                    }
                }
            }
        }

        Ok(files)
    }

    async fn upload0(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let target_file_path = to.with_base(&self.storage_root);
        create_target_directory(&target_file_path).await?;
        // We need this dance with a sort-of-durable rename (without fsyncs)
        // to prevent partial uploads. This was actually hit when a pageserver shutdown
        // cancelled the upload and a partial file was left on the fs.
        // NOTE: Because the temp file suffix is always the same, this operation is racy.
        // Two concurrent operations can lead to the following sequence:
        // T1: write(temp)
        // T2: write(temp) -> overwrites the content
        // T1: rename(temp, dst) -> succeeds
        // T2: rename(temp, dst) -> fails, temp no longer exists
        // This can be solved by supplying a unique temp suffix every time, but this situation
        // is not normal in the first place, and the error can help (and helped at least once)
        // to discover bugs in upper-level synchronization.
        let temp_file_path =
            path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
        let mut destination = io::BufWriter::new(
            fs::OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&temp_file_path)
                .await
                .with_context(|| {
                    format!("Failed to open target fs destination at '{target_file_path}'")
                })?,
        );

        let from_size_bytes = data_size_bytes as u64;
        let data = tokio_util::io::StreamReader::new(data);
        let data = std::pin::pin!(data);
        let mut buffer_to_read = data.take(from_size_bytes);

        // Alternatively we could just write the bytes to a file, but local_fs is a testing utility.
        let copy = io::copy_buf(&mut buffer_to_read, &mut destination);

        let bytes_read = tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                let file = destination.into_inner();
                // Wait for the inflight operation(s) to complete, so that the next
                // attempt can start right away and our writes are not directed to its file.
                file.into_std().await;

                // TODO: leave the temp file or not? Leaving it is probably less racy; truncate is
                // enabled at least.
                fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
                return Err(TimeoutOrCancel::Cancel.into());
            }
            read = copy => read,
        };

        let bytes_read =
            bytes_read.with_context(|| {
                format!(
                    "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
                )
            })?;

        if bytes_read < from_size_bytes {
            bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
        }
        // Check if there is any extra data after the given size.
        let mut from = buffer_to_read.into_inner();
        let extra_read = from.read(&mut [1]).await?;
        ensure!(
            extra_read == 0,
            "Provided stream was larger than expected: expected {from_size_bytes} bytes",
        );

        destination.flush().await.with_context(|| {
            format!(
                "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
            )
        })?;

        fs::rename(temp_file_path, &target_file_path)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload (rename) file to the local storage at '{target_file_path}'",
                )
            })?;

        if let Some(storage_metadata) = metadata {
            // FIXME: we must not be using metadata much, since this would forget the old metadata
            // for new writes? Or perhaps metadata is sticky; could consider removing it if it's
            // never used.
            let storage_metadata_path = storage_metadata_path(&target_file_path);
            fs::write(
                &storage_metadata_path,
                serde_json::to_string(&storage_metadata.0)
                    .context("Failed to serialize storage metadata as json")?,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to write metadata to the local storage at '{storage_metadata_path}'",
                )
            })?;
        }

        Ok(())
    }
}

impl RemoteStorage for LocalFs {
    fn list_streaming(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> impl Stream<Item = Result<Listing, DownloadError>> {
        let listing = self.list(prefix, mode, max_keys, cancel);
        futures::stream::once(listing)
    }

    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
        max_keys: Option<NonZeroU32>,
        cancel: &CancellationToken,
    ) -> Result<Listing, DownloadError> {
        let op = async {
            let mut result = Listing::default();

            // Filter out directories: in S3 directories don't exist, only the keys within them do.
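            // (E.g. after uploading "a/b/c", only the key "a/b/c" is listed;
            // "a" and "a/b" are not objects themselves.)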
            let keys = self
                .list_recursive(prefix)
                .await
                .map_err(DownloadError::Other)?;
            let mut objects = Vec::with_capacity(keys.len());
            for key in keys {
                let path = key.with_base(&self.storage_root);
                let metadata = file_metadata(&path).await;
                if let Err(DownloadError::NotFound) = metadata {
                    // Race: if the file is deleted between listing and metadata check, ignore it.
                    continue;
                }
                let metadata = metadata?;
                if metadata.is_dir() {
                    continue;
                }
                objects.push(ListingObject {
                    key: key.clone(),
                    last_modified: metadata.modified()?,
                    size: metadata.len(),
                });
            }
            let objects = objects;

            if let ListingMode::NoDelimiter = mode {
                result.keys = objects;
            } else {
                let mut prefixes = HashSet::new();
                for object in objects {
                    let key = object.key;
                    // If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
                    let relative_key = if let Some(prefix) = prefix {
                        let mut prefix = prefix.clone();
                        // We only strip the dirname of the prefix, so that when we strip it from the start of keys we
                        // end up with full file/dir names.
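                        // For example, with prefix "a/b/grandp" and key "a/b/grandparent/parent/child",
                        // we strip "a/b" and the relative key becomes "grandparent/parent/child",
                        // whose first segment "grandparent" is reported as a prefix.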
                        let prefix_full_local_path = prefix.with_base(&self.storage_root);
                        let has_slash = prefix.0.to_string().ends_with('/');
                        let strip_prefix = if prefix_full_local_path.is_dir() && has_slash {
                            prefix
                        } else {
                            prefix.0.pop();
                            prefix
                        };

                        RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap()
                    } else {
                        key
                    };

                    let relative_key = format!("{}", relative_key);
                    if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                        let first_part = relative_key
                            .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
                            .next()
                            .unwrap()
                            .to_owned();
                        prefixes.insert(first_part);
                    } else {
                        result.keys.push(ListingObject {
                            key: RemotePath::from_string(&relative_key).unwrap(),
                            last_modified: object.last_modified,
                            size: object.size,
                        });
                    }
                }
                result.prefixes = prefixes
                    .into_iter()
                    .map(|s| RemotePath::from_string(&s).unwrap())
                    .collect();
            }

            if let Some(max_keys) = max_keys {
                result.keys.truncate(max_keys.get() as usize);
            }
            Ok(result)
        };

        let timeout = async {
            tokio::time::sleep(self.timeout).await;
            Err(DownloadError::Timeout)
        };

        let cancelled = async {
            cancel.cancelled().await;
            Err(DownloadError::Cancelled)
        };

        tokio::select! {
            res = op => res,
            res = timeout => res,
            res = cancelled => res,
        }
    }

    async fn head_object(
        &self,
        key: &RemotePath,
        _cancel: &CancellationToken,
    ) -> Result<ListingObject, DownloadError> {
        let target_file_path = key.with_base(&self.storage_root);
        let metadata = file_metadata(&target_file_path).await?;
        Ok(ListingObject {
            key: key.clone(),
            last_modified: metadata.modified()?,
            size: metadata.len(),
        })
    }

    async fn upload(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let cancel = cancel.child_token();

        let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
        let mut op = std::pin::pin!(op);

        // Race upload0 against the timeout; if it goes over, do a graceful shutdown.
        let (res, timeout) = tokio::select! {
            res = &mut op => (res, false),
            _ = tokio::time::sleep(self.timeout) => {
                cancel.cancel();
                (op.await, true)
            }
        };

        match res {
            Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
                // We caused this cancel (or they happened simultaneously) -- swap it out to
                // Timeout.
                Err(TimeoutOrCancel::Timeout.into())
            }
            res => res,
        }
    }

    async fn download(
        &self,
        from: &RemotePath,
        opts: &DownloadOpts,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        let target_path = from.with_base(&self.storage_root);

        let file_metadata = file_metadata(&target_path).await?;
        let etag = mock_etag(&file_metadata);

        if opts.etag.as_ref() == Some(&etag) {
            return Err(DownloadError::Unmodified);
        }

        let mut file = fs::OpenOptions::new()
            .read(true)
            .open(&target_path)
            .await
            .with_context(|| {
                format!("Failed to open source file {target_path:?} to use in the download")
            })
            .map_err(DownloadError::Other)?;

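        // `opts.byte_range()` yields an inclusive start and an exclusive end offset;
        // e.g. a range of (3, Some(7)) seeks to byte 3 and reads 4 bytes.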
        let mut take = file_metadata.len();
        if let Some((start, end)) = opts.byte_range() {
            if start > 0 {
                file.seek(io::SeekFrom::Start(start))
                    .await
                    .context("Failed to seek to the range start in a local storage file")
                    .map_err(DownloadError::Other)?;
            }
            if let Some(end) = end {
                take = end - start;
            }
        }

        let source = ReaderStream::new(file.take(take));

        let metadata = self
            .read_storage_metadata(&target_path)
            .await
            .map_err(DownloadError::Other)?;

        let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
        let source = crate::support::DownloadStream::new(cancel_or_timeout, source);

        Ok(Download {
            metadata,
            last_modified: file_metadata
                .modified()
                .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
            etag,
            download_stream: Box::pin(source),
        })
    }

    async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
        let file_path = path.with_base(&self.storage_root);
        match fs::remove_file(&file_path).await {
            Ok(()) => Ok(()),
            // The file doesn't exist. This shouldn't yield an error, to mirror S3's behaviour.
            // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
            // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }

    async fn delete_objects<'a>(
        &self,
        paths: &'a [RemotePath],
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        for path in paths {
            self.delete(path, cancel).await?
        }
        Ok(())
    }

    async fn copy(
        &self,
        from: &RemotePath,
        to: &RemotePath,
        _cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let from_path = from.with_base(&self.storage_root);
        let to_path = to.with_base(&self.storage_root);
        create_target_directory(&to_path).await?;
        fs::copy(&from_path, &to_path).await.with_context(|| {
            format!(
                "Failed to copy file from '{from_path}' to '{to_path}'",
                from_path = from_path,
                to_path = to_path
            )
        })?;
        Ok(())
    }

    async fn time_travel_recover(
        &self,
        _prefix: Option<&RemotePath>,
        _timestamp: SystemTime,
        _done_if_after: SystemTime,
        _cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        Err(TimeTravelError::Unimplemented)
    }
}

fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
    path_with_suffix_extension(original_path, "metadata")
}

async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
    let target_dir = match target_file_path.parent() {
        Some(parent_dir) => parent_dir,
        None => bail!("File path '{target_file_path}' has no parent directory"),
    };
    if !target_dir.exists() {
        fs::create_dir_all(target_dir).await?;
    }
    Ok(())
}

async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, DownloadError> {
    tokio::fs::metadata(&file_path).await.map_err(|e| {
        if e.kind() == ErrorKind::NotFound {
            DownloadError::NotFound
        } else {
            DownloadError::BadInput(e.into())
        }
    })
}

// Use mtime as stand-in for ETag. We could calculate a meaningful one by md5'ing the contents of files we
// read, but that's expensive and the local_fs test helper's whole reason for existence is to run small tests
// quickly, with less overhead than using a mock S3 server.
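// E.g. an mtime of 1700000000.123s after the Unix epoch yields the etag "1700000000123".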
fn mock_etag(meta: &std::fs::Metadata) -> Etag {
    let mtime = meta.modified().expect("Filesystem mtime missing");
    format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into()
}

#[cfg(test)]
mod fs_tests {
    use super::*;

    use camino_tempfile::tempdir;
    use std::{collections::HashMap, io::Write, ops::Bound};

    async fn read_and_check_metadata(
        storage: &LocalFs,
        remote_storage_path: &RemotePath,
        expected_metadata: Option<&StorageMetadata>,
    ) -> anyhow::Result<String> {
        let cancel = CancellationToken::new();
        let download = storage
            .download(remote_storage_path, &DownloadOpts::default(), &cancel)
            .await
            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
        ensure!(
            download.metadata.as_ref() == expected_metadata,
            "Unexpected metadata returned for the downloaded file"
        );

        let contents = aggregate(download.download_stream).await?;

        String::from_utf8(contents).map_err(anyhow::Error::new)
    }

    #[tokio::test]
    async fn upload_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
        assert_eq!(
            storage.list_all().await?,
            vec![target_path_1.clone()],
            "Should list a single file after the first upload"
        );

        let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
        assert_eq!(
            list_files_sorted(&storage).await?,
            vec![target_path_1.clone(), target_path_2.clone()],
            "Should list two different files after the second upload"
        );

        Ok(())
    }

    #[tokio::test]
    async fn upload_file_negatives() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let id = RemotePath::new(Utf8Path::new("dummy"))?;
        let content = Bytes::from_static(b"12345");
        let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));

        // Check that you get an error if the size parameter doesn't match the actual
        // size of the stream.
        storage
            .upload(content(), 0, &id, None, &cancel)
            .await
            .expect_err("upload with zero size succeeded");
        storage
            .upload(content(), 4, &id, None, &cancel)
            .await
            .expect_err("upload with too short size succeeded");
        storage
            .upload(content(), 6, &id, None, &cancel)
            .await
            .expect_err("upload with too large size succeeded");

        // The correct size is 5, so this should succeed.
        storage.upload(content(), 5, &id, None, &cancel).await?;

        Ok(())
    }

    fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
        let storage_root = tempdir()?.path().to_path_buf();
        LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
    }

    #[tokio::test]
    async fn download_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            contents,
            "We should upload and download the same contents"
        );

        let non_existing_path = RemotePath::new(Utf8Path::new("somewhere/else"))?;
        match storage.download(&non_existing_path, &DownloadOpts::default(), &cancel).await {
            Err(DownloadError::NotFound) => {} // Should get NotFound for non-existing keys
            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn download_file_range_positive() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let full_range_download_contents =
            read_and_check_metadata(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "Downloading the full range should return the whole upload"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

        let first_part_download = storage
            .download(
                &upload_target,
                &DownloadOpts {
                    byte_end: Bound::Excluded(first_part_local.len() as u64),
                    ..Default::default()
                },
                &cancel,
            )
            .await?;
        assert!(
            first_part_download.metadata.is_none(),
            "No metadata should be returned for an upload without metadata"
        );

        let first_part_remote = aggregate(first_part_download.download_stream).await?;
        assert_eq!(
            first_part_local, first_part_remote,
            "First part bytes should be returned when requested"
        );

        let second_part_download = storage
            .download(
                &upload_target,
                &DownloadOpts {
                    byte_start: Bound::Included(first_part_local.len() as u64),
                    byte_end: Bound::Excluded(
                        (first_part_local.len() + second_part_local.len()) as u64,
                    ),
                    ..Default::default()
                },
                &cancel,
            )
            .await?;
        assert!(
            second_part_download.metadata.is_none(),
            "No metadata should be returned for an upload without metadata"
        );

        let second_part_remote = aggregate(second_part_download.download_stream).await?;
        assert_eq!(
            second_part_local, second_part_remote,
            "Second part bytes should be returned when requested"
        );

        let suffix_bytes = storage
            .download(
                &upload_target,
                &DownloadOpts {
                    byte_start: Bound::Included(13),
                    ..Default::default()
                },
                &cancel,
            )
            .await?
            .download_stream;
        let suffix_bytes = aggregate(suffix_bytes).await?;
        let suffix = std::str::from_utf8(&suffix_bytes)?;
        assert_eq!(upload_name, suffix);

        let all_bytes = storage
            .download(&upload_target, &DownloadOpts::default(), &cancel)
            .await?
            .download_stream;
        let all_bytes = aggregate(all_bytes).await?;
        let all_bytes = std::str::from_utf8(&all_bytes)?;
        assert_eq!(dummy_contents("upload_1"), all_bytes);

        Ok(())
    }

    #[tokio::test]
    #[should_panic(expected = "at or before start")]
    async fn download_file_range_negative() {
        let (storage, cancel) = create_storage().unwrap();
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel)
            .await
            .unwrap();

        storage
            .download(
                &upload_target,
                &DownloadOpts {
                    byte_start: Bound::Included(10),
                    byte_end: Bound::Excluded(10),
                    ..Default::default()
                },
                &cancel,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn delete_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        storage.delete(&upload_target, &cancel).await?;
        assert!(storage.list_all().await?.is_empty());

        storage
            .delete(&upload_target, &cancel)
            .await
            .expect("Should allow deleting non-existing storage files");

        Ok(())
    }

    #[tokio::test]
    async fn file_with_metadata() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let metadata = StorageMetadata(HashMap::from([
            ("one".to_string(), "1".to_string()),
            ("two".to_string(), "2".to_string()),
        ]));
        let upload_target =
            upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;

        let full_range_download_contents =
            read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "We should upload and download the same contents"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, _) = uploaded_bytes.split_at(3);

        let partial_download_with_metadata = storage
            .download(
                &upload_target,
                &DownloadOpts {
                    byte_end: Bound::Excluded(first_part_local.len() as u64),
                    ..Default::default()
                },
                &cancel,
            )
            .await?;
        let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        assert_eq!(
            partial_download_with_metadata.metadata,
            Some(metadata),
            "We should get the same metadata back for partial download"
        );

        Ok(())
    }

    #[tokio::test]
    async fn list() -> anyhow::Result<()> {
        // No delimiter: should recursively list everything
        let (storage, cancel) = create_storage()?;
        let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
        let child_sibling =
            upload_dummy_file(&storage, "grandparent/parent/child_sibling", None, &cancel).await?;
        let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;

        let listing = storage
            .list(None, ListingMode::NoDelimiter, None, &cancel)
            .await?;
        assert!(listing.prefixes.is_empty());
        assert_eq!(
            listing
                .keys
                .into_iter()
                .map(|o| o.key)
                .collect::<HashSet<_>>(),
            HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()])
        );

        // Delimiter: should only go one level deep
        let listing = storage
            .list(None, ListingMode::WithDelimiter, None, &cancel)
            .await?;

        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("timelines").unwrap()].to_vec()
        );
        assert!(listing.keys.is_empty());

        // Delimiter & prefix with a trailing slash
        let listing = storage
            .list(
                Some(&RemotePath::from_string("timelines/some_timeline/grandparent/").unwrap()),
                ListingMode::WithDelimiter,
                None,
                &cancel,
            )
            .await?;
        assert_eq!(
            listing.keys.into_iter().map(|o| o.key).collect::<Vec<_>>(),
            [RemotePath::from_string("uncle").unwrap()].to_vec()
        );
        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("parent").unwrap()].to_vec()
        );

        // Delimiter and prefix without a trailing slash
        let listing = storage
            .list(
                Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
                ListingMode::WithDelimiter,
                None,
                &cancel,
            )
            .await?;
        assert_eq!(listing.keys, vec![]);
        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("grandparent").unwrap()].to_vec()
        );

        // Delimiter and prefix that's partway through a path component
        let listing = storage
            .list(
                Some(&RemotePath::from_string("timelines/some_timeline/grandp").unwrap()),
                ListingMode::WithDelimiter,
                None,
                &cancel,
            )
            .await?;
        assert_eq!(listing.keys, vec![]);
        assert_eq!(
            listing.prefixes,
            [RemotePath::from_string("grandparent").unwrap()].to_vec()
        );

        Ok(())
    }

    #[tokio::test]
    async fn list_part_component() -> anyhow::Result<()> {
        // No delimiter: should recursively list everything
        let (storage, cancel) = create_storage()?;

        // Imitates a tenant path where both an unsharded and a sharded path exist, and we do a
        // listing of the unsharded one: although there is a "directory" at the unsharded path,
        // it should be handled as a freeform prefix.
        let _child_a =
            upload_dummy_file(&storage, "grandparent/tenant-01/child", None, &cancel).await?;
        let _child_b =
            upload_dummy_file(&storage, "grandparent/tenant/child", None, &cancel).await?;

        // Delimiter and prefix that's partway through a path component
        let listing = storage
            .list(
                Some(
                    &RemotePath::from_string("timelines/some_timeline/grandparent/tenant").unwrap(),
                ),
                ListingMode::WithDelimiter,
                None,
                &cancel,
            )
            .await?;
        assert_eq!(listing.keys, vec![]);

        let mut found_prefixes = listing.prefixes.clone();
        found_prefixes.sort();
        assert_eq!(
            found_prefixes,
            [
                RemotePath::from_string("tenant").unwrap(),
                RemotePath::from_string("tenant-01").unwrap(),
            ]
            .to_vec()
        );

        Ok(())
    }

    #[tokio::test]
    async fn overwrite_shorter_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let path = RemotePath::new("does/not/matter/file".into())?;

        let body = Bytes::from_static(b"long file contents is long");
        {
            let len = body.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
            storage.upload(body, len, &path, None, &cancel).await?;
        }

        let read = aggregate(
            storage
                .download(&path, &DownloadOpts::default(), &cancel)
                .await?
                .download_stream,
        )
        .await?;
        assert_eq!(body, read);

        let shorter = Bytes::from_static(b"shorter body");
        {
            let len = shorter.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
            storage.upload(body, len, &path, None, &cancel).await?;
        }

        let read = aggregate(
            storage
                .download(&path, &DownloadOpts::default(), &cancel)
                .await?
                .download_stream,
        )
        .await?;
        assert_eq!(shorter, read);
        Ok(())
    }

    #[tokio::test]
    async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let path = RemotePath::new("does/not/matter/file".into())?;

        let body = Bytes::from_static(b"long file contents is long");
        {
            let len = body.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
            let cancel = cancel.child_token();
            cancel.cancel();
            let e = storage
                .upload(body, len, &path, None, &cancel)
                .await
                .unwrap_err();

            assert!(TimeoutOrCancel::caused_by_cancel(&e));
        }

        {
            let len = body.len();
            let body =
                futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
            storage.upload(body, len, &path, None, &cancel).await?;
        }

        let read = aggregate(
            storage
                .download(&path, &DownloadOpts::default(), &cancel)
                .await?
                .download_stream,
        )
        .await?;
        assert_eq!(body, read);

        Ok(())
    }

    async fn upload_dummy_file(
        storage: &LocalFs,
        name: &str,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<RemotePath> {
        let from_path = storage
            .storage_root
            .join("timelines")
            .join("some_timeline")
            .join(name);
        let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;

        let relative_path = from_path
            .strip_prefix(&storage.storage_root)
            .context("Failed to strip storage root prefix")
            .and_then(RemotePath::new)
            .with_context(|| {
                format!(
                    "Failed to resolve remote part of path {:?} for base {:?}",
                    from_path, storage.storage_root
                )
            })?;

        let file = tokio_util::io::ReaderStream::new(file);

        storage
            .upload(file, size, &relative_path, metadata, cancel)
            .await?;
        Ok(relative_path)
    }

    async fn create_file_for_upload(
        path: &Utf8Path,
        contents: &str,
    ) -> anyhow::Result<(fs::File, usize)> {
        std::fs::create_dir_all(path.parent().unwrap())?;
        let mut file_for_writing = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(path)?;
        write!(file_for_writing, "{}", contents)?;
        drop(file_for_writing);
        let file_size = path.metadata()?.len() as usize;
        Ok((
            fs::OpenOptions::new().read(true).open(&path).await?,
            file_size,
        ))
    }

    fn dummy_contents(name: &str) -> String {
        format!("contents for {name}")
    }

    async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
        let mut files = storage.list_all().await?;
        files.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(files)
    }

    async fn aggregate(
        stream: impl Stream<Item = std::io::Result<Bytes>>,
    ) -> anyhow::Result<Vec<u8>> {
        use futures::stream::StreamExt;
        let mut out = Vec::new();
        let mut stream = std::pin::pin!(stream);
        while let Some(res) = stream.next().await {
            out.extend_from_slice(&res?[..]);
        }
        Ok(out)
    }
}