Line data Source code
1 : use std::cmp::min;
2 : use std::io::{self, ErrorKind};
3 : use std::sync::Arc;
4 :
5 : use anyhow::{Context, Result, anyhow, bail};
6 : use bytes::Bytes;
7 : use camino::Utf8PathBuf;
8 : use chrono::{DateTime, Utc};
9 : use futures::{SinkExt, StreamExt, TryStreamExt};
10 : use http_utils::error::ApiError;
11 : use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
12 : use remote_storage::GenericRemoteStorage;
13 : use reqwest::Certificate;
14 : use safekeeper_api::Term;
15 : use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
16 : use safekeeper_client::mgmt_api;
17 : use safekeeper_client::mgmt_api::Client;
18 : use serde::Deserialize;
19 : use tokio::fs::OpenOptions;
20 : use tokio::io::AsyncWrite;
21 : use tokio::sync::mpsc;
22 : use tokio::task;
23 : use tokio_tar::{Archive, Builder, Header};
24 : use tokio_util::io::{CopyToBytes, SinkWriter};
25 : use tokio_util::sync::PollSender;
26 : use tracing::{error, info, instrument};
27 : use utils::crashsafe::fsync_async_opt;
28 : use utils::id::{NodeId, TenantTimelineId};
29 : use utils::logging::SecretString;
30 : use utils::lsn::Lsn;
31 : use utils::pausable_failpoint;
32 :
33 : use crate::control_file::CONTROL_FILE_NAME;
34 : use crate::state::{EvictionState, TimelinePersistentState};
35 : use crate::timeline::{Timeline, TimelineError, WalResidentTimeline};
36 : use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline};
37 : use crate::wal_storage::open_wal_file;
38 : use crate::{GlobalTimelines, debug_dump, wal_backup};
39 :
40 : /// Stream tar archive of timeline to tx.
41 : #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
42 : pub async fn stream_snapshot(
43 : tli: Arc<Timeline>,
44 : source: NodeId,
45 : destination: NodeId,
46 : tx: mpsc::Sender<Result<Bytes>>,
47 : storage: Option<Arc<GenericRemoteStorage>>,
48 : ) {
49 : match tli.try_wal_residence_guard().await {
50 : Err(e) => {
51 : tx.send(Err(anyhow!("Error checking residence: {:#}", e)))
52 : .await
53 : .ok();
54 : }
55 : Ok(maybe_resident_tli) => {
56 : if let Err(e) = match maybe_resident_tli {
57 : Some(resident_tli) => {
58 : stream_snapshot_resident_guts(
59 : resident_tli,
60 : source,
61 : destination,
62 : tx.clone(),
63 : storage,
64 : )
65 : .await
66 : }
67 : None => {
68 : if let Some(storage) = storage {
69 : stream_snapshot_offloaded_guts(
70 : tli,
71 : source,
72 : destination,
73 : tx.clone(),
74 : &storage,
75 : )
76 : .await
77 : } else {
78 : tx.send(Err(anyhow!("remote storage not configured")))
79 : .await
80 : .ok();
81 : return;
82 : }
83 : }
84 : } {
85 : // Error type/contents don't matter as they can't reach the client
86 : // (hyper likely doesn't do anything with it), but http stream will be
87 : // prematurely terminated. It would be nice to try to send the error in
88 : // trailers though.
89 : tx.send(Err(anyhow!("snapshot failed"))).await.ok();
90 : error!("snapshot failed: {:#}", e);
91 : }
92 : }
93 : }
94 : }
95 :
96 : /// State needed while streaming the snapshot.
97 : pub struct SnapshotContext {
98 : pub from_segno: XLogSegNo, // inclusive
99 : pub upto_segno: XLogSegNo, // inclusive
100 : pub term: Term,
101 : pub last_log_term: Term,
102 : pub flush_lsn: Lsn,
103 : pub wal_seg_size: usize,
104 : // used to release the WAL removal hold-off in Drop.
105 : pub tli: WalResidentTimeline,
106 : }
107 :
108 : impl Drop for SnapshotContext {
109 0 : fn drop(&mut self) {
110 0 : let tli = self.tli.clone();
111 0 : task::spawn(async move {
112 0 : let mut shared_state = tli.write_shared_state().await;
113 0 : shared_state.wal_removal_on_hold = false;
114 0 : });
115 0 : }
116 : }
117 :
118 : /// Build a tokio_tar stream that sends encoded bytes into a Bytes channel.
119 0 : fn prepare_tar_stream(
120 0 : tx: mpsc::Sender<Result<Bytes>>,
121 0 : ) -> tokio_tar::Builder<impl AsyncWrite + Unpin + Send> {
122 0 : // tokio-tar wants an AsyncWrite implementor, but we have an mpsc tx of Result<Bytes>;
123 0 : // use SinkWriter as the AsyncWrite impl. That is,
124 0 : // - create Sink from the tx. It returns PollSendError if chan is closed.
125 0 : let sink = PollSender::new(tx);
126 0 : // - SinkWriter needs sink error to be io one, map it.
127 0 : let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
128 0 : // - SinkWriter wants the sink item type to be just Bytes, not Result<Bytes>,
129 0 : // so map it with with(). Note that with() takes an async function and
130 0 : // allows the map to fail; we need neither of these, which is why there are
131 0 : // two Oks.
132 0 : let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
133 0 : // - SinkWriter (not surprisingly) wants sink of &[u8], not bytes, so wrap
134 0 : // into CopyToBytes. This is a data copy.
135 0 : let copy_to_bytes = CopyToBytes::new(oksink);
136 0 : let writer = SinkWriter::new(copy_to_bytes);
137 0 : let pinned_writer = Box::pin(writer);
138 0 :
139 0 : // Note that tokio_tar append_* funcs use tokio::io::copy with 8KB buffer
140 0 : // which is also likely suboptimal.
141 0 : Builder::new_non_terminated(pinned_writer)
142 0 : }
143 :
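// A minimal, self-contained sketch of the same adapter chain used above, assuming the
// tokio, bytes, futures and tokio-util crates (tokio-util with its "io" feature). The
// channel here carries plain Bytes rather than Result<Bytes>, so the with() step from
// prepare_tar_stream is not needed. `demo_sink_writer` is a hypothetical name, not part
// of this module.
async fn demo_sink_writer() -> std::io::Result<()> {
    use bytes::Bytes;
    use futures::SinkExt;
    use std::io::ErrorKind;
    use tokio::io::AsyncWriteExt;
    use tokio::sync::mpsc;
    use tokio_util::io::{CopyToBytes, SinkWriter};
    use tokio_util::sync::PollSender;

    let (tx, mut rx) = mpsc::channel::<Bytes>(16);
    // PollSender turns the Sender into a Sink<Bytes>; its error must be mapped to an io::Error.
    let sink = PollSender::new(tx).sink_map_err(|_| std::io::Error::from(ErrorKind::BrokenPipe));
    // CopyToBytes accepts &[u8] writes and copies each slice into Bytes for the inner sink,
    // and SinkWriter exposes the whole chain as AsyncWrite.
    let mut writer = SinkWriter::new(CopyToBytes::new(sink));
    writer.write_all(b"hello").await?;
    writer.shutdown().await?;
    drop(writer); // make sure the sending side is gone so recv() below terminates

    while let Some(chunk) = rx.recv().await {
        println!("received {} bytes", chunk.len());
    }
    Ok(())
}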
144 : /// Implementation of snapshot for an offloaded timeline, only reads control file
145 0 : pub(crate) async fn stream_snapshot_offloaded_guts(
146 0 : tli: Arc<Timeline>,
147 0 : source: NodeId,
148 0 : destination: NodeId,
149 0 : tx: mpsc::Sender<Result<Bytes>>,
150 0 : storage: &GenericRemoteStorage,
151 0 : ) -> Result<()> {
152 0 : let mut ar = prepare_tar_stream(tx);
153 0 :
154 0 : tli.snapshot_offloaded(&mut ar, source, destination, storage)
155 0 : .await?;
156 :
157 0 : ar.finish().await?;
158 :
159 0 : Ok(())
160 0 : }
161 :
162 : /// Implementation of snapshot for a timeline which is resident (includes some segment data)
163 0 : pub async fn stream_snapshot_resident_guts(
164 0 : tli: WalResidentTimeline,
165 0 : source: NodeId,
166 0 : destination: NodeId,
167 0 : tx: mpsc::Sender<Result<Bytes>>,
168 0 : storage: Option<Arc<GenericRemoteStorage>>,
169 0 : ) -> Result<()> {
170 0 : let mut ar = prepare_tar_stream(tx);
171 :
172 0 : let bctx = tli
173 0 : .start_snapshot(&mut ar, source, destination, storage)
174 0 : .await?;
175 0 : pausable_failpoint!("sk-snapshot-after-list-pausable");
176 :
177 0 : let tli_dir = tli.get_timeline_dir();
178 0 : info!(
179 0 : "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
180 0 : bctx.upto_segno - bctx.from_segno + 1,
181 : bctx.from_segno,
182 : bctx.upto_segno,
183 : bctx.term,
184 : bctx.last_log_term,
185 : bctx.flush_lsn,
186 : );
187 0 : for segno in bctx.from_segno..=bctx.upto_segno {
188 0 : let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
189 0 : let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
190 0 : if is_partial {
191 0 : wal_file_name.push_str(".partial");
192 0 : }
193 0 : ar.append_file(&wal_file_name, &mut sf).await?;
194 : }
195 :
196 : // Do the term check before ar.finish so that the archive ends up corrupted if the term
197 : // changed. The client shouldn't ignore an abrupt stream end anyway, but this is extra insurance.
198 0 : tli.finish_snapshot(&bctx).await?;
199 :
200 0 : ar.finish().await?;
201 :
202 0 : Ok(())
203 0 : }
204 :
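// Standalone sketch of the segment arithmetic above, using plain integers instead of the
// Lsn / XLogSegNo helpers and postgres_ffi::XLogFileName. The 16 MiB segment size and the
// LSN values are made-up examples; the *_demo names are hypothetical and not part of this
// module. Segment file names follow the PostgreSQL convention of three 8-hex-digit fields:
// timeline id, segno / segments-per-xlogid, segno % segments-per-xlogid.
fn segment_number_demo(lsn: u64, wal_seg_size: u64) -> u64 {
    lsn / wal_seg_size
}

fn xlog_file_name_demo(tli: u32, segno: u64, wal_seg_size: u64) -> String {
    let segs_per_xlogid = 0x1_0000_0000u64 / wal_seg_size;
    format!("{:08X}{:08X}{:08X}", tli, segno / segs_per_xlogid, segno % segs_per_xlogid)
}

fn demo_segment_names() -> Vec<String> {
    let wal_seg_size: u64 = 16 * 1024 * 1024; // default segment size
    let from_segno = segment_number_demo(0x0100_0000, wal_seg_size); // min(remote_consistent_lsn, backup_lsn)
    let upto_segno = segment_number_demo(0x0480_0000, wal_seg_size); // flush_lsn
    (from_segno..=upto_segno)
        .map(|segno| {
            let mut name = xlog_file_name_demo(1, segno, wal_seg_size);
            if segno == upto_segno {
                // typically the last, still-open segment is stored and streamed with a ".partial" suffix
                name.push_str(".partial");
            }
            name
        })
        .collect()
}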
205 : impl Timeline {
206 : /// Simple snapshot for an offloaded timeline: we only upload a renamed partial segment and
207 : /// pass a modified control file into the provided tar stream (no data segments are included,
208 : /// since the timeline is offloaded and there are none on disk).
209 0 : async fn snapshot_offloaded<W: AsyncWrite + Unpin + Send>(
210 0 : self: &Arc<Timeline>,
211 0 : ar: &mut tokio_tar::Builder<W>,
212 0 : source: NodeId,
213 0 : destination: NodeId,
214 0 : storage: &GenericRemoteStorage,
215 0 : ) -> Result<()> {
216 : // Take initial copy of control file, then release state lock
217 0 : let mut control_file = {
218 0 : let shared_state = self.write_shared_state().await;
219 :
220 0 : let control_file = TimelinePersistentState::clone(shared_state.sk.state());
221 :
222 : // Rare race: we got unevicted between entering function and reading control file.
223 : // We error out and let API caller retry.
224 0 : if !matches!(control_file.eviction_state, EvictionState::Offloaded(_)) {
225 0 : bail!("Timeline was un-evicted during snapshot, please retry");
226 0 : }
227 0 :
228 0 : control_file
229 : };
230 :
231 : // Modify the partial segment in the in-memory copy of the control file to
232 : // point to the destination safekeeper.
233 0 : let replace = control_file
234 0 : .partial_backup
235 0 : .replace_uploaded_segment(source, destination)?;
236 :
237 0 : let Some(replace) = replace else {
238 : // In Manager::ready_for_eviction, we do not permit eviction unless the timeline
239 : // has a partial segment, so an evicted timeline without one is unexpected.
240 0 : anyhow::bail!("Timeline has no partial segment, cannot generate snapshot");
241 : };
242 :
243 0 : tracing::info!("Replacing uploaded partial segment in in-mem control file: {replace:?}");
244 :
245 : // Optimistically try to copy the partial segment to the destination's path: this
246 : // can fail if the timeline was un-evicted and modified in the background.
247 0 : let remote_timeline_path = &self.remote_path;
248 0 : wal_backup::copy_partial_segment(
249 0 : storage,
250 0 : &replace.previous.remote_path(remote_timeline_path),
251 0 : &replace.current.remote_path(remote_timeline_path),
252 0 : )
253 0 : .await?;
254 :
255 : // Since the S3 copy succeeded with the path given in our control file snapshot, and
256 : // we are sending that snapshot in our response, we are giving the caller a consistent
257 : // snapshot even if our local Timeline was unevicted or otherwise modified in the meantime.
258 0 : let buf = control_file
259 0 : .write_to_buf()
260 0 : .with_context(|| "failed to serialize control store")?;
261 0 : let mut header = Header::new_gnu();
262 0 : header.set_size(buf.len().try_into().expect("never breaches u64"));
263 0 : ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
264 0 : .await
265 0 : .with_context(|| "failed to append to archive")?;
266 :
267 0 : Ok(())
268 0 : }
269 : }
270 :
271 : impl WalResidentTimeline {
272 : /// Start streaming tar archive with timeline:
273 : /// 1) stream control file under lock;
274 : /// 2) hold off WAL removal;
275 : /// 3) collect SnapshotContext to understand which WAL segments should be
276 : /// streamed.
277 : ///
278 : /// Snapshot streams data up to flush_lsn. To make this safe, we must check
279 : /// that the term doesn't change during the procedure, or we risk sending a mix
280 : /// of WAL from different histories. The term is remembered in the SnapshotContext
281 : /// and checked in finish_snapshot. Note that in the last segment some WAL
282 : /// higher than the flush_lsn captured here might be streamed; that's fine as
283 : /// long as the term doesn't change.
284 : ///
285 : /// Alternatively, we could send only up to commit_lsn to get some valid state
286 : /// which the compute would later recover; in that case the term check is not
287 : /// needed. But we likely don't want that, as there might be no compute around
288 : /// to perform the recovery.
289 : ///
290 : /// When the returned SnapshotContext is dropped, the WAL hold is removed.
291 0 : async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
292 0 : &self,
293 0 : ar: &mut tokio_tar::Builder<W>,
294 0 : source: NodeId,
295 0 : destination: NodeId,
296 0 : storage: Option<Arc<GenericRemoteStorage>>,
297 0 : ) -> Result<SnapshotContext> {
298 0 : let mut shared_state = self.write_shared_state().await;
299 0 : let wal_seg_size = shared_state.get_wal_seg_size();
300 0 :
301 0 : let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
302 : // Modify the partial segment in the in-memory copy of the control file to
303 : // point to the destination safekeeper.
304 0 : let replace = control_store
305 0 : .partial_backup
306 0 : .replace_uploaded_segment(source, destination)?;
307 :
308 0 : if let Some(replace) = replace {
309 : // The deserialized control file has an uploaded partial. We upload a copy
310 : // of it to object storage for the destination safekeeper and send an updated
311 : // control file in the snapshot.
312 0 : tracing::info!(
313 0 : "Replacing uploaded partial segment in in-mem control file: {replace:?}"
314 : );
315 :
316 0 : let remote_timeline_path = &self.tli.remote_path;
317 0 : wal_backup::copy_partial_segment(
318 0 : &*storage.context("remote storage not configured")?,
319 0 : &replace.previous.remote_path(remote_timeline_path),
320 0 : &replace.current.remote_path(remote_timeline_path),
321 0 : )
322 0 : .await?;
323 0 : }
324 :
325 0 : let buf = control_store
326 0 : .write_to_buf()
327 0 : .with_context(|| "failed to serialize control store")?;
328 0 : let mut header = Header::new_gnu();
329 0 : header.set_size(buf.len().try_into().expect("never breaches u64"));
330 0 : ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
331 0 : .await
332 0 : .with_context(|| "failed to append to archive")?;
333 :
334 : // We need to stream starting from the oldest segment that someone (s3 or
335 : // pageserver) still needs. This duplicates calc_horizon_lsn logic.
336 : //
337 : // We know that WAL wasn't removed up to this point because it cannot be
338 : // removed further than `backup_lsn`. Since we hold the shared_state lock
339 : // and set `wal_removal_on_hold` below, WAL is guaranteed not to be
340 : // removed until we're done.
341 0 : let from_lsn = min(
342 0 : shared_state.sk.state().remote_consistent_lsn,
343 0 : shared_state.sk.state().backup_lsn,
344 0 : );
345 0 : if from_lsn == Lsn::INVALID {
346 : // this is possible if snapshot is called before handling first
347 : // elected message
348 0 : bail!("snapshot is called on uninitialized timeline");
349 0 : }
350 0 : let from_segno = from_lsn.segment_number(wal_seg_size);
351 0 : let term = shared_state.sk.state().acceptor_state.term;
352 0 : let last_log_term = shared_state.sk.last_log_term();
353 0 : let flush_lsn = shared_state.sk.flush_lsn();
354 0 : let upto_segno = flush_lsn.segment_number(wal_seg_size);
355 : // have some limit on max number of segments as a sanity check
356 : const MAX_ALLOWED_SEGS: u64 = 1000;
357 0 : let num_segs = upto_segno - from_segno + 1;
358 0 : if num_segs > MAX_ALLOWED_SEGS {
359 0 : bail!(
360 0 : "snapshot is called on timeline with {} segments, but the limit is {}",
361 0 : num_segs,
362 0 : MAX_ALLOWED_SEGS
363 0 : );
364 0 : }
365 0 :
366 0 : // Prevent WAL removal while we're streaming data.
367 0 : //
368 0 : // Since this is a flag, not a counter, just bail out if it's already set; we
369 0 : // shouldn't need concurrent snapshotting.
370 0 : if shared_state.wal_removal_on_hold {
371 0 : bail!("wal_removal_on_hold is already true");
372 0 : }
373 0 : shared_state.wal_removal_on_hold = true;
374 0 :
375 0 : // Drop shared_state to release the lock, before calling wal_residence_guard().
376 0 : drop(shared_state);
377 :
378 0 : let tli_copy = self.wal_residence_guard().await?;
379 0 : let bctx = SnapshotContext {
380 0 : from_segno,
381 0 : upto_segno,
382 0 : term,
383 0 : last_log_term,
384 0 : flush_lsn,
385 0 : wal_seg_size,
386 0 : tli: tli_copy,
387 0 : };
388 0 :
389 0 : Ok(bctx)
390 0 : }
391 :
392 : /// Finish snapshotting: check that the term(s) haven't changed.
393 : ///
394 : /// Note that the WAL gc hold-off is removed in SnapshotContext's Drop, so it
395 : /// isn't forgotten if snapshotting fails midway.
396 0 : pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
397 0 : let shared_state = self.read_shared_state().await;
398 0 : let term = shared_state.sk.state().acceptor_state.term;
399 0 : let last_log_term = shared_state.sk.last_log_term();
400 0 : // There are cases where this check could be relaxed (e.g. last_log_term might
401 0 : // change, but as long as the older history is strictly contained in the new one
402 0 : // that's fine), but there is no need to do so.
403 0 : if bctx.term != term || bctx.last_log_term != last_log_term {
404 0 : bail!(
405 0 : "term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
406 0 : bctx.term,
407 0 : bctx.last_log_term,
408 0 : term,
409 0 : last_log_term
410 0 : );
411 0 : }
412 0 : Ok(())
413 0 : }
414 : }
415 :
416 : /// Response for debug dump request.
417 0 : #[derive(Debug, Deserialize)]
418 : pub struct DebugDumpResponse {
419 : pub start_time: DateTime<Utc>,
420 : pub finish_time: DateTime<Utc>,
421 : pub timelines: Vec<debug_dump::Timeline>,
422 : pub timelines_count: usize,
423 : pub config: debug_dump::Config,
424 : }
425 :
426 : /// Find the most advanced safekeeper and pull timeline from it.
427 0 : pub async fn handle_request(
428 0 : request: PullTimelineRequest,
429 0 : sk_auth_token: Option<SecretString>,
430 0 : ssl_ca_certs: Vec<Certificate>,
431 0 : global_timelines: Arc<GlobalTimelines>,
432 0 : ) -> Result<PullTimelineResponse, ApiError> {
433 0 : let existing_tli = global_timelines.get(TenantTimelineId::new(
434 0 : request.tenant_id,
435 0 : request.timeline_id,
436 0 : ));
437 0 : if existing_tli.is_ok() {
438 0 : info!("Timeline {} already exists", request.timeline_id);
439 0 : return Ok(PullTimelineResponse {
440 0 : safekeeper_host: None,
441 0 : });
442 0 : }
443 0 :
444 0 : let mut http_client = reqwest::Client::builder();
445 0 : for ssl_ca_cert in ssl_ca_certs {
446 0 : http_client = http_client.add_root_certificate(ssl_ca_cert);
447 0 : }
448 0 : let http_client = http_client
449 0 : .build()
450 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
451 :
452 0 : let http_hosts = request.http_hosts.clone();
453 :
454 : // Figure out statuses of potential donors.
455 0 : let responses: Vec<Result<TimelineStatus, mgmt_api::Error>> =
456 0 : futures::future::join_all(http_hosts.iter().map(|url| async {
457 0 : let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
458 0 : let info = cclient
459 0 : .timeline_status(request.tenant_id, request.timeline_id)
460 0 : .await?;
461 0 : Ok(info)
462 0 : }))
463 0 : .await;
464 :
465 0 : let mut statuses = Vec::new();
466 0 : for (i, response) in responses.into_iter().enumerate() {
467 0 : match response {
468 0 : Ok(status) => {
469 0 : statuses.push((status, i));
470 0 : }
471 0 : Err(e) => {
472 0 : info!("error fetching status from {}: {e}", http_hosts[i]);
473 : }
474 : }
475 : }
476 :
477 : // Allow missing responses from up to one safekeeper (say, due to downtime),
478 : // e.g. if we created a timeline on SKs A and B, with C being offline. Then B goes
479 : // offline and C comes online; we want a pull on C with A and B as hosts to work.
480 0 : let min_required_successful = (http_hosts.len() - 1).max(1);
481 0 : if statuses.len() < min_required_successful {
482 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
483 0 : "only got {} successful status responses. required: {min_required_successful}",
484 0 : statuses.len()
485 0 : )));
486 0 : }
487 0 :
488 0 : // Find the most advanced safekeeper
489 0 : let (status, i) = statuses
490 0 : .into_iter()
491 0 : .max_by_key(|(status, _)| {
492 0 : (
493 0 : status.acceptor_state.epoch,
494 0 : status.flush_lsn,
495 0 : status.commit_lsn,
496 0 : )
497 0 : })
498 0 : .unwrap();
499 0 : let safekeeper_host = http_hosts[i].clone();
500 0 :
501 0 : assert!(status.tenant_id == request.tenant_id);
502 0 : assert!(status.timeline_id == request.timeline_id);
503 :
504 0 : let check_tombstone = !request.ignore_tombstone.unwrap_or_default();
505 0 :
506 0 : match pull_timeline(
507 0 : status,
508 0 : safekeeper_host,
509 0 : sk_auth_token,
510 0 : http_client,
511 0 : global_timelines,
512 0 : check_tombstone,
513 0 : )
514 0 : .await
515 : {
516 0 : Ok(resp) => Ok(resp),
517 0 : Err(e) => {
518 0 : match e.downcast_ref::<TimelineError>() {
519 0 : Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
520 0 : safekeeper_host: None,
521 0 : }),
522 : Some(TimelineError::CreationInProgress(_)) => {
523 : // We don't return success here because creation might still fail.
524 0 : Err(ApiError::Conflict("Creation in progress".to_owned()))
525 : }
526 0 : _ => Err(ApiError::InternalServerError(e)),
527 : }
528 : }
529 : }
530 0 : }
531 :
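// A small, self-contained sketch of the donor-selection and quorum rules used in
// handle_request above, with plain (epoch, flush_lsn, commit_lsn) tuples standing in for
// TimelineStatus. The numbers are made up and the *_demo names are hypothetical.
fn pick_donor_demo(statuses: &[(u64, u64, u64)]) -> Option<usize> {
    // Tuples compare lexicographically: highest epoch wins, then flush_lsn, then commit_lsn.
    statuses.iter().enumerate().max_by_key(|(_, s)| **s).map(|(i, _)| i)
}

fn demo_donor_and_quorum() {
    let statuses = [
        (3, 0x2000, 0x1800), // older epoch loses even with a higher flush_lsn
        (4, 0x1000, 0x1000),
        (4, 0x1800, 0x1600), // same epoch, higher flush_lsn wins
    ];
    assert_eq!(pick_donor_demo(&statuses), Some(2));

    // Status responses are required from all listed hosts minus one, but never fewer than one.
    for (hosts, min_required_successful) in [(1usize, 1usize), (2, 1), (3, 2), (5, 4)] {
        assert_eq!((hosts - 1).max(1), min_required_successful);
    }
}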
532 0 : async fn pull_timeline(
533 0 : status: TimelineStatus,
534 0 : host: String,
535 0 : sk_auth_token: Option<SecretString>,
536 0 : http_client: reqwest::Client,
537 0 : global_timelines: Arc<GlobalTimelines>,
538 0 : check_tombstone: bool,
539 0 : ) -> Result<PullTimelineResponse> {
540 0 : let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
541 0 : info!(
542 0 : "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
543 : ttid,
544 : host,
545 : status.commit_lsn,
546 : status.flush_lsn,
547 : status.acceptor_state.term,
548 : status.acceptor_state.epoch
549 : );
550 :
551 0 : let conf = &global_timelines.get_global_config();
552 :
553 0 : let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
554 0 : let client = Client::new(http_client, host.clone(), sk_auth_token.clone());
555 : // Request stream with basebackup archive.
556 0 : let bb_resp = client
557 0 : .snapshot(status.tenant_id, status.timeline_id, conf.my_id)
558 0 : .await?;
559 :
560 : // Make Stream of Bytes from it...
561 0 : let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
562 0 : // and turn it into StreamReader implementing AsyncRead.
563 0 : let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
564 :
565 : // Extract it to disk on the fly. We don't use the simple unpack() because we
566 : // want to fsync the files.
567 0 : let mut entries = Archive::new(bb_reader).entries()?;
568 0 : while let Some(base_tar_entry) = entries.next().await {
569 0 : let mut entry = base_tar_entry?;
570 0 : let header = entry.header();
571 0 : let file_path = header.path()?.into_owned();
572 0 : match header.entry_type() {
573 : tokio_tar::EntryType::Regular => {
574 0 : let utf8_file_path =
575 0 : Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
576 0 : let dst_path = tli_dir_path.join(utf8_file_path);
577 0 : let mut f = OpenOptions::new()
578 0 : .create(true)
579 0 : .truncate(true)
580 0 : .write(true)
581 0 : .open(&dst_path)
582 0 : .await?;
583 0 : tokio::io::copy(&mut entry, &mut f).await?;
584 : // fsync the file
585 0 : f.sync_all().await?;
586 : }
587 : _ => {
588 0 : bail!(
589 0 : "entry {} in backup tar archive is of unexpected type: {:?}",
590 0 : file_path.display(),
591 0 : header.entry_type()
592 0 : );
593 : }
594 : }
595 : }
596 :
597 : // fsync temp timeline directory to remember its contents.
598 0 : fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
599 :
600 : // Let's create the timeline from the temp directory and verify that it's correct
601 0 : let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
602 0 : info!(
603 0 : "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
604 : ttid, commit_lsn, flush_lsn
605 : );
606 0 : assert!(status.commit_lsn <= status.flush_lsn);
607 :
608 : // Finally, load the timeline.
609 0 : let _tli = global_timelines
610 0 : .load_temp_timeline(ttid, &tli_dir_path, check_tombstone)
611 0 : .await?;
612 :
613 0 : Ok(PullTimelineResponse {
614 0 : safekeeper_host: Some(host),
615 0 : })
616 0 : }
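// The reqwest-byte-stream to AsyncRead adapter used in pull_timeline is handy on its own.
// A hedged, self-contained sketch, assuming reqwest with its "stream" feature plus tokio,
// tokio-util, futures and anyhow; `demo_snapshot_reader` and the URL argument are
// hypothetical, not part of this module's API.
async fn demo_snapshot_reader(url: &str) -> anyhow::Result<Vec<u8>> {
    use futures::TryStreamExt;
    use tokio::io::AsyncReadExt;

    // Stream of Result<Bytes, reqwest::Error> -> Result<Bytes, io::Error> -> AsyncRead.
    let resp = reqwest::get(url).await?;
    let stream = resp.bytes_stream().map_err(std::io::Error::other);
    let mut reader = tokio_util::io::StreamReader::new(stream);

    // From here the reader could be handed to tokio_tar::Archive::new(reader), as
    // pull_timeline does above; this sketch just buffers the whole body.
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    Ok(buf)
}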