LCOV - code coverage report
Current view: top level - safekeeper/src - handler.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 16.0 % 376 60
Test Date: 2025-02-20 13:11:02 Functions: 7.3 % 41 3

            Line data    Source code
       1              : //! Part of Safekeeper pretending to be Postgres, i.e. handling Postgres
       2              : //! protocol commands.
       3              : 
       4              : use anyhow::Context;
       5              : use pageserver_api::models::ShardParameters;
       6              : use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
       7              : use safekeeper_api::models::ConnectionId;
       8              : use safekeeper_api::Term;
       9              : use std::future::Future;
      10              : use std::str::{self, FromStr};
      11              : use std::sync::Arc;
      12              : use tokio::io::{AsyncRead, AsyncWrite};
      13              : use tracing::{debug, info, info_span, Instrument};
      14              : use utils::postgres_client::PostgresClientProtocol;
      15              : use utils::shard::{ShardCount, ShardNumber};
      16              : 
      17              : use crate::auth::check_permission;
      18              : use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
      19              : 
      20              : use crate::metrics::{TrafficMetrics, PG_QUERIES_GAUGE};
      21              : use crate::timeline::TimelineError;
      22              : use crate::{GlobalTimelines, SafeKeeperConf};
      23              : use postgres_backend::PostgresBackend;
      24              : use postgres_backend::QueryError;
      25              : use postgres_ffi::PG_TLI;
      26              : use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
      27              : use regex::Regex;
      28              : use utils::auth::{Claims, JwtAuth, Scope};
      29              : use utils::{
      30              :     id::{TenantId, TenantTimelineId, TimelineId},
      31              :     lsn::Lsn,
      32              : };
      33              : 
      34              : /// Safekeeper handler of postgres commands
      35              : pub struct SafekeeperPostgresHandler {
      36              :     pub conf: Arc<SafeKeeperConf>,
      37              :     /// assigned application name
      38              :     pub appname: Option<String>,
      39              :     pub tenant_id: Option<TenantId>,
      40              :     pub timeline_id: Option<TimelineId>,
      41              :     pub ttid: TenantTimelineId,
      42              :     pub shard: Option<ShardIdentity>,
      43              :     pub protocol: Option<PostgresClientProtocol>,
      44              :     /// Unique connection id is logged in spans for observability.
      45              :     pub conn_id: ConnectionId,
      46              :     pub global_timelines: Arc<GlobalTimelines>,
      47              :     /// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
      48              :     auth: Option<(Scope, Arc<JwtAuth>)>,
      49              :     claims: Option<Claims>,
      50              :     io_metrics: Option<TrafficMetrics>,
      51              : }
      52              : 
      53              : /// Parsed Postgres command.
      54              : enum SafekeeperPostgresCommand {
      55              :     StartWalPush {
      56              :         proto_version: u32,
      57              :         // Eventually timelines will be always created explicitly by storcon.
      58              :         // This option allows legacy behaviour for compute to do that until we
      59              :         // fully migrate.
      60              :         allow_timeline_creation: bool,
      61              :     },
      62              :     StartReplication {
      63              :         start_lsn: Lsn,
      64              :         term: Option<Term>,
      65              :     },
      66              :     IdentifySystem,
      67              :     TimelineStatus,
      68              :     JSONCtrl {
      69              :         cmd: AppendLogicalMessage,
      70              :     },
      71              : }
      72              : 
      73            2 : fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
      74            2 :     if cmd.starts_with("START_WAL_PUSH") {
      75              :         // Allow additional options in postgres START_REPLICATION style like
      76              :         //   START_WAL_PUSH (proto_version '3', allow_timeline_creation 'false').
      77              :         // Parsing here is very naive and breaks in case of commas or
      78              :         // whitespaces in values, but enough for our purposes.
      79            2 :         let re = Regex::new(r"START_WAL_PUSH(\s+?\((.*)\))?").unwrap();
      80            2 :         let caps = re
      81            2 :             .captures(cmd)
      82            2 :             .context(format!("failed to parse START_WAL_PUSH command {}", cmd))?;
      83              :         // capture () content
      84            2 :         let options = caps.get(2).map(|m| m.as_str()).unwrap_or("");
      85            2 :         // default values
      86            2 :         let mut proto_version = 2;
      87            2 :         let mut allow_timeline_creation = true;
      88            4 :         for kvstr in options.split(",") {
      89            4 :             if kvstr.is_empty() {
      90            1 :                 continue;
      91            3 :             }
      92            3 :             let mut kvit = kvstr.split_whitespace();
      93            3 :             let key = kvit.next().context(format!(
      94            3 :                 "failed to parse key in kv {} in command {}",
      95            3 :                 kvstr, cmd
      96            3 :             ))?;
      97            3 :             let value = kvit.next().context(format!(
      98            3 :                 "failed to parse value in kv {} in command {}",
      99            3 :                 kvstr, cmd
     100            3 :             ))?;
     101            3 :             let value_trimmed = value.trim_matches('\'');
     102            3 :             if key == "proto_version" {
     103            1 :                 proto_version = value_trimmed.parse::<u32>().context(format!(
     104            1 :                     "failed to parse proto_version value {} in command {}",
     105            1 :                     value, cmd
     106            1 :                 ))?;
     107            2 :             }
     108            3 :             if key == "allow_timeline_creation" {
     109            1 :                 allow_timeline_creation = value_trimmed.parse::<bool>().context(format!(
     110            1 :                     "failed to parse allow_timeline_creation value {} in command {}",
     111            1 :                     value, cmd
     112            1 :                 ))?;
     113            2 :             }
     114              :         }
     115            2 :         Ok(SafekeeperPostgresCommand::StartWalPush {
     116            2 :             proto_version,
     117            2 :             allow_timeline_creation,
     118            2 :         })
     119            0 :     } else if cmd.starts_with("START_REPLICATION") {
     120            0 :         let re = Regex::new(
     121            0 :             // We follow postgres START_REPLICATION LOGICAL options to pass term.
     122            0 :             r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?",
     123            0 :         )
     124            0 :         .unwrap();
     125            0 :         let caps = re
     126            0 :             .captures(cmd)
     127            0 :             .context(format!("failed to parse START_REPLICATION command {}", cmd))?;
     128            0 :         let start_lsn =
     129            0 :             Lsn::from_str(&caps[1]).context("parse start LSN from START_REPLICATION command")?;
     130            0 :         let term = if let Some(m) = caps.get(2) {
     131            0 :             Some(m.as_str().parse::<u64>().context("invalid term")?)
     132              :         } else {
     133            0 :             None
     134              :         };
     135            0 :         Ok(SafekeeperPostgresCommand::StartReplication { start_lsn, term })
     136            0 :     } else if cmd.starts_with("IDENTIFY_SYSTEM") {
     137            0 :         Ok(SafekeeperPostgresCommand::IdentifySystem)
     138            0 :     } else if cmd.starts_with("TIMELINE_STATUS") {
     139            0 :         Ok(SafekeeperPostgresCommand::TimelineStatus)
     140            0 :     } else if cmd.starts_with("JSON_CTRL") {
     141            0 :         let cmd = cmd.strip_prefix("JSON_CTRL").context("invalid prefix")?;
     142              :         Ok(SafekeeperPostgresCommand::JSONCtrl {
     143            0 :             cmd: serde_json::from_str(cmd)?,
     144              :         })
     145              :     } else {
     146            0 :         anyhow::bail!("unsupported command {cmd}");
     147              :     }
     148            2 : }
     149              : 
     150            0 : fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
     151            0 :     match cmd {
     152            0 :         SafekeeperPostgresCommand::StartWalPush { .. } => "START_WAL_PUSH",
     153            0 :         SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
     154            0 :         SafekeeperPostgresCommand::TimelineStatus => "TIMELINE_STATUS",
     155            0 :         SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM",
     156            0 :         SafekeeperPostgresCommand::JSONCtrl { .. } => "JSON_CTRL",
     157              :     }
     158            0 : }
     159              : 
     160              : impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
     161              :     for SafekeeperPostgresHandler
     162              : {
     163              :     // tenant_id and timeline_id are passed in connection string params
     164            0 :     fn startup(
     165            0 :         &mut self,
     166            0 :         _pgb: &mut PostgresBackend<IO>,
     167            0 :         sm: &FeStartupPacket,
     168            0 :     ) -> Result<(), QueryError> {
     169            0 :         if let FeStartupPacket::StartupMessage { params, .. } = sm {
     170            0 :             if let Some(options) = params.options_raw() {
     171            0 :                 let mut shard_count: Option<u8> = None;
     172            0 :                 let mut shard_number: Option<u8> = None;
     173            0 :                 let mut shard_stripe_size: Option<u32> = None;
     174              : 
     175            0 :                 for opt in options {
     176              :                     // FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
     177              :                     // remove these after the PR gets deployed:
     178              :                     // https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
     179            0 :                     match opt.split_once('=') {
     180            0 :                         Some(("protocol", value)) => {
     181            0 :                             self.protocol =
     182            0 :                                 Some(serde_json::from_str(value).with_context(|| {
     183            0 :                                     format!("Failed to parse {value} as protocol")
     184            0 :                                 })?);
     185              :                         }
     186            0 :                         Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
     187            0 :                             self.tenant_id = Some(value.parse().with_context(|| {
     188            0 :                                 format!("Failed to parse {value} as tenant id")
     189            0 :                             })?);
     190              :                         }
     191            0 :                         Some(("ztimelineid", value)) | Some(("timeline_id", value)) => {
     192            0 :                             self.timeline_id = Some(value.parse().with_context(|| {
     193            0 :                                 format!("Failed to parse {value} as timeline id")
     194            0 :                             })?);
     195              :                         }
     196            0 :                         Some(("availability_zone", client_az)) => {
     197            0 :                             if let Some(metrics) = self.io_metrics.as_ref() {
     198            0 :                                 metrics.set_client_az(client_az)
     199            0 :                             }
     200              :                         }
     201            0 :                         Some(("shard_count", value)) => {
     202            0 :                             shard_count = Some(value.parse::<u8>().with_context(|| {
     203            0 :                                 format!("Failed to parse {value} as shard count")
     204            0 :                             })?);
     205              :                         }
     206            0 :                         Some(("shard_number", value)) => {
     207            0 :                             shard_number = Some(value.parse::<u8>().with_context(|| {
     208            0 :                                 format!("Failed to parse {value} as shard number")
     209            0 :                             })?);
     210              :                         }
     211            0 :                         Some(("shard_stripe_size", value)) => {
     212            0 :                             shard_stripe_size = Some(value.parse::<u32>().with_context(|| {
     213            0 :                                 format!("Failed to parse {value} as shard stripe size")
     214            0 :                             })?);
     215              :                         }
     216            0 :                         _ => continue,
     217              :                     }
     218              :                 }
     219              : 
     220            0 :                 match self.protocol() {
     221              :                     PostgresClientProtocol::Vanilla => {
     222            0 :                         if shard_count.is_some()
     223            0 :                             || shard_number.is_some()
     224            0 :                             || shard_stripe_size.is_some()
     225              :                         {
     226            0 :                             return Err(QueryError::Other(anyhow::anyhow!(
     227            0 :                                 "Shard params specified for vanilla protocol"
     228            0 :                             )));
     229            0 :                         }
     230              :                     }
     231              :                     PostgresClientProtocol::Interpreted { .. } => {
     232            0 :                         match (shard_count, shard_number, shard_stripe_size) {
     233            0 :                             (Some(count), Some(number), Some(stripe_size)) => {
     234            0 :                                 let params = ShardParameters {
     235            0 :                                     count: ShardCount(count),
     236            0 :                                     stripe_size: ShardStripeSize(stripe_size),
     237            0 :                                 };
     238            0 :                                 self.shard =
     239            0 :                                     Some(ShardIdentity::from_params(ShardNumber(number), &params));
     240            0 :                             }
     241              :                             _ => {
     242            0 :                                 return Err(QueryError::Other(anyhow::anyhow!(
     243            0 :                                     "Shard params were not specified"
     244            0 :                                 )));
     245              :                             }
     246              :                         }
     247              :                     }
     248              :                 }
     249            0 :             }
     250              : 
     251            0 :             if let Some(app_name) = params.get("application_name") {
     252            0 :                 self.appname = Some(app_name.to_owned());
     253            0 :                 if let Some(metrics) = self.io_metrics.as_ref() {
     254            0 :                     metrics.set_app_name(app_name)
     255            0 :                 }
     256            0 :             }
     257              : 
     258            0 :             let ttid = TenantTimelineId::new(
     259            0 :                 self.tenant_id.unwrap_or(TenantId::from([0u8; 16])),
     260            0 :                 self.timeline_id.unwrap_or(TimelineId::from([0u8; 16])),
     261            0 :             );
     262            0 :             tracing::Span::current()
     263            0 :                 .record("ttid", tracing::field::display(ttid))
     264            0 :                 .record(
     265            0 :                     "application_name",
     266            0 :                     tracing::field::debug(self.appname.clone()),
     267            0 :                 );
     268              : 
     269            0 :             if let Some(shard) = self.shard.as_ref() {
     270            0 :                 if let Some(slug) = shard.shard_slug().strip_prefix("-") {
     271            0 :                     tracing::Span::current().record("shard", tracing::field::display(slug));
     272            0 :                 }
     273            0 :             }
     274              : 
     275            0 :             Ok(())
     276              :         } else {
     277            0 :             Err(QueryError::Other(anyhow::anyhow!(
     278            0 :                 "Safekeeper received unexpected initial message: {sm:?}"
     279            0 :             )))
     280              :         }
     281            0 :     }
     282              : 
     283            0 :     fn check_auth_jwt(
     284            0 :         &mut self,
     285            0 :         _pgb: &mut PostgresBackend<IO>,
     286            0 :         jwt_response: &[u8],
     287            0 :     ) -> Result<(), QueryError> {
     288            0 :         // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
     289            0 :         // which requires auth to be present
     290            0 :         let (allowed_auth_scope, auth) = self
     291            0 :             .auth
     292            0 :             .as_ref()
     293            0 :             .expect("auth_type is configured but .auth of handler is missing");
     294            0 :         let data = auth
     295            0 :             .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
     296            0 :             .map_err(|e| QueryError::Unauthorized(e.0))?;
     297              : 
     298              :         // The handler might be configured to allow only tenant scope tokens.
     299            0 :         if matches!(allowed_auth_scope, Scope::Tenant)
     300            0 :             && !matches!(data.claims.scope, Scope::Tenant)
     301              :         {
     302            0 :             return Err(QueryError::Unauthorized(
     303            0 :                 "passed JWT token is for full access, but only tenant scope is allowed".into(),
     304            0 :             ));
     305            0 :         }
     306              : 
     307            0 :         if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
     308            0 :             return Err(QueryError::Unauthorized(
     309            0 :                 "jwt token scope is Tenant, but tenant id is missing".into(),
     310            0 :             ));
     311            0 :         }
     312            0 : 
     313            0 :         debug!(
     314            0 :             "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
     315              :             data.claims.scope, data.claims.tenant_id,
     316              :         );
     317              : 
     318            0 :         self.claims = Some(data.claims);
     319            0 :         Ok(())
     320            0 :     }
     321              : 
     322            0 :     fn process_query(
     323            0 :         &mut self,
     324            0 :         pgb: &mut PostgresBackend<IO>,
     325            0 :         query_string: &str,
     326            0 :     ) -> impl Future<Output = Result<(), QueryError>> {
     327            0 :         Box::pin(async move {
     328            0 :             if query_string
     329            0 :                 .to_ascii_lowercase()
     330            0 :                 .starts_with("set datestyle to ")
     331              :             {
     332              :                 // important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
     333            0 :                 pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
     334            0 :                 return Ok(());
     335            0 :             }
     336              : 
     337            0 :             let cmd = parse_cmd(query_string)?;
     338            0 :             let cmd_str = cmd_to_string(&cmd);
     339            0 : 
     340            0 :             let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
     341            0 : 
     342            0 :             info!("got query {:?}", query_string);
     343              : 
     344            0 :             let tenant_id = self.tenant_id.context("tenantid is required")?;
     345            0 :             let timeline_id = self.timeline_id.context("timelineid is required")?;
     346            0 :             self.check_permission(Some(tenant_id))?;
     347            0 :             self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
     348            0 : 
     349            0 :             match cmd {
     350              :                 SafekeeperPostgresCommand::StartWalPush {
     351            0 :                     proto_version,
     352            0 :                     allow_timeline_creation,
     353            0 :                 } => {
     354            0 :                     self.handle_start_wal_push(pgb, proto_version, allow_timeline_creation)
     355            0 :                         .instrument(info_span!("WAL receiver"))
     356            0 :                         .await
     357              :                 }
     358            0 :                 SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
     359            0 :                     self.handle_start_replication(pgb, start_lsn, term)
     360            0 :                         .instrument(info_span!("WAL sender"))
     361            0 :                         .await
     362              :                 }
     363            0 :                 SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
     364            0 :                 SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
     365            0 :                 SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
     366            0 :                     handle_json_ctrl(self, pgb, cmd).await
     367              :                 }
     368              :             }
     369            0 :         })
     370            0 :     }
     371              : }
     372              : 
     373              : impl SafekeeperPostgresHandler {
     374            0 :     pub fn new(
     375            0 :         conf: Arc<SafeKeeperConf>,
     376            0 :         conn_id: u32,
     377            0 :         io_metrics: Option<TrafficMetrics>,
     378            0 :         auth: Option<(Scope, Arc<JwtAuth>)>,
     379            0 :         global_timelines: Arc<GlobalTimelines>,
     380            0 :     ) -> Self {
     381            0 :         SafekeeperPostgresHandler {
     382            0 :             conf,
     383            0 :             appname: None,
     384            0 :             tenant_id: None,
     385            0 :             timeline_id: None,
     386            0 :             ttid: TenantTimelineId::empty(),
     387            0 :             shard: None,
     388            0 :             protocol: None,
     389            0 :             conn_id,
     390            0 :             claims: None,
     391            0 :             auth,
     392            0 :             io_metrics,
     393            0 :             global_timelines,
     394            0 :         }
     395            0 :     }
     396              : 
     397            0 :     pub fn protocol(&self) -> PostgresClientProtocol {
     398            0 :         self.protocol.unwrap_or(PostgresClientProtocol::Vanilla)
     399            0 :     }
     400              : 
     401              :     // when accessing management api supply None as an argument
     402              :     // when using to authorize tenant pass corresponding tenant id
     403            0 :     fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
     404            0 :         if self.auth.is_none() {
     405              :             // auth is set to Trust, nothing to check so just return ok
     406            0 :             return Ok(());
     407            0 :         }
     408            0 :         // auth is some, just checked above, when auth is some
     409            0 :         // then claims are always present because of checks during connection init
     410            0 :         // so this expect won't trigger
     411            0 :         let claims = self
     412            0 :             .claims
     413            0 :             .as_ref()
     414            0 :             .expect("claims presence already checked");
     415            0 :         check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
     416            0 :     }
     417              : 
     418            0 :     async fn handle_timeline_status<IO: AsyncRead + AsyncWrite + Unpin>(
     419            0 :         &mut self,
     420            0 :         pgb: &mut PostgresBackend<IO>,
     421            0 :     ) -> Result<(), QueryError> {
     422              :         // Get timeline, handling "not found" error
     423            0 :         let tli = match self.global_timelines.get(self.ttid) {
     424            0 :             Ok(tli) => Ok(Some(tli)),
     425            0 :             Err(TimelineError::NotFound(_)) => Ok(None),
     426            0 :             Err(e) => Err(QueryError::Other(e.into())),
     427            0 :         }?;
     428              : 
     429              :         // Write row description
     430            0 :         pgb.write_message_noflush(&BeMessage::RowDescription(&[
     431            0 :             RowDescriptor::text_col(b"flush_lsn"),
     432            0 :             RowDescriptor::text_col(b"commit_lsn"),
     433            0 :         ]))?;
     434              : 
     435              :         // Write row if timeline exists
     436            0 :         if let Some(tli) = tli {
     437            0 :             let (inmem, _state) = tli.get_state().await;
     438            0 :             let flush_lsn = tli.get_flush_lsn().await;
     439            0 :             let commit_lsn = inmem.commit_lsn;
     440            0 :             pgb.write_message_noflush(&BeMessage::DataRow(&[
     441            0 :                 Some(flush_lsn.to_string().as_bytes()),
     442            0 :                 Some(commit_lsn.to_string().as_bytes()),
     443            0 :             ]))?;
     444            0 :         }
     445              : 
     446            0 :         pgb.write_message_noflush(&BeMessage::CommandComplete(b"TIMELINE_STATUS"))?;
     447            0 :         Ok(())
     448            0 :     }
     449              : 
     450              :     ///
     451              :     /// Handle IDENTIFY_SYSTEM replication command
     452              :     ///
     453            0 :     async fn handle_identify_system<IO: AsyncRead + AsyncWrite + Unpin>(
     454            0 :         &mut self,
     455            0 :         pgb: &mut PostgresBackend<IO>,
     456            0 :     ) -> Result<(), QueryError> {
     457            0 :         let tli = self
     458            0 :             .global_timelines
     459            0 :             .get(self.ttid)
     460            0 :             .map_err(|e| QueryError::Other(e.into()))?;
     461              : 
     462            0 :         let lsn = if self.is_walproposer_recovery() {
     463              :             // walproposer should get all local WAL until flush_lsn
     464            0 :             tli.get_flush_lsn().await
     465              :         } else {
     466              :             // other clients shouldn't get any uncommitted WAL
     467            0 :             tli.get_state().await.0.commit_lsn
     468              :         }
     469            0 :         .to_string();
     470              : 
     471            0 :         let sysid = tli.get_state().await.1.server.system_id.to_string();
     472            0 :         let lsn_bytes = lsn.as_bytes();
     473            0 :         let tli = PG_TLI.to_string();
     474            0 :         let tli_bytes = tli.as_bytes();
     475            0 :         let sysid_bytes = sysid.as_bytes();
     476            0 : 
     477            0 :         pgb.write_message_noflush(&BeMessage::RowDescription(&[
     478            0 :             RowDescriptor {
     479            0 :                 name: b"systemid",
     480            0 :                 typoid: TEXT_OID,
     481            0 :                 typlen: -1,
     482            0 :                 ..Default::default()
     483            0 :             },
     484            0 :             RowDescriptor {
     485            0 :                 name: b"timeline",
     486            0 :                 typoid: INT4_OID,
     487            0 :                 typlen: 4,
     488            0 :                 ..Default::default()
     489            0 :             },
     490            0 :             RowDescriptor {
     491            0 :                 name: b"xlogpos",
     492            0 :                 typoid: TEXT_OID,
     493            0 :                 typlen: -1,
     494            0 :                 ..Default::default()
     495            0 :             },
     496            0 :             RowDescriptor {
     497            0 :                 name: b"dbname",
     498            0 :                 typoid: TEXT_OID,
     499            0 :                 typlen: -1,
     500            0 :                 ..Default::default()
     501            0 :             },
     502            0 :         ]))?
     503            0 :         .write_message_noflush(&BeMessage::DataRow(&[
     504            0 :             Some(sysid_bytes),
     505            0 :             Some(tli_bytes),
     506            0 :             Some(lsn_bytes),
     507            0 :             None,
     508            0 :         ]))?
     509            0 :         .write_message_noflush(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
     510            0 :         Ok(())
     511            0 :     }
     512              : 
     513              :     /// Returns true if current connection is a replication connection, originating
     514              :     /// from a walproposer recovery function. This connection gets a special handling:
     515              :     /// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
     516            0 :     pub fn is_walproposer_recovery(&self) -> bool {
     517            0 :         match &self.appname {
     518            0 :             None => false,
     519            0 :             Some(appname) => {
     520            0 :                 appname == "wal_proposer_recovery" ||
     521              :                 // set by safekeeper peer recovery
     522            0 :                 appname.starts_with("safekeeper")
     523              :             }
     524              :         }
     525            0 :     }
     526              : }
     527              : 
     528              : #[cfg(test)]
     529              : mod tests {
     530              :     use super::SafekeeperPostgresCommand;
     531              : 
     532              :     /// Test parsing of START_WAL_PUSH command
     533              :     #[test]
     534            1 :     fn test_start_wal_push_parse() {
     535            1 :         let cmd = "START_WAL_PUSH";
     536            1 :         let parsed = super::parse_cmd(cmd).expect("failed to parse");
     537            1 :         match parsed {
     538              :             SafekeeperPostgresCommand::StartWalPush {
     539            1 :                 proto_version,
     540            1 :                 allow_timeline_creation,
     541            1 :             } => {
     542            1 :                 assert_eq!(proto_version, 2);
     543            1 :                 assert!(allow_timeline_creation);
     544              :             }
     545            0 :             _ => panic!("unexpected command"),
     546              :         }
     547              : 
     548            1 :         let cmd =
     549            1 :             "START_WAL_PUSH (proto_version '3', allow_timeline_creation 'false', unknown 'hoho')";
     550            1 :         let parsed = super::parse_cmd(cmd).expect("failed to parse");
     551            1 :         match parsed {
     552              :             SafekeeperPostgresCommand::StartWalPush {
     553            1 :                 proto_version,
     554            1 :                 allow_timeline_creation,
     555            1 :             } => {
     556            1 :                 assert_eq!(proto_version, 3);
     557            1 :                 assert!(!allow_timeline_creation);
     558              :             }
     559            0 :             _ => panic!("unexpected command"),
     560              :         }
     561            1 :     }
     562              : }
        

Generated by: LCOV version 2.1-beta