TLA Line data Source code
1 : //! Part of Safekeeper pretending to be Postgres, i.e. handling Postgres
2 : //! protocol commands.
3 :
4 : use anyhow::Context;
5 : use std::str::FromStr;
6 : use std::str::{self};
7 : use std::sync::Arc;
8 : use tokio::io::{AsyncRead, AsyncWrite};
9 : use tracing::{info, info_span, Instrument};
10 :
11 : use crate::auth::check_permission;
12 : use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
13 :
14 : use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED};
15 : use crate::safekeeper::Term;
16 : use crate::timeline::TimelineError;
17 : use crate::wal_service::ConnectionId;
18 : use crate::{GlobalTimelines, SafeKeeperConf};
19 : use postgres_backend::QueryError;
20 : use postgres_backend::{self, PostgresBackend};
21 : use postgres_ffi::PG_TLI;
22 : use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
23 : use regex::Regex;
24 : use utils::auth::{Claims, JwtAuth, Scope};
25 : use utils::{
26 : id::{TenantId, TenantTimelineId, TimelineId},
27 : lsn::Lsn,
28 : };
29 :
/// Safekeeper handler of postgres commands
///
/// Carries the state accumulated over the life of one handler instance: identifiers
/// parsed at startup, JWT claims stored after authentication, and the
/// tenant/timeline pair set when a query is processed.
pub struct SafekeeperPostgresHandler {
    pub conf: SafeKeeperConf,
    /// assigned application name
    pub appname: Option<String>,
    /// Tenant id parsed from the startup packet options; `None` until `startup` sees one.
    pub tenant_id: Option<TenantId>,
    /// Timeline id parsed from the startup packet options; `None` until `startup` sees one.
    pub timeline_id: Option<TimelineId>,
    /// Starts out empty and is set from `tenant_id` and `timeline_id` by `process_query`
    /// once the permission check has passed.
    pub ttid: TenantTimelineId,
    /// Unique connection id is logged in spans for observability.
    pub conn_id: ConnectionId,
    /// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
    auth: Option<(Scope, Arc<JwtAuth>)>,
    /// Claims of the JWT token accepted by `check_auth_jwt`; `None` until then.
    claims: Option<Claims>,
    /// Optional traffic metrics handle; `startup` passes the client's application name
    /// and availability zone to it when present.
    io_metrics: Option<TrafficMetrics>,
}
45 :
/// Parsed Postgres command.
///
/// Produced by `parse_cmd` from the query text and dispatched in `process_query`.
enum SafekeeperPostgresCommand {
    /// `START_WAL_PUSH`
    StartWalPush,
    /// `START_REPLICATION`, carrying the start LSN from the command text and the
    /// value of the optional `(term='N')` option.
    StartReplication { start_lsn: Lsn, term: Option<Term> },
    /// `IDENTIFY_SYSTEM`
    IdentifySystem,
    /// `TIMELINE_STATUS`
    TimelineStatus,
    /// `JSON_CTRL`, carrying the message deserialized from the JSON that follows the keyword.
    JSONCtrl { cmd: AppendLogicalMessage },
}
54 :
55 CBC 4242 : fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
56 4242 : if cmd.starts_with("START_WAL_PUSH") {
57 2013 : Ok(SafekeeperPostgresCommand::StartWalPush)
58 2229 : } else if cmd.starts_with("START_REPLICATION") {
59 783 : let re = Regex::new(
60 783 : // We follow postgres START_REPLICATION LOGICAL options to pass term.
61 783 : r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?",
62 783 : )
63 783 : .unwrap();
64 783 : let caps = re
65 783 : .captures(cmd)
66 783 : .context(format!("failed to parse START_REPLICATION command {}", cmd))?;
67 783 : let start_lsn =
68 783 : Lsn::from_str(&caps[1]).context("parse start LSN from START_REPLICATION command")?;
69 783 : let term = if let Some(m) = caps.get(2) {
70 1 : Some(m.as_str().parse::<u64>().context("invalid term")?)
71 : } else {
72 782 : None
73 : };
74 783 : Ok(SafekeeperPostgresCommand::StartReplication { start_lsn, term })
75 1446 : } else if cmd.starts_with("IDENTIFY_SYSTEM") {
76 699 : Ok(SafekeeperPostgresCommand::IdentifySystem)
77 747 : } else if cmd.starts_with("TIMELINE_STATUS") {
78 744 : Ok(SafekeeperPostgresCommand::TimelineStatus)
79 3 : } else if cmd.starts_with("JSON_CTRL") {
80 3 : let cmd = cmd.strip_prefix("JSON_CTRL").context("invalid prefix")?;
81 : Ok(SafekeeperPostgresCommand::JSONCtrl {
82 3 : cmd: serde_json::from_str(cmd)?,
83 : })
84 : } else {
85 UBC 0 : anyhow::bail!("unsupported command {cmd}");
86 : }
87 CBC 4242 : }
88 :
89 4242 : fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
90 4242 : match cmd {
91 2013 : SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH",
92 783 : SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
93 744 : SafekeeperPostgresCommand::TimelineStatus => "TIMELINE_STATUS",
94 699 : SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM",
95 3 : SafekeeperPostgresCommand::JSONCtrl { .. } => "JSON_CTRL",
96 : }
97 4242 : }
98 :
99 : #[async_trait::async_trait]
100 : impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
101 : for SafekeeperPostgresHandler
102 : {
103 : // tenant_id and timeline_id are passed in connection string params
104 : fn startup(
105 : &mut self,
106 : _pgb: &mut PostgresBackend<IO>,
107 : sm: &FeStartupPacket,
108 : ) -> Result<(), QueryError> {
109 3560 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
110 3560 : if let Some(options) = params.options_raw() {
111 14254 : for opt in options {
112 : // FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
113 : // remove these after the PR gets deployed:
114 : // https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
115 10694 : match opt.split_once('=') {
116 7140 : Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
117 3560 : self.tenant_id = Some(value.parse().with_context(|| {
118 UBC 0 : format!("Failed to parse {value} as tenant id")
119 CBC 3560 : })?);
120 : }
121 3580 : Some(("ztimelineid", value)) | Some(("timeline_id", value)) => {
122 3560 : self.timeline_id = Some(value.parse().with_context(|| {
123 UBC 0 : format!("Failed to parse {value} as timeline id")
124 CBC 3560 : })?);
125 : }
126 20 : Some(("availability_zone", client_az)) => {
127 4 : if let Some(metrics) = self.io_metrics.as_ref() {
128 4 : metrics.set_client_az(client_az)
129 UBC 0 : }
130 : }
131 CBC 3570 : _ => continue,
132 : }
133 : }
134 UBC 0 : }
135 :
136 CBC 3560 : if let Some(app_name) = params.get("application_name") {
137 782 : self.appname = Some(app_name.to_owned());
138 782 : if let Some(metrics) = self.io_metrics.as_ref() {
139 782 : metrics.set_app_name(app_name)
140 UBC 0 : }
141 CBC 2778 : }
142 :
143 3560 : Ok(())
144 : } else {
145 UBC 0 : Err(QueryError::Other(anyhow::anyhow!(
146 0 : "Safekeeper received unexpected initial message: {sm:?}"
147 0 : )))
148 : }
149 CBC 3560 : }
150 :
151 138 : fn check_auth_jwt(
152 138 : &mut self,
153 138 : _pgb: &mut PostgresBackend<IO>,
154 138 : jwt_response: &[u8],
155 138 : ) -> Result<(), QueryError> {
156 138 : // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
157 138 : // which requires auth to be present
158 138 : let (allowed_auth_scope, auth) = self
159 138 : .auth
160 138 : .as_ref()
161 138 : .expect("auth_type is configured but .auth of handler is missing");
162 138 : let data =
163 138 : auth.decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
164 :
165 : // The handler might be configured to allow only tenant scope tokens.
166 138 : if matches!(allowed_auth_scope, Scope::Tenant)
167 107 : && !matches!(data.claims.scope, Scope::Tenant)
168 : {
169 1 : return Err(QueryError::Other(anyhow::anyhow!(
170 1 : "passed JWT token is for full access, but only tenant scope is allowed"
171 1 : )));
172 137 : }
173 :
174 137 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
175 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
176 0 : "jwt token scope is Tenant, but tenant id is missing"
177 0 : )));
178 CBC 137 : }
179 137 :
180 137 : info!(
181 137 : "jwt auth succeeded for scope: {:#?} by tenant id: {:?}",
182 137 : data.claims.scope, data.claims.tenant_id,
183 137 : );
184 :
185 137 : self.claims = Some(data.claims);
186 137 : Ok(())
187 138 : }
188 :
189 4253 : async fn process_query(
190 4253 : &mut self,
191 4253 : pgb: &mut PostgresBackend<IO>,
192 4253 : query_string: &str,
193 4253 : ) -> Result<(), QueryError> {
194 4253 : if query_string
195 4253 : .to_ascii_lowercase()
196 4253 : .starts_with("set datestyle to ")
197 : {
198 : // important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
199 11 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
200 11 : return Ok(());
201 4242 : }
202 :
203 4242 : let cmd = parse_cmd(query_string)?;
204 4242 : let cmd_str = cmd_to_string(&cmd);
205 4242 :
206 4242 : PG_QUERIES_RECEIVED.with_label_values(&[cmd_str]).inc();
207 4242 : scopeguard::defer! {
208 3845 : PG_QUERIES_FINISHED.with_label_values(&[cmd_str]).inc();
209 3845 : }
210 4242 :
211 4242 : info!(
212 4242 : "got query {:?} in timeline {:?}",
213 4242 : query_string, self.timeline_id
214 4242 : );
215 :
216 4242 : let tenant_id = self.tenant_id.context("tenantid is required")?;
217 4242 : let timeline_id = self.timeline_id.context("timelineid is required")?;
218 4242 : self.check_permission(Some(tenant_id))?;
219 4240 : self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
220 4240 : let span_ttid = self.ttid; // satisfy borrow checker
221 4240 :
222 4240 : match cmd {
223 : SafekeeperPostgresCommand::StartWalPush => {
224 2013 : self.handle_start_wal_push(pgb)
225 2013 : .instrument(info_span!("WAL receiver", ttid = %span_ttid))
226 5068421 : .await
227 : }
228 783 : SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
229 783 : self.handle_start_replication(pgb, start_lsn, term)
230 783 : .instrument(info_span!("WAL sender", ttid = %span_ttid))
231 2947784 : .await
232 : }
233 697 : SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
234 744 : SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
235 3 : SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
236 9713 : handle_json_ctrl(self, pgb, cmd).await
237 : }
238 : }
239 8109 : }
240 : }
241 :
242 : impl SafekeeperPostgresHandler {
243 3561 : pub fn new(
244 3561 : conf: SafeKeeperConf,
245 3561 : conn_id: u32,
246 3561 : io_metrics: Option<TrafficMetrics>,
247 3561 : auth: Option<(Scope, Arc<JwtAuth>)>,
248 3561 : ) -> Self {
249 3561 : SafekeeperPostgresHandler {
250 3561 : conf,
251 3561 : appname: None,
252 3561 : tenant_id: None,
253 3561 : timeline_id: None,
254 3561 : ttid: TenantTimelineId::empty(),
255 3561 : conn_id,
256 3561 : claims: None,
257 3561 : auth,
258 3561 : io_metrics,
259 3561 : }
260 3561 : }
261 :
262 : // when accessing management api supply None as an argument
263 : // when using to authorize tenant pass corresponding tenant id
264 4242 : fn check_permission(&self, tenant_id: Option<TenantId>) -> anyhow::Result<()> {
265 4242 : if self.auth.is_none() {
266 : // auth is set to Trust, nothing to check so just return ok
267 4081 : return Ok(());
268 161 : }
269 161 : // auth is some, just checked above, when auth is some
270 161 : // then claims are always present because of checks during connection init
271 161 : // so this expect won't trigger
272 161 : let claims = self
273 161 : .claims
274 161 : .as_ref()
275 161 : .expect("claims presence already checked");
276 161 : check_permission(claims, tenant_id)
277 4242 : }
278 :
279 744 : async fn handle_timeline_status<IO: AsyncRead + AsyncWrite + Unpin>(
280 744 : &mut self,
281 744 : pgb: &mut PostgresBackend<IO>,
282 744 : ) -> Result<(), QueryError> {
283 : // Get timeline, handling "not found" error
284 744 : let tli = match GlobalTimelines::get(self.ttid) {
285 275 : Ok(tli) => Ok(Some(tli)),
286 469 : Err(TimelineError::NotFound(_)) => Ok(None),
287 UBC 0 : Err(e) => Err(QueryError::Other(e.into())),
288 0 : }?;
289 :
290 : // Write row description
291 CBC 744 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
292 744 : RowDescriptor::text_col(b"flush_lsn"),
293 744 : RowDescriptor::text_col(b"commit_lsn"),
294 744 : ]))?;
295 :
296 : // Write row if timeline exists
297 744 : if let Some(tli) = tli {
298 275 : let (inmem, _state) = tli.get_state().await;
299 275 : let flush_lsn = tli.get_flush_lsn().await;
300 275 : let commit_lsn = inmem.commit_lsn;
301 275 : pgb.write_message_noflush(&BeMessage::DataRow(&[
302 275 : Some(flush_lsn.to_string().as_bytes()),
303 275 : Some(commit_lsn.to_string().as_bytes()),
304 275 : ]))?;
305 469 : }
306 :
307 744 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"TIMELINE_STATUS"))?;
308 744 : Ok(())
309 744 : }
310 :
311 : ///
312 : /// Handle IDENTIFY_SYSTEM replication command
313 : ///
314 697 : async fn handle_identify_system<IO: AsyncRead + AsyncWrite + Unpin>(
315 697 : &mut self,
316 697 : pgb: &mut PostgresBackend<IO>,
317 697 : ) -> Result<(), QueryError> {
318 697 : let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
319 :
320 697 : let lsn = if self.is_walproposer_recovery() {
321 : // walproposer should get all local WAL until flush_lsn
322 UBC 0 : tli.get_flush_lsn().await
323 : } else {
324 : // other clients shouldn't get any uncommitted WAL
325 CBC 697 : tli.get_state().await.0.commit_lsn
326 : }
327 697 : .to_string();
328 :
329 697 : let sysid = tli.get_state().await.1.server.system_id.to_string();
330 697 : let lsn_bytes = lsn.as_bytes();
331 697 : let tli = PG_TLI.to_string();
332 697 : let tli_bytes = tli.as_bytes();
333 697 : let sysid_bytes = sysid.as_bytes();
334 697 :
335 697 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
336 697 : RowDescriptor {
337 697 : name: b"systemid",
338 697 : typoid: TEXT_OID,
339 697 : typlen: -1,
340 697 : ..Default::default()
341 697 : },
342 697 : RowDescriptor {
343 697 : name: b"timeline",
344 697 : typoid: INT4_OID,
345 697 : typlen: 4,
346 697 : ..Default::default()
347 697 : },
348 697 : RowDescriptor {
349 697 : name: b"xlogpos",
350 697 : typoid: TEXT_OID,
351 697 : typlen: -1,
352 697 : ..Default::default()
353 697 : },
354 697 : RowDescriptor {
355 697 : name: b"dbname",
356 697 : typoid: TEXT_OID,
357 697 : typlen: -1,
358 697 : ..Default::default()
359 697 : },
360 697 : ]))?
361 697 : .write_message_noflush(&BeMessage::DataRow(&[
362 697 : Some(sysid_bytes),
363 697 : Some(tli_bytes),
364 697 : Some(lsn_bytes),
365 697 : None,
366 697 : ]))?
367 697 : .write_message_noflush(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
368 697 : Ok(())
369 697 : }
370 :
371 : /// Returns true if current connection is a replication connection, originating
372 : /// from a walproposer recovery function. This connection gets a special handling:
373 : /// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
374 1480 : pub fn is_walproposer_recovery(&self) -> bool {
375 1480 : self.appname == Some("wal_proposer_recovery".to_string())
376 1480 : }
377 : }
|