|             Line data    Source code 
       1              : use anyhow::{anyhow, bail, Context, Result};
       2              : use bytes::Bytes;
       3              : use camino::Utf8PathBuf;
       4              : use camino_tempfile::Utf8TempDir;
       5              : use chrono::{DateTime, Utc};
       6              : use futures::{SinkExt, StreamExt, TryStreamExt};
       7              : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
       8              : use serde::{Deserialize, Serialize};
       9              : use std::{
      10              :     cmp::min,
      11              :     io::{self, ErrorKind},
      12              :     sync::Arc,
      13              : };
      14              : use tokio::{
      15              :     fs::{File, OpenOptions},
      16              :     io::AsyncWrite,
      17              :     sync::mpsc,
      18              :     task,
      19              : };
      20              : use tokio_tar::{Archive, Builder};
      21              : use tokio_util::{
      22              :     io::{CopyToBytes, SinkWriter},
      23              :     sync::PollSender,
      24              : };
      25              : use tracing::{error, info, instrument};
      26              : 
      27              : use crate::{
      28              :     control_file::{self, CONTROL_FILE_NAME},
      29              :     debug_dump,
      30              :     http::{
      31              :         client::{self, Client},
      32              :         routes::TimelineStatus,
      33              :     },
      34              :     safekeeper::Term,
      35              :     timeline::{get_tenant_dir, get_timeline_dir, FullAccessTimeline, Timeline, TimelineError},
      36              :     wal_storage::{self, open_wal_file, Storage},
      37              :     GlobalTimelines, SafeKeeperConf,
      38              : };
      39              : use utils::{
      40              :     crashsafe::{durable_rename, fsync_async_opt},
      41              :     id::{TenantId, TenantTimelineId, TimelineId},
      42              :     logging::SecretString,
      43              :     lsn::Lsn,
      44              :     pausable_failpoint,
      45              : };
      46              : 
      47              : /// Stream tar archive of timeline to tx.
      48            0 : #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
      49              : pub async fn stream_snapshot(tli: FullAccessTimeline, tx: mpsc::Sender<Result<Bytes>>) {
      50              :     if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
      51              :         // Error type/contents don't matter as they won't can't reach the client
      52              :         // (hyper likely doesn't do anything with it), but http stream will be
      53              :         // prematurely terminated. It would be nice to try to send the error in
      54              :         // trailers though.
      55              :         tx.send(Err(anyhow!("snapshot failed"))).await.ok();
      56              :         error!("snapshot failed: {:#}", e);
      57              :     }
      58              : }
      59              : 
