TLA Line data Source code
1 : //! Helper functions to upload files to remote storage with a RemoteStorage
2 :
3 : use anyhow::{bail, Context};
4 : use camino::Utf8Path;
5 : use fail::fail_point;
6 : use pageserver_api::shard::TenantShardId;
7 : use std::io::{ErrorKind, SeekFrom};
8 : use tokio::fs::{self, File};
9 : use tokio::io::AsyncSeekExt;
10 : use tokio_util::sync::CancellationToken;
11 :
12 : use super::Generation;
13 : use crate::{
14 : config::PageServerConf,
15 : tenant::remote_timeline_client::{
16 : index::IndexPart, remote_index_path, remote_initdb_archive_path, remote_path,
17 : upload_cancellable,
18 : },
19 : };
20 : use remote_storage::GenericRemoteStorage;
21 : use utils::id::{TenantId, TimelineId};
22 :
23 : use super::index::LayerFileMetadata;
24 :
25 : use tracing::info;
26 :
27 : /// Serializes and uploads the given index part data to the remote storage.
28 CBC 5959 : pub(super) async fn upload_index_part<'a>(
29 5959 : storage: &'a GenericRemoteStorage,
30 5959 : tenant_shard_id: &TenantShardId,
31 5959 : timeline_id: &TimelineId,
32 5959 : generation: Generation,
33 5959 : index_part: &'a IndexPart,
34 5959 : cancel: &CancellationToken,
35 5959 : ) -> anyhow::Result<()> {
36 UBC 0 : tracing::trace!("uploading new index part");
37 :
38 CBC 5959 : fail_point!("before-upload-index", |_| {
39 4 : bail!("failpoint before-upload-index")
40 5959 : });
41 5955 : pausable_failpoint!("before-upload-index-pausable");
42 :
43 5952 : let index_part_bytes = index_part
44 5952 : .to_s3_bytes()
45 5952 : .context("serialize index part file into bytes")?;
46 5952 : let index_part_size = index_part_bytes.len();
47 5952 : let index_part_bytes = bytes::Bytes::from(index_part_bytes);
48 5952 :
49 5952 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
50 5952 : upload_cancellable(
51 5952 : cancel,
52 5952 : storage.upload_storage_object(
53 5952 : futures::stream::once(futures::future::ready(Ok(index_part_bytes))),
54 5952 : index_part_size,
55 5952 : &remote_path,
56 5952 : ),
57 5952 : )
58 13563 : .await
59 5951 : .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
60 5955 : }
61 :
62 : /// Attempts to upload given layer files.
63 : /// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
64 : ///
65 : /// On an error, bumps the retries count and reschedules the entire task.
66 21499 : pub(super) async fn upload_timeline_layer<'a>(
67 21499 : conf: &'static PageServerConf,
68 21499 : storage: &'a GenericRemoteStorage,
69 21499 : source_path: &'a Utf8Path,
70 21499 : known_metadata: &'a LayerFileMetadata,
71 21499 : generation: Generation,
72 21499 : cancel: &CancellationToken,
73 21499 : ) -> anyhow::Result<()> {
74 21499 : fail_point!("before-upload-layer", |_| {
75 5 : bail!("failpoint before-upload-layer")
76 21499 : });
77 :
78 21494 : pausable_failpoint!("before-upload-layer-pausable");
79 :
80 21493 : let storage_path = remote_path(conf, source_path, generation)?;
81 21493 : let source_file_res = fs::File::open(&source_path).await;
82 21493 : let source_file = match source_file_res {
83 21493 : Ok(source_file) => source_file,
84 UBC 0 : Err(e) if e.kind() == ErrorKind::NotFound => {
85 : // If we encounter this arm, it wasn't intended, but it's also not
86 : // a big problem, if it's because the file was deleted before an
87 : // upload. However, a nonexistent file can also be indicative of
88 : // something worse, like when a file is scheduled for upload before
89 : // it has been written to disk yet.
90 : //
91 : // This is tested against `test_compaction_delete_before_upload`
92 0 : info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
93 0 : return Ok(());
94 : }
95 0 : Err(e) => {
96 0 : Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
97 : }
98 : };
99 :
100 CBC 21493 : let fs_size = source_file
101 21493 : .metadata()
102 20892 : .await
103 21493 : .with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
104 21493 : .len();
105 21493 :
106 21493 : let metadata_size = known_metadata.file_size();
107 21493 : if metadata_size != fs_size {
108 UBC 0 : bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
109 CBC 21493 : }
110 :
111 21493 : let fs_size = usize::try_from(fs_size)
112 21493 : .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
113 :
114 21493 : let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
115 21493 :
116 21493 : upload_cancellable(cancel, storage.upload(reader, fs_size, &storage_path, None))
117 784671 : .await
118 21490 : .with_context(|| format!("upload layer from local path '{source_path}'"))?;
119 :
120 19646 : Ok(())
121 21495 : }
122 :
123 : /// Uploads the given `initdb` data to the remote storage.
124 589 : pub(crate) async fn upload_initdb_dir(
125 589 : storage: &GenericRemoteStorage,
126 589 : tenant_id: &TenantId,
127 589 : timeline_id: &TimelineId,
128 589 : mut initdb_tar_zst: File,
129 589 : size: u64,
130 589 : cancel: &CancellationToken,
131 589 : ) -> anyhow::Result<()> {
132 UBC 0 : tracing::trace!("uploading initdb dir");
133 :
134 : // We might have read somewhat into the file already in the prior retry attempt
135 CBC 589 : initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
136 :
137 589 : let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
138 589 :
139 589 : let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
140 589 : upload_cancellable(
141 589 : cancel,
142 589 : storage.upload_storage_object(file, size as usize, &remote_path),
143 589 : )
144 23154 : .await
145 589 : .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
146 589 : }
|