Line data Source code
1 : //! Helper functions for uploading files to remote storage via a `GenericRemoteStorage`.
2 :
3 : use anyhow::{bail, Context};
4 : use camino::Utf8Path;
5 : use fail::fail_point;
6 : use pageserver_api::shard::TenantShardId;
7 : use std::io::{ErrorKind, SeekFrom};
8 : use std::time::SystemTime;
9 : use tokio::fs::{self, File};
10 : use tokio::io::AsyncSeekExt;
11 : use tokio_util::sync::CancellationToken;
12 : use utils::backoff;
13 :
14 : use super::Generation;
15 : use crate::{
16 : config::PageServerConf,
17 : tenant::remote_timeline_client::{
18 : index::IndexPart, remote_index_path, remote_initdb_archive_path,
19 : remote_initdb_preserved_archive_path, remote_path,
20 : },
21 : };
22 : use remote_storage::{GenericRemoteStorage, TimeTravelError};
23 : use utils::id::{TenantId, TimelineId};
24 :
25 : use super::index::LayerFileMetadata;
26 :
27 : use tracing::info;
28 :
29 : /// Serializes and uploads the given index part data to the remote storage.
30 747 : pub(crate) async fn upload_index_part<'a>(
31 747 : storage: &'a GenericRemoteStorage,
32 747 : tenant_shard_id: &TenantShardId,
33 747 : timeline_id: &TimelineId,
34 747 : generation: Generation,
35 747 : index_part: &'a IndexPart,
36 747 : cancel: &CancellationToken,
37 747 : ) -> anyhow::Result<()> {
38 0 : tracing::trace!("uploading new index part");
39 :
40 747 : fail_point!("before-upload-index", |_| {
41 0 : bail!("failpoint before-upload-index")
42 747 : });
43 747 : pausable_failpoint!("before-upload-index-pausable");
44 :
45 747 : let index_part_bytes = index_part
46 747 : .to_s3_bytes()
47 747 : .context("serialize index part file into bytes")?;
48 747 : let index_part_size = index_part_bytes.len();
49 747 : let index_part_bytes = bytes::Bytes::from(index_part_bytes);
50 747 :
51 747 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation);
52 747 : storage
53 747 : .upload_storage_object(
54 747 : futures::stream::once(futures::future::ready(Ok(index_part_bytes))),
55 747 : index_part_size,
56 747 : &remote_path,
57 747 : cancel,
58 747 : )
59 2429 : .await
60 745 : .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
61 745 : }
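// A minimal call-site sketch (hypothetical, not part of this file): a caller that
// already holds a `GenericRemoteStorage`, the shard/timeline ids, the current
// `Generation`, the `IndexPart`, and a `CancellationToken` simply awaits the helper
// and lets its own retry machinery handle the error:
//
//     upload_index_part(&storage, &tenant_shard_id, &timeline_id, generation, &index_part, &cancel)
//         .await
//         .context("upload index_part.json")?;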
62 :
63 : /// Attempts to upload the given layer files.
64 : /// No extra checks for overlapping files are made; any file that is already present remotely will be overwritten if it is submitted during the upload.
65 : ///
66 : /// On error, bumps the retry count and reschedules the entire task.
67 555 : pub(super) async fn upload_timeline_layer<'a>(
68 555 : conf: &'static PageServerConf,
69 555 : storage: &'a GenericRemoteStorage,
70 555 : source_path: &'a Utf8Path,
71 555 : known_metadata: &'a LayerFileMetadata,
72 555 : generation: Generation,
73 555 : cancel: &CancellationToken,
74 555 : ) -> anyhow::Result<()> {
75 555 : fail_point!("before-upload-layer", |_| {
76 0 : bail!("failpoint before-upload-layer")
77 555 : });
78 :
79 555 : pausable_failpoint!("before-upload-layer-pausable");
80 :
81 554 : let storage_path = remote_path(conf, source_path, generation)?;
82 554 : let source_file_res = fs::File::open(&source_path).await;
83 554 : let source_file = match source_file_res {
84 554 : Ok(source_file) => source_file,
85 0 : Err(e) if e.kind() == ErrorKind::NotFound => {
86 : // Reaching this arm is not intended, but it is also not a big
87 : // problem if the file was deleted before the upload. However, a
88 : // nonexistent file can also be indicative of something worse,
89 : // such as a file being scheduled for upload before it has been
90 : // written to disk.
91 : //
92 : // This case is exercised by `test_compaction_delete_before_upload`.
93 0 : info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
94 0 : return Ok(());
95 : }
96 0 : Err(e) => {
97 0 : Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
98 : }
99 : };
100 :
101 554 : let fs_size = source_file
102 554 : .metadata()
103 553 : .await
104 553 : .with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
105 553 : .len();
106 553 :
107 553 : let metadata_size = known_metadata.file_size();
108 553 : if metadata_size != fs_size {
109 0 : bail!("File {source_path:?} has current FS size {fs_size}, different from the initially determined size {metadata_size}");
110 553 : }
111 :
112 553 : let fs_size = usize::try_from(fs_size)
113 553 : .with_context(|| format!("convert {source_path:?} size {fs_size} to usize"))?;
114 :
115 553 : let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
116 553 :
117 553 : storage
118 553 : .upload(reader, fs_size, &storage_path, None, cancel)
119 17231 : .await
120 543 : .with_context(|| format!("upload layer from local path '{source_path}'"))
121 543 : }
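// Hypothetical call-site sketch (names other than the helper are assumptions): the
// caller passes the layer's local path together with the metadata recorded for it, so
// the size check above can detect a file that changed on disk after being scheduled:
//
//     upload_timeline_layer(conf, &storage, &local_layer_path, &layer_metadata, generation, &cancel)
//         .await
//         .context("upload layer file")?;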
122 :
123 : /// Uploads the given `initdb` data to the remote storage.
124 0 : pub(crate) async fn upload_initdb_dir(
125 0 : storage: &GenericRemoteStorage,
126 0 : tenant_id: &TenantId,
127 0 : timeline_id: &TimelineId,
128 0 : mut initdb_tar_zst: File,
129 0 : size: u64,
130 0 : cancel: &CancellationToken,
131 0 : ) -> anyhow::Result<()> {
132 0 : tracing::trace!("uploading initdb dir");
133 :
134 : // We might already have read partway into the file in a prior retry attempt, so rewind to the start
135 0 : initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
136 :
137 0 : let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
138 0 :
139 0 : let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
140 0 : storage
141 0 : .upload_storage_object(file, size as usize, &remote_path, cancel)
142 0 : .await
143 0 : .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
144 0 : }
145 :
146 0 : pub(crate) async fn preserve_initdb_archive(
147 0 : storage: &GenericRemoteStorage,
148 0 : tenant_id: &TenantId,
149 0 : timeline_id: &TimelineId,
150 0 : cancel: &CancellationToken,
151 0 : ) -> anyhow::Result<()> {
152 0 : let source_path = remote_initdb_archive_path(tenant_id, timeline_id);
153 0 : let dest_path = remote_initdb_preserved_archive_path(tenant_id, timeline_id);
154 0 : storage
155 0 : .copy_object(&source_path, &dest_path, cancel)
156 0 : .await
157 0 : .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
158 0 : }
159 :
160 0 : pub(crate) async fn time_travel_recover_tenant(
161 0 : storage: &GenericRemoteStorage,
162 0 : tenant_shard_id: &TenantShardId,
163 0 : timestamp: SystemTime,
164 0 : done_if_after: SystemTime,
165 0 : cancel: &CancellationToken,
166 0 : ) -> Result<(), TimeTravelError> {
167 0 : let warn_after = 3;
168 0 : let max_attempts = 10;
169 0 : let mut prefixes = Vec::with_capacity(2);
170 0 : if tenant_shard_id.is_zero() {
171 0 : // Also recover the unsharded prefix for a shard of zero:
172 0 : // - if the tenant is totally unsharded, the unsharded prefix contains all the data
173 0 : // - if the tenant is sharded, we still want to recover the initdb data, but we only
174 0 : // want to do it once, so let's do it on the 0 shard
175 0 : let timelines_path_unsharded =
176 0 : super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
177 0 : prefixes.push(timelines_path_unsharded);
178 0 : }
179 0 : if !tenant_shard_id.is_unsharded() {
180 0 : // If the tenant is sharded, we need to recover the sharded prefix
181 0 : let timelines_path = super::remote_timelines_path(tenant_shard_id);
182 0 : prefixes.push(timelines_path);
183 0 : }
184 0 : for prefix in &prefixes {
185 0 : backoff::retry(
186 0 : || async {
187 0 : storage
188 0 : .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
189 0 : .await
190 0 : },
191 0 : |e| !matches!(e, TimeTravelError::Other(_)),
192 0 : warn_after,
193 0 : max_attempts,
194 0 : "time travel recovery of tenant prefix",
195 0 : cancel,
196 0 : )
197 0 : .await
198 0 : .ok_or_else(|| TimeTravelError::Cancelled)
199 0 : .and_then(|x| x)?;
200 : }
201 0 : Ok(())
202 0 : }
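// Hypothetical usage sketch (not part of this file): an admin/recovery code path would
// choose the restore target `timestamp` and a `done_if_after` bound, then call:
//
//     time_travel_recover_tenant(&storage, &tenant_shard_id, timestamp, done_if_after, &cancel)
//         .await
//         .map_err(|e| anyhow::anyhow!("time travel recovery failed: {e:?}"))?;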