Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::Context;
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use once_cell::sync::OnceCell;
9 : use pageserver_api::models::TenantState;
10 : use pageserver_api::models::{
11 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
12 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
13 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
14 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
15 : PagestreamNblocksResponse, PagestreamProtocolVersion,
16 : };
17 : use pageserver_api::shard::TenantShardId;
18 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
19 : use pq_proto::framed::ConnectionError;
20 : use pq_proto::FeStartupPacket;
21 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
22 : use std::borrow::Cow;
23 : use std::io;
24 : use std::str;
25 : use std::str::FromStr;
26 : use std::sync::Arc;
27 : use std::time::SystemTime;
28 : use std::time::{Duration, Instant};
29 : use tokio::io::AsyncWriteExt;
30 : use tokio::io::{AsyncRead, AsyncWrite};
31 : use tokio::task::JoinHandle;
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::*;
34 : use utils::{
35 : auth::{Claims, Scope, SwappableJwtAuth},
36 : id::{TenantId, TimelineId},
37 : lsn::Lsn,
38 : simple_rcu::RcuReadGuard,
39 : };
40 :
41 : use crate::auth::check_permission;
42 : use crate::basebackup;
43 : use crate::basebackup::BasebackupError;
44 : use crate::config::PageServerConf;
45 : use crate::context::{DownloadBehavior, RequestContext};
46 : use crate::metrics;
47 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
48 : use crate::pgdatadir_mapping::Version;
49 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
50 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
51 : use crate::task_mgr::TaskKind;
52 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
53 : use crate::tenant::mgr::ShardSelector;
54 : use crate::tenant::mgr::TenantManager;
55 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
56 : use crate::tenant::timeline::{self, WaitLsnError};
57 : use crate::tenant::GetTimelineError;
58 : use crate::tenant::PageReconstructError;
59 : use crate::tenant::Timeline;
60 : use pageserver_api::key::rel_block_to_key;
61 : use pageserver_api::reltag::SlruKind;
62 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
63 : use postgres_ffi::BLCKSZ;
64 :
65 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
66 : /// is not yet in state [`TenantState::Active`].
67 : ///
68 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
69 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
70 :
71 : ///////////////////////////////////////////////////////////////////////////////
72 :
73 : pub struct Listener {
74 : cancel: CancellationToken,
75 : /// Cancel the listener task through `cancel` to shut down the listener
76 : /// and get a handle on the existing connections.
77 : task: JoinHandle<Connections>,
78 : }
79 :
80 : pub struct Connections {
81 : cancel: CancellationToken,
82 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
83 : }
84 :
85 0 : pub fn spawn(
86 0 : conf: &'static PageServerConf,
87 0 : tenant_manager: Arc<TenantManager>,
88 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
89 0 : tcp_listener: tokio::net::TcpListener,
90 0 : ) -> Listener {
91 0 : let cancel = CancellationToken::new();
92 0 : let libpq_ctx = RequestContext::todo_child(
93 0 : TaskKind::LibpqEndpointListener,
94 0 : // listener task shouldn't need to download anything. (We will
95 0 : // create a separate sub-context for each connection, with its
96 0 : // own download behavior. This context is used only to listen and
97 0 : // accept connections.)
98 0 : DownloadBehavior::Error,
99 0 : );
100 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
101 0 : "libpq listener",
102 0 : libpq_listener_main(
103 0 : tenant_manager,
104 0 : pg_auth,
105 0 : tcp_listener,
106 0 : conf.pg_auth_type,
107 0 : libpq_ctx,
108 0 : cancel.clone(),
109 0 : )
110 0 : .map(anyhow::Ok),
111 0 : ));
112 0 :
113 0 : Listener { cancel, task }
114 0 : }
115 :
116 : impl Listener {
117 0 : pub async fn stop_accepting(self) -> Connections {
118 0 : self.cancel.cancel();
119 0 : self.task
120 0 : .await
121 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
122 0 : }
123 : }
124 : impl Connections {
125 0 : pub async fn shutdown(self) {
126 0 : let Self { cancel, mut tasks } = self;
127 0 : cancel.cancel();
128 0 : while let Some(res) = tasks.join_next().await {
129 : // the logging done here mimics what was formerly done by task_mgr
130 0 : match res {
131 0 : Ok(Ok(())) => {}
132 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
133 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
134 : }
135 : }
136 0 : }
137 : }
138 :
139 : ///
140 : /// Main loop of the page service.
141 : ///
142 : /// Listens for connections, and launches a new handler task for each.
143 : ///
144 : /// Returns Ok(()) upon cancellation via `cancel`, returning the set of
145 : /// open connections.
146 : ///
147 0 : pub async fn libpq_listener_main(
148 0 : tenant_manager: Arc<TenantManager>,
149 0 : auth: Option<Arc<SwappableJwtAuth>>,
150 0 : listener: tokio::net::TcpListener,
151 0 : auth_type: AuthType,
152 0 : listener_ctx: RequestContext,
153 0 : listener_cancel: CancellationToken,
154 0 : ) -> Connections {
155 0 : let connections_cancel = CancellationToken::new();
156 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
157 :
158 : // Wait for a new connection to arrive, or for server shutdown.
159 0 : while let Some(res) = tokio::select! {
160 : biased;
161 :
162 : _ = listener_cancel.cancelled() => {
163 : // We were requested to shut down.
164 : None
165 : }
166 :
167 : res = listener.accept() => {
168 : Some(res)
169 : }
170 : } {
171 0 : match res {
172 0 : Ok((socket, peer_addr)) => {
173 0 : // Connection established. Spawn a new task to handle it.
174 0 : debug!("accepted connection from {}", peer_addr);
175 0 : let local_auth = auth.clone();
176 0 : let connection_ctx = listener_ctx
177 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
178 0 : connection_handler_tasks.spawn(page_service_conn_main(
179 0 : tenant_manager.clone(),
180 0 : local_auth,
181 0 : socket,
182 0 : auth_type,
183 0 : connection_ctx,
184 0 : connections_cancel.child_token(),
185 0 : ));
186 : }
187 0 : Err(err) => {
188 0 : // accept() failed. Log the error, and loop back to retry on next connection.
189 0 : error!("accept() failed: {:?}", err);
190 : }
191 : }
192 : }
193 :
194 0 : debug!("page_service listener loop terminated");
195 :
196 0 : Connections {
197 0 : cancel: connections_cancel,
198 0 : tasks: connection_handler_tasks,
199 0 : }
200 0 : }
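// Editor's illustration (hypothetical helper, not part of the original source):
// the intended lifecycle of the listener spawned by `spawn` above. First stop
// accepting new connections, then cancel and drain the per-connection handler
// tasks.
#[allow(dead_code)]
async fn example_page_service_shutdown(listener: Listener) {
    // Cancels the accept loop and waits for it to hand back the still-open connections.
    let connections = listener.stop_accepting().await;
    // Cancels each connection handler and logs any errors or panics as they drain.
    connections.shutdown().await;
}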
201 :
202 : type ConnectionHandlerResult = anyhow::Result<()>;
203 :
204 0 : #[instrument(skip_all, fields(peer_addr))]
205 : async fn page_service_conn_main(
206 : tenant_manager: Arc<TenantManager>,
207 : auth: Option<Arc<SwappableJwtAuth>>,
208 : socket: tokio::net::TcpStream,
209 : auth_type: AuthType,
210 : connection_ctx: RequestContext,
211 : cancel: CancellationToken,
212 : ) -> ConnectionHandlerResult {
213 : let _guard = LIVE_CONNECTIONS
214 : .with_label_values(&["page_service"])
215 : .guard();
216 :
217 : socket
218 : .set_nodelay(true)
219 : .context("could not set TCP_NODELAY")?;
220 :
221 : let peer_addr = socket.peer_addr().context("get peer address")?;
222 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
223 :
224 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; the requirements are:
225 : // - long enough for most valid compute connections
226 : // - less than infinite to stop us from "leaking" connections to long-gone computes
227 : //
228 : // no write timeout is used, because the kernel is assumed to error writes after some time.
229 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
230 :
231 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
232 0 : let socket_timeout_ms = (|| {
233 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
234 : // Draw from an exponential distribution to simulate poor network
235 : // conditions; in tests we expect avg_timeout_ms to be around 15
236 : // milliseconds.
237 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
238 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
239 0 : let u = rand::random::<f32>();
240 0 : ((1.0 - u).ln() / (-avg)) as u64
241 : } else {
242 0 : default_timeout_ms
243 : }
244 0 : });
245 0 : default_timeout_ms
246 : })();
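// Editor's note on the fail point above: when it is configured with a value,
// `avg_timeout_ms` arrives as that value's string form. A uniform draw u in
// [0, 1) is mapped through ln(1 - u), which is negative, and dividing by -avg
// turns it into a positive, exponentially distributed number of milliseconds;
// otherwise the closure simply yields the 10-minute default.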
247 :
248 : // A timeout here does not mean the client died; it can happen if it's just idle for
249 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
250 : // they reconnect.
251 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
252 : let socket = std::pin::pin!(socket);
253 :
254 : fail::fail_point!("ps::connection-start::pre-login");
255 :
256 : // XXX: pgbackend.run() should take the connection_ctx,
257 : // and create a child per-query context when it invokes process_query.
258 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
259 : // and create the per-query context in process_query ourselves.
260 : let mut conn_handler =
261 : PageServerHandler::new(tenant_manager, auth, connection_ctx, cancel.clone());
262 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
263 :
264 : match pgbackend.run(&mut conn_handler, &cancel).await {
265 : Ok(()) => {
266 : // we've been requested to shut down
267 : Ok(())
268 : }
269 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
270 : if is_expected_io_error(&io_error) {
271 : info!("Postgres client disconnected ({io_error})");
272 : Ok(())
273 : } else {
274 : Err(io_error).context("Postgres connection error")
275 : }
276 : }
277 : other => other.context("Postgres query error"),
278 : }
279 : }
280 :
281 : struct PageServerHandler {
282 : auth: Option<Arc<SwappableJwtAuth>>,
283 : claims: Option<Claims>,
284 :
285 : /// The context created for the lifetime of the connection
286 : /// serviced by this PageServerHandler.
287 : /// For each query received over the connection,
288 : /// `process_query` creates a child context from this one.
289 : connection_ctx: RequestContext,
290 :
291 : cancel: CancellationToken,
292 :
293 : timeline_handles: TimelineHandles,
294 : }
295 :
296 : struct TimelineHandles {
297 : wrapper: TenantManagerWrapper,
298 : /// Note on size: the typical size of this map is 1. The largest size we expect
299 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
300 : /// or the ratio used when splitting shards (i.e. how many children are created
301 : /// from one parent shard), where a "large" number might be ~8.
302 : handles: timeline::handle::Cache<TenantManagerTypes>,
303 : }
304 :
305 : impl TimelineHandles {
306 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
307 0 : Self {
308 0 : wrapper: TenantManagerWrapper {
309 0 : tenant_manager,
310 0 : tenant_id: OnceCell::new(),
311 0 : },
312 0 : handles: Default::default(),
313 0 : }
314 0 : }
315 0 : async fn get(
316 0 : &mut self,
317 0 : tenant_id: TenantId,
318 0 : timeline_id: TimelineId,
319 0 : shard_selector: ShardSelector,
320 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
321 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
322 0 : return Err(GetActiveTimelineError::Tenant(
323 0 : GetActiveTenantError::SwitchedTenant,
324 0 : ));
325 0 : }
326 0 : self.handles
327 0 : .get(timeline_id, shard_selector, &self.wrapper)
328 0 : .await
329 0 : .map_err(|e| match e {
330 0 : timeline::handle::GetError::TenantManager(e) => e,
331 : timeline::handle::GetError::TimelineGateClosed => {
332 0 : trace!("timeline gate closed");
333 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
334 : }
335 : timeline::handle::GetError::PerTimelineStateShutDown => {
336 0 : trace!("per-timeline state shut down");
337 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
338 : }
339 0 : })
340 0 : }
341 : }
342 :
343 : pub(crate) struct TenantManagerWrapper {
344 : tenant_manager: Arc<TenantManager>,
345 : // We do not support switching tenant_id on a connection at this point.
346 : // We can add support for this later if needed without changing
347 : // the protocol.
348 : tenant_id: once_cell::sync::OnceCell<TenantId>,
349 : }
350 :
351 : #[derive(Debug)]
352 : pub(crate) struct TenantManagerTypes;
353 :
354 : impl timeline::handle::Types for TenantManagerTypes {
355 : type TenantManagerError = GetActiveTimelineError;
356 : type TenantManager = TenantManagerWrapper;
357 : type Timeline = Arc<Timeline>;
358 : }
359 :
360 : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
361 0 : fn gate(&self) -> &utils::sync::gate::Gate {
362 0 : &self.gate
363 0 : }
364 :
365 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
366 0 : Timeline::shard_timeline_id(self)
367 0 : }
368 :
369 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
370 0 : &self.handles
371 0 : }
372 :
373 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
374 0 : Timeline::get_shard_identity(self)
375 0 : }
376 : }
377 :
378 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
379 0 : async fn resolve(
380 0 : &self,
381 0 : timeline_id: TimelineId,
382 0 : shard_selector: ShardSelector,
383 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
384 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
385 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
386 0 : let wait_start = Instant::now();
387 0 : let deadline = wait_start + timeout;
388 0 : let tenant_shard = loop {
389 0 : let resolved = self
390 0 : .tenant_manager
391 0 : .resolve_attached_shard(tenant_id, shard_selector);
392 0 : match resolved {
393 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
394 : ShardResolveResult::NotFound => {
395 0 : return Err(GetActiveTimelineError::Tenant(
396 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
397 0 : ));
398 : }
399 0 : ShardResolveResult::InProgress(barrier) => {
400 : // We can't authoritatively answer right now: wait for InProgress state
401 : // to end, then try again
402 : tokio::select! {
403 : _ = barrier.wait() => {
404 : // The barrier completed: proceed around the loop to try looking up again
405 : },
406 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
407 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
408 : latest_state: None,
409 : wait_time: timeout,
410 : }));
411 : }
412 0 : }
413 0 : }
414 0 : };
415 0 : };
416 :
417 0 : tracing::debug!("Waiting for tenant to enter active state...");
418 0 : tenant_shard
419 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
420 0 : .await
421 0 : .map_err(GetActiveTimelineError::Tenant)?;
422 :
423 0 : let timeline = tenant_shard
424 0 : .get_timeline(timeline_id, true)
425 0 : .map_err(GetActiveTimelineError::Timeline)?;
426 0 : set_tracing_field_shard_id(&timeline);
427 0 : Ok(timeline)
428 0 : }
429 : }
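// Editor's note on the resolve() loop above: the ACTIVE_TENANT_TIMEOUT budget is
// shared across both phases. `deadline` is fixed when resolution starts, the
// sleep while a slot is InProgress is bounded by whatever remains of that
// deadline, and wait_to_become_active() is then given only the leftover time,
// so the total wait never exceeds the 30-second constant defined near the top
// of this file.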
430 :
431 0 : #[derive(thiserror::Error, Debug)]
432 : enum PageStreamError {
433 : /// We encountered an error that should prompt the client to reconnect:
434 : /// in practice this means we drop the connection without sending a response.
435 : #[error("Reconnect required: {0}")]
436 : Reconnect(Cow<'static, str>),
437 :
438 : /// We were instructed to shutdown while processing the query
439 : #[error("Shutting down")]
440 : Shutdown,
441 :
442 : /// Something went wrong reading a page: this likely indicates a pageserver bug
443 : #[error("Read error")]
444 : Read(#[source] PageReconstructError),
445 :
446 : /// Ran out of time waiting for an LSN
447 : #[error("LSN timeout: {0}")]
448 : LsnTimeout(WaitLsnError),
449 :
450 : /// The entity required to serve the request (tenant or timeline) is not found,
451 : /// or is not found in a suitable state to serve a request.
452 : #[error("Not found: {0}")]
453 : NotFound(Cow<'static, str>),
454 :
455 : /// Request asked for something that doesn't make sense, like an invalid LSN
456 : #[error("Bad request: {0}")]
457 : BadRequest(Cow<'static, str>),
458 : }
459 :
460 : impl From<PageReconstructError> for PageStreamError {
461 0 : fn from(value: PageReconstructError) -> Self {
462 0 : match value {
463 0 : PageReconstructError::Cancelled => Self::Shutdown,
464 0 : e => Self::Read(e),
465 : }
466 0 : }
467 : }
468 :
469 : impl From<GetActiveTimelineError> for PageStreamError {
470 0 : fn from(value: GetActiveTimelineError) -> Self {
471 0 : match value {
472 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
473 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
474 : TenantState::Stopping { .. },
475 : ))
476 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
477 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
478 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
479 : }
480 0 : }
481 : }
482 :
483 : impl From<WaitLsnError> for PageStreamError {
484 0 : fn from(value: WaitLsnError) -> Self {
485 0 : match value {
486 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
487 0 : WaitLsnError::Shutdown => Self::Shutdown,
488 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
489 : }
490 0 : }
491 : }
492 :
493 : impl From<WaitLsnError> for QueryError {
494 0 : fn from(value: WaitLsnError) -> Self {
495 0 : match value {
496 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
497 0 : WaitLsnError::Shutdown => Self::Shutdown,
498 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
499 : }
500 0 : }
501 : }
502 :
503 : impl PageServerHandler {
504 0 : pub fn new(
505 0 : tenant_manager: Arc<TenantManager>,
506 0 : auth: Option<Arc<SwappableJwtAuth>>,
507 0 : connection_ctx: RequestContext,
508 0 : cancel: CancellationToken,
509 0 : ) -> Self {
510 0 : PageServerHandler {
511 0 : auth,
512 0 : claims: None,
513 0 : connection_ctx,
514 0 : timeline_handles: TimelineHandles::new(tenant_manager),
515 0 : cancel,
516 0 : }
517 0 : }
518 :
519 : /// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
520 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
521 : /// cancellation if there aren't any timelines in the cache.
522 : ///
523 : /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
524 : /// timeline cancellation token.
525 0 : async fn flush_cancellable<IO>(
526 0 : &self,
527 0 : pgb: &mut PostgresBackend<IO>,
528 0 : cancel: &CancellationToken,
529 0 : ) -> Result<(), QueryError>
530 0 : where
531 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
532 0 : {
533 : tokio::select!(
534 : flush_r = pgb.flush() => {
535 : Ok(flush_r?)
536 : },
537 : _ = cancel.cancelled() => {
538 : Err(QueryError::Shutdown)
539 : }
540 : )
541 0 : }
542 :
543 : /// Pagestream sub-protocol handler.
544 : ///
545 : /// It is a simple request-response protocol inside a COPYBOTH session.
546 : ///
547 : /// # Coding Discipline
548 : ///
549 : /// Coding discipline within this function: all interaction with the `pgb` connection
550 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
551 : /// This is so that we can shutdown page_service quickly.
552 0 : #[instrument(skip_all)]
553 : async fn handle_pagerequests<IO>(
554 : &mut self,
555 : pgb: &mut PostgresBackend<IO>,
556 : tenant_id: TenantId,
557 : timeline_id: TimelineId,
558 : protocol_version: PagestreamProtocolVersion,
559 : ctx: RequestContext,
560 : ) -> Result<(), QueryError>
561 : where
562 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
563 : {
564 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
565 :
566 : // switch client to COPYBOTH
567 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
568 : tokio::select! {
569 : biased;
570 : _ = self.cancel.cancelled() => {
571 : return Err(QueryError::Shutdown)
572 : }
573 : res = pgb.flush() => {
574 : res?;
575 : }
576 : }
577 :
578 : loop {
579 : // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
580 : let msg = tokio::select! {
581 : biased;
582 : _ = self.cancel.cancelled() => {
583 : return Err(QueryError::Shutdown)
584 : }
585 : msg = pgb.read_message() => { msg }
586 : };
587 : let copy_data_bytes = match msg? {
588 : Some(FeMessage::CopyData(bytes)) => bytes,
589 : Some(FeMessage::Terminate) => break,
590 : Some(m) => {
591 : return Err(QueryError::Other(anyhow::anyhow!(
592 : "unexpected message: {m:?} during COPY"
593 : )));
594 : }
595 : None => break, // client disconnected
596 : };
597 :
598 : trace!("query: {copy_data_bytes:?}");
599 : fail::fail_point!("ps::handle-pagerequest-message");
600 :
601 : // parse request
602 : let neon_fe_msg =
603 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
604 :
605 : // invoke handler function
606 : let (handler_result, span) = match neon_fe_msg {
607 : PagestreamFeMessage::Exists(req) => {
608 : fail::fail_point!("ps::handle-pagerequest-message::exists");
609 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
610 : (
611 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
612 : .instrument(span.clone())
613 : .await,
614 : span,
615 : )
616 : }
617 : PagestreamFeMessage::Nblocks(req) => {
618 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
619 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
620 : (
621 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
622 : .instrument(span.clone())
623 : .await,
624 : span,
625 : )
626 : }
627 : PagestreamFeMessage::GetPage(req) => {
628 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
629 : // shard_id is filled in by the handler
630 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
631 : (
632 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
633 : .instrument(span.clone())
634 : .await,
635 : span,
636 : )
637 : }
638 : PagestreamFeMessage::DbSize(req) => {
639 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
640 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
641 : (
642 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
643 : .instrument(span.clone())
644 : .await,
645 : span,
646 : )
647 : }
648 : PagestreamFeMessage::GetSlruSegment(req) => {
649 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
650 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
651 : (
652 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
653 : .instrument(span.clone())
654 : .await,
655 : span,
656 : )
657 : }
658 : };
659 :
660 : // Map handler result to protocol behavior.
661 : // Some handler errors cause exit from pagestream protocol.
662 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
663 : let response_msg = match handler_result {
664 : Err(e) => match &e {
665 : PageStreamError::Shutdown => {
666 : // If we fail to fulfil a request during shutdown, which may be _because_ of
667 : // shutdown, then do not send the error to the client. Instead just drop the
668 : // connection.
669 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
670 : return Err(QueryError::Shutdown);
671 : }
672 : PageStreamError::Reconnect(reason) => {
673 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
674 : return Err(QueryError::Reconnect);
675 : }
676 : PageStreamError::Read(_)
677 : | PageStreamError::LsnTimeout(_)
678 : | PageStreamError::NotFound(_)
679 : | PageStreamError::BadRequest(_) => {
680 : // Print all the details to the log with {:#}, but for the client the
681 : // error message is enough. Do not log if shutting down, as the anyhow::Error
682 : // here includes cancellation which is not an error.
683 : let full = utils::error::report_compact_sources(&e);
684 0 : span.in_scope(|| {
685 0 : error!("error reading relation or page version: {full:#}")
686 0 : });
687 : PagestreamBeMessage::Error(PagestreamErrorResponse {
688 : message: e.to_string(),
689 : })
690 : }
691 : },
692 : Ok(response_msg) => response_msg,
693 : };
694 :
695 : // marshal & transmit response message
696 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
697 : tokio::select! {
698 : biased;
699 : _ = self.cancel.cancelled() => {
700 : // We were requested to shut down.
701 : info!("shutdown request received in page handler");
702 : return Err(QueryError::Shutdown)
703 : }
704 : res = pgb.flush() => {
705 : res?;
706 : }
707 : }
708 : }
709 : Ok(())
710 : }
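// Editor's note on the loop above: the pagestream sub-protocol is strictly
// request/response inside a COPYBOTH session. Each inbound CopyData frame
// carries exactly one PagestreamFeMessage; the matching handler produces one
// PagestreamBeMessage, which is serialized into a single CopyData frame and
// flushed under `tokio::select!` against `self.cancel`, so a slow or stuck
// client cannot block page_service shutdown. Clients enter this mode by
// sending the `pagestream_v2 <tenant_id> <timeline_id>` (or legacy
// `pagestream`) query handled in `process_query` below.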
711 :
712 : /// Helper function to handle the LSN from client request.
713 : ///
714 : /// Each GetPage (and Exists and Nblocks) request includes information about
715 : /// which version of the page is being requested. The primary compute node
716 : /// will always request the latest page version, by setting 'request_lsn' to
717 : /// the last inserted or flushed WAL position, while a standby will request
718 : /// a version at the LSN that it's currently caught up to.
719 : ///
720 : /// In either case, if the page server hasn't received the WAL up to the
721 : /// requested LSN yet, we will wait for it to arrive. The return value is
722 : /// the LSN that should be used to look up the page versions.
723 : ///
724 : /// In addition to the request LSN, each request carries another LSN,
725 : /// 'not_modified_since', which is a hint to the pageserver that the client
726 : /// knows that the page has not been modified between 'not_modified_since'
727 : /// and the request LSN. This allows skipping the wait, as long as the WAL
728 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
729 : /// information about when the page was modified, it will use
730 : /// not_modified_since == request_lsn. If the client lies and sends a
731 : /// not_modified_since value that is too low, such that there are in fact later
732 : /// page versions, the behavior is undefined: the pageserver may return any of
733 : /// the page versions or an error. (A worked example follows this function.)
734 0 : async fn wait_or_get_last_lsn(
735 0 : timeline: &Timeline,
736 0 : request_lsn: Lsn,
737 0 : not_modified_since: Lsn,
738 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
739 0 : ctx: &RequestContext,
740 0 : ) -> Result<Lsn, PageStreamError> {
741 0 : let last_record_lsn = timeline.get_last_record_lsn();
742 0 :
743 0 : // Sanity check the request
744 0 : if request_lsn < not_modified_since {
745 0 : return Err(PageStreamError::BadRequest(
746 0 : format!(
747 0 : "invalid request with request LSN {} and not_modified_since {}",
748 0 : request_lsn, not_modified_since,
749 0 : )
750 0 : .into(),
751 0 : ));
752 0 : }
753 0 :
754 0 : if request_lsn < **latest_gc_cutoff_lsn {
755 : // Check explicitly for INVALID just to get a less scary error message if the
756 : // request is obviously bogus
757 0 : return Err(if request_lsn == Lsn::INVALID {
758 0 : PageStreamError::BadRequest("invalid LSN(0) in request".into())
759 : } else {
760 0 : PageStreamError::BadRequest(format!(
761 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
762 0 : request_lsn, **latest_gc_cutoff_lsn
763 0 : ).into())
764 : });
765 0 : }
766 0 :
767 0 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
768 0 : if not_modified_since > last_record_lsn {
769 0 : timeline
770 0 : .wait_lsn(
771 0 : not_modified_since,
772 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
773 0 : ctx,
774 0 : )
775 0 : .await?;
776 : // Since we waited for 'not_modified_since' to arrive, that is now the last
777 : // record LSN. (Or close enough for our purposes; the last-record LSN can
778 : // advance immediately after we return anyway)
779 0 : Ok(not_modified_since)
780 : } else {
781 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
782 : // here instead. That would give the same result, since we know that there
783 : // haven't been any modifications since 'not_modified_since'. Using an older
784 : // LSN might be faster, because that could allow skipping recent layers when
785 : // finding the page. However, we have historically used 'last_record_lsn', so
786 : // stick to that for now.
787 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
788 : }
789 0 : }
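// Worked example for wait_or_get_last_lsn (editor's illustration, with made-up
// LSNs and assuming request_lsn is above the GC cutoff): suppose
// last_record_lsn = 0/5000, request_lsn = 0/6000 and not_modified_since =
// 0/4000. The request is valid (request_lsn >= not_modified_since), no wait is
// needed because not_modified_since <= last_record_lsn, and the returned
// lookup LSN is min(last_record_lsn, request_lsn) = 0/5000. If instead
// request_lsn were 0/9000 and not_modified_since 0/8000, we would first wait
// for WAL up to 0/8000 and then return 0/8000.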
790 :
791 0 : #[instrument(skip_all, fields(shard_id, %lsn))]
792 : async fn handle_make_lsn_lease<IO>(
793 : &mut self,
794 : pgb: &mut PostgresBackend<IO>,
795 : tenant_shard_id: TenantShardId,
796 : timeline_id: TimelineId,
797 : lsn: Lsn,
798 : ctx: &RequestContext,
799 : ) -> Result<(), QueryError>
800 : where
801 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
802 : {
803 : let timeline = self
804 : .timeline_handles
805 : .get(
806 : tenant_shard_id.tenant_id,
807 : timeline_id,
808 : ShardSelector::Known(tenant_shard_id.to_index()),
809 : )
810 : .await?;
811 : set_tracing_field_shard_id(&timeline);
812 :
813 : let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?;
814 : let valid_until = lease
815 : .valid_until
816 : .duration_since(SystemTime::UNIX_EPOCH)
817 0 : .map_err(|e| QueryError::Other(e.into()))?;
818 :
819 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
820 : b"valid_until",
821 : )]))?
822 : .write_message_noflush(&BeMessage::DataRow(&[Some(
823 : &valid_until.as_millis().to_be_bytes(),
824 : )]))?
825 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
826 :
827 : Ok(())
828 : }
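// Editor's note: this handler backs the `lease lsn <tenant_shard_id>
// <timeline_id> <lsn>` query parsed in `process_query` below; on success it
// returns a single `valid_until` row, i.e. the lease expiry in milliseconds
// since the UNIX epoch as big-endian bytes. A minimal sketch of building that
// query string (hypothetical helper, not used by the original code):
#[allow(dead_code)]
fn example_lease_lsn_query(
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    lsn: Lsn,
) -> String {
    // Whitespace-separated form expected by the parser in process_query.
    format!("lease lsn {tenant_shard_id} {timeline_id} {lsn}")
}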
829 :
830 0 : #[instrument(skip_all, fields(shard_id))]
831 : async fn handle_get_rel_exists_request(
832 : &mut self,
833 : tenant_id: TenantId,
834 : timeline_id: TimelineId,
835 : req: &PagestreamExistsRequest,
836 : ctx: &RequestContext,
837 : ) -> Result<PagestreamBeMessage, PageStreamError> {
838 : let timeline = self
839 : .timeline_handles
840 : .get(tenant_id, timeline_id, ShardSelector::Zero)
841 : .await?;
842 : let _timer = timeline
843 : .query_metrics
844 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
845 :
846 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
847 : let lsn = Self::wait_or_get_last_lsn(
848 : &timeline,
849 : req.request_lsn,
850 : req.not_modified_since,
851 : &latest_gc_cutoff_lsn,
852 : ctx,
853 : )
854 : .await?;
855 :
856 : let exists = timeline
857 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
858 : .await?;
859 :
860 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
861 : exists,
862 : }))
863 : }
864 :
865 0 : #[instrument(skip_all, fields(shard_id))]
866 : async fn handle_get_nblocks_request(
867 : &mut self,
868 : tenant_id: TenantId,
869 : timeline_id: TimelineId,
870 : req: &PagestreamNblocksRequest,
871 : ctx: &RequestContext,
872 : ) -> Result<PagestreamBeMessage, PageStreamError> {
873 : let timeline = self
874 : .timeline_handles
875 : .get(tenant_id, timeline_id, ShardSelector::Zero)
876 : .await?;
877 :
878 : let _timer = timeline
879 : .query_metrics
880 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
881 :
882 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
883 : let lsn = Self::wait_or_get_last_lsn(
884 : &timeline,
885 : req.request_lsn,
886 : req.not_modified_since,
887 : &latest_gc_cutoff_lsn,
888 : ctx,
889 : )
890 : .await?;
891 :
892 : let n_blocks = timeline
893 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
894 : .await?;
895 :
896 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
897 : n_blocks,
898 : }))
899 : }
900 :
901 0 : #[instrument(skip_all, fields(shard_id))]
902 : async fn handle_db_size_request(
903 : &mut self,
904 : tenant_id: TenantId,
905 : timeline_id: TimelineId,
906 : req: &PagestreamDbSizeRequest,
907 : ctx: &RequestContext,
908 : ) -> Result<PagestreamBeMessage, PageStreamError> {
909 : let timeline = self
910 : .timeline_handles
911 : .get(tenant_id, timeline_id, ShardSelector::Zero)
912 : .await?;
913 :
914 : let _timer = timeline
915 : .query_metrics
916 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
917 :
918 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
919 : let lsn = Self::wait_or_get_last_lsn(
920 : &timeline,
921 : req.request_lsn,
922 : req.not_modified_since,
923 : &latest_gc_cutoff_lsn,
924 : ctx,
925 : )
926 : .await?;
927 :
928 : let total_blocks = timeline
929 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
930 : .await?;
931 : let db_size = total_blocks as i64 * BLCKSZ as i64;
932 :
933 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
934 : db_size,
935 : }))
936 : }
937 :
938 0 : #[instrument(skip_all, fields(shard_id))]
939 : async fn handle_get_page_at_lsn_request(
940 : &mut self,
941 : tenant_id: TenantId,
942 : timeline_id: TimelineId,
943 : req: &PagestreamGetPageRequest,
944 : ctx: &RequestContext,
945 : ) -> Result<PagestreamBeMessage, PageStreamError> {
946 : let timeline = match self
947 : .timeline_handles
948 : .get(
949 : tenant_id,
950 : timeline_id,
951 : ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
952 : )
953 : .await
954 : {
955 : Ok(tl) => tl,
956 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
957 : // We already know this tenant exists in general, because we resolved it at
958 : // start of connection. Getting a NotFound here indicates that the shard containing
959 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
960 : // mapping is out of date.
961 : //
962 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
963 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
964 : // and talk to a different pageserver.
965 : return Err(PageStreamError::Reconnect(
966 : "getpage@lsn request routed to wrong shard".into(),
967 : ));
968 : }
969 : Err(e) => return Err(e.into()),
970 : };
971 :
972 : let _timer = timeline
973 : .query_metrics
974 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
975 :
976 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
977 : let lsn = Self::wait_or_get_last_lsn(
978 : &timeline,
979 : req.request_lsn,
980 : req.not_modified_since,
981 : &latest_gc_cutoff_lsn,
982 : ctx,
983 : )
984 : .await?;
985 :
986 : let page = timeline
987 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
988 : .await?;
989 :
990 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
991 : page,
992 : }))
993 : }
994 :
995 0 : #[instrument(skip_all, fields(shard_id))]
996 : async fn handle_get_slru_segment_request(
997 : &mut self,
998 : tenant_id: TenantId,
999 : timeline_id: TimelineId,
1000 : req: &PagestreamGetSlruSegmentRequest,
1001 : ctx: &RequestContext,
1002 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1003 : let timeline = self
1004 : .timeline_handles
1005 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1006 : .await?;
1007 :
1008 : let _timer = timeline
1009 : .query_metrics
1010 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
1011 :
1012 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1013 : let lsn = Self::wait_or_get_last_lsn(
1014 : &timeline,
1015 : req.request_lsn,
1016 : req.not_modified_since,
1017 : &latest_gc_cutoff_lsn,
1018 : ctx,
1019 : )
1020 : .await?;
1021 :
1022 : let kind = SlruKind::from_repr(req.kind)
1023 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1024 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1025 :
1026 : Ok(PagestreamBeMessage::GetSlruSegment(
1027 : PagestreamGetSlruSegmentResponse { segment },
1028 : ))
1029 : }
1030 :
1031 : /// Note on "fullbackup":
1032 : /// Full basebackups should only be used for debugging purposes.
1033 : /// Originally, this mode was introduced to enable breaking storage format changes,
1034 : /// but that is not applicable anymore.
1035 : ///
1036 : /// # Coding Discipline
1037 : ///
1038 : /// Coding discipline within this function: all interaction with the `pgb` connection
1039 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1040 : /// This is so that we can shutdown page_service quickly.
1041 : ///
1042 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
1043 : /// to connection cancellation.
1044 : #[allow(clippy::too_many_arguments)]
1045 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1046 : async fn handle_basebackup_request<IO>(
1047 : &mut self,
1048 : pgb: &mut PostgresBackend<IO>,
1049 : tenant_id: TenantId,
1050 : timeline_id: TimelineId,
1051 : lsn: Option<Lsn>,
1052 : prev_lsn: Option<Lsn>,
1053 : full_backup: bool,
1054 : gzip: bool,
1055 : ctx: &RequestContext,
1056 : ) -> Result<(), QueryError>
1057 : where
1058 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1059 : {
1060 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1061 0 : match err {
1062 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1063 0 : BasebackupError::Server(e) => QueryError::Other(e),
1064 : }
1065 0 : }
1066 :
1067 : let started = std::time::Instant::now();
1068 :
1069 : let timeline = self
1070 : .timeline_handles
1071 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1072 : .await?;
1073 :
1074 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1075 : if let Some(lsn) = lsn {
1076 : // Backup was requested at a particular LSN. Wait for it to arrive.
1077 : info!("waiting for {}", lsn);
1078 : timeline
1079 : .wait_lsn(
1080 : lsn,
1081 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1082 : ctx,
1083 : )
1084 : .await?;
1085 : timeline
1086 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1087 : .context("invalid basebackup lsn")?;
1088 : }
1089 :
1090 : let lsn_awaited_after = started.elapsed();
1091 :
1092 : // switch client to COPYOUT
1093 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1094 : .map_err(QueryError::Disconnected)?;
1095 : self.flush_cancellable(pgb, &self.cancel).await?;
1096 :
1097 : // Send a tarball of the latest layer on the timeline. Compress if not
1098 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1099 : if full_backup {
1100 : let mut writer = pgb.copyout_writer();
1101 : basebackup::send_basebackup_tarball(
1102 : &mut writer,
1103 : &timeline,
1104 : lsn,
1105 : prev_lsn,
1106 : full_backup,
1107 : ctx,
1108 : )
1109 : .await
1110 : .map_err(map_basebackup_error)?;
1111 : } else {
1112 : let mut writer = pgb.copyout_writer();
1113 : if gzip {
1114 : let mut encoder = GzipEncoder::with_quality(
1115 : writer,
1116 : // NOTE using fast compression because it's on the critical path
1117 : // for compute startup. For an empty database, we get
1118 : // <100KB with this method. The Level::Best compression method
1119 : // gives us <20KB, but maybe we should add basebackup caching
1120 : // on compute shutdown first.
1121 : async_compression::Level::Fastest,
1122 : );
1123 : basebackup::send_basebackup_tarball(
1124 : &mut encoder,
1125 : &timeline,
1126 : lsn,
1127 : prev_lsn,
1128 : full_backup,
1129 : ctx,
1130 : )
1131 : .await
1132 : .map_err(map_basebackup_error)?;
1133 : // shutdown the encoder to ensure the gzip footer is written
1134 : encoder
1135 : .shutdown()
1136 : .await
1137 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1138 : } else {
1139 : basebackup::send_basebackup_tarball(
1140 : &mut writer,
1141 : &timeline,
1142 : lsn,
1143 : prev_lsn,
1144 : full_backup,
1145 : ctx,
1146 : )
1147 : .await
1148 : .map_err(map_basebackup_error)?;
1149 : }
1150 : }
1151 :
1152 : pgb.write_message_noflush(&BeMessage::CopyDone)
1153 : .map_err(QueryError::Disconnected)?;
1154 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1155 :
1156 : let basebackup_after = started
1157 : .elapsed()
1158 : .checked_sub(lsn_awaited_after)
1159 : .unwrap_or(Duration::ZERO);
1160 :
1161 : info!(
1162 : lsn_await_millis = lsn_awaited_after.as_millis(),
1163 : basebackup_millis = basebackup_after.as_millis(),
1164 : "basebackup complete"
1165 : );
1166 :
1167 : Ok(())
1168 : }
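// Editor's note: this handler serves both backup command forms parsed in
// `process_query` below:
//   basebackup <tenant_id> <timeline_id> [lsn] [--gzip]    (full_backup = false)
//   fullbackup <tenant_id> <timeline_id> [lsn] [prev_lsn]  (full_backup = true, never gzipped)
// When no LSN is supplied, the wait above is skipped entirely.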
1169 :
1170 : // When accessing the management API, supply None as an argument.
1171 : // When authorizing access to a tenant, pass the corresponding tenant id.
1172 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1173 0 : if self.auth.is_none() {
1174 : // auth is set to Trust, nothing to check so just return ok
1175 0 : return Ok(());
1176 0 : }
1177 0 : // auth is some, just checked above, when auth is some
1178 0 : // then claims are always present because of checks during connection init
1179 0 : // so this expect won't trigger
1180 0 : let claims = self
1181 0 : .claims
1182 0 : .as_ref()
1183 0 : .expect("claims presence already checked");
1184 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1185 0 : }
1186 : }
1187 :
1188 : #[async_trait::async_trait]
1189 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1190 : where
1191 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1192 : {
1193 0 : fn check_auth_jwt(
1194 0 : &mut self,
1195 0 : _pgb: &mut PostgresBackend<IO>,
1196 0 : jwt_response: &[u8],
1197 0 : ) -> Result<(), QueryError> {
1198 : // This unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
1199 : // which requires auth to be present
1200 0 : let data = self
1201 0 : .auth
1202 0 : .as_ref()
1203 0 : .unwrap()
1204 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1205 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1206 :
1207 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1208 0 : return Err(QueryError::Unauthorized(
1209 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1210 0 : ));
1211 0 : }
1212 0 :
1213 0 : debug!(
1214 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1215 : data.claims.scope, data.claims.tenant_id,
1216 : );
1217 :
1218 0 : self.claims = Some(data.claims);
1219 0 : Ok(())
1220 0 : }
1221 :
1222 0 : fn startup(
1223 0 : &mut self,
1224 0 : _pgb: &mut PostgresBackend<IO>,
1225 0 : _sm: &FeStartupPacket,
1226 0 : ) -> Result<(), QueryError> {
1227 : fail::fail_point!("ps::connection-start::startup-packet");
1228 0 : Ok(())
1229 0 : }
1230 :
1231 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1232 : async fn process_query(
1233 : &mut self,
1234 : pgb: &mut PostgresBackend<IO>,
1235 : query_string: &str,
1236 0 : ) -> Result<(), QueryError> {
1237 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1238 0 : info!("Hit failpoint for bad connection");
1239 0 : Err(QueryError::SimulatedConnectionError)
1240 0 : });
1241 :
1242 : fail::fail_point!("ps::connection-start::process-query");
1243 :
1244 0 : let ctx = self.connection_ctx.attached_child();
1245 0 : debug!("process query {query_string:?}");
1246 0 : let parts = query_string.split_whitespace().collect::<Vec<_>>();
1247 0 : if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) {
1248 0 : if params.len() != 2 {
1249 0 : return Err(QueryError::Other(anyhow::anyhow!(
1250 0 : "invalid param number for pagestream command"
1251 0 : )));
1252 0 : }
1253 0 : let tenant_id = TenantId::from_str(params[0])
1254 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1255 0 : let timeline_id = TimelineId::from_str(params[1])
1256 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1257 :
1258 0 : tracing::Span::current()
1259 0 : .record("tenant_id", field::display(tenant_id))
1260 0 : .record("timeline_id", field::display(timeline_id));
1261 0 :
1262 0 : self.check_permission(Some(tenant_id))?;
1263 :
1264 0 : COMPUTE_COMMANDS_COUNTERS
1265 0 : .for_command(ComputeCommandKind::PageStreamV2)
1266 0 : .inc();
1267 0 :
1268 0 : self.handle_pagerequests(
1269 0 : pgb,
1270 0 : tenant_id,
1271 0 : timeline_id,
1272 0 : PagestreamProtocolVersion::V2,
1273 0 : ctx,
1274 0 : )
1275 0 : .await?;
1276 0 : } else if let Some(params) = parts.strip_prefix(&["pagestream"]) {
1277 0 : if params.len() != 2 {
1278 0 : return Err(QueryError::Other(anyhow::anyhow!(
1279 0 : "invalid param number for pagestream command"
1280 0 : )));
1281 0 : }
1282 0 : let tenant_id = TenantId::from_str(params[0])
1283 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1284 0 : let timeline_id = TimelineId::from_str(params[1])
1285 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1286 :
1287 0 : tracing::Span::current()
1288 0 : .record("tenant_id", field::display(tenant_id))
1289 0 : .record("timeline_id", field::display(timeline_id));
1290 0 :
1291 0 : self.check_permission(Some(tenant_id))?;
1292 :
1293 0 : COMPUTE_COMMANDS_COUNTERS
1294 0 : .for_command(ComputeCommandKind::PageStream)
1295 0 : .inc();
1296 0 :
1297 0 : self.handle_pagerequests(
1298 0 : pgb,
1299 0 : tenant_id,
1300 0 : timeline_id,
1301 0 : PagestreamProtocolVersion::V1,
1302 0 : ctx,
1303 0 : )
1304 0 : .await?;
1305 0 : } else if let Some(params) = parts.strip_prefix(&["basebackup"]) {
1306 0 : if params.len() < 2 {
1307 0 : return Err(QueryError::Other(anyhow::anyhow!(
1308 0 : "invalid param number for basebackup command"
1309 0 : )));
1310 0 : }
1311 :
1312 0 : let tenant_id = TenantId::from_str(params[0])
1313 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1314 0 : let timeline_id = TimelineId::from_str(params[1])
1315 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1316 :
1317 0 : tracing::Span::current()
1318 0 : .record("tenant_id", field::display(tenant_id))
1319 0 : .record("timeline_id", field::display(timeline_id));
1320 0 :
1321 0 : self.check_permission(Some(tenant_id))?;
1322 :
1323 0 : COMPUTE_COMMANDS_COUNTERS
1324 0 : .for_command(ComputeCommandKind::Basebackup)
1325 0 : .inc();
1326 :
1327 0 : let lsn = if let Some(lsn_str) = params.get(2) {
1328 : Some(
1329 0 : Lsn::from_str(lsn_str)
1330 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1331 : )
1332 : } else {
1333 0 : None
1334 : };
1335 :
1336 0 : let gzip = match params.get(3) {
1337 0 : Some(&"--gzip") => true,
1338 0 : None => false,
1339 0 : Some(third_param) => {
1340 0 : return Err(QueryError::Other(anyhow::anyhow!(
1341 0 : "Parameter in position 3 unknown {third_param}",
1342 0 : )))
1343 : }
1344 : };
1345 :
1346 0 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1347 0 : let res = async {
1348 0 : self.handle_basebackup_request(
1349 0 : pgb,
1350 0 : tenant_id,
1351 0 : timeline_id,
1352 0 : lsn,
1353 0 : None,
1354 0 : false,
1355 0 : gzip,
1356 0 : &ctx,
1357 0 : )
1358 0 : .await?;
1359 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1360 0 : Result::<(), QueryError>::Ok(())
1361 0 : }
1362 0 : .await;
1363 0 : metric_recording.observe(&res);
1364 0 : res?;
1365 : }
1366 : // same as basebackup, but result includes relational data as well
1367 0 : else if let Some(params) = parts.strip_prefix(&["fullbackup"]) {
1368 0 : if params.len() < 2 {
1369 0 : return Err(QueryError::Other(anyhow::anyhow!(
1370 0 : "invalid param number for fullbackup command"
1371 0 : )));
1372 0 : }
1373 :
1374 0 : let tenant_id = TenantId::from_str(params[0])
1375 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1376 0 : let timeline_id = TimelineId::from_str(params[1])
1377 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1378 :
1379 0 : tracing::Span::current()
1380 0 : .record("tenant_id", field::display(tenant_id))
1381 0 : .record("timeline_id", field::display(timeline_id));
1382 :
1383 : // The caller is responsible for providing correct lsn and prev_lsn.
1384 0 : let lsn = if let Some(lsn_str) = params.get(2) {
1385 : Some(
1386 0 : Lsn::from_str(lsn_str)
1387 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1388 : )
1389 : } else {
1390 0 : None
1391 : };
1392 0 : let prev_lsn = if let Some(prev_lsn_str) = params.get(3) {
1393 : Some(
1394 0 : Lsn::from_str(prev_lsn_str)
1395 0 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
1396 : )
1397 : } else {
1398 0 : None
1399 : };
1400 :
1401 0 : self.check_permission(Some(tenant_id))?;
1402 :
1403 0 : COMPUTE_COMMANDS_COUNTERS
1404 0 : .for_command(ComputeCommandKind::Fullbackup)
1405 0 : .inc();
1406 0 :
1407 0 : // Check that the timeline exists
1408 0 : self.handle_basebackup_request(
1409 0 : pgb,
1410 0 : tenant_id,
1411 0 : timeline_id,
1412 0 : lsn,
1413 0 : prev_lsn,
1414 0 : true,
1415 0 : false,
1416 0 : &ctx,
1417 0 : )
1418 0 : .await?;
1419 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1420 0 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1421 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1422 : // on connect
1423 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1424 0 : } else if query_string.starts_with("lease lsn ") {
1425 0 : let params = &parts[2..];
1426 0 : if params.len() != 3 {
1427 0 : return Err(QueryError::Other(anyhow::anyhow!(
1428 0 : "invalid param number {} for lease lsn command",
1429 0 : params.len()
1430 0 : )));
1431 0 : }
1432 :
1433 0 : let tenant_shard_id = TenantShardId::from_str(params[0])
1434 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1435 0 : let timeline_id = TimelineId::from_str(params[1])
1436 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1437 :
1438 0 : tracing::Span::current()
1439 0 : .record("tenant_id", field::display(tenant_shard_id))
1440 0 : .record("timeline_id", field::display(timeline_id));
1441 0 :
1442 0 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
1443 :
1444 0 : COMPUTE_COMMANDS_COUNTERS
1445 0 : .for_command(ComputeCommandKind::LeaseLsn)
1446 0 : .inc();
1447 :
1448 : // The caller is responsible for providing correct lsn.
1449 0 : let lsn = Lsn::from_str(params[2])
1450 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1451 :
1452 0 : match self
1453 0 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
1454 0 : .await
1455 : {
1456 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1457 0 : Err(e) => {
1458 0 : error!("error obtaining lsn lease for {lsn}: {e:?}");
1459 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1460 0 : &e.to_string(),
1461 0 : Some(e.pg_error_code()),
1462 0 : ))?
1463 : }
1464 : };
1465 : } else {
1466 0 : return Err(QueryError::Other(anyhow::anyhow!(
1467 0 : "unknown command {query_string}"
1468 0 : )));
1469 : }
1470 :
1471 0 : Ok(())
1472 0 : }
1473 : }
1474 :
1475 : impl From<GetActiveTenantError> for QueryError {
1476 0 : fn from(e: GetActiveTenantError) -> Self {
1477 0 : match e {
1478 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1479 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1480 0 : ),
1481 : GetActiveTenantError::Cancelled
1482 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1483 0 : QueryError::Shutdown
1484 : }
1485 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1486 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1487 : }
1488 0 : }
1489 : }
1490 :
1491 0 : #[derive(Debug, thiserror::Error)]
1492 : pub(crate) enum GetActiveTimelineError {
1493 : #[error(transparent)]
1494 : Tenant(GetActiveTenantError),
1495 : #[error(transparent)]
1496 : Timeline(#[from] GetTimelineError),
1497 : }
1498 :
1499 : impl From<GetActiveTimelineError> for QueryError {
1500 0 : fn from(e: GetActiveTimelineError) -> Self {
1501 0 : match e {
1502 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1503 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1504 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1505 : }
1506 0 : }
1507 : }
1508 :
1509 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1510 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1511 0 : tracing::Span::current().record(
1512 0 : "shard_id",
1513 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1514 0 : );
1515 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1516 0 : }