Line data Source code
1 : //! Helper functions for uploading files to remote storage with a `RemoteStorage` implementation.
2 :
3 : use std::io::{ErrorKind, SeekFrom};
4 : use std::time::SystemTime;
5 :
6 : use anyhow::{Context, bail};
7 : use bytes::Bytes;
8 : use camino::Utf8Path;
9 : use fail::fail_point;
10 : use pageserver_api::shard::TenantShardId;
11 : use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
12 : use tokio::fs::{self, File};
13 : use tokio::io::AsyncSeekExt;
14 : use tokio_util::sync::CancellationToken;
15 : use tracing::info;
16 : use utils::id::{TenantId, TimelineId};
17 : use utils::{backoff, pausable_failpoint};
18 :
19 : use super::Generation;
20 : use super::index::IndexPart;
21 : use super::manifest::TenantManifest;
22 : use crate::tenant::remote_timeline_client::{
23 : remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
24 : remote_tenant_manifest_path,
25 : };
26 :
27 : /// Serializes and uploads the given index part data to the remote storage.
28 2961 : pub(crate) async fn upload_index_part(
29 2961 : storage: &GenericRemoteStorage,
30 2961 : tenant_shard_id: &TenantShardId,
31 2961 : timeline_id: &TimelineId,
32 2961 : generation: Generation,
33 2961 : index_part: &IndexPart,
34 2961 : cancel: &CancellationToken,
35 2961 : ) -> anyhow::Result<()> {
36 2961 : tracing::trace!("uploading new index part");
37 :
38 2961 : fail_point!("before-upload-index", |_| {
39 0 : bail!("failpoint before-upload-index")
40 2961 : });
41 2961 : pausable_failpoint!("before-upload-index-pausable");
42 :
43 : // Safety: refuse to persist invalid index metadata, to mitigate the impact of any bug that produces this
44 : // (this should never happen)
45 2957 : index_part.validate().map_err(|e| anyhow::anyhow!(e))?;
46 :
47 : // FIXME: this error comes too late
48 2957 : let serialized = index_part.to_json_bytes()?;
49 2957 : let serialized = Bytes::from(serialized);
50 2957 :
51 2957 : let index_part_size = serialized.len();
52 2957 :
53 2957 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
54 2957 : storage
55 2957 : .upload_storage_object(
56 2957 : futures::stream::once(futures::future::ready(Ok(serialized))),
57 2957 : index_part_size,
58 2957 : &remote_path,
59 2957 : cancel,
60 2957 : )
61 2957 : .await
62 2930 : .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
63 2930 : }
64 :
65 : /// Serializes and uploads the given tenant manifest data to the remote storage.
66 452 : pub(crate) async fn upload_tenant_manifest(
67 452 : storage: &GenericRemoteStorage,
68 452 : tenant_shard_id: &TenantShardId,
69 452 : generation: Generation,
70 452 : tenant_manifest: &TenantManifest,
71 452 : cancel: &CancellationToken,
72 452 : ) -> anyhow::Result<()> {
73 452 : tracing::trace!("uploading new tenant manifest");
74 :
75 452 : fail_point!("before-upload-manifest", |_| {
76 0 : bail!("failpoint before-upload-manifest")
77 452 : });
78 452 : pausable_failpoint!("before-upload-manifest-pausable");
79 :
80 452 : let serialized = Bytes::from(tenant_manifest.to_json_bytes()?);
81 452 : let tenant_manifest_size = serialized.len();
82 452 : let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
83 452 :
84 452 : storage
85 452 : .upload_storage_object(
86 452 : futures::stream::once(futures::future::ready(Ok(serialized))),
87 452 : tenant_manifest_size,
88 452 : &remote_path,
89 452 : cancel,
90 452 : )
91 452 : .await
92 452 : .with_context(|| format!("upload tenant manifest for '{tenant_shard_id}'"))
93 452 : }
94 :
95 : /// Attempts to upload the given layer file.
96 : /// No extra checks for overlapping files are made; any file that is already present remotely will be overwritten if submitted during the upload.
97 : ///
98 : /// On an error, bumps the retry count and reschedules the entire task.
99 3376 : pub(super) async fn upload_timeline_layer<'a>(
100 3376 : storage: &'a GenericRemoteStorage,
101 3376 : local_path: &'a Utf8Path,
102 3376 : remote_path: &'a RemotePath,
103 3376 : metadata_size: u64,
104 3376 : cancel: &CancellationToken,
105 3376 : ) -> anyhow::Result<()> {
106 3376 : fail_point!("before-upload-layer", |_| {
107 0 : bail!("failpoint before-upload-layer")
108 3376 : });
109 :
110 3376 : pausable_failpoint!("before-upload-layer-pausable");
111 :
112 3204 : let source_file_res = fs::File::open(&local_path).await;
113 3150 : let source_file = match source_file_res {
114 3150 : Ok(source_file) => source_file,
115 0 : Err(e) if e.kind() == ErrorKind::NotFound => {
116 0 : // Hitting this arm is never intended, but it is also not a big
117 0 : // problem if the file was simply deleted before the upload ran.
118 0 : // However, a nonexistent file can also indicate something worse,
119 0 : // such as a file being scheduled for upload before it has been
120 0 : // written to disk at all.
121 0 : //
122 0 : // This case is exercised by `test_compaction_delete_before_upload`.
123 0 : info!(path = %local_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
124 0 : return Ok(());
125 : }
126 0 : Err(e) => Err(e).with_context(|| format!("open a source file for layer {local_path:?}"))?,
127 : };
128 :
129 3150 : let fs_size = source_file
130 3150 : .metadata()
131 3150 : .await
132 3143 : .with_context(|| format!("get the source file metadata for layer {local_path:?}"))?
133 3143 : .len();
134 3143 :
135 3143 : if metadata_size != fs_size {
136 0 : bail!(
137 0 : "File {local_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}"
138 0 : );
139 3143 : }
140 :
141 3143 : let fs_size = usize::try_from(fs_size)
142 3143 : .with_context(|| format!("convert {local_path:?} size {fs_size} to usize"))?;
143 :
144 3143 : let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
145 3143 :
146 3143 : storage
147 3143 : .upload(reader, fs_size, remote_path, None, cancel)
148 3143 : .await
149 3077 : .with_context(|| format!("upload layer from local path '{local_path}'"))
150 3077 : }
151 :
152 0 : pub(super) async fn copy_timeline_layer(
153 0 : storage: &GenericRemoteStorage,
154 0 : source_path: &RemotePath,
155 0 : target_path: &RemotePath,
156 0 : cancel: &CancellationToken,
157 0 : ) -> anyhow::Result<()> {
158 0 : fail_point!("before-copy-layer", |_| {
159 0 : bail!("failpoint before-copy-layer")
160 0 : });
161 :
162 0 : pausable_failpoint!("before-copy-layer-pausable");
163 :
164 0 : storage
165 0 : .copy_object(source_path, target_path, cancel)
166 0 : .await
167 0 : .with_context(|| format!("copy layer {source_path} to {target_path}"))
168 0 : }
169 :
170 : /// Uploads the given `initdb` data to the remote storage.
171 0 : pub(crate) async fn upload_initdb_dir(
172 0 : storage: &GenericRemoteStorage,
173 0 : tenant_id: &TenantId,
174 0 : timeline_id: &TimelineId,
175 0 : mut initdb_tar_zst: File,
176 0 : size: u64,
177 0 : cancel: &CancellationToken,
178 0 : ) -> anyhow::Result<()> {
179 0 : tracing::trace!("uploading initdb dir");
180 :
181 : // A prior retry attempt may already have read partway into the file, so rewind to the start
182 0 : initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
183 :
184 0 : let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
185 0 :
186 0 : let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
187 0 : storage
188 0 : .upload_storage_object(file, size as usize, &remote_path, cancel)
189 0 : .await
190 0 : .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
191 0 : }
192 :
193 0 : pub(crate) async fn preserve_initdb_archive(
194 0 : storage: &GenericRemoteStorage,
195 0 : tenant_id: &TenantId,
196 0 : timeline_id: &TimelineId,
197 0 : cancel: &CancellationToken,
198 0 : ) -> anyhow::Result<()> {
199 0 : let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
200 0 : let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
201 0 : storage
202 0 : .copy_object(&source_path, &dest_path, cancel)
203 0 : .await
204 0 : .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
205 0 : }
206 :
207 0 : pub(crate) async fn time_travel_recover_tenant(
208 0 : storage: &GenericRemoteStorage,
209 0 : tenant_shard_id: &TenantShardId,
210 0 : timestamp: SystemTime,
211 0 : done_if_after: SystemTime,
212 0 : cancel: &CancellationToken,
213 0 : ) -> Result<(), TimeTravelError> {
214 0 : let warn_after = 3;
215 0 : let max_attempts = 10;
216 0 : let mut prefixes = Vec::with_capacity(2);
217 0 : if tenant_shard_id.is_shard_zero() {
218 0 : // Also recover the unsharded prefix for shard zero:
219 0 : // - if the tenant is entirely unsharded, the unsharded prefix contains all the data
220 0 : // - if the tenant is sharded, we still want to recover the initdb data, but we only
221 0 : // want to do it once, so we do it on shard 0
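// For example (an illustrative summary of the checks below, not new behavior): shard 0 of a
// sharded tenant recovers both `remote_timelines_path_unsharded(tenant_id)` and
// `remote_timelines_path(tenant_shard_id)`; an unsharded tenant recovers only the unsharded
// prefix; shards other than 0 recover only their own sharded prefix.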
222 0 : let timelines_path_unsharded =
223 0 : super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
224 0 : prefixes.push(timelines_path_unsharded);
225 0 : }
226 0 : if !tenant_shard_id.is_unsharded() {
227 0 : // If the tenant is sharded, we need to recover the sharded prefix
228 0 : let timelines_path = super::remote_timelines_path(tenant_shard_id);
229 0 : prefixes.push(timelines_path);
230 0 : }
231 0 : for prefix in &prefixes {
232 0 : backoff::retry(
233 0 : || async {
234 0 : storage
235 0 : .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
236 0 : .await
237 0 : },
238 0 : |e| !matches!(e, TimeTravelError::Other(_)),
239 0 : warn_after,
240 0 : max_attempts,
241 0 : "time travel recovery of tenant prefix",
242 0 : cancel,
243 0 : )
244 0 : .await
245 0 : .ok_or_else(|| TimeTravelError::Cancelled)
246 0 : .and_then(|x| x)?;
247 : }
248 0 : Ok(())
249 0 : }
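// The helpers above are single-attempt uploads; retrying is the caller's responsibility.
// Below is a minimal, hypothetical sketch (not part of the original module) of how a caller
// could wrap `upload_index_part` in the same `backoff::retry` + cancellation pattern that
// `time_travel_recover_tenant` uses above. The thresholds and the "every error is retryable"
// policy are illustrative assumptions.
#[allow(dead_code)]
async fn upload_index_part_with_retries(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    generation: Generation,
    index_part: &IndexPart,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    backoff::retry(
        // Each attempt is a fresh call to the single-shot helper defined above.
        || upload_index_part(storage, tenant_shard_id, timeline_id, generation, index_part, cancel),
        // Assumption: no error is treated as permanent, so every failure is retried.
        |_e: &anyhow::Error| false,
        3,  // warn after this many failed attempts
        10, // give up after this many attempts
        "upload index part with retries",
        cancel,
    )
    .await
    // `backoff::retry` yields `None` when the cancellation token fires before success.
    .ok_or_else(|| anyhow::anyhow!("cancelled"))
    .and_then(|res| res)
}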