/// State needed while streaming the snapshot.
///
/// Created by `FullAccessTimeline::start_snapshot`; dropping it schedules the
/// removal of the WAL removal hold (see the `Drop` impl).
pub struct SnapshotContext {
    /// First WAL segment to stream (inclusive).
    pub from_segno: XLogSegNo,
    /// Last WAL segment to stream (inclusive).
    pub upto_segno: XLogSegNo,
    /// Term observed when the snapshot started; re-checked in `finish_snapshot`.
    pub term: Term,
    /// Last log term observed when the snapshot started; re-checked in
    /// `finish_snapshot`.
    pub last_log_term: Term,
    /// Flush LSN observed when the snapshot started.
    pub flush_lsn: Lsn,
    /// WAL segment size of the timeline, used to open and name segment files.
    pub wal_seg_size: usize,
    /// Used to remove the WAL removal hold in Drop.
    pub tli: FullAccessTimeline,
}
      71              : 
      72              : impl Drop for SnapshotContext {
      73            0 :     fn drop(&mut self) {
      74            0 :         let tli = self.tli.clone();
      75            0 :         task::spawn(async move {
      76            0 :             let mut shared_state = tli.write_shared_state().await;
      77            0 :             shared_state.wal_removal_on_hold = false;
      78            0 :         });
      79            0 :     }
      80              : }
      81              : 
      82            0 : pub async fn stream_snapshot_guts(
      83            0 :     tli: FullAccessTimeline,
      84            0 :     tx: mpsc::Sender<Result<Bytes>>,
      85            0 : ) -> Result<()> {
      86            0 :     // tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
      87            0 :     // use SinkWriter as a Write impl. That is,
      88            0 :     // - create Sink from the tx. It returns PollSendError if chan is closed.
      89            0 :     let sink = PollSender::new(tx);
      90            0 :     // - SinkWriter needs sink error to be io one, map it.
      91            0 :     let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
      92            0 :     // - SinkWriter wants sink type to be just Bytes, not Result<Bytes>, so map
      93            0 :     //   it with with(). Note that with() accepts async function which we don't
      94            0 :     //   need and allows the map to fail, which we don't need either, but hence
      95            0 :     //   two Oks.
      96            0 :     let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
      97            0 :     // - SinkWriter (not surprisingly) wants sink of &[u8], not bytes, so wrap
      98            0 :     // into CopyToBytes. This is a data copy.
      99            0 :     let copy_to_bytes = CopyToBytes::new(oksink);
     100            0 :     let mut writer = SinkWriter::new(copy_to_bytes);
     101            0 :     let pinned_writer = std::pin::pin!(writer);
     102            0 : 
     103            0 :     // Note that tokio_tar append_* funcs use tokio::io::copy with 8KB buffer
     104            0 :     // which is also likely suboptimal.
     105            0 :     let mut ar = Builder::new_non_terminated(pinned_writer);
     106              : 
     107            0 :     let bctx = tli.start_snapshot(&mut ar).await?;
     108              :     pausable_failpoint!("sk-snapshot-after-list-pausable");
     109              : 
     110            0 :     let tli_dir = tli.get_timeline_dir();
     111            0 :     info!(
     112            0 :         "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
     113            0 :         bctx.upto_segno - bctx.from_segno + 1,
     114              :         bctx.from_segno,
     115              :         bctx.upto_segno,
     116              :         bctx.term,
     117              :         bctx.last_log_term,
     118              :         bctx.flush_lsn,
     119              :     );
     120            0 :     for segno in bctx.from_segno..=bctx.upto_segno {
     121            0 :         let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
     122            0 :         let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
     123            0 :         if is_partial {
     124            0 :             wal_file_name.push_str(".partial");
     125            0 :         }
     126            0 :         ar.append_file(&wal_file_name, &mut sf).await?;
     127              :     }
     128              : 
     129              :     // Do the term check before ar.finish to make archive corrupted in case of
     130              :     // term change. Client shouldn't ignore abrupt stream end, but to be sure.
     131            0 :     tli.finish_snapshot(&bctx).await?;
     132              : 
     133            0 :     ar.finish().await?;
     134              : 
     135            0 :     Ok(())
     136            0 : }
     137              : 
     138              : impl FullAccessTimeline {
     139              :     /// Start streaming tar archive with timeline:
     140              :     /// 1) stream control file under lock;
     141              :     /// 2) hold off WAL removal;
     142              :     /// 3) collect SnapshotContext to understand which WAL segments should be
     143              :     ///    streamed.
     144              :     ///
     145              :     /// Snapshot streams data up to flush_lsn. To make this safe, we must check
     146              :     /// that term doesn't change during the procedure, or we risk sending mix of
     147              :     /// WAL from different histories. Term is remembered in the SnapshotContext
     148              :     /// and checked in finish_snapshot. Note that in the last segment some WAL
     149              :     /// higher than flush_lsn set here might be streamed; that's fine as long as
     150              :     /// terms doesn't change.
     151              :     ///
     152              :     /// Alternatively we could send only up to commit_lsn to get some valid
     153              :     /// state which later will be recovered by compute, in this case term check
     154              :     /// is not needed, but we likely don't want that as there might be no
     155              :     /// compute which could perform the recovery.
     156              :     ///
     157              :     /// When returned SnapshotContext is dropped WAL hold is removed.
     158            0 :     async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
     159            0 :         &self,
     160            0 :         ar: &mut tokio_tar::Builder<W>,
     161            0 :     ) -> Result<SnapshotContext> {
     162            0 :         let mut shared_state = self.write_shared_state().await;
     163              : 
     164            0 :         let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
     165            0 :         let mut cf = File::open(cf_path).await?;
     166            0 :         ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
     167              : 
     168              :         // We need to stream since the oldest segment someone (s3 or pageserver)
     169              :         // still needs. This duplicates calc_horizon_lsn logic.
     170              :         //
     171              :         // We know that WAL wasn't removed up to this point because it cannot be
     172              :         // removed further than `backup_lsn`. Since we're holding shared_state
     173              :         // lock and setting `wal_removal_on_hold` later, it guarantees that WAL
     174              :         // won't be removed until we're done.
     175            0 :         let from_lsn = min(
     176            0 :             shared_state.sk.state.remote_consistent_lsn,
     177            0 :             shared_state.sk.state.backup_lsn,
     178            0 :         );
     179            0 :         if from_lsn == Lsn::INVALID {
     180              :             // this is possible if snapshot is called before handling first
     181              :             // elected message
     182            0 :             bail!("snapshot is called on uninitialized timeline");
     183            0 :         }
     184            0 :         let from_segno = from_lsn.segment_number(shared_state.get_wal_seg_size());
     185            0 :         let term = shared_state.sk.get_term();
     186            0 :         let last_log_term = shared_state.sk.get_last_log_term();
     187            0 :         let flush_lsn = shared_state.sk.flush_lsn();
     188            0 :         let upto_segno = flush_lsn.segment_number(shared_state.get_wal_seg_size());
     189            0 :         // have some limit on max number of segments as a sanity check
     190            0 :         const MAX_ALLOWED_SEGS: u64 = 1000;
     191            0 :         let num_segs = upto_segno - from_segno + 1;
     192            0 :         if num_segs > MAX_ALLOWED_SEGS {
     193            0 :             bail!(
     194            0 :                 "snapshot is called on timeline with {} segments, but the limit is {}",
     195            0 :                 num_segs,
     196            0 :                 MAX_ALLOWED_SEGS
     197            0 :             );
     198            0 :         }
     199            0 : 
     200            0 :         // Prevent WAL removal while we're streaming data.
     201            0 :         //
     202            0 :         // Since this a flag, not a counter just bail out if already set; we
     203            0 :         // shouldn't need concurrent snapshotting.
     204            0 :         if shared_state.wal_removal_on_hold {
     205            0 :             bail!("wal_removal_on_hold is already true");
     206            0 :         }
     207            0 :         shared_state.wal_removal_on_hold = true;
     208            0 : 
     209            0 :         let bctx = SnapshotContext {
     210            0 :             from_segno,
     211            0 :             upto_segno,
     212            0 :             term,
     213            0 :             last_log_term,
     214            0 :             flush_lsn,
     215            0 :             wal_seg_size: shared_state.get_wal_seg_size(),
     216            0 :             tli: self.clone(),
     217            0 :         };
     218            0 : 
     219            0 :         Ok(bctx)
     220            0 :     }
     221              : 
     222              :     /// Finish snapshotting: check that term(s) hasn't changed.
     223              :     ///
     224              :     /// Note that WAL gc hold off is removed in Drop of SnapshotContext to not
     225              :     /// forget this if snapshotting fails mid the way.
     226            0 :     pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
     227            0 :         let shared_state = self.read_shared_state().await;
     228            0 :         let term = shared_state.sk.get_term();
     229            0 :         let last_log_term = shared_state.sk.get_last_log_term();
     230            0 :         // There are some cases to relax this check (e.g. last_log_term might
     231            0 :         // change, but as long as older history is strictly part of new that's
     232            0 :         // fine), but there is no need to do it.
     233            0 :         if bctx.term != term || bctx.last_log_term != last_log_term {
     234            0 :             bail!("term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
     235            0 :               bctx.term, bctx.last_log_term, term, last_log_term);
     236            0 :         }
     237            0 :         Ok(())
     238            0 :     }
     239              : }
     240              : 
