Line data Source code
1 : //! Helper functions to upload files to remote storage with a RemoteStorage
2 :
3 : use anyhow::{bail, Context};
4 : use camino::Utf8Path;
5 : use fail::fail_point;
6 : use pageserver_api::shard::TenantShardId;
7 : use std::io::{ErrorKind, SeekFrom};
8 : use std::time::SystemTime;
9 : use tokio::fs::{self, File};
10 : use tokio::io::AsyncSeekExt;
11 : use tokio_util::sync::CancellationToken;
12 : use utils::backoff;
13 :
14 : use super::Generation;
15 : use crate::{
16 : config::PageServerConf,
17 : tenant::remote_timeline_client::{
18 : index::IndexPart, remote_index_path, remote_initdb_archive_path,
19 : remote_initdb_preserved_archive_path, remote_path, upload_cancellable,
20 : },
21 : };
22 : use remote_storage::{GenericRemoteStorage, TimeTravelError};
23 : use utils::id::{TenantId, TimelineId};
24 :
25 : use super::index::LayerFileMetadata;
26 :
27 : use tracing::info;
28 :
29 : /// Serializes and uploads the given index part data to the remote storage.
30 6955 : pub(super) async fn upload_index_part<'a>(
31 6955 : storage: &'a GenericRemoteStorage,
32 6955 : tenant_shard_id: &TenantShardId,
33 6955 : timeline_id: &TimelineId,
34 6955 : generation: Generation,
35 6955 : index_part: &'a IndexPart,
36 6955 : cancel: &CancellationToken,
37 6955 : ) -> anyhow::Result<()> {
38 0 : tracing::trace!("uploading new index part");
39 :
40 6955 : fail_point!("before-upload-index", |_| {
41 4 : bail!("failpoint before-upload-index")
42 6955 : });
43 6951 : pausable_failpoint!("before-upload-index-pausable");
44 :
45 6948 : let index_part_bytes = index_part
46 6948 : .to_s3_bytes()
47 6948 : .context("serialize index part file into bytes")?;
48 6948 : let index_part_size = index_part_bytes.len();
49 6948 : let index_part_bytes = bytes::Bytes::from(index_part_bytes);
50 6948 :
51 6948 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
52 6948 : upload_cancellable(
53 6948 : cancel,
54 6948 : storage.upload_storage_object(
55 6948 : futures::stream::once(futures::future::ready(Ok(index_part_bytes))),
56 6948 : index_part_size,
57 6948 : &remote_path,
58 6948 : ),
59 6948 : )
60 16334 : .await
61 6943 : .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
62 6947 : }
63 :
64 : /// Attempts to upload given layer files.
65 : /// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
66 : ///
67 : /// On an error, bumps the retries count and reschedules the entire task.
68 23207 : pub(super) async fn upload_timeline_layer<'a>(
69 23207 : conf: &'static PageServerConf,
70 23207 : storage: &'a GenericRemoteStorage,
71 23207 : source_path: &'a Utf8Path,
72 23207 : known_metadata: &'a LayerFileMetadata,
73 23207 : generation: Generation,
74 23207 : cancel: &CancellationToken,
75 23207 : ) -> anyhow::Result<()> {
76 23207 : fail_point!("before-upload-layer", |_| {
77 3 : bail!("failpoint before-upload-layer")
78 23207 : });
79 :
80 23204 : pausable_failpoint!("before-upload-layer-pausable");
81 :
82 23204 : let storage_path = remote_path(conf, source_path, generation)?;
83 23204 : let source_file_res = fs::File::open(&source_path).await;
84 23204 : let source_file = match source_file_res {
85 23204 : Ok(source_file) => source_file,
86 0 : Err(e) if e.kind() == ErrorKind::NotFound => {
87 : // If we encounter this arm, it wasn't intended, but it's also not
88 : // a big problem, if it's because the file was deleted before an
89 : // upload. However, a nonexistent file can also be indicative of
90 : // something worse, like when a file is scheduled for upload before
91 : // it has been written to disk yet.
92 : //
93 : // This is tested against `test_compaction_delete_before_upload`
94 0 : info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
95 0 : return Ok(());
96 : }
97 0 : Err(e) => {
98 0 : Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
99 : }
100 : };
101 :
102 23204 : let fs_size = source_file
103 23204 : .metadata()
104 22614 : .await
105 23204 : .with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
106 23204 : .len();
107 23204 :
108 23204 : let metadata_size = known_metadata.file_size();
109 23204 : if metadata_size != fs_size {
110 0 : bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
111 23204 : }
112 :
113 23204 : let fs_size = usize::try_from(fs_size)
114 23204 : .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
115 :
116 23204 : let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
117 23204 :
118 23204 : upload_cancellable(cancel, storage.upload(reader, fs_size, &storage_path, None))
119 1058775 : .await
120 23203 : .with_context(|| format!("upload layer from local path '{source_path}'"))?;
121 :
122 21333 : Ok(())
123 23206 : }
124 :
125 : /// Uploads the given `initdb` data to the remote storage.
126 632 : pub(crate) async fn upload_initdb_dir(
127 632 : storage: &GenericRemoteStorage,
128 632 : tenant_id: &TenantId,
129 632 : timeline_id: &TimelineId,
130 632 : mut initdb_tar_zst: File,
131 632 : size: u64,
132 632 : cancel: &CancellationToken,
133 632 : ) -> anyhow::Result<()> {
134 0 : tracing::trace!("uploading initdb dir");
135 :
136 : // We might have read somewhat into the file already in the prior retry attempt
137 632 : initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
138 :
139 632 : let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
140 632 :
141 632 : let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
142 632 : upload_cancellable(
143 632 : cancel,
144 632 : storage.upload_storage_object(file, size as usize, &remote_path),
145 632 : )
146 26802 : .await
147 632 : .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
148 632 : }
149 :
150 2 : pub(crate) async fn preserve_initdb_archive(
151 2 : storage: &GenericRemoteStorage,
152 2 : tenant_id: &TenantId,
153 2 : timeline_id: &TimelineId,
154 2 : cancel: &CancellationToken,
155 2 : ) -> anyhow::Result<()> {
156 2 : let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
157 2 : let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
158 2 : upload_cancellable(cancel, storage.copy_object(&source_path, &dest_path))
159 2 : .await
160 2 : .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
161 2 : }
162 :
163 1 : pub(crate) async fn time_travel_recover_tenant(
164 1 : storage: &GenericRemoteStorage,
165 1 : tenant_shard_id: &TenantShardId,
166 1 : timestamp: SystemTime,
167 1 : done_if_after: SystemTime,
168 1 : cancel: &CancellationToken,
169 1 : ) -> Result<(), TimeTravelError> {
170 1 : let warn_after = 3;
171 1 : let max_attempts = 10;
172 1 : let mut prefixes = Vec::with_capacity(2);
173 1 : if tenant_shard_id.is_zero() {
174 1 : // Also recover the unsharded prefix for a shard of zero:
175 1 : // - if the tenant is totally unsharded, the unsharded prefix contains all the data
176 1 : // - if the tenant is sharded, we still want to recover the initdb data, but we only
177 1 : // want to do it once, so let's do it on the 0 shard
178 1 : let timelines_path_unsharded =
179 1 : super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
180 1 : prefixes.push(timelines_path_unsharded);
181 1 : }
182 1 : if !tenant_shard_id.is_unsharded() {
183 0 : // If the tenant is sharded, we need to recover the sharded prefix
184 0 : let timelines_path = super::remote_timelines_path(tenant_shard_id);
185 0 : prefixes.push(timelines_path);
186 1 : }
187 2 : for prefix in &prefixes {
188 1 : backoff::retry(
189 1 : || async {
190 1 : storage
191 1 : .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
192 131 : .await
193 1 : },
194 1 : |e| !matches!(e, TimeTravelError::Other(_)),
195 1 : warn_after,
196 1 : max_attempts,
197 1 : "time travel recovery of tenant prefix",
198 1 : cancel,
199 1 : )
200 131 : .await
201 1 : .ok_or_else(|| TimeTravelError::Cancelled)
202 1 : .and_then(|x| x)?;
203 : }
204 1 : Ok(())
205 1 : }
|