Line data Source code
1 : //! Helper functions to upload files to remote storage with a RemoteStorage
2 :
3 : use anyhow::{bail, Context};
4 : use fail::fail_point;
5 : use std::{io::ErrorKind, path::Path};
6 : use tokio::fs;
7 :
8 : use super::Generation;
9 : use crate::{
10 : config::PageServerConf,
11 : tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
12 : };
13 : use remote_storage::GenericRemoteStorage;
14 : use utils::id::{TenantId, TimelineId};
15 :
16 : use super::index::LayerFileMetadata;
17 :
18 : use tracing::info;
19 :
20 : /// Serializes and uploads the given index part data to the remote storage.
21 5246 : pub(super) async fn upload_index_part<'a>(
22 5246 : storage: &'a GenericRemoteStorage,
23 5246 : tenant_id: &TenantId,
24 5246 : timeline_id: &TimelineId,
25 5246 : generation: Generation,
26 5246 : index_part: &'a IndexPart,
27 5246 : ) -> anyhow::Result<()> {
28 0 : tracing::trace!("uploading new index part");
29 :
30 5246 : fail_point!("before-upload-index", |_| {
31 7 : bail!("failpoint before-upload-index")
32 5246 : });
33 :
34 5239 : let index_part_bytes =
35 5239 : serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
36 5239 : let index_part_size = index_part_bytes.len();
37 5239 : let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
38 5239 :
39 5239 : let remote_path = remote_index_path(tenant_id, timeline_id, generation);
40 5239 : storage
41 5239 : .upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
42 14484 : .await
43 5237 : .with_context(|| format!("upload index part for '{tenant_id} / {timeline_id}'"))
44 5244 : }
45 :
46 : /// Attempts to upload given layer files.
47 : /// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
48 : ///
49 : /// On an error, bumps the retries count and reschedules the entire task.
50 14125 : pub(super) async fn upload_timeline_layer<'a>(
51 14125 : conf: &'static PageServerConf,
52 14125 : storage: &'a GenericRemoteStorage,
53 14125 : source_path: &'a Path,
54 14125 : known_metadata: &'a LayerFileMetadata,
55 14125 : generation: Generation,
56 14125 : ) -> anyhow::Result<()> {
57 14125 : fail_point!("before-upload-layer", |_| {
58 4 : bail!("failpoint before-upload-layer")
59 14125 : });
60 :
61 14121 : let storage_path = remote_path(conf, source_path, generation)?;
62 14121 : let source_file_res = fs::File::open(&source_path).await;
63 14120 : let source_file = match source_file_res {
64 14120 : Ok(source_file) => source_file,
65 1 : Err(e) if e.kind() == ErrorKind::NotFound => {
66 : // If we encounter this arm, it wasn't intended, but it's also not
67 : // a big problem, if it's because the file was deleted before an
68 : // upload. However, a nonexistent file can also be indicative of
69 : // something worse, like when a file is scheduled for upload before
70 : // it has been written to disk yet.
71 1 : info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
72 1 : return Ok(());
73 : }
74 0 : Err(e) => {
75 0 : Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
76 : }
77 : };
78 :
79 14120 : let fs_size = source_file
80 14120 : .metadata()
81 13998 : .await
82 14119 : .with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
83 14119 : .len();
84 14119 :
85 14119 : let metadata_size = known_metadata.file_size();
86 14119 : if metadata_size != fs_size {
87 0 : bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
88 14119 : }
89 :
90 14119 : let fs_size = usize::try_from(fs_size)
91 14119 : .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
92 :
93 14119 : storage
94 14119 : .upload(source_file, fs_size, &storage_path, None)
95 1313870 : .await
96 14111 : .with_context(|| format!("upload layer from local path '{}'", source_path.display()))?;
97 :
98 12204 : Ok(())
99 14116 : }
|