LCOV - differential code coverage report
Current view: top level - safekeeper/src - handler.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 93.9 % 247 232 15 232
Current Date: 2023-10-19 02:04:12 Functions: 48.5 % 33 16 17 16
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Part of Safekeeper pretending to be Postgres, i.e. handling Postgres
       2                 : //! protocol commands.
       3                 : 
       4                 : use anyhow::Context;
       5                 : use std::str::FromStr;
       6                 : use std::str::{self};
       7                 : use std::sync::Arc;
       8                 : use tokio::io::{AsyncRead, AsyncWrite};
       9                 : use tracing::{info, info_span, Instrument};
      10                 : 
      11                 : use crate::auth::check_permission;
      12                 : use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
      13                 : 
      14                 : use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED};
      15                 : use crate::safekeeper::Term;
      16                 : use crate::timeline::TimelineError;
      17                 : use crate::wal_service::ConnectionId;
      18                 : use crate::{GlobalTimelines, SafeKeeperConf};
      19                 : use postgres_backend::QueryError;
      20                 : use postgres_backend::{self, PostgresBackend};
      21                 : use postgres_ffi::PG_TLI;
      22                 : use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
      23                 : use regex::Regex;
      24                 : use utils::auth::{Claims, JwtAuth, Scope};
      25                 : use utils::{
      26                 :     id::{TenantId, TenantTimelineId, TimelineId},
      27                 :     lsn::Lsn,
      28                 : };
      29                 : 
      30                 : /// Safekeeper handler of postgres commands
      31                 : pub struct SafekeeperPostgresHandler {
      32                 :     pub conf: SafeKeeperConf,
      33                 :     /// assigned application name
      34                 :     pub appname: Option<String>,
      35                 :     pub tenant_id: Option<TenantId>,
      36                 :     pub timeline_id: Option<TimelineId>,
      37                 :     pub ttid: TenantTimelineId,
      38                 :     /// Unique connection id is logged in spans for observability.
      39                 :     pub conn_id: ConnectionId,
      40                 :     /// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
      41                 :     auth: Option<(Scope, Arc<JwtAuth>)>,
      42                 :     claims: Option<Claims>,
      43                 :     io_metrics: Option<TrafficMetrics>,
      44                 : }
      45                 : 
      46                 : /// Parsed Postgres command.
      47                 : enum SafekeeperPostgresCommand {
      48                 :     StartWalPush,
      49                 :     StartReplication { start_lsn: Lsn, term: Option<Term> },
      50                 :     IdentifySystem,
      51                 :     TimelineStatus,
      52                 :     JSONCtrl { cmd: AppendLogicalMessage },
      53                 : }
      54                 : 
      55 CBC        4242 : fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
      56            4242 :     if cmd.starts_with("START_WAL_PUSH") {
      57            2013 :         Ok(SafekeeperPostgresCommand::StartWalPush)
      58            2229 :     } else if cmd.starts_with("START_REPLICATION") {
      59             783 :         let re = Regex::new(
      60             783 :             // We follow postgres START_REPLICATION LOGICAL options to pass term.
      61             783 :             r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?",
      62             783 :         )
      63             783 :         .unwrap();
      64             783 :         let caps = re
      65             783 :             .captures(cmd)
      66             783 :             .context(format!("failed to parse START_REPLICATION command {}", cmd))?;
      67             783 :         let start_lsn =
      68             783 :             Lsn::from_str(&caps[1]).context("parse start LSN from START_REPLICATION command")?;
      69             783 :         let term = if let Some(m) = caps.get(2) {
      70               1 :             Some(m.as_str().parse::<u64>().context("invalid term")?)
      71                 :         } else {
      72             782 :             None
      73                 :         };
      74             783 :         Ok(SafekeeperPostgresCommand::StartReplication { start_lsn, term })
      75            1446 :     } else if cmd.starts_with("IDENTIFY_SYSTEM") {
      76             699 :         Ok(SafekeeperPostgresCommand::IdentifySystem)
      77             747 :     } else if cmd.starts_with("TIMELINE_STATUS") {
      78             744 :         Ok(SafekeeperPostgresCommand::TimelineStatus)
      79               3 :     } else if cmd.starts_with("JSON_CTRL") {
      80               3 :         let cmd = cmd.strip_prefix("JSON_CTRL").context("invalid prefix")?;
      81                 :         Ok(SafekeeperPostgresCommand::JSONCtrl {
      82               3 :             cmd: serde_json::from_str(cmd)?,
      83                 :         })
      84                 :     } else {
      85 UBC           0 :         anyhow::bail!("unsupported command {cmd}");
      86                 :     }
      87 CBC        4242 : }
      88                 : 
      89            4242 : fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
      90            4242 :     match cmd {
      91            2013 :         SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH",
      92             783 :         SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
      93             744 :         SafekeeperPostgresCommand::TimelineStatus => "TIMELINE_STATUS",
      94             699 :         SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM",
      95               3 :         SafekeeperPostgresCommand::JSONCtrl { .. } => "JSON_CTRL",
      96                 :     }
      97            4242 : }
      98                 : 
      99                 : #[async_trait::async_trait]
     100                 : impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
     101                 :     for SafekeeperPostgresHandler
     102                 : {
     103                 :     // tenant_id and timeline_id are passed in connection string params
     104                 :     fn startup(
     105                 :         &mut self,
     106                 :         _pgb: &mut PostgresBackend<IO>,
     107                 :         sm: &FeStartupPacket,
     108                 :     ) -> Result<(), QueryError> {
     109            3560 :         if let FeStartupPacket::StartupMessage { params, .. } = sm {
     110            3560 :             if let Some(options) = params.options_raw() {
     111           14254 :                 for opt in options {
     112                 :                     // FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
     113                 :                     // remove these after the PR gets deployed:
     114                 :                     // https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
     115           10694 :                     match opt.split_once('=') {
     116            7140 :                         Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
     117            3560 :                             self.tenant_id = Some(value.parse().with_context(|| {
     118 UBC           0 :                                 format!("Failed to parse {value} as tenant id")
     119 CBC        3560 :                             })?);
     120                 :                         }
     121            3580 :                         Some(("ztimelineid", value)) | Some(("timeline_id", value)) => {
     122            3560 :                             self.timeline_id = Some(value.parse().with_context(|| {
     123 UBC           0 :                                 format!("Failed to parse {value} as timeline id")
     124 CBC        3560 :                             })?);
     125                 :                         }
     126              20 :                         Some(("availability_zone", client_az)) => {
     127               4 :                             if let Some(metrics) = self.io_metrics.as_ref() {
     128               4 :                                 metrics.set_client_az(client_az)
     129 UBC           0 :                             }
     130                 :                         }
     131 CBC        3570 :                         _ => continue,
     132                 :                     }
     133                 :                 }
     134 UBC           0 :             }
     135                 : 
     136 CBC        3560 :             if let Some(app_name) = params.get("application_name") {
     137             782 :                 self.appname = Some(app_name.to_owned());
     138             782 :                 if let Some(metrics) = self.io_metrics.as_ref() {
     139             782 :                     metrics.set_app_name(app_name)
     140 UBC           0 :                 }
     141 CBC        2778 :             }
     142                 : 
     143            3560 :             Ok(())
     144                 :         } else {
     145 UBC           0 :             Err(QueryError::Other(anyhow::anyhow!(
     146               0 :                 "Safekeeper received unexpected initial message: {sm:?}"
     147               0 :             )))
     148                 :         }
     149 CBC        3560 :     }
     150                 : 
     151             138 :     fn check_auth_jwt(
     152             138 :         &mut self,
     153             138 :         _pgb: &mut PostgresBackend<IO>,
     154             138 :         jwt_response: &[u8],
     155             138 :     ) -> Result<(), QueryError> {
     156             138 :         // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
     157             138 :         // which requires auth to be present
     158             138 :         let (allowed_auth_scope, auth) = self
     159             138 :             .auth
     160             138 :             .as_ref()
     161             138 :             .expect("auth_type is configured but .auth of handler is missing");
     162             138 :         let data =
     163             138 :             auth.decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
     164                 : 
     165                 :         // The handler might be configured to allow only tenant scope tokens.
     166             138 :         if matches!(allowed_auth_scope, Scope::Tenant)
     167             107 :             && !matches!(data.claims.scope, Scope::Tenant)
     168                 :         {
     169               1 :             return Err(QueryError::Other(anyhow::anyhow!(
     170               1 :                 "passed JWT token is for full access, but only tenant scope is allowed"
     171               1 :             )));
     172             137 :         }
     173                 : 
     174             137 :         if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
     175 UBC           0 :             return Err(QueryError::Other(anyhow::anyhow!(
     176               0 :                 "jwt token scope is Tenant, but tenant id is missing"
     177               0 :             )));
     178 CBC         137 :         }
     179             137 : 
     180             137 :         info!(
     181             137 :             "jwt auth succeeded for scope: {:#?} by tenant id: {:?}",
     182             137 :             data.claims.scope, data.claims.tenant_id,
     183             137 :         );
     184                 : 
     185             137 :         self.claims = Some(data.claims);
     186             137 :         Ok(())
     187             138 :     }
     188                 : 
     189            4253 :     async fn process_query(
     190            4253 :         &mut self,
     191            4253 :         pgb: &mut PostgresBackend<IO>,
     192            4253 :         query_string: &str,
     193            4253 :     ) -> Result<(), QueryError> {
     194            4253 :         if query_string
     195            4253 :             .to_ascii_lowercase()
     196            4253 :             .starts_with("set datestyle to ")
     197                 :         {
     198                 :             // important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
     199              11 :             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
     200              11 :             return Ok(());
     201            4242 :         }
     202                 : 
     203            4242 :         let cmd = parse_cmd(query_string)?;
     204            4242 :         let cmd_str = cmd_to_string(&cmd);
     205            4242 : 
     206            4242 :         PG_QUERIES_RECEIVED.with_label_values(&[cmd_str]).inc();
     207            4242 :         scopeguard::defer! {
     208            3845 :             PG_QUERIES_FINISHED.with_label_values(&[cmd_str]).inc();
     209            3845 :         }
     210            4242 : 
     211            4242 :         info!(
     212            4242 :             "got query {:?} in timeline {:?}",
     213            4242 :             query_string, self.timeline_id
     214            4242 :         );
     215                 : 
     216            4242 :         let tenant_id = self.tenant_id.context("tenantid is required")?;
     217            4242 :         let timeline_id = self.timeline_id.context("timelineid is required")?;
     218            4242 :         self.check_permission(Some(tenant_id))?;
     219            4240 :         self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
     220            4240 :         let span_ttid = self.ttid; // satisfy borrow checker
     221            4240 : 
     222            4240 :         match cmd {
     223                 :             SafekeeperPostgresCommand::StartWalPush => {
     224            2013 :                 self.handle_start_wal_push(pgb)
     225            2013 :                     .instrument(info_span!("WAL receiver", ttid = %span_ttid))
     226         5068421 :                     .await
     227                 :             }
     228             783 :             SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
     229             783 :                 self.handle_start_replication(pgb, start_lsn, term)
     230             783 :                     .instrument(info_span!("WAL sender", ttid = %span_ttid))
     231         2947784 :                     .await
     232                 :             }
     233             697 :             SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
     234             744 :             SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
     235               3 :             SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
     236            9713 :                 handle_json_ctrl(self, pgb, cmd).await
     237                 :             }
     238                 :         }
     239            8109 :     }
     240                 : }
     241                 : 
     242                 : impl SafekeeperPostgresHandler {
     243            3561 :     pub fn new(
     244            3561 :         conf: SafeKeeperConf,
     245            3561 :         conn_id: u32,
     246            3561 :         io_metrics: Option<TrafficMetrics>,
     247            3561 :         auth: Option<(Scope, Arc<JwtAuth>)>,
     248            3561 :     ) -> Self {
     249            3561 :         SafekeeperPostgresHandler {
     250            3561 :             conf,
     251            3561 :             appname: None,
     252            3561 :             tenant_id: None,
     253            3561 :             timeline_id: None,
     254            3561 :             ttid: TenantTimelineId::empty(),
     255            3561 :             conn_id,
     256            3561 :             claims: None,
     257            3561 :             auth,
     258            3561 :             io_metrics,
     259            3561 :         }
     260            3561 :     }
     261                 : 
     262                 :     // when accessing management api supply None as an argument
     263                 :     // when using to authorize tenant pass corresponding tenant id
     264            4242 :     fn check_permission(&self, tenant_id: Option<TenantId>) -> anyhow::Result<()> {
     265            4242 :         if self.auth.is_none() {
     266                 :             // auth is set to Trust, nothing to check so just return ok
     267            4081 :             return Ok(());
     268             161 :         }
     269             161 :         // auth is some, just checked above, when auth is some
     270             161 :         // then claims are always present because of checks during connection init
     271             161 :         // so this expect won't trigger
     272             161 :         let claims = self
     273             161 :             .claims
     274             161 :             .as_ref()
     275             161 :             .expect("claims presence already checked");
     276             161 :         check_permission(claims, tenant_id)
     277            4242 :     }
     278                 : 
     279             744 :     async fn handle_timeline_status<IO: AsyncRead + AsyncWrite + Unpin>(
     280             744 :         &mut self,
     281             744 :         pgb: &mut PostgresBackend<IO>,
     282             744 :     ) -> Result<(), QueryError> {
     283                 :         // Get timeline, handling "not found" error
     284             744 :         let tli = match GlobalTimelines::get(self.ttid) {
     285             275 :             Ok(tli) => Ok(Some(tli)),
     286             469 :             Err(TimelineError::NotFound(_)) => Ok(None),
     287 UBC           0 :             Err(e) => Err(QueryError::Other(e.into())),
     288               0 :         }?;
     289                 : 
     290                 :         // Write row description
     291 CBC         744 :         pgb.write_message_noflush(&BeMessage::RowDescription(&[
     292             744 :             RowDescriptor::text_col(b"flush_lsn"),
     293             744 :             RowDescriptor::text_col(b"commit_lsn"),
     294             744 :         ]))?;
     295                 : 
     296                 :         // Write row if timeline exists
     297             744 :         if let Some(tli) = tli {
     298             275 :             let (inmem, _state) = tli.get_state().await;
     299             275 :             let flush_lsn = tli.get_flush_lsn().await;
     300             275 :             let commit_lsn = inmem.commit_lsn;
     301             275 :             pgb.write_message_noflush(&BeMessage::DataRow(&[
     302             275 :                 Some(flush_lsn.to_string().as_bytes()),
     303             275 :                 Some(commit_lsn.to_string().as_bytes()),
     304             275 :             ]))?;
     305             469 :         }
     306                 : 
     307             744 :         pgb.write_message_noflush(&BeMessage::CommandComplete(b"TIMELINE_STATUS"))?;
     308             744 :         Ok(())
     309             744 :     }
     310                 : 
     311                 :     ///
     312                 :     /// Handle IDENTIFY_SYSTEM replication command
     313                 :     ///
     314             697 :     async fn handle_identify_system<IO: AsyncRead + AsyncWrite + Unpin>(
     315             697 :         &mut self,
     316             697 :         pgb: &mut PostgresBackend<IO>,
     317             697 :     ) -> Result<(), QueryError> {
     318             697 :         let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
     319                 : 
     320             697 :         let lsn = if self.is_walproposer_recovery() {
     321                 :             // walproposer should get all local WAL until flush_lsn
     322 UBC           0 :             tli.get_flush_lsn().await
     323                 :         } else {
     324                 :             // other clients shouldn't get any uncommitted WAL
     325 CBC         697 :             tli.get_state().await.0.commit_lsn
     326                 :         }
     327             697 :         .to_string();
     328                 : 
     329             697 :         let sysid = tli.get_state().await.1.server.system_id.to_string();
     330             697 :         let lsn_bytes = lsn.as_bytes();
     331             697 :         let tli = PG_TLI.to_string();
     332             697 :         let tli_bytes = tli.as_bytes();
     333             697 :         let sysid_bytes = sysid.as_bytes();
     334             697 : 
     335             697 :         pgb.write_message_noflush(&BeMessage::RowDescription(&[
     336             697 :             RowDescriptor {
     337             697 :                 name: b"systemid",
     338             697 :                 typoid: TEXT_OID,
     339             697 :                 typlen: -1,
     340             697 :                 ..Default::default()
     341             697 :             },
     342             697 :             RowDescriptor {
     343             697 :                 name: b"timeline",
     344             697 :                 typoid: INT4_OID,
     345             697 :                 typlen: 4,
     346             697 :                 ..Default::default()
     347             697 :             },
     348             697 :             RowDescriptor {
     349             697 :                 name: b"xlogpos",
     350             697 :                 typoid: TEXT_OID,
     351             697 :                 typlen: -1,
     352             697 :                 ..Default::default()
     353             697 :             },
     354             697 :             RowDescriptor {
     355             697 :                 name: b"dbname",
     356             697 :                 typoid: TEXT_OID,
     357             697 :                 typlen: -1,
     358             697 :                 ..Default::default()
     359             697 :             },
     360             697 :         ]))?
     361             697 :         .write_message_noflush(&BeMessage::DataRow(&[
     362             697 :             Some(sysid_bytes),
     363             697 :             Some(tli_bytes),
     364             697 :             Some(lsn_bytes),
     365             697 :             None,
     366             697 :         ]))?
     367             697 :         .write_message_noflush(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
     368             697 :         Ok(())
     369             697 :     }
     370                 : 
     371                 :     /// Returns true if current connection is a replication connection, originating
     372                 :     /// from a walproposer recovery function. This connection gets a special handling:
     373                 :     /// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
     374            1480 :     pub fn is_walproposer_recovery(&self) -> bool {
     375            1480 :         self.appname == Some("wal_proposer_recovery".to_string())
     376            1480 :     }
     377                 : }
        

Generated by: LCOV version 2.1-beta