LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline/walreceiver - walreceiver_connection.rs (source / functions) Coverage Total Hit
Test: 5e392a02abbad1ab595f4dba672e219a49f7f539.info Lines: 0.0 % 515 0
Test Date: 2025-04-11 22:43:24 Functions: 0.0 % 24 0

            Line data    Source code
       1              : //! Actual Postgres connection handler to stream WAL to the server.
       2              : 
       3              : use std::error::Error;
       4              : use std::pin::pin;
       5              : use std::str::FromStr;
       6              : use std::sync::Arc;
       7              : use std::time::{Duration, SystemTime};
       8              : 
       9              : use anyhow::{Context, anyhow};
      10              : use bytes::BytesMut;
      11              : use chrono::{NaiveDateTime, Utc};
      12              : use fail::fail_point;
      13              : use futures::StreamExt;
      14              : use postgres_backend::is_expected_io_error;
      15              : use postgres_connection::PgConnectionConfig;
      16              : use postgres_ffi::WAL_SEGMENT_SIZE;
      17              : use postgres_ffi::v14::xlog_utils::normalize_lsn;
      18              : use postgres_ffi::waldecoder::{WalDecodeError, WalStreamDecoder};
      19              : use postgres_protocol::message::backend::ReplicationMessage;
      20              : use postgres_types::PgLsn;
      21              : use tokio::sync::watch;
      22              : use tokio::{select, time};
      23              : use tokio_postgres::error::SqlState;
      24              : use tokio_postgres::replication::ReplicationStream;
      25              : use tokio_postgres::{Client, SimpleQueryMessage, SimpleQueryRow};
      26              : use tokio_util::sync::CancellationToken;
      27              : use tracing::{Instrument, debug, error, info, trace, warn};
      28              : use utils::critical;
      29              : use utils::id::NodeId;
      30              : use utils::lsn::Lsn;
      31              : use utils::pageserver_feedback::PageserverFeedback;
      32              : use utils::postgres_client::PostgresClientProtocol;
      33              : use utils::sync::gate::GateError;
      34              : use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords};
      35              : use wal_decoder::wire_format::FromWireFormat;
      36              : 
      37              : use super::TaskStateUpdate;
      38              : use crate::context::RequestContext;
      39              : use crate::metrics::{LIVE_CONNECTIONS, WAL_INGEST, WALRECEIVER_STARTED_CONNECTIONS};
      40              : use crate::pgdatadir_mapping::DatadirModification;
      41              : use crate::task_mgr::{TaskKind, WALRECEIVER_RUNTIME};
      42              : use crate::tenant::{
      43              :     Timeline, WalReceiverInfo, debug_assert_current_span_has_tenant_and_timeline_id,
      44              : };
      45              : use crate::walingest::WalIngest;
      46              : 
      47              : /// Status of a single walreceiver connection, published to listeners as `TaskStateUpdate::Progress`.
      48              : #[derive(Debug, Clone, Copy)]
      49              : pub(super) struct WalConnectionStatus {
      50              :     /// If we were able to initiate a postgres connection, this means that safekeeper process is at least running.
      51              :     pub is_connected: bool,
      52              :     /// Defines a healthy connection as one on which pageserver received WAL from safekeeper
      53              :     /// and is able to process it in walingest without errors.
      54              :     pub has_processed_wal: bool,
      55              :     /// Connection establishment time or the timestamp of the latest connection message received.
      56              :     pub latest_connection_update: NaiveDateTime,
      57              :     /// Time of the latest WAL message received.
      58              :     pub latest_wal_update: NaiveDateTime,
      59              :     /// Latest WAL update contained WAL up to this LSN. The next WAL message will start from that LSN.
      60              :     pub streaming_lsn: Option<Lsn>,
      61              :     /// Latest commit_lsn received from the safekeeper. `None` until the safekeeper has reported a position.
      62              :     pub commit_lsn: Option<Lsn>,
      63              :     /// The safekeeper node this connection is attached to.
      64              :     pub node: NodeId,
      65              : }
      66              : 
      67              : pub(super) enum WalReceiverError { // Outcome categories for a failed or ended walreceiver connection.
      68              :     /// An error of a type that does not indicate an issue, e.g. a connection closing
      69              :     ExpectedSafekeeperError(tokio_postgres::Error),
      70              :     /// An "error" message that carries a SUCCESSFUL_COMPLETION status code.  Carries
      71              :     /// the message part of the original postgres error
      72              :     SuccessfulCompletion(String),
      73              :     /// Generic error
      74              :     Other(anyhow::Error),
      75              :     ClosedGate, // The timeline gate was already closed when the connection handler tried to enter it.
      76              :     Cancelled, // Work was cancelled, e.g. `WalIngest` setup reported `WalIngestErrorKind::Cancelled`.
      77              : }
      78              : 
      79              : impl From<tokio_postgres::Error> for WalReceiverError { // Classifies a postgres client error as SuccessfulCompletion, ExpectedSafekeeperError or Other.
      80            0 :     fn from(err: tokio_postgres::Error) -> Self {
      81            0 :         if let Some(dberror) = err.as_db_error().filter(|db_error| { // DB error that is really an orderly end of streaming
      82            0 :             db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
      83            0 :                 && db_error.message().contains("ending streaming")
      84            0 :         }) {
      85              :             // Keep only the message: the outer DbError carries a misleading "error" severity.
      86            0 :             Self::SuccessfulCompletion(dberror.message().to_string())
      87            0 :         } else if err.is_closed() // connection already closed, or ...
      88            0 :             || err
      89            0 :                 .source()
      90            0 :                 .and_then(|source| source.downcast_ref::<std::io::Error>()) // ... the underlying cause is an I/O error
      91            0 :                 .map(is_expected_io_error) // ... that `is_expected_io_error` accepts
      92            0 :                 .unwrap_or(false) // no source, or not an I/O error: not "expected"
      93              :         {
      94            0 :             Self::ExpectedSafekeeperError(err)
      95              :         } else {
      96            0 :             Self::Other(anyhow::Error::new(err)) // everything else is a generic error
      97              :         }
      98            0 :     }
      99              : }
     100              : 
     101              : impl From<anyhow::Error> for WalReceiverError { // Lets `?` turn any anyhow error into the generic `Other` variant.
     102            0 :     fn from(err: anyhow::Error) -> Self {
     103            0 :         Self::Other(err)
     104            0 :     }
     105              : }
     106              : 
     107              : impl From<WalDecodeError> for WalReceiverError { // WAL decoding failures are wrapped in anyhow and reported as `Other`.
     108            0 :     fn from(err: WalDecodeError) -> Self {
     109            0 :         Self::Other(anyhow::Error::new(err))
     110            0 :     }
     111              : }
     112              : 
     113              : /// Open a connection to the given safekeeper and receive WAL, sending back progress
     114              : /// messages as we go.
     115              : #[allow(clippy::too_many_arguments)]
     116            0 : pub(super) async fn handle_walreceiver_connection(
     117            0 :     timeline: Arc<Timeline>,
     118            0 :     protocol: PostgresClientProtocol,
     119            0 :     wal_source_connconf: PgConnectionConfig,
     120            0 :     events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
     121            0 :     cancellation: CancellationToken,
     122            0 :     connect_timeout: Duration,
     123            0 :     ctx: RequestContext,
     124            0 :     safekeeper_node: NodeId,
     125            0 :     ingest_batch_size: u64,
     126            0 :     validate_wal_contiguity: bool,
     127            0 : ) -> Result<(), WalReceiverError> {
     128            0 :     debug_assert_current_span_has_tenant_and_timeline_id();
     129              : 
     130              :     // prevent timeline shutdown from finishing until we have exited
     131            0 :     let _guard = timeline.gate.enter().map_err(|e| match e {
     132            0 :         GateError::GateClosed => WalReceiverError::ClosedGate,
     133            0 :     })?;
     134              :     // This function spawns a side-car task (WalReceiverConnectionPoller).
     135              :     // Get its gate guard now as well.
     136            0 :     let poller_guard = timeline.gate.enter().map_err(|e| match e {
     137            0 :         GateError::GateClosed => WalReceiverError::ClosedGate,
     138            0 :     })?;
     139              : 
     140            0 :     WALRECEIVER_STARTED_CONNECTIONS.inc();
     141            0 : 
     142            0 :     // Connect to the database in replication mode.
     143            0 :     info!("connecting to {wal_source_connconf:?}");
     144              : 
     145            0 :     let (replication_client, connection) = {
     146            0 :         let mut config = wal_source_connconf.to_tokio_postgres_config();
     147            0 :         config.application_name(format!("pageserver-{}", timeline.conf.id.0).as_str());
     148            0 :         config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
     149            0 :         match time::timeout(connect_timeout, config.connect(tokio_postgres::NoTls)).await {
     150            0 :             Ok(client_and_conn) => client_and_conn?,
     151            0 :             Err(_elapsed) => {
      152            0 :                 // Timing out while connecting to a safekeeper node can keep happening for a long time, due to
      153            0 :                 // many reasons that pageserver cannot control.
      154            0 :                 // Do not produce an error, but make it visible that timeouts happen by logging the event.
     155            0 :                 info!(
     156            0 :                     "Timed out while waiting {connect_timeout:?} for walreceiver connection to open"
     157              :                 );
     158            0 :                 return Ok(());
     159              :             }
     160              :         }
     161              :     };
     162              : 
     163            0 :     debug!("connected!");
     164            0 :     let mut connection_status = WalConnectionStatus {
     165            0 :         is_connected: true,
     166            0 :         has_processed_wal: false,
     167            0 :         latest_connection_update: Utc::now().naive_utc(),
     168            0 :         latest_wal_update: Utc::now().naive_utc(),
     169            0 :         streaming_lsn: None,
     170            0 :         commit_lsn: None,
     171            0 :         node: safekeeper_node,
     172            0 :     };
     173            0 :     if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     174            0 :         warn!(
     175            0 :             "Wal connection event listener dropped right after connection init, aborting the connection: {e}"
     176              :         );
     177            0 :         return Ok(());
     178            0 :     }
     179            0 : 
     180            0 :     // The connection object performs the actual communication with the database,
     181            0 :     // so spawn it off to run on its own. It shouldn't outlive this function, but,
     182            0 :     // due to lack of async drop, we can't enforce that. However, we ensure that
     183            0 :     // 1. it is sensitive to `cancellation` and
     184            0 :     // 2. holds the Timeline gate open so that after timeline shutdown,
     185            0 :     //    we know this task is gone.
     186            0 :     let _connection_ctx = ctx.detached_child(
     187            0 :         TaskKind::WalReceiverConnectionPoller,
     188            0 :         ctx.download_behavior(),
     189            0 :     );
     190            0 :     let connection_cancellation = cancellation.clone();
     191            0 :     WALRECEIVER_RUNTIME.spawn(
     192            0 :         async move {
     193            0 :             debug_assert_current_span_has_tenant_and_timeline_id();
     194            0 :             select! {
     195            0 :                 connection_result = connection => match connection_result {
     196            0 :                     Ok(()) => debug!("Walreceiver db connection closed"),
     197            0 :                     Err(connection_error) => {
     198            0 :                         match WalReceiverError::from(connection_error) {
     199            0 :                             WalReceiverError::ExpectedSafekeeperError(_) => {
     200            0 :                                 // silence, because most likely we've already exited the outer call
     201            0 :                                 // with a similar error.
     202            0 :                             },
     203            0 :                             WalReceiverError::SuccessfulCompletion(_) => {}
     204              :                             WalReceiverError::Cancelled => {
     205            0 :                                 debug!("Connection cancelled")
     206              :                             }
     207            0 :                             WalReceiverError::ClosedGate => {
     208            0 :                                 // doesn't happen at runtime
     209            0 :                             }
     210            0 :                             WalReceiverError::Other(err) => {
     211            0 :                                 warn!("Connection aborted: {err:#}")
     212              :                             }
     213              :                         }
     214              :                     }
     215              :                 },
     216            0 :                 _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
     217              :             }
     218            0 :             drop(poller_guard);
     219            0 :         }
     220              :         // Enrich the log lines emitted by this closure with meaningful context.
     221              :         // TODO: technically, this task outlives the surrounding function, so, the
     222              :         // spans won't be properly nested.
     223            0 :         .instrument(tracing::info_span!("poller")),
     224              :     );
     225              : 
     226            0 :     let _guard = LIVE_CONNECTIONS
     227            0 :         .with_label_values(&["wal_receiver"])
     228            0 :         .guard();
     229              : 
     230            0 :     let identify = identify_system(&replication_client).await?;
     231            0 :     info!("{identify:?}");
     232              : 
     233            0 :     let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
     234            0 :     let mut caught_up = false;
     235            0 : 
     236            0 :     connection_status.latest_connection_update = Utc::now().naive_utc();
     237            0 :     connection_status.latest_wal_update = Utc::now().naive_utc();
     238            0 :     connection_status.commit_lsn = Some(end_of_wal);
     239            0 :     if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     240            0 :         warn!(
     241            0 :             "Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}"
     242              :         );
     243            0 :         return Ok(());
     244            0 :     }
     245            0 : 
     246            0 :     //
     247            0 :     // Start streaming the WAL, from where we left off previously.
     248            0 :     //
     249            0 :     // If we had previously received WAL up to some point in the middle of a WAL record, we
     250            0 :     // better start from the end of last full WAL record, not in the middle of one.
     251            0 :     let mut last_rec_lsn = timeline.get_last_record_lsn();
     252            0 :     let mut startpoint = last_rec_lsn;
     253            0 : 
     254            0 :     if startpoint == Lsn(0) {
     255            0 :         return Err(WalReceiverError::Other(anyhow!("No previous WAL position")));
     256            0 :     }
     257            0 : 
     258            0 :     // There might be some padding after the last full record, skip it.
     259            0 :     startpoint += startpoint.calc_padding(8u32);
     260            0 : 
      261            0 :     // If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
      262            0 :     // for anything, and in some corner cases, the compute node might have never generated the WAL for page headers.
      263            0 :     // That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary,
      264            0 :     // but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
      265            0 :     // header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
      266            0 :     // to the safekeepers.
     267            0 :     startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
     268            0 : 
     269            0 :     info!(
     270            0 :         "last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}..."
     271              :     );
     272              : 
     273            0 :     let query = format!("START_REPLICATION PHYSICAL {startpoint}");
     274              : 
     275            0 :     let copy_stream = replication_client.copy_both_simple(&query).await?;
     276            0 :     let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
     277            0 : 
     278            0 :     let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
     279              : 
     280            0 :     let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
     281            0 :         .await
     282            0 :         .map_err(|e| match e.kind {
     283            0 :             crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
     284            0 :             _ => WalReceiverError::Other(e.into()),
     285            0 :         })?;
     286              : 
     287            0 :     let shard = vec![*timeline.get_shard_identity()];
     288              : 
     289            0 :     let interpreted_proto_config = match protocol {
     290            0 :         PostgresClientProtocol::Vanilla => None,
     291              :         PostgresClientProtocol::Interpreted {
     292            0 :             format,
     293            0 :             compression,
     294            0 :         } => Some((format, compression)),
     295              :     };
     296              : 
     297            0 :     let mut expected_wal_start = startpoint;
     298            0 :     while let Some(replication_message) = {
     299            0 :         select! {
     300            0 :             _ = cancellation.cancelled() => {
     301            0 :                 debug!("walreceiver interrupted");
     302            0 :                 None
     303              :             }
     304            0 :             replication_message = physical_stream.next() => replication_message,
     305              :         }
     306              :     } {
     307            0 :         let replication_message = replication_message?;
     308              : 
     309            0 :         let now = Utc::now().naive_utc();
     310            0 :         let last_rec_lsn_before_msg = last_rec_lsn;
     311            0 : 
      312            0 :         // Update the connection status before processing the message. If the message processing
      313            0 :         // fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
     314            0 :         match &replication_message {
     315            0 :             ReplicationMessage::XLogData(xlog_data) => {
     316            0 :                 connection_status.latest_connection_update = now;
     317            0 :                 connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
     318            0 :                 connection_status.streaming_lsn = Some(Lsn::from(
     319            0 :                     xlog_data.wal_start() + xlog_data.data().len() as u64,
     320            0 :                 ));
     321            0 :                 if !xlog_data.data().is_empty() {
     322            0 :                     connection_status.latest_wal_update = now;
     323            0 :                 }
     324              :             }
     325            0 :             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
     326            0 :                 connection_status.latest_connection_update = now;
     327            0 :                 connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
     328            0 :             }
     329            0 :             ReplicationMessage::RawInterpretedWalRecords(raw) => {
     330            0 :                 connection_status.latest_connection_update = now;
     331            0 :                 if !raw.data().is_empty() {
     332            0 :                     connection_status.latest_wal_update = now;
     333            0 :                 }
     334              : 
     335            0 :                 connection_status.commit_lsn = Some(Lsn::from(raw.commit_lsn()));
     336            0 :                 connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn()));
     337              :             }
     338            0 :             &_ => {}
     339              :         };
     340            0 :         if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     341            0 :             warn!("Wal connection event listener dropped, aborting the connection: {e}");
     342            0 :             return Ok(());
     343            0 :         }
     344              : 
     345            0 :         let status_update = match replication_message {
     346            0 :             ReplicationMessage::RawInterpretedWalRecords(raw) => {
     347            0 :                 WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
     348            0 : 
     349            0 :                 let mut uncommitted_records = 0;
     350            0 : 
     351            0 :                 // This is the end LSN of the raw WAL from which the records
     352            0 :                 // were interpreted.
     353            0 :                 let streaming_lsn = Lsn::from(raw.streaming_lsn());
     354            0 : 
     355            0 :                 let (format, compression) = interpreted_proto_config.unwrap();
     356            0 :                 let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression)
     357            0 :                     .await
     358            0 :                     .with_context(|| {
     359            0 :                         anyhow::anyhow!(
     360            0 :                         "Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
     361            0 :                     )
     362            0 :                     })?;
     363              : 
     364              :                 // Guard against WAL gaps. If the start LSN of the PG WAL section
     365              :                 // from which the interpreted records were extracted, doesn't match
     366              :                 // the end of the previous batch (or the starting point for the first batch),
     367              :                 // then kill this WAL receiver connection and start a new one.
     368            0 :                 if validate_wal_contiguity {
     369            0 :                     if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn {
     370            0 :                         match raw_wal_start_lsn.cmp(&expected_wal_start) {
     371              :                             std::cmp::Ordering::Greater => {
     372            0 :                                 let msg = format!(
     373            0 :                                     "Gap in streamed WAL: [{}, {})",
     374            0 :                                     expected_wal_start, raw_wal_start_lsn
     375            0 :                                 );
     376            0 :                                 critical!("{msg}");
     377            0 :                                 return Err(WalReceiverError::Other(anyhow!(msg)));
     378              :                             }
     379              :                             std::cmp::Ordering::Less => {
     380              :                                 // Other shards are reading WAL behind us.
     381              :                                 // This is valid, but check that we received records
     382              :                                 // that we haven't seen before.
     383            0 :                                 if let Some(first_rec) = batch.records.first() {
     384            0 :                                     if first_rec.next_record_lsn < last_rec_lsn {
     385            0 :                                         let msg = format!(
     386            0 :                                             "Received record with next_record_lsn multiple times ({} < {})",
     387            0 :                                             first_rec.next_record_lsn, expected_wal_start
     388            0 :                                         );
     389            0 :                                         critical!("{msg}");
     390            0 :                                         return Err(WalReceiverError::Other(anyhow!(msg)));
     391            0 :                                     }
     392            0 :                                 }
     393              :                             }
     394            0 :                             std::cmp::Ordering::Equal => {}
     395              :                         }
     396            0 :                     }
     397            0 :                 }
     398              : 
     399              :                 let InterpretedWalRecords {
     400            0 :                     records,
     401            0 :                     next_record_lsn,
     402            0 :                     raw_wal_start_lsn: _,
     403            0 :                 } = batch;
     404            0 : 
     405            0 :                 tracing::debug!(
     406            0 :                     "Received WAL up to {} with next_record_lsn={}",
     407              :                     streaming_lsn,
     408              :                     next_record_lsn
     409              :                 );
     410              : 
     411              :                 // We start the modification at 0 because each interpreted record
     412              :                 // advances it to its end LSN. 0 is just an initialization placeholder.
     413            0 :                 let mut modification = timeline.begin_modification(Lsn(0));
     414              : 
     415            0 :                 async fn commit(
     416            0 :                     modification: &mut DatadirModification<'_>,
     417            0 :                     ctx: &RequestContext,
     418            0 :                     uncommitted: &mut u64,
     419            0 :                 ) -> anyhow::Result<()> {
     420            0 :                     let stats = modification.stats();
     421            0 :                     modification.commit(ctx).await?;
     422            0 :                     WAL_INGEST.records_committed.inc_by(*uncommitted);
     423            0 :                     WAL_INGEST.inc_values_committed(&stats);
     424            0 :                     *uncommitted = 0;
     425            0 :                     Ok(())
     426            0 :                 }
     427              : 
     428            0 :                 if !records.is_empty() {
     429            0 :                     timeline
     430            0 :                         .metrics
     431            0 :                         .wal_records_received
     432            0 :                         .inc_by(records.len() as u64);
     433            0 :                 }
     434              : 
     435            0 :                 for interpreted in records {
     436            0 :                     if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
     437            0 :                         && uncommitted_records > 0
     438              :                     {
     439            0 :                         commit(&mut modification, &ctx, &mut uncommitted_records).await?;
     440            0 :                     }
     441              : 
     442            0 :                     let local_next_record_lsn = interpreted.next_record_lsn;
     443            0 : 
     444            0 :                     if interpreted.is_observed() {
     445            0 :                         WAL_INGEST.records_observed.inc();
     446            0 :                     }
     447              : 
     448            0 :                     walingest
     449            0 :                         .ingest_record(interpreted, &mut modification, &ctx)
     450            0 :                         .await
     451            0 :                         .with_context(|| {
     452            0 :                             format!("could not ingest record at {local_next_record_lsn}")
     453            0 :                         })
     454            0 :                         .inspect_err(|err| {
     455            0 :                             // TODO: we can't differentiate cancellation errors with
     456            0 :                             // anyhow::Error, so just ignore it if we're cancelled.
     457            0 :                             if !cancellation.is_cancelled() && !timeline.is_stopping() {
     458            0 :                                 critical!("{err:?}")
     459            0 :                             }
     460            0 :                         })?;
     461              : 
     462            0 :                     uncommitted_records += 1;
     463            0 : 
     464            0 :                     // FIXME: this cannot be made pausable_failpoint without fixing the
     465            0 :                     // failpoint library; in tests, the added amount of debugging will cause us
     466            0 :                     // to timeout the tests.
     467            0 :                     fail_point!("walreceiver-after-ingest");
     468            0 : 
     469            0 :                     // Commit every ingest_batch_size records. Even if we filtered out
     470            0 :                     // all records, we still need to call commit to advance the LSN.
     471            0 :                     if uncommitted_records >= ingest_batch_size
     472            0 :                         || modification.approx_pending_bytes()
     473            0 :                             > DatadirModification::MAX_PENDING_BYTES
     474              :                     {
     475            0 :                         commit(&mut modification, &ctx, &mut uncommitted_records).await?;
     476            0 :                     }
     477              :                 }
     478              : 
     479              :                 // Records might have been filtered out on the safekeeper side, but we still
     480              :                 // need to advance last record LSN on all shards. If we've not ingested the latest
     481              :                 // record, then set the LSN of the modification past it. This way all shards
     482              :                 // advance their last record LSN at the same time.
     483            0 :                 let needs_last_record_lsn_advance = if next_record_lsn > modification.get_lsn() {
     484            0 :                     modification.set_lsn(next_record_lsn).unwrap();
     485            0 :                     true
     486              :                 } else {
     487            0 :                     false
     488              :                 };
     489              : 
     490            0 :                 if uncommitted_records > 0 || needs_last_record_lsn_advance {
     491              :                     // Commit any uncommitted records
     492            0 :                     commit(&mut modification, &ctx, &mut uncommitted_records).await?;
     493            0 :                 }
     494              : 
     495            0 :                 if !caught_up && streaming_lsn >= end_of_wal {
     496            0 :                     info!("caught up at LSN {streaming_lsn}");
     497            0 :                     caught_up = true;
     498            0 :                 }
     499              : 
     500            0 :                 tracing::debug!(
     501            0 :                     "Ingested WAL up to {streaming_lsn}. Last record LSN is {}",
     502            0 :                     timeline.get_last_record_lsn()
     503              :                 );
     504              : 
     505            0 :                 last_rec_lsn = next_record_lsn;
     506            0 :                 expected_wal_start = streaming_lsn;
     507            0 : 
     508            0 :                 Some(streaming_lsn)
     509              :             }
     510              : 
     511            0 :             ReplicationMessage::XLogData(xlog_data) => {
     512            0 :                 async fn commit(
     513            0 :                     modification: &mut DatadirModification<'_>,
     514            0 :                     uncommitted: &mut u64,
     515            0 :                     filtered: &mut u64,
     516            0 :                     ctx: &RequestContext,
     517            0 :                 ) -> anyhow::Result<()> {
     518            0 :                     let stats = modification.stats();
     519            0 :                     modification.commit(ctx).await?;
     520            0 :                     WAL_INGEST
     521            0 :                         .records_committed
     522            0 :                         .inc_by(*uncommitted - *filtered);
     523            0 :                     WAL_INGEST.inc_values_committed(&stats);
     524            0 :                     *uncommitted = 0;
     525            0 :                     *filtered = 0;
     526            0 :                     Ok(())
     527            0 :                 }
     528              : 
     529              :                 // Pass the WAL data to the decoder, and see if we can decode
     530              :                 // more records as a result.
     531            0 :                 let data = xlog_data.data();
     532            0 :                 let startlsn = Lsn::from(xlog_data.wal_start());
     533            0 :                 let endlsn = startlsn + data.len() as u64;
     534            0 : 
     535            0 :                 trace!("received XLogData between {startlsn} and {endlsn}");
     536              : 
     537            0 :                 WAL_INGEST.bytes_received.inc_by(data.len() as u64);
     538            0 :                 waldecoder.feed_bytes(data);
     539            0 : 
     540            0 :                 {
     541            0 :                     let mut modification = timeline.begin_modification(startlsn);
     542            0 :                     let mut uncommitted_records = 0;
     543            0 :                     let mut filtered_records = 0;
     544              : 
     545            0 :                     while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
     546              :                         // It is important to deal with the aligned records as lsn in getPage@LSN is
     547              :                         // aligned and can be several bytes bigger. Without this alignment we are
     548              :                         // at risk of hitting a deadlock.
     549            0 :                         if !next_record_lsn.is_aligned() {
     550            0 :                             return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
     551            0 :                         }
     552              : 
     553              :                         // Deserialize and interpret WAL record
     554            0 :                         let interpreted = InterpretedWalRecord::from_bytes_filtered(
     555            0 :                             recdata,
     556            0 :                             &shard,
     557            0 :                             next_record_lsn,
     558            0 :                             modification.tline.pg_version,
     559            0 :                         )?
     560            0 :                         .remove(timeline.get_shard_identity())
     561            0 :                         .unwrap();
     562              : 
     563            0 :                         if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
     564            0 :                             && uncommitted_records > 0
     565              :                         {
     566              :                             // Special case: legacy PG database creations operate by reading pages from a 'template' database:
     567              :                             // these are the only kinds of WAL record that require reading data blocks while ingesting.  Ensure
     568              :                             // all earlier writes of data blocks are visible by committing any modification in flight.
     569            0 :                             commit(
     570            0 :                                 &mut modification,
     571            0 :                                 &mut uncommitted_records,
     572            0 :                                 &mut filtered_records,
     573            0 :                                 &ctx,
     574            0 :                             )
     575            0 :                             .await?;
     576            0 :                         }
     577              : 
     578              :                         // Ingest the records without immediately committing them.
     579            0 :                         timeline.metrics.wal_records_received.inc();
     580            0 :                         let ingested = walingest
     581            0 :                             .ingest_record(interpreted, &mut modification, &ctx)
     582            0 :                             .await
     583            0 :                             .with_context(|| {
     584            0 :                                 format!("could not ingest record at {next_record_lsn}")
     585            0 :                             })
     586            0 :                             .inspect_err(|err| {
     587            0 :                                 // TODO: we can't differentiate cancellation errors with
     588            0 :                                 // anyhow::Error, so just ignore it if we're cancelled.
     589            0 :                                 if !cancellation.is_cancelled() && !timeline.is_stopping() {
     590            0 :                                     critical!("{err:?}")
     591            0 :                                 }
     592            0 :                             })?;
     593            0 :                         if !ingested {
     594            0 :                             tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
     595            0 :                             WAL_INGEST.records_filtered.inc();
     596            0 :                             filtered_records += 1;
     597            0 :                         }
     598              : 
     599              :                         // FIXME: this cannot be made pausable_failpoint without fixing the
     600              :                         // failpoint library; in tests, the added amount of debugging will cause us
     601              :                         // to timeout the tests.
     602            0 :                         fail_point!("walreceiver-after-ingest");
     603            0 : 
     604            0 :                         last_rec_lsn = next_record_lsn;
     605            0 : 
     606            0 :                         // Commit every ingest_batch_size records. Even if we filtered out
     607            0 :                         // all records, we still need to call commit to advance the LSN.
     608            0 :                         uncommitted_records += 1;
     609            0 :                         if uncommitted_records >= ingest_batch_size
     610            0 :                             || modification.approx_pending_bytes()
     611            0 :                                 > DatadirModification::MAX_PENDING_BYTES
     612              :                         {
     613            0 :                             commit(
     614            0 :                                 &mut modification,
     615            0 :                                 &mut uncommitted_records,
     616            0 :                                 &mut filtered_records,
     617            0 :                                 &ctx,
     618            0 :                             )
     619            0 :                             .await?;
     620            0 :                         }
     621              :                     }
     622              : 
     623              :                     // Commit the remaining records.
     624            0 :                     if uncommitted_records > 0 {
     625            0 :                         commit(
     626            0 :                             &mut modification,
     627            0 :                             &mut uncommitted_records,
     628            0 :                             &mut filtered_records,
     629            0 :                             &ctx,
     630            0 :                         )
     631            0 :                         .await?;
     632            0 :                     }
     633              :                 }
     634              : 
     635            0 :                 if !caught_up && endlsn >= end_of_wal {
     636            0 :                     info!("caught up at LSN {endlsn}");
     637            0 :                     caught_up = true;
     638            0 :                 }
     639              : 
     640            0 :                 Some(endlsn)
     641              :             }
     642              : 
     643            0 :             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
     644            0 :                 let wal_end = keepalive.wal_end();
     645            0 :                 let timestamp = keepalive.timestamp();
     646            0 :                 let reply_requested = keepalive.reply() != 0;
     647            0 : 
     648            0 :                 trace!(
     649            0 :                     "received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})"
     650              :                 );
     651              : 
     652            0 :                 if reply_requested {
     653            0 :                     Some(last_rec_lsn)
     654              :                 } else {
     655            0 :                     None
     656              :                 }
     657              :             }
     658              : 
     659            0 :             _ => None,
     660              :         };
     661              : 
     662            0 :         if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
     663              :             // We have successfully processed at least one WAL record.
     664            0 :             connection_status.has_processed_wal = true;
     665            0 :             if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     666            0 :                 warn!("Wal connection event listener dropped, aborting the connection: {e}");
     667            0 :                 return Ok(());
     668            0 :             }
     669            0 :         }
     670              : 
     671            0 :         if let Some(last_lsn) = status_update {
     672            0 :             let timeline_remote_consistent_lsn = timeline
     673            0 :                 .get_remote_consistent_lsn_visible()
     674            0 :                 .unwrap_or(Lsn(0));
     675            0 : 
     676            0 :             // The last LSN we processed. It is not guaranteed to survive pageserver crash.
     677            0 :             let last_received_lsn = last_lsn;
     678            0 :             // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
     679            0 :             let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     680            0 :             // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
     681            0 :             // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
     682            0 :             let remote_consistent_lsn = timeline_remote_consistent_lsn;
     683            0 :             let ts = SystemTime::now();
     684            0 : 
     685            0 :             // Update the status about what we just received. This is shown in the mgmt API.
     686            0 :             let last_received_wal = WalReceiverInfo {
     687            0 :                 wal_source_connconf: wal_source_connconf.clone(),
     688            0 :                 last_received_msg_lsn: last_lsn,
     689            0 :                 last_received_msg_ts: ts
     690            0 :                     .duration_since(SystemTime::UNIX_EPOCH)
     691            0 :                     .expect("Received message time should be before UNIX EPOCH!")
     692            0 :                     .as_micros(),
     693            0 :             };
     694            0 :             *timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
     695              : 
     696              :             // Send the replication feedback message.
     697              :             // Regular standby_status_update fields are put into this message.
     698            0 :             let current_timeline_size = if timeline.tenant_shard_id.is_shard_zero() {
     699            0 :                 timeline
     700            0 :                     .get_current_logical_size(
     701            0 :                         crate::tenant::timeline::GetLogicalSizePriority::User,
     702            0 :                         &ctx,
     703            0 :                     )
     704            0 :                     // FIXME: https://github.com/neondatabase/neon/issues/5963
     705            0 :                     .size_dont_care_about_accuracy()
     706              :             } else {
     707              :                 // Non-zero shards send zero for logical size.  The safekeeper will ignore
     708              :                 // this number.  This is because in a sharded tenant, only shard zero maintains
     709              :                 // accurate logical size.
     710            0 :                 0
     711              :             };
     712              : 
     713            0 :             let status_update = PageserverFeedback {
     714            0 :                 current_timeline_size,
     715            0 :                 last_received_lsn,
     716            0 :                 disk_consistent_lsn,
     717            0 :                 remote_consistent_lsn,
     718            0 :                 replytime: ts,
     719            0 :                 shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
     720            0 :             };
     721            0 : 
     722            0 :             debug!("neon_status_update {status_update:?}");
     723              : 
     724            0 :             let mut data = BytesMut::new();
     725            0 :             status_update.serialize(&mut data);
     726            0 :             physical_stream
     727            0 :                 .as_mut()
     728            0 :                 .zenith_status_update(data.len() as u64, &data)
     729            0 :                 .await?;
     730            0 :         }
     731              :     }
     732              : 
     733            0 :     Ok(())
     734            0 : }
     735              : 
     736              : /// Data returned from the postgres `IDENTIFY_SYSTEM` command
     737              : ///
     738              : /// See the [postgres docs] for more details.
     739              : ///
     740              : /// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
     741              : #[derive(Debug)]
     742              : // As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
     743              : // unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
     744              : #[allow(dead_code)]
     745              : struct IdentifySystem {
     746              :     systemid: u64,
     747              :     timeline: u32,
     748              :     xlogpos: PgLsn,
     749              :     dbname: Option<String>,
     750              : }
     751              : 
