//! Helper functions to upload files to remote storage with a `RemoteStorage` implementation.

use anyhow::{bail, Context};
use bytes::Bytes;
use camino::Utf8Path;
use fail::fail_point;
use pageserver_api::shard::TenantShardId;
use std::io::{ErrorKind, SeekFrom};
use std::time::SystemTime;
use tokio::fs::{self, File};
use tokio::io::AsyncSeekExt;
use tokio_util::sync::CancellationToken;
use utils::{backoff, pausable_failpoint};

use super::index::IndexPart;
use super::Generation;
use crate::tenant::remote_timeline_client::{
    remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
};
use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
use utils::id::{TenantId, TimelineId};

use tracing::info;

/// Serializes and uploads the given index part data to the remote storage.
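/// A minimal call-site sketch (hypothetical surrounding values; not a doctest
/// from this crate):
/// ```ignore
/// upload_index_part(&storage, &tenant_shard_id, &timeline_id, generation, &index_part, &cancel)
///     .await?;
/// ```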
pub(crate) async fn upload_index_part<'a>(
    storage: &'a GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    generation: Generation,
    index_part: &IndexPart,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    tracing::trace!("uploading new index part");

    fail_point!("before-upload-index", |_| {
        bail!("failpoint before-upload-index")
    });
    pausable_failpoint!("before-upload-index-pausable");

    // FIXME: this error comes too late
    let serialized = index_part.to_s3_bytes()?;
    let serialized = Bytes::from(serialized);

    let index_part_size = serialized.len();

    let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
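    // The serialized index is already fully in memory, so wrap it in a
    // one-shot stream instead of streaming it from disk.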
    storage
        .upload_storage_object(
            futures::stream::once(futures::future::ready(Ok(serialized))),
            index_part_size,
            &remote_path,
            cancel,
        )
        .await
        .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
}

/// Attempts to upload the given layer file.
///
/// No extra checks for overlapping files are made: any file that is already
/// present remotely will be overwritten when submitted for upload.
///
/// On an error, the retry count is bumped and the entire task is rescheduled.
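/// A minimal call-site sketch (hypothetical surrounding values; not a doctest
/// from this crate):
/// ```ignore
/// // `file_size` must match the layer file's current size on disk.
/// upload_timeline_layer(&storage, &local_layer_path, &remote_layer_path, file_size, &cancel)
///     .await?;
/// ```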
pub(super) async fn upload_timeline_layer<'a>(
    storage: &'a GenericRemoteStorage,
    local_path: &'a Utf8Path,
    remote_path: &'a RemotePath,
    metadata_size: u64,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    fail_point!("before-upload-layer", |_| {
        bail!("failpoint before-upload-layer")
    });

    pausable_failpoint!("before-upload-layer-pausable");

    let source_file_res = fs::File::open(&local_path).await;
    let source_file = match source_file_res {
        Ok(source_file) => source_file,
        Err(e) if e.kind() == ErrorKind::NotFound => {
            // Reaching this arm is not intended, but it is also not a big
            // problem if the file was deleted before the upload could run.
            // However, a missing file can also indicate something worse, such
            // as a file being scheduled for upload before it has been written
            // to disk.
            //
            // This case is exercised by `test_compaction_delete_before_upload`.
            info!(path = %local_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
            return Ok(());
        }
        Err(e) => Err(e).with_context(|| format!("open a source file for layer {local_path:?}"))?,
    };

    let fs_size = source_file
        .metadata()
        .await
        .with_context(|| format!("get the source file metadata for layer {local_path:?}"))?
        .len();
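    // `metadata_size` was determined when the upload was scheduled; if the
    // on-disk size differs now, the file has changed underneath us, so bail
    // out instead of uploading it.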
    if metadata_size != fs_size {
        bail!("File {local_path:?} has current FS size {fs_size}, different from the initially determined {metadata_size}");
    }

    let fs_size = usize::try_from(fs_size)
        .with_context(|| format!("convert {local_path:?} size {fs_size} to usize"))?;

    let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);

    storage
        .upload(reader, fs_size, remote_path, None, cancel)
        .await
        .with_context(|| format!("upload layer from local path '{local_path}'"))
}

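/// Copies a single timeline layer object within remote storage.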
pub(super) async fn copy_timeline_layer(
    storage: &GenericRemoteStorage,
    source_path: &RemotePath,
    target_path: &RemotePath,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    fail_point!("before-copy-layer", |_| {
        bail!("failpoint before-copy-layer")
    });

    pausable_failpoint!("before-copy-layer-pausable");

    storage
        .copy_object(source_path, target_path, cancel)
        .await
        .with_context(|| format!("copy layer {source_path} to {target_path}"))
}

/// Uploads the given `initdb` data to the remote storage.
pub(crate) async fn upload_initdb_dir(
    storage: &GenericRemoteStorage,
    tenant_id: &TenantId,
    timeline_id: &TimelineId,
    mut initdb_tar_zst: File,
    size: u64,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    tracing::trace!("uploading initdb dir");

    // A prior retry attempt might already have read partway into the file,
    // so rewind to the start.
    initdb_tar_zst.seek(SeekFrom::Start(0)).await?;

    let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);

    let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
    storage
        .upload_storage_object(file, size as usize, &remote_path, cancel)
        .await
        .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
}

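/// Backs up the remote initdb archive to its preserved location.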
pub(crate) async fn preserve_initdb_archive(
    storage: &GenericRemoteStorage,
    tenant_id: &TenantId,
    timeline_id: &TimelineId,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
    let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
    storage
        .copy_object(&source_path, &dest_path, cancel)
        .await
        .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
}

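/// Restores the tenant's remote prefixes to their state at `timestamp` using
/// the remote storage time-travel facility, retrying transient errors.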
pub(crate) async fn time_travel_recover_tenant(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timestamp: SystemTime,
    done_if_after: SystemTime,
    cancel: &CancellationToken,
) -> Result<(), TimeTravelError> {
    let warn_after = 3;
    let max_attempts = 10;
    let mut prefixes = Vec::with_capacity(2);
    if tenant_shard_id.is_shard_zero() {
        // Also recover the unsharded prefix for shard zero:
        // - if the tenant is totally unsharded, the unsharded prefix contains all the data
        // - if the tenant is sharded, we still want to recover the initdb data, but we only
        //   want to do it once, so let's do it on shard 0
        let timelines_path_unsharded =
            super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
        prefixes.push(timelines_path_unsharded);
    }
    if !tenant_shard_id.is_unsharded() {
        // If the tenant is sharded, we need to recover the sharded prefix
        let timelines_path = super::remote_timelines_path(tenant_shard_id);
        prefixes.push(timelines_path);
    }
    for prefix in &prefixes {
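        // `backoff::retry` returns `None` when interrupted by cancellation and
        // `Some(result)` otherwise; map the former to `Cancelled` and flatten
        // the latter.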
        backoff::retry(
            || async {
                storage
                    .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
                    .await
            },
            |e| !matches!(e, TimeTravelError::Other(_)),
            warn_after,
            max_attempts,
            "time travel recovery of tenant prefix",
            cancel,
        )
        .await
        .ok_or_else(|| TimeTravelError::Cancelled)
        .and_then(|x| x)?;
    }
    Ok(())
}