LCOV - differential code coverage report
Current view:   top level - pageserver/src/tenant/timeline/walreceiver - walreceiver_connection.rs (source / functions)
Current:        f6946e90941b557c917ac98cd5a7e9506d180f3e.info
Current Date:   2023-10-19 02:04:12
Baseline:       c8637f37369098875162f194f92736355783b050.info
Baseline Date:  2023-10-18 20:25:20

                Coverage   Total   Hit   UBC   CBC
Lines:           87.6 %      275   241    34   241
Functions:       54.8 %       42    23    19    23

           TLA  Line data    Source code
       1                 : //! Actual Postgres connection handler that streams WAL from a safekeeper into the pageserver.
       2                 : 
       3                 : use std::{
       4                 :     error::Error,
       5                 :     pin::pin,
       6                 :     str::FromStr,
       7                 :     sync::Arc,
       8                 :     time::{Duration, SystemTime},
       9                 : };
      10                 : 
      11                 : use anyhow::{anyhow, Context};
      12                 : use bytes::BytesMut;
      13                 : use chrono::{NaiveDateTime, Utc};
      14                 : use fail::fail_point;
      15                 : use futures::StreamExt;
      16                 : use postgres::{error::SqlState, SimpleQueryMessage, SimpleQueryRow};
      17                 : use postgres_ffi::WAL_SEGMENT_SIZE;
      18                 : use postgres_ffi::{v14::xlog_utils::normalize_lsn, waldecoder::WalDecodeError};
      19                 : use postgres_protocol::message::backend::ReplicationMessage;
      20                 : use postgres_types::PgLsn;
      21                 : use tokio::{select, sync::watch, time};
      22                 : use tokio_postgres::{replication::ReplicationStream, Client};
      23                 : use tokio_util::sync::CancellationToken;
      24                 : use tracing::{debug, error, info, trace, warn, Instrument};
      25                 : 
      26                 : use super::TaskStateUpdate;
      27                 : use crate::{
      28                 :     context::RequestContext,
      29                 :     metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS},
      30                 :     task_mgr,
      31                 :     task_mgr::TaskKind,
      32                 :     task_mgr::WALRECEIVER_RUNTIME,
      33                 :     tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
      34                 :     walingest::WalIngest,
      35                 :     walrecord::DecodedWALRecord,
      36                 : };
      37                 : use postgres_backend::is_expected_io_error;
      38                 : use postgres_connection::PgConnectionConfig;
      39                 : use postgres_ffi::waldecoder::WalStreamDecoder;
      40                 : use utils::pageserver_feedback::PageserverFeedback;
      41                 : use utils::{id::NodeId, lsn::Lsn};
      42                 : 
      43                 : /// Status of the connection.
      44 CBC      751946 : #[derive(Debug, Clone, Copy)]
      45                 : pub(super) struct WalConnectionStatus {
      46                 :     /// If we were able to initiate a postgres connection, the safekeeper process is at least running.
      47                 :     pub is_connected: bool,
      48                 :     /// Defines a healthy connection as one on which the pageserver received WAL from the safekeeper
      49                 :     /// and was able to process it in walingest without errors.
      50                 :     pub has_processed_wal: bool,
      51                 :     /// Connection establishment time, or the timestamp of the latest connection message received.
      52                 :     pub latest_connection_update: NaiveDateTime,
      53                 :     /// Time of the latest WAL message received.
      54                 :     pub latest_wal_update: NaiveDateTime,
      55                 :     /// Latest WAL update contained WAL up to this LSN. The next WAL message will start from that LSN.
      56                 :     pub streaming_lsn: Option<Lsn>,
      57                 :     /// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet.
      58                 :     pub commit_lsn: Option<Lsn>,
      59                 :     /// The node it is connected to
      60                 :     pub node: NodeId,
      61                 : }
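
// Illustrative sketch (hypothetical, not part of the file above): the status struct is
// published through the `watch` channel as `TaskStateUpdate::Progress`, so a consumer on
// the other end of that channel could react to it roughly like this. The function name
// and the exact reaction are made up; the real connection manager in `super` may differ.
async fn wait_until_streaming(
    mut events: watch::Receiver<TaskStateUpdate<WalConnectionStatus>>,
) {
    // Wake up on every status update published by `handle_walreceiver_connection`.
    while events.changed().await.is_ok() {
        if let TaskStateUpdate::Progress(status) = &*events.borrow() {
            // `has_processed_wal` flips to true once at least one record was ingested.
            if status.has_processed_wal {
                info!("node {:?} is streaming, commit_lsn={:?}", status.node, status.commit_lsn);
                return;
            }
        }
    }
}
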
      62                 : 
      63                 : pub(super) enum WalReceiverError {
      64                 :     /// An error of a type that does not indicate an issue, e.g. a connection closing
      65                 :     ExpectedSafekeeperError(postgres::Error),
      66                 :     /// An "error" message that carries a SUCCESSFUL_COMPLETION status code. Carries
      67                 :     /// the message part of the original postgres error.
      68                 :     SuccessfulCompletion(String),
      69                 :     /// Generic error
      70                 :     Other(anyhow::Error),
      71                 : }
      72                 : 
      73                 : impl From<tokio_postgres::Error> for WalReceiverError {
      74                 :     fn from(err: tokio_postgres::Error) -> Self {
      75            1266 :         if let Some(dberror) = err.as_db_error().filter(|db_error| {
      76              57 :             db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
      77              57 :                 && db_error.message().contains("ending streaming")
      78            1266 :         }) {
      79                 :             // Strip the outer DbError, which carries a misleading "error" severity
      80              57 :             Self::SuccessfulCompletion(dberror.message().to_string())
      81            1209 :         } else if err.is_closed()
      82             527 :             || err
      83             527 :                 .source()
      84             527 :                 .and_then(|source| source.downcast_ref::<std::io::Error>())
      85             527 :                 .map(is_expected_io_error)
      86             527 :                 .unwrap_or(false)
      87                 :         {
      88            1201 :             Self::ExpectedSafekeeperError(err)
      89                 :         } else {
      90               8 :             Self::Other(anyhow::Error::new(err))
      91                 :         }
      92            1266 :     }
      93                 : }
      94                 : 
      95                 : impl From<anyhow::Error> for WalReceiverError {
      96               9 :     fn from(err: anyhow::Error) -> Self {
      97               9 :         Self::Other(err)
      98               9 :     }
      99                 : }
     100                 : 
     101                 : impl From<WalDecodeError> for WalReceiverError {
     102 UBC           0 :     fn from(err: WalDecodeError) -> Self {
     103               0 :         Self::Other(anyhow::Error::new(err))
     104               0 :     }
     105                 : }
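
// Illustrative sketch (hypothetical, not part of the file above): one way a caller could
// consume the classification encoded by the `From` impls, logging each variant at a
// different level. The function name is made up; the real connection manager may differ.
fn log_connection_result(result: &Result<(), WalReceiverError>) {
    match result {
        Ok(()) => info!("walreceiver connection finished"),
        // Expected while a safekeeper restarts or the connection is torn down: keep it quiet.
        Err(WalReceiverError::ExpectedSafekeeperError(e)) => debug!("connection ended: {e}"),
        // The safekeeper reported a clean end of streaming via SUCCESSFUL_COMPLETION.
        Err(WalReceiverError::SuccessfulCompletion(msg)) => info!("connection closed: {msg}"),
        // Everything else is worth surfacing.
        Err(WalReceiverError::Other(e)) => warn!("connection failed: {e:#}"),
    }
}
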
     106                 : 
     107                 : /// Open a connection to the given safekeeper and receive WAL, sending back progress
     108                 : /// messages as we go.
     109 CBC        1189 : pub(super) async fn handle_walreceiver_connection(
     110            1189 :     timeline: Arc<Timeline>,
     111            1189 :     wal_source_connconf: PgConnectionConfig,
     112            1189 :     events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
     113            1189 :     cancellation: CancellationToken,
     114            1189 :     connect_timeout: Duration,
     115            1189 :     ctx: RequestContext,
     116            1189 :     node: NodeId,
     117            1189 : ) -> Result<(), WalReceiverError> {
     118            1188 :     debug_assert_current_span_has_tenant_and_timeline_id();
     119            1188 : 
     120            1188 :     WALRECEIVER_STARTED_CONNECTIONS.inc();
     121                 : 
     122                 :     // Connect to the database in replication mode.
     123            1188 :     info!("connecting to {wal_source_connconf:?}");
     124                 : 
     125             679 :     let (mut replication_client, connection) = {
     126            1188 :         let mut config = wal_source_connconf.to_tokio_postgres_config();
     127            1188 :         config.application_name("pageserver");
     128            1188 :         config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
     129            1893 :         match time::timeout(connect_timeout, config.connect(postgres::NoTls)).await {
     130            1188 :             Ok(client_and_conn) => client_and_conn?,
     131 UBC           0 :             Err(_elapsed) => {
     132                 :                 // Connecting to a safekeeper node can time out for many reasons
     133                 :                 // that the pageserver cannot control.
     134                 :                 // Do not treat this as an error, but make it visible by logging the event.
     135               0 :                 info!("Timed out while waiting {connect_timeout:?} for walreceiver connection to open");
     136               0 :                 return Ok(());
     137                 :             }
     138                 :         }
     139                 :     };
     140                 : 
     141               0 :     debug!("connected!");
     142 CBC         679 :     let mut connection_status = WalConnectionStatus {
     143             679 :         is_connected: true,
     144             679 :         has_processed_wal: false,
     145             679 :         latest_connection_update: Utc::now().naive_utc(),
     146             679 :         latest_wal_update: Utc::now().naive_utc(),
     147             679 :         streaming_lsn: None,
     148             679 :         commit_lsn: None,
     149             679 :         node,
     150             679 :     };
     151             679 :     if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     152 UBC           0 :         warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
     153               0 :         return Ok(());
     154 CBC         679 :     }
     155             679 : 
     156             679 :     // The connection object performs the actual communication with the database,
     157             679 :     // so spawn it off to run on its own.
     158             679 :     let _connection_ctx = ctx.detached_child(
     159             679 :         TaskKind::WalReceiverConnectionPoller,
     160             679 :         ctx.download_behavior(),
     161             679 :     );
     162             679 :     let connection_cancellation = cancellation.clone();
     163             679 :     task_mgr::spawn(
     164             679 :         WALRECEIVER_RUNTIME.handle(),
     165             679 :         TaskKind::WalReceiverConnectionPoller,
     166             679 :         Some(timeline.tenant_id),
     167             679 :         Some(timeline.timeline_id),
     168             679 :         "walreceiver connection",
     169                 :         false,
     170             679 :         async move {
     171             679 :             debug_assert_current_span_has_tenant_and_timeline_id();
     172             679 : 
     173         1419587 :             select! {
     174             386 :                 connection_result = connection => match connection_result {
     175 UBC           0 :                     Ok(()) => debug!("Walreceiver db connection closed"),
     176                 :                     Err(connection_error) => {
     177                 :                         match WalReceiverError::from(connection_error) {
     178                 :                             WalReceiverError::ExpectedSafekeeperError(_) => {
     179                 :                                 // silence, because most likely we've already exited the outer call
     180                 :                                 // with a similar error.
     181                 :                             },
     182                 :                             WalReceiverError::SuccessfulCompletion(_) => {}
     183                 :                             WalReceiverError::Other(err) => {
     184 CBC           8 :                                 warn!("Connection aborted: {err:#}")
     185                 :                             }
     186                 :                         }
     187                 :                     }
     188                 :                 },
     189 UBC           0 :                 _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
     190                 :             }
     191 CBC         613 :             Ok(())
     192             613 :         }
     193                 :         // Enrich the log lines emitted by this closure with meaningful context.
     194                 :         // TODO: technically, this task outlives the surrounding function, so, the
     195                 :         // TODO: technically, this task outlives the surrounding function, so the
     196             679 :         .instrument(tracing::info_span!("poller")),
     197                 :     );
     198                 : 
     199                 :     // Immediately increment the gauge, then create a job to decrement it on task exit.
     200                 :     // One of the pros of `defer!` is that this will *most probably*
     201                 :     // get called, even in presence of panics.
     202             679 :     let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
     203             679 :     gauge.inc();
     204             613 :     scopeguard::defer! {
     205             613 :         gauge.dec();
     206             613 :     }
     207                 : 
     208             679 :     let identify = identify_system(&mut replication_client).await?;
     209             679 :     info!("{identify:?}");
     210                 : 
     211             679 :     let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
     212             679 :     let mut caught_up = false;
     213             679 : 
     214             679 :     connection_status.latest_connection_update = Utc::now().naive_utc();
     215             679 :     connection_status.latest_wal_update = Utc::now().naive_utc();
     216             679 :     connection_status.commit_lsn = Some(end_of_wal);
     217             679 :     if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     218 UBC           0 :         warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
     219               0 :         return Ok(());
     220 CBC         679 :     }
     221             679 : 
     222             679 :     //
     223             679 :     // Start streaming the WAL, from where we left off previously.
     224             679 :     //
     225             679 :     // If we had previously received WAL up to some point in the middle of a WAL record, we had
     226             679 :     // better start from the end of the last full WAL record, not in the middle of one.
     227             679 :     let mut last_rec_lsn = timeline.get_last_record_lsn();
     228             679 :     let mut startpoint = last_rec_lsn;
     229             679 : 
     230             679 :     if startpoint == Lsn(0) {
     231 UBC           0 :         return Err(WalReceiverError::Other(anyhow!("No previous WAL position")));
     232 CBC         679 :     }
     233             679 : 
     234             679 :     // There might be some padding after the last full record, skip it.
     235             679 :     startpoint += startpoint.calc_padding(8u32);
     236             679 : 
     237             679 :     // If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
     238             679 :     // for anything, and in some corner cases, the compute node might have never generated the WAL for page headers.
     239             679 :     // That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary,
     240             679 :     // but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
     241             679 :     // header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
     242             679 :     // to the safekeepers.
     243             679 :     startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
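
// Worked example of the two adjustments above (the LSN values are made up, and the sizes
// assume the usual Postgres layout: 8 KiB WAL pages, a 40-byte long header at a segment
// start, a 24-byte short header at other page boundaries):
//
//   last_record_lsn       = 0/16FFFFD
//   + calc_padding(8)    -> 0/1700000   (round up to the next 8-byte boundary)
//   normalize_lsn(..)    -> 0/1700018   (page boundary inside a segment, so skip the
//                                        24-byte short page header)
//
// If 0/1700000 had also been a segment boundary, normalize_lsn would skip the 40-byte
// long header instead.
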
     244                 : 
     245             679 :     info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
     246                 : 
     247             679 :     let query = format!("START_REPLICATION PHYSICAL {startpoint}");
     248                 : 
     249             679 :     let copy_stream = replication_client.copy_both_simple(&query).await?;
     250             677 :     let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
     251             677 : 
     252             677 :     let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
     253                 : 
     254             677 :     let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
     255                 : 
     256          777307 :     while let Some(replication_message) = {
     257          777538 :         select! {
     258                 :             _ = cancellation.cancelled() => {
     259 UBC           0 :                 debug!("walreceiver interrupted");
     260                 :                 None
     261                 :             }
     262 CBC      777307 :             replication_message = physical_stream.next() => replication_message,
     263                 :         }
     264                 :     } {
     265          777307 :         let replication_message = replication_message?;
     266                 : 
     267          776912 :         let now = Utc::now().naive_utc();
     268          776912 :         let last_rec_lsn_before_msg = last_rec_lsn;
     269          776912 : 
     270          776912 :         // Update the connection status before processing the message. If the message processing
     271          776912 :         // fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
     272          776912 :         match &replication_message {
     273          774979 :             ReplicationMessage::XLogData(xlog_data) => {
     274          774979 :                 connection_status.latest_connection_update = now;
     275          774979 :                 connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
     276          774979 :                 connection_status.streaming_lsn = Some(Lsn::from(
     277          774979 :                     xlog_data.wal_start() + xlog_data.data().len() as u64,
     278          774979 :                 ));
     279          774979 :                 if !xlog_data.data().is_empty() {
     280          774979 :                     connection_status.latest_wal_update = now;
     281          774979 :                 }
     282                 :             }
     283            1933 :             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
     284            1933 :                 connection_status.latest_connection_update = now;
     285            1933 :                 connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
     286            1933 :             }
     287 UBC           0 :             &_ => {}
     288                 :         };
     289 CBC      776912 :         if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     290 UBC           0 :             warn!("Wal connection event listener dropped, aborting the connection: {e}");
     291               0 :             return Ok(());
     292 CBC      776912 :         }
     293                 : 
     294          776912 :         let status_update = match replication_message {
     295          774979 :             ReplicationMessage::XLogData(xlog_data) => {
     296          774979 :                 // Pass the WAL data to the decoder, and see if we can decode
     297          774979 :                 // more records as a result.
     298          774979 :                 let data = xlog_data.data();
     299          774979 :                 let startlsn = Lsn::from(xlog_data.wal_start());
     300          774979 :                 let endlsn = startlsn + data.len() as u64;
     301                 : 
     302 UBC           0 :                 trace!("received XLogData between {startlsn} and {endlsn}");
     303                 : 
     304 CBC      774979 :                 waldecoder.feed_bytes(data);
     305          774979 : 
     306          774979 :                 {
     307          774979 :                     let mut decoded = DecodedWALRecord::default();
     308          774979 :                     let mut modification = timeline.begin_modification(endlsn);
     309        69363164 :                     while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     310                 :                         // It is important that records are ingested at aligned LSNs, as the lsn in getPage@LSN is
     311                 :                         // aligned and can be several bytes bigger. Without this alignment we are
     312                 :                         // at risk of hitting a deadlock.
     313        68588194 :                         if !lsn.is_aligned() {
     314 UBC           0 :                             return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
     315 CBC    68588194 :                         }
     316        68588194 : 
     317        68588194 :                         walingest
     318        68588194 :                             .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
     319         4419908 :                             .await
     320        68588185 :                             .with_context(|| format!("could not ingest record at {lsn}"))?;
     321                 : 
     322 UBC           0 :                         fail_point!("walreceiver-after-ingest");
     323                 : 
     324 CBC    68588185 :                         last_rec_lsn = lsn;
     325                 :                     }
     326                 :                 }
     327                 : 
     328          774969 :                 if !caught_up && endlsn >= end_of_wal {
     329             539 :                     info!("caught up at LSN {endlsn}");
     330             539 :                     caught_up = true;
     331          774430 :                 }
     332                 : 
     333          774969 :                 Some(endlsn)
     334                 :             }
     335                 : 
     336            1933 :             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
     337            1933 :                 let wal_end = keepalive.wal_end();
     338            1933 :                 let timestamp = keepalive.timestamp();
     339            1933 :                 let reply_requested = keepalive.reply() != 0;
     340                 : 
     341 UBC           0 :                 trace!("received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})");
     342                 : 
     343 CBC        1933 :                 if reply_requested {
     344            1933 :                     Some(last_rec_lsn)
     345                 :                 } else {
     346 UBC           0 :                     None
     347                 :                 }
     348                 :             }
     349                 : 
     350               0 :             _ => None,
     351                 :         };
     352                 : 
     353 CBC      776902 :         if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
     354                 :             // We have successfully processed at least one WAL record.
     355             550 :             connection_status.has_processed_wal = true;
     356             550 :             if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
     357 UBC           0 :                 warn!("Wal connection event listener dropped, aborting the connection: {e}");
     358               0 :                 return Ok(());
     359 CBC         550 :             }
     360          776352 :         }
     361                 : 
     362          776902 :         timeline
     363          776902 :             .check_checkpoint_distance()
     364            1975 :             .await
     365          776902 :             .with_context(|| {
     366 UBC           0 :                 format!(
     367               0 :                     "Failed to check checkpoint distance for timeline {}",
     368               0 :                     timeline.timeline_id
     369               0 :                 )
     370 CBC      776902 :             })?;
     371                 : 
     372          776902 :         if let Some(last_lsn) = status_update {
     373          776902 :             let timeline_remote_consistent_lsn = timeline
     374          776902 :                 .get_remote_consistent_lsn_visible()
     375          776902 :                 .unwrap_or(Lsn(0));
     376          776902 : 
     377          776902 :             // The last LSN we processed. It is not guaranteed to survive pageserver crash.
     378          776902 :             let last_received_lsn = last_lsn;
     379          776902 :             // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
     380          776902 :             let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     381          776902 :             // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
     382          776902 :             // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
     383          776902 :             let remote_consistent_lsn = timeline_remote_consistent_lsn;
     384          776902 :             let ts = SystemTime::now();
     385          776902 : 
     386          776902 :             // Update the status about what we just received. This is shown in the mgmt API.
     387          776902 :             let last_received_wal = WalReceiverInfo {
     388          776902 :                 wal_source_connconf: wal_source_connconf.clone(),
     389          776902 :                 last_received_msg_lsn: last_lsn,
     390          776902 :                 last_received_msg_ts: ts
     391          776902 :                     .duration_since(SystemTime::UNIX_EPOCH)
     392          776902 :                     .expect("Received message time should be after UNIX EPOCH!")
     393          776902 :                     .as_micros(),
     394          776902 :             };
     395          776902 :             *timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
     396                 : 
     397                 :             // Send the replication feedback message.
     398                 :             // Regular standby_status_update fields are put into this message.
     399          776902 :             let (timeline_logical_size, _) = timeline
     400          776902 :                 .get_current_logical_size(&ctx)
     401          776902 :                 .context("Status update creation failed to get current logical size")?;
     402          776902 :             let status_update = PageserverFeedback {
     403          776902 :                 current_timeline_size: timeline_logical_size,
     404          776902 :                 last_received_lsn,
     405          776902 :                 disk_consistent_lsn,
     406          776902 :                 remote_consistent_lsn,
     407          776902 :                 replytime: ts,
     408          776902 :             };
     409                 : 
     410 UBC           0 :             debug!("neon_status_update {status_update:?}");
     411                 : 
     412 CBC      776902 :             let mut data = BytesMut::new();
     413          776902 :             status_update.serialize(&mut data);
     414          776902 :             physical_stream
     415          776902 :                 .as_mut()
     416          776902 :                 .zenith_status_update(data.len() as u64, &data)
     417           22930 :                 .await?;
     418 UBC           0 :         }
     419                 :     }
     420                 : 
     421 CBC         175 :     Ok(())
     422            1122 : }
     423                 : 
     424                 : /// Data returned from the postgres `IDENTIFY_SYSTEM` command
     425                 : ///
     426                 : /// See the [postgres docs] for more details.
     427                 : ///
     428                 : /// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
     429             679 : #[derive(Debug)]
     430                 : // As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
     431                 : // unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
     432                 : #[allow(dead_code)]
     433                 : struct IdentifySystem {
     434                 :     systemid: u64,
     435                 :     timeline: u32,
     436                 :     xlogpos: PgLsn,
     437                 :     dbname: Option<String>,
     438                 : }
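
// For reference, the IDENTIFY_SYSTEM reply is a single row whose columns map onto the
// struct above in this order (the values shown are illustrative only):
//
//   systemid            | timeline | xlogpos   | dbname
//   7214312536531623941 | 1        | 0/16B3748 | NULL
//
// identify_system() below extracts each column by index with get_parse().
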
     439                 : 
     440                 : /// There was a problem parsing the response to
     441                 : /// a postgres IDENTIFY_SYSTEM command.
     442 UBC           0 : #[derive(Debug, thiserror::Error)]
     443                 : #[error("IDENTIFY_SYSTEM parse error")]
     444                 : struct IdentifyError;
     445                 : 
     446                 : /// Run the postgres `IDENTIFY_SYSTEM` command
     447 CBC         679 : async fn identify_system(client: &mut Client) -> anyhow::Result<IdentifySystem> {
     448             679 :     let query_str = "IDENTIFY_SYSTEM";
     449             679 :     let response = client.simple_query(query_str).await?;
     450                 : 
     451                 :     // get(N) from row, then parse it as some destination type.
     452            2716 :     fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
     453            2716 :     where
     454            2716 :         T: FromStr,
     455            2716 :     {
     456            2716 :         let val = row.get(idx).ok_or(IdentifyError)?;
     457            2037 :         val.parse::<T>().or(Err(IdentifyError))
     458            2716 :     }
     459                 : 
     460                 :     // extract the row contents into an IdentifySystem struct.
     461                 :     // each column is parsed with `?`, bailing out early on a missing or malformed field.
     462             679 :     if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) {
     463                 :         Ok(IdentifySystem {
     464             679 :             systemid: get_parse(first_row, 0)?,
     465             679 :             timeline: get_parse(first_row, 1)?,
     466             679 :             xlogpos: get_parse(first_row, 2)?,
     467             679 :             dbname: get_parse(first_row, 3).ok(),
     468                 :         })
     469                 :     } else {
     470 UBC           0 :         Err(IdentifyError.into())
     471                 :     }
     472 CBC         679 : }
        

Generated by: LCOV version 2.1-beta