LCOV - code coverage report
Current view: top level - safekeeper/src - pull_timeline.rs (source / functions)
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info
Test Date: 2025-07-16 12:29:03
              Coverage   Total   Hit
Lines:        0.0 %      366     0
Functions:    0.0 %      27      0

            Line data    Source code
       1              : use std::cmp::min;
       2              : use std::io::{self, ErrorKind};
       3              : use std::ops::RangeInclusive;
       4              : use std::sync::Arc;
       5              : 
       6              : use anyhow::{Context, Result, anyhow, bail};
       7              : use bytes::Bytes;
       8              : use camino::Utf8PathBuf;
       9              : use chrono::{DateTime, Utc};
      10              : use futures::{SinkExt, StreamExt, TryStreamExt};
      11              : use http::StatusCode;
      12              : use http_utils::error::ApiError;
      13              : use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
      14              : use remote_storage::GenericRemoteStorage;
      15              : use reqwest::Certificate;
      16              : use safekeeper_api::Term;
      17              : use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
      18              : use safekeeper_client::mgmt_api;
      19              : use safekeeper_client::mgmt_api::Client;
      20              : use serde::Deserialize;
      21              : use tokio::fs::OpenOptions;
      22              : use tokio::io::AsyncWrite;
      23              : use tokio::sync::mpsc;
      24              : use tokio::task;
      25              : use tokio::time::sleep;
      26              : use tokio_tar::{Archive, Builder, Header};
      27              : use tokio_util::io::{CopyToBytes, SinkWriter};
      28              : use tokio_util::sync::PollSender;
      29              : use tracing::{error, info, instrument, warn};
      30              : use utils::crashsafe::fsync_async_opt;
      31              : use utils::id::{NodeId, TenantTimelineId};
      32              : use utils::logging::SecretString;
      33              : use utils::lsn::Lsn;
      34              : use utils::pausable_failpoint;
      35              : 
      36              : use crate::control_file::CONTROL_FILE_NAME;
      37              : use crate::state::{EvictionState, TimelinePersistentState};
      38              : use crate::timeline::{Timeline, TimelineError, WalResidentTimeline};
      39              : use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline};
      40              : use crate::wal_storage::{open_wal_file, wal_file_paths};
      41              : use crate::{GlobalTimelines, debug_dump, wal_backup};
      42              : 
      43              : /// Stream tar archive of timeline to tx.
      44              : #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
      45              : pub async fn stream_snapshot(
      46              :     tli: Arc<Timeline>,
      47              :     source: NodeId,
      48              :     destination: NodeId,
      49              :     tx: mpsc::Sender<Result<Bytes>>,
      50              :     storage: Option<Arc<GenericRemoteStorage>>,
      51              : ) {
      52              :     match tli.try_wal_residence_guard().await {
      53              :         Err(e) => {
      54              :             tx.send(Err(anyhow!("Error checking residence: {:#}", e)))
      55              :                 .await
      56              :                 .ok();
      57              :         }
      58              :         Ok(maybe_resident_tli) => {
      59              :             if let Err(e) = match maybe_resident_tli {
      60              :                 Some(resident_tli) => {
      61              :                     stream_snapshot_resident_guts(
      62              :                         resident_tli,
      63              :                         source,
      64              :                         destination,
      65              :                         tx.clone(),
      66              :                         storage,
      67              :                     )
      68              :                     .await
      69              :                 }
      70              :                 None => {
      71              :                     if let Some(storage) = storage {
      72              :                         stream_snapshot_offloaded_guts(
      73              :                             tli,
      74              :                             source,
      75              :                             destination,
      76              :                             tx.clone(),
      77              :                             &storage,
      78              :                         )
      79              :                         .await
      80              :                     } else {
      81              :                         tx.send(Err(anyhow!("remote storage not configured")))
      82              :                             .await
      83              :                             .ok();
      84              :                         return;
      85              :                     }
      86              :                 }
      87              :             } {
       88              :                 // Error type/contents don't matter as they can't reach the client
       89              :                 // (hyper likely doesn't do anything with them), but the http stream will
       90              :                 // be prematurely terminated. It would be nice to try to send the error in
       91              :                 // trailers though.
      92              :                 tx.send(Err(anyhow!("snapshot failed"))).await.ok();
      93              :                 error!("snapshot failed: {:#}", e);
      94              :             }
      95              :         }
      96              :     }
      97              : }
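
A hedged consumer-side sketch (not part of the source file above): stream_snapshot pushes the archive into the mpsc channel as Result<Bytes> chunks, so the HTTP layer only needs to drain the receiver. The function name forward_snapshot_chunks, the channel capacity, and the use of the tokio_stream crate are illustrative assumptions.

    use futures::StreamExt;
    use tokio_stream::wrappers::ReceiverStream;

    async fn forward_snapshot_chunks(
        tli: std::sync::Arc<Timeline>,
        source: NodeId,
        destination: NodeId,
    ) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        // Producer: writes tar chunks into tx until the archive is finished or fails.
        tokio::spawn(stream_snapshot(tli, source, destination, tx, None));
        // Consumer: each item is a Result<Bytes>; a real handler would forward it as body data.
        let mut chunks = ReceiverStream::new(rx);
        while let Some(chunk) = chunks.next().await {
            match chunk {
                Ok(bytes) => { let _ = bytes; /* forward to the client */ }
                Err(_) => break, // archive is truncated; the client must not ignore this
            }
        }
    }
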
      98              : 
      99              : /// State needed while streaming the snapshot.
     100              : pub struct SnapshotContext {
      101              :     /// The inclusive range of segment numbers. If None, the timeline hasn't had any writes yet, so only the control file is sent.
     102              :     pub from_to_segno: Option<RangeInclusive<XLogSegNo>>,
     103              :     pub term: Term,
     104              :     pub last_log_term: Term,
     105              :     pub flush_lsn: Lsn,
     106              :     pub wal_seg_size: usize,
      107              :     // Used to remove the WAL removal hold-off in Drop.
     108              :     pub tli: WalResidentTimeline,
     109              : }
     110              : 
     111              : impl Drop for SnapshotContext {
     112            0 :     fn drop(&mut self) {
     113            0 :         let tli = self.tli.clone();
     114            0 :         task::spawn(async move {
     115            0 :             let mut shared_state = tli.write_shared_state().await;
     116            0 :             shared_state.wal_removal_on_hold = false;
     117            0 :         });
     118            0 :     }
     119              : }
     120              : 
     121              : /// Build a tokio_tar stream that sends encoded bytes into a Bytes channel.
     122            0 : fn prepare_tar_stream(
     123            0 :     tx: mpsc::Sender<Result<Bytes>>,
     124            0 : ) -> tokio_tar::Builder<impl AsyncWrite + Unpin + Send> {
      125              :     // tokio-tar wants an AsyncWrite implementor, but we have an mpsc tx of Result<Bytes>;
      126              :     // use SinkWriter as the AsyncWrite impl. That is,
      127              :     // - create a Sink from the tx. It returns PollSendError if the chan is closed.
     128            0 :     let sink = PollSender::new(tx);
     129              :     // - SinkWriter needs sink error to be io one, map it.
     130            0 :     let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
      131              :     // - SinkWriter wants the sink's item type to be just Bytes, not Result<Bytes>, so map
      132              :     //   it with with(). Note that with() accepts an async function, which we don't
      133              :     //   need, and allows the map to fail, which we don't need either; hence the
      134              :     //   two Oks.
     135            0 :     let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
      136              :     // - SinkWriter (not surprisingly) wants a sink of &[u8], not Bytes, so wrap
      137              :     //   it in CopyToBytes. This incurs a data copy.
     138            0 :     let copy_to_bytes = CopyToBytes::new(oksink);
     139            0 :     let writer = SinkWriter::new(copy_to_bytes);
     140            0 :     let pinned_writer = Box::pin(writer);
     141              : 
      142              :     // Note that the tokio_tar append_* funcs use tokio::io::copy with an 8KB buffer,
      143              :     // which is also likely suboptimal.
     144            0 :     Builder::new_non_terminated(pinned_writer)
     145            0 : }
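
A minimal standalone sketch of the same adapter chain, assuming a plain capacity-8 channel and no tar layer: it turns an mpsc::Sender<Result<Bytes>> into an AsyncWrite, so bytes written to the writer arrive on the receiver as Ok(Bytes). The function name adapter_chain_demo is made up for illustration.

    use futures::SinkExt;
    use tokio::io::AsyncWriteExt;
    use tokio_util::io::{CopyToBytes, SinkWriter};
    use tokio_util::sync::PollSender;

    async fn adapter_chain_demo() -> std::io::Result<()> {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<anyhow::Result<bytes::Bytes>>(8);
        let sink = PollSender::new(tx)
            // Channel closure becomes an io::Error, as SinkWriter requires.
            .sink_map_err(|_| std::io::Error::from(std::io::ErrorKind::BrokenPipe))
            // Wrap each Bytes in Ok(..) so the item type matches the channel's Result<Bytes>.
            .with(|b: bytes::Bytes| async { std::io::Result::Ok(Ok::<_, anyhow::Error>(b)) });
        let mut writer = SinkWriter::new(CopyToBytes::new(sink));
        writer.write_all(b"hello").await?; // shows up on rx as Ok(Bytes)
        drop(writer); // dropping the writer drops the sender, closing the channel
        assert_eq!(rx.recv().await.unwrap().unwrap().as_ref(), b"hello");
        Ok(())
    }
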
     146              : 
     147              : /// Implementation of snapshot for an offloaded timeline, only reads control file
     148            0 : pub(crate) async fn stream_snapshot_offloaded_guts(
     149            0 :     tli: Arc<Timeline>,
     150            0 :     source: NodeId,
     151            0 :     destination: NodeId,
     152            0 :     tx: mpsc::Sender<Result<Bytes>>,
     153            0 :     storage: &GenericRemoteStorage,
     154            0 : ) -> Result<()> {
     155            0 :     let mut ar = prepare_tar_stream(tx);
     156              : 
     157            0 :     tli.snapshot_offloaded(&mut ar, source, destination, storage)
     158            0 :         .await?;
     159              : 
     160            0 :     ar.finish().await?;
     161              : 
     162            0 :     Ok(())
     163            0 : }
     164              : 
     165              : /// Implementation of snapshot for a timeline which is resident (includes some segment data)
     166            0 : pub async fn stream_snapshot_resident_guts(
     167            0 :     tli: WalResidentTimeline,
     168            0 :     source: NodeId,
     169            0 :     destination: NodeId,
     170            0 :     tx: mpsc::Sender<Result<Bytes>>,
     171            0 :     storage: Option<Arc<GenericRemoteStorage>>,
     172            0 : ) -> Result<()> {
     173            0 :     let mut ar = prepare_tar_stream(tx);
     174              : 
     175            0 :     let bctx = tli
     176            0 :         .start_snapshot(&mut ar, source, destination, storage)
     177            0 :         .await?;
     178            0 :     pausable_failpoint!("sk-snapshot-after-list-pausable");
     179              : 
     180            0 :     if let Some(from_to_segno) = &bctx.from_to_segno {
     181            0 :         let tli_dir = tli.get_timeline_dir();
     182            0 :         info!(
     183            0 :             "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
     184            0 :             from_to_segno.end() - from_to_segno.start() + 1,
     185            0 :             from_to_segno.start(),
     186            0 :             from_to_segno.end(),
     187              :             bctx.term,
     188              :             bctx.last_log_term,
     189              :             bctx.flush_lsn,
     190              :         );
     191            0 :         for segno in from_to_segno.clone() {
     192            0 :             let Some((mut sf, is_partial)) =
     193            0 :                 open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?
     194              :             else {
     195              :                 // File is not found
     196            0 :                 let (wal_file_path, _wal_file_partial_path) =
     197            0 :                     wal_file_paths(&tli_dir, segno, bctx.wal_seg_size);
     198            0 :                 tracing::warn!("couldn't find WAL segment file {wal_file_path}");
     199            0 :                 bail!("couldn't find WAL segment file {wal_file_path}")
     200              :             };
     201            0 :             let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
     202            0 :             if is_partial {
     203            0 :                 wal_file_name.push_str(".partial");
     204            0 :             }
     205            0 :             ar.append_file(&wal_file_name, &mut sf).await?;
     206              :         }
     207              :     } else {
     208            0 :         info!("Not including any segments into the snapshot");
     209              :     }
     210              : 
      211              :     // Do the term check before ar.finish so that the archive ends up corrupted if the
      212              :     // term changed. The client shouldn't ignore an abrupt stream end anyway, but this makes sure.
     213            0 :     tli.finish_snapshot(&bctx).await?;
     214              : 
     215            0 :     ar.finish().await?;
     216              : 
     217            0 :     Ok(())
     218            0 : }
     219              : 
     220              : impl Timeline {
      221              :     /// Simple snapshot for an offloaded timeline: we only upload a renamed partial segment and
      222              :     /// pass a modified control file into the provided tar stream (no data segments are read from
      223              :     /// disk, since we are offloaded and there aren't any).
     224            0 :     async fn snapshot_offloaded<W: AsyncWrite + Unpin + Send>(
     225            0 :         self: &Arc<Timeline>,
     226            0 :         ar: &mut tokio_tar::Builder<W>,
     227            0 :         source: NodeId,
     228            0 :         destination: NodeId,
     229            0 :         storage: &GenericRemoteStorage,
     230            0 :     ) -> Result<()> {
     231              :         // Take initial copy of control file, then release state lock
     232            0 :         let mut control_file = {
     233            0 :             let shared_state = self.write_shared_state().await;
     234              : 
     235            0 :             let control_file = TimelinePersistentState::clone(shared_state.sk.state());
     236              : 
     237              :             // Rare race: we got unevicted between entering function and reading control file.
     238              :             // We error out and let API caller retry.
     239            0 :             if !matches!(control_file.eviction_state, EvictionState::Offloaded(_)) {
     240            0 :                 bail!("Timeline was un-evicted during snapshot, please retry");
     241            0 :             }
     242              : 
     243            0 :             control_file
     244              :         };
     245              : 
      246              :         // Modify the partial segment of the in-memory copy of the control file to
      247              :         // point to the destination safekeeper.
     248            0 :         let replace = control_file
     249            0 :             .partial_backup
     250            0 :             .replace_uploaded_segment(source, destination)?;
     251              : 
     252            0 :         let Some(replace) = replace else {
      253              :             // In Manager::ready_for_eviction, we do not permit eviction unless the timeline
      254              :             // has a partial segment. It is unexpected to get here without one.
     255            0 :             anyhow::bail!("Timeline has no partial segment, cannot generate snapshot");
     256              :         };
     257              : 
     258            0 :         tracing::info!("Replacing uploaded partial segment in in-mem control file: {replace:?}");
     259              : 
     260              :         // Optimistically try to copy the partial segment to the destination's path: this
     261              :         // can fail if the timeline was un-evicted and modified in the background.
     262            0 :         let remote_timeline_path = &self.remote_path;
     263            0 :         wal_backup::copy_partial_segment(
     264            0 :             storage,
     265            0 :             &replace.previous.remote_path(remote_timeline_path),
     266            0 :             &replace.current.remote_path(remote_timeline_path),
     267            0 :         )
     268            0 :         .await?;
     269              : 
     270              :         // Since the S3 copy succeeded with the path given in our control file snapshot, and
     271              :         // we are sending that snapshot in our response, we are giving the caller a consistent
     272              :         // snapshot even if our local Timeline was unevicted or otherwise modified in the meantime.
     273            0 :         let buf = control_file
     274            0 :             .write_to_buf()
     275            0 :             .with_context(|| "failed to serialize control store")?;
     276            0 :         let mut header = Header::new_gnu();
     277            0 :         header.set_size(buf.len().try_into().expect("never breaches u64"));
     278            0 :         ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
     279            0 :             .await
     280            0 :             .with_context(|| "failed to append to archive")?;
     281              : 
     282            0 :         Ok(())
     283            0 :     }
     284              : }
     285              : 
     286              : impl WalResidentTimeline {
     287              :     /// Start streaming tar archive with timeline:
     288              :     /// 1) stream control file under lock;
     289              :     /// 2) hold off WAL removal;
     290              :     /// 3) collect SnapshotContext to understand which WAL segments should be
     291              :     ///    streamed.
     292              :     ///
     293              :     /// Snapshot streams data up to flush_lsn. To make this safe, we must check
      294              :     /// that the term doesn't change during the procedure, or we risk sending a
      295              :     /// mix of WAL from different histories. The term is remembered in the
      296              :     /// SnapshotContext and checked in finish_snapshot. Note that in the last
      297              :     /// segment some WAL higher than the flush_lsn set here might be streamed;
      298              :     /// that's fine as long as the term doesn't change.
     299              :     ///
      300              :     /// Alternatively, we could send only up to commit_lsn to get some valid
      301              :     /// state which would later be recovered by compute; in this case the term
      302              :     /// check is not needed, but we likely don't want that, as there might be no
      303              :     /// compute which could perform the recovery.
     304              :     ///
      305              :     /// When the returned SnapshotContext is dropped, the WAL hold-off is removed.
     306            0 :     async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
     307            0 :         &self,
     308            0 :         ar: &mut tokio_tar::Builder<W>,
     309            0 :         source: NodeId,
     310            0 :         destination: NodeId,
     311            0 :         storage: Option<Arc<GenericRemoteStorage>>,
     312            0 :     ) -> Result<SnapshotContext> {
     313            0 :         let mut shared_state = self.write_shared_state().await;
     314            0 :         let wal_seg_size = shared_state.get_wal_seg_size();
     315              : 
     316            0 :         let mut control_store = TimelinePersistentState::clone(shared_state.sk.state());
      317              :         // Modify the partial segment of the in-memory copy of the control file to
      318              :         // point to the destination safekeeper.
     319            0 :         let replace = control_store
     320            0 :             .partial_backup
     321            0 :             .replace_uploaded_segment(source, destination)?;
     322              : 
     323            0 :         if let Some(replace) = replace {
     324              :             // The deserialized control file has an uploaded partial. We upload a copy
     325              :             // of it to object storage for the destination safekeeper and send an updated
     326              :             // control file in the snapshot.
     327            0 :             tracing::info!(
     328            0 :                 "Replacing uploaded partial segment in in-mem control file: {replace:?}"
     329              :             );
     330              : 
     331            0 :             let remote_timeline_path = &self.tli.remote_path;
     332            0 :             wal_backup::copy_partial_segment(
     333            0 :                 &*storage.context("remote storage not configured")?,
     334            0 :                 &replace.previous.remote_path(remote_timeline_path),
     335            0 :                 &replace.current.remote_path(remote_timeline_path),
     336              :             )
     337            0 :             .await?;
     338            0 :         }
     339              : 
     340            0 :         let buf = control_store
     341            0 :             .write_to_buf()
     342            0 :             .with_context(|| "failed to serialize control store")?;
     343            0 :         let mut header = Header::new_gnu();
     344            0 :         header.set_size(buf.len().try_into().expect("never breaches u64"));
     345            0 :         ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
     346            0 :             .await
     347            0 :             .with_context(|| "failed to append to archive")?;
     348              : 
      349              :         // We need to stream from the oldest segment that someone (s3 or the
      350              :         // pageserver) still needs. This duplicates the calc_horizon_lsn logic.
      351              :         //
      352              :         // We know that WAL wasn't removed up to this point because it cannot be
      353              :         // removed past `backup_lsn`. Since we're holding the shared_state lock
      354              :         // and set `wal_removal_on_hold` below, WAL won't be removed until
      355              :         // we're done.
     356            0 :         let timeline_state = shared_state.sk.state();
     357            0 :         let from_lsn = min(
     358            0 :             timeline_state.remote_consistent_lsn,
     359            0 :             timeline_state.backup_lsn,
     360              :         );
     361            0 :         let flush_lsn = shared_state.sk.flush_lsn();
     362            0 :         let (send_segments, msg) = if from_lsn == Lsn::INVALID {
     363            0 :             (false, "snapshot is called on uninitialized timeline")
     364              :         } else {
     365            0 :             (true, "timeline is initialized")
     366              :         };
     367            0 :         tracing::info!(
     368            0 :             remote_consistent_lsn=%timeline_state.remote_consistent_lsn,
     369            0 :             backup_lsn=%timeline_state.backup_lsn,
     370              :             %flush_lsn,
     371            0 :             "{msg}"
     372              :         );
     373            0 :         let from_segno = from_lsn.segment_number(wal_seg_size);
     374            0 :         let term = shared_state.sk.state().acceptor_state.term;
     375            0 :         let last_log_term = shared_state.sk.last_log_term();
     376            0 :         let upto_segno = flush_lsn.segment_number(wal_seg_size);
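        // Worked example with hypothetical numbers (not taken from any real deployment):
        // with wal_seg_size = 16 MiB, remote_consistent_lsn = 0/3000000 and
        // backup_lsn = 0/2000000 give from_lsn = 0/2000000, i.e. from_segno = 2, while
        // flush_lsn = 0/5000000 gives upto_segno = 5, so segments 2..=5 (4 files) are sent.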
     377              :         // have some limit on max number of segments as a sanity check
     378              :         const MAX_ALLOWED_SEGS: u64 = 1000;
     379            0 :         let num_segs = upto_segno - from_segno + 1;
     380            0 :         if num_segs > MAX_ALLOWED_SEGS {
     381            0 :             bail!(
     382            0 :                 "snapshot is called on timeline with {} segments, but the limit is {}",
     383              :                 num_segs,
     384              :                 MAX_ALLOWED_SEGS
     385              :             );
     386            0 :         }
     387              : 
     388              :         // Prevent WAL removal while we're streaming data.
     389              :         //
      390              :         // Since this is a flag, not a counter, just bail out if it's already
      391              :         // set; we shouldn't need concurrent snapshotting.
     392            0 :         if shared_state.wal_removal_on_hold {
     393            0 :             bail!("wal_removal_on_hold is already true");
     394            0 :         }
     395            0 :         shared_state.wal_removal_on_hold = true;
     396              : 
     397              :         // Drop shared_state to release the lock, before calling wal_residence_guard().
     398            0 :         drop(shared_state);
     399              : 
     400            0 :         let tli_copy = self.wal_residence_guard().await?;
     401            0 :         let from_to_segno = send_segments.then_some(from_segno..=upto_segno);
     402            0 :         let bctx = SnapshotContext {
     403            0 :             from_to_segno,
     404            0 :             term,
     405            0 :             last_log_term,
     406            0 :             flush_lsn,
     407            0 :             wal_seg_size,
     408            0 :             tli: tli_copy,
     409            0 :         };
     410              : 
     411            0 :         Ok(bctx)
     412            0 :     }
     413              : 
      414              :     /// Finish snapshotting: check that the term(s) haven't changed.
      415              :     ///
      416              :     /// Note that the WAL gc hold-off is removed in Drop of SnapshotContext so
      417              :     /// that it isn't forgotten if snapshotting fails midway.
     418            0 :     pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
     419            0 :         let shared_state = self.read_shared_state().await;
     420            0 :         let term = shared_state.sk.state().acceptor_state.term;
     421            0 :         let last_log_term = shared_state.sk.last_log_term();
      422              :         // There are cases where this check could be relaxed (e.g. last_log_term might
      423              :         // change, but as long as the older history is strictly a prefix of the new one
      424              :         // that's fine), but there is no need to do so.
     425            0 :         if bctx.term != term || bctx.last_log_term != last_log_term {
     426            0 :             bail!(
     427            0 :                 "term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
     428              :                 bctx.term,
     429              :                 bctx.last_log_term,
     430              :                 term,
     431              :                 last_log_term
     432              :             );
     433            0 :         }
     434            0 :         Ok(())
     435            0 :     }
     436              : }
     437              : 
     438              : /// Response for debug dump request.
     439            0 : #[derive(Debug, Deserialize)]
     440              : pub struct DebugDumpResponse {
     441              :     pub start_time: DateTime<Utc>,
     442              :     pub finish_time: DateTime<Utc>,
     443              :     pub timelines: Vec<debug_dump::Timeline>,
     444              :     pub timelines_count: usize,
     445              :     pub config: debug_dump::Config,
     446              : }
     447              : 
     448              : /// Find the most advanced safekeeper and pull timeline from it.
     449            0 : pub async fn handle_request(
     450            0 :     request: PullTimelineRequest,
     451            0 :     sk_auth_token: Option<SecretString>,
     452            0 :     ssl_ca_certs: Vec<Certificate>,
     453            0 :     global_timelines: Arc<GlobalTimelines>,
     454            0 :     wait_for_peer_timeline_status: bool,
     455            0 : ) -> Result<PullTimelineResponse, ApiError> {
     456            0 :     let existing_tli = global_timelines.get(TenantTimelineId::new(
     457            0 :         request.tenant_id,
     458            0 :         request.timeline_id,
     459            0 :     ));
     460            0 :     if existing_tli.is_ok() {
     461            0 :         info!("Timeline {} already exists", request.timeline_id);
     462            0 :         return Ok(PullTimelineResponse {
     463            0 :             safekeeper_host: None,
     464            0 :         });
     465            0 :     }
     466              : 
     467            0 :     let mut http_client = reqwest::Client::builder();
     468            0 :     for ssl_ca_cert in ssl_ca_certs {
     469            0 :         http_client = http_client.add_root_certificate(ssl_ca_cert);
     470            0 :     }
     471            0 :     let http_client = http_client
     472            0 :         .build()
     473            0 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     474              : 
     475            0 :     let http_hosts = request.http_hosts.clone();
     476              : 
     477              :     // Figure out statuses of potential donors.
     478            0 :     let mut statuses = Vec::new();
     479            0 :     if !wait_for_peer_timeline_status {
     480            0 :         let responses: Vec<Result<TimelineStatus, mgmt_api::Error>> =
     481            0 :             futures::future::join_all(http_hosts.iter().map(|url| async {
     482            0 :                 let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
     483            0 :                 let resp = cclient
     484            0 :                     .timeline_status(request.tenant_id, request.timeline_id)
     485            0 :                     .await?;
     486            0 :                 let info: TimelineStatus = resp
     487            0 :                     .json()
     488            0 :                     .await
     489            0 :                     .context("Failed to deserialize timeline status")
     490            0 :                     .map_err(|e| mgmt_api::Error::ReceiveErrorBody(e.to_string()))?;
     491            0 :                 Ok(info)
     492            0 :             }))
     493            0 :             .await;
     494              : 
     495            0 :         for (i, response) in responses.into_iter().enumerate() {
     496            0 :             match response {
     497            0 :                 Ok(status) => {
     498            0 :                     statuses.push((status, i));
     499            0 :                 }
     500            0 :                 Err(e) => {
     501            0 :                     info!("error fetching status from {}: {e}", http_hosts[i]);
     502              :                 }
     503              :             }
     504              :         }
     505              : 
      506              :         // Allow missing responses from up to one safekeeper (say, due to downtime),
      507              :         // e.g. if we created a timeline on SKs A and B, with C being offline. Then B goes
      508              :         // offline and C comes online. Then we want a pull on C with A and B as hosts to work.
     509            0 :         let min_required_successful = (http_hosts.len() - 1).max(1);
     510            0 :         if statuses.len() < min_required_successful {
     511            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
     512            0 :                 "only got {} successful status responses. required: {min_required_successful}",
     513            0 :                 statuses.len()
     514            0 :             )));
     515            0 :         }
     516              :     } else {
     517            0 :         let mut retry = true;
     518              :         // We must get status from all other peers.
      519              :         // Otherwise, we may run into a split-brain scenario.
     520            0 :         while retry {
     521            0 :             statuses.clear();
     522            0 :             retry = false;
     523            0 :             for (i, url) in http_hosts.iter().enumerate() {
     524            0 :                 let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
     525            0 :                 match cclient
     526            0 :                     .timeline_status(request.tenant_id, request.timeline_id)
     527            0 :                     .await
     528              :                 {
     529            0 :                     Ok(resp) => {
     530            0 :                         if resp.status() == StatusCode::NOT_FOUND {
     531            0 :                             warn!(
     532            0 :                                 "Timeline {} not found on peer SK {}, no need to pull it",
     533            0 :                                 TenantTimelineId::new(request.tenant_id, request.timeline_id),
     534              :                                 url
     535              :                             );
     536            0 :                             return Ok(PullTimelineResponse {
     537            0 :                                 safekeeper_host: None,
     538            0 :                             });
     539            0 :                         }
     540            0 :                         let info: TimelineStatus = resp
     541            0 :                             .json()
     542            0 :                             .await
     543            0 :                             .context("Failed to deserialize timeline status")
     544            0 :                             .map_err(ApiError::InternalServerError)?;
     545            0 :                         statuses.push((info, i));
     546              :                     }
     547            0 :                     Err(e) => {
     548            0 :                         match e {
     549              :                             // If we get a 404, it means the timeline doesn't exist on this safekeeper.
     550              :                             // We can ignore this error.
     551            0 :                             mgmt_api::Error::ApiError(status, _)
     552            0 :                                 if status == StatusCode::NOT_FOUND =>
     553              :                             {
     554            0 :                                 warn!(
     555            0 :                                     "Timeline {} not found on peer SK {}, no need to pull it",
     556            0 :                                     TenantTimelineId::new(request.tenant_id, request.timeline_id),
     557              :                                     url
     558              :                                 );
     559            0 :                                 return Ok(PullTimelineResponse {
     560            0 :                                     safekeeper_host: None,
     561            0 :                                 });
     562              :                             }
     563            0 :                             _ => {}
     564              :                         }
     565            0 :                         retry = true;
     566            0 :                         error!("Failed to get timeline status from {}: {:#}", url, e);
     567              :                     }
     568              :                 }
     569              :             }
     570            0 :             sleep(std::time::Duration::from_millis(100)).await;
     571              :         }
     572              :     }
     573              : 
     574              :     // Find the most advanced safekeeper
     575            0 :     let (status, i) = statuses
     576            0 :         .into_iter()
     577            0 :         .max_by_key(|(status, _)| {
     578            0 :             (
     579            0 :                 status.acceptor_state.epoch,
     580            0 :                 /* BEGIN_HADRON */
     581            0 :                 // We need to pull from the SK with the highest term.
      582            0 :                 // This is because another compute may come online and vote for the same highest term again on the other two SKs.
      583            0 :                 // Then there would be 2 computes running on the same term.
     584            0 :                 status.acceptor_state.term,
     585            0 :                 /* END_HADRON */
     586            0 :                 status.flush_lsn,
     587            0 :                 status.commit_lsn,
     588            0 :             )
     589            0 :         })
     590            0 :         .unwrap();
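    // Hypothetical example of the ordering above: (epoch=5, term=7, flush_lsn=0/40, commit_lsn=0/30)
    // beats (epoch=5, term=6, flush_lsn=0/50, commit_lsn=0/50), because the tuple comparison
    // considers term before flush_lsn.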
     591            0 :     let safekeeper_host = http_hosts[i].clone();
     592              : 
     593            0 :     assert!(status.tenant_id == request.tenant_id);
     594            0 :     assert!(status.timeline_id == request.timeline_id);
     595              : 
     596            0 :     let check_tombstone = !request.ignore_tombstone.unwrap_or_default();
     597              : 
     598            0 :     match pull_timeline(
     599            0 :         status,
     600            0 :         safekeeper_host,
     601            0 :         sk_auth_token,
     602            0 :         http_client,
     603            0 :         global_timelines,
     604            0 :         check_tombstone,
     605              :     )
     606            0 :     .await
     607              :     {
     608            0 :         Ok(resp) => Ok(resp),
     609            0 :         Err(e) => {
     610            0 :             match e.downcast_ref::<TimelineError>() {
     611            0 :                 Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
     612            0 :                     safekeeper_host: None,
     613            0 :                 }),
     614              :                 Some(TimelineError::CreationInProgress(_)) => {
     615              :                     // We don't return success here because creation might still fail.
     616            0 :                     Err(ApiError::Conflict("Creation in progress".to_owned()))
     617              :                 }
     618            0 :                 _ => Err(ApiError::InternalServerError(e)),
     619              :             }
     620              :         }
     621              :     }
     622            0 : }
     623              : 
     624            0 : async fn pull_timeline(
     625            0 :     status: TimelineStatus,
     626            0 :     host: String,
     627            0 :     sk_auth_token: Option<SecretString>,
     628            0 :     http_client: reqwest::Client,
     629            0 :     global_timelines: Arc<GlobalTimelines>,
     630            0 :     check_tombstone: bool,
     631            0 : ) -> Result<PullTimelineResponse> {
     632            0 :     let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
     633            0 :     info!(
     634            0 :         "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
     635              :         ttid,
     636              :         host,
     637              :         status.commit_lsn,
     638              :         status.flush_lsn,
     639              :         status.acceptor_state.term,
     640              :         status.acceptor_state.epoch
     641              :     );
     642              : 
     643            0 :     let conf = &global_timelines.get_global_config();
     644              : 
     645            0 :     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
     646            0 :     let client = Client::new(http_client, host.clone(), sk_auth_token.clone());
     647              :     // Request stream with basebackup archive.
     648            0 :     let bb_resp = client
     649            0 :         .snapshot(status.tenant_id, status.timeline_id, conf.my_id)
     650            0 :         .await?;
     651              : 
     652              :     // Make Stream of Bytes from it...
     653            0 :     let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
     654              :     // and turn it into StreamReader implementing AsyncRead.
     655            0 :     let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
     656              : 
      657              :     // Extract it on the fly to disk. We don't use the simple unpack() because we
      658              :     // want to fsync the files.
     659            0 :     let mut entries = Archive::new(bb_reader).entries()?;
     660            0 :     while let Some(base_tar_entry) = entries.next().await {
     661            0 :         let mut entry = base_tar_entry?;
     662            0 :         let header = entry.header();
     663            0 :         let file_path = header.path()?.into_owned();
     664            0 :         match header.entry_type() {
     665              :             tokio_tar::EntryType::Regular => {
     666            0 :                 let utf8_file_path =
     667            0 :                     Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
     668            0 :                 let dst_path = tli_dir_path.join(utf8_file_path);
     669            0 :                 let mut f = OpenOptions::new()
     670            0 :                     .create(true)
     671            0 :                     .truncate(true)
     672            0 :                     .write(true)
     673            0 :                     .open(&dst_path)
     674            0 :                     .await?;
     675            0 :                 tokio::io::copy(&mut entry, &mut f).await?;
     676              :                 // fsync the file
     677            0 :                 f.sync_all().await?;
     678              :             }
     679              :             _ => {
     680            0 :                 bail!(
     681            0 :                     "entry {} in backup tar archive is of unexpected type: {:?}",
     682            0 :                     file_path.display(),
     683            0 :                     header.entry_type()
     684              :                 );
     685              :             }
     686              :         }
     687              :     }
     688              : 
     689              :     // fsync temp timeline directory to remember its contents.
     690            0 :     fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
     691              : 
      692              :     // Validate the timeline in the temp directory before creating the real one from it
     693            0 :     let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
     694            0 :     info!(
     695            0 :         "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
     696              :         ttid, commit_lsn, flush_lsn
     697              :     );
     698            0 :     assert!(status.commit_lsn <= status.flush_lsn);
     699              : 
     700              :     // Finally, load the timeline.
     701            0 :     let _tli = global_timelines
     702            0 :         .load_temp_timeline(ttid, &tli_dir_path, check_tombstone)
     703            0 :         .await?;
     704              : 
     705            0 :     Ok(PullTimelineResponse {
     706            0 :         safekeeper_host: Some(host),
     707            0 :     })
     708            0 : }
        

Generated by: LCOV version 2.1-beta