Line data Source code
1 : //! Local filesystem acting as a remote storage.
2 : //! Multiple API users can use the same "storage" of this kind by using different storage roots.
3 : //!
4 : //! This storage is used in tests, but can also be used in cases where a certain persistent
5 : //! volume is mounted to the local FS.
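//!
//! A minimal usage sketch (illustrative only; the exact import paths are assumed and the
//! snippet is not compiled as a doctest):
//!
//! ```ignore
//! use std::time::Duration;
//! use camino::Utf8PathBuf;
//! use tokio_util::sync::CancellationToken;
//!
//! // Create the storage rooted at a local directory (created if missing); the path is hypothetical.
//! let storage = LocalFs::new(Utf8PathBuf::from("/mnt/local_remote_storage"), Duration::from_secs(120))?;
//! let cancel = CancellationToken::new();
//!
//! // Upload a small object and read it back.
//! let path = RemotePath::from_string("tenants/example/object")?;
//! let body = bytes::Bytes::from_static(b"hello");
//! let stream = futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
//! storage.upload(stream, body.len(), &path, None, &cancel).await?;
//! let download = storage.download(&path, &DownloadOpts::default(), &cancel).await?;
//! ```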
6 :
7 : use std::collections::HashSet;
8 : use std::io::ErrorKind;
9 : use std::num::NonZeroU32;
10 : use std::time::{Duration, SystemTime, UNIX_EPOCH};
11 :
12 : use anyhow::{Context, bail, ensure};
13 : use bytes::Bytes;
14 : use camino::{Utf8Path, Utf8PathBuf};
15 : use futures::stream::Stream;
16 : use tokio::fs;
17 : use tokio::io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
18 : use tokio_util::io::ReaderStream;
19 : use tokio_util::sync::CancellationToken;
20 : use utils::crashsafe::path_with_suffix_extension;
21 :
22 : use super::{RemoteStorage, StorageMetadata};
23 : use crate::{
24 : Download, DownloadError, DownloadOpts, Etag, Listing, ListingMode, ListingObject,
25 : REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, TimeTravelError, TimeoutOrCancel,
26 : };
27 :
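/// Suffix appended to the target path to form the temporary file that `upload0` writes to
/// before renaming it into place.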
28 : const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
29 :
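/// Remote storage implementation backed by a directory on the local filesystem: every
/// `RemotePath` is resolved relative to `storage_root`, and `timeout` bounds each operation.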
30 : #[derive(Debug, Clone)]
31 : pub struct LocalFs {
32 : storage_root: Utf8PathBuf,
33 : timeout: Duration,
34 : }
35 :
36 : impl LocalFs {
37 : /// Attempts to create local FS storage, along with its root directory.
38 : /// The storage root will be created (if it does not exist) and turned into an absolute path (if passed as a relative one).
39 1501 : pub fn new(mut storage_root: Utf8PathBuf, timeout: Duration) -> anyhow::Result<Self> {
40 1501 : if !storage_root.exists() {
41 33 : std::fs::create_dir_all(&storage_root).with_context(|| {
42 0 : format!("Failed to create all directories in the given root path {storage_root:?}")
43 33 : })?;
44 1468 : }
45 1501 : if !storage_root.is_absolute() {
46 1392 : storage_root = storage_root.canonicalize_utf8().with_context(|| {
47 0 : format!("Failed to represent path {storage_root:?} as an absolute path")
48 1392 : })?;
49 109 : }
50 :
51 1501 : Ok(Self {
52 1501 : storage_root,
53 1501 : timeout,
54 1501 : })
55 1501 : }
56 :
57 : // mirrors S3Bucket::s3_object_to_relative_path
58 405 : fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
59 405 : let relative_path = key
60 405 : .strip_prefix(&self.storage_root)
61 405 : .expect("relative path must contain storage_root as prefix");
62 405 : RemotePath(relative_path.into())
63 405 : }
64 :
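/// Reads the JSON metadata sidecar (see `storage_metadata_path`) for `file_path`, if one exists.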
65 326 : async fn read_storage_metadata(
66 326 : &self,
67 326 : file_path: &Utf8Path,
68 326 : ) -> anyhow::Result<Option<StorageMetadata>> {
69 326 : let metadata_path = storage_metadata_path(file_path);
70 326 : if metadata_path.exists() && metadata_path.is_file() {
71 6 : let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
72 0 : format!("Failed to read metadata from the local storage at '{metadata_path}'")
73 6 : })?;
74 :
75 6 : serde_json::from_str(&metadata_string)
76 6 : .with_context(|| {
77 0 : format!(
78 0 : "Failed to deserialize metadata from the local storage at '{metadata_path}'",
79 0 : )
80 6 : })
81 6 : .map(|metadata| Some(StorageMetadata(metadata)))
82 : } else {
83 320 : Ok(None)
84 : }
85 326 : }
86 :
87 : #[cfg(test)]
88 9 : async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
89 : use std::future::Future;
90 : use std::pin::Pin;
91 27 : fn get_all_files<'a, P>(
92 27 : directory_path: P,
93 27 : ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
94 27 : where
95 27 : P: AsRef<Utf8Path> + Send + Sync + 'a,
96 27 : {
97 27 : Box::pin(async move {
98 27 : let directory_path = directory_path.as_ref();
99 27 : if directory_path.exists() {
100 27 : if directory_path.is_dir() {
101 27 : let mut paths = Vec::new();
102 27 : let mut dir_contents = fs::read_dir(directory_path).await?;
103 54 : while let Some(dir_entry) = dir_contents.next_entry().await? {
104 27 : let file_type = dir_entry.file_type().await?;
105 27 : let entry_path =
106 27 : Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
107 0 : anyhow::Error::msg(format!(
108 0 : "non-Unicode path: {}",
109 0 : pb.to_string_lossy()
110 0 : ))
111 27 : })?;
112 27 : if file_type.is_symlink() {
113 0 : tracing::debug!("{entry_path:?} is a symlink, skipping")
114 27 : } else if file_type.is_dir() {
115 18 : paths.extend(get_all_files(&entry_path).await?.into_iter())
116 9 : } else {
117 9 : paths.push(entry_path);
118 9 : }
119 : }
120 27 : Ok(paths)
121 : } else {
122 0 : bail!("Path {directory_path:?} is not a directory")
123 : }
124 : } else {
125 0 : Ok(Vec::new())
126 : }
127 27 : })
128 27 : }
129 :
130 9 : Ok(get_all_files(&self.storage_root)
131 9 : .await?
132 9 : .into_iter()
133 9 : .map(|path| {
134 9 : path.strip_prefix(&self.storage_root)
135 9 : .context("Failed to strip storage root prefix")
136 9 : .and_then(RemotePath::new)
137 9 : .expect(
138 9 : "We list files under the storage root, hence we should be able to strip the prefix",
139 9 : )
140 9 : })
141 9 : .collect())
142 9 : }
143 :
144 : // recursively lists all files in a directory,
145 : // mirroring the `list_files` for `s3_bucket`
146 2808 : async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
147 2808 : let full_path = match folder {
148 2802 : Some(folder) => folder.with_base(&self.storage_root),
149 6 : None => self.storage_root.clone(),
150 : };
151 :
152 : // If we were given a directory, we may use it as our starting point.
153 : // Otherwise, we must go up to the first ancestor dir that exists. This is because
154 : // S3 object list prefixes can be arbitrary strings, but when reading
155 : // the local filesystem we need a directory to start calling read_dir on.
156 2808 : let mut initial_dir = full_path.clone();
157 2808 :
158 2808 : // If there's no trailing slash, we have to start looking from one level above: even if
159 2808 : // `initial_dir` is a directory, we should still list any prefixes in the parent
160 2808 : // that start with the same string.
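// As a hypothetical example: for the prefix "tenants/12" we start listing from "tenants/",
// so that a sibling entry such as "tenants/123" is also matched by the string comparison below.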
161 2808 : if !full_path.to_string().ends_with('/') {
162 1410 : initial_dir.pop();
163 1410 : }
164 :
165 : loop {
166 : // Did we make it to the root?
167 9600 : if initial_dir.parent().is_none() {
168 0 : anyhow::bail!("list_files: failed to find valid ancestor dir for {full_path}");
169 9600 : }
170 9600 :
171 9600 : match fs::metadata(initial_dir.clone()).await {
172 2808 : Ok(meta) if meta.is_dir() => {
173 2808 : // We found a directory, break
174 2808 : break;
175 : }
176 0 : Ok(_meta) => {
177 0 : // It's not a directory: strip back to the parent
178 0 : initial_dir.pop();
179 0 : }
180 6792 : Err(e) if e.kind() == ErrorKind::NotFound => {
181 6792 : // It's not a file that exists: strip the prefix back to the parent directory
182 6792 : initial_dir.pop();
183 6792 : }
184 0 : Err(e) => {
185 0 : // Unexpected I/O error
186 0 : anyhow::bail!(e)
187 : }
188 : }
189 : }
190 : // Note that Utf8PathBuf starts_with only considers full path segments, but
191 : // object prefixes are arbitrary strings, so we need to compare raw strings with
192 : // starts_with later.
193 2808 : let prefix = full_path.as_str();
194 2808 :
195 2808 : let mut files = vec![];
196 2808 : let mut directory_queue = vec![initial_dir];
197 5720 : while let Some(cur_folder) = directory_queue.pop() {
198 2912 : let mut entries = cur_folder.read_dir_utf8()?;
199 3542 : while let Some(Ok(entry)) = entries.next() {
200 630 : let file_name = entry.file_name();
201 630 : let full_file_name = cur_folder.join(file_name);
202 630 : if full_file_name.as_str().starts_with(prefix) {
203 405 : let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
204 405 : files.push(file_remote_path);
205 405 : if full_file_name.is_dir() {
206 104 : directory_queue.push(full_file_name);
207 301 : }
208 225 : }
209 : }
210 : }
211 :
212 2808 : Ok(files)
213 2808 : }
214 :
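/// Streams `data` into a temporary file next to the target and renames it into place once the
/// whole stream has been written, erroring out if the stream is shorter or longer than
/// `data_size_bytes`.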
215 20008 : async fn upload0(
216 20008 : &self,
217 20008 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
218 20008 : data_size_bytes: usize,
219 20008 : to: &RemotePath,
220 20008 : metadata: Option<StorageMetadata>,
221 20008 : cancel: &CancellationToken,
222 20008 : ) -> anyhow::Result<()> {
223 20008 : let target_file_path = to.with_base(&self.storage_root);
224 20008 : create_target_directory(&target_file_path).await?;
225 : // We need this dance with a sort-of durable rename (without fsyncs)
226 : // to prevent partial uploads. This was actually hit when a pageserver shutdown
227 : // cancelled the upload and a partial file was left on the fs.
228 : // NOTE: Because the temp file suffix is always the same, this operation is racy.
229 : // Two concurrent operations can lead to the following sequence:
230 : // T1: write(temp)
231 : // T2: write(temp) -> overwrites the content
232 : // T1: rename(temp, dst) -> succeeds
233 : // T2: rename(temp, dst) -> fails, temp no longer exists
234 : // This can be solved by supplying a unique temp suffix every time, but this situation
235 : // is not normal in the first place, and the error can help (and has helped at least once)
236 : // to discover bugs in upper-level synchronization.
237 20004 : let temp_file_path =
238 20004 : path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
239 19938 : let mut destination = io::BufWriter::new(
240 20004 : fs::OpenOptions::new()
241 20004 : .write(true)
242 20004 : .create(true)
243 20004 : .truncate(true)
244 20004 : .open(&temp_file_path)
245 20004 : .await
246 19938 : .with_context(|| {
247 0 : format!("Failed to open target fs destination at '{target_file_path}'")
248 19938 : })?,
249 : );
250 :
251 19938 : let from_size_bytes = data_size_bytes as u64;
252 19938 : let data = tokio_util::io::StreamReader::new(data);
253 19938 : let data = std::pin::pin!(data);
254 19938 : let mut buffer_to_read = data.take(from_size_bytes);
255 19938 :
256 19938 : // alternatively we could just write the bytes to a file, but local_fs is a testing utility
257 19938 : let copy = io::copy_buf(&mut buffer_to_read, &mut destination);
258 :
259 19938 : let bytes_read = tokio::select! {
260 : biased;
261 19938 : _ = cancel.cancelled() => {
262 3 : let file = destination.into_inner();
263 3 : // Wait for the in-flight operation(s) to complete so that the next attempt can start
264 3 : // right away and our writes are not directed to their file.
265 3 : file.into_std().await;
266 :
267 : // TODO: leave the temp file or not? Leaving it is probably less racy. Truncate is
268 : // enabled at least.
269 3 : fs::remove_file(temp_file_path).await.context("remove temp_file_path after cancellation or timeout")?;
270 3 : return Err(TimeoutOrCancel::Cancel.into());
271 : }
272 19938 : read = copy => read,
273 : };
274 :
275 19850 : let bytes_read =
276 19850 : bytes_read.with_context(|| {
277 0 : format!(
278 0 : "Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
279 0 : )
280 19850 : })?;
281 :
282 19850 : if bytes_read < from_size_bytes {
283 3 : bail!(
284 3 : "Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes"
285 3 : );
286 19847 : }
287 19847 : // Check if there is any extra data after the given size.
288 19847 : let mut from = buffer_to_read.into_inner();
289 19847 : let extra_read = from.read(&mut [1]).await?;
290 19830 : ensure!(
291 19830 : extra_read == 0,
292 6 : "Provided stream was larger than expected: expected {from_size_bytes} bytes",
293 : );
294 :
295 19824 : destination.flush().await.with_context(|| {
296 0 : format!(
297 0 : "Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
298 0 : )
299 19824 : })?;
300 :
301 19824 : fs::rename(temp_file_path, &target_file_path)
302 19824 : .await
303 19771 : .with_context(|| {
304 0 : format!(
305 0 : "Failed to upload (rename) file to the local storage at '{target_file_path}'",
306 0 : )
307 19771 : })?;
308 :
309 19771 : if let Some(storage_metadata) = metadata {
310 : // FIXME: we must not be using metadata much, since this would forget the old metadata
311 : // for new writes? or perhaps metadata is sticky; could consider removing if it's never
312 : // used.
313 3 : let storage_metadata_path = storage_metadata_path(&target_file_path);
314 3 : fs::write(
315 3 : &storage_metadata_path,
316 3 : serde_json::to_string(&storage_metadata.0)
317 3 : .context("Failed to serialize storage metadata as json")?,
318 : )
319 3 : .await
320 3 : .with_context(|| {
321 0 : format!(
322 0 : "Failed to write metadata to the local storage at '{storage_metadata_path}'",
323 0 : )
324 3 : })?;
325 45 : }
326 :
327 19771 : Ok(())
328 60 : }
329 : }
330 :
331 : impl RemoteStorage for LocalFs {
332 6 : fn list_streaming(
333 6 : &self,
334 6 : prefix: Option<&RemotePath>,
335 6 : mode: ListingMode,
336 6 : max_keys: Option<NonZeroU32>,
337 6 : cancel: &CancellationToken,
338 6 : ) -> impl Stream<Item = Result<Listing, DownloadError>> {
339 6 : let listing = self.list(prefix, mode, max_keys, cancel);
340 6 : futures::stream::once(listing)
341 6 : }
342 :
343 2808 : async fn list(
344 2808 : &self,
345 2808 : prefix: Option<&RemotePath>,
346 2808 : mode: ListingMode,
347 2808 : max_keys: Option<NonZeroU32>,
348 2808 : cancel: &CancellationToken,
349 2808 : ) -> Result<Listing, DownloadError> {
350 2808 : let op = async {
351 2808 : let mut result = Listing::default();
352 :
353 : // Filter out directories: in S3 directories don't exist, only the keys within them do.
354 2808 : let keys = self
355 2808 : .list_recursive(prefix)
356 2808 : .await
357 2808 : .map_err(DownloadError::Other)?;
358 2808 : let mut objects = Vec::with_capacity(keys.len());
359 3213 : for key in keys {
360 405 : let path = key.with_base(&self.storage_root);
361 405 : let metadata = file_metadata(&path).await;
362 0 : if let Err(DownloadError::NotFound) = metadata {
363 : // Race: if the file is deleted between listing and metadata check, ignore it.
364 0 : continue;
365 405 : }
366 405 : let metadata = metadata?;
367 405 : if metadata.is_dir() {
368 104 : continue;
369 301 : }
370 301 : objects.push(ListingObject {
371 301 : key: key.clone(),
372 301 : last_modified: metadata.modified()?,
373 301 : size: metadata.len(),
374 : });
375 : }
376 2808 : let objects = objects;
377 2808 :
378 2808 : if let ListingMode::NoDelimiter = mode {
379 1401 : result.keys = objects;
380 1401 : } else {
381 1407 : let mut prefixes = HashSet::new();
382 1581 : for object in objects {
383 174 : let key = object.key;
384 : // If the part after the prefix includes a "/", take only the first part and put it in `prefixes`.
385 174 : let relative_key = if let Some(prefix) = prefix {
386 165 : let mut prefix = prefix.clone();
387 165 : // We only strip the dirname of the prefix, so that when we strip it from the start of keys we
388 165 : // end up with full file/dir names.
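// Illustrative example (mirroring the `list` test below): for the prefix "a/b/grandp" we strip "a/b",
// so the key "a/b/grandparent/parent/child" yields "grandparent/parent/child" and
// "grandparent" ends up in `prefixes`.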
389 165 : let prefix_full_local_path = prefix.with_base(&self.storage_root);
390 165 : let has_slash = prefix.0.to_string().ends_with('/');
391 165 : let strip_prefix = if prefix_full_local_path.is_dir() && has_slash {
392 141 : prefix
393 : } else {
394 24 : prefix.0.pop();
395 24 : prefix
396 : };
397 :
398 165 : RemotePath::new(key.strip_prefix(&strip_prefix).unwrap()).unwrap()
399 : } else {
400 9 : key
401 : };
402 :
403 174 : let relative_key = format!("{}", relative_key);
404 174 : if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
405 171 : let first_part = relative_key
406 171 : .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
407 171 : .next()
408 171 : .unwrap()
409 171 : .to_owned();
410 171 : prefixes.insert(first_part);
411 171 : } else {
412 3 : result.keys.push(ListingObject {
413 3 : key: RemotePath::from_string(&relative_key).unwrap(),
414 3 : last_modified: object.last_modified,
415 3 : size: object.size,
416 3 : });
417 3 : }
418 : }
419 1407 : result.prefixes = prefixes
420 1407 : .into_iter()
421 1407 : .map(|s| RemotePath::from_string(&s).unwrap())
422 1407 : .collect();
423 1407 : }
424 :
425 2808 : if let Some(max_keys) = max_keys {
426 0 : result.keys.truncate(max_keys.get() as usize);
427 2808 : }
428 2808 : Ok(result)
429 2808 : };
430 :
431 2808 : let timeout = async {
432 2757 : tokio::time::sleep(self.timeout).await;
433 0 : Err(DownloadError::Timeout)
434 0 : };
435 :
436 2808 : let cancelled = async {
437 2776 : cancel.cancelled().await;
438 0 : Err(DownloadError::Cancelled)
439 0 : };
440 :
441 2808 : tokio::select! {
442 2808 : res = op => res,
443 2808 : res = timeout => res,
444 2808 : res = cancelled => res,
445 : }
446 2808 : }
447 :
448 0 : async fn list_versions(
449 0 : &self,
450 0 : _prefix: Option<&RemotePath>,
451 0 : _mode: ListingMode,
452 0 : _max_keys: Option<NonZeroU32>,
453 0 : _cancel: &CancellationToken,
454 0 : ) -> Result<crate::VersionListing, DownloadError> {
455 0 : unimplemented!()
456 : }
457 :
458 0 : async fn head_object(
459 0 : &self,
460 0 : key: &RemotePath,
461 0 : _cancel: &CancellationToken,
462 0 : ) -> Result<ListingObject, DownloadError> {
463 0 : let target_file_path = key.with_base(&self.storage_root);
464 0 : let metadata = file_metadata(&target_file_path).await?;
465 : Ok(ListingObject {
466 0 : key: key.clone(),
467 0 : last_modified: metadata.modified()?,
468 0 : size: metadata.len(),
469 : })
470 0 : }
471 :
472 20008 : async fn upload(
473 20008 : &self,
474 20008 : data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
475 20008 : data_size_bytes: usize,
476 20008 : to: &RemotePath,
477 20008 : metadata: Option<StorageMetadata>,
478 20008 : cancel: &CancellationToken,
479 20008 : ) -> anyhow::Result<()> {
480 20008 : let cancel = cancel.child_token();
481 20008 :
482 20008 : let op = self.upload0(data, data_size_bytes, to, metadata, &cancel);
483 20008 : let mut op = std::pin::pin!(op);
484 :
485 : // Race upload0 against the timeout; if it runs over, do a graceful shutdown.
486 20008 : let (res, timeout) = tokio::select! {
487 20008 : res = &mut op => (res, false),
488 20008 : _ = tokio::time::sleep(self.timeout) => {
489 0 : cancel.cancel();
490 0 : (op.await, true)
491 : }
492 : };
493 :
494 12 : match res {
495 12 : Err(e) if timeout && TimeoutOrCancel::caused_by_cancel(&e) => {
496 0 : // we caused this cancel (or they happened simultaneously) -- swap it out to
497 0 : // Timeout
498 0 : Err(TimeoutOrCancel::Timeout.into())
499 : }
500 19783 : res => res,
501 : }
502 60 : }
503 :
504 4499 : async fn download(
505 4499 : &self,
506 4499 : from: &RemotePath,
507 4499 : opts: &DownloadOpts,
508 4499 : cancel: &CancellationToken,
509 4499 : ) -> Result<Download, DownloadError> {
510 4499 : let target_path = from.with_base(&self.storage_root);
511 :
512 4499 : let file_metadata = file_metadata(&target_path).await?;
513 329 : let etag = mock_etag(&file_metadata);
514 329 :
515 329 : if opts.etag.as_ref() == Some(&etag) {
516 0 : return Err(DownloadError::Unmodified);
517 329 : }
518 :
519 329 : let mut file = fs::OpenOptions::new()
520 329 : .read(true)
521 329 : .open(&target_path)
522 329 : .await
523 329 : .with_context(|| {
524 0 : format!("Failed to open source file {target_path:?} to use in the download")
525 329 : })
526 329 : .map_err(DownloadError::Other)?;
527 :
528 329 : let mut take = file_metadata.len();
529 329 : if let Some((start, end)) = opts.byte_range() {
530 15 : if start > 0 {
531 6 : file.seek(io::SeekFrom::Start(start))
532 6 : .await
533 6 : .context("Failed to seek to the range start in a local storage file")
534 6 : .map_err(DownloadError::Other)?;
535 9 : }
536 15 : if let Some(end) = end {
537 12 : take = end - start;
538 12 : }
539 314 : }
540 :
541 329 : let source = ReaderStream::new(file.take(take));
542 :
543 329 : let metadata = self
544 329 : .read_storage_metadata(&target_path)
545 329 : .await
546 326 : .map_err(DownloadError::Other)?;
547 :
548 326 : let cancel_or_timeout = crate::support::cancel_or_timeout(self.timeout, cancel.clone());
549 326 : let source = crate::support::DownloadStream::new(cancel_or_timeout, source);
550 326 :
551 326 : Ok(Download {
552 326 : metadata,
553 326 : last_modified: file_metadata
554 326 : .modified()
555 326 : .map_err(|e| DownloadError::Other(anyhow::anyhow!(e).context("Reading mtime")))?,
556 326 : etag,
557 326 : download_stream: Box::pin(source),
558 : })
559 4496 : }
560 :
561 1825 : async fn delete(&self, path: &RemotePath, _cancel: &CancellationToken) -> anyhow::Result<()> {
562 1825 : let file_path = path.with_base(&self.storage_root);
563 1825 : match fs::remove_file(&file_path).await {
564 1796 : Ok(()) => Ok(()),
565 : // The file doesn't exist. This shouldn't yield an error to mirror S3's behaviour.
566 : // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
567 : // > If there isn't a null version, Amazon S3 does not remove any objects but will still respond that the command was successful.
568 3 : Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
569 0 : Err(e) => Err(anyhow::anyhow!(e)),
570 : }
571 1799 : }
572 :
573 42 : async fn delete_objects(
574 42 : &self,
575 42 : paths: &[RemotePath],
576 42 : cancel: &CancellationToken,
577 42 : ) -> anyhow::Result<()> {
578 88 : for path in paths {
579 46 : self.delete(path, cancel).await?
580 : }
581 42 : Ok(())
582 42 : }
583 :
584 48 : fn max_keys_per_delete(&self) -> usize {
585 48 : super::MAX_KEYS_PER_DELETE_S3
586 48 : }
587 :
588 0 : async fn copy(
589 0 : &self,
590 0 : from: &RemotePath,
591 0 : to: &RemotePath,
592 0 : _cancel: &CancellationToken,
593 0 : ) -> anyhow::Result<()> {
594 0 : let from_path = from.with_base(&self.storage_root);
595 0 : let to_path = to.with_base(&self.storage_root);
596 0 : create_target_directory(&to_path).await?;
597 0 : fs::copy(&from_path, &to_path).await.with_context(|| {
598 0 : format!(
599 0 : "Failed to copy file from '{from_path}' to '{to_path}'",
600 0 : from_path = from_path,
601 0 : to_path = to_path
602 0 : )
603 0 : })?;
604 0 : Ok(())
605 0 : }
606 :
607 0 : async fn time_travel_recover(
608 0 : &self,
609 0 : _prefix: Option<&RemotePath>,
610 0 : _timestamp: SystemTime,
611 0 : _done_if_after: SystemTime,
612 0 : _cancel: &CancellationToken,
613 0 : ) -> Result<(), TimeTravelError> {
614 0 : Err(TimeTravelError::Unimplemented)
615 0 : }
616 : }
617 :
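/// Path of the sidecar file that stores the object's `StorageMetadata` serialized as JSON,
/// placed next to the object itself.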
618 329 : fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
619 329 : path_with_suffix_extension(original_path, "metadata")
620 329 : }
621 :
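/// Creates the parent directory of `target_file_path` (and any missing ancestors) if needed.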
622 20008 : async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
623 20008 : let target_dir = match target_file_path.parent() {
624 20008 : Some(parent_dir) => parent_dir,
625 0 : None => bail!("File path '{target_file_path}' has no parent directory"),
626 : };
627 20008 : if !target_dir.exists() {
628 4050 : fs::create_dir_all(target_dir).await?;
629 15958 : }
630 20004 : Ok(())
631 20004 : }
632 :
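/// Fetches filesystem metadata for the path, mapping a missing file to `DownloadError::NotFound`.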
633 4904 : async fn file_metadata(file_path: &Utf8Path) -> Result<std::fs::Metadata, DownloadError> {
634 4904 : tokio::fs::metadata(&file_path).await.map_err(|e| {
635 4170 : if e.kind() == ErrorKind::NotFound {
636 4170 : DownloadError::NotFound
637 : } else {
638 0 : DownloadError::BadInput(e.into())
639 : }
640 4904 : })
641 4904 : }
642 :
643 : // Use mtime as stand-in for ETag. We could calculate a meaningful one by md5'ing the contents of files we
644 : // read, but that's expensive and the local_fs test helper's whole reason for existence is to run small tests
645 : // quickly, with less overhead than using a mock S3 server.
646 329 : fn mock_etag(meta: &std::fs::Metadata) -> Etag {
647 329 : let mtime = meta.modified().expect("Filesystem mtime missing");
648 329 : format!("{}", mtime.duration_since(UNIX_EPOCH).unwrap().as_millis()).into()
649 329 : }
650 :
651 : #[cfg(test)]
652 : mod fs_tests {
653 : use std::collections::HashMap;
654 : use std::io::Write;
655 : use std::ops::Bound;
656 :
657 : use camino_tempfile::tempdir;
658 :
659 : use super::*;
660 :
661 9 : async fn read_and_check_metadata(
662 9 : storage: &LocalFs,
663 9 : remote_storage_path: &RemotePath,
664 9 : expected_metadata: Option<&StorageMetadata>,
665 9 : ) -> anyhow::Result<String> {
666 9 : let cancel = CancellationToken::new();
667 9 : let download = storage
668 9 : .download(remote_storage_path, &DownloadOpts::default(), &cancel)
669 9 : .await
670 9 : .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
671 9 : ensure!(
672 9 : download.metadata.as_ref() == expected_metadata,
673 0 : "Unexpected metadata returned for the downloaded file"
674 : );
675 :
676 9 : let contents = aggregate(download.download_stream).await?;
677 :
678 9 : String::from_utf8(contents).map_err(anyhow::Error::new)
679 9 : }
680 :
681 : #[tokio::test]
682 3 : async fn upload_file() -> anyhow::Result<()> {
683 3 : let (storage, cancel) = create_storage()?;
684 3 :
685 3 : let target_path_1 = upload_dummy_file(&storage, "upload_1", None, &cancel).await?;
686 3 : assert_eq!(
687 3 : storage.list_all().await?,
688 3 : vec![target_path_1.clone()],
689 3 : "Should list a single file after first upload"
690 3 : "Should list a single file after the first upload"
691 3 :
692 3 : let target_path_2 = upload_dummy_file(&storage, "upload_2", None, &cancel).await?;
693 3 : assert_eq!(
694 3 : list_files_sorted(&storage).await?,
695 3 : vec![target_path_1.clone(), target_path_2.clone()],
696 3 : "Should list two different files after the second upload"
697 3 : );
698 3 :
699 3 : Ok(())
700 3 : }
701 :
702 : #[tokio::test]
703 3 : async fn upload_file_negatives() -> anyhow::Result<()> {
704 3 : let (storage, cancel) = create_storage()?;
705 3 :
706 3 : let id = RemotePath::new(Utf8Path::new("dummy"))?;
707 3 : let content = Bytes::from_static(b"12345");
708 12 : let content = move || futures::stream::once(futures::future::ready(Ok(content.clone())));
709 3 :
710 3 : // Check that you get an error if the size parameter doesn't match the actual
711 3 : // size of the stream.
712 3 : storage
713 3 : .upload(content(), 0, &id, None, &cancel)
714 3 : .await
715 3 : .expect_err("upload with zero size succeeded");
716 3 : storage
717 3 : .upload(content(), 4, &id, None, &cancel)
718 3 : .await
719 3 : .expect_err("upload with too short size succeeded");
720 3 : storage
721 3 : .upload(content(), 6, &id, None, &cancel)
722 3 : .await
723 3 : .expect_err("upload with too large size succeeded");
724 3 :
725 3 : // Correct size is 5, this should succeed.
726 3 : storage.upload(content(), 5, &id, None, &cancel).await?;
727 3 :
728 3 : Ok(())
729 3 : }
730 :
731 33 : fn create_storage() -> anyhow::Result<(LocalFs, CancellationToken)> {
732 33 : let storage_root = tempdir()?.path().to_path_buf();
733 33 : LocalFs::new(storage_root, Duration::from_secs(120)).map(|s| (s, CancellationToken::new()))
734 33 : }
735 :
736 : #[tokio::test]
737 3 : async fn download_file() -> anyhow::Result<()> {
738 3 : let (storage, cancel) = create_storage()?;
739 3 : let upload_name = "upload_1";
740 3 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
741 3 :
742 3 : let contents = read_and_check_metadata(&storage, &upload_target, None).await?;
743 3 : assert_eq!(
744 3 : dummy_contents(upload_name),
745 3 : contents,
746 3 : "We should upload and download the same contents"
747 3 : );
748 3 :
749 3 : let non_existing_path = RemotePath::new(Utf8Path::new("somewhere/else"))?;
750 3 : match storage
751 3 : .download(&non_existing_path, &DownloadOpts::default(), &cancel)
752 3 : .await
753 3 : {
754 3 : Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
755 3 : other => panic!(
756 0 : "Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"
757 0 : ),
758 3 : }
759 3 : Ok(())
760 3 : }
761 :
762 : #[tokio::test]
763 3 : async fn download_file_range_positive() -> anyhow::Result<()> {
764 3 : let (storage, cancel) = create_storage()?;
765 3 : let upload_name = "upload_1";
766 3 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
767 3 :
768 3 : let full_range_download_contents =
769 3 : read_and_check_metadata(&storage, &upload_target, None).await?;
770 3 : assert_eq!(
771 3 : dummy_contents(upload_name),
772 3 : full_range_download_contents,
773 3 : "Download full range should return the whole upload"
774 3 : );
775 3 :
776 3 : let uploaded_bytes = dummy_contents(upload_name).into_bytes();
777 3 : let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
778 3 :
779 3 : let first_part_download = storage
780 3 : .download(
781 3 : &upload_target,
782 3 : &DownloadOpts {
783 3 : byte_end: Bound::Excluded(first_part_local.len() as u64),
784 3 : ..Default::default()
785 3 : },
786 3 : &cancel,
787 3 : )
788 3 : .await?;
789 3 : assert!(
790 3 : first_part_download.metadata.is_none(),
791 3 : "No metadata should be returned for an upload without metadata"
792 3 : );
793 3 :
794 3 : let first_part_remote = aggregate(first_part_download.download_stream).await?;
795 3 : assert_eq!(
796 3 : first_part_local, first_part_remote,
797 3 : "First part bytes should be returned when requested"
798 3 : );
799 3 :
800 3 : let second_part_download = storage
801 3 : .download(
802 3 : &upload_target,
803 3 : &DownloadOpts {
804 3 : byte_start: Bound::Included(first_part_local.len() as u64),
805 3 : byte_end: Bound::Excluded(
806 3 : (first_part_local.len() + second_part_local.len()) as u64,
807 3 : ),
808 3 : ..Default::default()
809 3 : },
810 3 : &cancel,
811 3 : )
812 3 : .await?;
813 3 : assert!(
814 3 : second_part_download.metadata.is_none(),
815 3 : "No metadata should be returned for an upload without metadata"
816 3 : );
817 3 :
818 3 : let second_part_remote = aggregate(second_part_download.download_stream).await?;
819 3 : assert_eq!(
820 3 : second_part_local, second_part_remote,
821 3 : "Second part bytes should be returned when requested"
822 3 : );
823 3 :
824 3 : let suffix_bytes = storage
825 3 : .download(
826 3 : &upload_target,
827 3 : &DownloadOpts {
828 3 : byte_start: Bound::Included(13),
829 3 : ..Default::default()
830 3 : },
831 3 : &cancel,
832 3 : )
833 3 : .await?
834 3 : .download_stream;
835 3 : let suffix_bytes = aggregate(suffix_bytes).await?;
836 3 : let suffix = std::str::from_utf8(&suffix_bytes)?;
837 3 : assert_eq!(upload_name, suffix);
838 3 :
839 3 : let all_bytes = storage
840 3 : .download(&upload_target, &DownloadOpts::default(), &cancel)
841 3 : .await?
842 3 : .download_stream;
843 3 : let all_bytes = aggregate(all_bytes).await?;
844 3 : let all_bytes = std::str::from_utf8(&all_bytes)?;
845 3 : assert_eq!(dummy_contents("upload_1"), all_bytes);
846 3 :
847 3 : Ok(())
848 3 : }
849 :
850 : #[tokio::test]
851 : #[should_panic(expected = "at or before start")]
852 3 : async fn download_file_range_negative() {
853 3 : let (storage, cancel) = create_storage().unwrap();
854 3 : let upload_name = "upload_1";
855 3 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel)
856 3 : .await
857 3 : .unwrap();
858 3 :
859 3 : storage
860 3 : .download(
861 3 : &upload_target,
862 3 : &DownloadOpts {
863 3 : byte_start: Bound::Included(10),
864 3 : byte_end: Bound::Excluded(10),
865 3 : ..Default::default()
866 3 : },
867 3 : &cancel,
868 3 : )
869 3 : .await
870 3 : .unwrap();
871 3 : }
872 :
873 : #[tokio::test]
874 3 : async fn delete_file() -> anyhow::Result<()> {
875 3 : let (storage, cancel) = create_storage()?;
876 3 : let upload_name = "upload_1";
877 3 : let upload_target = upload_dummy_file(&storage, upload_name, None, &cancel).await?;
878 3 :
879 3 : storage.delete(&upload_target, &cancel).await?;
880 3 : assert!(storage.list_all().await?.is_empty());
881 3 :
882 3 : storage
883 3 : .delete(&upload_target, &cancel)
884 3 : .await
885 3 : .expect("Should allow deleting non-existing storage files");
886 3 :
887 3 : Ok(())
888 3 : }
889 :
890 : #[tokio::test]
891 3 : async fn file_with_metadata() -> anyhow::Result<()> {
892 3 : let (storage, cancel) = create_storage()?;
893 3 : let upload_name = "upload_1";
894 3 : let metadata = StorageMetadata(HashMap::from([
895 3 : ("one".to_string(), "1".to_string()),
896 3 : ("two".to_string(), "2".to_string()),
897 3 : ]));
898 3 : let upload_target =
899 3 : upload_dummy_file(&storage, upload_name, Some(metadata.clone()), &cancel).await?;
900 3 :
901 3 : let full_range_download_contents =
902 3 : read_and_check_metadata(&storage, &upload_target, Some(&metadata)).await?;
903 3 : assert_eq!(
904 3 : dummy_contents(upload_name),
905 3 : full_range_download_contents,
906 3 : "We should upload and download the same contents"
907 3 : );
908 3 :
909 3 : let uploaded_bytes = dummy_contents(upload_name).into_bytes();
910 3 : let (first_part_local, _) = uploaded_bytes.split_at(3);
911 3 :
912 3 : let partial_download_with_metadata = storage
913 3 : .download(
914 3 : &upload_target,
915 3 : &DownloadOpts {
916 3 : byte_end: Bound::Excluded(first_part_local.len() as u64),
917 3 : ..Default::default()
918 3 : },
919 3 : &cancel,
920 3 : )
921 3 : .await?;
922 3 : let first_part_remote = aggregate(partial_download_with_metadata.download_stream).await?;
923 3 : assert_eq!(
924 3 : first_part_local,
925 3 : first_part_remote.as_slice(),
926 3 : "First part bytes should be returned when requested"
927 3 : );
928 3 :
929 3 : assert_eq!(
930 3 : partial_download_with_metadata.metadata,
931 3 : Some(metadata),
932 3 : "We should get the same metadata back for partial download"
933 3 : );
934 3 :
935 3 : Ok(())
936 3 : }
937 :
938 : #[tokio::test]
939 3 : async fn list() -> anyhow::Result<()> {
940 3 : // No delimiter: should recursively list everything
941 3 : let (storage, cancel) = create_storage()?;
942 3 : let child = upload_dummy_file(&storage, "grandparent/parent/child", None, &cancel).await?;
943 3 : let child_sibling =
944 3 : upload_dummy_file(&storage, "grandparent/parent/child_sibling", None, &cancel).await?;
945 3 : let uncle = upload_dummy_file(&storage, "grandparent/uncle", None, &cancel).await?;
946 3 :
947 3 : let listing = storage
948 3 : .list(None, ListingMode::NoDelimiter, None, &cancel)
949 3 : .await?;
950 3 : assert!(listing.prefixes.is_empty());
951 3 : assert_eq!(
952 3 : listing
953 3 : .keys
954 3 : .into_iter()
955 9 : .map(|o| o.key)
956 3 : .collect::<HashSet<_>>(),
957 3 : HashSet::from([uncle.clone(), child.clone(), child_sibling.clone()])
958 3 : );
959 3 :
960 3 : // Delimiter: should only go one deep
961 3 : let listing = storage
962 3 : .list(None, ListingMode::WithDelimiter, None, &cancel)
963 3 : .await?;
964 3 :
965 3 : assert_eq!(
966 3 : listing.prefixes,
967 3 : [RemotePath::from_string("timelines").unwrap()].to_vec()
968 3 : );
969 3 : assert!(listing.keys.is_empty());
970 3 :
971 3 : // Delimiter & prefix with a trailing slash
972 3 : let listing = storage
973 3 : .list(
974 3 : Some(&RemotePath::from_string("timelines/some_timeline/grandparent/").unwrap()),
975 3 : ListingMode::WithDelimiter,
976 3 : None,
977 3 : &cancel,
978 3 : )
979 3 : .await?;
980 3 : assert_eq!(
981 3 : listing.keys.into_iter().map(|o| o.key).collect::<Vec<_>>(),
982 3 : [RemotePath::from_string("uncle").unwrap()].to_vec()
983 3 : );
984 3 : assert_eq!(
985 3 : listing.prefixes,
986 3 : [RemotePath::from_string("parent").unwrap()].to_vec()
987 3 : );
988 3 :
989 3 : // Delimiter and prefix without a trailing slash
990 3 : let listing = storage
991 3 : .list(
992 3 : Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
993 3 : ListingMode::WithDelimiter,
994 3 : None,
995 3 : &cancel,
996 3 : )
997 3 : .await?;
998 3 : assert_eq!(listing.keys, vec![]);
999 3 : assert_eq!(
1000 3 : listing.prefixes,
1001 3 : [RemotePath::from_string("grandparent").unwrap()].to_vec()
1002 3 : );
1003 3 :
1004 3 : // Delimiter and prefix that's partway through a path component
1005 3 : let listing = storage
1006 3 : .list(
1007 3 : Some(&RemotePath::from_string("timelines/some_timeline/grandp").unwrap()),
1008 3 : ListingMode::WithDelimiter,
1009 3 : None,
1010 3 : &cancel,
1011 3 : )
1012 3 : .await?;
1013 3 : assert_eq!(listing.keys, vec![]);
1014 3 : assert_eq!(
1015 3 : listing.prefixes,
1016 3 : [RemotePath::from_string("grandparent").unwrap()].to_vec()
1017 3 : );
1018 3 :
1019 3 : Ok(())
1020 3 : }
1021 :
1022 : #[tokio::test]
1023 3 : async fn list_part_component() -> anyhow::Result<()> {
1024 3 : // No delimiter: should recursively list everything
1025 3 : let (storage, cancel) = create_storage()?;
1026 3 :
1027 3 : // Imitates what happens in a tenant path when we have an unsharded path and a sharded path, and do a listing
1028 3 : // of the unsharded path: although there is a "directory" at the unsharded path, it should be handled as
1029 3 : // a freeform prefix.
1030 3 : let _child_a =
1031 3 : upload_dummy_file(&storage, "grandparent/tenant-01/child", None, &cancel).await?;
1032 3 : let _child_b =
1033 3 : upload_dummy_file(&storage, "grandparent/tenant/child", None, &cancel).await?;
1034 3 :
1035 3 : // Delimiter and prefix that's partway through a path component
1036 3 : let listing = storage
1037 3 : .list(
1038 3 : Some(
1039 3 : &RemotePath::from_string("timelines/some_timeline/grandparent/tenant").unwrap(),
1040 3 : ),
1041 3 : ListingMode::WithDelimiter,
1042 3 : None,
1043 3 : &cancel,
1044 3 : )
1045 3 : .await?;
1046 3 : assert_eq!(listing.keys, vec![]);
1047 3 :
1048 3 : let mut found_prefixes = listing.prefixes.clone();
1049 3 : found_prefixes.sort();
1050 3 : assert_eq!(
1051 3 : found_prefixes,
1052 3 : [
1053 3 : RemotePath::from_string("tenant").unwrap(),
1054 3 : RemotePath::from_string("tenant-01").unwrap(),
1055 3 : ]
1056 3 : .to_vec()
1057 3 : );
1058 3 :
1059 3 : Ok(())
1060 3 : }
1061 :
1062 : #[tokio::test]
1063 3 : async fn overwrite_shorter_file() -> anyhow::Result<()> {
1064 3 : let (storage, cancel) = create_storage()?;
1065 3 :
1066 3 : let path = RemotePath::new("does/not/matter/file".into())?;
1067 3 :
1068 3 : let body = Bytes::from_static(b"long file contents is long");
1069 3 : {
1070 3 : let len = body.len();
1071 3 : let body =
1072 3 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1073 3 : storage.upload(body, len, &path, None, &cancel).await?;
1074 3 : }
1075 3 :
1076 3 : let read = aggregate(
1077 3 : storage
1078 3 : .download(&path, &DownloadOpts::default(), &cancel)
1079 3 : .await?
1080 3 : .download_stream,
1081 3 : )
1082 3 : .await?;
1083 3 : assert_eq!(body, read);
1084 3 :
1085 3 : let shorter = Bytes::from_static(b"shorter body");
1086 3 : {
1087 3 : let len = shorter.len();
1088 3 : let body =
1089 3 : futures::stream::once(futures::future::ready(std::io::Result::Ok(shorter.clone())));
1090 3 : storage.upload(body, len, &path, None, &cancel).await?;
1091 3 : }
1092 3 :
1093 3 : let read = aggregate(
1094 3 : storage
1095 3 : .download(&path, &DownloadOpts::default(), &cancel)
1096 3 : .await?
1097 3 : .download_stream,
1098 3 : )
1099 3 : .await?;
1100 3 : assert_eq!(shorter, read);
1101 3 : Ok(())
1102 3 : }
1103 :
1104 : #[tokio::test]
1105 3 : async fn cancelled_upload_can_later_be_retried() -> anyhow::Result<()> {
1106 3 : let (storage, cancel) = create_storage()?;
1107 3 :
1108 3 : let path = RemotePath::new("does/not/matter/file".into())?;
1109 3 :
1110 3 : let body = Bytes::from_static(b"long file contents is long");
1111 3 : {
1112 3 : let len = body.len();
1113 3 : let body =
1114 3 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1115 3 : let cancel = cancel.child_token();
1116 3 : cancel.cancel();
1117 3 : let e = storage
1118 3 : .upload(body, len, &path, None, &cancel)
1119 3 : .await
1120 3 : .unwrap_err();
1121 3 :
1122 3 : assert!(TimeoutOrCancel::caused_by_cancel(&e));
1123 3 : }
1124 3 :
1125 3 : {
1126 3 : let len = body.len();
1127 3 : let body =
1128 3 : futures::stream::once(futures::future::ready(std::io::Result::Ok(body.clone())));
1129 3 : storage.upload(body, len, &path, None, &cancel).await?;
1130 3 : }
1131 3 :
1132 3 : let read = aggregate(
1133 3 : storage
1134 3 : .download(&path, &DownloadOpts::default(), &cancel)
1135 3 : .await?
1136 3 : .download_stream,
1137 3 : )
1138 3 : .await?;
1139 3 : assert_eq!(body, read);
1140 3 :
1141 3 : Ok(())
1142 3 : }
1143 :
1144 36 : async fn upload_dummy_file(
1145 36 : storage: &LocalFs,
1146 36 : name: &str,
1147 36 : metadata: Option<StorageMetadata>,
1148 36 : cancel: &CancellationToken,
1149 36 : ) -> anyhow::Result<RemotePath> {
1150 36 : let from_path = storage
1151 36 : .storage_root
1152 36 : .join("timelines")
1153 36 : .join("some_timeline")
1154 36 : .join(name);
1155 36 : let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
1156 :
1157 36 : let relative_path = from_path
1158 36 : .strip_prefix(&storage.storage_root)
1159 36 : .context("Failed to strip storage root prefix")
1160 36 : .and_then(RemotePath::new)
1161 36 : .with_context(|| {
1162 0 : format!(
1163 0 : "Failed to resolve remote part of path {:?} for base {:?}",
1164 0 : from_path, storage.storage_root
1165 0 : )
1166 36 : })?;
1167 :
1168 36 : let file = tokio_util::io::ReaderStream::new(file);
1169 36 :
1170 36 : storage
1171 36 : .upload(file, size, &relative_path, metadata, cancel)
1172 36 : .await?;
1173 36 : Ok(relative_path)
1174 36 : }
1175 :
1176 36 : async fn create_file_for_upload(
1177 36 : path: &Utf8Path,
1178 36 : contents: &str,
1179 36 : ) -> anyhow::Result<(fs::File, usize)> {
1180 36 : std::fs::create_dir_all(path.parent().unwrap())?;
1181 36 : let mut file_for_writing = std::fs::OpenOptions::new()
1182 36 : .write(true)
1183 36 : .create_new(true)
1184 36 : .open(path)?;
1185 36 : write!(file_for_writing, "{}", contents)?;
1186 36 : drop(file_for_writing);
1187 36 : let file_size = path.metadata()?.len() as usize;
1188 36 : Ok((
1189 36 : fs::OpenOptions::new().read(true).open(&path).await?,
1190 36 : file_size,
1191 : ))
1192 36 : }
1193 :
1194 54 : fn dummy_contents(name: &str) -> String {
1195 54 : format!("contents for {name}")
1196 54 : }
1197 :
1198 3 : async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
1199 3 : let mut files = storage.list_all().await?;
1200 3 : files.sort_by(|a, b| a.0.cmp(&b.0));
1201 3 : Ok(files)
1202 3 : }
1203 :
1204 33 : async fn aggregate(
1205 33 : stream: impl Stream<Item = std::io::Result<Bytes>>,
1206 33 : ) -> anyhow::Result<Vec<u8>> {
1207 : use futures::stream::StreamExt;
1208 33 : let mut out = Vec::new();
1209 33 : let mut stream = std::pin::pin!(stream);
1210 66 : while let Some(res) = stream.next().await {
1211 33 : out.extend_from_slice(&res?[..]);
1212 : }
1213 33 : Ok(out)
1214 33 : }
1215 : }