Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::Context;
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use once_cell::sync::OnceCell;
9 : use pageserver_api::models::TenantState;
10 : use pageserver_api::models::{
11 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
12 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
13 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
14 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
15 : PagestreamNblocksResponse, PagestreamProtocolVersion,
16 : };
17 : use pageserver_api::shard::TenantShardId;
18 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
19 : use pq_proto::framed::ConnectionError;
20 : use pq_proto::FeStartupPacket;
21 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
22 : use std::borrow::Cow;
23 : use std::io;
24 : use std::str;
25 : use std::str::FromStr;
26 : use std::sync::Arc;
27 : use std::time::SystemTime;
28 : use std::time::{Duration, Instant};
29 : use tokio::io::AsyncWriteExt;
30 : use tokio::io::{AsyncRead, AsyncWrite};
31 : use tokio::task::JoinHandle;
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::*;
34 : use utils::{
35 : auth::{Claims, Scope, SwappableJwtAuth},
36 : id::{TenantId, TimelineId},
37 : lsn::Lsn,
38 : simple_rcu::RcuReadGuard,
39 : };
40 :
41 : use crate::auth::check_permission;
42 : use crate::basebackup;
43 : use crate::basebackup::BasebackupError;
44 : use crate::config::PageServerConf;
45 : use crate::context::{DownloadBehavior, RequestContext};
46 : use crate::metrics;
47 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
48 : use crate::pgdatadir_mapping::Version;
49 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
50 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
51 : use crate::task_mgr::TaskKind;
52 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
53 : use crate::tenant::mgr::ShardSelector;
54 : use crate::tenant::mgr::TenantManager;
55 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
56 : use crate::tenant::timeline::{self, WaitLsnError};
57 : use crate::tenant::GetTimelineError;
58 : use crate::tenant::PageReconstructError;
59 : use crate::tenant::Timeline;
60 : use pageserver_api::key::rel_block_to_key;
61 : use pageserver_api::reltag::SlruKind;
62 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
63 : use postgres_ffi::BLCKSZ;
64 :
65 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
66 : /// is not yet in state [`TenantState::Active`].
67 : ///
68 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
69 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
70 :
71 : ///////////////////////////////////////////////////////////////////////////////
72 :
73 : pub struct Listener {
74 : cancel: CancellationToken,
75 : /// Cancel the listener task via `cancel` and await `task` (see [`Listener::stop_accepting`])
76 : /// to shut down the listener and get a handle on the existing connections.
77 : task: JoinHandle<Connections>,
78 : }
79 :
80 : pub struct Connections {
81 : cancel: CancellationToken,
82 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
83 : }
84 :
85 0 : pub fn spawn(
86 0 : conf: &'static PageServerConf,
87 0 : tenant_manager: Arc<TenantManager>,
88 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
89 0 : tcp_listener: tokio::net::TcpListener,
90 0 : ) -> Listener {
91 0 : let cancel = CancellationToken::new();
92 0 : let libpq_ctx = RequestContext::todo_child(
93 0 : TaskKind::LibpqEndpointListener,
94 0 : // listener task shouldn't need to download anything. (We will
95 0 : // create a separate sub-context for each connection, with its
96 0 : // own download behavior. This context is used only to listen and
97 0 : // accept connections.)
98 0 : DownloadBehavior::Error,
99 0 : );
100 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
101 0 : "libpq listener",
102 0 : libpq_listener_main(
103 0 : tenant_manager,
104 0 : pg_auth,
105 0 : tcp_listener,
106 0 : conf.pg_auth_type,
107 0 : libpq_ctx,
108 0 : cancel.clone(),
109 0 : )
110 0 : .map(anyhow::Ok),
111 0 : ));
112 0 :
113 0 : Listener { cancel, task }
114 0 : }
115 :
116 : impl Listener {
117 0 : pub async fn stop_accepting(self) -> Connections {
118 0 : self.cancel.cancel();
119 0 : self.task
120 0 : .await
121 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
122 0 : }
123 : }
124 : impl Connections {
125 0 : pub(crate) async fn shutdown(self) {
126 0 : let Self { cancel, mut tasks } = self;
127 0 : cancel.cancel();
128 0 : while let Some(res) = tasks.join_next().await {
129 0 : Self::handle_connection_completion(res);
130 0 : }
131 0 : }
132 :
133 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
134 0 : match res {
135 0 : Ok(Ok(())) => {}
136 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
137 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
138 : }
139 0 : }
140 : }
141 :
142 : ///
143 : /// Main loop of the page service.
144 : ///
145 : /// Listens for connections, and launches a new handler task for each.
146 : ///
147 : /// Returns the set of currently open connections when cancelled
148 : /// via `listener_cancel`.
149 : ///
150 0 : pub async fn libpq_listener_main(
151 0 : tenant_manager: Arc<TenantManager>,
152 0 : auth: Option<Arc<SwappableJwtAuth>>,
153 0 : listener: tokio::net::TcpListener,
154 0 : auth_type: AuthType,
155 0 : listener_ctx: RequestContext,
156 0 : listener_cancel: CancellationToken,
157 0 : ) -> Connections {
158 0 : let connections_cancel = CancellationToken::new();
159 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
160 :
161 : loop {
162 0 : let accepted = tokio::select! {
163 : biased;
164 0 : _ = listener_cancel.cancelled() => break,
165 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
166 0 : let res = next.expect("we don't poll while empty");
167 0 : Connections::handle_connection_completion(res);
168 0 : continue;
169 : }
170 0 : accepted = listener.accept() => accepted,
171 0 : };
172 0 :
173 0 : match accepted {
174 0 : Ok((socket, peer_addr)) => {
175 0 : // Connection established. Spawn a new task to handle it.
176 0 : debug!("accepted connection from {}", peer_addr);
177 0 : let local_auth = auth.clone();
178 0 : let connection_ctx = listener_ctx
179 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
180 0 : connection_handler_tasks.spawn(page_service_conn_main(
181 0 : tenant_manager.clone(),
182 0 : local_auth,
183 0 : socket,
184 0 : auth_type,
185 0 : connection_ctx,
186 0 : connections_cancel.child_token(),
187 0 : ));
188 : }
189 0 : Err(err) => {
190 0 : // accept() failed. Log the error, and loop back to retry on next connection.
191 0 : error!("accept() failed: {:?}", err);
192 : }
193 : }
194 : }
195 :
196 0 : debug!("page_service listener loop terminated");
197 :
198 0 : Connections {
199 0 : cancel: connections_cancel,
200 0 : tasks: connection_handler_tasks,
201 0 : }
202 0 : }
203 :
204 : type ConnectionHandlerResult = anyhow::Result<()>;
205 :
206 0 : #[instrument(skip_all, fields(peer_addr))]
207 : async fn page_service_conn_main(
208 : tenant_manager: Arc<TenantManager>,
209 : auth: Option<Arc<SwappableJwtAuth>>,
210 : socket: tokio::net::TcpStream,
211 : auth_type: AuthType,
212 : connection_ctx: RequestContext,
213 : cancel: CancellationToken,
214 : ) -> ConnectionHandlerResult {
215 : let _guard = LIVE_CONNECTIONS
216 : .with_label_values(&["page_service"])
217 : .guard();
218 :
219 : socket
220 : .set_nodelay(true)
221 : .context("could not set TCP_NODELAY")?;
222 :
223 : let peer_addr = socket.peer_addr().context("get peer address")?;
224 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
225 :
226 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; the requirements are:
227 : // - long enough for most valid compute connections
228 : // - less than infinite to stop us from "leaking" connections to long-gone computes
229 : //
230 : // no write timeout is used, because the kernel is assumed to error writes after some time.
231 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
232 :
233 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
234 0 : let socket_timeout_ms = (|| {
235 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
236 : // Sample from an exponential distribution to simulate poor
237 : // network conditions; in tests, avg_timeout_ms is expected
238 : // to be around 15.
239 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
240 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
241 0 : let u = rand::random::<f32>();
242 0 : ((1.0 - u).ln() / (-avg)) as u64
243 : } else {
244 0 : default_timeout_ms
245 : }
246 0 : });
247 0 : default_timeout_ms
248 : })();
249 :
250 : // A timeout here does not mean the client died; it can happen if it's just idle for
251 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
252 : // they reconnect.
253 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
254 : let socket = std::pin::pin!(socket);
255 :
256 : fail::fail_point!("ps::connection-start::pre-login");
257 :
258 : // XXX: pgbackend.run() should take the connection_ctx,
259 : // and create a child per-query context when it invokes process_query.
260 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
261 : // and create the per-query context in process_query ourselves.
262 : let mut conn_handler =
263 : PageServerHandler::new(tenant_manager, auth, connection_ctx, cancel.clone());
264 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
265 :
266 : match pgbackend.run(&mut conn_handler, &cancel).await {
267 : Ok(()) => {
268 : // we've been requested to shut down
269 : Ok(())
270 : }
271 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
272 : if is_expected_io_error(&io_error) {
273 : info!("Postgres client disconnected ({io_error})");
274 : Ok(())
275 : } else {
276 : Err(io_error).context("Postgres connection error")
277 : }
278 : }
279 : other => other.context("Postgres query error"),
280 : }
281 : }
282 :
283 : struct PageServerHandler {
284 : auth: Option<Arc<SwappableJwtAuth>>,
285 : claims: Option<Claims>,
286 :
287 : /// The context created for the lifetime of the connection
288 : /// serviced by this PageServerHandler.
289 : /// For each query received over the connection,
290 : /// `process_query` creates a child context from this one.
291 : connection_ctx: RequestContext,
292 :
293 : cancel: CancellationToken,
294 :
295 : timeline_handles: TimelineHandles,
296 : }
297 :
298 : struct TimelineHandles {
299 : wrapper: TenantManagerWrapper,
300 : /// Note on size: the typical size of this map is 1. The largest size we expect
301 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
302 : /// or the ratio used when splitting shards (i.e. how many children are created
303 : /// from one parent shard), where a "large" number might be ~8.
304 : handles: timeline::handle::Cache<TenantManagerTypes>,
305 : }
306 :
307 : impl TimelineHandles {
308 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
309 0 : Self {
310 0 : wrapper: TenantManagerWrapper {
311 0 : tenant_manager,
312 0 : tenant_id: OnceCell::new(),
313 0 : },
314 0 : handles: Default::default(),
315 0 : }
316 0 : }
317 0 : async fn get(
318 0 : &mut self,
319 0 : tenant_id: TenantId,
320 0 : timeline_id: TimelineId,
321 0 : shard_selector: ShardSelector,
322 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
323 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
324 0 : return Err(GetActiveTimelineError::Tenant(
325 0 : GetActiveTenantError::SwitchedTenant,
326 0 : ));
327 0 : }
328 0 : self.handles
329 0 : .get(timeline_id, shard_selector, &self.wrapper)
330 0 : .await
331 0 : .map_err(|e| match e {
332 0 : timeline::handle::GetError::TenantManager(e) => e,
333 : timeline::handle::GetError::TimelineGateClosed => {
334 0 : trace!("timeline gate closed");
335 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
336 : }
337 : timeline::handle::GetError::PerTimelineStateShutDown => {
338 0 : trace!("per-timeline state shut down");
339 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
340 : }
341 0 : })
342 0 : }
343 : }
344 :
345 : pub(crate) struct TenantManagerWrapper {
346 : tenant_manager: Arc<TenantManager>,
347 : // We do not support switching tenant_id on a connection at this point.
348 : // We can add support for this later if needed without changing
349 : // the protocol.
350 : tenant_id: once_cell::sync::OnceCell<TenantId>,
351 : }
352 :
353 : #[derive(Debug)]
354 : pub(crate) struct TenantManagerTypes;
355 :
356 : impl timeline::handle::Types for TenantManagerTypes {
357 : type TenantManagerError = GetActiveTimelineError;
358 : type TenantManager = TenantManagerWrapper;
359 : type Timeline = Arc<Timeline>;
360 : }
361 :
362 : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
363 0 : fn gate(&self) -> &utils::sync::gate::Gate {
364 0 : &self.gate
365 0 : }
366 :
367 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
368 0 : Timeline::shard_timeline_id(self)
369 0 : }
370 :
371 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
372 0 : &self.handles
373 0 : }
374 :
375 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
376 0 : Timeline::get_shard_identity(self)
377 0 : }
378 : }
379 :
380 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
381 0 : async fn resolve(
382 0 : &self,
383 0 : timeline_id: TimelineId,
384 0 : shard_selector: ShardSelector,
385 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
386 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
387 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
388 0 : let wait_start = Instant::now();
389 0 : let deadline = wait_start + timeout;
390 0 : let tenant_shard = loop {
391 0 : let resolved = self
392 0 : .tenant_manager
393 0 : .resolve_attached_shard(tenant_id, shard_selector);
394 0 : match resolved {
395 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
396 : ShardResolveResult::NotFound => {
397 0 : return Err(GetActiveTimelineError::Tenant(
398 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
399 0 : ));
400 : }
401 0 : ShardResolveResult::InProgress(barrier) => {
402 0 : // We can't authoritatively answer right now: wait for InProgress state
403 0 : // to end, then try again
404 0 : tokio::select! {
405 0 : _ = barrier.wait() => {
406 0 : // The barrier completed: proceed around the loop to try looking up again
407 0 : },
408 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
409 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
410 0 : latest_state: None,
411 0 : wait_time: timeout,
412 0 : }));
413 : }
414 : }
415 : }
416 : };
417 : };
418 :
419 0 : tracing::debug!("Waiting for tenant to enter active state...");
420 0 : tenant_shard
421 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
422 0 : .await
423 0 : .map_err(GetActiveTimelineError::Tenant)?;
424 :
425 0 : let timeline = tenant_shard
426 0 : .get_timeline(timeline_id, true)
427 0 : .map_err(GetActiveTimelineError::Timeline)?;
428 0 : set_tracing_field_shard_id(&timeline);
429 0 : Ok(timeline)
430 0 : }
431 : }
432 :
433 0 : #[derive(thiserror::Error, Debug)]
434 : enum PageStreamError {
435 : /// We encountered an error that should prompt the client to reconnect:
436 : /// in practice this means we drop the connection without sending a response.
437 : #[error("Reconnect required: {0}")]
438 : Reconnect(Cow<'static, str>),
439 :
440 : /// We were instructed to shutdown while processing the query
441 : #[error("Shutting down")]
442 : Shutdown,
443 :
444 : /// Something went wrong reading a page: this likely indicates a pageserver bug
445 : #[error("Read error")]
446 : Read(#[source] PageReconstructError),
447 :
448 : /// Ran out of time waiting for an LSN
449 : #[error("LSN timeout: {0}")]
450 : LsnTimeout(WaitLsnError),
451 :
452 : /// The entity required to serve the request (tenant or timeline) is not found,
453 : /// or is not found in a suitable state to serve a request.
454 : #[error("Not found: {0}")]
455 : NotFound(Cow<'static, str>),
456 :
457 : /// Request asked for something that doesn't make sense, like an invalid LSN
458 : #[error("Bad request: {0}")]
459 : BadRequest(Cow<'static, str>),
460 : }
461 :
462 : impl From<PageReconstructError> for PageStreamError {
463 0 : fn from(value: PageReconstructError) -> Self {
464 0 : match value {
465 0 : PageReconstructError::Cancelled => Self::Shutdown,
466 0 : e => Self::Read(e),
467 : }
468 0 : }
469 : }
470 :
471 : impl From<GetActiveTimelineError> for PageStreamError {
472 0 : fn from(value: GetActiveTimelineError) -> Self {
473 0 : match value {
474 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
475 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
476 : TenantState::Stopping { .. },
477 : ))
478 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
479 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
480 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
481 : }
482 0 : }
483 : }
484 :
485 : impl From<WaitLsnError> for PageStreamError {
486 0 : fn from(value: WaitLsnError) -> Self {
487 0 : match value {
488 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
489 0 : WaitLsnError::Shutdown => Self::Shutdown,
490 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
491 : }
492 0 : }
493 : }
494 :
495 : impl From<WaitLsnError> for QueryError {
496 0 : fn from(value: WaitLsnError) -> Self {
497 0 : match value {
498 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
499 0 : WaitLsnError::Shutdown => Self::Shutdown,
500 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
501 : }
502 0 : }
503 : }
504 :
505 : impl PageServerHandler {
506 0 : pub fn new(
507 0 : tenant_manager: Arc<TenantManager>,
508 0 : auth: Option<Arc<SwappableJwtAuth>>,
509 0 : connection_ctx: RequestContext,
510 0 : cancel: CancellationToken,
511 0 : ) -> Self {
512 0 : PageServerHandler {
513 0 : auth,
514 0 : claims: None,
515 0 : connection_ctx,
516 0 : timeline_handles: TimelineHandles::new(tenant_manager),
517 0 : cancel,
518 0 : }
519 0 : }
520 :
521 : /// Flush the `pgb` output buffer, racing the flush against the given cancellation token.
522 : /// Pass in a cancellation token from the appropriate scope (such as a tenant or timeline
523 : /// cancellation token) to ensure we stop flushing promptly on shutdown.
524 : ///
525 : /// Returns [`QueryError::Shutdown`] if the token is cancelled before the flush
526 : /// completes.
527 0 : async fn flush_cancellable<IO>(
528 0 : &self,
529 0 : pgb: &mut PostgresBackend<IO>,
530 0 : cancel: &CancellationToken,
531 0 : ) -> Result<(), QueryError>
532 0 : where
533 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
534 0 : {
535 0 : tokio::select!(
536 0 : flush_r = pgb.flush() => {
537 0 : Ok(flush_r?)
538 : },
539 0 : _ = cancel.cancelled() => {
540 0 : Err(QueryError::Shutdown)
541 : }
542 : )
543 0 : }
544 :
545 : /// Pagestream sub-protocol handler.
546 : ///
547 : /// It is a simple request-response protocol inside a COPYBOTH session.
548 : ///
549 : /// # Coding Discipline
550 : ///
551 : /// Coding discipline within this function: all interaction with the `pgb` connection
552 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
553 : /// This is so that we can shutdown page_service quickly.
554 0 : #[instrument(skip_all)]
555 : async fn handle_pagerequests<IO>(
556 : &mut self,
557 : pgb: &mut PostgresBackend<IO>,
558 : tenant_id: TenantId,
559 : timeline_id: TimelineId,
560 : _protocol_version: PagestreamProtocolVersion,
561 : ctx: RequestContext,
562 : ) -> Result<(), QueryError>
563 : where
564 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
565 : {
566 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
567 :
568 : // switch client to COPYBOTH
569 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
570 : tokio::select! {
571 : biased;
572 : _ = self.cancel.cancelled() => {
573 : return Err(QueryError::Shutdown)
574 : }
575 : res = pgb.flush() => {
576 : res?;
577 : }
578 : }
579 :
580 : loop {
581 : // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
582 : let msg = tokio::select! {
583 : biased;
584 : _ = self.cancel.cancelled() => {
585 : return Err(QueryError::Shutdown)
586 : }
587 : msg = pgb.read_message() => { msg }
588 : };
589 : let copy_data_bytes = match msg? {
590 : Some(FeMessage::CopyData(bytes)) => bytes,
591 : Some(FeMessage::Terminate) => break,
592 : Some(m) => {
593 : return Err(QueryError::Other(anyhow::anyhow!(
594 : "unexpected message: {m:?} during COPY"
595 : )));
596 : }
597 : None => break, // client disconnected
598 : };
599 :
600 : trace!("query: {copy_data_bytes:?}");
601 : fail::fail_point!("ps::handle-pagerequest-message");
602 :
603 : // parse request
604 : let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
605 :
606 : // invoke handler function
607 : let (handler_result, span) = match neon_fe_msg {
608 : PagestreamFeMessage::Exists(req) => {
609 : fail::fail_point!("ps::handle-pagerequest-message::exists");
610 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
611 : (
612 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
613 : .instrument(span.clone())
614 : .await,
615 : span,
616 : )
617 : }
618 : PagestreamFeMessage::Nblocks(req) => {
619 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
620 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
621 : (
622 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
623 : .instrument(span.clone())
624 : .await,
625 : span,
626 : )
627 : }
628 : PagestreamFeMessage::GetPage(req) => {
629 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
630 : // shard_id is filled in by the handler
631 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
632 : (
633 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
634 : .instrument(span.clone())
635 : .await,
636 : span,
637 : )
638 : }
639 : PagestreamFeMessage::DbSize(req) => {
640 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
641 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
642 : (
643 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
644 : .instrument(span.clone())
645 : .await,
646 : span,
647 : )
648 : }
649 : PagestreamFeMessage::GetSlruSegment(req) => {
650 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
651 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
652 : (
653 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
654 : .instrument(span.clone())
655 : .await,
656 : span,
657 : )
658 : }
659 : };
660 :
661 : // Map handler result to protocol behavior.
662 : // Some handler errors cause exit from pagestream protocol.
663 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
664 : let response_msg = match handler_result {
665 : Err(e) => match &e {
666 : PageStreamError::Shutdown => {
667 : // If we fail to fulfil a request during shutdown, which may be _because_ of
668 : // shutdown, then do not send the error to the client. Instead just drop the
669 : // connection.
670 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
671 : return Err(QueryError::Shutdown);
672 : }
673 : PageStreamError::Reconnect(reason) => {
674 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
675 : return Err(QueryError::Reconnect);
676 : }
677 : PageStreamError::Read(_)
678 : | PageStreamError::LsnTimeout(_)
679 : | PageStreamError::NotFound(_)
680 : | PageStreamError::BadRequest(_) => {
681 : // print all the details to the log with {:#}, but for the client the
682 : // error message is enough. Do not log if shutting down, as the anyhow::Error
683 : // here includes cancellation which is not an error.
684 : let full = utils::error::report_compact_sources(&e);
685 0 : span.in_scope(|| {
686 0 : error!("error reading relation or page version: {full:#}")
687 0 : });
688 : PagestreamBeMessage::Error(PagestreamErrorResponse {
689 : message: e.to_string(),
690 : })
691 : }
692 : },
693 : Ok(response_msg) => response_msg,
694 : };
695 :
696 : // marshal & transmit response message
697 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
698 : tokio::select! {
699 : biased;
700 : _ = self.cancel.cancelled() => {
701 : // We were requested to shut down.
702 : info!("shutdown request received in page handler");
703 : return Err(QueryError::Shutdown)
704 : }
705 : res = pgb.flush() => {
706 : res?;
707 : }
708 : }
709 : }
710 : Ok(())
711 : }
712 :
713 : /// Helper function to handle the LSN from client request.
714 : ///
715 : /// Each GetPage (and Exists and Nblocks) request includes information about
716 : /// which version of the page is being requested. The primary compute node
717 : /// will always request the latest page version, by setting 'request_lsn' to
718 : /// the last inserted or flushed WAL position, while a standby will request
719 : /// a version at the LSN that it's currently caught up to.
720 : ///
721 : /// In either case, if the page server hasn't received the WAL up to the
722 : /// requested LSN yet, we will wait for it to arrive. The return value is
723 : /// the LSN that should be used to look up the page versions.
724 : ///
725 : /// In addition to the request LSN, each request carries another LSN,
726 : /// 'not_modified_since', which is a hint to the pageserver that the client
727 : /// knows that the page has not been modified between 'not_modified_since'
728 : /// and the request LSN. This allows skipping the wait, as long as the WAL
729 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
730 : /// information about when the page was modified, it will use
731 : /// not_modified_since == request_lsn. If the client lies and sends a too-low
732 : /// not_modified_since such that there are in fact later page versions, the
733 : /// behavior is undefined: the pageserver may return any of the page versions
734 : /// or an error.
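///
/// # Example (illustrative, with made-up LSNs)
///
/// Suppose `last_record_lsn` is 0/5000 and the client sends
/// `request_lsn` = 0/7000 with `not_modified_since` = 0/4000. WAL up to
/// 0/4000 has already arrived, so no wait is needed and the lookup uses
/// min(last_record_lsn, request_lsn) = 0/5000. If instead
/// `not_modified_since` were 0/6000, we would first wait for WAL up to
/// 0/6000 to arrive and then use 0/6000 for the lookup.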
735 0 : async fn wait_or_get_last_lsn(
736 0 : timeline: &Timeline,
737 0 : request_lsn: Lsn,
738 0 : not_modified_since: Lsn,
739 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
740 0 : ctx: &RequestContext,
741 0 : ) -> Result<Lsn, PageStreamError> {
742 0 : let last_record_lsn = timeline.get_last_record_lsn();
743 0 :
744 0 : // Sanity check the request
745 0 : if request_lsn < not_modified_since {
746 0 : return Err(PageStreamError::BadRequest(
747 0 : format!(
748 0 : "invalid request with request LSN {} and not_modified_since {}",
749 0 : request_lsn, not_modified_since,
750 0 : )
751 0 : .into(),
752 0 : ));
753 0 : }
754 0 :
755 0 : if request_lsn < **latest_gc_cutoff_lsn {
756 0 : let gc_info = &timeline.gc_info.read().unwrap();
757 0 : if !gc_info.leases.contains_key(&request_lsn) {
758 : // The requested LSN is below gc cutoff and is not guarded by a lease.
759 :
760 : // Check explicitly for INVALID just to get a less scary error message if the
761 : // request is obviously bogus
762 0 : return Err(if request_lsn == Lsn::INVALID {
763 0 : PageStreamError::BadRequest("invalid LSN(0) in request".into())
764 : } else {
765 0 : PageStreamError::BadRequest(format!(
766 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
767 0 : request_lsn, **latest_gc_cutoff_lsn
768 0 : ).into())
769 : });
770 0 : }
771 0 : }
772 :
773 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
774 0 : if not_modified_since > last_record_lsn {
775 0 : timeline
776 0 : .wait_lsn(
777 0 : not_modified_since,
778 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
779 0 : ctx,
780 0 : )
781 0 : .await?;
782 : // Since we waited for 'not_modified_since' to arrive, that is now the last
783 : // record LSN. (Or close enough for our purposes; the last-record LSN can
784 : // advance immediately after we return anyway)
785 0 : Ok(not_modified_since)
786 : } else {
787 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
788 : // here instead. That would give the same result, since we know that there
789 : // haven't been any modifications since 'not_modified_since'. Using an older
790 : // LSN might be faster, because that could allow skipping recent layers when
791 : // finding the page. However, we have historically used 'last_record_lsn', so
792 : // stick to that for now.
793 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
794 : }
795 0 : }
796 :
797 : /// Handles the lsn lease request.
798 : /// If a lease cannot be obtained, the client will receive NULL.
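/// On success, the result set has a single row with a single text column,
/// `valid_until`, containing the lease expiry as Unix epoch milliseconds.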
799 0 : #[instrument(skip_all, fields(shard_id, %lsn))]
800 : async fn handle_make_lsn_lease<IO>(
801 : &mut self,
802 : pgb: &mut PostgresBackend<IO>,
803 : tenant_shard_id: TenantShardId,
804 : timeline_id: TimelineId,
805 : lsn: Lsn,
806 : ctx: &RequestContext,
807 : ) -> Result<(), QueryError>
808 : where
809 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
810 : {
811 : let timeline = self
812 : .timeline_handles
813 : .get(
814 : tenant_shard_id.tenant_id,
815 : timeline_id,
816 : ShardSelector::Known(tenant_shard_id.to_index()),
817 : )
818 : .await?;
819 : set_tracing_field_shard_id(&timeline);
820 :
821 : let lease = timeline
822 : .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
823 0 : .inspect_err(|e| {
824 0 : warn!("{e}");
825 0 : })
826 : .ok();
827 0 : let valid_until_str = lease.map(|l| {
828 0 : l.valid_until
829 0 : .duration_since(SystemTime::UNIX_EPOCH)
830 0 : .expect("valid_until is earlier than UNIX_EPOCH")
831 0 : .as_millis()
832 0 : .to_string()
833 0 : });
834 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
835 :
836 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
837 : b"valid_until",
838 : )]))?
839 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
840 :
841 : Ok(())
842 : }
843 :
844 0 : #[instrument(skip_all, fields(shard_id))]
845 : async fn handle_get_rel_exists_request(
846 : &mut self,
847 : tenant_id: TenantId,
848 : timeline_id: TimelineId,
849 : req: &PagestreamExistsRequest,
850 : ctx: &RequestContext,
851 : ) -> Result<PagestreamBeMessage, PageStreamError> {
852 : let timeline = self
853 : .timeline_handles
854 : .get(tenant_id, timeline_id, ShardSelector::Zero)
855 : .await?;
856 : let _timer = timeline
857 : .query_metrics
858 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
859 :
860 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
861 : let lsn = Self::wait_or_get_last_lsn(
862 : &timeline,
863 : req.request_lsn,
864 : req.not_modified_since,
865 : &latest_gc_cutoff_lsn,
866 : ctx,
867 : )
868 : .await?;
869 :
870 : let exists = timeline
871 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
872 : .await?;
873 :
874 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
875 : exists,
876 : }))
877 : }
878 :
879 0 : #[instrument(skip_all, fields(shard_id))]
880 : async fn handle_get_nblocks_request(
881 : &mut self,
882 : tenant_id: TenantId,
883 : timeline_id: TimelineId,
884 : req: &PagestreamNblocksRequest,
885 : ctx: &RequestContext,
886 : ) -> Result<PagestreamBeMessage, PageStreamError> {
887 : let timeline = self
888 : .timeline_handles
889 : .get(tenant_id, timeline_id, ShardSelector::Zero)
890 : .await?;
891 :
892 : let _timer = timeline
893 : .query_metrics
894 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
895 :
896 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
897 : let lsn = Self::wait_or_get_last_lsn(
898 : &timeline,
899 : req.request_lsn,
900 : req.not_modified_since,
901 : &latest_gc_cutoff_lsn,
902 : ctx,
903 : )
904 : .await?;
905 :
906 : let n_blocks = timeline
907 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
908 : .await?;
909 :
910 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
911 : n_blocks,
912 : }))
913 : }
914 :
915 0 : #[instrument(skip_all, fields(shard_id))]
916 : async fn handle_db_size_request(
917 : &mut self,
918 : tenant_id: TenantId,
919 : timeline_id: TimelineId,
920 : req: &PagestreamDbSizeRequest,
921 : ctx: &RequestContext,
922 : ) -> Result<PagestreamBeMessage, PageStreamError> {
923 : let timeline = self
924 : .timeline_handles
925 : .get(tenant_id, timeline_id, ShardSelector::Zero)
926 : .await?;
927 :
928 : let _timer = timeline
929 : .query_metrics
930 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
931 :
932 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
933 : let lsn = Self::wait_or_get_last_lsn(
934 : &timeline,
935 : req.request_lsn,
936 : req.not_modified_since,
937 : &latest_gc_cutoff_lsn,
938 : ctx,
939 : )
940 : .await?;
941 :
942 : let total_blocks = timeline
943 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
944 : .await?;
945 : let db_size = total_blocks as i64 * BLCKSZ as i64;
946 :
947 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
948 : db_size,
949 : }))
950 : }
951 :
952 0 : #[instrument(skip_all, fields(shard_id))]
953 : async fn handle_get_page_at_lsn_request(
954 : &mut self,
955 : tenant_id: TenantId,
956 : timeline_id: TimelineId,
957 : req: &PagestreamGetPageRequest,
958 : ctx: &RequestContext,
959 : ) -> Result<PagestreamBeMessage, PageStreamError> {
960 : let timeline = match self
961 : .timeline_handles
962 : .get(
963 : tenant_id,
964 : timeline_id,
965 : ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
966 : )
967 : .await
968 : {
969 : Ok(tl) => tl,
970 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
971 : // We already know this tenant exists in general, because we resolved it at
972 : // start of connection. Getting a NotFound here indicates that the shard containing
973 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
974 : // mapping is out of date.
975 : //
976 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
977 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
978 : // and talk to a different pageserver.
979 : return Err(PageStreamError::Reconnect(
980 : "getpage@lsn request routed to wrong shard".into(),
981 : ));
982 : }
983 : Err(e) => return Err(e.into()),
984 : };
985 :
986 : let _timer = timeline
987 : .query_metrics
988 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
989 :
990 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
991 : let lsn = Self::wait_or_get_last_lsn(
992 : &timeline,
993 : req.request_lsn,
994 : req.not_modified_since,
995 : &latest_gc_cutoff_lsn,
996 : ctx,
997 : )
998 : .await?;
999 :
1000 : let page = timeline
1001 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
1002 : .await?;
1003 :
1004 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
1005 : page,
1006 : }))
1007 : }
1008 :
1009 0 : #[instrument(skip_all, fields(shard_id))]
1010 : async fn handle_get_slru_segment_request(
1011 : &mut self,
1012 : tenant_id: TenantId,
1013 : timeline_id: TimelineId,
1014 : req: &PagestreamGetSlruSegmentRequest,
1015 : ctx: &RequestContext,
1016 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1017 : let timeline = self
1018 : .timeline_handles
1019 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1020 : .await?;
1021 :
1022 : let _timer = timeline
1023 : .query_metrics
1024 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
1025 :
1026 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1027 : let lsn = Self::wait_or_get_last_lsn(
1028 : &timeline,
1029 : req.request_lsn,
1030 : req.not_modified_since,
1031 : &latest_gc_cutoff_lsn,
1032 : ctx,
1033 : )
1034 : .await?;
1035 :
1036 : let kind = SlruKind::from_repr(req.kind)
1037 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1038 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1039 :
1040 : Ok(PagestreamBeMessage::GetSlruSegment(
1041 : PagestreamGetSlruSegmentResponse { segment },
1042 : ))
1043 : }
1044 :
1045 : /// Note on "fullbackup":
1046 : /// Full basebackups should only be used for debugging purposes.
1047 : /// Originally, it was introduced to enable breaking storage format changes,
1048 : /// but that is not applicable anymore.
1049 : ///
1050 : /// # Coding Discipline
1051 : ///
1052 : /// Coding discipline within this function: all interaction with the `pgb` connection
1053 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1054 : /// This is so that we can shutdown page_service quickly.
1055 : ///
1056 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
1057 : /// to connection cancellation.
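///
/// The response is streamed in COPY OUT mode as a tar archive of the
/// basebackup, gzip-compressed when the `gzip` flag is set (full backups
/// are currently always sent uncompressed), and terminated with CopyDone.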
1058 : #[allow(clippy::too_many_arguments)]
1059 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1060 : async fn handle_basebackup_request<IO>(
1061 : &mut self,
1062 : pgb: &mut PostgresBackend<IO>,
1063 : tenant_id: TenantId,
1064 : timeline_id: TimelineId,
1065 : lsn: Option<Lsn>,
1066 : prev_lsn: Option<Lsn>,
1067 : full_backup: bool,
1068 : gzip: bool,
1069 : ctx: &RequestContext,
1070 : ) -> Result<(), QueryError>
1071 : where
1072 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1073 : {
1074 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1075 0 : match err {
1076 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1077 0 : BasebackupError::Server(e) => QueryError::Other(e),
1078 : }
1079 0 : }
1080 :
1081 : let started = std::time::Instant::now();
1082 :
1083 : let timeline = self
1084 : .timeline_handles
1085 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1086 : .await?;
1087 :
1088 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1089 : if let Some(lsn) = lsn {
1090 : // Backup was requested at a particular LSN. Wait for it to arrive.
1091 : info!("waiting for {}", lsn);
1092 : timeline
1093 : .wait_lsn(
1094 : lsn,
1095 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1096 : ctx,
1097 : )
1098 : .await?;
1099 : timeline
1100 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1101 : .context("invalid basebackup lsn")?;
1102 : }
1103 :
1104 : let lsn_awaited_after = started.elapsed();
1105 :
1106 : // switch client to COPYOUT
1107 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1108 : .map_err(QueryError::Disconnected)?;
1109 : self.flush_cancellable(pgb, &self.cancel).await?;
1110 :
1111 : // Send a tarball of the latest layer on the timeline. Compress if not
1112 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1113 : if full_backup {
1114 : let mut writer = pgb.copyout_writer();
1115 : basebackup::send_basebackup_tarball(
1116 : &mut writer,
1117 : &timeline,
1118 : lsn,
1119 : prev_lsn,
1120 : full_backup,
1121 : ctx,
1122 : )
1123 : .await
1124 : .map_err(map_basebackup_error)?;
1125 : } else {
1126 : let mut writer = pgb.copyout_writer();
1127 : if gzip {
1128 : let mut encoder = GzipEncoder::with_quality(
1129 : writer,
1130 : // NOTE using fast compression because it's on the critical path
1131 : // for compute startup. For an empty database, we get
1132 : // <100KB with this method. The Level::Best compression method
1133 : // gives us <20KB, but maybe we should add basebackup caching
1134 : // on compute shutdown first.
1135 : async_compression::Level::Fastest,
1136 : );
1137 : basebackup::send_basebackup_tarball(
1138 : &mut encoder,
1139 : &timeline,
1140 : lsn,
1141 : prev_lsn,
1142 : full_backup,
1143 : ctx,
1144 : )
1145 : .await
1146 : .map_err(map_basebackup_error)?;
1147 : // shutdown the encoder to ensure the gzip footer is written
1148 : encoder
1149 : .shutdown()
1150 : .await
1151 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1152 : } else {
1153 : basebackup::send_basebackup_tarball(
1154 : &mut writer,
1155 : &timeline,
1156 : lsn,
1157 : prev_lsn,
1158 : full_backup,
1159 : ctx,
1160 : )
1161 : .await
1162 : .map_err(map_basebackup_error)?;
1163 : }
1164 : }
1165 :
1166 : pgb.write_message_noflush(&BeMessage::CopyDone)
1167 : .map_err(QueryError::Disconnected)?;
1168 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1169 :
1170 : let basebackup_after = started
1171 : .elapsed()
1172 : .checked_sub(lsn_awaited_after)
1173 : .unwrap_or(Duration::ZERO);
1174 :
1175 : info!(
1176 : lsn_await_millis = lsn_awaited_after.as_millis(),
1177 : basebackup_millis = basebackup_after.as_millis(),
1178 : "basebackup complete"
1179 : );
1180 :
1181 : Ok(())
1182 : }
1183 :
1184 : // When accessing the management API, supply None as an argument.
1185 : // When authorizing access to a tenant, pass the corresponding tenant id.
1186 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1187 0 : if self.auth.is_none() {
1188 : // auth is set to Trust, nothing to check so just return ok
1189 0 : return Ok(());
1190 0 : }
1191 0 : // auth is Some, as just checked above; when auth is Some,
1192 0 : // claims are always present because of checks during connection init,
1193 0 : // so this expect won't trigger
1194 0 : let claims = self
1195 0 : .claims
1196 0 : .as_ref()
1197 0 : .expect("claims presence already checked");
1198 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1199 0 : }
1200 : }
1201 :
1202 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1203 : where
1204 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1205 : {
1206 0 : fn check_auth_jwt(
1207 0 : &mut self,
1208 0 : _pgb: &mut PostgresBackend<IO>,
1209 0 : jwt_response: &[u8],
1210 0 : ) -> Result<(), QueryError> {
1211 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT
1212 : // which requires auth to be present
1213 0 : let data = self
1214 0 : .auth
1215 0 : .as_ref()
1216 0 : .unwrap()
1217 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1218 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1219 :
1220 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1221 0 : return Err(QueryError::Unauthorized(
1222 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1223 0 : ));
1224 0 : }
1225 0 :
1226 0 : debug!(
1227 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1228 : data.claims.scope, data.claims.tenant_id,
1229 : );
1230 :
1231 0 : self.claims = Some(data.claims);
1232 0 : Ok(())
1233 0 : }
1234 :
1235 0 : fn startup(
1236 0 : &mut self,
1237 0 : _pgb: &mut PostgresBackend<IO>,
1238 0 : _sm: &FeStartupPacket,
1239 0 : ) -> Result<(), QueryError> {
1240 0 : fail::fail_point!("ps::connection-start::startup-packet");
1241 0 : Ok(())
1242 0 : }
1243 :
1244 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1245 : async fn process_query(
1246 : &mut self,
1247 : pgb: &mut PostgresBackend<IO>,
1248 : query_string: &str,
1249 : ) -> Result<(), QueryError> {
1250 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1251 0 : info!("Hit failpoint for bad connection");
1252 0 : Err(QueryError::SimulatedConnectionError)
1253 0 : });
1254 :
1255 : fail::fail_point!("ps::connection-start::process-query");
1256 :
1257 : let ctx = self.connection_ctx.attached_child();
1258 : debug!("process query {query_string:?}");
1259 : let parts = query_string.split_whitespace().collect::<Vec<_>>();
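// Commands handled by the branches below:
//   pagestream_v2 <tenant_id> <timeline_id>
//   basebackup <tenant_id> <timeline_id> [<lsn>] [--gzip]
//   fullbackup <tenant_id> <timeline_id> [<lsn>] [<prev_lsn>]
//   set <...>      (accepted and ignored, for client compatibility, e.g. psycopg2's SET on connect)
//   lease lsn <tenant_shard_id> <timeline_id> <lsn>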
1260 : if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) {
1261 : if params.len() != 2 {
1262 : return Err(QueryError::Other(anyhow::anyhow!(
1263 : "invalid param number for pagestream command"
1264 : )));
1265 : }
1266 : let tenant_id = TenantId::from_str(params[0])
1267 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1268 : let timeline_id = TimelineId::from_str(params[1])
1269 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1270 :
1271 : tracing::Span::current()
1272 : .record("tenant_id", field::display(tenant_id))
1273 : .record("timeline_id", field::display(timeline_id));
1274 :
1275 : self.check_permission(Some(tenant_id))?;
1276 :
1277 : COMPUTE_COMMANDS_COUNTERS
1278 : .for_command(ComputeCommandKind::PageStreamV2)
1279 : .inc();
1280 :
1281 : self.handle_pagerequests(
1282 : pgb,
1283 : tenant_id,
1284 : timeline_id,
1285 : PagestreamProtocolVersion::V2,
1286 : ctx,
1287 : )
1288 : .await?;
1289 : } else if let Some(params) = parts.strip_prefix(&["basebackup"]) {
1290 : if params.len() < 2 {
1291 : return Err(QueryError::Other(anyhow::anyhow!(
1292 : "invalid param number for basebackup command"
1293 : )));
1294 : }
1295 :
1296 : let tenant_id = TenantId::from_str(params[0])
1297 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1298 : let timeline_id = TimelineId::from_str(params[1])
1299 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1300 :
1301 : tracing::Span::current()
1302 : .record("tenant_id", field::display(tenant_id))
1303 : .record("timeline_id", field::display(timeline_id));
1304 :
1305 : self.check_permission(Some(tenant_id))?;
1306 :
1307 : COMPUTE_COMMANDS_COUNTERS
1308 : .for_command(ComputeCommandKind::Basebackup)
1309 : .inc();
1310 :
1311 : let lsn = if let Some(lsn_str) = params.get(2) {
1312 : Some(
1313 : Lsn::from_str(lsn_str)
1314 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1315 : )
1316 : } else {
1317 : None
1318 : };
1319 :
1320 : let gzip = match params.get(3) {
1321 : Some(&"--gzip") => true,
1322 : None => false,
1323 : Some(third_param) => {
1324 : return Err(QueryError::Other(anyhow::anyhow!(
1325 : "Parameter in position 3 unknown {third_param}",
1326 : )))
1327 : }
1328 : };
1329 :
1330 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1331 0 : let res = async {
1332 0 : self.handle_basebackup_request(
1333 0 : pgb,
1334 0 : tenant_id,
1335 0 : timeline_id,
1336 0 : lsn,
1337 0 : None,
1338 0 : false,
1339 0 : gzip,
1340 0 : &ctx,
1341 0 : )
1342 0 : .await?;
1343 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1344 0 : Result::<(), QueryError>::Ok(())
1345 0 : }
1346 : .await;
1347 : metric_recording.observe(&res);
1348 : res?;
1349 : }
1350 : // same as basebackup, but result includes relational data as well
1351 : else if let Some(params) = parts.strip_prefix(&["fullbackup"]) {
1352 : if params.len() < 2 {
1353 : return Err(QueryError::Other(anyhow::anyhow!(
1354 : "invalid param number for fullbackup command"
1355 : )));
1356 : }
1357 :
1358 : let tenant_id = TenantId::from_str(params[0])
1359 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1360 : let timeline_id = TimelineId::from_str(params[1])
1361 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1362 :
1363 : tracing::Span::current()
1364 : .record("tenant_id", field::display(tenant_id))
1365 : .record("timeline_id", field::display(timeline_id));
1366 :
1367 : // The caller is responsible for providing correct lsn and prev_lsn.
1368 : let lsn = if let Some(lsn_str) = params.get(2) {
1369 : Some(
1370 : Lsn::from_str(lsn_str)
1371 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1372 : )
1373 : } else {
1374 : None
1375 : };
1376 : let prev_lsn = if let Some(prev_lsn_str) = params.get(3) {
1377 : Some(
1378 : Lsn::from_str(prev_lsn_str)
1379 0 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
1380 : )
1381 : } else {
1382 : None
1383 : };
1384 :
1385 : self.check_permission(Some(tenant_id))?;
1386 :
1387 : COMPUTE_COMMANDS_COUNTERS
1388 : .for_command(ComputeCommandKind::Fullbackup)
1389 : .inc();
1390 :
1391 : // Check that the timeline exists
1392 : self.handle_basebackup_request(
1393 : pgb,
1394 : tenant_id,
1395 : timeline_id,
1396 : lsn,
1397 : prev_lsn,
1398 : true,
1399 : false,
1400 : &ctx,
1401 : )
1402 : .await?;
1403 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1404 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1405 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1406 : // on connect
1407 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1408 : } else if query_string.starts_with("lease lsn ") {
1409 : let params = &parts[2..];
1410 : if params.len() != 3 {
1411 : return Err(QueryError::Other(anyhow::anyhow!(
1412 : "invalid param number {} for lease lsn command",
1413 : params.len()
1414 : )));
1415 : }
1416 :
1417 : let tenant_shard_id = TenantShardId::from_str(params[0])
1418 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1419 : let timeline_id = TimelineId::from_str(params[1])
1420 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1421 :
1422 : tracing::Span::current()
1423 : .record("tenant_id", field::display(tenant_shard_id))
1424 : .record("timeline_id", field::display(timeline_id));
1425 :
1426 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
1427 :
1428 : COMPUTE_COMMANDS_COUNTERS
1429 : .for_command(ComputeCommandKind::LeaseLsn)
1430 : .inc();
1431 :
1432 : // The caller is responsible for providing correct lsn.
1433 : let lsn = Lsn::from_str(params[2])
1434 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1435 :
1436 : match self
1437 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
1438 : .await
1439 : {
1440 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1441 : Err(e) => {
1442 : error!("error obtaining lsn lease for {lsn}: {e:?}");
1443 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1444 : &e.to_string(),
1445 : Some(e.pg_error_code()),
1446 : ))?
1447 : }
1448 : };
1449 : } else {
1450 : return Err(QueryError::Other(anyhow::anyhow!(
1451 : "unknown command {query_string}"
1452 : )));
1453 : }
1454 :
1455 : Ok(())
1456 : }
1457 : }
1458 :
1459 : impl From<GetActiveTenantError> for QueryError {
1460 0 : fn from(e: GetActiveTenantError) -> Self {
1461 0 : match e {
1462 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1463 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1464 0 : ),
1465 : GetActiveTenantError::Cancelled
1466 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1467 0 : QueryError::Shutdown
1468 : }
1469 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1470 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1471 : }
1472 0 : }
1473 : }
1474 :
1475 0 : #[derive(Debug, thiserror::Error)]
1476 : pub(crate) enum GetActiveTimelineError {
1477 : #[error(transparent)]
1478 : Tenant(GetActiveTenantError),
1479 : #[error(transparent)]
1480 : Timeline(#[from] GetTimelineError),
1481 : }
1482 :
1483 : impl From<GetActiveTimelineError> for QueryError {
1484 0 : fn from(e: GetActiveTimelineError) -> Self {
1485 0 : match e {
1486 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1487 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1488 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1489 : }
1490 0 : }
1491 : }
1492 :
1493 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1494 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1495 0 : tracing::Span::current().record(
1496 0 : "shard_id",
1497 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1498 0 : );
1499 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1500 0 : }