Line data Source code
1 : //! Helper functions to download files from remote storage using a `GenericRemoteStorage`.
2 : //!
3 : //! The functions in this module retry failed operations automatically, according
4 : //! to the FAILED_REMOTE_OP_RETRIES constant.
5 :
6 : use std::collections::HashSet;
7 : use std::future::Future;
8 :
9 : use anyhow::{anyhow, Context};
10 : use camino::{Utf8Path, Utf8PathBuf};
11 : use pageserver_api::shard::TenantShardId;
12 : use tokio::fs::{self, File, OpenOptions};
13 : use tokio::io::{AsyncSeekExt, AsyncWriteExt};
14 : use tokio_util::io::StreamReader;
15 : use tokio_util::sync::CancellationToken;
16 : use tracing::warn;
17 : use utils::backoff;
18 :
19 : use crate::config::PageServerConf;
20 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
21 : use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
22 : use crate::tenant::storage_layer::LayerFileName;
23 : use crate::tenant::Generation;
24 : use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
25 : use crate::TEMP_FILE_SUFFIX;
26 : use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
27 : use utils::crashsafe::path_with_suffix_extension;
28 : use utils::id::TimelineId;
29 :
30 : use super::index::{IndexPart, LayerFileMetadata};
31 : use super::{
32 : parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
33 : remote_initdb_preserved_archive_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
34 : INITDB_PATH,
35 : };
36 :
37 : /// Download a layer file from remote storage into the timeline's local directory.
38 : /// The downloaded file's size is validated against the size recorded in `layer_metadata`.
39 : /// (In the future, we might do more cross-checks, like CRC validation.)
40 : ///
41 : /// Returns the size of the downloaded file.
42 6 : pub async fn download_layer_file<'a>(
43 6 : conf: &'static PageServerConf,
44 6 : storage: &'a GenericRemoteStorage,
45 6 : tenant_shard_id: TenantShardId,
46 6 : timeline_id: TimelineId,
47 6 : layer_file_name: &'a LayerFileName,
48 6 : layer_metadata: &'a LayerFileMetadata,
49 6 : cancel: &CancellationToken,
50 6 : ) -> Result<u64, DownloadError> {
51 6 : debug_assert_current_span_has_tenant_and_timeline_id();
52 6 :
53 6 : let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
54 6 : let local_path = timeline_path.join(layer_file_name.file_name());
55 6 :
56 6 : let remote_path = remote_layer_path(
57 6 : &tenant_shard_id.tenant_id,
58 6 : &timeline_id,
59 6 : layer_metadata.shard,
60 6 : layer_file_name,
61 6 : layer_metadata.generation,
62 6 : );
63 6 :
64 6 : // Perform a rename inspired by durable_rename from file_utils.c.
65 6 : // The sequence:
66 6 : // write(tmp)
67 6 : // fsync(tmp)
68 6 : // rename(tmp, new)
69 6 : // fsync(new)
70 6 : // fsync(parent)
71 6 : // For more context about durable_rename check this email from postgres mailing list:
72 6 : // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
73 6 : // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
74 6 : let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
75 :
76 6 : let bytes_amount = download_retry(
77 72 : || async { download_object(storage, &remote_path, &temp_file_path, cancel).await },
78 6 : &format!("download {remote_path:?}"),
79 6 : cancel,
80 6 : )
81 72 : .await?;
82 :
83 6 : let expected = layer_metadata.file_size();
84 6 : if expected != bytes_amount {
85 0 : return Err(DownloadError::Other(anyhow!(
86 0 : "According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
87 0 : )));
88 6 : }
89 6 :
90 6 : fail::fail_point!("remote-storage-download-pre-rename", |_| {
91 0 : Err(DownloadError::Other(anyhow!(
92 0 : "remote-storage-download-pre-rename failpoint triggered"
93 0 : )))
94 6 : });
95 :
96 6 : fs::rename(&temp_file_path, &local_path)
97 6 : .await
98 6 : .with_context(|| format!("rename downloaded layer file to {local_path}"))
99 6 : .map_err(DownloadError::Other)?;
100 :
101 : // We use fatal_err() below because, after the rename above,
102 : // the in-memory state of the filesystem already has the layer file in its final place,
103 : // and subsequent pageserver code could think it's durable while it really isn't.
104 6 : let work = async move {
105 6 : let timeline_dir = VirtualFile::open(&timeline_path)
106 3 : .await
107 6 : .fatal_err("VirtualFile::open for timeline dir fsync");
108 6 : timeline_dir
109 6 : .sync_all()
110 3 : .await
111 6 : .fatal_err("VirtualFile::sync_all timeline dir");
112 6 : };
113 6 : crate::virtual_file::io_engine::get()
114 6 : .spawn_blocking_and_block_on_if_std(work)
115 9 : .await;
116 :
117 6 : tracing::debug!("download complete: {local_path}");
118 :
119 6 : Ok(bytes_amount)
120 6 : }
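The durable_rename recipe spelled out in the comment above can be read as a small stand-alone sequence. A minimal sketch, assuming tokio::fs and a hypothetical helper name (the function above instead relies on download_object having already fsynced the temp file, and performs the final directory fsync through VirtualFile on the configured io engine):

    // Sketch only: follows the write/fsync/rename/fsync/fsync recipe above.
    async fn durable_rename_sketch(
        tmp: &camino::Utf8Path,
        dst: &camino::Utf8Path,
        parent_dir: &camino::Utf8Path,
    ) -> std::io::Result<()> {
        // write(tmp) has already happened; make its contents durable first
        tokio::fs::File::open(tmp).await?.sync_all().await?; // fsync(tmp)
        tokio::fs::rename(tmp, dst).await?;                  // rename(tmp, new)
        tokio::fs::File::open(dst).await?.sync_all().await?; // fsync(new)
        // finally, persist the directory entry itself
        tokio::fs::File::open(parent_dir).await?.sync_all().await?; // fsync(parent)
        Ok(())
    }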
121 :
122 : /// Download the object `src_path` in the remote `storage` to local path `dst_path`.
123 : ///
124 : /// If Ok() is returned, the download succeeded and the inode & data have been made durable.
125 : /// (Note that the directory entry for the inode is not made durable.)
126 : /// The file size in bytes is returned.
127 : ///
128 : /// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
129 : /// The unlinking has _not_ been made durable.
130 6 : async fn download_object<'a>(
131 6 : storage: &'a GenericRemoteStorage,
132 6 : src_path: &RemotePath,
133 6 : dst_path: &Utf8PathBuf,
134 6 : cancel: &CancellationToken,
135 6 : ) -> Result<u64, DownloadError> {
136 6 : let res = match crate::virtual_file::io_engine::get() {
137 0 : crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
138 : crate::virtual_file::io_engine::IoEngine::StdFs => {
139 3 : async {
140 3 : let destination_file = tokio::fs::File::create(dst_path)
141 3 : .await
142 3 : .with_context(|| format!("create a destination file for layer '{dst_path}'"))
143 3 : .map_err(DownloadError::Other)?;
144 :
145 6 : let download = storage.download(src_path, cancel).await?;
146 :
147 3 : let mut buf_writer =
148 3 : tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
149 3 :
150 3 : let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
151 :
152 24 : let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
153 3 : buf_writer.flush().await?;
154 :
155 3 : let mut destination_file = buf_writer.into_inner();
156 3 :
157 3 : // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
158 3 : // A file will not be closed immediately when it goes out of scope if there are any IO operations
159 3 : // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
160 3 : // you should call flush before dropping it.
161 3 : //
162 3 : // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any, because
163 3 : // we assume that the `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
164 3 : // But for additional safety, let's check/wait for any pending operations.
165 3 : destination_file
166 3 : .flush()
167 0 : .await
168 3 : .with_context(|| format!("flush destination file at {dst_path}"))
169 3 : .map_err(DownloadError::Other)?;
170 :
171 : // not using sync_data because it can lose file size update
172 3 : destination_file
173 3 : .sync_all()
174 3 : .await
175 3 : .with_context(|| format!("failed to fsync destination file at {dst_path}"))
176 3 : .map_err(DownloadError::Other)?;
177 :
178 3 : Ok(bytes_amount)
179 3 : }
180 36 : .await
181 : }
182 : #[cfg(target_os = "linux")]
183 : crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
184 : use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
185 3 : async {
186 3 : let destination_file = VirtualFile::create(dst_path)
187 3 : .await
188 3 : .with_context(|| format!("create a destination file for layer '{dst_path}'"))
189 3 : .map_err(DownloadError::Other)?;
190 :
191 6 : let mut download = storage.download(src_path, cancel).await?;
192 :
193 : // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
194 : // There's chunks_vectored() on the stream.
195 3 : let (bytes_amount, destination_file) = async {
196 3 : let size_tracking = size_tracking_writer::Writer::new(destination_file);
197 3 : let mut buffered = owned_buffers_io::write::BufferedWriter::<
198 3 : { super::BUFFER_SIZE },
199 3 : _,
200 3 : >::new(size_tracking);
201 18 : while let Some(res) =
202 21 : futures::StreamExt::next(&mut download.download_stream).await
203 : {
204 18 : let chunk = match res {
205 18 : Ok(chunk) => chunk,
206 0 : Err(e) => return Err(e),
207 : };
208 18 : buffered
209 18 : .write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk))
210 0 : .await?;
211 : }
212 3 : let size_tracking = buffered.flush_and_into_inner().await?;
213 3 : Ok(size_tracking.into_inner())
214 3 : }
215 24 : .await?;
216 :
217 : // not using sync_data because it can lose file size update
218 3 : destination_file
219 3 : .sync_all()
220 3 : .await
221 3 : .with_context(|| format!("failed to fsync destination file at {dst_path}"))
222 3 : .map_err(DownloadError::Other)?;
223 :
224 3 : Ok(bytes_amount)
225 3 : }
226 36 : .await
227 : }
228 : };
229 :
230 : // in case the download failed, clean up
231 6 : match res {
232 6 : Ok(bytes_amount) => Ok(bytes_amount),
233 0 : Err(e) => {
234 0 : if let Err(e) = tokio::fs::remove_file(dst_path).await {
235 0 : if e.kind() != std::io::ErrorKind::NotFound {
236 0 : on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
237 0 : }
238 0 : }
239 0 : Err(e)
240 : }
241 : }
242 6 : }
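Stripped of error contexts, failure cleanup, and the tokio-epoll-uring branch, the StdFs path above reduces to a short stream-copy-then-fsync sequence. A minimal sketch under those assumptions (hypothetical helper name):

    async fn stream_download_to_file(
        storage: &GenericRemoteStorage,
        src: &RemotePath,
        dst: &Utf8Path,
        cancel: &CancellationToken,
    ) -> anyhow::Result<u64> {
        use tokio::io::AsyncWriteExt;
        let file = tokio::fs::File::create(dst).await?;
        let download = storage.download(src, cancel).await?;
        let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
        let mut writer = tokio::io::BufWriter::new(file);
        let bytes_amount = tokio::io::copy_buf(&mut reader, &mut writer).await?;
        writer.flush().await?;
        // sync_all rather than sync_data, so the file size update is durable too
        writer.into_inner().sync_all().await?;
        Ok(bytes_amount)
    }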
243 :
244 : const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
245 :
246 0 : pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
247 0 : let extension = path.extension();
248 0 : match extension {
249 0 : Some(TEMP_DOWNLOAD_EXTENSION) => true,
250 0 : Some(_) => false,
251 0 : None => false,
252 : }
253 0 : }
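For example, a layer being fetched to a (hypothetical) local name "some-layer-file" is first written to "some-layer-file.temp_download"; leftovers with that extension are what this predicate matches, so they can be removed on pageserver startup and re-downloaded, as noted in download_layer_file above:

    // Illustrative only; the paths are hypothetical.
    assert!(is_temp_download_file(Utf8Path::new("some-layer-file.temp_download")));
    assert!(!is_temp_download_file(Utf8Path::new("some-layer-file")));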
254 :
255 : /// List timelines of given tenant in remote storage
256 108 : pub async fn list_remote_timelines(
257 108 : storage: &GenericRemoteStorage,
258 108 : tenant_shard_id: TenantShardId,
259 108 : cancel: CancellationToken,
260 108 : ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
261 108 : let remote_path = remote_timelines_path(&tenant_shard_id);
262 108 :
263 108 : fail::fail_point!("storage-sync-list-remote-timelines", |_| {
264 0 : anyhow::bail!("storage-sync-list-remote-timelines");
265 108 : });
266 :
267 108 : let listing = download_retry_forever(
268 108 : || {
269 108 : storage.list(
270 108 : Some(&remote_path),
271 108 : ListingMode::WithDelimiter,
272 108 : None,
273 108 : &cancel,
274 108 : )
275 108 : },
276 108 : &format!("list timelines for {tenant_shard_id}"),
277 108 : &cancel,
278 108 : )
279 10 : .await?;
280 :
281 108 : let mut timeline_ids = HashSet::new();
282 108 : let mut other_prefixes = HashSet::new();
283 :
284 114 : for timeline_remote_storage_key in listing.prefixes {
285 6 : let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
286 0 : anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
287 6 : })?;
288 :
289 6 : match object_name.parse::<TimelineId>() {
290 6 : Ok(t) => timeline_ids.insert(t),
291 0 : Err(_) => other_prefixes.insert(object_name.to_string()),
292 : };
293 : }
294 :
295 108 : for key in listing.keys {
296 0 : let object_name = key
297 0 : .object_name()
298 0 : .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
299 0 : other_prefixes.insert(object_name.to_string());
300 : }
301 :
302 108 : Ok((timeline_ids, other_prefixes))
303 108 : }
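Since the listing uses ListingMode::WithDelimiter, objects under the tenant's timelines/ prefix are grouped like directories: every timeline shows up once in listing.prefixes, with its last path component parsing as a TimelineId, while loose objects directly under timelines/ arrive in listing.keys and are reported back in other_prefixes. An illustrative layout, with hypothetical IDs and key name:

    tenants/<tenant_shard_id>/timelines/
        11223344556677881122334455667788/    <- prefix, parsed as a TimelineId
        99aabbccddeeff0099aabbccddeeff00/     <- prefix, parsed as a TimelineId
        some-stray-object                     <- key, collected into other_prefixes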
304 :
305 34 : async fn do_download_index_part(
306 34 : storage: &GenericRemoteStorage,
307 34 : tenant_shard_id: &TenantShardId,
308 34 : timeline_id: &TimelineId,
309 34 : index_generation: Generation,
310 34 : cancel: &CancellationToken,
311 34 : ) -> Result<IndexPart, DownloadError> {
312 34 : let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
313 :
314 34 : let index_part_bytes = download_retry_forever(
315 34 : || async {
316 54 : let download = storage.download(&remote_path, cancel).await?;
317 :
318 20 : let mut bytes = Vec::new();
319 20 :
320 20 : let stream = download.download_stream;
321 20 : let mut stream = StreamReader::new(stream);
322 20 :
323 40 : tokio::io::copy_buf(&mut stream, &mut bytes).await?;
324 :
325 20 : Ok(bytes)
326 68 : },
327 34 : &format!("download {remote_path:?}"),
328 34 : cancel,
329 34 : )
330 94 : .await?;
331 :
332 20 : let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
333 20 : .with_context(|| format!("deserialize index part file at {remote_path:?}"))
334 20 : .map_err(DownloadError::Other)?;
335 :
336 20 : Ok(index_part)
337 34 : }
338 :
339 : /// index_part.json objects are suffixed with a generation number, so we cannot
340 : /// directly GET the latest index part without doing some probing.
341 : ///
342 : /// In this function we probe for the most recent index in a generation <= our current generation.
343 : /// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
344 40 : #[tracing::instrument(skip_all, fields(generation=?my_generation))]
345 : pub(super) async fn download_index_part(
346 : storage: &GenericRemoteStorage,
347 : tenant_shard_id: &TenantShardId,
348 : timeline_id: &TimelineId,
349 : my_generation: Generation,
350 : cancel: &CancellationToken,
351 : ) -> Result<IndexPart, DownloadError> {
352 : debug_assert_current_span_has_tenant_and_timeline_id();
353 :
354 : if my_generation.is_none() {
355 : // Operating without generations: just fetch the generation-less path
356 : return do_download_index_part(
357 : storage,
358 : tenant_shard_id,
359 : timeline_id,
360 : my_generation,
361 : cancel,
362 : )
363 : .await;
364 : }
365 :
366 : // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
367 : // index in our generation.
368 : //
369 : // This is an optimization to avoid doing the listing for the general case below.
370 : let res =
371 : do_download_index_part(storage, tenant_shard_id, timeline_id, my_generation, cancel).await;
372 : match res {
373 : Ok(index_part) => {
374 0 : tracing::debug!(
375 0 : "Found index_part from current generation (this is a stale attachment)"
376 0 : );
377 : return Ok(index_part);
378 : }
379 : Err(DownloadError::NotFound) => {}
380 : Err(e) => return Err(e),
381 : };
382 :
383 : // Typical case: the previous generation of this tenant was running healthily, and had uploaded
384 : // an index part. We may safely start from this index without doing a listing, because:
385 : // - We checked for the current-generation case above
386 : // - generations > my_generation are to be ignored
387 : // - any other indices that exist would have an older generation than `previous_gen`, and
388 : // we want to find the most recent index from a previous generation.
389 : //
390 : // This is an optimization to avoid doing the listing for the general case below.
391 : let res = do_download_index_part(
392 : storage,
393 : tenant_shard_id,
394 : timeline_id,
395 : my_generation.previous(),
396 : cancel,
397 : )
398 : .await;
399 : match res {
400 : Ok(index_part) => {
401 0 : tracing::debug!("Found index_part from previous generation");
402 : return Ok(index_part);
403 : }
404 : Err(DownloadError::NotFound) => {
405 0 : tracing::debug!(
406 0 : "No index_part found from previous generation, falling back to listing"
407 0 : );
408 : }
409 : Err(e) => {
410 : return Err(e);
411 : }
412 : }
413 :
414 : // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
415 : // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
416 : // to constructing a full index path with no generation, because the generation is a suffix.
417 : let index_prefix = remote_index_path(tenant_shard_id, timeline_id, Generation::none());
418 :
419 : let indices = download_retry(
420 12 : || async { storage.list_files(Some(&index_prefix), None, cancel).await },
421 : "list index_part files",
422 : cancel,
423 : )
424 : .await?;
425 :
426 : // General case logic for which index to use: the latest index whose generation
427 : // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
428 : let max_previous_generation = indices
429 : .into_iter()
430 : .filter_map(parse_remote_index_path)
431 12 : .filter(|g| g <= &my_generation)
432 : .max();
433 :
434 : match max_previous_generation {
435 : Some(g) => {
436 0 : tracing::debug!("Found index_part in generation {g:?}");
437 : do_download_index_part(storage, tenant_shard_id, timeline_id, g, cancel).await
438 : }
439 : None => {
440 : // Migration from legacy pre-generation state: we have a generation but no prior
441 : // attached pageservers did. Try to load from a no-generation path.
442 0 : tracing::debug!("No index_part.json* found");
443 : do_download_index_part(
444 : storage,
445 : tenant_shard_id,
446 : timeline_id,
447 : Generation::none(),
448 : cancel,
449 : )
450 : .await
451 : }
452 : }
453 : }
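A concrete walk-through of the probe order, with hypothetical numbers: suppose my_generation is 5 and earlier attachments uploaded indices in generations 2 and 4. The first GET, for an index in generation 5, returns NotFound; the second GET, for generation 4 (my_generation.previous()), succeeds and that index is used. Had generation 4 been missing as well, the listing fallback would pick the highest generation <= 5 among the generation-suffixed index_part.json objects (here, 2), and only with no suffixed objects at all would the legacy, generation-less index_part.json be tried.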
454 :
455 2 : pub(crate) async fn download_initdb_tar_zst(
456 2 : conf: &'static PageServerConf,
457 2 : storage: &GenericRemoteStorage,
458 2 : tenant_shard_id: &TenantShardId,
459 2 : timeline_id: &TimelineId,
460 2 : cancel: &CancellationToken,
461 2 : ) -> Result<(Utf8PathBuf, File), DownloadError> {
462 2 : debug_assert_current_span_has_tenant_and_timeline_id();
463 2 :
464 2 : let remote_path = remote_initdb_archive_path(&tenant_shard_id.tenant_id, timeline_id);
465 2 :
466 2 : let remote_preserved_path =
467 2 : remote_initdb_preserved_archive_path(&tenant_shard_id.tenant_id, timeline_id);
468 2 :
469 2 : let timeline_path = conf.timelines_path(tenant_shard_id);
470 2 :
471 2 : if !timeline_path.exists() {
472 0 : tokio::fs::create_dir_all(&timeline_path)
473 0 : .await
474 0 : .with_context(|| format!("timeline dir creation {timeline_path}"))
475 0 : .map_err(DownloadError::Other)?;
476 2 : }
477 2 : let temp_path = timeline_path.join(format!(
478 2 : "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}"
479 2 : ));
480 :
481 2 : let file = download_retry(
482 2 : || async {
483 2 : let file = OpenOptions::new()
484 2 : .create(true)
485 2 : .truncate(true)
486 2 : .read(true)
487 2 : .write(true)
488 2 : .open(&temp_path)
489 2 : .await
490 2 : .with_context(|| format!("tempfile creation {temp_path}"))
491 2 : .map_err(DownloadError::Other)?;
492 :
493 4 : let download = match storage.download(&remote_path, cancel).await {
494 2 : Ok(dl) => dl,
495 : Err(DownloadError::NotFound) => {
496 0 : storage.download(&remote_preserved_path, cancel).await?
497 : }
498 0 : Err(other) => Err(other)?,
499 : };
500 2 : let mut download = tokio_util::io::StreamReader::new(download.download_stream);
501 2 : let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
502 2 :
503 730 : tokio::io::copy_buf(&mut download, &mut writer).await?;
504 :
505 2 : let mut file = writer.into_inner();
506 2 :
507 2 : file.seek(std::io::SeekFrom::Start(0))
508 2 : .await
509 2 : .with_context(|| format!("rewinding initdb.tar.zst at: {remote_path:?}"))
510 2 : .map_err(DownloadError::Other)?;
511 :
512 2 : Ok(file)
513 4 : },
514 2 : &format!("download {remote_path}"),
515 2 : cancel,
516 2 : )
517 738 : .await
518 2 : .map_err(|e| {
519 : // Do a best-effort attempt at deleting the temporary file upon encountering an error.
520 : // We don't have async here nor do we want to pile on any extra errors.
521 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
522 0 : if e.kind() != std::io::ErrorKind::NotFound {
523 0 : warn!("error deleting temporary file {temp_path}: {e}");
524 0 : }
525 0 : }
526 0 : e
527 2 : })?;
528 :
529 2 : Ok((temp_path, file))
530 2 : }
531 :
532 : /// Helper function to handle retries for a download operation.
533 : ///
534 : /// Remote operations can fail due to rate limits (S3), spurious network
535 : /// problems, or other external reasons. Retry FAILED_REMOTE_OP_RETRIES times,
536 : /// with backoff.
537 : ///
538 : /// (See similar logic for uploads in `perform_upload_task`)
539 14 : pub(super) async fn download_retry<T, O, F>(
540 14 : op: O,
541 14 : description: &str,
542 14 : cancel: &CancellationToken,
543 14 : ) -> Result<T, DownloadError>
544 14 : where
545 14 : O: FnMut() -> F,
546 14 : F: Future<Output = Result<T, DownloadError>>,
547 14 : {
548 14 : backoff::retry(
549 14 : op,
550 14 : DownloadError::is_permanent,
551 14 : FAILED_DOWNLOAD_WARN_THRESHOLD,
552 14 : FAILED_REMOTE_OP_RETRIES,
553 14 : description,
554 14 : cancel,
555 14 : )
556 822 : .await
557 14 : .ok_or_else(|| DownloadError::Cancelled)
558 14 : .and_then(|x| x)
559 14 : }
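As a usage illustration (a hypothetical call site, mirroring how download_layer_file wraps download_object above), a single remote GET under this retry policy looks like:

    let download = download_retry(
        || async { storage.download(&remote_path, cancel).await },
        &format!("download {remote_path:?}"),
        cancel,
    )
    .await?;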
560 :
561 142 : async fn download_retry_forever<T, O, F>(
562 142 : op: O,
563 142 : description: &str,
564 142 : cancel: &CancellationToken,
565 142 : ) -> Result<T, DownloadError>
566 142 : where
567 142 : O: FnMut() -> F,
568 142 : F: Future<Output = Result<T, DownloadError>>,
569 142 : {
570 142 : backoff::retry(
571 142 : op,
572 142 : DownloadError::is_permanent,
573 142 : FAILED_DOWNLOAD_WARN_THRESHOLD,
574 142 : u32::MAX,
575 142 : description,
576 142 : cancel,
577 142 : )
578 104 : .await
579 142 : .ok_or_else(|| DownloadError::Cancelled)
580 142 : .and_then(|x| x)
581 142 : }