LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/timeline/walreceiver - walreceiver_connection.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 88.6 % 308 273 35 273
Current Date: 2024-01-09 02:06:09 Functions: 55.8 % 43 24 19 24
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Actual Postgres connection handler to stream WAL to the server.
       2                 : 
       3                 : use std::{
       4                 :     error::Error,
       5                 :     pin::pin,
       6                 :     str::FromStr,
       7                 :     sync::Arc,
       8                 :     time::{Duration, SystemTime},
       9                 : };
      10                 : 
      11                 : use anyhow::{anyhow, Context};
      12                 : use bytes::BytesMut;
      13                 : use chrono::{NaiveDateTime, Utc};
      14                 : use fail::fail_point;
      15                 : use futures::StreamExt;
      16                 : use postgres::{error::SqlState, SimpleQueryMessage, SimpleQueryRow};
      17                 : use postgres_ffi::WAL_SEGMENT_SIZE;
      18                 : use postgres_ffi::{v14::xlog_utils::normalize_lsn, waldecoder::WalDecodeError};
      19                 : use postgres_protocol::message::backend::ReplicationMessage;
      20                 : use postgres_types::PgLsn;
      21                 : use tokio::{select, sync::watch, time};
      22                 : use tokio_postgres::{replication::ReplicationStream, Client};
      23                 : use tokio_util::sync::CancellationToken;
      24                 : use tracing::{debug, error, info, trace, warn, Instrument};
      25                 : 
      26                 : use super::TaskStateUpdate;
      27                 : use crate::{
      28                 :     context::RequestContext,
      29                 :     metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
      30                 :     task_mgr,
      31                 :     task_mgr::TaskKind,
      32                 :     task_mgr::WALRECEIVER_RUNTIME,
      33                 :     tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
      34                 :     walingest::WalIngest,
      35                 :     walrecord::DecodedWALRecord,
      36                 : };
      37                 : use postgres_backend::is_expected_io_error;
      38                 : use postgres_connection::PgConnectionConfig;
      39                 : use postgres_ffi::waldecoder::WalStreamDecoder;
      40                 : use utils::pageserver_feedback::PageserverFeedback;
      41                 : use utils::{id::NodeId, lsn::Lsn};
      42                 : 
      43                 : /// Status of the connection.
      44 CBC      515293 : #[derive(Debug, Clone, Copy)]
      45                 : pub(super) struct WalConnectionStatus {
      46                 :     /// If we were able to initiate a postgres connection, this means that safekeeper process is at least running.
      47                 :     pub is_connected: bool,
      48                 :     /// Defines a healthy connection as one on which pageserver received WAL from safekeeper
      49                 :     /// and is able to process it in walingest without errors.
      50                 :     pub has_processed_wal: bool,
      51                 :     /// Connection establishment time or the timestamp of the latest connection message received.
      52                 :     pub latest_connection_update: NaiveDateTime,
      53                 :     /// Time of the latest WAL message received.
      54                 :     pub latest_wal_update: NaiveDateTime,
      55                 :     /// Latest WAL update contained WAL up to this LSN. Next WAL message will start from that LSN.
      56                 :     pub streaming_lsn: Option<Lsn>,
      57                 :     /// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet.
      58                 :     pub commit_lsn: Option<Lsn>,
      59                 :     /// The node it is connected to
      60                 :     pub node: NodeId,
      61                 : }
      62                 : 
      63                 : pub(super) enum WalReceiverError {
      64                 :     /// An error of a type that does not indicate an issue, e.g. a connection closing
      65                 :     ExpectedSafekeeperError(postgres::Error),
      66                 :     /// An "error" message that carries a SUCCESSFUL_COMPLETION status code.  Carries
      67                 :     /// the message part of the original postgres error
      68                 :     SuccessfulCompletion(String),
      69                 :     /// Generic error
      70                 :     Other(anyhow::Error),
      71                 : }
      72                 : 
      73                 : impl From<tokio_postgres::Error> for WalReceiverError {
      74            1570 :     fn from(err: tokio_postgres::Error) -> Self {
      75            1570 :         if let Some(dberror) = err.as_db_error().filter(|db_error| {
      76              48 :             db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
      77              48 :                 && db_error.message().contains("ending streaming")
      78            1570 :         }) {
      79                 :             // Strip the outer DbError, which carries a misleading "error" severity
      80              48 :             Self::SuccessfulCompletion(dberror.message().to_string())
      81            1522 :         } else if err.is_closed()
      82             823 :             || err
      83             823 :                 .source()
      84             823 :                 .and_then(|source| source.downcast_ref::<std::io::Error>())
      85             823 :                 .map(is_expected_io_error)
      86             823 :                 .unwrap_or(false)
      87                 :         {
      88            1518 :             Self::ExpectedSafekeeperError(err)
      89                 :         } else {
      90               4 :             Self::Other(anyhow::Error::new(err))
      91                 :         }
      92            1570 :     }
      93                 : }
      94                 : 
      95                 : impl From<anyhow::Error> for WalReceiverError {
      96              15 :     fn from(err: anyhow::Error) -> Self {
      97              15 :         Self::Other(err)
      98              15 :     }
      99                 : }
     100                 : 
     101                 : impl From<WalDecodeError> for WalReceiverError {
     102 UBC           0 :     fn from(err: WalDecodeError) -> Self {
     103               0 :         Self::Other(anyhow::Error::new(err))
     104               0 :     }
     105                 : }
     106                 : 
     107                 : /// Open a connection to the given safekeeper and receive WAL, sending back progress
     108                 : /// messages as we go.
     109                 : #[allow(clippy::too_many_arguments)]
     110 CBC        1529 : pub(super) async fn handle_walreceiver_connection(
     111            1529 :     timeline: Arc<Timeline>,
     112            1529 :     wal_source_connconf: PgConnectionConfig,
     113            1529 :     events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
     114            1529 :     cancellation: CancellationToken,
     115            1529 :     connect_timeout: Duration,
     116            1529 :     ctx: RequestContext,
     117            1529 :     node: NodeId,
     118            1529 :     ingest_batch_size: u64,
     119            1529 : ) -> Result<(), WalReceiverError> {
     120            1529 :     debug_assert_current_span_has_tenant_and_timeline_id();
     121            1529 : 
     122            1529 :     WALRECEIVER_STARTED_CONNECTIONS.inc();
     123                 : 
     124                 :     // Connect to the database in replication mode.
     125            1529 :     info!("connecting to {wal_source_connconf:?}");
     126                 : 
     127             714 :     let (replication_client, connection) = {
     128            1529 :         let mut config = wal_source_connconf.to_tokio_postgres_config();
     129            1529 :         config.application_name("pageserver");
     130            1529 :         config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
     131            2221 :         match time::timeout(connect_timeout, config.connect(postgres::NoTls)).await {
     132            1529 :             Ok(client_and_conn) => client_and_conn?,
     133 UBC           0 :             Err(_elapsed) => {
     134                 :                 // Timing out while connecting to a safekeeper node can happen for a long time, due to
     135                 :                 // many reasons that pageserver cannot control.
     136                 :                 // Do not produce an error, but make it visible that timeouts happen by logging the event.
     137               0 :                 info!("Timed out while waiting {connect_timeout:?} for walreceiver connection to open");
     138               0 :                 return Ok(());
     139                 :             }
     140                 :         }
     141                 :     };
     142                 : 
     143               0 :     debug!("connected!");
     144 CBC         714 :     let mut connection_status = WalConnectionStatus {
     145             714 :         is_connected: true,
     146             714 :         has_processed_wal: false,
     147             714 :         latest_connection_update: Utc::now().naive_utc(),
     148             714 :         latest_wal_update: Utc::now().naive_utc(),
     149             714 :         streaming_lsn: None,
     150             714 :         commit_lsn: None,
     151             714 :         node,
     152             714 :     };
     153             714 :     if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     154 UBC           0 :         warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
     155               0 :         return Ok(());
     156 CBC         714 :     }
     157             714 : 
     158             714 :     // The connection object performs the actual communication with the database,
     159             714 :     // so spawn it off to run on its own.
     160             714 :     let _connection_ctx = ctx.detached_child(
     161             714 :         TaskKind::WalReceiverConnectionPoller,
     162             714 :         ctx.download_behavior(),
     163             714 :     );
     164             714 :     let connection_cancellation = cancellation.clone();
     165             714 :     task_mgr::spawn(
     166             714 :         WALRECEIVER_RUNTIME.handle(),
     167             714 :         TaskKind::WalReceiverConnectionPoller,
     168             714 :         Some(timeline.tenant_shard_id),
     169             714 :         Some(timeline.timeline_id),
     170             714 :         "walreceiver connection",
     171                 :         false,
     172             714 :         async move {
     173             714 :             debug_assert_current_span_has_tenant_and_timeline_id();
     174             714 : 
     175          916556 :             select! {
     176             368 :                 connection_result = connection => match connection_result {
     177 UBC           0 :                     Ok(()) => debug!("Walreceiver db connection closed"),
     178                 :                     Err(connection_error) => {
     179                 :                         match WalReceiverError::from(connection_error) {
     180                 :                             WalReceiverError::ExpectedSafekeeperError(_) => {
     181                 :                                 // silence, because most likely we've already exited the outer call
     182                 :                                 // with a similar error.
     183                 :                             },
     184                 :                             WalReceiverError::SuccessfulCompletion(_) => {}
     185                 :                             WalReceiverError::Other(err) => {
     186 CBC           4 :                                 warn!("Connection aborted: {err:#}")
     187                 :                             }
     188                 :                         }
     189                 :                     }
     190                 :                 },
     191 UBC           0 :                 _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
     192                 :             }
     193 CBC         661 :             Ok(())
     194             661 :         }
     195                 :         // Enrich the log lines emitted by this closure with meaningful context.
     196                 :         // TODO: technically, this task outlives the surrounding function, so, the
     197                 :         // spans won't be properly nested.
     198             714 :         .instrument(tracing::info_span!("poller")),
     199                 :     );
     200                 : 
     201                 :     // Immediately increment the gauge, then create a job to decrement it on task exit.
     202                 :     // One of the pros of `defer!` is that this will *most probably*
     203                 :     // get called, even in presence of panics.
     204             714 :     let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
     205             714 :     gauge.inc();
     206             661 :     scopeguard::defer! {
     207             661 :         gauge.dec();
     208             661 :     }
     209                 : 
     210             714 :     let identify = identify_system(&replication_client).await?;
     211             713 :     info!("{identify:?}");
     212                 : 
     213             713 :     let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
     214             713 :     let mut caught_up = false;
     215             713 : 
     216             713 :     connection_status.latest_connection_update = Utc::now().naive_utc();
     217             713 :     connection_status.latest_wal_update = Utc::now().naive_utc();
     218             713 :     connection_status.commit_lsn = Some(end_of_wal);
     219             713 :     if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     220 UBC           0 :         warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
     221               0 :         return Ok(());
     222 CBC         713 :     }
     223             713 : 
     224             713 :     //
     225             713 :     // Start streaming the WAL, from where we left off previously.
     226             713 :     //
     227             713 :     // If we had previously received WAL up to some point in the middle of a WAL record, we
     228             713 :     // better start from the end of last full WAL record, not in the middle of one.
     229             713 :     let mut last_rec_lsn = timeline.get_last_record_lsn();
     230             713 :     let mut startpoint = last_rec_lsn;
     231             713 : 
     232             713 :     if startpoint == Lsn(0) {
     233 UBC           0 :         return Err(WalReceiverError::Other(anyhow!("No previous WAL position")));
     234 CBC         713 :     }
     235             713 : 
     236             713 :     // There might be some padding after the last full record, skip it.
     237             713 :     startpoint += startpoint.calc_padding(8u32);
     238             713 : 
     239             713 :     // If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
     240             713 :     // for anything, and in some corner cases, the compute node might have never generated the WAL for page headers.
     241             713 :     // That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary,
     242             713 :     // but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
     243             713 :     // header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
     244             713 :     // to the safekeepers.
     245             713 :     startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
     246                 : 
     247             713 :     info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
     248                 : 
     249             713 :     let query = format!("START_REPLICATION PHYSICAL {startpoint}");
     250                 : 
     251             713 :     let copy_stream = replication_client.copy_both_simple(&query).await?;
     252             710 :     let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
     253             710 : 
     254             710 :     let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
     255                 : 
     256             710 :     let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
     257                 : 
     258          566275 :     while let Some(replication_message) = {
     259          566535 :         select! {
     260                 :             _ = cancellation.cancelled() => {
     261 UBC           0 :                 debug!("walreceiver interrupted");
     262                 :                 None
     263                 :             }
     264 CBC      566275 :             replication_message = physical_stream.next() => replication_message,
     265                 :         }
     266                 :     } {
     267          566275 :         let replication_message = replication_message?;
     268                 : 
     269          565877 :         let now = Utc::now().naive_utc();
     270          565877 :         let last_rec_lsn_before_msg = last_rec_lsn;
     271          565877 : 
     272          565877 :         // Update the connection status before processing the message. If the message processing
     273          565877 :         // fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
     274          565877 :         match &replication_message {
     275          562467 :             ReplicationMessage::XLogData(xlog_data) => {
     276          562467 :                 connection_status.latest_connection_update = now;
     277          562467 :                 connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
     278          562467 :                 connection_status.streaming_lsn = Some(Lsn::from(
     279          562467 :                     xlog_data.wal_start() + xlog_data.data().len() as u64,
     280          562467 :                 ));
     281          562467 :                 if !xlog_data.data().is_empty() {
     282          562467 :                     connection_status.latest_wal_update = now;
     283          562467 :                 }
     284                 :             }
     285            3410 :             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
     286            3410 :                 connection_status.latest_connection_update = now;
     287            3410 :                 connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
     288            3410 :             }
     289 UBC           0 :             &_ => {}
     290                 :         };
     291 CBC      565877 :         if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     292 UBC           0 :             warn!("Wal connection event listener dropped, aborting the connection: {e}");
     293               0 :             return Ok(());
     294 CBC      565877 :         }
     295                 : 
     296          565877 :         let status_update = match replication_message {
     297          562467 :             ReplicationMessage::XLogData(xlog_data) => {
     298          562467 :                 // Pass the WAL data to the decoder, and see if we can decode
     299          562467 :                 // more records as a result.
     300          562467 :                 let data = xlog_data.data();
     301          562467 :                 let startlsn = Lsn::from(xlog_data.wal_start());
     302          562467 :                 let endlsn = startlsn + data.len() as u64;
     303                 : 
     304 UBC           0 :                 trace!("received XLogData between {startlsn} and {endlsn}");
     305                 : 
     306 CBC      562467 :                 waldecoder.feed_bytes(data);
     307          562467 : 
     308          562467 :                 {
     309          562467 :                     let mut decoded = DecodedWALRecord::default();
     310          562467 :                     let mut modification = timeline.begin_modification(startlsn);
     311          562467 :                     let mut uncommitted_records = 0;
     312          562467 :                     let mut filtered_records = 0;
     313        47911620 :                     while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     314                 :                         // It is important to deal with the aligned records as lsn in getPage@LSN is
     315                 :                         // aligned and can be several bytes bigger. Without this alignment we are
     316                 :                         // at risk of hitting a deadlock.
     317        47349159 :                         if !lsn.is_aligned() {
     318 UBC           0 :                             return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
     319 CBC    47349159 :                         }
     320                 : 
     321                 :                         // Ingest the records without immediately committing them.
     322        47349159 :                         let ingested = walingest
     323        47349159 :                             .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
     324            8106 :                             .await
     325        47349159 :                             .with_context(|| format!("could not ingest record at {lsn}"))?;
     326        47349154 :                         if !ingested {
     327 UBC           0 :                             tracing::debug!("ingest: filtered out record @ LSN {lsn}");
     328 CBC    42377190 :                             WAL_INGEST.records_filtered.inc();
     329        42377190 :                             filtered_records += 1;
     330         4971964 :                         }
     331                 : 
     332 UBC           0 :                         fail_point!("walreceiver-after-ingest");
     333                 : 
     334 CBC    47349154 :                         last_rec_lsn = lsn;
     335        47349154 : 
     336        47349154 :                         // Commit every ingest_batch_size records. Even if we filtered out
     337        47349154 :                         // all records, we still need to call commit to advance the LSN.
     338        47349154 :                         uncommitted_records += 1;
     339        47349154 :                         if uncommitted_records >= ingest_batch_size {
     340          398786 :                             WAL_INGEST
     341          398786 :                                 .records_committed
     342          398786 :                                 .inc_by(uncommitted_records - filtered_records);
     343          398786 :                             modification.commit(&ctx).await?;
     344          398785 :                             uncommitted_records = 0;
     345          398785 :                             filtered_records = 0;
     346        46950368 :                         }
     347                 :                     }
     348                 : 
     349                 :                     // Commit the remaining records.
     350          562461 :                     if uncommitted_records > 0 {
     351          561895 :                         WAL_INGEST
     352          561895 :                             .records_committed
     353          561895 :                             .inc_by(uncommitted_records - filtered_records);
     354          561895 :                         modification.commit(&ctx).await?;
     355             566 :                     }
     356                 :                 }
     357                 : 
     358          562461 :                 if !caught_up && endlsn >= end_of_wal {
     359             584 :                     info!("caught up at LSN {endlsn}");
     360             584 :                     caught_up = true;
     361          561877 :                 }
     362                 : 
     363          562461 :                 Some(endlsn)
     364                 :             }
     365                 : 
     366            3410 :             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
     367            3410 :                 let wal_end = keepalive.wal_end();
     368            3410 :                 let timestamp = keepalive.timestamp();
     369            3410 :                 let reply_requested = keepalive.reply() != 0;
     370                 : 
     371 UBC           0 :                 trace!("received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})");
     372                 : 
     373 CBC        3410 :                 if reply_requested {
     374            3410 :                     Some(last_rec_lsn)
     375                 :                 } else {
     376 UBC           0 :                     None
     377                 :                 }
     378                 :             }
     379                 : 
     380               0 :             _ => None,
     381                 :         };
     382                 : 
     383 CBC      565871 :         if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
     384                 :             // We have successfully processed at least one WAL record.
     385             601 :             connection_status.has_processed_wal = true;
     386             601 :             if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     387 UBC           0 :                 warn!("Wal connection event listener dropped, aborting the connection: {e}");
     388               0 :                 return Ok(());
     389 CBC         601 :             }
     390          565270 :         }
     391                 : 
     392          565871 :         timeline
     393          565871 :             .check_checkpoint_distance()
     394            1975 :             .await
     395          565871 :             .with_context(|| {
     396 UBC           0 :                 format!(
     397               0 :                     "Failed to check checkpoint distance for timeline {}",
     398               0 :                     timeline.timeline_id
     399               0 :                 )
     400 CBC      565871 :             })?;
     401                 : 
     402          565871 :         if let Some(last_lsn) = status_update {
     403          565871 :             let timeline_remote_consistent_lsn = timeline
     404          565871 :                 .get_remote_consistent_lsn_visible()
     405          565871 :                 .unwrap_or(Lsn(0));
     406          565871 : 
     407          565871 :             // The last LSN we processed. It is not guaranteed to survive pageserver crash.
     408          565871 :             let last_received_lsn = last_lsn;
     409          565871 :             // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
     410          565871 :             let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     411          565871 :             // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
     412          565871 :             // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
     413          565871 :             let remote_consistent_lsn = timeline_remote_consistent_lsn;
     414          565871 :             let ts = SystemTime::now();
     415          565871 : 
     416          565871 :             // Update the status about what we just received. This is shown in the mgmt API.
     417          565871 :             let last_received_wal = WalReceiverInfo {
     418          565871 :                 wal_source_connconf: wal_source_connconf.clone(),
     419          565871 :                 last_received_msg_lsn: last_lsn,
     420          565871 :                 last_received_msg_ts: ts
     421          565871 :                     .duration_since(SystemTime::UNIX_EPOCH)
     422          565871 :                     .expect("Received message time should be before UNIX EPOCH!")
     423          565871 :                     .as_micros(),
     424          565871 :             };
     425          565871 :             *timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
     426          565871 : 
     427          565871 :             // Send the replication feedback message.
     428          565871 :             // Regular standby_status_update fields are put into this message.
     429          565871 :             let current_timeline_size = timeline
     430          565871 :                 .get_current_logical_size(
     431          565871 :                     crate::tenant::timeline::GetLogicalSizePriority::User,
     432          565871 :                     &ctx,
     433          565871 :                 )
     434          565871 :                 // FIXME: https://github.com/neondatabase/neon/issues/5963
     435          565871 :                 .size_dont_care_about_accuracy();
     436          565871 :             let status_update = PageserverFeedback {
     437          565871 :                 current_timeline_size,
     438          565871 :                 last_received_lsn,
     439          565871 :                 disk_consistent_lsn,
     440          565871 :                 remote_consistent_lsn,
     441          565871 :                 replytime: ts,
     442          565871 :             };
     443                 : 
     444 UBC           0 :             debug!("neon_status_update {status_update:?}");
     445                 : 
     446 CBC      565871 :             let mut data = BytesMut::new();
     447          565871 :             status_update.serialize(&mut data);
     448          565871 :             physical_stream
     449          565871 :                 .as_mut()
     450          565871 :                 .zenith_status_update(data.len() as u64, &data)
     451           40824 :                 .await?;
     452 UBC           0 :         }
     453                 :     }
     454                 : 
     455 CBC         208 :     Ok(())
     456            1476 : }
     457                 : 
     458                 : /// Data returned from the postgres `IDENTIFY_SYSTEM` command
     459                 : ///
     460                 : /// See the [postgres docs] for more details.
     461                 : ///
     462                 : /// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
     463             713 : #[derive(Debug)]
     464                 : // As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
     465                 : // unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
     466                 : #[allow(dead_code)]
     467                 : struct IdentifySystem {
     468                 :     systemid: u64,
     469                 :     timeline: u32,
     470                 :     xlogpos: PgLsn,
     471                 :     dbname: Option<String>,
     472                 : }
     473                 : 
     474                 : /// There was a problem parsing the response to
     475                 : /// a postgres IDENTIFY_SYSTEM command.
     476 UBC           0 : #[derive(Debug, thiserror::Error)]
     477                 : #[error("IDENTIFY_SYSTEM parse error")]
     478                 : struct IdentifyError;
     479                 : 
     480                 : /// Run the postgres `IDENTIFY_SYSTEM` command
     481 CBC         714 : async fn identify_system(client: &Client) -> anyhow::Result<IdentifySystem> {
     482             714 :     let query_str = "IDENTIFY_SYSTEM";
     483             714 :     let response = client.simple_query(query_str).await?;
     484                 : 
     485                 :     // get(N) from row, then parse it as some destination type.
     486            2852 :     fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
     487            2852 :     where
     488            2852 :         T: FromStr,
     489            2852 :     {
     490            2852 :         let val = row.get(idx).ok_or(IdentifyError)?;
     491            2139 :         val.parse::<T>().or(Err(IdentifyError))
     492            2852 :     }
     493                 : 
     494                 :     // Extract the row contents into an IdentifySystem struct.
     495                 :     // The first three columns are propagated with `?` on failure; `dbname` is optional.
     496             713 :     if let Some(SimpleQueryMessage::Row(first_row)) = response.first() {
     497                 :         Ok(IdentifySystem {
     498             713 :             systemid: get_parse(first_row, 0)?,
     499             713 :             timeline: get_parse(first_row, 1)?,
     500             713 :             xlogpos: get_parse(first_row, 2)?,
     501             713 :             dbname: get_parse(first_row, 3).ok(),
     502                 :         })
     503                 :     } else {
     504 UBC           0 :         Err(IdentifyError.into())
     505                 :     }
     506 CBC         714 : }
        

Generated by: LCOV version 2.1-beta