//! Helper functions to download files from remote storage with a `RemoteStorage` implementation.
//!
//! The functions in this module retry failed operations automatically, according
//! to the `FAILED_REMOTE_OP_RETRIES` constant.

use std::collections::HashSet;
use std::future::Future;

use anyhow::{anyhow, Context};
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::{backoff, crashsafe};

use crate::config::PageServerConf;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::Generation;
use crate::virtual_file::on_fatal_io_error;
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::TimelineId;

use super::index::{IndexPart, LayerFileMetadata};
use super::{
    parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
    remote_initdb_preserved_archive_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
    INITDB_PATH,
};

/// Download a layer file from remote storage into the local timeline directory.
///
/// We validate that the downloaded file's size matches the size recorded in
/// `layer_metadata`. (In the future, we might do more cross-checks, like CRC validation.)
///
/// Returns the size of the downloaded file.
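///
/// A hypothetical usage sketch (the variables are illustrative, not taken from
/// a real call site):
///
/// ```ignore
/// let bytes_downloaded = download_layer_file(
///     conf,
///     &storage,
///     tenant_shard_id,
///     timeline_id,
///     &layer_file_name,
///     &layer_metadata,
///     &cancel,
/// )
/// .await?;
/// // On success, the downloaded size always matches the metadata:
/// assert_eq!(bytes_downloaded, layer_metadata.file_size());
/// ```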
pub async fn download_layer_file<'a>(
    conf: &'static PageServerConf,
    storage: &'a GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    layer_file_name: &'a LayerFileName,
    layer_metadata: &'a LayerFileMetadata,
    cancel: &CancellationToken,
) -> Result<u64, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let local_path = conf
        .timeline_path(&tenant_shard_id, &timeline_id)
        .join(layer_file_name.file_name());

    let remote_path = remote_layer_path(
        &tenant_shard_id.tenant_id,
        &timeline_id,
        layer_metadata.shard,
        layer_file_name,
        layer_metadata.generation,
    );

    // Perform a rename inspired by durable_rename from file_utils.c.
    // The sequence:
    //     write(tmp)
    //     fsync(tmp)
    //     rename(tmp, new)
    //     fsync(new)
    //     fsync(parent)
    // For more context about durable_rename, see this email from the postgres mailing list:
    // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
    // If the pageserver crashes, the temp file is deleted on startup and the layer is re-downloaded.
    let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);

    let (mut destination_file, bytes_amount) = download_retry(
        || async {
            let destination_file = tokio::fs::File::create(&temp_file_path)
                .await
                .with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
                .map_err(DownloadError::Other)?;

            let download = storage.download(&remote_path, cancel).await?;

            let mut destination_file =
                tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);

            let mut reader = tokio_util::io::StreamReader::new(download.download_stream);

            let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file).await;

            match bytes_amount {
                Ok(bytes_amount) => {
                    let destination_file = destination_file.into_inner();
                    Ok((destination_file, bytes_amount))
                }
                Err(e) => {
                    if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
                        on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
                    }

                    Err(e.into())
                }
            }
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await?;

    // The tokio docs (https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html) state that:
    // "A file will not be closed immediately when it goes out of scope if there are any IO operations
    // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
    // you should call flush before dropping it."
    //
    // From the tokio code we can see that flush waits for pending operations to complete. There
    // shouldn't be any, because we assume that `destination_file` is fully written, i.e. there are
    // no pending `.write(...).await` operations. But for additional safety, let's wait for any
    // pending operations to complete anyway.
    destination_file
        .flush()
        .await
        .with_context(|| format!("flush destination file at {temp_file_path}"))
        .map_err(DownloadError::Other)?;

    let expected = layer_metadata.file_size();
    if expected != bytes_amount {
        return Err(DownloadError::Other(anyhow!(
            "according to the layer file metadata we should have downloaded {expected} bytes, but we downloaded {bytes_amount} bytes into file {temp_file_path:?}",
        )));
    }

    // Not using sync_data because it can lose the file size update.
    destination_file
        .sync_all()
        .await
        .with_context(|| format!("failed to fsync destination file at {temp_file_path}"))
        .map_err(DownloadError::Other)?;
    drop(destination_file);

    fail::fail_point!("remote-storage-download-pre-rename", |_| {
        Err(DownloadError::Other(anyhow!(
            "remote-storage-download-pre-rename failpoint triggered"
        )))
    });

    fs::rename(&temp_file_path, &local_path)
        .await
        .with_context(|| format!("rename downloaded layer file to {local_path}"))
        .map_err(DownloadError::Other)?;

    crashsafe::fsync_async(&local_path)
        .await
        .with_context(|| format!("fsync layer file {local_path}"))
        .map_err(DownloadError::Other)?;

    tracing::debug!("download complete: {local_path}");

    Ok(bytes_amount)
}

const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";

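/// Returns true if the given path carries the temp-download suffix that
/// [`download_layer_file`] uses for in-progress downloads. For example
/// (hypothetical file names):
///
/// ```ignore
/// assert!(is_temp_download_file(Utf8Path::new("foo.temp_download")));
/// assert!(!is_temp_download_file(Utf8Path::new("foo")));
/// ```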
pub fn is_temp_download_file(path: &Utf8Path) -> bool {
    path.extension() == Some(TEMP_DOWNLOAD_EXTENSION)
}

/// List the timelines of a given tenant in remote storage.
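///
/// Returns the set of timeline IDs found under the tenant's timelines prefix,
/// plus any other (non-timeline-ID) object names and prefixes encountered.
/// A hypothetical usage sketch:
///
/// ```ignore
/// let (timeline_ids, other_prefixes) =
///     list_remote_timelines(&storage, tenant_shard_id, cancel.clone()).await?;
/// for timeline_id in timeline_ids {
///     // load or inspect each remote timeline...
/// }
/// ```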
pub async fn list_remote_timelines(
    storage: &GenericRemoteStorage,
    tenant_shard_id: TenantShardId,
    cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
    let remote_path = remote_timelines_path(&tenant_shard_id);

    fail::fail_point!("storage-sync-list-remote-timelines", |_| {
        anyhow::bail!("storage-sync-list-remote-timelines");
    });

    let listing = download_retry_forever(
        || {
            storage.list(
                Some(&remote_path),
                ListingMode::WithDelimiter,
                None,
                &cancel,
            )
        },
        &format!("list timelines for {tenant_shard_id}"),
        &cancel,
    )
    .await?;

    let mut timeline_ids = HashSet::new();
    let mut other_prefixes = HashSet::new();

    for timeline_remote_storage_key in listing.prefixes {
        let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
            anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
        })?;

        match object_name.parse::<TimelineId>() {
            Ok(t) => timeline_ids.insert(t),
            Err(_) => other_prefixes.insert(object_name.to_string()),
        };
    }

    for key in listing.keys {
        let object_name = key
            .object_name()
            .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
        other_prefixes.insert(object_name.to_string());
    }

    Ok((timeline_ids, other_prefixes))
}

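/// Download and deserialize the `index_part.json` object written by the given
/// generation. This is the single-probe building block that
/// [`download_index_part`] below uses while searching for the latest index.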
async fn do_download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    index_generation: Generation,
    cancel: &CancellationToken,
) -> Result<IndexPart, DownloadError> {
    let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);

    let index_part_bytes = download_retry_forever(
        || async {
            let download = storage.download(&remote_path, cancel).await?;

            let mut bytes = Vec::new();

            let stream = download.download_stream;
            let mut stream = StreamReader::new(stream);

            tokio::io::copy_buf(&mut stream, &mut bytes).await?;

            Ok(bytes)
        },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await?;

    let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
        .with_context(|| format!("deserialize index part file at {remote_path:?}"))
        .map_err(DownloadError::Other)?;

    Ok(index_part)
}

/// index_part.json objects are suffixed with a generation number, so we cannot
/// directly GET the latest index part without doing some probing.
///
/// In this function we probe for the most recent index in a generation <= our current generation.
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
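///
/// A worked example with hypothetical generation numbers: if `my_generation`
/// is 5 and the most recent index was uploaded by generation 3, we first probe
/// the generation-5 path (miss), then the generation-4 path (miss), and
/// finally fall back to listing all `index_part.json*` objects, which finds
/// the generation-3 index.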
#[tracing::instrument(skip_all, fields(generation=?my_generation))]
pub(super) async fn download_index_part(
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    my_generation: Generation,
    cancel: &CancellationToken,
) -> Result<IndexPart, DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    if my_generation.is_none() {
        // Operating without generations: just fetch the generation-less path
        return do_download_index_part(
            storage,
            tenant_shard_id,
            timeline_id,
            my_generation,
            cancel,
        )
        .await;
    }

    // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
    // index in our generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res =
        do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
    match res {
        Ok(index_part) => {
            tracing::debug!(
                "Found index_part from current generation (this is a stale attachment)"
            );
            return Ok(index_part);
        }
        Err(DownloadError::NotFound) => {}
        Err(e) => return Err(e),
    };

    // Typical case: the previous generation of this tenant was running healthily, and had uploaded
    // an index part. We may safely start from this index without doing a listing, because:
    //  - we checked for the current-generation case above
    //  - generations > my_generation are to be ignored
    //  - any other indices that exist would have an older generation than `previous_gen`, and
    //    we want to find the most recent index from a previous generation.
    //
    // This is an optimization to avoid doing the listing for the general case below.
    let res = do_download_index_part(
        storage,
        tenant_shard_id,
        timeline_id,
        my_generation.previous(),
        cancel,
    )
    .await;
    match res {
        Ok(index_part) => {
            tracing::debug!("Found index_part from previous generation");
            return Ok(index_part);
        }
        Err(DownloadError::NotFound) => {
            tracing::debug!(
                "No index_part found from previous generation, falling back to listing"
            );
        }
        Err(e) => {
            return Err(e);
        }
    }

    // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
    // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
    // to constructing a full index path with no generation, because the generation is a suffix.
    let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());

    let indices = download_retry(
        || async { storage.list_files(Some(&index_prefix), None, cancel).await },
        "list index_part files",
        cancel,
    )
    .await?;

    // General case logic for which index to use: the latest index whose generation
    // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
    let max_previous_generation = indices
        .into_iter()
        .filter_map(parse_remote_index_path)
        .filter(|g| g <= &my_generation)
        .max();

    match max_previous_generation {
        Some(g) => {
            tracing::debug!("Found index_part in generation {g:?}");
            do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
        }
        None => {
            // Migration from legacy pre-generation state: we have a generation but no prior
            // attached pageservers did. Try to load from a no-generation path.
            tracing::debug!("No index_part.json* found");
            do_download_index_part(
                storage,
                tenant_shard_id,
                timeline_id,
                Generation::none(),
                cancel,
            )
            .await
        }
    }
}

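/// Download the `initdb.tar.zst` archive for the given timeline into a
/// temporary file under the tenant's timelines directory, falling back to the
/// "preserved" archive path if the primary object is not found. On success,
/// returns the temporary file's path together with an open [`File`] rewound to
/// the start; on failure, the temporary file is removed on a best-effort basis.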
pub(crate) async fn download_initdb_tar_zst(
    conf: &'static PageServerConf,
    storage: &GenericRemoteStorage,
    tenant_shard_id: &TenantShardId,
    timeline_id: &TimelineId,
    cancel: &CancellationToken,
) -> Result<(Utf8PathBuf, File), DownloadError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let remote_preserved_path =
        remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);

    let timeline_path = conf.timelines_path(tenant_shard_id);

    if !timeline_path.exists() {
        tokio::fs::create_dir_all(&timeline_path)
            .await
            .with_context(|| format!("timeline dir creation {timeline_path}"))
            .map_err(DownloadError::Other)?;
    }
    let temp_path = timeline_path.join(format!(
        "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
    ));

    let file = download_retry(
        || async {
            let file = OpenOptions::new()
                .create(true)
                .truncate(true)
                .read(true)
                .write(true)
                .open(&temp_path)
                .await
                .with_context(|| format!("tempfile creation {temp_path}"))
                .map_err(DownloadError::Other)?;

            let download = match storage.download(&remote_path, cancel).await {
                Ok(dl) => dl,
                Err(DownloadError::NotFound) => {
                    storage.download(&remote_preserved_path, cancel).await?
                }
                Err(other) => Err(other)?,
            };
            let mut download = tokio_util::io::StreamReader::new(download.download_stream);
            let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);

            tokio::io::copy_buf(&mut download, &mut writer).await?;

            let mut file = writer.into_inner();

            file.seek(std::io::SeekFrom::Start(0))
                .await
                .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
                .map_err(DownloadError::Other)?;

            Ok(file)
        },
        &format!("download {remote_path}"),
        cancel,
    )
    .await
    .map_err(|e| {
        // Do a best-effort attempt at deleting the temporary file upon encountering an error.
        // We don't have async here nor do we want to pile on any extra errors.
        if let Err(e) = std::fs::remove_file(&temp_path) {
            if e.kind() != std::io::ErrorKind::NotFound {
                warn!("error deleting temporary file {temp_path}: {e}");
            }
        }
        e
    })?;

    Ok((temp_path, file))
}

/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (S3), spurious network
/// problems, or other external reasons. Retry up to `FAILED_REMOTE_OP_RETRIES`
/// times, with backoff.
///
/// (See similar logic for uploads in `perform_upload_task`.)
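///
/// A hypothetical usage sketch, mirroring the call sites in this module (the
/// `storage` and `remote_path` variables are illustrative):
///
/// ```ignore
/// let download = download_retry(
///     || async { storage.download(&remote_path, cancel).await },
///     &format!("download {remote_path:?}"),
///     cancel,
/// )
/// .await?;
/// ```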
pub(super) async fn download_retry<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        DownloadError::is_permanent,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        FAILED_REMOTE_OP_RETRIES,
        description,
        cancel,
    )
    .await
    .ok_or_else(|| DownloadError::Cancelled)
    .and_then(|x| x)
}

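/// Like [`download_retry`], but retries effectively forever (`u32::MAX`
/// attempts) rather than giving up after `FAILED_REMOTE_OP_RETRIES` attempts.
/// Used for operations that must eventually succeed unless they are cancelled
/// or hit a permanent error, such as fetching `index_part.json` or listing
/// remote timelines.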
async fn download_retry_forever<T, O, F>(
    op: O,
    description: &str,
    cancel: &CancellationToken,
) -> Result<T, DownloadError>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, DownloadError>>,
{
    backoff::retry(
        op,
        DownloadError::is_permanent,
        FAILED_DOWNLOAD_WARN_THRESHOLD,
        u32::MAX,
        description,
        cancel,
    )
    .await
    .ok_or_else(|| DownloadError::Cancelled)
    .and_then(|x| x)
}