//! Helper functions to download files from remote storage with a RemoteStorage
//!
//! The functions in this module retry failed operations automatically, according
//! to the FAILED_REMOTE_OP_RETRIES constant.

use std::collections::HashSet;
use std::future::Future;
use std::time::Duration;

use anyhow::{anyhow, Context};
use camino::Utf8Path;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use utils::{backoff, crashsafe};

use crate::config::PageServerConf;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::Generation;
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};

use super::index::{IndexPart, LayerFileMetadata};
use super::{
    parse_remote_index_path, remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
    FAILED_REMOTE_OP_RETRIES,
};

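/// Cap on the time spent streaming the body of a single layer download into the
/// local file; on timeout the attempt fails and is subject to the usual retry policy.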
static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120);

/// Download a layer file from remote storage into the local timeline directory.
///
/// We validate that the downloaded file's size matches the size recorded in
/// `layer_metadata`. (In the future, we might do more cross-checks, like CRC validation.)
///
/// Returns the size of the downloaded file.
pub async fn download_layer_file<'a>(
    conf: &'static PageServerConf,
    storage: &'a GenericRemoteStorage,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    layer_file_name: &'a LayerFileName,
    layer_metadata: &'a LayerFileMetadata,
) -> Result<u64, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let local_path = conf
        .timeline_path(&tenant_id, &timeline_id)
        .join(layer_file_name.file_name());

    let remote_path = remote_layer_path(
        &tenant_id,
        &timeline_id,
        layer_file_name,
        layer_metadata.generation,
    );

    // Perform a rename inspired by durable_rename from file_utils.c.
    // The sequence:
    //     write(tmp)
    //     fsync(tmp)
    //     rename(tmp, new)
    //     fsync(new)
    //     fsync(parent)
    // For more context about durable_rename check this email from postgres mailing list:
    // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
    // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
    let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);

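    // Note: each retry attempt re-creates (and thereby truncates) the temp file,
    // so a partially written earlier attempt cannot leak into the next one.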
    let (mut destination_file, bytes_amount) = download_retry(
        || async {
            // TODO: this doesn't use the cached fd for some reason?
            let mut destination_file = fs::File::create(&temp_file_path)
                .await
                .with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
                .map_err(DownloadError::Other)?;
            let mut download = storage
                .download(&remote_path)
                .await
                .with_context(|| {
                    format!(
                        "open a download stream for layer with remote storage path '{remote_path:?}'"
                    )
                })
                .map_err(DownloadError::Other)?;

            let bytes_amount = tokio::time::timeout(
                MAX_DOWNLOAD_DURATION,
                tokio::io::copy(&mut download.download_stream, &mut destination_file),
            )
            .await
            .map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))?
            .with_context(|| {
                format!(
                    "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
                )
            })
            .map_err(DownloadError::Other)?;

            Ok((destination_file, bytes_amount))
        },
        &format!("download {remote_path:?}"),
    )
    .await?;

    // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
    // A file will not be closed immediately when it goes out of scope if there are any IO operations
    // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
    // you should call flush before dropping it.
    //
    // From the tokio code I see that it waits for pending operations to complete. There shouldn't be
    // any because we assume that the `destination_file` is fully written, i.e. there are no pending
    // `.write(...).await` operations. But for additional safety, let's check/wait for any pending
    // operations.
    destination_file
        .flush()
        .await
        .with_context(|| format!("flush destination file at {temp_file_path}"))
        .map_err(DownloadError::Other)?;

    let expected = layer_metadata.file_size();
    if expected != bytes_amount {
        return Err(DownloadError::Other(anyhow!(
            "according to the layer file metadata, should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
        )));
    }

    // Not using sync_data because it can lose the file size update.
    destination_file
        .sync_all()
        .await
        .with_context(|| format!("failed to fsync destination file at {temp_file_path}"))
        .map_err(DownloadError::Other)?;
    drop(destination_file);

    fail::fail_point!("remote-storage-download-pre-rename", |_| {
        Err(DownloadError::Other(anyhow!(
            "remote-storage-download-pre-rename failpoint triggered"
        )))
    });

    fs::rename(&temp_file_path, &local_path)
        .await
        .with_context(|| format!("rename downloaded layer file to {local_path}"))
        .map_err(DownloadError::Other)?;

    crashsafe::fsync_async(&local_path)
        .await
        .with_context(|| format!("fsync layer file {local_path}"))
        .map_err(DownloadError::Other)?;

    tracing::debug!("download complete: {local_path}");

    Ok(bytes_amount)
}

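/// Extension appended to layer files while they are being downloaded; leftover
/// files with this suffix are deleted on pageserver startup and re-downloaded.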
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";

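/// Returns true if the given path has the temp download extension, i.e. it is an
/// in-progress (not yet renamed) layer download.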
pub fn is_temp_download_file(path: &Utf8Path) -> bool {
    let extension = path.extension();
    match extension {
        Some(TEMP_DOWNLOAD_EXTENSION) => true,
        Some(_) => false,
        None => false,
    }
}
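
// A minimal illustrative check of the extension test above; the file names below
// are arbitrary examples, not real layer file names.
#[cfg(test)]
mod temp_download_file_tests {
    use camino::Utf8Path;

    use super::is_temp_download_file;

    #[test]
    fn recognizes_temp_download_extension() {
        assert!(is_temp_download_file(Utf8Path::new(
            "some-layer.temp_download"
        )));
        assert!(!is_temp_download_file(Utf8Path::new("some-layer")));
        assert!(!is_temp_download_file(Utf8Path::new("some-layer.tmp")));
    }
}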

/// List the timelines of a given tenant in remote storage.
pub async fn list_remote_timelines(
    storage: &GenericRemoteStorage,
    tenant_id: TenantId,
) -> anyhow::Result<HashSet<TimelineId>> {
    let remote_path = remote_timelines_path(&tenant_id);

    fail::fail_point!("storage-sync-list-remote-timelines", |_| {
        anyhow::bail!("storage-sync-list-remote-timelines");
    });

    let timelines = download_retry(
        || storage.list_prefixes(Some(&remote_path)),
        &format!("list prefixes for {tenant_id}"),
    )
    .await?;

    if timelines.is_empty() {
        anyhow::bail!("no timelines found on the remote storage")
    }

    let mut timeline_ids = HashSet::new();

    for timeline_remote_storage_key in timelines {
        let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
            anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
        })?;

        let timeline_id: TimelineId = object_name
            .parse()
            .with_context(|| format!("parse object name into timeline id '{object_name}'"))?;

        // list_prefixes is assumed to return unique names. Ensure this here.
        // NB: it's safer to bail out than warn-log this because the pageserver
        // needs to absolutely know about _all_ timelines that exist, so that
        // GC knows all the branchpoints. If we skipped over a timeline instead,
        // GC could delete a layer that's still needed by that timeline.
        anyhow::ensure!(
            !timeline_ids.contains(&timeline_id),
            "list_prefixes contains duplicate timeline id {timeline_id}"
        );
        timeline_ids.insert(timeline_id);
    }

    Ok(timeline_ids)
}

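/// Fetch the `index_part.json` object for the given generation and deserialize it.
///
/// Callers (see `download_index_part`) are responsible for choosing which generation
/// to probe.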
async fn do_download_index_part(
    storage: &GenericRemoteStorage,
    tenant_id: &TenantId,
    timeline_id: &TimelineId,
    index_generation: Generation,
) -> Result<IndexPart, DownloadError> {
    let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);

    let index_part_bytes = download_retry(
        || async {
            let mut index_part_download = storage.download(&remote_path).await?;

            let mut index_part_bytes = Vec::new();
            tokio::io::copy(
                &mut index_part_download.download_stream,
                &mut index_part_bytes,
            )
            .await
            .with_context(|| format!("download index part at {remote_path:?}"))
            .map_err(DownloadError::Other)?;
            Ok(index_part_bytes)
        },
        &format!("download {remote_path:?}"),
    )
    .await?;

    let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
        .with_context(|| format!("deserialize index part file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok(index_part)
}

/// index_part.json objects are suffixed with a generation number, so we cannot
/// directly GET the latest index part without doing some probing.
///
/// In this function we probe for the most recent index in a generation <= our current generation.
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
#[tracing::instrument(skip_all, fields(generation=?my_generation))]
pub(super) async fn download_index_part(
    storage: &GenericRemoteStorage,
    tenant_id: &TenantId,
    timeline_id: &TimelineId,
    my_generation: Generation,
) -> Result<IndexPart, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    if my_generation.is_none() {
        // Operating without generations: just fetch the generation-less path
        return do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
    }

    // Stale case: if we were intentionally attached in a stale generation, there may already be a
    // remote index in our generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
    match res {
        Ok(index_part) => {
            tracing::debug!(
                "Found index_part from current generation (this is a stale attachment)"
            );
            return Ok(index_part);
        }
        Err(DownloadError::NotFound) => {}
        Err(e) => return Err(e),
    };

    // Typical case: the previous generation of this tenant was running healthily, and had uploaded
    // an index part. We may safely start from this index without doing a listing, because:
    //  - We checked for the current-generation case above
    //  - generations > my_generation are to be ignored
    //  - any other indices that exist would have an older generation than `previous_gen`, and
    //    we want to find the most recent index from a previous generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res =
        do_download_index_part(storage, tenant_id, timeline_id, my_generation.previous()).await;
    match res {
        Ok(index_part) => {
            tracing::debug!("Found index_part from previous generation");
            return Ok(index_part);
        }
        Err(DownloadError::NotFound) => {
            tracing::debug!(
                "No index_part found from previous generation, falling back to listing"
            );
        }
        Err(e) => {
            return Err(e);
        }
    }

308 :
309 : // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
310 : // objects, and select the highest one with a generation <= my_generation.
311 : let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
312 : let indices = backoff::retry(
313 CBC 23 : || async { storage.list_files(Some(&index_prefix)).await },
314 3 : |_| false,
315 : FAILED_DOWNLOAD_WARN_THRESHOLD,
316 : FAILED_REMOTE_OP_RETRIES,
317 : "listing index_part files",
318 : // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
319 UBC 0 : backoff::Cancel::new(CancellationToken::new(), || -> anyhow::Error {
320 0 : unreachable!()
321 0 : }),
322 : )
323 : .await
324 : .map_err(DownloadError::Other)?;
325 :
326 : // General case logic for which index to use: the latest index whose generation
327 : // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
328 : let max_previous_generation = indices
329 : .into_iter()
330 : .filter_map(parse_remote_index_path)
331 CBC 11 : .filter(|g| g <= &my_generation)
332 : .max();
333 :
334 : match max_previous_generation {
335 : Some(g) => {
336 UBC 0 : tracing::debug!("Found index_part in generation {g:?}");
337 : do_download_index_part(storage, tenant_id, timeline_id, g).await
338 : }
339 : None => {
340 : // Migration from legacy pre-generation state: we have a generation but no prior
341 : // attached pageservers did. Try to load from a no-generation path.
342 CBC 2 : tracing::info!("No index_part.json* found");
343 : do_download_index_part(storage, tenant_id, timeline_id, Generation::none()).await
344 : }
345 : }
346 : }

/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (IAM, S3), spurious network
/// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
/// with backoff.
///
/// (See similar logic for uploads in `perform_upload_task`.)
async fn download_retry<T, O, F>(op: O, description: &str) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        FAILED_REMOTE_OP_RETRIES,
        description,
        // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
        backoff::Cancel::new(CancellationToken::new(), || -> DownloadError {
            unreachable!()
        }),
    )
    .await
}