//! Helper functions to upload files to remote storage via a [`GenericRemoteStorage`].

use std::io::{ErrorKind, SeekFrom};
use std::num::NonZeroU32;
use std::time::SystemTime;

use anyhow::{Context, bail};
use bytes::Bytes;
use camino::Utf8Path;
use fail::fail_point;
use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
use tokio::fs::{self, File};
use tokio::io::AsyncSeekExt;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::id::{TenantId, TimelineId};
use utils::{backoff, pausable_failpoint};

use super::Generation;
use super::index::IndexPart;
use super::manifest::TenantManifest;
use crate::tenant::remote_timeline_client::{
    remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
    remote_tenant_manifest_path,
};

/// Serializes and uploads the given index part data to the remote storage.
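///
/// A minimal calling sketch (illustrative only; the variable bindings are
/// hypothetical, and in the real code path this is driven by the remote
/// timeline client's upload queue):
///
/// ```ignore
/// upload_index_part(&storage, &tenant_shard_id, &timeline_id, generation, &index_part, &cancel)
///     .await?;
/// ```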
pub(crate) async fn upload_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    generation: Generation,
    index_part: &IndexPart,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    tracing::trace!("uploading new index part");

    fail_point!("before-upload-index", |_| {
        bail!("failpoint before-upload-index")
    });
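    // A likely reading of `pausable_failpoint!` (from `utils`): unlike `fail_point!`
    // with a `pause` action, it awaits while the failpoint is enabled rather than
    // blocking the executor thread.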
    pausable_failpoint!("before-upload-index-pausable");

    // Safety: refuse to persist invalid index metadata, to mitigate the impact of
    // any bug that produces this (this should never happen).
    index_part.validate().map_err(|e| anyhow::anyhow!(e))?;

    // FIXME: this error comes too late
    let serialized = index_part.to_json_bytes()?;
    let serialized = Bytes::from(serialized);

    let index_part_size = serialized.len();

    let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
    storage
        .upload_storage_object(
            futures::stream::once(futures::future::ready(Ok(serialized))),
            index_part_size,
            &remote_path,
            cancel,
        )
        .await
        .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
}

/// Serializes and uploads the given tenant manifest data to the remote storage.
pub(crate) async fn upload_tenant_manifest(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    generation: Generation,
    tenant_manifest: &TenantManifest,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    tracing::trace!("uploading new tenant manifest");

    fail_point!("before-upload-manifest", |_| {
        bail!("failpoint before-upload-manifest")
    });
    pausable_failpoint!("before-upload-manifest-pausable");

    let serialized = Bytes::from(tenant_manifest.to_json_bytes()?);
    let tenant_manifest_size = serialized.len();
    let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);

    storage
        .upload_storage_object(
            futures::stream::once(futures::future::ready(Ok(serialized))),
            tenant_manifest_size,
            &remote_path,
            cancel,
        )
        .await
        .with_context(|| format!("upload tenant manifest for '{tenant_shard_id}'"))
}

/// Attempts to upload the given layer files.
/// No extra checks for overlapping files are made, and any files already present
/// remotely will be overwritten if submitted during the upload.
///
/// On an error, the retry count is bumped and the entire task is rescheduled.
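///
/// A calling sketch (illustrative only; `local_layer_path`, `remote_layer_path`,
/// and `layer_file_size` are hypothetical bindings, and `layer_file_size` must
/// match the file's actual size on disk):
///
/// ```ignore
/// upload_timeline_layer(&storage, &local_layer_path, &remote_layer_path, layer_file_size, &cancel)
///     .await?;
/// ```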
pub(super) async fn upload_timeline_layer<'a>(
    storage: &'a GenericRemoteStorage,
    local_path: &'a Utf8Path,
    remote_path: &'a RemotePath,
    metadata_size: u64,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    fail_point!("before-upload-layer", |_| {
        bail!("failpoint before-upload-layer")
    });

    pausable_failpoint!("before-upload-layer-pausable");

    let source_file_res = fs::File::open(&local_path).await;
    let source_file = match source_file_res {
        Ok(source_file) => source_file,
        Err(e) if e.kind() == ErrorKind::NotFound => {
            // Hitting this arm is never intended, but it is also not a big problem
            // if the file was merely deleted before an upload. However, a
            // nonexistent file can also indicate something worse, such as a file
            // being scheduled for upload before it has been written to disk.
            //
            // This is tested by `test_compaction_delete_before_upload`.
            info!(path = %local_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
            return Ok(());
        }
        Err(e) => Err(e).with_context(|| format!("open a source file for layer {local_path:?}"))?,
    };

    let fs_size = source_file
        .metadata()
        .await
        .with_context(|| format!("get the source file metadata for layer {local_path:?}"))?
        .len();

    if metadata_size != fs_size {
        bail!(
            "File {local_path:?} has its current FS size {fs_size} different from the initially determined {metadata_size}"
        );
    }

    let fs_size = usize::try_from(fs_size)
        .with_context(|| format!("convert {local_path:?} size {fs_size} to usize"))?;
    /* BEGIN_HADRON */
    let mut metadata = None;
    match storage {
        // Pass the file path as storage metadata to minimize changes to neon.
        // Otherwise, we would need to change the upload interface.
        GenericRemoteStorage::AzureBlob(s) => {
            let block_size_mb = s.put_block_size_mb.unwrap_or(0);
            if block_size_mb > 0 && fs_size > block_size_mb * 1024 * 1024 {
                metadata = Some(remote_storage::StorageMetadata::from([(
                    "databricks_azure_put_block",
                    local_path.as_str(),
                )]));
            }
        }
        GenericRemoteStorage::LocalFs(_) => {}
        GenericRemoteStorage::AwsS3(_) => {}
        GenericRemoteStorage::Unreliable(_) => {}
    };
    /* END_HADRON */
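    // Stream the file in `BUFFER_SIZE` chunks rather than buffering the whole
    // layer file in memory.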
    let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);

    storage
        .upload(reader, fs_size, remote_path, metadata, cancel)
        .await
        .with_context(|| format!("upload layer from local path '{local_path}'"))
}

/// Copies a layer object within remote storage from `source_path` to `target_path`.
pub(super) async fn copy_timeline_layer(
    storage: &GenericRemoteStorage,
    source_path: &RemotePath,
    target_path: &RemotePath,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    fail_point!("before-copy-layer", |_| {
        bail!("failpoint before-copy-layer")
    });

    pausable_failpoint!("before-copy-layer-pausable");

    storage
        .copy_object(source_path, target_path, cancel)
        .await
        .with_context(|| format!("copy layer {source_path} to {target_path}"))
}

190 0 : pub(crate) async fn upload_initdb_dir(
191 0 : storage: &GenericRemoteStorage,
192 0 : tenant_id: &TenantId,
193 0 : timeline_id: &TimelineId,
194 0 : mut initdb_tar_zst: File,
195 0 : size: u64,
196 0 : cancel: &CancellationToken,
197 0 : ) -> anyhow::Result<()> {
198 0 : tracing::trace!("uploading initdb dir");
199 :
200 : // We might have read somewhat into the file already in the prior retry attempt
201 0 : initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
202 :
203 0 : let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
204 :
205 0 : let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
206 0 : storage
207 0 : .upload_storage_object(file, size as usize, &remote_path, cancel)
208 0 : .await
209 0 : .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
210 0 : }
211 :
/// Preserves the initdb archive by copying it to its dedicated "preserved" path
/// in remote storage.
pub(crate) async fn preserve_initdb_archive(
    storage: &GenericRemoteStorage,
    tenant_id: &TenantId,
    timeline_id: &TimelineId,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
    let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
    storage
        .copy_object(&source_path, &dest_path, cancel)
        .await
        .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
}

/// Recovers a tenant's remote storage prefixes to their state at `timestamp`,
/// retrying each prefix with backoff.
pub(crate) async fn time_travel_recover_tenant(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timestamp: SystemTime,
    done_if_after: SystemTime,
    cancel: &CancellationToken,
) -> Result<(), TimeTravelError> {
    let warn_after = 3;
    let max_attempts = 10;
    let mut prefixes = Vec::with_capacity(2);
    if tenant_shard_id.is_shard_zero() {
        // Also recover the unsharded prefix for shard zero:
        // - if the tenant is totally unsharded, the unsharded prefix contains all the data
        // - if the tenant is sharded, we still want to recover the initdb data, but we only
        //   want to do it once, so let's do it on shard 0
        let timelines_path_unsharded =
            super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
        prefixes.push(timelines_path_unsharded);
    }
    if !tenant_shard_id.is_unsharded() {
        // If the tenant is sharded, we need to recover the sharded prefix.
        let timelines_path = super::remote_timelines_path(tenant_shard_id);
        prefixes.push(timelines_path);
    }

    // Limit the number of version deletions, mostly so that we don't
    // keep requesting forever if the list is too long, as we'd hold the
    // list in RAM.
    // Building a list of 100k entries that reaches the limit takes roughly
    // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
    const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);

    for prefix in &prefixes {
        backoff::retry(
            || async {
                storage
                    .time_travel_recover(
                        Some(prefix),
                        timestamp,
                        done_if_after,
                        cancel,
                        COMPLEXITY_LIMIT,
                    )
                    .await
            },
            |e| !matches!(e, TimeTravelError::Other(_)),
            warn_after,
            max_attempts,
            "time travel recovery of tenant prefix",
            cancel,
        )
        .await
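        // `backoff::retry` yields `None` when the cancellation token fires and
        // `Some(Result<_, _>)` otherwise, so map cancellation to the matching
        // error and flatten the nested result.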
        .ok_or_else(|| TimeTravelError::Cancelled)
        .and_then(|x| x)?;
    }
    Ok(())
}