LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline/walreceiver - walreceiver_connection.rs (source / functions)
Test:      32f4a56327bc9da697706839ed4836b2a00a408f.info
Test Date: 2024-02-07 07:37:29

                 Coverage    Total    Hit
Lines:           88.0 %        324    285
Functions:       54.5 %         44     24

            Line data    Source code
       1              : //! Actual Postgres connection handler to stream WAL to the server.
       2              : 
       3              : use std::{
       4              :     error::Error,
       5              :     pin::pin,
       6              :     str::FromStr,
       7              :     sync::Arc,
       8              :     time::{Duration, SystemTime},
       9              : };
      10              : 
      11              : use anyhow::{anyhow, Context};
      12              : use bytes::BytesMut;
      13              : use chrono::{NaiveDateTime, Utc};
      14              : use fail::fail_point;
      15              : use futures::StreamExt;
      16              : use postgres::{error::SqlState, SimpleQueryMessage, SimpleQueryRow};
      17              : use postgres_ffi::WAL_SEGMENT_SIZE;
      18              : use postgres_ffi::{v14::xlog_utils::normalize_lsn, waldecoder::WalDecodeError};
      19              : use postgres_protocol::message::backend::ReplicationMessage;
      20              : use postgres_types::PgLsn;
      21              : use tokio::{select, sync::watch, time};
      22              : use tokio_postgres::{replication::ReplicationStream, Client};
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::{debug, error, info, trace, warn, Instrument};
      25              : 
      26              : use super::TaskStateUpdate;
      27              : use crate::{
      28              :     context::RequestContext,
      29              :     metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
      30              :     task_mgr,
      31              :     task_mgr::TaskKind,
      32              :     task_mgr::WALRECEIVER_RUNTIME,
      33              :     tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
      34              :     walingest::WalIngest,
      35              :     walrecord::DecodedWALRecord,
      36              : };
      37              : use postgres_backend::is_expected_io_error;
      38              : use postgres_connection::PgConnectionConfig;
      39              : use postgres_ffi::waldecoder::WalStreamDecoder;
      40              : use utils::pageserver_feedback::PageserverFeedback;
      41              : use utils::{id::NodeId, lsn::Lsn};
      42              : 
      43              : /// Status of the connection.
      44       726333 : #[derive(Debug, Clone, Copy)]
      45              : pub(super) struct WalConnectionStatus {
      46              :     /// If we were able to initiate a postgres connection, this means that the safekeeper process is at least running.
      47              :     pub is_connected: bool,
      48              :     /// Defines a healthy connection as one on which the pageserver received WAL from the safekeeper
      49              :     /// and is able to process it in walingest without errors.
      50              :     pub has_processed_wal: bool,
      51              :     /// Connection establishment time or the timestamp of the latest connection message received.
      52              :     pub latest_connection_update: NaiveDateTime,
      53              :     /// Time of the latest WAL message received.
      54              :     pub latest_wal_update: NaiveDateTime,
      55              :     /// Latest WAL update contained WAL up to this LSN. The next WAL message will start from that LSN.
      56              :     pub streaming_lsn: Option<Lsn>,
      57              :     /// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet.
      58              :     pub commit_lsn: Option<Lsn>,
      59              :     /// The node it is connected to
      60              :     pub node: NodeId,
      61              : }
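
A status like this is broadcast over a watch channel (see `events_sender` below) so the
connection manager can judge connection health. Below is a minimal sketch of how the
timestamp fields might be interpreted; the helper name and the threshold are hypothetical
and not part of this file:

    // Hypothetical helper: a connection looks stale if it is not connected, or if no
    // message has arrived from the safekeeper within `threshold`.
    fn looks_stale(status: &WalConnectionStatus, threshold: std::time::Duration) -> bool {
        let now = chrono::Utc::now().naive_utc();
        let idle = (now - status.latest_connection_update)
            .to_std()
            .unwrap_or(std::time::Duration::ZERO);
        !status.is_connected || idle > threshold
    }
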
      62              : 
      63              : pub(super) enum WalReceiverError {
      64              :     /// An error of a type that does not indicate an issue, e.g. a connection closing
      65              :     ExpectedSafekeeperError(postgres::Error),
      66              :     /// An "error" message that carries a SUCCESSFUL_COMPLETION status code. Carries
      67              :     /// the message part of the original postgres error.
      68              :     SuccessfulCompletion(String),
      69              :     /// Generic error
      70              :     Other(anyhow::Error),
      71              : }
      72              : 
      73              : impl From<tokio_postgres::Error> for WalReceiverError {
      74         1800 :     fn from(err: tokio_postgres::Error) -> Self {
      75         1800 :         if let Some(dberror) = err.as_db_error().filter(|db_error| {
      76           54 :             db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
      77           54 :                 && db_error.message().contains("ending streaming")
      78         1800 :         }) {
      79              :             // Strip the outer DbError, which carries a misleading "error" severity
      80           54 :             Self::SuccessfulCompletion(dberror.message().to_string())
      81         1746 :         } else if err.is_closed()
      82          969 :             || err
      83          969 :                 .source()
      84          969 :                 .and_then(|source| source.downcast_ref::<std::io::Error>())
      85          969 :                 .map(is_expected_io_error)
      86          969 :                 .unwrap_or(false)
      87              :         {
      88         1740 :             Self::ExpectedSafekeeperError(err)
      89              :         } else {
      90            6 :             Self::Other(anyhow::Error::new(err))
      91              :         }
      92         1800 :     }
      93              : }
      94              : 
      95              : impl From<anyhow::Error> for WalReceiverError {
      96           18 :     fn from(err: anyhow::Error) -> Self {
      97           18 :         Self::Other(err)
      98           18 :     }
      99              : }
     100              : 
     101              : impl From<WalDecodeError> for WalReceiverError {
     102            0 :     fn from(err: WalDecodeError) -> Self {
     103            0 :         Self::Other(anyhow::Error::new(err))
     104            0 :     }
     105              : }
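
These `From` impls let the `?` operator classify failures where they occur. Below is a
minimal sketch (a hypothetical helper, not part of this file) of acting on that
classification, mirroring the match in the poller task further down:

    // Hypothetical helper: only `Other` errors deserve a loud log line; expected
    // safekeeper errors and successful completions are normal reconnect churn.
    fn is_noteworthy(err: &WalReceiverError) -> bool {
        match err {
            WalReceiverError::ExpectedSafekeeperError(_)
            | WalReceiverError::SuccessfulCompletion(_) => false,
            WalReceiverError::Other(_) => true,
        }
    }
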
     106              : 
     107              : /// Open a connection to the given safekeeper and receive WAL, sending back progress
     108              : /// messages as we go.
     109              : #[allow(clippy::too_many_arguments)]
     110         1702 : pub(super) async fn handle_walreceiver_connection(
     111         1702 :     timeline: Arc<Timeline>,
     112         1702 :     wal_source_connconf: PgConnectionConfig,
     113         1702 :     events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
     114         1702 :     cancellation: CancellationToken,
     115         1702 :     connect_timeout: Duration,
     116         1702 :     ctx: RequestContext,
     117         1702 :     node: NodeId,
     118         1702 :     ingest_batch_size: u64,
     119         1702 : ) -> Result<(), WalReceiverError> {
     120         1702 :     debug_assert_current_span_has_tenant_and_timeline_id();
     121         1702 : 
     122         1702 :     WALRECEIVER_STARTED_CONNECTIONS.inc();
     123              : 
     124              :     // Connect to the database in replication mode.
     125         1702 :     info!("connecting to {wal_source_connconf:?}");
     126              : 
     127          744 :     let (replication_client, connection) = {
     128         1702 :         let mut config = wal_source_connconf.to_tokio_postgres_config();
     129         1702 :         config.application_name("pageserver");
     130         1702 :         config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
     131         2410 :         match time::timeout(connect_timeout, config.connect(postgres::NoTls)).await {
     132         1702 :             Ok(client_and_conn) => client_and_conn?,
     133            0 :             Err(_elapsed) => {
     134              :                 // Timing out while connecting to a safekeeper node can happen for many
     135              :                 // reasons that the pageserver cannot control, and can last a long time.
     136              :                 // Do not produce an error; just make the timeout visible by logging the event.
     137            0 :                 info!("Timed out while waiting {connect_timeout:?} for walreceiver connection to open");
     138            0 :                 return Ok(());
     139              :             }
     140              :         }
     141              :     };
     142              : 
     143            0 :     debug!("connected!");
     144          744 :     let mut connection_status = WalConnectionStatus {
     145          744 :         is_connected: true,
     146          744 :         has_processed_wal: false,
     147          744 :         latest_connection_update: Utc::now().naive_utc(),
     148          744 :         latest_wal_update: Utc::now().naive_utc(),
     149          744 :         streaming_lsn: None,
     150          744 :         commit_lsn: None,
     151          744 :         node,
     152          744 :     };
     153          744 :     if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     154            0 :         warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
     155            0 :         return Ok(());
     156          744 :     }
     157          744 : 
     158          744 :     // The connection object performs the actual communication with the database,
     159          744 :     // so spawn it off to run on its own.
     160          744 :     let _connection_ctx = ctx.detached_child(
     161          744 :         TaskKind::WalReceiverConnectionPoller,
     162          744 :         ctx.download_behavior(),
     163          744 :     );
     164          744 :     let connection_cancellation = cancellation.clone();
     165          744 :     task_mgr::spawn(
     166          744 :         WALRECEIVER_RUNTIME.handle(),
     167          744 :         TaskKind::WalReceiverConnectionPoller,
     168          744 :         Some(timeline.tenant_shard_id),
     169          744 :         Some(timeline.timeline_id),
     170          744 :         "walreceiver connection",
     171              :         false,
     172          744 :         async move {
     173          744 :             debug_assert_current_span_has_tenant_and_timeline_id();
     174          744 : 
     175      1317479 :             select! {
     176          423 :                 connection_result = connection => match connection_result {
     177            0 :                     Ok(()) => debug!("Walreceiver db connection closed"),
     178              :                     Err(connection_error) => {
     179              :                         match WalReceiverError::from(connection_error) {
     180              :                             WalReceiverError::ExpectedSafekeeperError(_) => {
     181              :                                 // silence, because most likely we've already exited the outer call
     182              :                                 // with a similar error.
     183              :                             },
     184              :                             WalReceiverError::SuccessfulCompletion(_) => {}
     185              :                             WalReceiverError::Other(err) => {
     186            6 :                                 warn!("Connection aborted: {err:#}")
     187              :                             }
     188              :                         }
     189              :                     }
     190              :                 },
     191            0 :                 _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
     192              :             }
     193          702 :             Ok(())
     194          702 :         }
     195              :         // Enrich the log lines emitted by this closure with meaningful context.
     196              :         // TODO: technically, this task outlives the surrounding function, so the
     197              :         // spans won't be properly nested.
     198          744 :         .instrument(tracing::info_span!("poller")),
     199              :     );
     200              : 
     201              :     // Immediately increment the gauge, then create a job to decrement it on task exit.
     202              :     // One of the pros of `defer!` is that this will *most probably*
     203              :     // get called, even in the presence of panics.
     204          744 :     let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
     205          744 :     gauge.inc();
     206          702 :     scopeguard::defer! {
     207          702 :         gauge.dec();
     208          702 :     }
     209              : 
     210          744 :     let identify = identify_system(&replication_client).await?;
     211          742 :     info!("{identify:?}");
     212              : 
     213          742 :     let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
     214          742 :     let mut caught_up = false;
     215          742 : 
     216          742 :     connection_status.latest_connection_update = Utc::now().naive_utc();
     217          742 :     connection_status.latest_wal_update = Utc::now().naive_utc();
     218          742 :     connection_status.commit_lsn = Some(end_of_wal);
     219          742 :     if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     220            0 :         warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
     221            0 :         return Ok(());
     222          742 :     }
     223          742 : 
     224          742 :     //
     225          742 :     // Start streaming the WAL, from where we left off previously.
     226          742 :     //
     227          742 :     // If we had previously received WAL up to some point in the middle of a WAL record, we
     228          742 :     // had better start from the end of the last full WAL record, not in the middle of one.
     229          742 :     let mut last_rec_lsn = timeline.get_last_record_lsn();
     230          742 :     let mut startpoint = last_rec_lsn;
     231          742 : 
     232          742 :     if startpoint == Lsn(0) {
     233            0 :         return Err(WalReceiverError::Other(anyhow!("No previous WAL position")));
     234          742 :     }
     235          742 : 
     236          742 :     // There might be some padding after the last full record, skip it.
     237          742 :     startpoint += startpoint.calc_padding(8u32);
     238          742 : 
     239          742 :     // If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
     240          742 :     // for anything, and in some corner cases, the compute node might have never generated the WAL for the page headers.
     241          742 :     // That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary,
     242          742 :     // but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
     243          742 :     // header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
     244          742 :     //  to the safekeepers.
     245          742 :     startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
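                            // Worked example (values are hypothetical): with calc_padding(8u32), an LSN of
                            // 0/16D6C35 is rounded up to the next 8-byte boundary, 0/16D6C38. If the padded LSN
                            // falls exactly on an 8 KiB WAL page boundary (e.g. 0/2002000), normalize_lsn advances
                            // it past the page header, so streaming does not expect header bytes that the compute
                            // node may never have generated.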
     246              : 
     247          742 :     info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
     248              : 
     249          742 :     let query = format!("START_REPLICATION PHYSICAL {startpoint}");
     250              : 
     251          742 :     let copy_stream = replication_client.copy_both_simple(&query).await?;
     252          740 :     let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
     253          740 : 
     254          740 :     let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
     255              : 
     256          740 :     let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
     257              : 
     258       795925 :     while let Some(replication_message) = {
     259       796171 :         select! {
     260              :             _ = cancellation.cancelled() => {
     261            0 :                 debug!("walreceiver interrupted");
     262              :                 None
     263              :             }
     264       795925 :             replication_message = physical_stream.next() => replication_message,
     265              :         }
     266              :     } {
     267       795925 :         let replication_message = replication_message?;
     268              : 
     269       795474 :         let now = Utc::now().naive_utc();
     270       795474 :         let last_rec_lsn_before_msg = last_rec_lsn;
     271       795474 : 
     272       795474 :         // Update the connection status before processing the message. If the message processing
     273       795474 :         // fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
     274       795474 :         match &replication_message {
     275       792330 :             ReplicationMessage::XLogData(xlog_data) => {
     276       792330 :                 connection_status.latest_connection_update = now;
     277       792330 :                 connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
     278       792330 :                 connection_status.streaming_lsn = Some(Lsn::from(
     279       792330 :                     xlog_data.wal_start() + xlog_data.data().len() as u64,
     280       792330 :                 ));
     281       792330 :                 if !xlog_data.data().is_empty() {
     282       792330 :                     connection_status.latest_wal_update = now;
     283       792330 :                 }
     284              :             }
     285         3144 :             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
     286         3144 :                 connection_status.latest_connection_update = now;
     287         3144 :                 connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
     288         3144 :             }
     289            0 :             &_ => {}
     290              :         };
     291       795474 :         if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     292            0 :             warn!("Wal connection event listener dropped, aborting the connection: {e}");
     293            0 :             return Ok(());
     294       795474 :         }
     295              : 
     296       795474 :         let status_update = match replication_message {
     297       792330 :             ReplicationMessage::XLogData(xlog_data) => {
     298       792330 :                 // Pass the WAL data to the decoder, and see if we can decode
     299       792330 :                 // more records as a result.
     300       792330 :                 let data = xlog_data.data();
     301       792330 :                 let startlsn = Lsn::from(xlog_data.wal_start());
     302       792330 :                 let endlsn = startlsn + data.len() as u64;
     303              : 
     304            0 :                 trace!("received XLogData between {startlsn} and {endlsn}");
     305              : 
     306       792330 :                 waldecoder.feed_bytes(data);
     307       792330 : 
     308       792330 :                 {
     309       792330 :                     let mut decoded = DecodedWALRecord::default();
     310       792330 :                     let mut modification = timeline.begin_modification(startlsn);
     311       792330 :                     let mut uncommitted_records = 0;
     312       792330 :                     let mut filtered_records = 0;
     313     73861301 :                     while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     314              :                         // It is important to deal with the aligned records as lsn in getPage@LSN is
     315              :                         // aligned and can be several bytes bigger. Without this alignment we are
     316              :                         // at risk of hitting a deadlock.
     317     73068975 :                         if !lsn.is_aligned() {
     318            0 :                             return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
     319     73068975 :                         }
     320              : 
     321              :                         // Ingest the records without immediately committing them.
     322     73068975 :                         let ingested = walingest
     323     73068975 :                             .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
     324        12952 :                             .await
     325     73068975 :                             .with_context(|| format!("could not ingest record at {lsn}"))?;
     326     73068972 :                         if !ingested {
     327            0 :                             tracing::debug!("ingest: filtered out record @ LSN {lsn}");
     328     66449894 :                             WAL_INGEST.records_filtered.inc();
     329     66449894 :                             filtered_records += 1;
     330      6619078 :                         }
     331              : 
     332            0 :                         fail_point!("walreceiver-after-ingest");
     333              : 
     334     73068972 :                         last_rec_lsn = lsn;
     335     73068972 : 
     336     73068972 :                         // Commit every ingest_batch_size records. Even if we filtered out
     337     73068972 :                         // all records, we still need to call commit to advance the LSN.
     338     73068972 :                         uncommitted_records += 1;
     339     73068972 :                         if uncommitted_records >= ingest_batch_size {
     340       605982 :                             WAL_INGEST
     341       605982 :                                 .records_committed
     342       605982 :                                 .inc_by(uncommitted_records - filtered_records);
     343       605982 :                             modification.commit(&ctx).await?;
     344       605981 :                             uncommitted_records = 0;
     345       605981 :                             filtered_records = 0;
     346       605981 : 
     347       605981 :                             //
     348       605981 :                             // We should check the checkpoint distance after every batch of ingest_batch_size records, because otherwise
     349       605981 :                             // the layer size can become much larger than `checkpoint_distance`.
     350       605981 :                             // This can happen because the wal-sender sends WAL in 125 kB chunks, and some WAL records can cause a large
     351       605981 :                             // amount of data to be written to key-value storage. So performing this check only after processing
     352       605981 :                             // all WAL records in the chunk can produce huge L0 layer files.
     353       605981 :                             //
     354       605981 :                             timeline
     355       605981 :                                 .check_checkpoint_distance()
     356         7282 :                                 .await
     357       605981 :                                 .with_context(|| {
     358            0 :                                     format!(
     359            0 :                                         "Failed to check checkpoint distance for timeline {}",
     360            0 :                                         timeline.timeline_id
     361            0 :                                     )
     362       605981 :                                 })?;
     363     72462990 :                         }
     364              :                     }
     365              : 
     366              :                     // Commit the remaining records.
     367       792326 :                     if uncommitted_records > 0 {
     368       790918 :                         WAL_INGEST
     369       790918 :                             .records_committed
     370       790918 :                             .inc_by(uncommitted_records - filtered_records);
     371       790918 :                         modification.commit(&ctx).await?;
     372         1408 :                     }
     373              :                 }
     374              : 
     375       792326 :                 if !caught_up && endlsn >= end_of_wal {
     376          611 :                     info!("caught up at LSN {endlsn}");
     377          611 :                     caught_up = true;
     378       791715 :                 }
     379              : 
     380       792326 :                 Some(endlsn)
     381              :             }
     382              : 
     383         3144 :             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
     384         3144 :                 let wal_end = keepalive.wal_end();
     385         3144 :                 let timestamp = keepalive.timestamp();
     386         3144 :                 let reply_requested = keepalive.reply() != 0;
     387              : 
     388            0 :                 trace!("received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})");
     389              : 
     390         3144 :                 if reply_requested {
     391         3144 :                     Some(last_rec_lsn)
     392              :                 } else {
     393            0 :                     None
     394              :                 }
     395              :             }
     396              : 
     397            0 :             _ => None,
     398              :         };
     399              : 
     400       795470 :         if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
     401              :             // We have successfully processed at least one WAL record.
     402          618 :             connection_status.has_processed_wal = true;
     403          618 :             if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     404            0 :                 warn!("Wal connection event listener dropped, aborting the connection: {e}");
     405            0 :                 return Ok(());
     406          618 :             }
     407       794852 :         }
     408              : 
     409       795470 :         timeline
     410       795470 :             .check_checkpoint_distance()
     411         2651 :             .await
     412       795470 :             .with_context(|| {
     413            0 :                 format!(
     414            0 :                     "Failed to check checkpoint distance for timeline {}",
     415            0 :                     timeline.timeline_id
     416            0 :                 )
     417       795470 :             })?;
     418              : 
     419       795470 :         if let Some(last_lsn) = status_update {
     420       795470 :             let timeline_remote_consistent_lsn = timeline
     421       795470 :                 .get_remote_consistent_lsn_visible()
     422       795470 :                 .unwrap_or(Lsn(0));
     423       795470 : 
     424       795470 :             // The last LSN we processed. It is not guaranteed to survive a pageserver crash.
     425       795470 :             let last_received_lsn = last_lsn;
     426       795470 :             // `disk_consistent_lsn` is the LSN at which the pageserver guarantees local persistence of all received data
     427       795470 :             let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     428       795470 :             // The last LSN that is synced to remote storage and is guaranteed to survive a pageserver crash.
     429       795470 :             // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
     430       795470 :             let remote_consistent_lsn = timeline_remote_consistent_lsn;
     431       795470 :             let ts = SystemTime::now();
     432       795470 : 
     433       795470 :             // Update the status about what we just received. This is shown in the mgmt API.
     434       795470 :             let last_received_wal = WalReceiverInfo {
     435       795470 :                 wal_source_connconf: wal_source_connconf.clone(),
     436       795470 :                 last_received_msg_lsn: last_lsn,
     437       795470 :                 last_received_msg_ts: ts
     438       795470 :                     .duration_since(SystemTime::UNIX_EPOCH)
     439       795470 :                     .expect("Received message time should be after UNIX EPOCH!")
     440       795470 :                     .as_micros(),
     441       795470 :             };
     442       795470 :             *timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
     443              : 
     444              :             // Send the replication feedback message.
     445              :             // Regular standby_status_update fields are put into this message.
     446       795470 :             let current_timeline_size = if timeline.tenant_shard_id.is_zero() {
     447       707157 :                 timeline
     448       707157 :                     .get_current_logical_size(
     449       707157 :                         crate::tenant::timeline::GetLogicalSizePriority::User,
     450       707157 :                         &ctx,
     451       707157 :                     )
     452       707157 :                     // FIXME: https://github.com/neondatabase/neon/issues/5963
     453       707157 :                     .size_dont_care_about_accuracy()
     454              :             } else {
     455              :                 // Non-zero shards send zero for logical size.  The safekeeper will ignore
     456              :                 // this number.  This is because in a sharded tenant, only shard zero maintains
     457              :                 // accurate logical size.
     458        88313 :                 0
     459              :             };
     460              : 
     461       795470 :             let status_update = PageserverFeedback {
     462       795470 :                 current_timeline_size,
     463       795470 :                 last_received_lsn,
     464       795470 :                 disk_consistent_lsn,
     465       795470 :                 remote_consistent_lsn,
     466       795470 :                 replytime: ts,
     467       795470 :             };
     468              : 
     469            0 :             debug!("neon_status_update {status_update:?}");
     470              : 
     471       795470 :             let mut data = BytesMut::new();
     472       795470 :             status_update.serialize(&mut data);
     473       795470 :             physical_stream
     474       795470 :                 .as_mut()
     475       795470 :                 .zenith_status_update(data.len() as u64, &data)
     476        52716 :                 .await?;
     477            0 :         }
     478              :     }
     479              : 
     480          205 :     Ok(())
     481         1660 : }
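
The progress updates emitted through `events_sender` are consumed on the receiver side of
the watch channel by the walreceiver connection manager. Below is a minimal sketch of such
a consumer, assuming only the types defined in this module; the function name and the log
line are illustrative, not the manager's actual code:

    // Hypothetical consumer of the TaskStateUpdate::Progress events sent above.
    async fn watch_walreceiver_progress(
        mut events_rx: tokio::sync::watch::Receiver<TaskStateUpdate<WalConnectionStatus>>,
    ) {
        while events_rx.changed().await.is_ok() {
            if let TaskStateUpdate::Progress(status) = &*events_rx.borrow() {
                tracing::info!(
                    "walreceiver: connected={} streaming_lsn={:?} commit_lsn={:?}",
                    status.is_connected,
                    status.streaming_lsn,
                    status.commit_lsn,
                );
            }
        }
    }
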
     482              : 
     483              : /// Data returned from the postgres `IDENTIFY_SYSTEM` command
     484              : ///
     485              : /// See the [postgres docs] for more details.
     486              : ///
     487              : /// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
     488          742 : #[derive(Debug)]
     489              : // As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
     490              : // unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
     491              : #[allow(dead_code)]
     492              : struct IdentifySystem {
     493              :     systemid: u64,
     494              :     timeline: u32,
     495              :     xlogpos: PgLsn,
     496              :     dbname: Option<String>,
     497              : }
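
For reference, `IDENTIFY_SYSTEM` returns a single row whose columns map onto the fields
above; an illustrative (made-up) reply is sketched below, noting that `dbname` is null on
a physical replication connection, which is why that field is an `Option`:

    //  systemid            | timeline | xlogpos   | dbname
    // ---------------------+----------+-----------+--------
    //  7235724155825534435 | 1        | 0/16D6C38 |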
     498              : 
     499              : /// There was a problem parsing the response to
     500              : /// a postgres IDENTIFY_SYSTEM command.
     501            0 : #[derive(Debug, thiserror::Error)]
     502              : #[error("IDENTIFY_SYSTEM parse error")]
     503              : struct IdentifyError;
     504              : 
     505              : /// Run the postgres `IDENTIFY_SYSTEM` command
     506          744 : async fn identify_system(client: &Client) -> anyhow::Result<IdentifySystem> {
     507          744 :     let query_str = "IDENTIFY_SYSTEM";
     508          744 :     let response = client.simple_query(query_str).await?;
     509              : 
     510              :     // get(N) from row, then parse it as some destination type.
     511         2968 :     fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
     512         2968 :     where
     513         2968 :         T: FromStr,
     514         2968 :     {
     515         2968 :         let val = row.get(idx).ok_or(IdentifyError)?;
     516         2226 :         val.parse::<T>().or(Err(IdentifyError))
     517         2968 :     }
     518              : 
     519              :     // extract the row contents into an IdentifySystem struct.
     520              :     // written as a closure so I can use ? for Option here.
     521          742 :     if let Some(SimpleQueryMessage::Row(first_row)) = response.first() {
     522              :         Ok(IdentifySystem {
     523          742 :             systemid: get_parse(first_row, 0)?,
     524          742 :             timeline: get_parse(first_row, 1)?,
     525          742 :             xlogpos: get_parse(first_row, 2)?,
     526          742 :             dbname: get_parse(first_row, 3).ok(),
     527              :         })
     528              :     } else {
     529            0 :         Err(IdentifyError.into())
     530              :     }
     531          744 : }
        

Generated by: LCOV version 2.1-beta