/// pull_timeline request body.
#[derive(Debug, Serialize, Deserialize)]
pub struct Request {
    /// Tenant owning the timeline to pull.
    pub tenant_id: TenantId,
    /// Timeline to pull.
    pub timeline_id: TimelineId,
    /// Candidate donor safekeepers; each entry is handed to the HTTP client
    /// to query the timeline status, and the most advanced one is used.
    pub http_hosts: Vec<String>,
}
     248              : 
/// pull_timeline response body.
#[derive(Debug, Serialize)]
pub struct Response {
    /// Donor safekeeper host, i.e. the entry of `Request::http_hosts` the
    /// timeline was pulled from.
    pub safekeeper_host: String,
    // TODO: add more fields?
}
     255              : 
/// Response for debug dump request.
// NOTE(review): the fields are not used in the visible part of this file; the
// per-field notes below are inferred from names and types — confirm against
// the debug_dump module.
#[derive(Debug, Serialize, Deserialize)]
pub struct DebugDumpResponse {
    // presumably when the dump started (UTC)
    pub start_time: DateTime<Utc>,
    // presumably when the dump finished (UTC)
    pub finish_time: DateTime<Utc>,
    // dumped timelines
    pub timelines: Vec<debug_dump::Timeline>,
    // presumably the number of timelines — confirm whether it always equals
    // `timelines.len()`
    pub timelines_count: usize,
    // dumped configuration
    pub config: debug_dump::Config,
}
     265              : 
     266              : /// Find the most advanced safekeeper and pull timeline from it.
     267            0 : pub async fn handle_request(
     268            0 :     request: Request,
     269            0 :     sk_auth_token: Option<SecretString>,
     270            0 : ) -> Result<Response> {
     271            0 :     let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
     272            0 :         request.tenant_id,
     273            0 :         request.timeline_id,
     274            0 :     ));
     275            0 :     if existing_tli.is_ok() {
     276            0 :         bail!("Timeline {} already exists", request.timeline_id);
     277            0 :     }
     278            0 : 
     279            0 :     let http_hosts = request.http_hosts.clone();
     280              : 
     281              :     // Figure out statuses of potential donors.
     282            0 :     let responses: Vec<Result<TimelineStatus, client::Error>> =
     283            0 :         futures::future::join_all(http_hosts.iter().map(|url| async {
     284            0 :             let cclient = Client::new(url.clone(), sk_auth_token.clone());
     285            0 :             let info = cclient
     286            0 :                 .timeline_status(request.tenant_id, request.timeline_id)
     287            0 :                 .await?;
     288            0 :             Ok(info)
     289            0 :         }))
     290            0 :         .await;
     291              : 
     292            0 :     let mut statuses = Vec::new();
     293            0 :     for (i, response) in responses.into_iter().enumerate() {
     294            0 :         let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
     295            0 :         statuses.push((status, i));
     296              :     }
     297              : 
     298              :     // Find the most advanced safekeeper
     299            0 :     let (status, i) = statuses
     300            0 :         .into_iter()
     301            0 :         .max_by_key(|(status, _)| {
     302            0 :             (
     303            0 :                 status.acceptor_state.epoch,
     304            0 :                 status.flush_lsn,
     305            0 :                 status.commit_lsn,
     306            0 :             )
     307            0 :         })
     308            0 :         .unwrap();
     309            0 :     let safekeeper_host = http_hosts[i].clone();
     310            0 : 
     311            0 :     assert!(status.tenant_id == request.tenant_id);
     312            0 :     assert!(status.timeline_id == request.timeline_id);
     313              : 
     314            0 :     pull_timeline(status, safekeeper_host, sk_auth_token).await
     315            0 : }
     316              : 
     317            0 : async fn pull_timeline(
     318            0 :     status: TimelineStatus,
     319            0 :     host: String,
     320            0 :     sk_auth_token: Option<SecretString>,
     321            0 : ) -> Result<Response> {
     322            0 :     let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
     323            0 :     info!(
     324            0 :         "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
     325              :         ttid,
     326              :         host,
     327              :         status.commit_lsn,
     328              :         status.flush_lsn,
     329              :         status.acceptor_state.term,
     330              :         status.acceptor_state.epoch
     331              :     );
     332              : 
     333            0 :     let conf = &GlobalTimelines::get_global_config();
     334              : 
     335            0 :     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
     336              : 
     337            0 :     let client = Client::new(host.clone(), sk_auth_token.clone());
     338              :     // Request stream with basebackup archive.
     339            0 :     let bb_resp = client
     340            0 :         .snapshot(status.tenant_id, status.timeline_id)
     341            0 :         .await?;
     342              : 
     343              :     // Make Stream of Bytes from it...
     344            0 :     let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other);
     345            0 :     // and turn it into StreamReader implementing AsyncRead.
     346            0 :     let bb_reader = tokio_util::io::StreamReader::new(bb_stream);
     347              : 
     348              :     // Extract it on the fly to the disk. We don't use simple unpack() to fsync
     349              :     // files.
     350            0 :     let mut entries = Archive::new(bb_reader).entries()?;
     351            0 :     while let Some(base_tar_entry) = entries.next().await {
     352            0 :         let mut entry = base_tar_entry?;
     353            0 :         let header = entry.header();
     354            0 :         let file_path = header.path()?.into_owned();
     355            0 :         match header.entry_type() {
     356              :             tokio_tar::EntryType::Regular => {
     357            0 :                 let utf8_file_path =
     358            0 :                     Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path");
     359            0 :                 let dst_path = tli_dir_path.join(utf8_file_path);
     360            0 :                 let mut f = OpenOptions::new()
     361            0 :                     .create(true)
     362            0 :                     .truncate(true)
     363            0 :                     .write(true)
     364            0 :                     .open(&dst_path)
     365            0 :                     .await?;
     366            0 :                 tokio::io::copy(&mut entry, &mut f).await?;
     367              :                 // fsync the file
     368            0 :                 f.sync_all().await?;
     369              :             }
     370              :             _ => {
     371            0 :                 bail!(
     372            0 :                     "entry {} in backup tar archive is of unexpected type: {:?}",
     373            0 :                     file_path.display(),
     374            0 :                     header.entry_type()
     375            0 :                 );
     376              :             }
     377              :         }
     378              :     }
     379              : 
     380              :     // fsync temp timeline directory to remember its contents.
     381            0 :     fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
     382              : 
     383              :     // Let's create timeline from temp directory and verify that it's correct
     384            0 :     let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
     385            0 :     info!(
     386            0 :         "finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
     387              :         ttid, commit_lsn, flush_lsn
     388              :     );
     389            0 :     assert!(status.commit_lsn <= status.flush_lsn);
     390              : 
     391              :     // Finally, load the timeline.
     392            0 :     let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
     393              : 
     394            0 :     Ok(Response {
     395            0 :         safekeeper_host: host,
     396            0 :     })
     397            0 : }
     398              : 
     399              : /// Create temp directory for a new timeline. It needs to be located on the same
     400              : /// filesystem as the rest of the timelines. It will be automatically deleted when
     401              : /// Utf8TempDir goes out of scope.
     402            0 : pub async fn create_temp_timeline_dir(
     403            0 :     conf: &SafeKeeperConf,
     404            0 :     ttid: TenantTimelineId,
     405            0 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
     406              :     // conf.workdir is usually /storage/safekeeper/data
     407              :     // will try to transform it into /storage/safekeeper/tmp
     408            0 :     let temp_base = conf
     409            0 :         .workdir
     410            0 :         .parent()
     411            0 :         .ok_or(anyhow::anyhow!("workdir has no parent"))?
     412            0 :         .join("tmp");
     413            0 : 
     414            0 :     tokio::fs::create_dir_all(&temp_base).await?;
     415              : 
     416            0 :     let tli_dir = camino_tempfile::Builder::new()
     417            0 :         .suffix("_temptli")
     418            0 :         .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
     419            0 :         .tempdir_in(temp_base)?;
     420              : 
     421            0 :     let tli_dir_path = tli_dir.path().to_path_buf();
     422            0 : 
     423            0 :     Ok((tli_dir, tli_dir_path))
     424            0 : }
     425              : 
     426              : /// Do basic validation of a temp timeline, before moving it to the global map.
     427            0 : pub async fn validate_temp_timeline(
     428            0 :     conf: &SafeKeeperConf,
     429            0 :     ttid: TenantTimelineId,
     430            0 :     path: &Utf8PathBuf,
     431            0 : ) -> Result<(Lsn, Lsn)> {
     432            0 :     let control_path = path.join("safekeeper.control");
     433              : 
     434            0 :     let control_store = control_file::FileStorage::load_control_file(control_path)?;
     435            0 :     if control_store.server.wal_seg_size == 0 {
     436            0 :         bail!("wal_seg_size is not set");
     437            0 :     }
     438              : 
     439            0 :     let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
     440              : 
     441            0 :     let commit_lsn = control_store.commit_lsn;
     442            0 :     let flush_lsn = wal_store.flush_lsn();
     443            0 : 
     444            0 :     Ok((commit_lsn, flush_lsn))
     445            0 : }
     446              : 
     447              : /// Move timeline from a temp directory to the main storage, and load it to the global map.
     448              : /// This operation is done under a lock to prevent bugs if several concurrent requests are
     449              : /// trying to load the same timeline. Note that it doesn't guard against creating the
     450              : /// timeline with the same ttid, but no one should be doing this anyway.
     451            0 : pub async fn load_temp_timeline(
     452            0 :     conf: &SafeKeeperConf,
     453            0 :     ttid: TenantTimelineId,
     454            0 :     tmp_path: &Utf8PathBuf,
     455            0 : ) -> Result<Arc<Timeline>> {
     456              :     // Take a lock to prevent concurrent loadings
     457            0 :     let load_lock = GlobalTimelines::loading_lock().await;
     458            0 :     let guard = load_lock.lock().await;
     459              : 
     460            0 :     if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
     461            0 :         bail!("timeline already exists, cannot overwrite it")
     462            0 :     }
     463            0 : 
     464            0 :     // Move timeline dir to the correct location
     465            0 :     let timeline_path = get_timeline_dir(conf, &ttid);
     466            0 : 
     467            0 :     info!(
     468            0 :         "moving timeline {} from {} to {}",
     469              :         ttid, tmp_path, timeline_path
     470              :     );
     471            0 :     tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?;
     472              :     // fsync tenant dir creation
     473            0 :     fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
     474            0 :     durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
     475              : 
     476            0 :     let tli = GlobalTimelines::load_timeline(&guard, ttid)
     477            0 :         .await
     478            0 :         .context("Failed to load timeline after copy")?;
     479              : 
     480            0 :     info!(
     481            0 :         "loaded timeline {}, flush_lsn={}",
     482            0 :         ttid,
     483            0 :         tli.get_flush_lsn().await
     484              :     );
     485              : 
     486            0 :     Ok(tli)
     487            0 : }
         |