TLA Line data Source code
1 : //! Helper functions to upload files to remote storage with a RemoteStorage
2 :
3 : use anyhow::{bail, Context};
4 : use camino::Utf8Path;
5 : use fail::fail_point;
6 : use std::io::ErrorKind;
7 : use tokio::fs;
8 :
9 : use super::Generation;
10 : use crate::{
11 : config::PageServerConf,
12 : tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
13 : };
14 : use remote_storage::GenericRemoteStorage;
15 : use utils::id::{TenantId, TimelineId};
16 :
17 : use super::index::LayerFileMetadata;
18 :
19 : use tracing::info;
20 :
21 : /// Serializes and uploads the given index part data to the remote storage.
22 CBC 6974 : pub(super) async fn upload_index_part<'a>(
23 6974 : storage: &'a GenericRemoteStorage,
24 6974 : tenant_id: &TenantId,
25 6974 : timeline_id: &TimelineId,
26 6974 : generation: Generation,
27 6974 : index_part: &'a IndexPart,
28 6974 : ) -> anyhow::Result<()> {
29 UBC 0 : tracing::trace!("uploading new index part");
30 :
31 CBC 6974 : fail_point!("before-upload-index", |_| {
32 4 : bail!("failpoint before-upload-index")
33 6974 : });
34 6970 : pausable_failpoint!("before-upload-index-pausable");
35 :
36 6966 : let index_part_bytes =
37 6966 : serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
38 6966 : let index_part_size = index_part_bytes.len();
39 6966 : let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
40 6966 :
41 6966 : let remote_path = remote_index_path(tenant_id, timeline_id, generation);
42 6966 : storage
43 6966 : .upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
44 20298 : .await
45 6965 : .with_context(|| format!("upload index part for '{tenant_id} / {timeline_id}'"))
46 6969 : }
47 :
48 : /// Attempts to upload given layer files.
49 : /// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
50 : ///
51 : /// On an error, bumps the retries count and reschedules the entire task.
52 20377 : pub(super) async fn upload_timeline_layer<'a>(
53 20377 : conf: &'static PageServerConf,
54 20377 : storage: &'a GenericRemoteStorage,
55 20377 : source_path: &'a Utf8Path,
56 20377 : known_metadata: &'a LayerFileMetadata,
57 20377 : generation: Generation,
58 20377 : ) -> anyhow::Result<()> {
59 20377 : fail_point!("before-upload-layer", |_| {
60 5 : bail!("failpoint before-upload-layer")
61 20377 : });
62 :
63 20372 : let storage_path = remote_path(conf, source_path, generation)?;
64 20372 : let source_file_res = fs::File::open(&source_path).await;
65 20370 : let source_file = match source_file_res {
66 20370 : Ok(source_file) => source_file,
67 1 : Err(e) if e.kind() == ErrorKind::NotFound => {
68 : // If we encounter this arm, it wasn't intended, but it's also not
69 : // a big problem, if it's because the file was deleted before an
70 : // upload. However, a nonexistent file can also be indicative of
71 : // something worse, like when a file is scheduled for upload before
72 : // it has been written to disk yet.
73 1 : info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
74 1 : return Ok(());
75 : }
76 UBC 0 : Err(e) => {
77 0 : Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
78 : }
79 : };
80 :
81 CBC 20370 : let fs_size = source_file
82 20370 : .metadata()
83 20159 : .await
84 20370 : .with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
85 20370 : .len();
86 20370 :
87 20370 : let metadata_size = known_metadata.file_size();
88 20370 : if metadata_size != fs_size {
89 UBC 0 : bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
90 CBC 20370 : }
91 :
92 20370 : let fs_size = usize::try_from(fs_size)
93 20370 : .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
94 :
95 20370 : storage
96 20370 : .upload(source_file, fs_size, &storage_path, None)
97 3206052 : .await
98 20295 : .with_context(|| format!("upload layer from local path '{source_path}'"))?;
99 :
100 18380 : Ok(())
101 20301 : }
|