//! Local filesystem acting as a remote storage.
//! Multiple API users can use the same "storage" of this kind by using different storage roots.
//!
//! This storage is used in tests, but it can also be used when a persistent
//! volume is mounted to the local FS.
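//!
//! A minimal usage sketch (hedged: the byte-stream construction and error handling
//! below are illustrative, not prescriptive; the API itself is defined in this crate):
//!
//! ```ignore
//! use std::time::Duration;
//! use bytes::Bytes;
//! use camino::Utf8PathBuf;
//! use tokio_util::sync::CancellationToken;
//!
//! let storage = LocalFs::new(Utf8PathBuf::from("/tmp/remote-storage"), Duration::from_secs(120))?;
//! let cancel = CancellationToken::new();
//! let path = RemotePath::from_string("tenant/timeline/object")?;
//!
//! // Upload a single-chunk stream; the declared size must match the stream exactly.
//! let body = Bytes::from_static(b"payload");
//! let stream = futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
//! storage.upload(stream, body.len(), &path, None, &cancel).await?;
//!
//! // Download it back as a byte stream.
//! let download = storage.download(&path, &cancel).await?;
//! ```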

use std::{
    collections::HashSet,
    io::ErrorKind,
    num::NonZeroU32,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use anyhow::{bail, ensure, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::Stream;
use tokio::{
    fs,
    io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tokio_util::{io::ReaderStream, sync::CancellationToken};
use utils::crashsafe::path_with_suffix_extension;

use crate::{
    Download, DownloadError, Listing, ListingMode, ListingObject, RemotePath, TimeTravelError,
    TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
};

use super::{RemoteStorage, StorageMetadata};
use crate::Etag;

const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
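// With `path_with_suffix_extension`, the suffix is appended as an extra extension,
// e.g. "some/key" becomes "some/key.___temp" (illustrative; see `utils::crashsafe`).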

#[derive(Debug, Clone)]
pub struct LocalFs {
    storage_root: Utf8PathBuf,
    timeout: Duration,
}

impl LocalFs {
    /// Attempts to create local FS storage, along with its root directory.
    /// The storage root is created if it does not exist and converted into an absolute path if a relative one is given.
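    ///
    /// A hedged one-liner (the path and timeout are illustrative):
    /// ```ignore
    /// let storage = LocalFs::new(Utf8PathBuf::from("relative/root"), Duration::from_secs(120))?;
    /// ```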
    pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
        if !storage_root.exists() {
            std::fs::create_dir_all(&storage_root).with_context(|| {
                format!("Failed to create all directories in the given root path {storage_root:?}")
            })?;
        }
        if !storage_root.is_absolute() {
            storage_root = storage_root.canonicalize_utf8().with_context(|| {
                format!("Failed to represent path {storage_root:?} as an absolute path")
            })?;
        }

        Ok(Self {
            storage_root,
            timeout,
        })
    }

    // mirrors S3Bucket::s3_object_to_relative_path
    fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
        let relative_path = key
            .strip_prefix(&self.storage_root)
            .expect("relative path must contain storage_root as prefix");
        RemotePath(relative_path.into())
    }

    async fn read_storage_metadata(
        &self,
        file_path: &Utf8Path,
    ) -> anyhow::Result<Option<StorageMetadata>> {
        let metadata_path = storage_metadata_path(file_path);
        if metadata_path.exists() && metadata_path.is_file() {
            let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
                format!("Failed to read metadata from the local storage at '{metadata_path}'")
            })?;

            serde_json::from_str(&metadata_string)
                .with_context(|| {
                    format!(
                        "Failed to deserialize metadata from the local storage at '{metadata_path}'",
                    )
                })
                .map(|metadata| Some(StorageMetadata(metadata)))
        } else {
            Ok(None)
        }
    }

    #[cfg(test)]
    async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
        use std::{future::Future, pin::Pin};
        fn get_all_files<'a, P>(
            directory_path: P,
        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
        where
            P: AsRef<Utf8Path> + Send + Sync + 'a,
        {
            Box::pin(async move {
                let directory_path = directory_path.as_ref();
                if directory_path.exists() {
                    if directory_path.is_dir() {
                        let mut paths = Vec::new();
                        let mut dir_contents = fs::read_dir(directory_path).await?;
                        while let Some(dir_entry) = dir_contents.next_entry().await? {
                            let file_type = dir_entry.file_type().await?;
                            let entry_path =
                                Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
                                    anyhow::Error::msg(format!(
                                        "non-Unicode path: {}",
                                        pb.to_string_lossy()
                                    ))
                                })?;
                            if file_type.is_symlink() {
                                tracing::debug!("{entry_path:?} is a symlink, skipping")
                            } else if file_type.is_dir() {
                                paths.extend(get_all_files(&entry_path).await?.into_iter())
                            } else {
                                paths.push(entry_path);
                            }
                        }
                        Ok(paths)
                    } else {
                        bail!("Path {directory_path:?} is not a directory")
                    }
                } else {
                    Ok(Vec::new())
                }
            })
        }

        Ok(get_all_files(&self.storage_root)
            .await?
            .into_iter()
            .map(|path| {
                path.strip_prefix(&self.storage_root)
                    .context("Failed to strip storage root prefix")
                    .and_then(RemotePath::new)
                    .expect(
142 12 : "We list files for storage root, hence should be able to remote the prefix",
                    )
            })
            .collect())
    }

    // recursively lists all files in a directory,
    // mirroring the `list_files` for `s3_bucket`
    async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
        let full_path = match folder {
            Some(folder) => folder.with_base(&self.storage_root),
            None => self.storage_root.clone(),
        };

        // If we were given a directory, we may use it as our starting point.
        // Otherwise, we must go up to the first ancestor dir that exists. This is because
        // S3 object list prefixes can be arbitrary strings, but when reading
        // the local filesystem we need a directory to start calling read_dir on.
        let mut initial_dir = full_path.clone();

        // If there's no trailing slash, we have to start looking from one above: even if
        // `initial_dir` is a directory, we should still list any prefixes in the parent
        // that start with the same string.
        if !full_path.to_string().ends_with('/') {
            initial_dir.pop();
        }

        loop {
            // Did we make it to the root?
            if initial_dir.parent().is_none() {
                anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
            }

            match fs::metadata(initial_dir.clone()).await {
                Ok(meta) if meta.is_dir() => {
                    // We found a directory, break
                    break;
                }
                Ok(_meta) => {
                    // It's not a directory: strip back to the parent
                    initial_dir.pop();
                }
                Err(e) if e.kind() == ErrorKind::NotFound => {
                    // It's not a file that exists: strip the prefix back to the parent directory
                    initial_dir.pop();
                }
                Err(e) => {
                    // Unexpected I/O error
                    anyhow::bail!(e)
                }
            }
        }
        // Note that Utf8PathBuf starts_with only considers full path segments, but
        // object prefixes are arbitrary strings, so we need the strings for doing
        // starts_with later.
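        // For example, the prefix "tenants/ten" must match the key
        // "tenants/tenant-01/child": str::starts_with accepts this, while
        // Path::starts_with would reject it at the segment boundary.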
        let prefix = full_path.as_str();

        let mut files = vec![];
        let mut directory_queue = vec![initial_dir];
        while let Some(cur_folder) = directory_queue.pop() {
            let mut entries = cur_folder.read_dir_utf8()?;
            while let Some(Ok(entry)) = entries.next() {
                let file_name = entry.file_name();
                let full_file_name = cur_folder.join(file_name);
                if full_file_name.as_str().starts_with(prefix) {
                    let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
                    files.push(file_remote_path);
                    if full_file_name.is_dir() {
                        directory_queue.push(full_file_name);
                    }
                }
            }
        }

        Ok(files)
    }

    async fn upload0(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let target_file_path = to.with_base(&self.storage_root);
        create_target_directory(&target_file_path).await?;
        // We need this dance with a sort-of-durable rename (without fsyncs)
        // to prevent partial uploads. This was really hit when a pageserver shutdown
        // cancelled the upload and a partial file was left on the fs.
        // NOTE: Because the temp file suffix is always the same, this operation is racy.
        // Two concurrent operations can lead to the following sequence:
        // T1: write(temp)
        // T2: write(temp) -> overwrites the content
        // T1: rename(temp, dst) -> succeeds
        // T2: rename(temp, dst) -> fails, temp no longer exists
        // This could be solved by supplying a unique temp suffix every time, but this situation
        // is not normal in the first place, and the error can help (and helped at least once)
        // to discover bugs in upper-level synchronization.
        let temp_file_path =
            path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
        let mut destination = io::BufWriter::new(
            fs::OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&temp_file_path)
                .await
                .with_context(|| {
                    format!("Failed to open target fs destination at '{target_file_path}'")
                })?,
        );

        let from_size_bytes = data_size_bytes as u64;
        let data = tokio_util::io::StreamReader::new(data);
        let data = std::pin::pin!(data);
        let mut buffer_to_read = data.take(from_size_bytes);

        // alternatively we could just write the bytes to a file, but local_fs is a testing utility
        let copy = io::copy_buf(&mut buffer_to_read, &mut destination);

        let bytes_read = tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                let file = destination.into_inner();
                // wait for the inflight operation(s) to complete so that there could be a next
                // attempt right away and our writes are not directed to their file.
                file.into_std().await;

272 : // least.
273 : fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
274 : return Err(TimeoutOrCancel::Cancel.into());
275 : }
276 : read = copy => read,
277 : };
278 :
279 2745 : let bytes_read =
280 2745 : bytes_read.with_context(|| {
281 0 : format!(
282 0 : "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
283 0 : )
284 2745 : })?;
285 :
286 2745 : if bytes_read < from_size_bytes {
287 4 : bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
288 2741 : }
289 2741 : // Check if there is any extra data after the given size.
290 2741 : let mut from = buffer_to_read.into_inner();
291 2741 : let extra_read = from.read(&mut [1]).await?;
292 2740 : ensure!(
293 2740 : extra_read == 0,
294 8 : "Provided stream was larger than expected: expected {from_size_bytes} bytes",
295 : );
296 :
297 2732 : destination.flush().await.with_context(|| {
298 0 : format!(
299 0 : "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
300 0 : )
301 2732 : })?;
302 :
303 2732 : fs::rename(temp_file_path, &target_file_path)
304 2678 : .await
305 2731 : .with_context(|| {
306 0 : format!(
307 0 : "Failed to upload (rename) file to the local storage at '{target_file_path}'",
308 0 : )
309 2731 : })?;
310 :
311 2731 : if let Some(storage_metadata) = metadata {
312 : // FIXME: we must not be using metadata much, since this would forget the old metadata
313 : // for new writes? or perhaps metadata is sticky; could consider removing if it's never
314 : // used.
315 4 : let storage_metadata_path = storage_metadata_path(&target_file_path);
316 4 : fs::write(
317 4 : &storage_metadata_path,
318 4 : serde_json::to_string(&storage_metadata.0)
319 4 : .context("Failed to serialize storage metadata as json")?,
320 : )
321 4 : .await
322 4 : .with_context(|| {
323 0 : format!(
324 0 : "Failed to write metadata to the local storage at '{storage_metadata_path}'",
325 0 : )
326 4 : })?;
327 2727 : }
328 :
329 2731 : Ok(())
330 2747 : }
331 : }
332 :
333 : impl RemoteStorage for LocalFs {
334 0 : fn list_streaming(
335 0 : &self,
336 0 : prefix: Option<&RemotePath>,
337 0 : mode: ListingMode,
338 0 : max_keys: Option<NonZeroU32>,
339 0 : cancel: &CancellationToken,
340 0 : ) -> impl Stream<Item = Result<Listing, DownloadError>> {
341 0 : let listing = self.list(prefix, mode, max_keys, cancel);
342 0 : futures::stream::once(listing)
343 0 : }
344 :
345 204 : async fn list(
346 204 : &self,
347 204 : prefix: Option<&RemotePath>,
348 204 : mode: ListingMode,
349 204 : max_keys: Option<NonZeroU32>,
350 204 : cancel: &CancellationToken,
351 204 : ) -> Result<Listing, DownloadError> {
352 204 : let op = async {
353 204 : let mut result = Listing::default();
354 :
355 : // Filter out directories: in S3 directories don't exist, only the keys within them do.
356 204 : let keys = self
357 204 : .list_recursive(prefix)
358 713 : .await
359 204 : .map_err(DownloadError::Other)?;
360 204 : let objects = keys
361 204 : .into_iter()
362 204 : .filter_map(|k| {
363 182 : let path = k.with_base(&self.storage_root);
364 182 : if path.is_dir() {
365 74 : None
366 : } else {
367 108 : Some(ListingObject {
368 108 : key: k.clone(),
369 108 : // LocalFs is just for testing, so just specify a dummy time
370 108 : last_modified: SystemTime::now(),
371 108 : size: 0,
372 108 : })
373 : }
374 204 : })
375 204 : .collect();
376 204 :
377 204 : if let ListingMode::NoDelimiter = mode {
378 10 : result.keys = objects;
379 10 : } else {
380 194 : let mut prefixes = HashSet::new();
381 272 : for object in objects {
382 78 : let key = object.key;
383 : // If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
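                    // For example, with prefix "a/b" (no trailing slash) the stripped
                    // prefix becomes "a", so the key "a/b1/c" yields the relative key
                    // "b1/c" and contributes "b1" to `prefixes`.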
                    let relative_key = if let Some(prefix) = prefix {
                        let mut prefix = prefix.clone();
                        // We only strip the dirname of the prefix, so that when we strip it from the start of keys we
                        // end up with full file/dir names.
                        let prefix_full_local_path = prefix.with_base(&self.storage_root);
                        let has_slash = prefix.0.to_string().ends_with('/');
                        let strip_prefix = if prefix_full_local_path.is_dir() && has_slash {
                            prefix
                        } else {
                            prefix.0.pop();
                            prefix
                        };

                        RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap()
                    } else {
                        key
                    };

                    let relative_key = format!("{}", relative_key);
                    if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                        let first_part = relative_key
                            .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
                            .next()
                            .unwrap()
                            .to_owned();
                        prefixes.insert(first_part);
                    } else {
                        result.keys.push(ListingObject {
                            key: RemotePath::from_string(&relative_key).unwrap(),
                            // LocalFs is just for testing
                            last_modified: SystemTime::now(),
                            size: 0,
                        });
                    }
                }
                result.prefixes = prefixes
                    .into_iter()
                    .map(|s| RemotePath::from_string(&s).unwrap())
                    .collect();
            }

            if let Some(max_keys) = max_keys {
                result.keys.truncate(max_keys.get() as usize);
            }
            Ok(result)
        };

        let timeout = async {
            tokio::time::sleep(self.timeout).await;
            Err(DownloadError::Timeout)
        };

        let cancelled = async {
            cancel.cancelled().await;
            Err(DownloadError::Cancelled)
        };

        tokio::select! {
            res = op => res,
            res = timeout => res,
            res = cancelled => res,
        }
    }

    async fn upload(
        &self,
        data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let cancel = cancel.child_token();

        let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
        let mut op = std::pin::pin!(op);

        // race the upload0 to the timeout; if it goes over, do a graceful shutdown
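        // (on timeout we cancel the child token and then await `op` to completion,
        // so upload0 removes its temp file before we return the error)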
        let (res, timeout) = tokio::select! {
            res = &mut op => (res, false),
            _ = tokio::time::sleep(self.timeout) => {
                cancel.cancel();
                (op.await, true)
            }
        };

        match res {
            Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
                // we caused this cancel (or they happened simultaneously) -- swap it out to
                // Timeout
                Err(TimeoutOrCancel::Timeout.into())
            }
            res => res,
        }
    }

    async fn download(
        &self,
        from: &RemotePath,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        let target_path = from.with_base(&self.storage_root);

        let file_metadata = file_metadata(&target_path).await?;

        let source = ReaderStream::new(
            fs::OpenOptions::new()
                .read(true)
                .open(&target_path)
                .await
                .with_context(|| {
                    format!("Failed to open source file {target_path:?} to use in the download")
                })
                .map_err(DownloadError::Other)?,
        );

        let metadata = self
            .read_storage_metadata(&target_path)
            .await
            .map_err(DownloadError::Other)?;

        let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
        let source = crate::support::DownloadStream::new(cancel_or_timeout, source);

        let etag = mock_etag(&file_metadata);
        Ok(Download {
            metadata,
            last_modified: file_metadata
                .modified()
                .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
            etag,
            download_stream: Box::pin(source),
        })
    }

    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
        cancel: &CancellationToken,
    ) -> Result<Download, DownloadError> {
        if let Some(end_exclusive) = end_exclusive {
            if end_exclusive <= start_inclusive {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
            };
            if start_inclusive == end_exclusive.saturating_sub(1) {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
            }
        }

        let target_path = from.with_base(&self.storage_root);
        let file_metadata = file_metadata(&target_path).await?;
        let mut source = tokio::fs::OpenOptions::new()
            .read(true)
            .open(&target_path)
            .await
            .with_context(|| {
                format!("Failed to open source file {target_path:?} to use in the download")
            })
            .map_err(DownloadError::Other)?;

        let len = source
            .metadata()
            .await
            .context("query file length")
            .map_err(DownloadError::Other)?
            .len();

        source
            .seek(io::SeekFrom::Start(start_inclusive))
            .await
            .context("Failed to seek to the range start in a local storage file")
            .map_err(DownloadError::Other)?;

        let metadata = self
            .read_storage_metadata(&target_path)
            .await
            .map_err(DownloadError::Other)?;

        let source = source.take(end_exclusive.unwrap_or(len) - start_inclusive);
        let source = ReaderStream::new(source);

        let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
        let source = crate::support::DownloadStream::new(cancel_or_timeout, source);

        let etag = mock_etag(&file_metadata);
        Ok(Download {
            metadata,
            last_modified: file_metadata
                .modified()
                .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
            etag,
            download_stream: Box::pin(source),
        })
    }

    async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
        let file_path = path.with_base(&self.storage_root);
        match fs::remove_file(&file_path).await {
            Ok(()) => Ok(()),
            // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
            // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
            // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }

    async fn delete_objects<'a>(
        &self,
        paths: &'a [RemotePath],
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        for path in paths {
            self.delete(path, cancel).await?
        }
        Ok(())
    }

    async fn copy(
        &self,
        from: &RemotePath,
        to: &RemotePath,
        _cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let from_path = from.with_base(&self.storage_root);
        let to_path = to.with_base(&self.storage_root);
        create_target_directory(&to_path).await?;
        fs::copy(&from_path, &to_path).await.with_context(|| {
            format!(
                "Failed to copy file from '{from_path}' to '{to_path}'",
                from_path = from_path,
                to_path = to_path
            )
        })?;
        Ok(())
    }

    async fn time_travel_recover(
        &self,
        _prefix: Option<&RemotePath>,
        _timestamp: SystemTime,
        _done_if_after: SystemTime,
        _cancel: &CancellationToken,
    ) -> Result<(), TimeTravelError> {
        Err(TimeTravelError::Unimplemented)
    }
}

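// Sidecar path for user-supplied metadata, stored as JSON next to the object
// (e.g. "some/key" gets a companion "some/key.metadata"; illustrative shape).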
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
    path_with_suffix_extension(original_path, "metadata")
}

async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
    let target_dir = match target_file_path.parent() {
        Some(parent_dir) => parent_dir,
        None => bail!("File path '{target_file_path}' has no parent directory"),
    };
    if !target_dir.exists() {
        fs::create_dir_all(target_dir).await?;
    }
    Ok(())
}

async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, DownloadError> {
    tokio::fs::metadata(&file_path).await.map_err(|e| {
        if e.kind() == ErrorKind::NotFound {
            DownloadError::NotFound
        } else {
            DownloadError::BadInput(e.into())
        }
    })
}

// Use mtime as stand-in for ETag. We could calculate a meaningful one by md5'ing the contents of files we
// read, but that's expensive and the local_fs test helper's whole reason for existence is to run small tests
// quickly, with less overhead than using a mock S3 server.
fn mock_etag(meta: &std::fs::Metadata) -> Etag {
    let mtime = meta.modified().expect("Filesystem mtime missing");
    format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into()
}

#[cfg(test)]
mod fs_tests {
    use super::*;

    use camino_tempfile::tempdir;
    use std::{collections::HashMap, io::Write};

    async fn read_and_check_metadata(
        storage: &LocalFs,
        remote_storage_path: &RemotePath,
        expected_metadata: Option<&StorageMetadata>,
    ) -> anyhow::Result<String> {
        let cancel = CancellationToken::new();
        let download = storage
            .download(remote_storage_path, &cancel)
            .await
            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
        ensure!(
            download.metadata.as_ref() == expected_metadata,
            "Unexpected metadata returned for the downloaded file"
        );

        let contents = aggregate(download.download_stream).await?;

        String::from_utf8(contents).map_err(anyhow::Error::new)
    }

    #[tokio::test]
    async fn upload_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
        assert_eq!(
            storage.list_all().await?,
            vec![target_path_1.clone()],
            "Should list a single file after first upload"
        );

        let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
        assert_eq!(
            list_files_sorted(&storage).await?,
            vec![target_path_1.clone(), target_path_2.clone()],
709 4 : "Should list a two different files after second upload"
        );

        Ok(())
    }

    #[tokio::test]
    async fn upload_file_negatives() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;

        let id = RemotePath::new(Utf8Path::new("dummy"))?;
        let content = Bytes::from_static(b"12345");
        let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));

        // Check that you get an error if the size parameter doesn't match the actual
        // size of the stream.
        storage
            .upload(content(), 0, &id, None, &cancel)
            .await
            .expect_err("upload with zero size succeeded");
        storage
            .upload(content(), 4, &id, None, &cancel)
            .await
            .expect_err("upload with too short size succeeded");
        storage
            .upload(content(), 6, &id, None, &cancel)
            .await
            .expect_err("upload with too large size succeeded");

        // Correct size is 5, this should succeed.
        storage.upload(content(), 5, &id, None, &cancel).await?;

        Ok(())
    }

    fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
        let storage_root = tempdir()?.path().to_path_buf();
        LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
    }

    #[tokio::test]
    async fn download_file() -> anyhow::Result<()> {
        let (storage, cancel) = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;

        let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            contents,
            "We should upload and download the same contents"
        );

        let non_existing_path = "somewhere/else";
        match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await {
763 4 : match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?, &cancel).await {
764 4 : Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
765 4 : other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
766 4 : }
767 4 : Ok(())
768 4 : }
769 :
770 : #[tokio::test]
771 4 : async fn download_file_range_positive() -> anyhow::Result<()> {
772 4 : let (storage, cancel) = create_storage()?;
773 4 : let upload_name = "upload_1";
774 24 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
775 4 :
776 4 : let full_range_download_contents =
777 15 : read_and_check_metadata(&storage, &upload_target, None).await?;
778 4 : assert_eq!(
779 4 : dummy_contents(upload_name),
780 4 : full_range_download_contents,
781 4 : "Download full range should return the whole upload"
782 4 : );
783 4 :
784 4 : let uploaded_bytes = dummy_contents(upload_name).into_bytes();
785 4 : let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
786 4 :
787 4 : let first_part_download = storage
788 4 : .download_byte_range(
789 4 : &upload_target,
790 4 : 0,
791 4 : Some(first_part_local.len() as u64),
792 4 : &cancel,
793 4 : )
794 13 : .await?;
795 4 : assert!(
796 4 : first_part_download.metadata.is_none(),
797 4 : "No metadata should be returned for no metadata upload"
798 4 : );
799 4 :
800 4 : let first_part_remote = aggregate(first_part_download.download_stream).await?;
801 4 : assert_eq!(
802 4 : first_part_local, first_part_remote,
803 4 : "First part bytes should be returned when requested"
804 4 : );
805 4 :
806 4 : let second_part_download = storage
807 4 : .download_byte_range(
808 4 : &upload_target,
809 4 : first_part_local.len() as u64,
810 4 : Some((first_part_local.len() + second_part_local.len()) as u64),
811 4 : &cancel,
812 4 : )
813 16 : .await?;
814 4 : assert!(
815 4 : second_part_download.metadata.is_none(),
816 4 : "No metadata should be returned for no metadata upload"
817 4 : );
818 4 :
819 4 : let second_part_remote = aggregate(second_part_download.download_stream).await?;
820 4 : assert_eq!(
821 4 : second_part_local, second_part_remote,
822 4 : "Second part bytes should be returned when requested"
823 4 : );
824 4 :
825 4 : let suffix_bytes = storage
826 4 : .download_byte_range(&upload_target, 13, None, &cancel)
827 14 : .await?
828 4 : .download_stream;
829 4 : let suffix_bytes = aggregate(suffix_bytes).await?;
830 4 : let suffix = std::str::from_utf8(&suffix_bytes)?;
831 4 : assert_eq!(upload_name, suffix);
832 4 :
833 4 : let all_bytes = storage
834 4 : .download_byte_range(&upload_target, 0, None, &cancel)
835 16 : .await?
836 4 : .download_stream;
837 4 : let all_bytes = aggregate(all_bytes).await?;
838 4 : let all_bytes = std::str::from_utf8(&all_bytes)?;
839 4 : assert_eq!(dummy_contents("upload_1"), all_bytes);
840 4 :
841 4 : Ok(())
842 4 : }
843 :
844 : #[tokio::test]
845 4 : async fn download_file_range_negative() -> anyhow::Result<()> {
846 4 : let (storage, cancel) = create_storage()?;
847 4 : let upload_name = "upload_1";
848 24 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
849 4 :
850 4 : let start = 1_000_000_000;
851 4 : let end = start + 1;
852 4 : match storage
853 4 : .download_byte_range(
854 4 : &upload_target,
855 4 : start,
856 4 : Some(end), // exclusive end
857 4 : &cancel,
858 4 : )
859 4 : .await
860 4 : {
861 4 : Ok(_) => panic!("Should not allow downloading wrong ranges"),
862 4 : Err(e) => {
863 4 : let error_string = e.to_string();
864 4 : assert!(error_string.contains("zero bytes"));
865 4 : assert!(error_string.contains(&start.to_string()));
866 4 : assert!(error_string.contains(&end.to_string()));
867 4 : }
868 4 : }
869 4 :
870 4 : let start = 10000;
871 4 : let end = 234;
872 4 : assert!(start > end, "Should test an incorrect range");
873 4 : match storage
874 4 : .download_byte_range(&upload_target, start, Some(end), &cancel)
875 4 : .await
876 4 : {
877 4 : Ok(_) => panic!("Should not allow downloading wrong ranges"),
878 4 : Err(e) => {
879 4 : let error_string = e.to_string();
880 4 : assert!(error_string.contains("Invalid range"));
881 4 : assert!(error_string.contains(&start.to_string()));
882 4 : assert!(error_string.contains(&end.to_string()));
883 4 : }
884 4 : }
885 4 :
886 4 : Ok(())
887 4 : }
888 :
889 : #[tokio::test]
890 4 : async fn delete_file() -> anyhow::Result<()> {
891 4 : let (storage, cancel) = create_storage()?;
892 4 : let upload_name = "upload_1";
893 21 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
894 4 :
895 4 : storage.delete(&upload_target, &cancel).await?;
896 12 : assert!(storage.list_all().await?.is_empty());
897 4 :
898 4 : storage
899 4 : .delete(&upload_target, &cancel)
900 4 : .await
901 4 : .expect("Should allow deleting non-existing storage files");
902 4 :
903 4 : Ok(())
904 4 : }
905 :
906 : #[tokio::test]
907 4 : async fn file_with_metadata() -> anyhow::Result<()> {
908 4 : let (storage, cancel) = create_storage()?;
909 4 : let upload_name = "upload_1";
910 4 : let metadata = StorageMetadata(HashMap::from([
911 4 : ("one".to_string(), "1".to_string()),
912 4 : ("two".to_string(), "2".to_string()),
913 4 : ]));
914 4 : let upload_target =
915 28 : upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;
916 4 :
917 4 : let full_range_download_contents =
918 20 : read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
919 4 : assert_eq!(
920 4 : dummy_contents(upload_name),
921 4 : full_range_download_contents,
922 4 : "We should upload and download the same contents"
923 4 : );
924 4 :
925 4 : let uploaded_bytes = dummy_contents(upload_name).into_bytes();
926 4 : let (first_part_local, _) = uploaded_bytes.split_at(3);
927 4 :
928 4 : let partial_download_with_metadata = storage
929 4 : .download_byte_range(
930 4 : &upload_target,
931 4 : 0,
932 4 : Some(first_part_local.len() as u64),
933 4 : &cancel,
934 4 : )
935 20 : .await?;
936 4 : let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
937 4 : assert_eq!(
938 4 : first_part_local,
939 4 : first_part_remote.as_slice(),
940 4 : "First part bytes should be returned when requested"
941 4 : );
942 4 :
943 4 : assert_eq!(
944 4 : partial_download_with_metadata.metadata,
945 4 : Some(metadata),
946 4 : "We should get the same metadata back for partial download"
947 4 : );
948 4 :
949 4 : Ok(())
950 4 : }
951 :
952 : #[tokio::test]
953 4 : async fn list() -> anyhow::Result<()> {
954 4 : // No delimiter: should recursively list everything
955 4 : let (storage, cancel) = create_storage()?;
956 24 : let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
957 4 : let child_sibling =
958 24 : upload_dummy_file(&storage, "grandparent/parent/child_sibling", None, &cancel).await?;
959 24 : let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;
960 4 :
961 4 : let listing = storage
962 4 : .list(None, ListingMode::NoDelimiter, None, &cancel)
963 4 : .await?;
964 4 : assert!(listing.prefixes.is_empty());
965 4 : assert_eq!(
966 4 : listing
967 4 : .keys
968 4 : .into_iter()
969 12 : .map(|o| o.key)
970 4 : .collect::<HashSet<_>>(),
971 4 : HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()])
972 4 : );
973 4 :
974 4 : // Delimiter: should only go one deep
975 4 : let listing = storage
976 4 : .list(None, ListingMode::WithDelimiter, None, &cancel)
977 4 : .await?;
978 4 :
979 4 : assert_eq!(
980 4 : listing.prefixes,
981 4 : [RemotePath::from_string("timelines").unwrap()].to_vec()
982 4 : );
983 4 : assert!(listing.keys.is_empty());
984 4 :
985 4 : // Delimiter & prefix with a trailing slash
986 4 : let listing = storage
987 4 : .list(
988 4 : Some(&RemotePath::from_string("timelines/some_timeline/grandparent/").unwrap()),
989 4 : ListingMode::WithDelimiter,
990 4 : None,
991 4 : &cancel,
992 4 : )
993 4 : .await?;
994 4 : assert_eq!(
995 4 : listing.keys.into_iter().map(|o| o.key).collect::<Vec<_>>(),
996 4 : [RemotePath::from_string("uncle").unwrap()].to_vec()
997 4 : );
998 4 : assert_eq!(
999 4 : listing.prefixes,
1000 4 : [RemotePath::from_string("parent").unwrap()].to_vec()
1001 4 : );
1002 4 :
1003 4 : // Delimiter and prefix without a trailing slash
1004 4 : let listing = storage
1005 4 : .list(
1006 4 : Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
1007 4 : ListingMode::WithDelimiter,
1008 4 : None,
1009 4 : &cancel,
1010 4 : )
1011 4 : .await?;
1012 4 : assert_eq!(listing.keys, vec![]);
1013 4 : assert_eq!(
1014 4 : listing.prefixes,
1015 4 : [RemotePath::from_string("grandparent").unwrap()].to_vec()
1016 4 : );
1017 4 :
1018 4 : // Delimiter and prefix that's partway through a path component
1019 4 : let listing = storage
1020 4 : .list(
1021 4 : Some(&RemotePath::from_string("timelines/some_timeline/grandp").unwrap()),
1022 4 : ListingMode::WithDelimiter,
1023 4 : None,
1024 4 : &cancel,
1025 4 : )
1026 4 : .await?;
1027 4 : assert_eq!(listing.keys, vec![]);
1028 4 : assert_eq!(
1029 4 : listing.prefixes,
1030 4 : [RemotePath::from_string("grandparent").unwrap()].to_vec()
1031 4 : );
1032 4 :
1033 4 : Ok(())
1034 4 : }
1035 :
1036 : #[tokio::test]
1037 4 : async fn list_part_component() -> anyhow::Result<()> {
1038 4 : // No delimiter: should recursively list everything
1039 4 : let (storage, cancel) = create_storage()?;
1040 4 :
1041 4 : // Imitates what happens in a tenant path when we have an unsharded path and a sharded path, and do a listing
1042 4 : // of the unsharded path: although there is a "directory" at the unsharded path, it should be handled as
1043 4 : // a freeform prefix.
1044 4 : let _child_a =
1045 23 : upload_dummy_file(&storage, "grandparent/tenant-01/child", None, &cancel).await?;
1046 4 : let _child_b =
1047 24 : upload_dummy_file(&storage, "grandparent/tenant/child", None, &cancel).await?;
1048 4 :
1049 4 : // Delimiter and prefix that's partway through a path component
1050 4 : let listing = storage
1051 4 : .list(
1052 4 : Some(
1053 4 : &RemotePath::from_string("timelines/some_timeline/grandparent/tenant").unwrap(),
1054 4 : ),
1055 4 : ListingMode::WithDelimiter,
1056 4 : None,
1057 4 : &cancel,
1058 4 : )
1059 4 : .await?;
1060 4 : assert_eq!(listing.keys, vec![]);
1061 4 :
1062 4 : let mut found_prefixes = listing.prefixes.clone();
1063 4 : found_prefixes.sort();
1064 4 : assert_eq!(
1065 4 : found_prefixes,
1066 4 : [
1067 4 : RemotePath::from_string("tenant").unwrap(),
1068 4 : RemotePath::from_string("tenant-01").unwrap(),
1069 4 : ]
1070 4 : .to_vec()
1071 4 : );
1072 4 :
1073 4 : Ok(())
1074 4 : }
1075 :
1076 : #[tokio::test]
1077 4 : async fn overwrite_shorter_file() -> anyhow::Result<()> {
1078 4 : let (storage, cancel) = create_storage()?;
1079 4 :
1080 4 : let path = RemotePath::new("does/not/matter/file".into())?;
1081 4 :
1082 4 : let body = Bytes::from_static(b"long file contents is long");
1083 4 : {
1084 4 : let len = body.len();
1085 4 : let body =
1086 4 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1087 15 : storage.upload(body, len, &path, None, &cancel).await?;
1088 4 : }
1089 4 :
1090 8 : let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
1091 4 : assert_eq!(body, read);
1092 4 :
1093 4 : let shorter = Bytes::from_static(b"shorter body");
1094 4 : {
1095 4 : let len = shorter.len();
1096 4 : let body =
1097 4 : futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
1098 12 : storage.upload(body, len, &path, None, &cancel).await?;
1099 4 : }
1100 4 :
1101 8 : let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
1102 4 : assert_eq!(shorter, read);
1103 4 : Ok(())
1104 4 : }
1105 :
1106 : #[tokio::test]
1107 4 : async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
1108 4 : let (storage, cancel) = create_storage()?;
1109 4 :
1110 4 : let path = RemotePath::new("does/not/matter/file".into())?;
1111 4 :
1112 4 : let body = Bytes::from_static(b"long file contents is long");
1113 4 : {
1114 4 : let len = body.len();
1115 4 : let body =
1116 4 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1117 4 : let cancel = cancel.child_token();
1118 4 : cancel.cancel();
1119 4 : let e = storage
1120 4 : .upload(body, len, &path, None, &cancel)
1121 12 : .await
1122 4 : .unwrap_err();
1123 4 :
1124 4 : assert!(TimeoutOrCancel::caused_by_cancel(&e));
1125 4 : }
1126 4 :
1127 4 : {
1128 4 : let len = body.len();
1129 4 : let body =
1130 4 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1131 12 : storage.upload(body, len, &path, None, &cancel).await?;
1132 4 : }
1133 4 :
1134 8 : let read = aggregate(storage.download(&path, &cancel).await?.download_stream).await?;
1135 4 : assert_eq!(body, read);
1136 4 :
1137 4 : Ok(())
1138 4 : }
1139 :
1140 48 : async fn upload_dummy_file(
1141 48 : storage: &LocalFs,
1142 48 : name: &str,
1143 48 : metadata: Option<StorageMetadata>,
1144 48 : cancel: &CancellationToken,
1145 48 : ) -> anyhow::Result<RemotePath> {
1146 48 : let from_path = storage
1147 48 : .storage_root
1148 48 : .join("timelines")
1149 48 : .join("some_timeline")
1150 48 : .join(name);
1151 48 : let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
1152 :
1153 48 : let relative_path = from_path
1154 48 : .strip_prefix(&storage.storage_root)
1155 48 : .context("Failed to strip storage root prefix")
1156 48 : .and_then(RemotePath::new)
1157 48 : .with_context(|| {
1158 0 : format!(
1159 0 : "Failed to resolve remote part of path {:?} for base {:?}",
1160 0 : from_path, storage.storage_root
1161 0 : )
1162 48 : })?;
1163 :
1164 48 : let file = tokio_util::io::ReaderStream::new(file);
1165 48 :
1166 48 : storage
1167 48 : .upload(file, size, &relative_path, metadata, cancel)
1168 241 : .await?;
1169 48 : Ok(relative_path)
1170 48 : }
1171 :
1172 48 : async fn create_file_for_upload(
1173 48 : path: &Utf8Path,
1174 48 : contents: &str,
1175 48 : ) -> anyhow::Result<(fs::File, usize)> {
1176 48 : std::fs::create_dir_all(path.parent().unwrap())?;
1177 48 : let mut file_for_writing = std::fs::OpenOptions::new()
1178 48 : .write(true)
1179 48 : .create_new(true)
1180 48 : .open(path)?;
1181 48 : write!(file_for_writing, "{}", contents)?;
1182 48 : drop(file_for_writing);
1183 48 : let file_size = path.metadata()?.len() as usize;
1184 48 : Ok((
1185 48 : fs::OpenOptions::new().read(true).open(&path).await?,
1186 48 : file_size,
1187 : ))
1188 48 : }
1189 :
1190 72 : fn dummy_contents(name: &str) -> String {
1191 72 : format!("contents for {name}")
1192 72 : }
1193 :
1194 4 : async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
1195 12 : let mut files = storage.list_all().await?;
1196 4 : files.sort_by(|a, b| a.0.cmp(&b.0));
1197 4 : Ok(files)
1198 4 : }
1199 :
1200 44 : async fn aggregate(
1201 44 : stream: impl Stream<Item = std::io::Result<Bytes>>,
1202 44 : ) -> anyhow::Result<Vec<u8>> {
1203 44 : use futures::stream::StreamExt;
1204 44 : let mut out = Vec::new();
1205 44 : let mut stream = std::pin::pin!(stream);
1206 88 : while let Some(res) = stream.next().await {
1207 44 : out.extend_from_slice(&res?[..]);
1208 : }
1209 44 : Ok(out)
1210 44 : }
1211 : }