LCOV - code coverage report
Current view: top level - safekeeper/src - pull_timeline.rs (source / functions)
Test:      4f58e98c51285c7fa348e0b410c88a10caf68ad2.info
Test Date: 2025-01-07 20:58:07
Coverage:  Lines: 0.0 % (0 of 290 hit)    Functions: 0.0 % (0 of 45 hit)

            Line data    Source code
       1              : use anyhow::{anyhow, bail, Context, Result};
       2              : use bytes::Bytes;
       3              : use camino::Utf8PathBuf;
       4              : use chrono::{DateTime, Utc};
       5              : use futures::{SinkExt, StreamExt, TryStreamExt};
       6              : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
       7              : use safekeeper_api::{models::TimelineStatus, Term};
       8              : use safekeeper_client::mgmt_api;
       9              : use safekeeper_client::mgmt_api::Client;
      10              : use serde::{Deserialize, Serialize};
      11              : use std::{
      12              :     cmp::min,
      13              :     io::{self, ErrorKind},
      14              :     sync::Arc,
      15              : };
      16              : use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
      17              : use tokio_tar::{Archive, Builder, Header};
      18              : use tokio_util::{
      19              :     io::{CopyToBytes, SinkWriter},
      20              :     sync::PollSender,
      21              : };
      22              : use tracing::{error, info, instrument};
      23              : 
      24              : use crate::{
      25              :     control_file::CONTROL_FILE_NAME,
      26              :     debug_dump,
      27              :     state::{EvictionState, TimelinePersistentState},
      28              :     timeline::{Timeline, WalResidentTimeline},
      29              :     timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
      30              :     wal_backup,
      31              :     wal_storage::open_wal_file,
      32              :     GlobalTimelines,
      33              : };
      34              : use utils::{
      35              :     crashsafe::fsync_async_opt,
      36              :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      37              :     logging::SecretString,
      38              :     lsn::Lsn,
      39              :     pausable_failpoint,
      40              : };
      41              : 
       42              : /// Stream a tar archive of the timeline to tx.
      43            0 : #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
      44              : pub async fn stream_snapshot(
      45              :     tli: Arc<Timeline>,
      46              :     source: NodeId,
      47              :     destination: NodeId,
      48              :     tx: mpsc::Sender<Result<Bytes>>,
      49              : ) {
      50              :     match tli.try_wal_residence_guard().await {
      51              :         Err(e) => {
      52              :             tx.send(Err(anyhow!("Error checking residence: {:#}", e)))
      53              :                 .await
      54              :                 .ok();
      55              :         }
      56              :         Ok(maybe_resident_tli) => {
      57              :             if let Err(e) = match maybe_resident_tli {
      58              :                 Some(resident_tli) => {
      59              :                     stream_snapshot_resident_guts(resident_tli, source, destination, tx.clone())
      60              :                         .await
      61              :                 }
      62              :                 None => stream_snapshot_offloaded_guts(tli, source, destination, tx.clone()).await,
      63              :             } {
       64              :                 // Error type/contents don't matter as they can't reach the client
      65              :                 // (hyper likely doesn't do anything with it), but http stream will be
      66              :                 // prematurely terminated. It would be nice to try to send the error in
      67              :                 // trailers though.
      68              :                 tx.send(Err(anyhow!("snapshot failed"))).await.ok();
      69              :                 error!("snapshot failed: {:#}", e);
      70              :             }
      71              :         }
      72              :     }
      73              : }
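                       : // Illustration (not part of this file): the receiving end of `tx` is typically
                       : // wrapped into an HTTP response body by the caller, along the lines of:
                       : //
                       : //   let (tx, rx) = mpsc::channel(1);
                       : //   task::spawn(stream_snapshot(tli, source, destination, tx));
                       : //   let body = hyper::Body::wrap_stream(
                       : //       tokio_stream::wrappers::ReceiverStream::new(rx));
                       : //
                       : // The exact handler wiring (channel size, body type) is hypothetical here.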
      74              : 
      75              : /// State needed while streaming the snapshot.
      76              : pub struct SnapshotContext {
       77              :     pub from_segno: XLogSegNo, // inclusive
       78              :     pub upto_segno: XLogSegNo, // inclusive
      79              :     pub term: Term,
      80              :     pub last_log_term: Term,
      81              :     pub flush_lsn: Lsn,
      82              :     pub wal_seg_size: usize,
       83              :     // Used to remove the WAL removal hold-off in Drop.
      84              :     pub tli: WalResidentTimeline,
      85              : }
      86              : 
      87              : impl Drop for SnapshotContext {
      88            0 :     fn drop(&mut self) {
      89            0 :         let tli = self.tli.clone();
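                       :         // Drop can't be async, so reset the flag from a spawned background task.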
      90            0 :         task::spawn(async move {
      91            0 :             let mut shared_state = tli.write_shared_state().await;
      92            0 :             shared_state.wal_removal_on_hold = false;
      93            0 :         });
      94            0 :     }
      95              : }
      96              : 
      97              : /// Build a tokio_tar stream that sends encoded bytes into a Bytes channel.
      98            0 : fn prepare_tar_stream(
      99            0 :     tx: mpsc::Sender<Result<Bytes>>,
     100            0 : ) -> tokio_tar::Builder<impl AsyncWrite + Unpin + Send> {
      101            0 :     // tokio-tar wants a Write implementor, but we have an mpsc tx of Result<Bytes>;
      102            0 :     // use SinkWriter as a Write impl. That is,
      103            0 :     // - create a Sink from the tx. It returns PollSendError if the channel is closed.
     104            0 :     let sink = PollSender::new(tx);
      105            0 :     // - SinkWriter needs the sink error to be an io one, so map it.
     106            0 :     let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
      107            0 :     // - SinkWriter wants the sink type to be just Bytes, not Result<Bytes>, so map
      108            0 :     //   it with with(). Note that with() accepts an async function, which we don't
      109            0 :     //   need, and allows the map to fail, which we don't need either; hence the
      110            0 :     //   two Oks.
     111            0 :     let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
      112            0 :     // - SinkWriter (not surprisingly) wants a sink of &[u8], not Bytes, so wrap
      113            0 :     //   it into CopyToBytes. This is a data copy.
     114            0 :     let copy_to_bytes = CopyToBytes::new(oksink);
     115            0 :     let writer = SinkWriter::new(copy_to_bytes);
     116            0 :     let pinned_writer = Box::pin(writer);
     117            0 : 
     118            0 :     // Note that tokio_tar append_* funcs use tokio::io::copy with 8KB buffer
     119            0 :     // which is also likely suboptimal.
     120            0 :     Builder::new_non_terminated(pinned_writer)
     121            0 : }
     122              : 
     123              : /// Implementation of snapshot for an offloaded timeline, only reads control file
     124            0 : pub(crate) async fn stream_snapshot_offloaded_guts(
     125            0 :     tli: Arc<Timeline>,
     126            0 :     source: NodeId,
     127            0 :     destination: NodeId,
     128            0 :     tx: mpsc::Sender<Result<Bytes>>,
     129            0 : ) -> Result<()> {
     130            0 :     let mut ar = prepare_tar_stream(tx);
     131            0 : 
     132            0 :     tli.snapshot_offloaded(&mut ar, source, destination).await?;
     133              : 
     134            0 :     ar.finish().await?;
     135              : 
     136            0 :     Ok(())
     137            0 : }
     138              : 
     139              : /// Implementation of snapshot for a timeline which is resident (includes some segment data)
     140            0 : pub async fn stream_snapshot_resident_guts(
     141            0 :     tli: WalResidentTimeline,
     142            0 :     source: NodeId,
     143            0 :     destination: NodeId,
     144            0 :     tx: mpsc::Sender<Result<Bytes>>,
     145            0 : ) -> Result<()> {
     146            0 :     let mut ar = prepare_tar_stream(tx);
     147              : 
     148            0 :     let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
     149            0 :     pausable_failpoint!("sk-snapshot-after-list-pausable");
     150              : 
     151            0 :     let tli_dir = tli.get_timeline_dir();
     152            0 :     info!(
     153            0 :         "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
     154            0 :         bctx.upto_segno - bctx.from_segno + 1,
     155              :         bctx.from_segno,
     156              :         bctx.upto_segno,
     157              :         bctx.term,
     158              :         bctx.last_log_term,
     159              :         bctx.flush_lsn,
     160              :     );
     161            0 :     for segno in bctx.from_segno..=bctx.upto_segno {
     162            0 :         let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
     163            0 :         let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
     164            0 :         if is_partial {
     165            0 :             wal_file_name.push_str(".partial");
     166            0 :         }
     167            0 :         ar.append_file(&wal_file_name, &mut sf).await?;
     168              :     }
     169              : 
      170              :     // Do the term check before ar.finish so that a term change leaves the archive
      171              :     // corrupted. The client shouldn't ignore an abrupt stream end, but just to be sure.
     172            0 :     tli.finish_snapshot(&bctx).await?;
     173              : 
     174            0 :     ar.finish().await?;
     175              : 
     176            0 :     Ok(())
     177            0 : }
     178              : 
     179              : impl Timeline {
      180              :     /// Simple snapshot for an offloaded timeline: we only upload a renamed partial segment and
      181              :     /// pass a modified control file into the provided tar stream (no data segments from disk, since
      182              :     /// we are offloaded and there aren't any).
     183            0 :     async fn snapshot_offloaded<W: AsyncWrite + Unpin + Send>(
     184            0 :         self: &Arc<Timeline>,
     185            0 :         ar: &mut tokio_tar::Builder<W>,
     186            0 :         source: NodeId,
     187            0 :         destination: NodeId,
     188            0 :     ) -> Result<()> {
     189              :         // Take initial copy of control file, then release state lock
     190            0 :         let mut control_file = {
     191            0 :             let shared_state = self.write_shared_state().await;
     192              : 
     193            0 :             let control_file = TimelinePersistentState::clone(shared_state.sk.state());
     194              : 
     195              :             // Rare race: we got unevicted between entering function and reading control file.
     196              :             // We error out and let API caller retry.
     197            0 :             if !matches!(control_file.eviction_state, EvictionState::Offloaded(_)) {
     198            0 :                 bail!("Timeline was un-evicted during snapshot, please retry");
     199            0 :             }
     200            0 : 
     201            0 :             control_file
     202              :         };
     203              : 
      204              :         // Modify the partial segment of the in-memory copy of the control file to
     205              :         // point to the destination safekeeper.
     206            0 :         let replace = control_file
     207            0 :             .partial_backup
     208            0 :             .replace_uploaded_segment(source, destination)?;
     209              : 
     210            0 :         let Some(replace) = replace else {
      211              :             // In Manager::ready_for_eviction, we do not permit eviction unless the timeline
      212              :             // has a partial segment, so it is unexpected to find none here.
     213            0 :             anyhow::bail!("Timeline has no partial segment, cannot generate snapshot");
     214              :         };
     215              : 
     216            0 :         tracing::info!("Replacing uploaded partial segment in in-mem control file: {replace:?}");
     217              : 
     218              :         // Optimistically try to copy the partial segment to the destination's path: this
     219              :         // can fail if the timeline was un-evicted and modified in the background.
     220            0 :         let remote_timeline_path = &self.remote_path;
     221            0 :         wal_backup::copy_partial_segment(
     222            0 :             &replace.previous.remote_path(remote_timeline_path),
     223            0 :             &replace.current.remote_path(remote_timeline_path),
     224            0 :         )
     225            0 :         .await?;
     226              : 
     227              :         // Since the S3 copy succeeded with the path given in our control file snapshot, and
     228              :         // we are sending that snapshot in our response, we are giving the caller a consistent
     229              :         // snapshot even if our local Timeline was unevicted or otherwise modified in the meantime.
     230            0 :         let buf = control_file
     231            0 :             .write_to_buf()
     232            0 :             .with_context(|| "failed to serialize control store")?;
     233            0 :         let mut header = Header::new_gnu();
     234            0 :         header.set_size(buf.len().try_into().expect("never breaches u64"));
     235            0 :         ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
     236            0 :             .await
     237            0 :             .with_context(|| "failed to append to archive")?;
     238              : 
     239            0 :         Ok(())
     240            0 :     }
     241              : }
     242              : 
     243              : impl WalResidentTimeline {
     244              :     /// Start streaming tar archive with timeline:
     245              :     /// 1) stream control file under lock;
     246              :     /// 2) hold off WAL removal;
     247              :     /// 3) collect SnapshotContext to understand which WAL segments should be
     248              :     ///    streamed.
     249              :     ///
     250              :     /// Snapshot streams data up to flush_lsn. To make this safe, we must check
      251              :     /// that the term doesn't change during the procedure, or we risk sending a mix of
     252              :     /// WAL from different histories. Term is remembered in the SnapshotContext
     253              :     /// and checked in finish_snapshot. Note that in the last segment some WAL
      254              :     /// higher than flush_lsn set here might be streamed; that's fine as long as
      255              :     /// the term doesn't change.
     256              :     ///
      257              :     /// Alternatively, we could send only up to commit_lsn to get some valid
      258              :     /// state which would later be recovered by compute; in that case the term
      259              :     /// check is not needed. However, we likely don't want that, as there might
      260              :     /// be no compute which could perform the recovery.
     261              :     ///
      262              :     /// When the returned SnapshotContext is dropped, the WAL hold is removed.
     263            0 :     async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
     264            0 :         &self,
     265            0 :         ar: &mut tokio_tar::Builder<W>,
     266            0 :         source: NodeId,
     267            0 :         destination: NodeId,
     268            0 :     ) -> Result<SnapshotContext> {
     269            0 :         let mut shared_state = self.write_shared_state().await;
     270            0 :         let wal_seg_size = shared_state.get_wal_seg_size();
     271            0 : 
     272            0 :         let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
      273              :         // Modify the partial segment of the in-memory copy of the control file to
     274              :         // point to the destination safekeeper.
     275            0 :         let replace = control_store
     276            0 :             .partial_backup
     277            0 :             .replace_uploaded_segment(source, destination)?;
     278              : 
     279            0 :         if let Some(replace) = replace {
     280              :             // The deserialized control file has an uploaded partial. We upload a copy
     281              :             // of it to object storage for the destination safekeeper and send an updated
     282              :             // control file in the snapshot.
     283            0 :             tracing::info!(
     284            0 :                 "Replacing uploaded partial segment in in-mem control file: {replace:?}"
     285              :             );
     286              : 
     287            0 :             let remote_timeline_path = &self.tli.remote_path;
     288            0 :             wal_backup::copy_partial_segment(
     289            0 :                 &replace.previous.remote_path(remote_timeline_path),
     290            0 :                 &replace.current.remote_path(remote_timeline_path),
     291            0 :             )
     292            0 :             .await?;
     293            0 :         }
     294              : 
     295            0 :         let buf = control_store
     296            0 :             .write_to_buf()
     297            0 :             .with_context(|| "failed to serialize control store")?;
     298            0 :         let mut header = Header::new_gnu();
     299            0 :         header.set_size(buf.len().try_into().expect("never breaches u64"));
     300            0 :         ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
     301            0 :             .await
     302            0 :             .with_context(|| "failed to append to archive")?;
     303              : 
      304              :         // We need to stream starting from the oldest segment someone (s3 or
      305              :         // pageserver) still needs. This duplicates the calc_horizon_lsn logic.
     306              :         //
     307              :         // We know that WAL wasn't removed up to this point because it cannot be
      308              :         // removed further than `backup_lsn`. Since we're holding the shared_state
      309              :         // lock and setting `wal_removal_on_hold` later, this guarantees that WAL
     310              :         // won't be removed until we're done.
     311            0 :         let from_lsn = min(
     312            0 :             shared_state.sk.state().remote_consistent_lsn,
     313            0 :             shared_state.sk.state().backup_lsn,
     314            0 :         );
     315            0 :         if from_lsn == Lsn::INVALID {
     316              :             // this is possible if snapshot is called before handling first
     317              :             // elected message
     318            0 :             bail!("snapshot is called on uninitialized timeline");
     319            0 :         }
     320            0 :         let from_segno = from_lsn.segment_number(wal_seg_size);
     321            0 :         let term = shared_state.sk.state().acceptor_state.term;
     322            0 :         let last_log_term = shared_state.sk.last_log_term();
     323            0 :         let flush_lsn = shared_state.sk.flush_lsn();
     324            0 :         let upto_segno = flush_lsn.segment_number(wal_seg_size);
     325              :         // have some limit on max number of segments as a sanity check
     326              :         const MAX_ALLOWED_SEGS: u64 = 1000;
     327            0 :         let num_segs = upto_segno - from_segno + 1;
     328            0 :         if num_segs > MAX_ALLOWED_SEGS {
     329            0 :             bail!(
     330            0 :                 "snapshot is called on timeline with {} segments, but the limit is {}",
     331            0 :                 num_segs,
     332            0 :                 MAX_ALLOWED_SEGS
     333            0 :             );
     334            0 :         }
     335            0 : 
     336            0 :         // Prevent WAL removal while we're streaming data.
     337            0 :         //
      338            0 :         // Since this is a flag, not a counter, just bail out if already set; we
     339            0 :         // shouldn't need concurrent snapshotting.
     340            0 :         if shared_state.wal_removal_on_hold {
     341            0 :             bail!("wal_removal_on_hold is already true");
     342            0 :         }
     343            0 :         shared_state.wal_removal_on_hold = true;
     344            0 : 
     345            0 :         // Drop shared_state to release the lock, before calling wal_residence_guard().
     346            0 :         drop(shared_state);
     347              : 
     348            0 :         let tli_copy = self.wal_residence_guard().await?;
     349            0 :         let bctx = SnapshotContext {
     350            0 :             from_segno,
     351            0 :             upto_segno,
     352            0 :             term,
     353            0 :             last_log_term,
     354            0 :             flush_lsn,
     355            0 :             wal_seg_size,
     356            0 :             tli: tli_copy,
     357            0 :         };
     358            0 : 
     359            0 :         Ok(bctx)
     360            0 :     }
     361              : 
      362              :     /// Finish snapshotting: check that the term(s) haven't changed.
     363              :     ///
      364              :     /// Note that the WAL gc hold-off is removed in SnapshotContext's Drop so that
      365              :     /// it isn't forgotten if snapshotting fails midway.
     366            0 :     pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
     367            0 :         let shared_state = self.read_shared_state().await;
     368            0 :         let term = shared_state.sk.state().acceptor_state.term;
     369            0 :         let last_log_term = shared_state.sk.last_log_term();
      370            0 :         // There are some cases where this check could be relaxed (e.g. last_log_term
      371            0 :         // might change, but as long as the older history is strictly part of the new
      372            0 :         // one, that's fine), but there is no need to do it.
     373            0 :         if bctx.term != term || bctx.last_log_term != last_log_term {
     374            0 :             bail!("term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
     375            0 :               bctx.term, bctx.last_log_term, term, last_log_term);
     376            0 :         }
     377            0 :         Ok(())
     378            0 :     }
     379              : }
     380              : 
     381              : /// pull_timeline request body.
     382            0 : #[derive(Debug, Deserialize)]
     383              : pub struct Request {
     384              :     pub tenant_id: TenantId,
     385              :     pub timeline_id: TimelineId,
     386              :     pub http_hosts: Vec<String>,
     387              : }
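                       : // Illustrative JSON body deserializing into `Request` (IDs and hosts are
                       : // hypothetical placeholders):
                       : //
                       : //   {
                       : //     "tenant_id":   "<tenant id hex>",
                       : //     "timeline_id": "<timeline id hex>",
                       : //     "http_hosts":  ["http://safekeeper-1:7676", "http://safekeeper-2:7676"]
                       : //   }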
     388              : 
     389              : #[derive(Debug, Serialize)]
     390              : pub struct Response {
     391              :     // Donor safekeeper host
     392              :     pub safekeeper_host: String,
     393              :     // TODO: add more fields?
     394              : }
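                       : // Serialized as e.g. {"safekeeper_host": "http://safekeeper-1:7676"}
                       : // (host value hypothetical).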
     395              : 
     396              : /// Response for debug dump request.
     397            0 : #[derive(Debug, Deserialize)]
     398              : pub struct DebugDumpResponse {
     399              :     pub start_time: DateTime<Utc>,
     400              :     pub finish_time: DateTime<Utc>,
     401              :     pub timelines: Vec<debug_dump::Timeline>,
     402              :     pub timelines_count: usize,
     403              :     pub config: debug_dump::Config,
     404              : }
     405              : 
     406              : /// Find the most advanced safekeeper and pull timeline from it.
     407            0 : pub async fn handle_request(
     408            0 :     request: Request,
     409            0 :     sk_auth_token: Option<SecretString>,
     410            0 :     global_timelines: Arc<GlobalTimelines>,
     411            0 : ) -> Result<Response> {
     412            0 :     let existing_tli = global_timelines.get(TenantTimelineId::new(
     413            0 :         request.tenant_id,
     414            0 :         request.timeline_id,
     415            0 :     ));
     416            0 :     if existing_tli.is_ok() {
     417            0 :         bail!("Timeline {} already exists", request.timeline_id);
     418            0 :     }
     419            0 : 
     420            0 :     let http_hosts = request.http_hosts.clone();
     421              : 
     422              :     // Figure out statuses of potential donors.
     423            0 :     let responses: Vec<Result<TimelineStatus, mgmt_api::Error>> =
     424            0 :         futures::future::join_all(http_hosts.iter().map(|url| async {
     425            0 :             let cclient = Client::new(url.clone(), sk_auth_token.clone());
     426            0 :             let info = cclient
     427            0 :                 .timeline_status(request.tenant_id, request.timeline_id)
     428            0 :                 .await?;
     429            0 :             Ok(info)
     430            0 :         }))
     431            0 :         .await;
     432              : 
     433            0 :     let mut statuses = Vec::new();
     434            0 :     for (i, response) in responses.into_iter().enumerate() {
     435            0 :         let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
     436            0 :         statuses.push((status, i));
     437              :     }
     438              : 
     439              :     // Find the most advanced safekeeper
     440            0 :     let (status, i) = statuses
     441            0 :         .into_iter()
     442            0 :         .max_by_key(|(status, _)| {
     443            0 :             (
     444            0 :                 status.acceptor_state.epoch,
     445            0 :                 status.flush_lsn,
     446            0 :                 status.commit_lsn,
     447            0 :             )
     448            0 :         })
     449            0 :         .unwrap();
     450            0 :     let safekeeper_host = http_hosts[i].clone();
     451            0 : 
     452            0 :     assert!(status.tenant_id == request.tenant_id);
     453            0 :     assert!(status.timeline_id == request.timeline_id);
     454              : 
     455            0 :     pull_timeline(status, safekeeper_host, sk_auth_token, global_timelines).await
     456            0 : }
     457              : 
     458            0 : async fn pull_timeline(
     459            0 :     status: TimelineStatus,
     460            0 :     host: String,
     461            0 :     sk_auth_token: Option<SecretString>,
     462            0 :     global_timelines: Arc<GlobalTimelines>,
     463            0 : ) -> Result<Response> {
     464            0 :     let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
     465            0 :     info!(
     466            0 :         "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
     467              :         ttid,
     468              :         host,
     469              :         status.commit_lsn,
     470              :         status.flush_lsn,
     471              :         status.acceptor_state.term,
     472              :         status.acceptor_state.epoch
     473              :     );
     474              : 
     475            0 :     let conf = &global_timelines.get_global_config();
     476              : 
     477            0 :     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
     478              : 
     479            0 :     let client = Client::new(host.clone(), sk_auth_token.clone());
     480              :     // Request stream with basebackup archive.
     481            0 :     let bb_resp = client
     482            0 :         .snapshot(status.tenant_id, status.timeline_id, conf.my_id)
     483            0 :         .await?;
     484              : 
     485              :     // Make Stream of Bytes from it...
     486            0 :     let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
     487            0 :     // and turn it into StreamReader implementing AsyncRead.
     488            0 :     let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
     489              : 
      490              :     // Extract it on the fly to disk. We don't use the simple unpack() because we
      491              :     // need to fsync the files.
     492            0 :     let mut entries = Archive::new(bb_reader).entries()?;
     493            0 :     while let Some(base_tar_entry) = entries.next().await {
     494            0 :         let mut entry = base_tar_entry?;
     495            0 :         let header = entry.header();
     496            0 :         let file_path = header.path()?.into_owned();
     497            0 :         match header.entry_type() {
     498              :             tokio_tar::EntryType::Regular => {
     499            0 :                 let utf8_file_path =
     500            0 :                     Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
     501            0 :                 let dst_path = tli_dir_path.join(utf8_file_path);
     502            0 :                 let mut f = OpenOptions::new()
     503            0 :                     .create(true)
     504            0 :                     .truncate(true)
     505            0 :                     .write(true)
     506            0 :                     .open(&dst_path)
     507            0 :                     .await?;
     508            0 :                 tokio::io::copy(&mut entry, &mut f).await?;
     509              :                 // fsync the file
     510            0 :                 f.sync_all().await?;
     511              :             }
     512              :             _ => {
     513            0 :                 bail!(
     514            0 :                     "entry {} in backup tar archive is of unexpected type: {:?}",
     515            0 :                     file_path.display(),
     516            0 :                     header.entry_type()
     517            0 :                 );
     518              :             }
     519              :         }
     520              :     }
     521              : 
     522              :     // fsync temp timeline directory to remember its contents.
     523            0 :     fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
     524              : 
     525              :     // Let's create timeline from temp directory and verify that it's correct
     526            0 :     let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
     527            0 :     info!(
     528            0 :         "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
     529              :         ttid, commit_lsn, flush_lsn
     530              :     );
     531            0 :     assert!(status.commit_lsn <= status.flush_lsn);
     532              : 
     533              :     // Finally, load the timeline.
     534            0 :     let _tli = global_timelines
     535            0 :         .load_temp_timeline(ttid, &tli_dir_path, false)
     536            0 :         .await?;
     537              : 
     538            0 :     Ok(Response {
     539            0 :         safekeeper_host: host,
     540            0 :     })
     541            0 : }
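                       : // Illustrative only: the safekeeper's HTTP layer deserializes the JSON body into
                       : // `Request` and calls `handle_request`, roughly:
                       : //
                       : //   let resp = handle_request(request, sk_auth_token, global_timelines).await?;
                       : //   // resp.safekeeper_host names the donor the timeline was pulled from.
                       : //
                       : // The exact endpoint and extraction of auth/global state are hypothetical here.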
        

Generated by: LCOV version 2.1-beta