Line data Source code
1 : //! Local filesystem acting as a remote storage.
2 : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
3 : //!
4 : //! This storage is used in tests, but can also be used when a persistent
5 : //! volume is mounted to the local FS.
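  : //!
  : //! A minimal usage sketch (the root paths and timeout below are arbitrary; the tests in this
  : //! file construct the storage the same way, just with a temporary directory as the root):
  : //!
  : //! ```ignore
  : //! use std::time::Duration;
  : //! use camino::Utf8PathBuf;
  : //!
  : //! let user_a = LocalFs::new(Utf8PathBuf::from("/mnt/backup-volume/user_a"), Duration::from_secs(120)).unwrap();
  : //! // Another API user shares the same volume simply by choosing a different storage root:
  : //! let user_b = LocalFs::new(Utf8PathBuf::from("/mnt/backup-volume/user_b"), Duration::from_secs(120)).unwrap();
  : //! ```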
6 :
7 : use std::{
8 : collections::HashSet,
9 : io::ErrorKind,
10 : num::NonZeroU32,
11 : time::{Duration, SystemTime, UNIX_EPOCH},
12 : };
13 :
14 : use anyhow::{bail, ensure, Context};
15 : use bytes::Bytes;
16 : use camino::{Utf8Path, Utf8PathBuf};
17 : use futures::stream::Stream;
18 : use tokio::{
19 : fs,
20 : io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
21 : };
22 : use tokio_util::{io::ReaderStream, sync::CancellationToken};
23 : use utils::crashsafe::path_with_suffix_extension;
24 :
25 : use crate::{
26 : Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorageActivity,
27 : TimeTravelError, TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
28 : };
29 :
30 : use super::{RemoteStorage, StorageMetadata};
31 : use crate::Etag;
32 :
33 : const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
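  : // Used with `path_with_suffix_extension` in `upload0`: data is first written to a sibling
  : // temp file (e.g. "layer_1" becomes something like "layer_1.___temp") and only renamed over
  : // the target path once the write has fully completed.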
34 :
35 : #[derive(Debug, Clone)]
36 : pub struct LocalFs {
37 : storage_root: Utf8PathBuf,
38 : timeout: Duration,
39 : }
40 :
41 : impl LocalFs {
42 : /// Attempts to create local FS storage, along with its root directory.
43 : /// The storage root will be created (if it does not exist) and transformed into an absolute path (if passed as a relative one).
44 168 : pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
45 168 : if !storage_root.exists() {
46 22 : std::fs::create_dir_all(&storage_root).with_context(|| {
47 0 : format!("Failed to create all directories in the given root path {storage_root:?}")
48 22 : })?;
49 146 : }
50 168 : if !storage_root.is_absolute() {
51 130 : storage_root = storage_root.canonicalize_utf8().with_context(|| {
52 0 : format!("Failed to represent path {storage_root:?} as an absolute path")
53 130 : })?;
54 38 : }
55 :
56 168 : Ok(Self {
57 168 : storage_root,
58 168 : timeout,
59 168 : })
60 168 : }
61 :
62 : // mirrors S3Bucket::s3_object_to_relative_path
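  : // E.g. with storage_root "/data/storage", the key "/data/storage/tenant/timeline/layer_1"
  : // maps to the relative RemotePath "tenant/timeline/layer_1" (the paths here are illustrative).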
63 114 : fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
64 114 : let relative_path = key
65 114 : .strip_prefix(&self.storage_root)
66 114 : .expect("relative path must contain storage_root as prefix");
67 114 : RemotePath(relative_path.into())
68 114 : }
69 :
70 50 : async fn read_storage_metadata(
71 50 : &self,
72 50 : file_path: &Utf8Path,
73 50 : ) -> anyhow::Result<Option<StorageMetadata>> {
74 50 : let metadata_path = storage_metadata_path(file_path);
75 50 : if metadata_path.exists() && metadata_path.is_file() {
76 4 : let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
77 0 : format!("Failed to read metadata from the local storage at '{metadata_path}'")
78 4 : })?;
79 :
80 4 : serde_json::from_str(&metadata_string)
81 4 : .with_context(|| {
82 0 : format!(
83 0 : "Failed to deserialize metadata from the local storage at '{metadata_path}'",
84 0 : )
85 4 : })
86 4 : .map(|metadata| Some(StorageMetadata(metadata)))
87 : } else {
88 46 : Ok(None)
89 : }
90 50 : }
91 :
92 : #[cfg(test)]
93 6 : async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
94 6 : use std::{future::Future, pin::Pin};
95 18 : fn get_all_files<'a, P>(
96 18 : directory_path: P,
97 18 : ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
98 18 : where
99 18 : P: AsRef<Utf8Path> + Send + Sync + 'a,
100 18 : {
101 18 : Box::pin(async move {
102 18 : let directory_path = directory_path.as_ref();
103 18 : if directory_path.exists() {
104 18 : if directory_path.is_dir() {
105 18 : let mut paths = Vec::new();
106 18 : let mut dir_contents = fs::read_dir(directory_path).await?;
107 36 : while let Some(dir_entry) = dir_contents.next_entry().await? {
108 18 : let file_type = dir_entry.file_type().await?;
109 18 : let entry_path =
110 18 : Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
111 0 : anyhow::Error::msg(format!(
112 0 : "non-Unicode path: {}",
113 0 : pb.to_string_lossy()
114 0 : ))
115 18 : })?;
116 18 : if file_type.is_symlink() {
117 6 : tracing::debug!("{entry_path:?} is a symlink, skipping")
118 18 : } else if file_type.is_dir() {
119 18 : paths.extend(get_all_files(&entry_path).await?.into_iter())
120 6 : } else {
121 6 : paths.push(entry_path);
122 6 : }
123 6 : }
124 18 : Ok(paths)
125 6 : } else {
126 6 : bail!("Path {directory_path:?} is not a directory")
127 6 : }
128 6 : } else {
129 6 : Ok(Vec::new())
130 6 : }
131 18 : })
132 18 : }
133 6 :
134 6 : Ok(get_all_files(&self.storage_root)
135 18 : .await?
136 6 : .into_iter()
137 6 : .map(|path| {
138 6 : path.strip_prefix(&self.storage_root)
139 6 : .context("Failed to strip storage root prefix")
140 6 : .and_then(RemotePath::new)
141 6 : .expect(
142 6 : "We list files for storage root, hence should be able to remote the prefix",
143 6 : )
144 6 : })
145 6 : .collect())
146 6 : }
147 :
148 : // Recursively lists all files in a directory,
149 : // mirroring `list_files` of `s3_bucket`.
150 148 : async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
151 148 : let full_path = match folder {
152 144 : Some(folder) => folder.with_base(&self.storage_root),
153 4 : None => self.storage_root.clone(),
154 : };
155 :
156 : // If we were given a directory, we may use it as our starting point.
157 : // Otherwise, we must go up to the first ancestor dir that exists. This is because
158 : // S3 object list prefixes can be arbitrary strings, but when reading
159 : // the local filesystem we need a directory to start calling read_dir on.
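  : // E.g. for the prefix "tenants/0001/tim" we may have to start reading from "tenants/0001"
  : // (or an even higher existing ancestor) and later keep only the entries whose full path
  : // starts with the original prefix string (the prefix here is illustrative).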
160 148 : let mut initial_dir = full_path.clone();
161 148 :
162 148 : // If there's no trailing slash, we have to start looking from one above: even if
163 148 : // `initial_dir` is a directory, we should still list any prefixes in the parent
164 148 : // that start with the same string.
165 148 : if !full_path.to_string().ends_with('/') {
166 16 : initial_dir.pop();
167 132 : }
168 :
169 526 : loop {
170 526 : // Did we make it to the root?
171 526 : if initial_dir.parent().is_none() {
172 0 : anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
173 526 : }
174 526 :
175 526 : match fs::metadata(initial_dir.clone()).await {
176 148 : Ok(meta) if meta.is_dir() => {
177 148 : // We found a directory, break
178 148 : break;
179 : }
180 0 : Ok(_meta) => {
181 0 : // It's not a directory: strip back to the parent
182 0 : initial_dir.pop();
183 0 : }
184 378 : Err(e) if e.kind() == ErrorKind::NotFound => {
185 378 : // It's not a file that exists: strip the prefix back to the parent directory
186 378 : initial_dir.pop();
187 378 : }
188 0 : Err(e) => {
189 0 : // Unexpected I/O error
190 0 : anyhow::bail!(e)
191 : }
192 : }
193 : }
194 : // Note that Utf8PathBuf::starts_with only considers full path segments, but
195 : // object prefixes are arbitrary strings, so we compare plain strings with
196 : // starts_with later.
197 148 : let prefix = full_path.as_str();
198 148 :
199 148 : let mut files = vec![];
200 148 : let mut directory_queue = vec![initial_dir];
201 336 : while let Some(cur_folder) = directory_queue.pop() {
202 188 : let mut entries = cur_folder.read_dir_utf8()?;
203 402 : while let Some(Ok(entry)) = entries.next() {
204 214 : let file_name = entry.file_name();
205 214 : let full_file_name = cur_folder.join(file_name);
206 214 : if full_file_name.as_str().starts_with(prefix) {
207 114 : let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
208 114 : files.push(file_remote_path);
209 114 : if full_file_name.is_dir() {
210 40 : directory_queue.push(full_file_name);
211 74 : }
212 100 : }
213 : }
214 : }
215 :
216 148 : Ok(files)
217 148 : }
218 :
219 2546 : async fn upload0(
220 2546 : &self,
221 2546 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
222 2546 : data_size_bytes: usize,
223 2546 : to: &RemotePath,
224 2546 : metadata: Option<StorageMetadata>,
225 2546 : cancel: &CancellationToken,
226 2546 : ) -> anyhow::Result<()> {
227 2546 : let target_file_path = to.with_base(&self.storage_root);
228 2546 : create_target_directory(&target_file_path).await?;
229 : // We need this dance with a sort-of-durable rename (without fsyncs)
230 : // to prevent partial uploads. This was actually hit when a pageserver shutdown
231 : // cancelled an upload and a partial file was left on the fs.
232 : // NOTE: Because the temp file suffix is always the same, this operation is racy.
233 : // Two concurrent operations can lead to the following sequence:
234 : // T1: write(temp)
235 : // T2: write(temp) -> overwrites the content
236 : // T1: rename(temp, dst) -> succeeds
237 : // T2: rename(temp, dst) -> fails, temp no longer exists
238 : // This could be solved by supplying a unique temp suffix every time, but this situation
239 : // is not normal in the first place, and the error can help (and has helped at least once)
240 : // to discover bugs in upper-level synchronization.
241 2544 : let temp_file_path =
242 2544 : path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
243 2540 : let mut destination = io::BufWriter::new(
244 2544 : fs::OpenOptions::new()
245 2544 : .write(true)
246 2544 : .create(true)
247 2544 : .truncate(true)
248 2544 : .open(&temp_file_path)
249 2285 : .await
250 2540 : .with_context(|| {
251 0 : format!("Failed to open target fs destination at '{target_file_path}'")
252 2540 : })?,
253 : );
254 :
255 2540 : let from_size_bytes = data_size_bytes as u64;
256 2540 : let data = tokio_util::io::StreamReader::new(data);
257 2540 : let data = std::pin::pin!(data);
258 2540 : let mut buffer_to_read = data.take(from_size_bytes);
259 2540 :
260 2540 : // alternatively we could just write the bytes to a file, but local_fs is a testing utility
261 2540 : let copy = io::copy_buf(&mut buffer_to_read, &mut destination);
262 :
263 2526 : let bytes_read = tokio::select! {
264 : biased;
265 : _ = cancel.cancelled() => {
266 : let file = destination.into_inner();
267 : // wait for the in-flight operation(s) to complete so that the next
268 : // attempt can start right away and our writes are not directed to its file.
269 : file.into_std().await;
270 :
271 : // TODO: leave the temp file or not? Leaving it is probably less racy;
272 : // truncate is enabled at least.
273 : fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
274 : return Err(TimeoutOrCancel::Cancel.into());
275 : }
276 : read = copy => read,
277 : };
278 :
279 2526 : let bytes_read =
280 2526 : bytes_read.with_context(|| {
281 0 : format!(
282 0 : "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
283 0 : )
284 2526 : })?;
285 :
286 2526 : if bytes_read < from_size_bytes {
287 2 : bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
288 2524 : }
289 2524 : // Check if there is any extra data after the given size.
290 2524 : let mut from = buffer_to_read.into_inner();
291 2524 : let extra_read = from.read(&mut [1]).await?;
292 2523 : ensure!(
293 2523 : extra_read == 0,
294 4 : "Provided stream was larger than expected: expected {from_size_bytes} bytes",
295 : );
296 :
297 2519 : destination.flush().await.with_context(|| {
298 0 : format!(
299 0 : "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
300 0 : )
301 2519 : })?;
302 :
303 2519 : fs::rename(temp_file_path, &target_file_path)
304 2198 : .await
305 2511 : .with_context(|| {
306 0 : format!(
307 0 : "Failed to upload (rename) file to the local storage at '{target_file_path}'",
308 0 : )
309 2511 : })?;
310 :
311 2511 : if let Some(storage_metadata) = metadata {
312 : // FIXME: we must not be using metadata much, since this would forget the old metadata
313 : // on new writes. Or perhaps metadata is sticky; consider removing this if it's never
314 : // used.
315 2 : let storage_metadata_path = storage_metadata_path(&target_file_path);
316 2 : fs::write(
317 2 : &storage_metadata_path,
318 2 : serde_json::to_string(&storage_metadata.0)
319 2 : .context("Failed to serialize storage metadata as json")?,
320 : )
321 2 : .await
322 2 : .with_context(|| {
323 0 : format!(
324 0 : "Failed to write metadata to the local storage at '{storage_metadata_path}'",
325 0 : )
326 2 : })?;
327 2509 : }
328 :
329 2511 : Ok(())
330 2519 : }
331 : }
332 :
333 : impl RemoteStorage for LocalFs {
334 148 : async fn list(
335 148 : &self,
336 148 : prefix: Option<&RemotePath>,
337 148 : mode: ListingMode,
338 148 : max_keys: Option<NonZeroU32>,
339 148 : cancel: &CancellationToken,
340 148 : ) -> Result<Listing, DownloadError> {
341 148 : let op = async {
342 148 : let mut result = Listing::default();
343 :
344 : // Filter out directories: in S3 directories don't exist, only the keys within them do.
345 148 : let keys = self
346 148 : .list_recursive(prefix)
347 522 : .await
348 148 : .map_err(DownloadError::Other)?;
349 148 : let keys = keys
350 148 : .into_iter()
351 148 : .filter(|k| {
352 114 : let path = k.with_base(&self.storage_root);
353 114 : !path.is_dir()
354 148 : })
355 148 : .collect();
356 148 :
357 148 : if let ListingMode::NoDelimiter = mode {
358 8 : result.keys = keys;
359 8 : } else {
360 140 : let mut prefixes = HashSet::new();
361 190 : for key in keys {
362 : // If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
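  : // E.g. when listing with the prefix "a/", the key "a/b/c" contributes the common prefix "b",
  : // while the key "a/d" is returned directly in `keys` (names here are illustrative).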
363 50 : let relative_key = if let Some(prefix) = prefix {
364 44 : let mut prefix = prefix.clone();
365 44 : // We only strip the dirname of the prefix, so that when we strip it from the start of keys we
366 44 : // end up with full file/dir names.
367 44 : let prefix_full_local_path = prefix.with_base(&self.storage_root);
368 44 : let has_slash = prefix.0.to_string().ends_with('/');
369 44 : let strip_prefix = if prefix_full_local_path.is_dir() && has_slash {
370 28 : prefix
371 : } else {
372 16 : prefix.0.pop();
373 16 : prefix
374 : };
375 :
376 44 : RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap()
377 : } else {
378 6 : key
379 : };
380 :
381 50 : let relative_key = format!("{}", relative_key);
382 50 : if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
383 48 : let first_part = relative_key
384 48 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
385 48 : .next()
386 48 : .unwrap()
387 48 : .to_owned();
388 48 : prefixes.insert(first_part);
389 48 : } else {
390 2 : result
391 2 : .keys
392 2 : .push(RemotePath::from_string(&relative_key).unwrap());
393 2 : }
394 : }
395 140 : result.prefixes = prefixes
396 140 : .into_iter()
397 140 : .map(|s| RemotePath::from_string(&s).unwrap())
398 140 : .collect();
399 : }
400 :
401 148 : if let Some(max_keys) = max_keys {
402 0 : result.keys.truncate(max_keys.get() as usize);
403 148 : }
404 148 : Ok(result)
405 148 : };
406 :
407 148 : let timeout = async {
408 420 : tokio::time::sleep(self.timeout).await;
409 0 : Err(DownloadError::Timeout)
410 0 : };
411 :
412 148 : let cancelled = async {
413 474 : cancel.cancelled().await;
414 0 : Err(DownloadError::Cancelled)
415 0 : };
416 :
417 : tokio::select! {
418 : res = op => res,
419 : res = timeout => res,
420 : res = cancelled => res,
421 : }
422 148 : }
423 :
424 2546 : async fn upload(
425 2546 : &self,
426 2546 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
427 2546 : data_size_bytes: usize,
428 2546 : to: &RemotePath,
429 2546 : metadata: Option<StorageMetadata>,
430 2546 : cancel: &CancellationToken,
431 2546 : ) -> anyhow::Result<()> {
432 2546 : let cancel = cancel.child_token();
433 2546 :
434 2546 : let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
435 2546 : let mut op = std::pin::pin!(op);
436 :
437 : // race the upload0 to the timeout; if it goes over, do a graceful shutdown
438 2519 : let (res, timeout) = tokio::select! {
439 : res = &mut op => (res, false),
440 : _ = tokio::time::sleep(self.timeout) => {
441 : cancel.cancel();
442 : (op.await, true)
443 : }
444 : };
445 :
446 8 : match res {
447 8 : Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
448 0 : // we caused this cancel (or they happened simultaneously) -- swap it out to
449 0 : // Timeout
450 0 : Err(TimeoutOrCancel::Timeout.into())
451 : }
452 2519 : res => res,
453 : }
454 2519 : }
455 :
456 56 : async fn download(
457 56 : &self,
458 56 : from: &RemotePath,
459 56 : cancel: &CancellationToken,
460 56 : ) -> Result<Download, DownloadError> {
461 56 : let target_path = from.with_base(&self.storage_root);
462 :
463 56 : let file_metadata = file_metadata(&target_path).await?;
464 :
465 40 : let source = ReaderStream::new(
466 40 : fs::OpenOptions::new()
467 40 : .read(true)
468 40 : .open(&target_path)
469 40 : .await
470 40 : .with_context(|| {
471 0 : format!("Failed to open source file {target_path:?} to use in the download")
472 40 : })
473 40 : .map_err(DownloadError::Other)?,
474 : );
475 :
476 40 : let metadata = self
477 40 : .read_storage_metadata(&target_path)
478 2 : .await
479 40 : .map_err(DownloadError::Other)?;
480 :
481 40 : let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
482 40 : let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
483 40 :
484 40 : let etag = mock_etag(&file_metadata);
485 40 : Ok(Download {
486 40 : metadata,
487 40 : last_modified: file_metadata
488 40 : .modified()
489 40 : .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
490 40 : etag,
491 40 : download_stream: Box::pin(source),
492 : })
493 56 : }
494 :
495 14 : async fn download_byte_range(
496 14 : &self,
497 14 : from: &RemotePath,
498 14 : start_inclusive: u64,
499 14 : end_exclusive: Option<u64>,
500 14 : cancel: &CancellationToken,
501 14 : ) -> Result<Download, DownloadError> {
502 14 : if let Some(end_exclusive) = end_exclusive {
503 10 : if end_exclusive <= start_inclusive {
504 2 : return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
505 8 : };
506 8 : if start_inclusive == end_exclusive.saturating_sub(1) {
507 2 : return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
508 6 : }
509 4 : }
510 :
511 10 : let target_path = from.with_base(&self.storage_root);
512 10 : let file_metadata = file_metadata(&target_path).await?;
513 10 : let mut source = tokio::fs::OpenOptions::new()
514 10 : .read(true)
515 10 : .open(&target_path)
516 10 : .await
517 10 : .with_context(|| {
518 0 : format!("Failed to open source file {target_path:?} to use in the download")
519 10 : })
520 10 : .map_err(DownloadError::Other)?;
521 :
522 10 : let len = source
523 10 : .metadata()
524 10 : .await
525 10 : .context("query file length")
526 10 : .map_err(DownloadError::Other)?
527 10 : .len();
528 10 :
529 10 : source
530 10 : .seek(io::SeekFrom::Start(start_inclusive))
531 10 : .await
532 10 : .context("Failed to seek to the range start in a local storage file")
533 10 : .map_err(DownloadError::Other)?;
534 :
535 10 : let metadata = self
536 10 : .read_storage_metadata(&target_path)
537 2 : .await
538 10 : .map_err(DownloadError::Other)?;
539 :
540 10 : let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
541 10 : let source = ReaderStream::new(source);
542 10 :
543 10 : let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
544 10 : let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
545 10 :
546 10 : let etag = mock_etag(&file_metadata);
547 10 : Ok(Download {
548 10 : metadata,
549 10 : last_modified: file_metadata
550 10 : .modified()
551 10 : .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
552 10 : etag,
553 10 : download_stream: Box::pin(source),
554 : })
555 14 : }
556 :
557 12 : async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
558 12 : let file_path = path.with_base(&self.storage_root);
559 12 : match fs::remove_file(&file_path).await {
560 10 : Ok(()) => Ok(()),
561 : // The file doesn't exist. This shouldn't yield an error, to mirror S3's behaviour.
562 : // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
563 : // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
564 2 : Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
565 0 : Err(e) => Err(anyhow::anyhow!(e)),
566 : }
567 12 : }
568 :
569 6 : async fn delete_objects<'a>(
570 6 : &self,
571 6 : paths: &'a [RemotePath],
572 6 : cancel: &CancellationToken,
573 6 : ) -> anyhow::Result<()> {
574 12 : for path in paths {
575 6 : self.delete(path, cancel).await?
576 : }
577 6 : Ok(())
578 6 : }
579 :
580 0 : async fn copy(
581 0 : &self,
582 0 : from: &RemotePath,
583 0 : to: &RemotePath,
584 0 : _cancel: &CancellationToken,
585 0 : ) -> anyhow::Result<()> {
586 0 : let from_path = from.with_base(&self.storage_root);
587 0 : let to_path = to.with_base(&self.storage_root);
588 0 : create_target_directory(&to_path).await?;
589 0 : fs::copy(&from_path, &to_path).await.with_context(|| {
590 0 : format!(
591 0 : "Failed to copy file from '{from_path}' to '{to_path}'",
592 0 : from_path = from_path,
593 0 : to_path = to_path
594 0 : )
595 0 : })?;
596 0 : Ok(())
597 0 : }
598 :
599 0 : async fn time_travel_recover(
600 0 : &self,
601 0 : _prefix: Option<&RemotePath>,
602 0 : _timestamp: SystemTime,
603 0 : _done_if_after: SystemTime,
604 0 : _cancel: &CancellationToken,
605 0 : ) -> Result<(), TimeTravelError> {
606 0 : Err(TimeTravelError::Unimplemented)
607 0 : }
608 :
609 0 : fn activity(&self) -> RemoteStorageActivity {
610 0 : // LocalFS has no concurrency limiting: give callers the impression that plenty of units are available
611 0 : RemoteStorageActivity {
612 0 : read_available: 16,
613 0 : read_total: 16,
614 0 : write_available: 16,
615 0 : write_total: 16,
616 0 : }
617 0 : }
618 : }
619 :
620 52 : fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
621 52 : path_with_suffix_extension(original_path, "metadata")
622 52 : }
623 :
624 2546 : async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
625 2546 : let target_dir = match target_file_path.parent() {
626 2546 : Some(parent_dir) => parent_dir,
627 0 : None => bail!("File path '{target_file_path}' has no parent directory"),
628 : };
629 2546 : if !target_dir.exists() {
630 352 : fs::create_dir_all(target_dir).await?;
631 2194 : }
632 2544 : Ok(())
633 2544 : }
634 :
635 66 : async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, DownloadError> {
636 66 : tokio::fs::metadata(&file_path).await.map_err(|e| {
637 16 : if e.kind() == ErrorKind::NotFound {
638 16 : DownloadError::NotFound
639 : } else {
640 0 : DownloadError::BadInput(e.into())
641 : }
642 66 : })
643 66 : }
644 :
645 : // Use mtime as stand-in for ETag. We could calculate a meaningful one by md5'ing the contents of files we
646 : // read, but that's expensive and the local_fs test helper's whole reason for existence is to run small tests
647 : // quickly, with less overhead than using a mock S3 server.
648 50 : fn mock_etag(meta: &std::fs::Metadata) -> Etag {
649 50 : let mtime = meta.modified().expect("Filesystem mtime missing");
650 50 : format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into()
651 50 : }
652 :
653 : #[cfg(test)]
654 : mod fs_tests {
655 : use super::*;
656 :
657 : use camino_tempfile::tempdir;
658 : use std::{collections::HashMap, io::Write};
659 :
660 6 : async fn read_and_check_metadata(
661 6 : storage: &LocalFs,
662 6 : remote_storage_path: &RemotePath,
663 6 : expected_metadata: Option<&StorageMetadata>,
664 6 : ) -> anyhow::Result<String> {
665 6 : let cancel = CancellationToken::new();
666 6 : let download = storage
667 6 : .download(remote_storage_path, &cancel)
668 14 : .await
669 6 : .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
670 6 : ensure!(
671 6 : download.metadata.as_ref() == expected_metadata,
672 0 : "Unexpected metadata returned for the downloaded file"
673 : );
674 :
675 12 : let contents = aggregate(download.download_stream).await?;
676 :
677 6 : String::from_utf8(contents).map_err(anyhow::Error::new)
678 6 : }
679 :
680 : #[tokio::test]
681 2 : async fn upload_file() -> anyhow::Result<()> {
682 2 : let (storage, cancel) = create_storage()?;
683 2 :
684 11 : let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
685 2 : assert_eq!(
686 6 : storage.list_all().await?,
687 2 : vec![target_path_1.clone()],
688 2 : "Should list a single file after first upload"
689 2 : );
690 2 :
691 12 : let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
692 2 : assert_eq!(
693 6 : list_files_sorted(&storage).await?,
694 2 : vec![target_path_1.clone(), target_path_2.clone()],
695 2 : "Should list a two different files after second upload"
696 2 : );
697 2 :
698 2 : Ok(())
699 2 : }
700 :
701 : #[tokio::test]
702 2 : async fn upload_file_negatives() -> anyhow::Result<()> {
703 2 : let (storage, cancel) = create_storage()?;
704 2 :
705 2 : let id = RemotePath::new(Utf8Path::new("dummy"))?;
706 2 : let content = Bytes::from_static(b"12345");
707 8 : let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));
708 2 :
709 2 : // Check that you get an error if the size parameter doesn't match the actual
710 2 : // size of the stream.
711 2 : storage
712 2 : .upload(content(), 0, &id, None, &cancel)
713 2 : .await
714 2 : .expect_err("upload with zero size succeeded");
715 2 : storage
716 2 : .upload(content(), 4, &id, None, &cancel)
717 3 : .await
718 2 : .expect_err("upload with too short size succeeded");
719 2 : storage
720 2 : .upload(content(), 6, &id, None, &cancel)
721 4 : .await
722 2 : .expect_err("upload with too large size succeeded");
723 2 :
724 2 : // Correct size is 5, this should succeed.
725 6 : storage.upload(content(), 5, &id, None, &cancel).await?;
726 2 :
727 2 : Ok(())
728 2 : }
729 :
730 22 : fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
731 22 : let storage_root = tempdir()?.path().to_path_buf();
732 22 : LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
733 22 : }
734 :
735 : #[tokio::test]
736 2 : async fn download_file() -> anyhow::Result<()> {
737 2 : let (storage, cancel) = create_storage()?;
738 2 : let upload_name = "upload_1";
739 11 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
740 2 :
741 8 : let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
742 2 : assert_eq!(
743 2 : dummy_contents(upload_name),
744 2 : contents,
745 2 : "We should upload and download the same contents"
746 2 : );
747 2 :
748 2 : let non_existing_path = "somewhere/else";
749 2 : match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await {
750 2 : Err(DownloadError::NotFound) => {} // Should get NotFound for non-existing keys
751 2 : other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
752 2 : }
753 2 : Ok(())
754 2 : }
755 :
756 : #[tokio::test]
757 2 : async fn download_file_range_positive() -> anyhow::Result<()> {
758 2 : let (storage, cancel) = create_storage()?;
759 2 : let upload_name = "upload_1";
760 10 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
761 2 :
762 2 : let full_range_download_contents =
763 8 : read_and_check_metadata(&storage, &upload_target, None).await?;
764 2 : assert_eq!(
765 2 : dummy_contents(upload_name),
766 2 : full_range_download_contents,
767 2 : "Download full range should return the whole upload"
768 2 : );
769 2 :
770 2 : let uploaded_bytes = dummy_contents(upload_name).into_bytes();
771 2 : let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
772 2 :
773 2 : let first_part_download = storage
774 2 : .download_byte_range(
775 2 : &upload_target,
776 2 : 0,
777 2 : Some(first_part_local.len() as u64),
778 2 : &cancel,
779 2 : )
780 8 : .await?;
781 2 : assert!(
782 2 : first_part_download.metadata.is_none(),
783 2 : "No metadata should be returned for no metadata upload"
784 2 : );
785 2 :
786 2 : let first_part_remote = aggregate(first_part_download.download_stream).await?;
787 2 : assert_eq!(
788 2 : first_part_local, first_part_remote,
789 2 : "First part bytes should be returned when requested"
790 2 : );
791 2 :
792 2 : let second_part_download = storage
793 2 : .download_byte_range(
794 2 : &upload_target,
795 2 : first_part_local.len() as u64,
796 2 : Some((first_part_local.len() + second_part_local.len()) as u64),
797 2 : &cancel,
798 2 : )
799 8 : .await?;
800 2 : assert!(
801 2 : second_part_download.metadata.is_none(),
802 2 : "No metadata should be returned for no metadata upload"
803 2 : );
804 2 :
805 2 : let second_part_remote = aggregate(second_part_download.download_stream).await?;
806 2 : assert_eq!(
807 2 : second_part_local, second_part_remote,
808 2 : "Second part bytes should be returned when requested"
809 2 : );
810 2 :
811 2 : let suffix_bytes = storage
812 2 : .download_byte_range(&upload_target, 13, None, &cancel)
813 8 : .await?
814 2 : .download_stream;
815 2 : let suffix_bytes = aggregate(suffix_bytes).await?;
816 2 : let suffix = std::str::from_utf8(&suffix_bytes)?;
817 2 : assert_eq!(upload_name, suffix);
818 2 :
819 2 : let all_bytes = storage
820 2 : .download_byte_range(&upload_target, 0, None, &cancel)
821 8 : .await?
822 2 : .download_stream;
823 2 : let all_bytes = aggregate(all_bytes).await?;
824 2 : let all_bytes = std::str::from_utf8(&all_bytes)?;
825 2 : assert_eq!(dummy_contents("upload_1"), all_bytes);
826 2 :
827 2 : Ok(())
828 2 : }
829 :
830 : #[tokio::test]
831 2 : async fn download_file_range_negative() -> anyhow::Result<()> {
832 2 : let (storage, cancel) = create_storage()?;
833 2 : let upload_name = "upload_1";
834 10 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
835 2 :
836 2 : let start = 1_000_000_000;
837 2 : let end = start + 1;
838 2 : match storage
839 2 : .download_byte_range(
840 2 : &upload_target,
841 2 : start,
842 2 : Some(end), // exclusive end
843 2 : &cancel,
844 2 : )
845 2 : .await
846 2 : {
847 2 : Ok(_) => panic!("Should not allow downloading wrong ranges"),
848 2 : Err(e) => {
849 2 : let error_string = e.to_string();
850 2 : assert!(error_string.contains("zero bytes"));
851 2 : assert!(error_string.contains(&start.to_string()));
852 2 : assert!(error_string.contains(&end.to_string()));
853 2 : }
854 2 : }
855 2 :
856 2 : let start = 10000;
857 2 : let end = 234;
858 2 : assert!(start > end, "Should test an incorrect range");
859 2 : match storage
860 2 : .download_byte_range(&upload_target, start, Some(end), &cancel)
861 2 : .await
862 2 : {
863 2 : Ok(_) => panic!("Should not allow downloading wrong ranges"),
864 2 : Err(e) => {
865 2 : let error_string = e.to_string();
866 2 : assert!(error_string.contains("Invalid range"));
867 2 : assert!(error_string.contains(&start.to_string()));
868 2 : assert!(error_string.contains(&end.to_string()));
869 2 : }
870 2 : }
871 2 :
872 2 : Ok(())
873 2 : }
874 :
875 : #[tokio::test]
876 2 : async fn delete_file() -> anyhow::Result<()> {
877 2 : let (storage, cancel) = create_storage()?;
878 2 : let upload_name = "upload_1";
879 11 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
880 2 :
881 2 : storage.delete(&upload_target, &cancel).await?;
882 6 : assert!(storage.list_all().await?.is_empty());
883 2 :
884 2 : storage
885 2 : .delete(&upload_target, &cancel)
886 2 : .await
887 2 : .expect("Should allow deleting non-existing storage files");
888 2 :
889 2 : Ok(())
890 2 : }
891 :
892 : #[tokio::test]
893 2 : async fn file_with_metadata() -> anyhow::Result<()> {
894 2 : let (storage, cancel) = create_storage()?;
895 2 : let upload_name = "upload_1";
896 2 : let metadata = StorageMetadata(HashMap::from([
897 2 : ("one".to_string(), "1".to_string()),
898 2 : ("two".to_string(), "2".to_string()),
899 2 : ]));
900 2 : let upload_target =
901 13 : upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;
902 2 :
903 2 : let full_range_download_contents =
904 10 : read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
905 2 : assert_eq!(
906 2 : dummy_contents(upload_name),
907 2 : full_range_download_contents,
908 2 : "We should upload and download the same contents"
909 2 : );
910 2 :
911 2 : let uploaded_bytes = dummy_contents(upload_name).into_bytes();
912 2 : let (first_part_local, _) = uploaded_bytes.split_at(3);
913 2 :
914 2 : let partial_download_with_metadata = storage
915 2 : .download_byte_range(
916 2 : &upload_target,
917 2 : 0,
918 2 : Some(first_part_local.len() as u64),
919 2 : &cancel,
920 2 : )
921 10 : .await?;
922 2 : let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
923 2 : assert_eq!(
924 2 : first_part_local,
925 2 : first_part_remote.as_slice(),
926 2 : "First part bytes should be returned when requested"
927 2 : );
928 2 :
929 2 : assert_eq!(
930 2 : partial_download_with_metadata.metadata,
931 2 : Some(metadata),
932 2 : "We should get the same metadata back for partial download"
933 2 : );
934 2 :
935 2 : Ok(())
936 2 : }
937 :
938 : #[tokio::test]
939 2 : async fn list() -> anyhow::Result<()> {
940 2 : // No delimiter: should recursively list everything
941 2 : let (storage, cancel) = create_storage()?;
942 11 : let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
943 2 : let child_sibling =
944 12 : upload_dummy_file(&storage, "grandparent/parent/child_sibling", None, &cancel).await?;
945 12 : let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;
946 2 :
947 2 : let listing = storage
948 2 : .list(None, ListingMode::NoDelimiter, None, &cancel)
949 2 : .await?;
950 2 : assert!(listing.prefixes.is_empty());
951 2 : assert_eq!(
952 2 : listing.keys.into_iter().collect::<HashSet<_>>(),
953 2 : HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()])
954 2 : );
955 2 :
956 2 : // Delimiter: should only go one deep
957 2 : let listing = storage
958 2 : .list(None, ListingMode::WithDelimiter, None, &cancel)
959 2 : .await?;
960 2 :
961 2 : assert_eq!(
962 2 : listing.prefixes,
963 2 : [RemotePath::from_string("timelines").unwrap()].to_vec()
964 2 : );
965 2 : assert!(listing.keys.is_empty());
966 2 :
967 2 : // Delimiter & prefix with a trailing slash
968 2 : let listing = storage
969 2 : .list(
970 2 : Some(&RemotePath::from_string("timelines/some_timeline/grandparent/").unwrap()),
971 2 : ListingMode::WithDelimiter,
972 2 : None,
973 2 : &cancel,
974 2 : )
975 2 : .await?;
976 2 : assert_eq!(
977 2 : listing.keys,
978 2 : [RemotePath::from_string("uncle").unwrap()].to_vec()
979 2 : );
980 2 : assert_eq!(
981 2 : listing.prefixes,
982 2 : [RemotePath::from_string("parent").unwrap()].to_vec()
983 2 : );
984 2 :
985 2 : // Delimiter and prefix without a trailing slash
986 2 : let listing = storage
987 2 : .list(
988 2 : Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
989 2 : ListingMode::WithDelimiter,
990 2 : None,
991 2 : &cancel,
992 2 : )
993 2 : .await?;
994 2 : assert_eq!(listing.keys, [].to_vec());
995 2 : assert_eq!(
996 2 : listing.prefixes,
997 2 : [RemotePath::from_string("grandparent").unwrap()].to_vec()
998 2 : );
999 2 :
1000 2 : // Delimiter and prefix that's partway through a path component
1001 2 : let listing = storage
1002 2 : .list(
1003 2 : Some(&RemotePath::from_string("timelines/some_timeline/grandp").unwrap()),
1004 2 : ListingMode::WithDelimiter,
1005 2 : None,
1006 2 : &cancel,
1007 2 : )
1008 2 : .await?;
1009 2 : assert_eq!(listing.keys, [].to_vec());
1010 2 : assert_eq!(
1011 2 : listing.prefixes,
1012 2 : [RemotePath::from_string("grandparent").unwrap()].to_vec()
1013 2 : );
1014 2 :
1015 2 : Ok(())
1016 2 : }
1017 :
1018 : #[tokio::test]
1019 2 : async fn list_part_component() -> anyhow::Result<()> {
1020 2 : // No delimiter: should recursively list everything
1021 2 : let (storage, cancel) = create_storage()?;
1022 2 :
1023 2 : // Imitates what happens in a tenant path when we have an unsharded path and a sharded path, and do a listing
1024 2 : // of the unsharded path: although there is a "directory" at the unsharded path, it should be handled as
1025 2 : // a freeform prefix.
1026 2 : let _child_a =
1027 10 : upload_dummy_file(&storage, "grandparent/tenant-01/child", None, &cancel).await?;
1028 2 : let _child_b =
1029 12 : upload_dummy_file(&storage, "grandparent/tenant/child", None, &cancel).await?;
1030 2 :
1031 2 : // Delimiter and prefix that's partway through a path component
1032 2 : let listing = storage
1033 2 : .list(
1034 2 : Some(
1035 2 : &RemotePath::from_string("timelines/some_timeline/grandparent/tenant").unwrap(),
1036 2 : ),
1037 2 : ListingMode::WithDelimiter,
1038 2 : None,
1039 2 : &cancel,
1040 2 : )
1041 2 : .await?;
1042 2 : assert_eq!(listing.keys, [].to_vec());
1043 2 :
1044 2 : let mut found_prefixes = listing.prefixes.clone();
1045 2 : found_prefixes.sort();
1046 2 : assert_eq!(
1047 2 : found_prefixes,
1048 2 : [
1049 2 : RemotePath::from_string("tenant").unwrap(),
1050 2 : RemotePath::from_string("tenant-01").unwrap(),
1051 2 : ]
1052 2 : .to_vec()
1053 2 : );
1054 2 :
1055 2 : Ok(())
1056 2 : }
1057 :
1058 : #[tokio::test]
1059 2 : async fn overwrite_shorter_file() -> anyhow::Result<()> {
1060 2 : let (storage, cancel) = create_storage()?;
1061 2 :
1062 2 : let path = RemotePath::new("does/not/matter/file".into())?;
1063 2 :
1064 2 : let body = Bytes::from_static(b"long file contents is long");
1065 2 : {
1066 2 : let len = body.len();
1067 2 : let body =
1068 2 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1069 8 : storage.upload(body, len, &path, None, &cancel).await?;
1070 2 : }
1071 2 :
1072 4 : let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
1073 2 : assert_eq!(body, read);
1074 2 :
1075 2 : let shorter = Bytes::from_static(b"shorter body");
1076 2 : {
1077 2 : let len = shorter.len();
1078 2 : let body =
1079 2 : futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
1080 6 : storage.upload(body, len, &path, None, &cancel).await?;
1081 2 : }
1082 2 :
1083 4 : let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
1084 2 : assert_eq!(shorter, read);
1085 2 : Ok(())
1086 2 : }
1087 :
1088 : #[tokio::test]
1089 2 : async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
1090 2 : let (storage, cancel) = create_storage()?;
1091 2 :
1092 2 : let path = RemotePath::new("does/not/matter/file".into())?;
1093 2 :
1094 2 : let body = Bytes::from_static(b"long file contents is long");
1095 2 : {
1096 2 : let len = body.len();
1097 2 : let body =
1098 2 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1099 2 : let cancel = cancel.child_token();
1100 2 : cancel.cancel();
1101 2 : let e = storage
1102 2 : .upload(body, len, &path, None, &cancel)
1103 6 : .await
1104 2 : .unwrap_err();
1105 2 :
1106 2 : assert!(TimeoutOrCancel::caused_by_cancel(&e));
1107 2 : }
1108 2 :
1109 2 : {
1110 2 : let len = body.len();
1111 2 : let body =
1112 2 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1113 5 : storage.upload(body, len, &path, None, &cancel).await?;
1114 2 : }
1115 2 :
1116 4 : let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
1117 2 : assert_eq!(body, read);
1118 2 :
1119 2 : Ok(())
1120 2 : }
1121 :
1122 24 : async fn upload_dummy_file(
1123 24 : storage: &LocalFs,
1124 24 : name: &str,
1125 24 : metadata: Option<StorageMetadata>,
1126 24 : cancel: &CancellationToken,
1127 24 : ) -> anyhow::Result<RemotePath> {
1128 24 : let from_path = storage
1129 24 : .storage_root
1130 24 : .join("timelines")
1131 24 : .join("some_timeline")
1132 24 : .join(name);
1133 24 : let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
1134 :
1135 24 : let relative_path = from_path
1136 24 : .strip_prefix(&storage.storage_root)
1137 24 : .context("Failed to strip storage root prefix")
1138 24 : .and_then(RemotePath::new)
1139 24 : .with_context(|| {
1140 0 : format!(
1141 0 : "Failed to resolve remote part of path {:?} for base {:?}",
1142 0 : from_path, storage.storage_root
1143 0 : )
1144 24 : })?;
1145 :
1146 24 : let file = tokio_util::io::ReaderStream::new(file);
1147 24 :
1148 24 : storage
1149 24 : .upload(file, size, &relative_path, metadata, cancel)
1150 111 : .await?;
1151 24 : Ok(relative_path)
1152 24 : }
1153 :
1154 24 : async fn create_file_for_upload(
1155 24 : path: &Utf8Path,
1156 24 : contents: &str,
1157 24 : ) -> anyhow::Result<(fs::File, usize)> {
1158 24 : std::fs::create_dir_all(path.parent().unwrap())?;
1159 24 : let mut file_for_writing = std::fs::OpenOptions::new()
1160 24 : .write(true)
1161 24 : .create_new(true)
1162 24 : .open(path)?;
1163 24 : write!(file_for_writing, "{}", contents)?;
1164 24 : drop(file_for_writing);
1165 24 : let file_size = path.metadata()?.len() as usize;
1166 24 : Ok((
1167 24 : fs::OpenOptions::new().read(true).open(&path).await?,
1168 24 : file_size,
1169 : ))
1170 24 : }
1171 :
1172 36 : fn dummy_contents(name: &str) -> String {
1173 36 : format!("contents for {name}")
1174 36 : }
1175 :
1176 2 : async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
1177 6 : let mut files = storage.list_all().await?;
1178 2 : files.sort_by(|a, b| a.0.cmp(&b.0));
1179 2 : Ok(files)
1180 2 : }
1181 :
1182 22 : async fn aggregate(
1183 22 : stream: impl Stream<Item = std::io::Result<Bytes>>,
1184 22 : ) -> anyhow::Result<Vec<u8>> {
1185 22 : use futures::stream::StreamExt;
1186 22 : let mut out = Vec::new();
1187 22 : let mut stream = std::pin::pin!(stream);
1188 44 : while let Some(res) = stream.next().await {
1189 22 : out.extend_from_slice(&res?[..]);
1190 : }
1191 22 : Ok(out)
1192 22 : }
1193 : }