/// There was a problem parsing the response to
/// a postgres IDENTIFY_SYSTEM command.
///
/// The error carries no further detail. `identify_system` produces it when the first
/// response message is not a data row, and when a required column has no value or does
/// not parse as its destination type.
#[derive(Debug, thiserror::Error)]
#[error("IDENTIFY_SYSTEM parse error")]
struct IdentifyError;
     757              : 
     758              : /// Run the postgres `IDENTIFY_SYSTEM` command
     759            0 : async fn identify_system(client: &Client) -> anyhow::Result<IdentifySystem> {
     760            0 :     let query_str = "IDENTIFY_SYSTEM";
     761            0 :     let response = client.simple_query(query_str).await?;
     762              : 
     763              :     // get(N) from row, then parse it as some destination type.
     764            0 :     fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
     765            0 :     where
     766            0 :         T: FromStr,
     767            0 :     {
     768            0 :         let val = row.get(idx).ok_or(IdentifyError)?;
     769            0 :         val.parse::<T>().or(Err(IdentifyError))
     770            0 :     }
     771              : 
     772              :     // extract the row contents into an IdentifySystem struct.
     773              :     // written as a closure so I can use ? for Option here.
     774            0 :     if let Some(SimpleQueryMessage::Row(first_row)) = response.first() {
     775              :         Ok(IdentifySystem {
     776            0 :             systemid: get_parse(first_row, 0)?,
     777            0 :             timeline: get_parse(first_row, 1)?,
     778            0 :             xlogpos: get_parse(first_row, 2)?,
     779            0 :             dbname: get_parse(first_row, 3).ok(),
     780              :         })
     781              :     } else {
     782            0 :         Err(IdentifyError.into())
     783              :     }
     784            0 : }
        

Generated by: LCOV version 2.1-beta