Line data Source code
1 : //! Helper functions to upload files to remote storage with a RemoteStorage
2 :
3 : use anyhow::{bail, Context};
4 : use camino::Utf8Path;
5 : use fail::fail_point;
6 : use pageserver_api::shard::TenantShardId;
7 : use std::io::{ErrorKind, SeekFrom};
8 : use std::time::SystemTime;
9 : use tokio::fs::{self, File};
10 : use tokio::io::AsyncSeekExt;
11 : use tokio_util::sync::CancellationToken;
12 : use utils::backoff;
13 :
14 : use super::Generation;
15 : use crate::tenant::remote_timeline_client::{
16 : index::IndexPart, remote_index_path, remote_initdb_archive_path,
17 : remote_initdb_preserved_archive_path,
18 : };
19 : use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
20 : use utils::id::{TenantId, TimelineId};
21 :
22 : use tracing::info;
23 :
24 : /// Serializes and uploads the given index part data to the remote storage.
25 1151 : pub(crate) async fn upload_index_part<'a>(
26 1151 : storage: &'a GenericRemoteStorage,
27 1151 : tenant_shard_id: &TenantShardId,
28 1151 : timeline_id: &TimelineId,
29 1151 : generation: Generation,
30 1151 : index_part: &'a IndexPart,
31 1151 : cancel: &CancellationToken,
32 1151 : ) -> anyhow::Result<()> {
33 1151 : tracing::trace!("uploading new index part");
34 :
35 1151 : fail_point!("before-upload-index", |_| {
36 0 : bail!("failpoint before-upload-index")
37 1151 : });
38 : pausable_failpoint!("before-upload-index-pausable");
39 :
40 1150 : let index_part_bytes = index_part
41 1150 : .to_s3_bytes()
42 1150 : .context("serialize index part file into bytes")?;
43 1150 : let index_part_size = index_part_bytes.len();
44 1150 : let index_part_bytes = bytes::Bytes::from(index_part_bytes);
45 1150 :
46 1150 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
47 1150 : storage
48 1150 : .upload_storage_object(
49 1150 : futures::stream::once(futures::future::ready(Ok(index_part_bytes))),
50 1150 : index_part_size,
51 1150 : &remote_path,
52 1150 : cancel,
53 1150 : )
54 3592 : .await
55 1144 : .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
56 1144 : }
57 :
58 : /// Attempts to upload given layer files.
59 : /// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
60 : ///
61 : /// On an error, bumps the retries count and reschedules the entire task.
62 1072 : pub(super) async fn upload_timeline_layer<'a>(
63 1072 : storage: &'a GenericRemoteStorage,
64 1072 : local_path: &'a Utf8Path,
65 1072 : remote_path: &'a RemotePath,
66 1072 : metadata_size: u64,
67 1072 : cancel: &CancellationToken,
68 1072 : ) -> anyhow::Result<()> {
69 1072 : fail_point!("before-upload-layer", |_| {
70 0 : bail!("failpoint before-upload-layer")
71 1072 : });
72 :
73 : pausable_failpoint!("before-upload-layer-pausable");
74 :
75 1066 : let source_file_res = fs::File::open(&local_path).await;
76 1066 : let source_file = match source_file_res {
77 1066 : Ok(source_file) => source_file,
78 0 : Err(e) if e.kind() == ErrorKind::NotFound => {
79 0 : // If we encounter this arm, it wasn't intended, but it's also not
80 0 : // a big problem, if it's because the file was deleted before an
81 0 : // upload. However, a nonexistent file can also be indicative of
82 0 : // something worse, like when a file is scheduled for upload before
83 0 : // it has been written to disk yet.
84 0 : //
85 0 : // This is tested against `test_compaction_delete_before_upload`
86 0 : info!(path = %local_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
87 0 : return Ok(());
88 : }
89 0 : Err(e) => Err(e).with_context(|| format!("open a source file for layer {local_path:?}"))?,
90 : };
91 :
92 1066 : let fs_size = source_file
93 1066 : .metadata()
94 1058 : .await
95 1065 : .with_context(|| format!("get the source file metadata for layer {local_path:?}"))?
96 1065 : .len();
97 1065 :
98 1065 : if metadata_size != fs_size {
99 0 : bail!("File {local_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
100 1065 : }
101 :
102 1065 : let fs_size = usize::try_from(fs_size)
103 1065 : .with_context(|| format!("convert {local_path:?} size {fs_size} usize"))?;
104 :
105 1065 : let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
106 1065 :
107 1065 : storage
108 1065 : .upload(reader, fs_size, remote_path, None, cancel)
109 27423 : .await
110 959 : .with_context(|| format!("upload layer from local path '{local_path}'"))
111 959 : }
112 :
113 0 : pub(super) async fn copy_timeline_layer(
114 0 : storage: &GenericRemoteStorage,
115 0 : source_path: &RemotePath,
116 0 : target_path: &RemotePath,
117 0 : cancel: &CancellationToken,
118 0 : ) -> anyhow::Result<()> {
119 0 : fail_point!("before-copy-layer", |_| {
120 0 : bail!("failpoint before-copy-layer")
121 0 : });
122 :
123 : pausable_failpoint!("before-copy-layer-pausable");
124 :
125 0 : storage
126 0 : .copy_object(source_path, target_path, cancel)
127 0 : .await
128 0 : .with_context(|| format!("copy layer {source_path} to {target_path}"))
129 0 : }
130 :
131 : /// Uploads the given `initdb` data to the remote storage.
132 0 : pub(crate) async fn upload_initdb_dir(
133 0 : storage: &GenericRemoteStorage,
134 0 : tenant_id: &TenantId,
135 0 : timeline_id: &TimelineId,
136 0 : mut initdb_tar_zst: File,
137 0 : size: u64,
138 0 : cancel: &CancellationToken,
139 0 : ) -> anyhow::Result<()> {
140 0 : tracing::trace!("uploading initdb dir");
141 :
142 : // We might have read somewhat into the file already in the prior retry attempt
143 0 : initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
144 :
145 0 : let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
146 0 :
147 0 : let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
148 0 : storage
149 0 : .upload_storage_object(file, size as usize, &remote_path, cancel)
150 0 : .await
151 0 : .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
152 0 : }
153 :
154 0 : pub(crate) async fn preserve_initdb_archive(
155 0 : storage: &GenericRemoteStorage,
156 0 : tenant_id: &TenantId,
157 0 : timeline_id: &TimelineId,
158 0 : cancel: &CancellationToken,
159 0 : ) -> anyhow::Result<()> {
160 0 : let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
161 0 : let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
162 0 : storage
163 0 : .copy_object(&source_path, &dest_path, cancel)
164 0 : .await
165 0 : .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
166 0 : }
167 :
168 0 : pub(crate) async fn time_travel_recover_tenant(
169 0 : storage: &GenericRemoteStorage,
170 0 : tenant_shard_id: &TenantShardId,
171 0 : timestamp: SystemTime,
172 0 : done_if_after: SystemTime,
173 0 : cancel: &CancellationToken,
174 0 : ) -> Result<(), TimeTravelError> {
175 0 : let warn_after = 3;
176 0 : let max_attempts = 10;
177 0 : let mut prefixes = Vec::with_capacity(2);
178 0 : if tenant_shard_id.is_shard_zero() {
179 0 : // Also recover the unsharded prefix for a shard of zero:
180 0 : // - if the tenant is totally unsharded, the unsharded prefix contains all the data
181 0 : // - if the tenant is sharded, we still want to recover the initdb data, but we only
182 0 : // want to do it once, so let's do it on the 0 shard
183 0 : let timelines_path_unsharded =
184 0 : super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
185 0 : prefixes.push(timelines_path_unsharded);
186 0 : }
187 0 : if !tenant_shard_id.is_unsharded() {
188 0 : // If the tenant is sharded, we need to recover the sharded prefix
189 0 : let timelines_path = super::remote_timelines_path(tenant_shard_id);
190 0 : prefixes.push(timelines_path);
191 0 : }
192 0 : for prefix in &prefixes {
193 0 : backoff::retry(
194 0 : || async {
195 0 : storage
196 0 : .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
197 0 : .await
198 0 : },
199 0 : |e| !matches!(e, TimeTravelError::Other(_)),
200 0 : warn_after,
201 0 : max_attempts,
202 0 : "time travel recovery of tenant prefix",
203 0 : cancel,
204 0 : )
205 0 : .await
206 0 : .ok_or_else(|| TimeTravelError::Cancelled)
207 0 : .and_then(|x| x)?;
208 : }
209 0 : Ok(())
210 0 : }
|