//! Helper functions to download files from remote storage with a `GenericRemoteStorage`.
//!
//! The functions in this module retry failed operations automatically, according
//! to the `FAILED_REMOTE_OP_RETRIES` constant.

use std::collections::HashSet;
use std::future::Future;
use std::path::Path;
use std::time::Duration;

use anyhow::{anyhow, Context};
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use utils::{backoff, crashsafe};

use crate::config::PageServerConf;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::Generation;
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};

use super::index::{IndexPart, LayerFileMetadata};
use super::{remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES};

static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120);

/// Download a layer file from remote storage into the local timeline directory.
///
/// Validates that the downloaded file's size matches the size recorded in
/// `layer_metadata`. (In the future, we might do more cross-checks, like CRC validation.)
///
/// Returns the size of the downloaded file.
pub async fn download_layer_file<'a>(
    conf: &'static PageServerConf,
    storage: &'a GenericRemoteStorage,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    layer_file_name: &'a LayerFileName,
    layer_metadata: &'a LayerFileMetadata,
) -> Result<u64, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let local_path = conf
        .timeline_path(&tenant_id, &timeline_id)
        .join(layer_file_name.file_name());

    let remote_path = remote_layer_path(&tenant_id, &timeline_id, layer_file_name, layer_metadata);

    // Perform a rename inspired by durable_rename from file_utils.c.
    // The sequence:
    //     write(tmp)
    //     fsync(tmp)
    //     rename(tmp, new)
    //     fsync(new)
    //     fsync(parent)
    // For more context about durable_rename, see this email from the postgres mailing list:
    // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
    // If the pageserver crashes, the temp file is deleted on startup and the layer re-downloaded.
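    // In this function, that sequence maps onto:
    //     fs::File::create(temp) + tokio::io::copy(...) -> write(tmp)
    //     destination_file.sync_all()                   -> fsync(tmp)
    //     fs::rename(temp_file_path, local_path)        -> rename(tmp, new)
    //     crashsafe::fsync_async(&local_path)           -> fsync(new)
    // (fsync(parent) is not performed in this function.)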
    let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);

    let (mut destination_file, bytes_amount) = download_retry(
        || async {
            // TODO: this doesn't use the cached fd for some reason?
            let mut destination_file = fs::File::create(&temp_file_path)
                .await
                .with_context(|| {
                    format!(
                        "create a destination file for layer '{}'",
                        temp_file_path.display()
                    )
                })
                .map_err(DownloadError::Other)?;
            let mut download = storage
                .download(&remote_path)
                .await
                .with_context(|| {
                    format!(
                        "open a download stream for layer with remote storage path '{remote_path:?}'"
                    )
                })
                .map_err(DownloadError::Other)?;

            let bytes_amount = tokio::time::timeout(
                MAX_DOWNLOAD_DURATION,
                tokio::io::copy(&mut download.download_stream, &mut destination_file),
            )
            .await
            .map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))?
            .with_context(|| {
                format!(
                    "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
                )
            })
            .map_err(DownloadError::Other)?;

            Ok((destination_file, bytes_amount))
        },
        &format!("download {remote_path:?}"),
    )
    .await?;

    // The tokio docs (https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html) state that:
    // A file will not be closed immediately when it goes out of scope if there are any IO operations
    // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
    // you should call flush before dropping it.
    //
    // From the tokio code, I see that it waits for pending operations to complete. There shouldn't
    // be any, because we assume that `destination_file` is fully written, i.e. there are no pending
    // `.write(...).await` operations. But for additional safety, let's wait for any pending
    // operations to complete.
    destination_file
        .flush()
        .await
        .with_context(|| format!("flush destination file at {}", temp_file_path.display()))
        .map_err(DownloadError::Other)?;

    let expected = layer_metadata.file_size();
    if expected != bytes_amount {
        return Err(DownloadError::Other(anyhow!(
            "According to layer file metadata, {expected} bytes should have been downloaded, but {bytes_amount} bytes were downloaded into file {temp_file_path:?}",
        )));
    }

    // Not using sync_data because it can lose the file size update.
    destination_file
        .sync_all()
        .await
        .with_context(|| {
            format!(
                "failed to fsync destination file at {}",
                temp_file_path.display()
            )
        })
        .map_err(DownloadError::Other)?;
    drop(destination_file);

    fail::fail_point!("remote-storage-download-pre-rename", |_| {
        Err(DownloadError::Other(anyhow!(
            "remote-storage-download-pre-rename failpoint triggered"
        )))
    });

    fs::rename(&temp_file_path, &local_path)
        .await
        .with_context(|| format!("rename downloaded layer file to {}", local_path.display()))
        .map_err(DownloadError::Other)?;

    crashsafe::fsync_async(&local_path)
        .await
        .with_context(|| format!("fsync layer file {}", local_path.display()))
        .map_err(DownloadError::Other)?;

    tracing::debug!("download complete: {}", local_path.display());

    Ok(bytes_amount)
}

const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";

/// Returns true if the path points at a temporary file created by `download_layer_file`,
/// i.e. it carries the `TEMP_DOWNLOAD_EXTENSION` suffix.
pub fn is_temp_download_file(path: &Path) -> bool {
    let extension = path.extension().map(|pname| {
        pname
            .to_str()
            .expect("paths passed to this function must be valid Rust strings")
    });
    match extension {
        Some(TEMP_DOWNLOAD_EXTENSION) => true,
        Some(_) => false,
        None => false,
    }
}

/// List timelines of the given tenant in remote storage.
pub async fn list_remote_timelines(
    storage: &GenericRemoteStorage,
    tenant_id: TenantId,
) -> anyhow::Result<HashSet<TimelineId>> {
    let remote_path = remote_timelines_path(&tenant_id);

    fail::fail_point!("storage-sync-list-remote-timelines", |_| {
        anyhow::bail!("storage-sync-list-remote-timelines");
    });

    let timelines = download_retry(
        || storage.list_prefixes(Some(&remote_path)),
        &format!("list prefixes for {tenant_id}"),
    )
    .await?;

    if timelines.is_empty() {
        anyhow::bail!("no timelines found on the remote storage")
    }

    let mut timeline_ids = HashSet::new();

    for timeline_remote_storage_key in timelines {
        let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
            anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
        })?;

        let timeline_id: TimelineId = object_name
            .parse()
            .with_context(|| format!("parse object name into timeline id '{object_name}'"))?;

        // list_prefixes is assumed to return unique names. Ensure this here.
        // NB: it's safer to bail out than warn-log this because the pageserver
        // needs to absolutely know about _all_ timelines that exist, so that
        // GC knows all the branchpoints. If we skipped over a timeline instead,
        // GC could delete a layer that's still needed by that timeline.
        anyhow::ensure!(
            !timeline_ids.contains(&timeline_id),
            "list_prefixes contains duplicate timeline id {timeline_id}"
        );
        timeline_ids.insert(timeline_id);
    }

    Ok(timeline_ids)
}

/// Download and deserialize the `IndexPart` for the given timeline and generation.
pub(super) async fn download_index_part(
    storage: &GenericRemoteStorage,
    tenant_id: &TenantId,
    timeline_id: &TimelineId,
    generation: Generation,
) -> Result<IndexPart, DownloadError> {
    let remote_path = remote_index_path(tenant_id, timeline_id, generation);

    let index_part_bytes = download_retry(
        || async {
            let mut index_part_download = storage.download(&remote_path).await?;

            let mut index_part_bytes = Vec::new();
            tokio::io::copy(
                &mut index_part_download.download_stream,
                &mut index_part_bytes,
            )
            .await
            .with_context(|| format!("download index part at {remote_path:?}"))
            .map_err(DownloadError::Other)?;
            Ok(index_part_bytes)
        },
        &format!("download {remote_path:?}"),
    )
    .await?;

    let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
        .with_context(|| format!("deserialize index part file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok(index_part)
}

/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (IAM, S3), spurious network
/// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
/// with backoff.
///
/// (See similar logic for uploads in `perform_upload_task`.)
async fn download_retry<T, O, F>(op: O, description: &str) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        FAILED_REMOTE_OP_RETRIES,
        description,
        // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
        backoff::Cancel::new(CancellationToken::new(), || -> DownloadError {
            unreachable!()
        }),
    )
    .await
}
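
// A minimal test sketch for `is_temp_download_file`, assuming only the
// `TEMP_DOWNLOAD_EXTENSION` naming convention used by `download_layer_file`
// above; the paths are illustrative, not real layer file names.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn detects_temp_download_files() {
        // A partially-downloaded layer carries the ".temp_download" suffix.
        assert!(is_temp_download_file(Path::new(
            "timelines/some_timeline/some_layer.temp_download"
        )));
        // A fully-downloaded layer file (no extension) is not flagged...
        assert!(!is_temp_download_file(Path::new(
            "timelines/some_timeline/some_layer"
        )));
        // ...and neither are files with unrelated extensions.
        assert!(!is_temp_download_file(Path::new(
            "timelines/some_timeline/metadata.json"
        )));
    }
}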