LCOV - differential code coverage report
Current view: top level - safekeeper/src - recovery.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 80.2 % 263 211 52 211
Current Date: 2024-01-09 02:06:09 Functions: 64.7 % 34 22 12 22
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! This module implements pulling WAL from peer safekeepers if compute can't
       2                 : //! provide it, i.e. safekeeper lags too much.
       3                 : 
       4                 : use std::time::SystemTime;
       5                 : use std::{fmt, pin::pin, sync::Arc};
       6                 : 
       7                 : use anyhow::{bail, Context};
       8                 : use futures::StreamExt;
       9                 : use postgres_protocol::message::backend::ReplicationMessage;
      10                 : use tokio::sync::mpsc::{channel, Receiver, Sender};
      11                 : use tokio::time::timeout;
      12                 : use tokio::{
      13                 :     select,
      14                 :     time::sleep,
      15                 :     time::{self, Duration},
      16                 : };
      17                 : use tokio_postgres::replication::ReplicationStream;
      18                 : use tokio_postgres::types::PgLsn;
      19                 : use tracing::*;
      20                 : use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
      21                 : 
      22                 : use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
      23                 : use crate::safekeeper::{AppendRequest, AppendRequestHeader};
      24                 : use crate::{
      25                 :     http::routes::TimelineStatus,
      26                 :     receive_wal::MSG_QUEUE_SIZE,
      27                 :     safekeeper::{
      28                 :         AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory,
      29                 :         TermLsn, VoteRequest,
      30                 :     },
      31                 :     timeline::{PeerInfo, Timeline},
      32                 :     SafeKeeperConf,
      33                 : };
      34                 : 
      35                 : /// Entrypoint for per timeline task which always runs, checking whether
      36                 : /// recovery for this safekeeper is needed and starting it if so.
      37 CBC           1 : #[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
      38                 : pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
      39               1 :     info!("started");
      40                 :     let mut cancellation_rx = match tli.get_cancellation_rx() {
      41                 :         Ok(rx) => rx,
      42                 :         Err(_) => {
      43 UBC           0 :             info!("timeline canceled during task start");
      44                 :             return;
      45                 :         }
      46                 :     };
      47                 : 
      48 CBC        1278 :     select! {
      49            1278 :         _ = recovery_main_loop(tli, conf) => { unreachable!() }
      50            1278 :         _ = cancellation_rx.changed() => {
      51            1278 :             info!("stopped");
      52            1278 :         }
      53            1278 :     }
      54                 : }
      55                 : 
      56                 : /// Result of Timeline::recovery_needed, contains donor(s) if recovery needed and
      57                 : /// fields to explain the choice.
      58 UBC           0 : #[derive(Debug)]
      59                 : pub struct RecoveryNeededInfo {
      60                 :     /// my term
      61                 :     pub term: Term,
      62                 :     /// my last_log_term
      63                 :     pub last_log_term: Term,
      64                 :     /// my flush_lsn
      65                 :     pub flush_lsn: Lsn,
      66                 :     /// peers from which we can fetch WAL, for observability.
      67                 :     pub peers: Vec<PeerInfo>,
      68                 :     /// for observability
      69                 :     pub num_streaming_computes: usize,
      70                 :     pub donors: Vec<Donor>,
      71                 : }
      72                 : 
      73                 : // Custom to omit not important fields from PeerInfo.
      74                 : impl fmt::Display for RecoveryNeededInfo {
      75 CBC           2 :     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
      76               2 :         write!(f, "{{")?;
      77               2 :         write!(
      78               2 :             f,
      79               2 :             "term: {}, last_log_term: {}, flush_lsn: {}, peers: {{",
      80               2 :             self.term, self.last_log_term, self.flush_lsn
      81               2 :         )?;
      82               6 :         for p in self.peers.iter() {
      83               6 :             write!(
      84               6 :                 f,
      85               6 :                 "PeerInfo {{ sk_id: {}, term: {}, last_log_term: {}, flush_lsn: {} }}, ",
      86               6 :                 p.sk_id, p.term, p.last_log_term, p.flush_lsn
      87               6 :             )?;
      88                 :         }
      89               2 :         write!(
      90               2 :             f,
      91               2 :             "}} num_streaming_computes: {}, donors: {:?}",
      92               2 :             self.num_streaming_computes, self.donors
      93               2 :         )
      94               2 :     }
      95                 : }
      96                 : 
      97               2 : #[derive(Clone, Debug)]
      98                 : pub struct Donor {
      99                 :     pub sk_id: NodeId,
     100                 :     /// equals to last_log_term
     101                 :     pub term: Term,
     102                 :     pub flush_lsn: Lsn,
     103                 :     pub pg_connstr: String,
     104                 :     pub http_connstr: String,
     105                 : }
     106                 : 
     107                 : impl From<&PeerInfo> for Donor {
     108               2 :     fn from(p: &PeerInfo) -> Self {
     109               2 :         Donor {
     110               2 :             sk_id: p.sk_id,
     111               2 :             term: p.term,
     112               2 :             flush_lsn: p.flush_lsn,
     113               2 :             pg_connstr: p.pg_connstr.clone(),
     114               2 :             http_connstr: p.http_connstr.clone(),
     115               2 :         }
     116               2 :     }
     117                 : }
     118                 : 
     119                 : const CHECK_INTERVAL_MS: u64 = 2000;
     120                 : 
     121                 : /// Check regularly whether we need to start recovery.
     122               1 : async fn recovery_main_loop(tli: Arc<Timeline>, conf: SafeKeeperConf) {
     123               1 :     let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
     124                 :     loop {
     125               2 :         let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
     126               2 :         match recovery_needed_info.donors.first() {
     127               1 :             Some(donor) => {
     128               1 :                 info!(
     129               1 :                     "starting recovery from donor {}: {}",
     130               1 :                     donor.sk_id, recovery_needed_info
     131               1 :                 );
     132            1276 :                 match recover(tli.clone(), donor, &conf).await {
     133                 :                     // Note: 'write_wal rewrites WAL written before' error is
     134                 :                     // expected here and might happen if compute and recovery
     135                 :                     // concurrently write the same data. Eventually compute
     136                 :                     // should win.
     137 UBC           0 :                     Err(e) => warn!("recovery failed: {:#}", e),
     138 CBC           1 :                     Ok(msg) => info!("recovery finished: {}", msg),
     139                 :                 }
     140                 :             }
     141                 :             None => {
     142 UBC           0 :                 trace!(
     143               0 :                     "recovery not needed or not possible: {}",
     144               0 :                     recovery_needed_info
     145               0 :                 );
     146                 :             }
     147                 :         }
     148 CBC           2 :         sleep(check_duration).await;
     149                 :     }
     150                 : }
     151                 : 
     152                 : /// Recover from the specified donor. Returns message explaining normal finish
     153                 : /// reason or error.
     154               1 : async fn recover(
     155               1 :     tli: Arc<Timeline>,
     156               1 :     donor: &Donor,
     157               1 :     conf: &SafeKeeperConf,
     158               1 : ) -> anyhow::Result<String> {
     159               1 :     // Learn donor term switch history to figure out starting point.
     160               1 :     let client = reqwest::Client::new();
     161               1 :     let timeline_info: TimelineStatus = client
     162               1 :         .get(format!(
     163               1 :             "http://{}/v1/tenant/{}/timeline/{}",
     164               1 :             donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id
     165               1 :         ))
     166               1 :         .send()
     167               3 :         .await?
     168               1 :         .json()
     169 UBC           0 :         .await?;
     170 CBC           1 :     if timeline_info.acceptor_state.term != donor.term {
     171 UBC           0 :         bail!(
     172               0 :             "donor term changed from {} to {}",
     173               0 :             donor.term,
     174               0 :             timeline_info.acceptor_state.term
     175               0 :         );
     176 CBC           1 :     }
     177               1 :     // convert from API TermSwitchApiEntry into TermLsn.
     178               1 :     let donor_th = TermHistory(
     179               1 :         timeline_info
     180               1 :             .acceptor_state
     181               1 :             .term_history
     182               1 :             .iter()
     183               2 :             .map(|tl| Into::<TermLsn>::into(*tl))
     184               1 :             .collect(),
     185               1 :     );
     186               1 : 
     187               1 :     // Now understand our term history.
     188               1 :     let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term });
     189               1 :     let vote_response = match tli
     190               1 :         .process_msg(&vote_request)
     191               3 :         .await
     192               1 :         .context("VoteRequest handling")?
     193                 :     {
     194               1 :         Some(AcceptorProposerMessage::VoteResponse(vr)) => vr,
     195                 :         _ => {
     196 UBC           0 :             bail!("unexpected VoteRequest response"); // unreachable
     197                 :         }
     198                 :     };
     199 CBC           1 :     if vote_response.term != donor.term {
     200 UBC           0 :         bail!(
     201               0 :             "our term changed from {} to {}",
     202               0 :             donor.term,
     203               0 :             vote_response.term
     204               0 :         );
     205 CBC           1 :     }
     206                 : 
     207               1 :     let last_common_point = match TermHistory::find_highest_common_point(
     208               1 :         &donor_th,
     209               1 :         &vote_response.term_history,
     210               1 :         vote_response.flush_lsn,
     211               1 :     ) {
     212 UBC           0 :         None => bail!(
     213               0 :             "couldn't find common point in histories, donor {:?}, sk {:?}",
     214               0 :             donor_th,
     215               0 :             vote_response.term_history,
     216               0 :         ),
     217 CBC           1 :         Some(lcp) => lcp,
     218                 :     };
     219               1 :     info!("found last common point at {:?}", last_common_point);
     220                 : 
     221                 :     // truncate WAL locally
     222               1 :     let pe = ProposerAcceptorMessage::Elected(ProposerElected {
     223               1 :         term: donor.term,
     224               1 :         start_streaming_at: last_common_point.lsn,
     225               1 :         term_history: donor_th,
     226               1 :         timeline_start_lsn: Lsn::INVALID,
     227               1 :     });
     228               1 :     // Successful ProposerElected handling always returns None. If term changed,
     229               1 :     // we'll find out that during the streaming. Note: it is expected to get
     230               1 :     // 'refusing to overwrite correct WAL' here if walproposer reconnected
     231               1 :     // concurrently, restart helps here.
     232               1 :     tli.process_msg(&pe)
     233            1192 :         .await
     234               1 :         .context("ProposerElected handling")?;
     235                 : 
     236              78 :     recovery_stream(tli, donor, last_common_point.lsn, conf).await
     237               1 : }
     238                 : 
     239                 : // Pull WAL from donor, assuming handshake is already done.
     240               1 : async fn recovery_stream(
     241               1 :     tli: Arc<Timeline>,
     242               1 :     donor: &Donor,
     243               1 :     start_streaming_at: Lsn,
     244               1 :     conf: &SafeKeeperConf,
     245               1 : ) -> anyhow::Result<String> {
     246                 :     // TODO: pass auth token
     247               1 :     let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
     248               1 :     let mut cfg = cfg.to_tokio_postgres_config();
     249               1 :     // It will make safekeeper give out not committed WAL (up to flush_lsn).
     250               1 :     cfg.application_name(&format!("safekeeper_{}", conf.my_id));
     251               1 :     cfg.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
     252               1 : 
     253               1 :     let connect_timeout = Duration::from_millis(10000);
     254               1 :     let (client, connection) = match time::timeout(connect_timeout, cfg.connect(postgres::NoTls))
     255               2 :         .await
     256                 :     {
     257               1 :         Ok(client_and_conn) => client_and_conn?,
     258 UBC           0 :         Err(_elapsed) => {
     259               0 :             bail!("timed out while waiting {connect_timeout:?} for connection to peer safekeeper to open");
     260                 :         }
     261                 :     };
     262               0 :     trace!("connected to {:?}", donor);
     263                 : 
     264                 :     // The connection object performs the actual communication with the
     265                 :     // server, spawn it off to run on its own.
     266 CBC           1 :     let ttid = tli.ttid;
     267               1 :     tokio::spawn(async move {
     268               1 :         if let Err(e) = connection
     269               1 :             .instrument(info_span!("recovery task connection poll", ttid = %ttid))
     270              80 :             .await
     271                 :         {
     272                 :             // This logging isn't very useful as error is anyway forwarded to client.
     273               1 :             trace!(
     274 UBC           0 :                 "tokio_postgres connection object finished with error: {}",
     275               0 :                 e
     276               0 :             );
     277               0 :         }
     278 CBC           1 :     });
     279               1 : 
     280               1 :     let query = format!(
     281               1 :         "START_REPLICATION PHYSICAL {} (term='{}')",
     282               1 :         start_streaming_at, donor.term
     283               1 :     );
     284                 : 
     285               1 :     let copy_stream = client.copy_both_simple(&query).await?;
     286               1 :     let physical_stream = ReplicationStream::new(copy_stream);
     287               1 : 
     288               1 :     // As in normal walreceiver, do networking and writing to disk in parallel.
     289               1 :     let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
     290               1 :     let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
     291               1 :     let wa = WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, None);
     292                 : 
     293               1 :     let res = tokio::select! {
     294               1 :         r = network_io(physical_stream, msg_tx, donor.clone(), tli.clone(), conf.clone()) => r,
     295 UBC           0 :         r = read_replies(reply_rx, donor.term) => r.map(|()| None),
     296                 :     };
     297                 : 
     298                 :     // Join the spawned WalAcceptor. At this point chans to/from it passed to
     299                 :     // network routines are dropped, so it will exit as soon as it touches them.
     300 CBC           1 :     match wa.await {
     301                 :         Ok(Ok(())) => {
     302                 :             // WalAcceptor finished normally, termination reason is different
     303               1 :             match res {
     304               1 :                 Ok(Some(success_desc)) => Ok(success_desc),
     305 UBC           0 :                 Ok(None) => bail!("unexpected recovery end without error/success"), // can't happen
     306               0 :                 Err(e) => Err(e), // network error or term change
     307                 :             }
     308                 :         }
     309               0 :         Ok(Err(e)) => Err(e), // error while processing message
     310               0 :         Err(e) => bail!("WalAcceptor panicked: {}", e),
     311                 :     }
     312 CBC           1 : }
     313                 : 
     314                 : // Perform network part of streaming: read data and push it to msg_tx, send KA
     315                 : // to make sender hear from us. If there is nothing coming for a while, check
     316                 : // for termination.
     317                 : // Returns
     318                 : // - Ok(None) if channel to WalAcceptor closed -- its task should return error.
     319                 : // - Ok(Some(String)) if recovery successfully completed.
     320                 : // - Err if error happened while reading/writing to socket.
     321               1 : async fn network_io(
     322               1 :     physical_stream: ReplicationStream,
     323               1 :     msg_tx: Sender<ProposerAcceptorMessage>,
     324               1 :     donor: Donor,
     325               1 :     tli: Arc<Timeline>,
     326               1 :     conf: SafeKeeperConf,
     327               1 : ) -> anyhow::Result<Option<String>> {
     328               1 :     let mut physical_stream = pin!(physical_stream);
     329               1 :     let mut last_received_lsn = Lsn::INVALID;
     330               1 :     // tear down connection if no data arrives withing this period
     331               1 :     let no_data_timeout = Duration::from_millis(30000);
     332                 : 
     333                 :     loop {
     334             141 :         let msg = match timeout(no_data_timeout, physical_stream.next()).await {
     335             141 :             Ok(next) => match next {
     336 UBC           0 :                 None => bail!("unexpected end of replication stream"),
     337 CBC         141 :                 Some(msg) => msg.context("get replication message")?,
     338                 :             },
     339 UBC           0 :             Err(_) => bail!("no message received within {:?}", no_data_timeout),
     340                 :         };
     341                 : 
     342 CBC         141 :         match msg {
     343             140 :             ReplicationMessage::XLogData(xlog_data) => {
     344             140 :                 let ar_hdr = AppendRequestHeader {
     345             140 :                     term: donor.term,
     346             140 :                     epoch_start_lsn: Lsn::INVALID, // unused
     347             140 :                     begin_lsn: Lsn(xlog_data.wal_start()),
     348             140 :                     end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
     349             140 :                     commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
     350             140 :                     truncate_lsn: Lsn::INVALID, // do not attempt to advance
     351             140 :                     proposer_uuid: [0; 16],
     352             140 :                 };
     353             140 :                 let ar = AppendRequest {
     354             140 :                     h: ar_hdr,
     355             140 :                     wal_data: xlog_data.into_data(),
     356             140 :                 };
     357 UBC           0 :                 trace!(
     358               0 :                     "processing AppendRequest {}-{}, len {}",
     359               0 :                     ar.h.begin_lsn,
     360               0 :                     ar.h.end_lsn,
     361               0 :                     ar.wal_data.len()
     362               0 :                 );
     363 CBC         140 :                 last_received_lsn = ar.h.end_lsn;
     364             140 :                 if msg_tx
     365             140 :                     .send(ProposerAcceptorMessage::AppendRequest(ar))
     366 UBC           0 :                     .await
     367 CBC         140 :                     .is_err()
     368                 :                 {
     369 UBC           0 :                     return Ok(None); // chan closed, WalAcceptor terminated
     370 CBC         140 :                 }
     371                 :             }
     372                 :             ReplicationMessage::PrimaryKeepAlive(_) => {
     373                 :                 // keepalive means nothing is being streamed for a while. Check whether we need to stop.
     374               1 :                 let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
     375                 :                 // do current donors still contain one we currently connected to?
     376               1 :                 if !recovery_needed_info
     377               1 :                     .donors
     378               1 :                     .iter()
     379               1 :                     .any(|d| d.sk_id == donor.sk_id)
     380                 :                 {
     381                 :                     // Most likely it means we are caughtup.
     382                 :                     // note: just exiting makes tokio_postgres send CopyFail to the far end.
     383               1 :                     return Ok(Some(format!(
     384               1 :                         "terminating at {} as connected safekeeper {} with term {} is not a donor anymore: {}",
     385               1 :                         last_received_lsn, donor.sk_id, donor.term, recovery_needed_info
     386               1 :                     )));
     387 UBC           0 :                 }
     388                 :             }
     389               0 :             _ => {}
     390                 :         }
     391                 :         // Send reply to each message to keep connection alive. Ideally we
     392                 :         // should do that once in a while instead, but this again requires
     393                 :         // stream split or similar workaround, and recovery is anyway not that
     394                 :         // performance critical.
     395                 :         //
     396                 :         // We do not know here real write/flush LSNs (need to take mutex again
     397                 :         // or check replies which are read in different future), but neither
     398                 :         // sender much cares about them, so just send last received.
     399 CBC         140 :         physical_stream
     400             140 :             .as_mut()
     401             140 :             .standby_status_update(
     402             140 :                 PgLsn::from(last_received_lsn.0),
     403             140 :                 PgLsn::from(last_received_lsn.0),
     404             140 :                 PgLsn::from(last_received_lsn.0),
     405             140 :                 SystemTime::now(),
     406             140 :                 0,
     407             140 :             )
     408              69 :             .await?;
     409                 :     }
     410               1 : }
     411                 : 
     412                 : // Read replies from WalAcceptor. We are not interested much in sending them to
     413                 : // donor safekeeper, so don't route them anywhere. However, we should check if
     414                 : // term changes and exit if it does.
     415                 : // Returns Ok(()) if channel closed, Err in case of term change.
     416               1 : async fn read_replies(
     417               1 :     mut reply_rx: Receiver<AcceptorProposerMessage>,
     418               1 :     donor_term: Term,
     419               1 : ) -> anyhow::Result<()> {
     420                 :     loop {
     421              73 :         match reply_rx.recv().await {
     422               2 :             Some(msg) => {
     423               2 :                 if let AcceptorProposerMessage::AppendResponse(ar) = msg {
     424               2 :                     if ar.term != donor_term {
     425 UBC           0 :                         bail!("donor term changed from {} to {}", donor_term, ar.term);
     426 CBC           2 :                     }
     427 UBC           0 :                 }
     428                 :             }
     429               0 :             None => return Ok(()), // chan closed, WalAcceptor terminated
     430                 :         }
     431                 :     }
     432               0 : }
        

Generated by: LCOV version 2.1-beta